# Chapter 5: Integration and Workflows

This chapter focuses on integrating various AWS services to create end-to-end machine learning workflows. We'll cover a complete customer churn prediction example, the use of AWS Step Functions for ML workflows, and best practices for integrating multiple AWS services.

## Prerequisites

- An AWS account with appropriate permissions
- AWS CLI configured with your credentials
- Python 3.7 or later
- Required Python packages: boto3, pandas, numpy, sagemaker, awswrangler

Install required packages:

In [None]:
%%bash
pip install boto3 pandas numpy sagemaker awswrangler

## 1. End-to-End ML Workflow: Customer Churn Prediction

We'll create a complete workflow for predicting customer churn, integrating several AWS services.

### 1.1 Data Preparation with AWS Glue DataBrew

First, we'll use AWS Glue DataBrew to clean and prepare our data.

In [None]:
import boto3
import awswrangler as wr
import numpy as np
import pandas as pd

# Create a sample dataset
data = {
    'customer_id': range(1, 1001),
    'age': np.random.randint(18, 80, 1000),
    'tenure': np.random.randint(0, 10, 1000),
    'balance': np.random.uniform(0, 250000, 1000),
    'num_products': np.random.randint(1, 5, 1000),
    'is_active': np.random.choice([0, 1], 1000),
    'churn': np.random.choice([0, 1], 1000, p=[0.8, 0.2])  # 20% churn rate
}
df = pd.DataFrame(data)

# Upload the dataset to S3
bucket_name = 'your-bucket-name'  # Replace with your S3 bucket name
wr.s3.to_csv(df, f's3://{bucket_name}/raw_data/customer_data.csv', index=False)

# Set up Glue DataBrew client
databrew = boto3.client('databrew')

# Create DataBrew dataset
response = databrew.create_dataset(
    Name='customer_churn_dataset',
    Format='CSV',
    FormatOptions={
        'Csv': {
            'Delimiter': ',',
            'HeaderRow': True
        }
    },
    Input={
        'S3InputDefinition': {
            'Bucket': bucket_name,
            'Key': 'raw_data/customer_data.csv'
        }
    }
)

# Define and create the recipe first, because create_project references it by name.
# Check the operation and parameter names against the DataBrew recipe actions reference.
recipe_steps = [
    {
        'Action': {
            'Operation': 'CAST_COLUMN_TYPE',
            'Parameters': {
                'columnName': 'churn',
                'newType': 'INTEGER'
            }
        }
    },
    {
        'Action': {
            'Operation': 'NORMALIZE_COLUMN',
            'Parameters': {
                'columnName': 'balance',
                'normalizeType': 'MIN_MAX'
            }
        }
    }
]

response = databrew.create_recipe(
    Name='customer_churn_recipe',
    Steps=recipe_steps
)

# Create DataBrew project (needs an existing dataset and recipe, plus a role)
response = databrew.create_project(
    Name='customer_churn_project',
    DatasetName='customer_churn_dataset',
    RecipeName='customer_churn_recipe',
    RoleArn='your-databrew-role-arn'  # Replace with your DataBrew role ARN
)

# Create and start a recipe job for the project
response = databrew.create_recipe_job(
    Name='customer_churn_job',
    ProjectName='customer_churn_project',
    RoleArn='your-databrew-role-arn',  # Replace with your DataBrew role ARN
    Outputs=[{
        'Location': {
            'Bucket': bucket_name,
            'Key': 'prepared_data/'
        },
        'Format': 'CSV'
    }]
)

response = databrew.start_job_run(Name='customer_churn_job')

print("DataBrew job started. Check AWS console for progress.")
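The cell above starts the job and returns immediately, so later steps can read `prepared_data/` before it exists. A minimal sketch of a polling helper follows, assuming the DataBrew `describe_job_run` call and its `State` field; the function name, the polling defaults, and the injectable `sleep` argument are illustrative choices, not part of the original workflow.

```python
import time

# Terminal states reported by DataBrew for a job run
TERMINAL_STATES = {"SUCCEEDED", "FAILED", "STOPPED", "TIMEOUT"}


def wait_for_databrew_job(client, job_name, run_id,
                          poll_seconds=30, max_polls=120, sleep=time.sleep):
    """Poll describe_job_run until the run reaches a terminal state.

    `client` is expected to behave like boto3.client('databrew').
    Returns the final state string, or raises TimeoutError.
    """
    for _ in range(max_polls):
        state = client.describe_job_run(Name=job_name, RunId=run_id)["State"]
        if state in TERMINAL_STATES:
            return state
        sleep(poll_seconds)
    raise TimeoutError(
        f"Job {job_name} run {run_id} still running after {max_polls} polls"
    )

# Possible usage in the notebook (not executed here):
#   run = databrew.start_job_run(Name='customer_churn_job')
#   final_state = wait_for_databrew_job(databrew, 'customer_churn_job', run['RunId'])
```

Passing the client in as an argument keeps the helper easy to exercise with a stub, and the caller decides what to do with a `FAILED` or `TIMEOUT` result.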

### 1.2 Feature Management with SageMaker Feature Store

Next, we'll use SageMaker Feature Store to manage our features.

In [None]:
import boto3
import sagemaker
from sagemaker.feature_store.feature_group import FeatureGroup
from sagemaker.feature_store.feature_definition import FeatureDefinition, FeatureTypeEnum

# Set up SageMaker session
sagemaker_session = sagemaker.Session()
region = sagemaker_session.boto_region_name
feature_store_session = sagemaker_session

# Load prepared data
prepared_data = wr.s3.read_csv(f's3://{bucket_name}/prepared_data/')

# Define features
feature_definitions = [
    FeatureDefinition(feature_name="customer_id", feature_type=FeatureTypeEnum.STRING),
    FeatureDefinition(feature_name="age", feature_type=FeatureTypeEnum.INTEGRAL),
    FeatureDefinition(feature_name="tenure", feature_type=FeatureTypeEnum.INTEGRAL),
    FeatureDefinition(feature_name="balance", feature_type=FeatureTypeEnum.FRACTIONAL),
    FeatureDefinition(feature_name="num_products", feature_type=FeatureTypeEnum.INTEGRAL),
    FeatureDefinition(feature_name="is_active", feature_type=FeatureTypeEnum.INTEGRAL),
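    FeatureDefinition(feature_name="churn", feature_type=FeatureTypeEnum.INTEGRAL),
    # The event-time feature named in create() must also be declared here
    FeatureDefinition(feature_name="event_time", feature_type=FeatureTypeEnum.FRACTIONAL)
]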

# Define the Feature Group object (nothing is created in AWS yet)
feature_group = FeatureGroup(
    name="customer_churn_features",
    feature_definitions=feature_definitions,
    sagemaker_session=feature_store_session
)

# Create the Feature Group
feature_group.create(
    s3_uri=f"s3://{bucket_name}/feature_store",
    record_identifier_name="customer_id",
    event_time_feature_name="event_time",
    role_arn="your-sagemaker-role-arn",  # Replace with your SageMaker role ARN
    enable_online_store=True
)

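# Wait until the Feature Group exists; ingestion fails while it is still "Creating"
import time
while feature_group.describe().get("FeatureGroupStatus") == "Creating":
    time.sleep(5)

# Ingest data into the Feature Store
# A Fractional event time is expressed in Unix epoch seconds
current_time = round(time.time(), 3)
prepared_data['event_time'] = current_time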

feature_group.ingest(
    data_frame=prepared_data,
    max_workers=3,
    wait=True
)

print("Data ingested into Feature Store")
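Because the Feature Group is created with the online store enabled, single records can be read back at low latency. The sketch below assumes the `get_record` call of the `sagemaker-featurestore-runtime` boto3 client and the Feature Group name used above; the two helper names are hypothetical.

```python
def record_to_dict(response):
    """Flatten a GetRecord response into {feature_name: value_as_string}.

    A missing record has no 'Record' key, which yields an empty dict.
    """
    return {
        feature["FeatureName"]: feature["ValueAsString"]
        for feature in response.get("Record", [])
    }


def fetch_customer_features(runtime_client, customer_id,
                            feature_group_name="customer_churn_features"):
    """Read one customer's latest features from the online store.

    `runtime_client` is expected to behave like
    boto3.client('sagemaker-featurestore-runtime').
    """
    response = runtime_client.get_record(
        FeatureGroupName=feature_group_name,
        RecordIdentifierValueAsString=str(customer_id),
    )
    return record_to_dict(response)

# Possible usage in the notebook (not executed here):
#   runtime = boto3.client('sagemaker-featurestore-runtime')
#   features = fetch_customer_features(runtime, 42)
```

All values come back as strings, so cast them before passing them to a model.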

### 1.3 Model Training with SageMaker

Now we'll train a model using SageMaker.

In [None]:
from sagemaker.xgboost import XGBoost

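# Prepare data for training: write the prepared features to a local CSV
# (header row kept; the identifier and timestamp columns are not model inputs)
prepared_data.drop(columns=["customer_id", "event_time"], errors="ignore").to_csv(
    "train.csv", index=False
)
train_data = sagemaker_session.upload_data(
    path="train.csv",
    bucket=bucket_name,
    key_prefix="train"
)

# Set up XGBoost estimator in script mode; train.py is a training script you provide
xgb = XGBoost(
    entry_point="train.py",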
    framework_version="1.0-1",
    hyperparameters={
        "max_depth": 5,
        "eta": 0.2,
        "gamma": 4,
        "min_child_weight": 6,
        "subsample": 0.8,
        "objective": "binary:logistic",
        "num_round": 100
    },
    role="your-sagemaker-role-arn",  # Replace with your SageMaker role ARN
    instance_count=1,
    instance_type="ml.m5.xlarge",
    output_path=f"s3://{bucket_name}/model"
)

# Train the model
xgb.fit({"train": train_data})

print("Model training completed")
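The estimator above points at a `train.py` entry point that the chapter does not show. The following is one possible sketch of such a script, not the original. It assumes that script mode passes hyperparameters as command-line flags, that `SM_CHANNEL_TRAIN` and `SM_MODEL_DIR` locate the input channel and model directory, that `train.csv` has a header row with a `churn` label column, and that the model is saved under the file name `xgboost-model`.

```python
import argparse
import os


def parse_args(argv=None):
    """Parse the hyperparameters that the estimator passes as CLI flags."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--max_depth", type=int, default=5)
    parser.add_argument("--eta", type=float, default=0.2)
    parser.add_argument("--gamma", type=float, default=4.0)
    parser.add_argument("--min_child_weight", type=float, default=6.0)
    parser.add_argument("--subsample", type=float, default=0.8)
    parser.add_argument("--objective", type=str, default="binary:logistic")
    parser.add_argument("--num_round", type=int, default=100)
    # Locations provided by the training container (with local fallbacks)
    parser.add_argument("--train", default=os.environ.get("SM_CHANNEL_TRAIN", "."))
    parser.add_argument("--model_dir", default=os.environ.get("SM_MODEL_DIR", "model"))
    # Ignore any extra flags the container may add
    args, _unknown = parser.parse_known_args(argv)
    return args


def train(args):
    """Fit a booster on train.csv and save it to the model directory."""
    # Imported lazily so the argument parsing can be used without these packages
    import pandas as pd
    import xgboost as xgb

    df = pd.read_csv(os.path.join(args.train, "train.csv"))
    dtrain = xgb.DMatrix(df.drop(columns=["churn"]), label=df["churn"])
    params = {
        "max_depth": args.max_depth,
        "eta": args.eta,
        "gamma": args.gamma,
        "min_child_weight": args.min_child_weight,
        "subsample": args.subsample,
        "objective": args.objective,
    }
    booster = xgb.train(params, dtrain, num_boost_round=args.num_round)
    os.makedirs(args.model_dir, exist_ok=True)
    booster.save_model(os.path.join(args.model_dir, "xgboost-model"))


def main():
    """Entry point: a real train.py would call main() at module level."""
    train(parse_args())
```

Keeping argument parsing separate from training makes it possible to check the hyperparameter wiring locally before paying for a training instance.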

### 1.4 Model Deployment and Monitoring

We'll deploy the trained model and set up monitoring.

In [None]:
from sagemaker.model_monitor import DataCaptureConfig, DefaultModelMonitor
from sagemaker.model_monitor.dataset_format import DatasetFormat
from sagemaker.serializers import CSVSerializer

# Deploy the model with data capture enabled
# CSVSerializer lets predict() accept NumPy arrays and lists
predictor = xgb.deploy(
    initial_instance_count=1,
    instance_type="ml.m4.xlarge",
    serializer=CSVSerializer(),
    data_capture_config=DataCaptureConfig(
        enable_capture=True,
        sampling_percentage=100,
        destination_s3_uri=f"s3://{bucket_name}/data-capture"
    )
)

# Set up model monitoring
my_monitor = DefaultModelMonitor(
    role="your-sagemaker-role-arn",  # Replace with your SageMaker role ARN
    instance_count=1,
    instance_type="ml.m5.xlarge",
    volume_size_in_gb=20,
    max_runtime_in_seconds=3600,
)

my_monitor.suggest_baseline(
    baseline_dataset=f"s3://{bucket_name}/train/train.csv",
    dataset_format=DatasetFormat.csv(header=True),
    output_s3_uri=f"s3://{bucket_name}/monitoring/baseline",
    wait=True
)

from sagemaker.model_monitor import CronExpressionGenerator

my_monitor.create_monitoring_schedule(
    monitor_schedule_name="churn-prediction-monitor",
    endpoint_input=predictor.endpoint_name,
    output_s3_uri=f"s3://{bucket_name}/monitoring/output",
    statistics=my_monitor.baseline_statistics(),
    constraints=my_monitor.suggested_constraints(),
    schedule_cron_expression=CronExpressionGenerator.hourly(),
    enable_cloudwatch_metrics=True,
)

print("Model deployed and monitoring set up")
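Once the hourly schedule has run, each execution can write a constraint violations report. The sketch below summarizes such a report, assuming the documented layout of a top-level `violations` list whose entries carry `feature_name` and `constraint_check_type`; the function name and the shape of the summary are illustrative.

```python
from collections import Counter


def summarize_violations(report):
    """Summarize a Model Monitor constraint violations report (as a dict).

    Returns the total count, a count per check type, and the sorted
    list of affected feature names.
    """
    violations = report.get("violations", [])
    by_type = Counter(
        v.get("constraint_check_type", "unknown") for v in violations
    )
    features = sorted({v.get("feature_name", "unknown") for v in violations})
    return {
        "total": len(violations),
        "by_type": dict(by_type),
        "features": features,
    }

# Possible usage in the notebook (not executed here), assuming the SDK's
# ConstraintViolations object exposes the parsed JSON as `body_dict`:
#   latest = my_monitor.latest_monitoring_constraint_violations()
#   print(summarize_violations(latest.body_dict))
```

A summary like this is a convenient payload for a CloudWatch alarm or a notification when drift is detected.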

### 1.5 Making Predictions

Finally, let's use our deployed model to make predictions.

In [None]:
import numpy as np

# Prepare a sample input
sample_input = np.array([[40, 5, 100000, 2, 1]])  # age, tenure, balance, num_products, is_active

# Make a prediction
result = predictor.predict(sample_input)

print(f"Churn Prediction: {result}")

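# Clean up: remove the monitoring schedule before deleting the endpoint it watches
my_monitor.delete_monitoring_schedule()
predictor.delete_endpoint()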
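With the `binary:logistic` objective, the value printed above is a churn probability rather than a class label, and its exact container type depends on the predictor's deserializer. The helper below is a small sketch that accepts bytes, a string, or nested lists and applies a threshold; the function names and the 0.5 default are assumptions to tune for your own cost of false positives.

```python
def _flatten(value):
    """Yield scalar items from arbitrarily nested lists or tuples."""
    if isinstance(value, (list, tuple)):
        for item in value:
            yield from _flatten(item)
    else:
        yield value


def to_churn_labels(raw, threshold=0.5):
    """Convert raw endpoint output into (probability, label) pairs.

    `raw` may be bytes, a comma- or newline-separated string, a number,
    or nested lists of numbers or numeric strings.
    """
    if isinstance(raw, bytes):
        raw = raw.decode("utf-8")
    if isinstance(raw, str):
        raw = [part for part in raw.replace("\n", ",").split(",") if part.strip()]
    scores = [float(item) for item in _flatten(raw)]
    return [(score, int(score >= threshold)) for score in scores]
```

For example, `to_churn_labels(result, threshold=0.3)` flags more customers as likely churners than the default threshold does.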

## 2. AWS Step Functions for ML Workflows

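AWS Step Functions lets you coordinate multiple AWS services into serverless workflows. The following example defines a Step Functions state machine for our ML pipeline: it runs the DataBrew job, trains a model, and then creates the model, endpoint configuration, and endpoint.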

In [None]:
import boto3
import json
import time

stepfunctions = boto3.client('stepfunctions')

# Define the state machine
definition = {
    "Comment": "Customer Churn Prediction Workflow",
    "StartAt": "DataPrep",
    "States": {
        "DataPrep": {
            "Type": "Task",
            "Resource": "arn:aws:states:::databrew:startJobRun.sync",
            "Parameters": {
                "Name": "customer_churn_job"
            },
            "Next": "TrainModel"
        },
        "TrainModel": {
            "Type": "Task",
            "Resource": "arn:aws:states:::sagemaker:createTrainingJob.sync",
            "Parameters": {
                "TrainingJobName.$": "$$.Execution.Name",
                "AlgorithmSpecification": {
                    "TrainingImage": "...xgboost image arn...",
                    "TrainingInputMode": "File"
                },
                "RoleArn": "your-sagemaker-role-arn",
                "InputDataConfig": [{
                    "ChannelName": "train",
                    "DataSource": {
                        "S3DataSource": {
                            "S3Uri": f"s3://{bucket_name}/prepared_data/",
                            "S3DataType": "S3Prefix"
                        }
                    }
                }],
                "OutputDataConfig": {
                    "S3OutputPath": f"s3://{bucket_name}/model_output/"
                },
                "ResourceConfig": {
                    "InstanceCount": 1,
                    "InstanceType": "ml.m5.xlarge",
                    "VolumeSizeInGB": 30
                },
                "HyperParameters": {
                    "max_depth": "5",
                    "eta": "0.2",
                    "gamma": "4",
                    "min_child_weight": "6",
                    "subsample": "0.8",
                    "objective": "binary:logistic",
                    "num_round": "100"
                },
                "StoppingCondition": {
                    "MaxRuntimeInSeconds": 86400
                }
            },
            "Next": "DeployModel"
        },
        "DeployModel": {
            "Type": "Task",
            "Resource": "arn:aws:states:::sagemaker:createModel",
            "Parameters": {
                "ModelName.$": "$$.Execution.Name",
                "PrimaryContainer": {
                    "Image": "...xgboost image arn...",
                    "ModelDataUrl.$": "$.ModelArtifacts.S3ModelArtifacts"
                },
                "ExecutionRoleArn": "your-sagemaker-role-arn"
            },
            "Next": "ConfigureEndpoint"
        },
        "ConfigureEndpoint": {
            "Type": "Task",
            "Resource": "arn:aws:states:::sagemaker:createEndpointConfig",
            "Parameters": {
                "EndpointConfigName.$": "$$.Execution.Name",
                "ProductionVariants": [{
                    "InitialInstanceCount": 1,
                    "InstanceType": "ml.m4.xlarge",
                    "ModelName.$": "$$.Execution.Name",
                    "VariantName": "AllTraffic"
                }]
            },
            "Next": "Deploy"
        },
        "Deploy": {
            "Type": "Task",
            "Resource": "arn:aws:states:::sagemaker:createEndpoint",
            "Parameters": {
                "EndpointName.$": "$$.Execution.Name",
                "EndpointConfigName.$": "$$.Execution.Name"
            },
            "End": True
        }
    }
}

# Create the state machine
response = stepfunctions.create_state_machine(
    name='CustomerChurnPredictionWorkflow',
    definition=json.dumps(definition),
    roleArn='your-step-functions-role-arn'  # Replace with your Step Functions role ARN
)

print(f"State Machine ARN: {response['stateMachineArn']}")

# Start execution
execution = stepfunctions.start_execution(
    stateMachineArn=response['stateMachineArn'],
    name='CustomerChurnPrediction-' + str(int(time.time()))
)

print(f"Execution ARN: {execution['executionArn']}")
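The state machine above has no `Retry` or `Catch` fields, so any task failure ends the execution immediately. The sketch below adds both to every `Task` state and routes unhandled errors to a `Fail` state. The function name, the retry settings, and the `WorkflowFailed` state name are hypothetical. Retrying a task that creates a named resource, such as a training job, can collide with the first attempt, so treat these values as a starting point.

```python
import copy


def add_error_handling(definition, max_attempts=2, interval_seconds=30,
                       backoff_rate=2.0, failure_state="WorkflowFailed"):
    """Return a copy of a state machine definition with Retry and Catch
    added to each Task state that does not already define them."""
    hardened = copy.deepcopy(definition)
    for state in hardened["States"].values():
        if state.get("Type") != "Task":
            continue
        # Retry failed tasks with exponential backoff
        state.setdefault("Retry", [{
            "ErrorEquals": ["States.TaskFailed"],
            "IntervalSeconds": interval_seconds,
            "MaxAttempts": max_attempts,
            "BackoffRate": backoff_rate,
        }])
        # Route anything still failing to a single terminal Fail state
        state.setdefault("Catch", [{
            "ErrorEquals": ["States.ALL"],
            "ResultPath": "$.error",
            "Next": failure_state,
        }])
    hardened["States"][failure_state] = {
        "Type": "Fail",
        "Error": "PipelineError",
        "Cause": "A task failed after all retries",
    }
    return hardened

# Possible usage in the notebook (not executed here):
#   definition=json.dumps(add_error_handling(definition))
```

Working on a deep copy leaves the original `definition` untouched, which makes it easy to compare the two versions or to deploy them side by side.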

## 3. Integrating AWS Services

### 3.1 Combining Multiple Services in a Single Pipeline

The examples above demonstrate how to combine multiple AWS services in a single ML pipeline:

1. AWS Glue DataBrew for data preparation
2. Amazon S3 for data storage
3. SageMaker Feature Store for feature management
4. SageMaker for model training and deployment
5. CloudWatch for monitoring
6. Step Functions for workflow orchestration

### 3.2 Best Practices for Service Integration

When integrating multiple AWS services, consider the following best practices:

1. **Use IAM roles**: Create specific IAM roles for each service with the minimum required permissions.

2. **Implement error handling**: Use try-except blocks and Step Functions error handling to manage failures gracefully.

3. **Use environment variables**: Store configuration details in environment variables or AWS Systems Manager Parameter Store.

4. **Implement logging**: Use CloudWatch Logs for centralized logging across all services.

5. **Use versioning**: Version your data, models, and code to ensure reproducibility.

6. **Implement monitoring**: Set up CloudWatch alarms and dashboards to monitor your entire pipeline.

7. **Use infrastructure as code**: Utilize AWS CloudFormation or AWS CDK to define and manage your AWS resources.

In [None]:
Example using AWS CDK:

```python
# AWS CDK v2 imports (the `core` module belongs to the retired CDK v1)
from aws_cdk import (
    App,
    Stack,
    aws_s3 as s3,
    aws_sagemaker as sagemaker,
    aws_iam as iam,
)
from constructs import Construct

class MLPipelineStack(Stack):
    def __init__(self, scope: Construct, id: str, **kwargs) -> None:
        super().__init__(scope, id, **kwargs)

        # Create an S3 bucket
        bucket = s3.Bucket(self, "MLDataBucket")

        # Create a SageMaker execution role
        sagemaker_role = iam.Role(self, "SageMakerExecutionRole",
            assumed_by=iam.ServicePrincipal("sagemaker.amazonaws.com"),
            managed_policies=[iam.ManagedPolicy.from_aws_managed_policy_name("AmazonSageMakerFullAccess")]
        )

        # Create a SageMaker notebook instance
        notebook = sagemaker.CfnNotebookInstance(self, "MLNotebookInstance",
            instance_type="ml.t3.medium",
            role_arn=sagemaker_role.role_arn,
            root_access="Enabled"
        )

app = App()
MLPipelineStack(app, "MLPipelineStack")
app.synth()
```

8. **Implement data validation**: Validate your data at each step of the pipeline to ensure data quality and consistency.

In [None]:
Example using Great Expectations:

```python
import great_expectations as ge

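# Load your data (this uses the legacy Pandas-dataset API from older
# Great Expectations releases; reading an s3:// path also requires s3fs)
df = ge.read_csv("s3://your-bucket/your-data.csv")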

# Create an expectation suite
df.expect_column_values_to_be_between("age", min_value=0, max_value=120)
df.expect_column_values_to_be_in_set("churn", [0, 1])

# Validate the data
results = df.validate()

if not results["success"]:
    raise ValueError("Data validation failed")
```

9. **Use AWS Step Functions for complex workflows**: For pipelines with multiple steps and decision points, use AWS Step Functions to orchestrate your workflow.

10. **Implement CI/CD for ML**: Use services like AWS CodePipeline and AWS CodeBuild to implement continuous integration and deployment for your ML models.

In [None]:
Example buildspec.yml for CodeBuild:

```yaml
version: 0.2

phases:
  install:
    runtime-versions:
      python: 3.8
    commands:
      - pip install pytest sagemaker boto3
  build:
    commands:
      - pytest tests/
      - python train.py
  post_build:
    commands:
      - aws sagemaker create-model --model-name $MODEL_NAME --primary-container file://container.json --execution-role-arn $SAGEMAKER_ROLE_ARN
```

11. **Use SageMaker Pipelines for ML-specific workflows**: For ML-specific workflows, consider using SageMaker Pipelines, which is purpose-built for ML orchestration.

In [None]:
```python
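from sagemaker.inputs import TrainingInput
from sagemaker.model import Model
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.model_step import ModelStep
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.workflow.steps import ProcessingStep, TrainingStep

# Assumes sklearn_processor, input_data, xgb, and role are already defined

preprocessing_step = ProcessingStep(
    name="PreprocessData",
    processor=sklearn_processor,
    inputs=[ProcessingInput(source=input_data, destination="/opt/ml/processing/input")],
    outputs=[ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
             ProcessingOutput(output_name="test", source="/opt/ml/processing/test")],
    code="preprocess.py"
)

training_step = TrainingStep(
    name="TrainModel",
    estimator=xgb,
    inputs={"train": TrainingInput(s3_data=preprocessing_step.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri,
                                    content_type="text/csv")}
)

# ModelStep takes step_args produced by Model.create() under a PipelineSession
model = Model(
    image_uri=xgb.training_image_uri(),
    model_data=training_step.properties.ModelArtifacts.S3ModelArtifacts,
    role=role,
    sagemaker_session=PipelineSession()
)

model_step = ModelStep(
    name="CreateModel",
    step_args=model.create(instance_type="ml.m5.xlarge")
)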

pipeline = Pipeline(
    name="CustomerChurnPipeline",
    steps=[preprocessing_step, training_step, model_step]
)

pipeline.upsert(role_arn=role)
execution = pipeline.start()
```

12. **Implement proper security measures**: Use encryption at rest and in transit, implement network isolation with VPCs, and use AWS Secrets Manager for managing sensitive information.

13. **Optimize for cost**: Use auto-scaling for SageMaker endpoints, leverage Spot Instances where appropriate, and implement lifecycle policies for S3 to manage storage costs.
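Practice 3 in the list above (configuration in environment variables) can be sketched as follows, replacing the hard-coded `'your-bucket-name'` and role ARN placeholders used throughout this chapter. The `CHURN_*` variable names and the `PipelineConfig` class are hypothetical, and the `us-east-1` fallback is only an illustrative default.

```python
import os
from dataclasses import dataclass


@dataclass(frozen=True)
class PipelineConfig:
    """Settings shared by every step of the churn pipeline."""
    bucket_name: str
    sagemaker_role_arn: str
    region: str


# Hypothetical variable names; pick names that fit your own conventions
REQUIRED_VARS = ("CHURN_BUCKET_NAME", "CHURN_SAGEMAKER_ROLE_ARN")


def load_config(env=None):
    """Build a PipelineConfig from environment variables.

    `env` defaults to os.environ; a plain dict can be passed for testing.
    Raises KeyError listing every required variable that is missing.
    """
    env = os.environ if env is None else env
    missing = [name for name in REQUIRED_VARS if not env.get(name)]
    if missing:
        raise KeyError(f"Missing required settings: {', '.join(missing)}")
    return PipelineConfig(
        bucket_name=env["CHURN_BUCKET_NAME"],
        sagemaker_role_arn=env["CHURN_SAGEMAKER_ROLE_ARN"],
        region=env.get("AWS_DEFAULT_REGION", "us-east-1"),
    )
```

Failing fast with the full list of missing settings is usually friendlier than discovering them one at a time midway through a pipeline run. The same function could read from Systems Manager Parameter Store by passing in a dict built from its responses.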

## Conclusion

Integrating multiple AWS services to create end-to-end ML workflows allows you to leverage the best tools for each part of your pipeline while maintaining a cohesive and manageable system. By following best practices and using services like AWS Step Functions or SageMaker Pipelines, you can create robust, scalable, and maintainable ML workflows.

Key takeaways from this chapter:

1. **End-to-end integration**: We demonstrated how to create a complete ML workflow for customer churn prediction, integrating services like Glue DataBrew, SageMaker Feature Store, SageMaker for training and deployment, and CloudWatch for monitoring.

2. **Workflow orchestration**: We explored how to use AWS Step Functions to coordinate complex ML workflows, allowing for better error handling, parallel execution, and visual representation of your pipeline.

3. **Best practices**: We covered numerous best practices for integrating AWS services, including using IAM roles, implementing error handling, using infrastructure as code, and implementing CI/CD for ML.

4. **Flexibility and scalability**: The approaches discussed in this chapter allow for flexible and scalable ML workflows that can be easily modified and expanded as your needs grow.

5. **Managed services**: By leveraging AWS managed services, you can focus on your ML problems rather than infrastructure management.

As you build your own ML workflows, remember that the specific services and integration patterns you choose will depend on your unique requirements, data characteristics, and team expertise. Always consider factors such as data volume, model complexity, inference latency requirements, and operational constraints when designing your ML pipelines.

In the next chapters, we'll explore more advanced topics such as MLOps practices, handling big data in ML workflows, and implementing advanced monitoring and explainability for your ML models.