# 5. Monitor a Model Endpoint using SageMaker Model Monitor

* Goals
    * Create a baselining job to learn dataset constraints and statistics
    * Enable data capture to store input data
    * Compare input data to constraints and statistics from the training set
    * Visualize differences in the distributions

In [32]:
%cd /root/sagemaker-workshop-420/notebooks

/root/sagemaker-workshop-420/notebooks


In [33]:
import boto3
import numpy as np
import pandas as pd
import sagemaker
from sagemaker import get_execution_role
from sagemaker.amazon.amazon_estimator import get_image_uri
from sagemaker.model import Model

In [34]:
boto_session = boto3.Session()
region = boto_session.region_name
sagemaker_session = sagemaker.Session()
role = get_execution_role()
print(role)

arn:aws:iam::209970524256:role/service-role/AmazonSageMaker-ExecutionRole-20200402T065938


In [35]:
BUCKET = 'sagemaker-workshop-420'
PREFIX = 'xgb-churn'

LOCAL_DATA_DIRECTORY = f'../data/{PREFIX}'

print(f"\nArtifacts will be written to/read from s3://{BUCKET}/{PREFIX}")


Artifacts will be written to/read from s3://sagemaker-workshop-420/xgb-churn


## 1. Load the pretrained XGBoost model

In [36]:
xgboost_image_name = get_image_uri(boto_session.region_name, 'xgboost', repo_version='0.90-2')
xgboost_image_name

'257758044811.dkr.ecr.us-east-2.amazonaws.com/sagemaker-xgboost:0.90-2-cpu-py3'

In [6]:
model_s3_path = 's3://sagemaker-workshop-420/xgb-churn/output/xgboost-customer-churn-2020-04-11-12-15-49-863/output/model.tar.gz'

In [9]:
xgboost_model = Model(model_data=model_s3_path,
                      image=xgboost_image_name,
                      role=role,
                      sagemaker_session=sagemaker_session)

---
## 2. Host the Model and Enable Data Capture

Now that we've loaded the pretrained model, let's deploy it to a hosted endpoint. To monitor the model once it is hosted and serving requests, we'll also configure the endpoint to capture the data that is sent to it.

In [10]:
import time
from time import strftime, gmtime
from sagemaker.model_monitor import DataCaptureConfig, DatasetFormat, DefaultModelMonitor
from sagemaker.predictor import csv_serializer, RealTimePredictor

In [12]:
data_capture_config = DataCaptureConfig(enable_capture=True,
                                        sampling_percentage=100,
                                        destination_s3_uri=f's3://{BUCKET}/{PREFIX}/model-monitor/data-capture')

In [13]:
endpoint_name = "xgboost-customer-churn-" + strftime("%Y-%m-%d-%H-%M-%S", gmtime())
print(f"EndpointName = {endpoint_name}")

EndpointName = xgboost-customer-churn-2020-04-12-20-57-13


In [14]:
xgb_predictor = xgboost_model.deploy(initial_instance_count=1, 
                                     instance_type='ml.m4.xlarge',
                                     endpoint_name=endpoint_name,
                                     data_capture_config=data_capture_config)

--------------!

In [17]:
xgboost_model.name

'sagemaker-xgboost-2020-04-12-20-57-18-515'

In [18]:
xgboost_model.endpoint_name

'xgboost-customer-churn-2020-04-12-20-57-13'

---
## 3. Initialize a Predictor 

Initialize a `sagemaker.predictor.RealTimePredictor` so we can make real-time predictions from our model by making an HTTP POST request. We also set up serializers and deserializers for passing our NumPy arrays to the model behind the endpoint.

In [19]:
xgb_predictor = RealTimePredictor(endpoint=xgboost_model.endpoint_name,
                                  sagemaker_session=sagemaker_session,
                                  serializer=csv_serializer,
                                  deserializer=None,
                                  content_type='text/csv')
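
To make the serializer's role concrete, here is a minimal stand-in, assuming the endpoint expects one comma-separated row per request, as the `text/csv` content type suggests. `to_csv_payload` is a hypothetical helper written for illustration; it is not the SDK's `csv_serializer`.

```python
def to_csv_payload(row):
    """Join one row of feature values into a single CSV line (no trailing newline)."""
    return ",".join(str(value) for value in row)

# Hypothetical, shortened feature row; real rows in this notebook have many more columns.
example_row = [186, 0.1, 137.8, 97]
payload = to_csv_payload(example_row)
print(payload)
```

A string built this way has the same shape as the rows read from the test file later in this notebook.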

---
## 4. Invoke the deployed model

Now that we have a hosted endpoint running, we can make real-time predictions. First, let's peek at the first few rows of the test data. Then we'll loop over the test dataset and send each row to the XGBoost endpoint:

In [30]:
!head -5 ../data/xgb-churn/test_sample.csv

186,0.1,137.8,97,187.7,118,146.4,85,8.7,6,1,0.2,0.3,0.4,0.5,0.6,0.7,0.8,0.9,0.10,0.11,0.12,0.13,0.14,0.15,0.16,0.17,1.1,0.18,0.19,0.20,0.21,0.22,0.23,0.24,0.25,0.26,0.27,0.28,0.29,0.30,0.31,0.32,0.33,0.34,0.35,0.36,0.37,0.38,0.39,0.40,0.41,0.42,0.43,0.44,0.45,0.46,0.47,0.48,0.49,0.50,0.51,0.52,0.53,1.2,1.3,0.54,1.4,0.55
132,25,113.2,96,269.9,107,229.1,87,7.1,7,2,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,1,0,0,1
112,17,183.2,95,252.8,125,156.7,95,9.7,3,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,1,0,0,1
91,24,93.5,112,183.4,128,240.7,133,9.9,3,0,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,1,0,1
22,0,110.3,107,166.5,93,202.3,96,9.5,5,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,1,0,0,1,0,1,0


In [21]:
print("Sending test traffic to the endpoint {}. \nPlease wait for a minute...".format(endpoint_name))

with open(f'{LOCAL_DATA_DIRECTORY}/test_sample.csv', 'r') as f:
    for row in f:
        payload = row.rstrip('\n')
        #print(payload)
        #break
        response = xgb_predictor.predict(data=payload)
        time.sleep(0.5)

Sending test traffic to the endpoint xgboost-customer-churn-2020-04-12-20-57-13. 
Please wait for a minute...


### Verify that data is captured in Amazon S3

The requests we just sent to the endpoint should also have been captured for monitoring purposes.

Let's list the data capture files stored in Amazon S3. Expect to see different files from different time periods organized based on the hour in which the invocation occurred. The format of the Amazon S3 path is:

`s3://{destination-bucket-prefix}/{endpoint-name}/{variant-name}/yyyy/mm/dd/hh/filename.jsonl`
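
As a rough sketch of how that layout can be taken apart, the helper below splits a capture file URI into its components using only the standard library. `parse_capture_uri` and the example URI are hypothetical; the sketch assumes the path ends with exactly the seven components shown in the format above.

```python
from datetime import datetime
from urllib.parse import urlparse

def parse_capture_uri(uri):
    """Split a data capture URI into bucket, endpoint, variant, hour, and filename."""
    parsed = urlparse(uri)
    parts = parsed.path.lstrip("/").split("/")
    # Assumed trailing layout: endpoint / variant / yyyy / mm / dd / hh / filename
    endpoint, variant, year, month, day, hour, filename = parts[-7:]
    return {
        "bucket": parsed.netloc,
        "endpoint": endpoint,
        "variant": variant,
        "hour": datetime(int(year), int(month), int(day), int(hour)),
        "filename": filename,
    }

# Hypothetical URI, used only to exercise the helper.
info = parse_capture_uri(
    "s3://example-bucket/some-prefix/my-endpoint/AllTraffic/2020/04/12/21/capture-file.jsonl"
)
print(info["endpoint"], info["variant"], info["hour"])
```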

In [23]:
import json
from sagemaker.s3 import S3Uploader, S3Downloader

In [26]:
current_endpoint_capture_prefix

'xgb-churn/datacapture/xgboost-customer-churn-2020-04-12-20-57-13'

In [27]:
current_endpoint_capture_prefix = f'{PREFIX}/{endpoint_name}'
print("Found Data Capture Files:")
capture_files = S3Downloader.list(f"s3://{BUCKET}/{current_endpoint_capture_prefix}")
print(capture_files)

Found Data Capture Files:
['s3://sagemaker-workshop-420/xgb-churn/xgboost-customer-churn-2020-04-12-20-57-13/AllTraffic/2020/04/12/21/05-21-406-c96a56c4-8bc2-407d-88cc-2785f9e354e6.jsonl', 's3://sagemaker-workshop-420/xgb-churn/xgboost-customer-churn-2020-04-12-20-57-13/AllTraffic/2020/04/12/21/06-21-675-c2c2d0c4-f136-4954-aecd-66066e76b373.jsonl']


All the captured data is stored in a SageMaker-specific JSON Lines file. Next, let's pretty-print a single line so that we can observe the format more easily.

In [28]:
capture_file = S3Downloader.read_file(capture_files[-1])

print("=====Single Data Capture====")
print(json.dumps(json.loads(capture_file.split('\n')[0]), indent=2)[:2000])

=====Single Data Capture====
{
  "captureData": {
    "endpointInput": {
      "observedContentType": "text/csv",
      "mode": "INPUT",
      "data": "92,0,176.3,85,93.4,125,207.2,107,9.6,1,2,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,1,0,1,0",
      "encoding": "CSV"
    },
    "endpointOutput": {
      "observedContentType": "text/csv; charset=utf-8",
      "mode": "OUTPUT",
      "data": "0.041860517114400864",
      "encoding": "CSV"
    }
  },
  "eventMetadata": {
    "eventId": "eacd5b42-561a-46cf-bc82-99d8c21c4625",
    "inferenceTime": "2020-04-12T21:06:21Z"
  },
  "eventVersion": "0"
}
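
A captured line like the one above can be turned back into features and a prediction with the standard library alone. The sketch below assumes CSV-encoded input and output, as in this notebook; `parse_capture_record` is a hypothetical helper and the record is abbreviated to four features for readability.

```python
import json

# Abbreviated, hypothetical record in the same shape as the capture shown above.
record_line = json.dumps({
    "captureData": {
        "endpointInput": {"observedContentType": "text/csv", "mode": "INPUT",
                          "data": "92,0,176.3,85", "encoding": "CSV"},
        "endpointOutput": {"observedContentType": "text/csv; charset=utf-8", "mode": "OUTPUT",
                           "data": "0.041860517114400864", "encoding": "CSV"},
    },
    "eventMetadata": {"eventId": "example-event-id", "inferenceTime": "2020-04-12T21:06:21Z"},
    "eventVersion": "0",
})

def parse_capture_record(line):
    """Return (features, prediction, inference_time) from one JSON Lines record."""
    record = json.loads(line)
    capture = record["captureData"]
    features = [float(value) for value in capture["endpointInput"]["data"].split(",")]
    prediction = float(capture["endpointOutput"]["data"])
    return features, prediction, record["eventMetadata"]["inferenceTime"]

features, prediction, inference_time = parse_capture_record(record_line)
print(len(features), prediction, inference_time)
```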


---
## Amazon SageMaker Model Monitor

Amazon SageMaker Model Monitor lets you monitor and evaluate the data observed by endpoints. It works like this:
1. We need to create a baseline that we can use to compare real-time traffic against. 
1. When a baseline is ready, we can set up a schedule to continuously evaluate and compare against the baseline.
1. We can send synthetic traffic to trigger alarms.

**Important**: It takes an hour or more to complete this section because the shortest monitoring schedule interval is one hour.

### Baselining and continuous monitoring

#### 1. Constraint suggestion with the baseline (training) dataset

The training dataset that you use to train a model is usually a good baseline dataset. Note that the training dataset data schema and the inference dataset schema must match exactly (for example, they should have the same number and type of features).

Using our training dataset, let's ask Amazon SageMaker Model Monitor to suggest a set of baseline `constraints` and generate descriptive `statistics` so we can explore the data. For this example, let's upload the training dataset that we used to train the model. We'll use the dataset file with column headers so we have descriptive feature names.

In [44]:
baseline_prefix = PREFIX + '/baselining'
baseline_data_prefix = baseline_prefix + '/data'
baseline_results_prefix = baseline_prefix + '/results'

baseline_data_uri = 's3://{}/{}'.format(BUCKET, baseline_data_prefix)
baseline_results_uri = 's3://{}/{}'.format(BUCKET, baseline_results_prefix)

print('Baseline data uri: {}'.format(baseline_data_uri))
print('Baseline results uri: {}'.format(baseline_results_uri))

baseline_data_path = S3Uploader.upload(f"{LOCAL_DATA_DIRECTORY}/training-dataset-with-header.csv", baseline_data_uri)

Baseline data uri: s3://sagemaker-workshop-420/xgb-churn/baselining/data
Baseline results uri: s3://sagemaker-workshop-420/xgb-churn/baselining/results


##### Create a baselining job with the training dataset

Now that we have the training data ready in S3, let's start a job to `suggest` constraints. To generate the constraints, the `suggest_baseline` helper starts a processing job using a container provided by Amazon SageMaker.

In [None]:
my_default_monitor = DefaultModelMonitor(role=role,
                                         instance_count=1,
                                         instance_type='ml.m5.xlarge',
                                         volume_size_in_gb=20,
                                         max_runtime_in_seconds=3600)

baseline_job = my_default_monitor.suggest_baseline(baseline_dataset=baseline_data_path,
                                                   dataset_format=DatasetFormat.csv(header=True),
                                                   output_s3_uri=baseline_results_uri,
                                                   wait=True)

Once the job succeeds, we can explore the `baseline_results_uri` location in S3 to see what files were stored there.

In [None]:
print("Found Files:")
S3Downloader.list(f"s3://{BUCKET}/{baseline_results_prefix}")

We have a `constraints.json` file that has information about suggested constraints. We also have a `statistics.json` file, which contains statistical information about the data in the baseline.

In [None]:
baseline_job = my_default_monitor.latest_baselining_job
schema_df = pd.io.json.json_normalize(baseline_job.baseline_statistics().body_dict["features"])
schema_df.head(10)

In [None]:
constraints_df = pd.io.json.json_normalize(baseline_job.suggested_constraints().body_dict["features"])
constraints_df.head(10)
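
To give a feel for what such constraints express, the sketch below checks a single row against a tiny constraints excerpt. This is a deliberate simplification and not Model Monitor's actual evaluation logic. The feature names and `check_row` are hypothetical, and the field names (`inferred_type`, `completeness`, `num_constraints`) are assumed to match your generated file, so verify them against your own `constraints.json`.

```python
# Abbreviated, hypothetical excerpt in the shape of a suggested constraints file.
constraints = {
    "features": [
        {"name": "Account Length", "inferred_type": "Integral", "completeness": 1.0,
         "num_constraints": {"is_non_negative": True}},
        {"name": "Day Mins", "inferred_type": "Fractional", "completeness": 1.0,
         "num_constraints": {"is_non_negative": True}},
    ]
}

def check_row(row, constraints):
    """Return a list of (feature_name, problem) tuples for one row (a dict)."""
    problems = []
    for feature in constraints["features"]:
        name = feature["name"]
        value = row.get(name)
        if value is None:
            # A completeness of 1.0 means the baseline never saw a missing value.
            if feature.get("completeness", 0.0) >= 1.0:
                problems.append((name, "missing value"))
            continue
        if feature["inferred_type"] == "Integral" and float(value) != int(float(value)):
            problems.append((name, "expected an integer"))
        if feature.get("num_constraints", {}).get("is_non_negative") and float(value) < 0:
            problems.append((name, "expected a non-negative value"))
    return problems

print(check_row({"Account Length": 92, "Day Mins": 176.3}, constraints))
print(check_row({"Account Length": 92.5, "Day Mins": -1.0}, constraints))
```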

#### 2. Analyzing subsequent captures for data quality issues

Now that we've generated a baseline dataset and processed it to get baseline statistics and constraints, let's monitor and analyze the data being sent to the endpoint with monitoring schedules.

##### Create a schedule

First, let's create a monitoring schedule for the endpoint. The schedule specifies the cadence at which a new processing job runs to compare recent data captures to the baseline. It ties together the endpoint created earlier and the baseline resources (constraints and statistics) generated above.

In [None]:
from sagemaker.model_monitor import CronExpressionGenerator
from time import gmtime, strftime

reports_prefix = f'{PREFIX}/reports'
s3_report_path = f's3://{BUCKET}/{reports_prefix}'

mon_schedule_name = 'xgboost-customer-churn-model-schedule-' + strftime("%Y-%m-%d-%H-%M-%S", gmtime())

my_default_monitor.create_monitoring_schedule(monitor_schedule_name=mon_schedule_name,
                                              endpoint_input=xgb_predictor.endpoint,
                                              output_s3_uri=s3_report_path,
                                              statistics=my_default_monitor.baseline_statistics(),
                                              constraints=my_default_monitor.suggested_constraints(),
                                              schedule_cron_expression=CronExpressionGenerator.hourly(),
                                              enable_cloudwatch_metrics=True)

#### 3. Start generating some artificial traffic
The following block starts a thread to send some traffic to the endpoint. This allows us to continue to send traffic to the endpoint so that we'll have data continually captured for analysis. If there is no traffic, the monitoring jobs will start to fail later.

To terminate this thread, you need to stop the kernel.

In [56]:
from threading import Thread
from time import sleep
import time

runtime_client = boto3.client('runtime.sagemaker')

# Repeats the invocation code from above so that this section can run independently
def invoke_endpoint(ep_name, file_name, runtime_client):
    with open(file_name, 'r') as f:
        for row in f:
            payload = row.rstrip('\n')
            response = runtime_client.invoke_endpoint(EndpointName=ep_name,
                                                      ContentType='text/csv', 
                                                      Body=payload)
            time.sleep(1)
            
def invoke_endpoint_forever():
    while True:
        invoke_endpoint(endpoint_name, f'{LOCAL_DATA_DIRECTORY}/test-dataset-input-cols.csv', runtime_client)
        
thread = Thread(target = invoke_endpoint_forever)
thread.start()

# Note that you need to stop the kernel to stop the invocations
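
One alternative to stopping the kernel is to give the thread a stop signal. The sketch below uses `threading.Event` for that and is not the code used in this notebook. `send_until_stopped` is a hypothetical helper, and `received.append` stands in for the call that invokes the endpoint so the example runs without AWS access.

```python
import threading
import time

def send_until_stopped(send_fn, payloads, stop_event, delay_seconds=0.01):
    """Cycle through payloads, calling send_fn on each, until stop_event is set."""
    while not stop_event.is_set():
        for payload in payloads:
            if stop_event.is_set():
                break
            send_fn(payload)
            # wait() returns early once the event is set, so shutdown is prompt.
            stop_event.wait(delay_seconds)

stop_event = threading.Event()
received = []  # stand-in sink; a real sender would invoke the endpoint here
worker = threading.Thread(
    target=send_until_stopped,
    args=(received.append, ["1,2,3", "4,5,6"], stop_event),
    daemon=True,
)
worker.start()

time.sleep(0.1)          # let some traffic flow
stop_event.set()         # ask the thread to finish
worker.join(timeout=2)
print(f"sent {len(received)} payloads; thread alive: {worker.is_alive()}")
```

With this pattern, calling `stop_event.set()` from a later cell ends the traffic without restarting the kernel.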

##### List executions
Once the schedule is set up, jobs start at the specified intervals. The following code lists the last five executions. If you run this code soon after creating the hourly schedule, you might not see any executions listed. To see executions, you might have to wait until you cross the hour boundary (in UTC). The code includes the logic for waiting.

In [None]:
mon_executions = my_default_monitor.list_executions()
if len(mon_executions) == 0:
    print("We created an hourly schedule above and it will kick off executions ON the hour.\nWe will have to wait till we hit the hour...")

while len(mon_executions) == 0:
    print("Waiting for the 1st execution to happen...")
    time.sleep(60)
    mon_executions = my_default_monitor.list_executions()  
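
To estimate how long that wait might be, the following sketch computes the time remaining until the next hour boundary in UTC. `seconds_until_next_hour` is a hypothetical helper; the actual start of an execution may lag the boundary, so treat the result as a lower bound.

```python
from datetime import datetime, timedelta, timezone

def seconds_until_next_hour(now):
    """Return the number of seconds from `now` to the next top of the hour."""
    next_hour = now.replace(minute=0, second=0, microsecond=0) + timedelta(hours=1)
    return (next_hour - now).total_seconds()

now_utc = datetime.now(timezone.utc)
remaining = seconds_until_next_hour(now_utc)
print(f"About {remaining / 60:.1f} minutes until the next hourly boundary (UTC)")
```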

##### Evaluate the latest execution and list the generated reports

In [None]:
latest_execution = mon_executions[-1]
print("Latest execution result: {}".format(latest_execution.describe()['ExitMessage']))
report_uri = latest_execution.output.destination

print("Found Report Files:")
S3Downloader.list(report_uri)

##### List violations

If the captured data violates any of the baseline constraints, the violations are reported here. Let's list them.

In [None]:
violations = my_default_monitor.latest_monitoring_constraint_violations()
pd.set_option('display.max_colwidth', -1)
constraints_df = pd.io.json.json_normalize(violations.body_dict["violations"])
constraints_df.head(10)
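
When many features are flagged, grouping the violations by check type can make the report easier to scan. The sketch below assumes each violation carries `feature_name`, `constraint_check_type`, and `description` fields; the sample entries and `group_by_check_type` are hypothetical.

```python
from collections import defaultdict

# Hypothetical violations in the shape of a constraint violations report.
violations_doc = {
    "violations": [
        {"feature_name": "Day Mins", "constraint_check_type": "baseline_drift_check",
         "description": "Hypothetical drift description."},
        {"feature_name": "Night Calls", "constraint_check_type": "data_type_check",
         "description": "Hypothetical data type description."},
        {"feature_name": "Eve Mins", "constraint_check_type": "baseline_drift_check",
         "description": "Hypothetical drift description."},
    ]
}

def group_by_check_type(doc):
    """Map each constraint check type to the list of features that violated it."""
    grouped = defaultdict(list)
    for violation in doc.get("violations", []):
        grouped[violation["constraint_check_type"]].append(violation["feature_name"])
    return dict(grouped)

summary = group_by_check_type(violations_doc)
for check_type, feature_names in sorted(summary.items()):
    print(f"{check_type}: {', '.join(feature_names)}")
```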

## Get Execution and Baseline details from Processing Job Arn

Enter the ProcessingJob ARN for an execution of a MonitoringSchedule below to get the result files associated with that execution.

In [67]:
processing_job_arn = latest_execution.describe()['ProcessingJobArn']
processing_job_arn

'arn:aws:sagemaker:us-east-2:209970524256:processing-job/model-monitoring-202004111100-a71a88f315417d8d74d5f73f'
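
If only the job name or region is needed, for example to look the job up in the console, the ARN can be split with plain string operations. `parse_processing_job_arn` is a hypothetical helper that assumes the `arn:partition:service:region:account:resource-type/name` layout seen in the output above.

```python
def parse_processing_job_arn(arn):
    """Split a processing job ARN into its region, account ID, and job name."""
    prefix, job_name = arn.split("/", 1)
    _, partition, service, region, account_id, resource_type = prefix.split(":")
    return {
        "partition": partition,
        "service": service,
        "region": region,
        "account_id": account_id,
        "resource_type": resource_type,
        "job_name": job_name,
    }

arn_parts = parse_processing_job_arn(
    "arn:aws:sagemaker:us-east-2:209970524256:processing-job/"
    "model-monitoring-202004111100-a71a88f315417d8d74d5f73f"
)
print(arn_parts["region"], arn_parts["job_name"])
```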

In [72]:
import os

from sagemaker.model_monitor import MonitoringExecution

In [70]:
execution = MonitoringExecution.from_processing_arn(sagemaker_session=sagemaker_session, processing_job_arn=processing_job_arn)
exec_inputs = {inp['InputName']: inp for inp in execution.describe()['ProcessingInputs']}
exec_results = execution.output.destination

In [73]:
baseline_statistics_filepath = exec_inputs['baseline']['S3Input']['S3Uri'] if 'baseline' in exec_inputs else None
execution_statistics_filepath = os.path.join(exec_results, 'statistics.json')
violations_filepath = os.path.join(exec_results, 'constraint_violations.json')

baseline_statistics = json.loads(S3Downloader.read_file(baseline_statistics_filepath)) if baseline_statistics_filepath is not None else None
execution_statistics = json.loads(S3Downloader.read_file(execution_statistics_filepath))
violations = json.loads(S3Downloader.read_file(violations_filepath))['violations']

## Overview

The code below shows the violations and constraint checks across all features in a simple table.

In [74]:
import utils as mu

In [None]:
mu.show_violation_df(baseline_statistics=baseline_statistics, latest_statistics=execution_statistics, violations=violations)

## Distributions

This section visualizes the distributions and renders the distribution statistics for all features.

In [76]:
features = mu.get_features(execution_statistics)
feature_baselines = mu.get_features(baseline_statistics)

In [None]:
mu.show_distributions(features)

### Execution Stats vs Baseline

In [None]:
mu.show_distributions(features, feature_baselines)

## Clean up

If you no longer need this notebook, clean up your environment by running the following cell. It removes the hosted endpoint that you created for this walkthrough, so you stop incurring charges for an instance that you no longer need. If you created a monitoring schedule, delete it before deleting the endpoint.

You'll also want to delete artifacts stored in the S3 bucket used in this notebook.

In [None]:
# Uncomment to delete the monitoring schedule before deleting the endpoint
#sagemaker_session.delete_monitoring_schedule(mon_schedule_name)
sagemaker_session.delete_endpoint(xgb_predictor.endpoint)