# Model Training and Deployment Pipeline

The pipeline aims to meet the following needs:
* A data source change triggers model retraining and deployment.
* A model training code change triggers model retraining and deployment.
* A defined process evaluates the trained model and promotes it to the production environment.

To meet these needs, the pipeline is built with the following AWS services:
* AWS Step Functions, with the [Step Functions Data Science SDK v2.0.0rc1](https://aws-step-functions-data-science-sdk.readthedocs.io/en/v2.0.0rc1/), to orchestrate model training and deployment on Amazon SageMaker.
* AWS CodePipeline to define the high-level orchestration, from source code and data source changes through model training and deployment on the Dev and Production environments.
* AWS CodeBuild to create the model training and deployment workflow and to process data source updates.
* AWS Lambda to run basic functions in model training and deployment.
* Amazon Simple Notification Service (SNS) to send notifications.
* AWS CloudFormation to create the demo ML pipeline stack.

Below is the design diagram:

![ML Pipeline Design](./images/ml_pipeline_design.png)


## Pipeline Demo

This demo shows how to submit a data source change and a source code change, each of which triggers an ML pipeline build.

In [None]:
%load_ext autoreload
%autoreload 2

In [None]:
from pipeline.ml_pipeline_dependencies import *

In [None]:
import pandas as pd
import numpy as np

### Submit Data Source Change

Download the source data file.

In [None]:
!wget https://sagemaker-sample-data-us-west-2.s3-us-west-2.amazonaws.com/autopilot/direct_marketing/bank-additional.zip

In [None]:
import zipfile
with zipfile.ZipFile("./bank-additional.zip", 'r') as zip_ref:
    zip_ref.extractall(".")

Upload the data source file to the target S3 location.

In [None]:
ssm = boto3.client('ssm')
response = ssm.get_parameter(Name = "/ml_pipeline/pipeline_artifact_s3_bucket_name")
pipeline_artifact_bucket_name = response['Parameter']['Value']
# The model name must match the ML pipeline stack parameter ModelName.
model_name = "directmarketing"
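
The same Parameter Store lookup is repeated later for the model training bucket. A minimal sketch of a reusable helper follows, assuming only the `get_parameter` response shape read in the cell above. `get_ssm_value` and `StubSsmClient` are hypothetical names introduced for illustration, and the bucket value is a placeholder rather than a real resource.

```python
class StubSsmClient:
    """Hypothetical stand-in for boto3.client('ssm'), for illustration only."""

    def __init__(self, values):
        self._values = values

    def get_parameter(self, Name):
        # Mirrors the part of the response shape that the notebook reads.
        return {"Parameter": {"Name": Name, "Value": self._values[Name]}}


def get_ssm_value(ssm_client, name):
    """Return the string value of a single Parameter Store entry."""
    response = ssm_client.get_parameter(Name=name)
    return response["Parameter"]["Value"]


# Placeholder data: the bucket name below is made up.
example_param = "/ml_pipeline/pipeline_artifact_s3_bucket_name"
example_client = StubSsmClient({example_param: "example-artifact-bucket"})
example_bucket = get_ssm_value(example_client, example_param)
print(example_bucket)
```

In the notebook itself, the real `ssm` client would be passed in place of the stub.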

In [None]:
data_source_file = "./bank-additional/bank-additional-full.csv"

target_s3_uri = f's3://{pipeline_artifact_bucket_name}/{model_name}'
    
sagemaker.s3.S3Uploader.upload(
    data_source_file, 
    target_s3_uri, 
    sagemaker_session = sagemaker_session)
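
To make it clearer where the file should land, here is a small sketch of how the target URI and the resulting object URI are composed. `build_s3_uri` is a hypothetical helper and the bucket name is a placeholder. The sketch also assumes the uploader keeps the local file name as the object name, which is worth confirming against the SageMaker SDK version in use.

```python
import posixpath


def build_s3_uri(bucket, *parts):
    """Join a bucket name and optional key parts into an s3:// URI."""
    cleaned = [part.strip("/") for part in parts if part.strip("/")]
    if not cleaned:
        return f"s3://{bucket}"
    return f"s3://{bucket}/{posixpath.join(*cleaned)}"


# Placeholder values for illustration only.
example_bucket_name = "example-artifact-bucket"
example_model_name = "directmarketing"
example_file = "./bank-additional/bank-additional-full.csv"

example_prefix_uri = build_s3_uri(example_bucket_name, example_model_name)
example_object_uri = build_s3_uri(
    example_bucket_name, example_model_name, posixpath.basename(example_file)
)
print(example_prefix_uri)  # s3://example-artifact-bucket/directmarketing
print(example_object_uri)  # s3://example-artifact-bucket/directmarketing/bank-additional-full.csv
```

Printing the URI before uploading is a cheap way to catch a malformed bucket name or prefix.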


In [None]:
code_pipeline_name = model_name
display_codepipeline_advice(code_pipeline_name)

### Submit Source Code Change

Click the 'Save' button to persist the notebook changes, then push them to the repository.

In [None]:
# Once the changes are saved, run this cell to trigger ML pipeline
!git add 03_model_training_and_deployment_pipeline.ipynb
!git commit -m "I push code change to trigger ML Pipeline."
!git push

### Model Evaluation on Dev

Download the `test.csv` file from the processing output and use it to evaluate the Dev model's performance.

In [None]:
ssm = boto3.client('ssm')
response = ssm.get_parameter(Name = "/ml_pipeline/model_training_s3_bucket_name")
model_training_bucket_name = response['Parameter']['Value']

In [None]:
sagemaker.s3.S3Downloader().download(f's3://{model_training_bucket_name}/preprocessing/output/test/test.csv', './data')
test_data = pd.read_csv('./data/test.csv')
test_data.head()

In [None]:
from sagemaker.predictor import Predictor
from sagemaker.serializers import CSVSerializer

dev_endpoint_name = "direct-marketing-endpoint-dev"
dev_predictor = Predictor(dev_endpoint_name, 
                      sagemaker_session = sagemaker_session, 
                      serializer = CSVSerializer())

In [None]:
def predict(predictor, data, rows=500):
    split_array = np.array_split(data, int(data.shape[0] / float(rows) + 1))
    predictions = ''
    for array in split_array:
        predictions = ','.join([predictions, predictor.predict(array).decode('utf-8')])

    return np.fromstring(predictions[1:], sep=',')
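
The batching in `predict` can be hard to picture, so the sketch below reproduces only the chunk-size arithmetic in plain Python. `chunk_sizes` is a hypothetical helper. It assumes the usual `np.array_split` behaviour, where the first `n % k` sections receive one extra row.

```python
def chunk_sizes(n_rows, rows=500):
    """Return the batch sizes that predict() would send for n_rows inputs."""
    # Same section count as in predict(): always at least one chunk.
    n_chunks = int(n_rows / float(rows) + 1)
    base, extra = divmod(n_rows, n_chunks)
    # The first `extra` chunks carry one additional row each.
    return [base + 1] * extra + [base] * (n_chunks - extra)


print(chunk_sizes(1200))  # [400, 400, 400]
print(chunk_sizes(1000))  # [334, 333, 333]
print(chunk_sizes(300))   # [300]
```

Because the section count is always greater than `n_rows / rows`, no single request exceeds `rows` rows, which keeps each payload small.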

In [None]:
predictions = predict(dev_predictor, test_data.drop(['y_no', 'y_yes'], axis=1).to_numpy())

In [None]:
pd.crosstab(index=test_data['y_yes'], columns=np.round(predictions), rownames=['actuals'], colnames=['predictions'])
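
The cross-tabulation above is a confusion matrix. As a rough sketch of how summary metrics could be derived from it, the snippet below computes accuracy, precision, and recall in plain Python. `binary_metrics` is a hypothetical helper, the labels and scores are made-up sample values rather than pipeline output, and scores are rounded to the nearest class label as in the cell above.

```python
def binary_metrics(actuals, scores):
    """Compute accuracy, precision and recall for 0/1 labels and scores in [0, 1]."""
    tp = fp = tn = fn = 0
    for actual, score in zip(actuals, scores):
        predicted = int(round(score))  # round the score to a 0/1 class label
        if predicted == 1 and actual == 1:
            tp += 1
        elif predicted == 1 and actual == 0:
            fp += 1
        elif predicted == 0 and actual == 0:
            tn += 1
        else:
            fn += 1
    total = tp + fp + tn + fn
    return {
        "accuracy": (tp + tn) / total if total else 0.0,
        "precision": tp / (tp + fp) if (tp + fp) else 0.0,
        "recall": tp / (tp + fn) if (tp + fn) else 0.0,
    }


# Made-up sample values for illustration only.
example_actuals = [1, 0, 1, 1, 0, 0, 0, 1]
example_scores = [0.9, 0.2, 0.4, 0.7, 0.6, 0.1, 0.3, 0.8]
example_metrics = binary_metrics(example_actuals, example_scores)
print(example_metrics)
```

With the notebook's data, `test_data['y_yes']` and `predictions` would take the place of the sample lists.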

### Production Variants Evaluation

Evaluate the production endpoint's performance across the `blue` and `green` variants.
* `blue` - the model currently served by the existing production endpoint; it does not exist if the production endpoint has never been deployed.
* `green` - the release candidate model, taken from the dev endpoint.

In [None]:
BLUE_VARIANT_NAME = 'blue-variant'
GREEN_VARIANT_NAME = 'green-variant'

#### Proceed only if both variants are deployed

In [None]:
prod_endpoint_name = "direct-marketing-endpoint-prd"

endpoint_response = sm.describe_endpoint(
    EndpointName = prod_endpoint_name
)
variant_names = [ variant['VariantName'] for variant in endpoint_response['ProductionVariants'] ]
variant_names
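
Whether to continue depends on both variants being present. A minimal sketch of that check follows, written against a hand-made dictionary that imitates the `ProductionVariants` part of the endpoint description. `has_blue_and_green` is a hypothetical helper and the weights are made-up values.

```python
def has_blue_and_green(endpoint_description,
                       blue="blue-variant",
                       green="green-variant"):
    """Return True when both named variants appear in the endpoint description."""
    names = {v["VariantName"] for v in endpoint_description.get("ProductionVariants", [])}
    return {blue, green} <= names


# Made-up response fragment for illustration only.
example_description = {
    "EndpointName": "direct-marketing-endpoint-prd",
    "ProductionVariants": [
        {"VariantName": "blue-variant", "CurrentWeight": 0.9},
        {"VariantName": "green-variant", "CurrentWeight": 0.1},
    ],
}
example_first_deploy = {
    "EndpointName": "direct-marketing-endpoint-prd",
    "ProductionVariants": [{"VariantName": "green-variant", "CurrentWeight": 1.0}],
}

print(has_blue_and_green(example_description))   # True
print(has_blue_and_green(example_first_deploy))  # False
```

In the notebook, `endpoint_response` would be passed in, and the variant comparison below would be skipped when the check returns `False`.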

#### Evaluate variants' performance

In [None]:
prod_predictor = Predictor(prod_endpoint_name, 
                      sagemaker_session = sagemaker_session, 
                      serializer = CSVSerializer())

In [None]:
predictions = predict(prod_predictor, test_data.drop(['y_no', 'y_yes'], axis=1).to_numpy())

In [None]:
cw = boto3.Session().client("cloudwatch")

def get_invocation_metrics_for_endpoint_variant(endpoint_name,
                                                variant_name,
                                                start_time,
                                                end_time):
    metrics = cw.get_metric_statistics(
        Namespace="AWS/SageMaker",
        MetricName="Invocations",
        StartTime=start_time,
        EndTime=end_time,
        Period=60,
        Statistics=["Sum"],
        Dimensions=[
            {
                "Name": "EndpointName",
                "Value": endpoint_name
            },
            {
                "Name": "VariantName",
                "Value": variant_name
            }
        ]
    )
    return pd.DataFrame(metrics["Datapoints"])\
            .sort_values("Timestamp")\
            .set_index("Timestamp")\
            .drop("Unit", axis=1)\
            .rename(columns={"Sum": variant_name})

def plot_endpoint_metrics(endpoint_name, variant_names, start_time=None):
    start_time = start_time or datetime.now() - timedelta(minutes=30)
    end_time = datetime.now()
    metrics_variant1 = get_invocation_metrics_for_endpoint_variant(endpoint_name, variant_names[0], start_time, end_time)
    metrics_variants = metrics_variant1
    if len(variant_names) > 1:
        metrics_variant2 = get_invocation_metrics_for_endpoint_variant(endpoint_name, variant_names[1], start_time, end_time)
        metrics_variants = metrics_variant1.join(metrics_variant2, how="outer")
    metrics_variants.plot()
    return metrics_variants

In [None]:
time.sleep(30)  # let metrics catch up
plot_endpoint_metrics(prod_endpoint_name, variant_names)
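
Beyond the plot, the per-minute `Sum` datapoints can be reduced to a traffic share per variant. The sketch below shows one way to do that in plain Python. `traffic_share` is a hypothetical helper, and the datapoints are made-up numbers shaped like the `Datapoints` entries returned for the `Invocations` metric.

```python
def traffic_share(datapoints_by_variant):
    """Return each variant's fraction of total invocations."""
    totals = {
        name: sum(point["Sum"] for point in points)
        for name, points in datapoints_by_variant.items()
    }
    grand_total = sum(totals.values())
    return {
        name: (total / grand_total if grand_total else 0.0)
        for name, total in totals.items()
    }


# Made-up datapoints for illustration only.
example_datapoints = {
    "blue-variant": [{"Sum": 60.0}, {"Sum": 30.0}],
    "green-variant": [{"Sum": 4.0}, {"Sum": 6.0}],
}
example_share = traffic_share(example_datapoints)
print(example_share)
```

Comparing these fractions with the configured variant weights is a quick sanity check that traffic is being split as intended.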

#### More Detail on Variant Evaluation

For more detail, refer to the SageMaker example notebook [a_b_testing.ipynb](https://github.com/aws/amazon-sagemaker-examples/blob/master/sagemaker_endpoints/a_b_testing/a_b_testing.ipynb).