# Amazon SageMaker Model Monitoring And Data Capture
_**Hosting a Model in Amazon SageMaker and Capturing Inference requests, results, and metadata**_

*NOTE - THIS FEATURE IS CONFIDENTIAL AND SHARED UNDER NDA. THE FEATURE IS IN BETA AND SHOULD NOT BE USED FOR PRODUCTION ENDPOINTS. THIS FEATURE IS CURRENTLY IN DEVELOPMENT AND THE API SPECIFICATIONS MAY CHANGE*

---
## Background

Amazon SageMaker provides every developer and data scientist with the ability to build, train, and deploy machine learning models quickly. Amazon SageMaker is a fully-managed service that covers the entire machine learning workflow. You can label and prepare your data, choose an algorithm, train a model, and then tune and optimize it for deployment. Amazon SageMaker gets your models into production to make predictions or take actions with less effort and lower costs than was previously possible.

Amazon SageMaker is adding new capabilities that monitor ML models in production and detect deviations in data quality relative to a baseline dataset (e.g., the training dataset). They enable you to capture the metadata, input, and output for invocations of the models that you deploy with Amazon SageMaker. They also enable you to analyze the data and monitor its quality.

This notebook shows you how to capture the model invocation data from an endpoint and then view that data in S3. Soon, we plan to provide an additional example that shows how to analyze the data collected. 

---

TODO : Add usecase details


## Section I - Setup

Let's start by specifying:

* The AWS region used to host your model.
* The IAM role ARN used to give learning and hosting access to your data. See the documentation for how to specify these.
* The S3 bucket used to store the data used to train your model, any additional model data, and the data captured from model invocations.

#### Add the SageMaker Internal Model
This step is required to enable the data capture feature for beta.

TODO : ADD CEll NUMBERS.  BASE IT ON FRAUD DETECTION EXAMPLE

In [1]:
!aws configure add-model --service-model file://sagemaker-internal-model.json --service-name sagemaker

In [2]:
%%time

import os
import boto3
import re
import json
from sagemaker import get_execution_role, session

region = boto3.Session().region_name

role = get_execution_role()
print("RoleArn: {}".format(role))

# You can modify the bucket to be your own, but make sure the role you chose for this notebook
# has s3:PutObject permissions. This is the bucket into which the data will be captured
bucket =  session.Session(boto3.Session()).default_bucket()
print("Recommendations Bucket: {}".format(bucket))
prefix = 'sagemaker/Recommendations-ModelMonitor'

data_capture_prefix = '{}/datacapture'.format(prefix)
s3_capture_upload_path = 's3://{}/{}'.format(bucket, data_capture_prefix)
reports_prefix = '{}/reports'.format(prefix)
s3_report_path = 's3://{}/{}'.format(bucket,reports_prefix)
code_prefix = '{}/code'.format(prefix)
s3_code_preprocessor_uri = 's3://{}/{}/{}'.format(bucket,code_prefix, 'preprocessor.py')
s3_code_postprocessor_uri = 's3://{}/{}/{}'.format(bucket,code_prefix, 'postprocessor.py')

print("Capture path: {}".format(s3_capture_upload_path))
print("Report path: {}".format(s3_report_path))
print("Preproc Code path: {}".format(s3_code_preprocessor_uri))
print("Postproc Code path: {}".format(s3_code_postprocessor_uri))

sm_client = boto3.client('sagemaker')
 

RoleArn: arn:aws:iam::555360056434:role/SageMakerNotebookInstance-ServiceRolereinvent-dcfc53b0
Recommendations Bucket: sagemaker-us-west-2-555360056434
Capture path: s3://sagemaker-us-west-2-555360056434/sagemaker/Recommendations-ModelMonitor/datacapture
Report path: s3://sagemaker-us-west-2-555360056434/sagemaker/Recommendations-ModelMonitor/reports
Preproc Code path: s3://sagemaker-us-west-2-555360056434/sagemaker/Recommendations-ModelMonitor/code/preprocessor.py
Postproc Code path: s3://sagemaker-us-west-2-555360056434/sagemaker/Recommendations-ModelMonitor/code/postprocessor.py
CPU times: user 1.01 s, sys: 166 ms, total: 1.17 s
Wall time: 6.43 s


In [3]:
## Variables for the pretrained models, baseline and test data
LOCAL_MODELS_DIR='../../models'
LOCAL_DATA_DIR='../../data'

MOVIE_RECOMMENDATION_MODEL='model.tar.gz'
MUSIC_RECOMMENDATION_MODEL='music-rec-kiran.tar.gz'

MOVIE_RECOMMENDATION_TRAIN_DATA='movie_train_age.csv'

MOVIE_RECOMMENDATION_TEST_DATA='movie_test_age.csv'
MUSIC_RECOMMENDATION_TEST_DATA='music_test.csv'

MOVIE_RECOMMENDATION_BASELINE_DATA='baseline.csv'

Let's quickly check that the notebook has the permissions this demo needs. We put a simple test object into the S3 bucket specified above. If this command fails, the demo will not work as intended. To fix it, update the role associated with this notebook so that it has "s3:PutObject" permissions, then run this validation again.

##TODO : DESCRIBE DATA AND FEATURES 

In [4]:
# Running this cell once is enough.
# Upload a small test file to confirm that the role can write to the bucket
boto3.Session().resource('s3').Bucket(bucket).Object("test_upload/test.txt").upload_file('test_data/upload-test-file.txt')
print("Success! You are all set to proceed.")

Success! You are all set to proceed.


## Section II - Deploy pre-trained model with model data capture enabled

### Deploy model on a SageMaker Endpoint

#### Upload the pre-trained model to S3

This code uploads a pre-trained XGBoost movie recommendation model that is ready for you to deploy.

In [5]:
##Copy model to S3 bucket.
def copy_model_to_s3(model_name):
    key = prefix + "/" + model_name
    with open(LOCAL_MODELS_DIR+'/'+model_name, 'rb') as file_obj:
        print("Uploading ", file_obj , " to bucket ",bucket, " as " , key)
        boto3.Session().resource('s3').Bucket(bucket).Object(key).upload_fileobj(file_obj)

In [6]:
##Copy movie recommendation model to S3
copy_model_to_s3(MOVIE_RECOMMENDATION_MODEL)

Uploading  <_io.BufferedReader name='../../models/model.tar.gz'>  to bucket  sagemaker-us-west-2-555360056434  as  sagemaker/Recommendations-ModelMonitor/model.tar.gz


#### Enable capturing real-time inference data from SageMaker Endpoints
Now let's create a SageMaker Endpoint to showcase the data capture capability in action.

#### Create SageMaker Model entity

This step creates an Amazon SageMaker model from the movie recommendations model file previously uploaded to S3.

In [7]:
from sagemaker.amazon.amazon_estimator import get_image_uri
container = get_image_uri(boto3.Session().region_name, 'xgboost', '0.90-1')

container

'246618743249.dkr.ecr.us-west-2.amazonaws.com/sagemaker-xgboost:0.90-1-cpu-py3'

In [8]:
%%time
from time import gmtime, strftime

model_name = "MoviePredictions-EndpointDataCaptureModel-" + strftime("%Y-%m-%d-%H-%M-%S", gmtime())

sm_client = boto3.client('sagemaker')

model_url = 'https://{}.s3-{}.amazonaws.com/{}/{}'.format(bucket, region, prefix,MOVIE_RECOMMENDATION_MODEL)

print (model_url)

primary_container = {
    'Image': container,
    'ModelDataUrl': model_url,
}

create_model_response = sm_client.create_model(
    ModelName = model_name,
    ExecutionRoleArn = role,
    PrimaryContainer = primary_container)

print(create_model_response['ModelArn'])

https://sagemaker-us-west-2-555360056434.s3-us-west-2.amazonaws.com/sagemaker/Recommendations-ModelMonitor/model.tar.gz
arn:aws:sagemaker:us-west-2:555360056434:model/moviepredictions-endpointdatacapturemodel-2019-11-27-18-03-08
CPU times: user 17.8 ms, sys: 112 µs, total: 17.9 ms
Wall time: 263 ms


#### Create Endpoint Configuration

This step is required to deploy an Amazon SageMaker model on an endpoint. To enable data capture for monitoring the quality of the model data, you specify the new capture option called "DataCaptureConfig". With this configuration you can capture the request payload, the response payload, or both. Data capture is configured at the endpoint configuration level and applies to all variants. The captured data is stored in JSON Lines format. The comments in the code highlight the new API parameters for data capture.

* Data capture can be used independently of model monitoring.
* The captured data can be used for further labeling efforts.
* The data is stored in S3 buckets that you own, and you can secure it with policies.
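
Before passing the configuration to `create_endpoint_config`, it can help to build it through a small function that catches obvious mistakes early. The sketch below is illustrative only: `build_data_capture_config` is a hypothetical helper (not part of boto3 or the SageMaker SDK), and it only produces a dict with the same keys this notebook uses, under the assumption that the sampling percentage must lie between 0 and 100.

```python
def build_data_capture_config(destination_s3_uri, sampling_percentage=100,
                              capture_modes=("Input", "Output")):
    """Hypothetical helper: return a dict shaped like the DataCaptureConfig in this notebook."""
    if not destination_s3_uri.startswith("s3://"):
        raise ValueError("destination_s3_uri must start with s3://")
    if not 0 <= sampling_percentage <= 100:
        raise ValueError("sampling_percentage must be between 0 and 100")
    # Only the two capture modes shown in this notebook are accepted here.
    unknown = set(capture_modes) - {"Input", "Output"}
    if unknown or not capture_modes:
        raise ValueError("capture_modes must be a non-empty subset of Input/Output")
    return {
        "EnableCapture": True,
        "InitialSamplingPercentage": sampling_percentage,
        "DestinationS3Uri": destination_s3_uri,
        "CaptureOptions": [{"CaptureMode": mode} for mode in capture_modes],
        "CaptureContentTypeHeader": {
            "CsvContentTypes": ["text/csv"],
            "JsonContentTypes": ["application/json"],
        },
    }


# Example with a placeholder bucket name
example_config = build_data_capture_config("s3://my-bucket/datacapture", 90)
print(example_config["CaptureOptions"])
```

The returned dict could then be passed as the `DataCaptureConfig` argument in the cell that follows.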

In [9]:
from time import gmtime, strftime

data_capture_sub_folder = "datacapture-xgboost-movie-recommendations"
s3_capture_upload_path = 's3://{}/{}'.format(bucket, data_capture_sub_folder)

print("Capture path:"+ s3_capture_upload_path)

data_capture_configuration = {
    "EnableCapture": True, # flag turns data capture on and off
    "InitialSamplingPercentage": 90, # sampling rate to capture data. max is 100%
    "DestinationS3Uri": s3_capture_upload_path, # s3 location where captured data is saved
    "CaptureOptions": [
        {
            "CaptureMode": "Output" # The type of capture this option enables. Values can be: [Output/Input]
        },
        {
            "CaptureMode": "Input" # The type of capture this option enables. Values can be: [Output/Input]
        }
    ],
    "CaptureContentTypeHeader": {
       "CsvContentTypes": ["text/csv"], # headers which should signal to decode the payload into CSV format 
       "JsonContentTypes": ["application/json"] # headers which should signal to decode the payload into JSON format 
     }
}

endpoint_config_name = 'XGBoost-MovieRec-DataCaptureEndpointConfig-' + strftime("%Y-%m-%d-%H-%M-%S", gmtime())
print(endpoint_config_name)
create_endpoint_config_response = sm_client.create_endpoint_config(
    EndpointConfigName = endpoint_config_name,
    ProductionVariants=[{
        'InstanceType':'ml.m5.xlarge',
        'InitialInstanceCount':1,
        'InitialVariantWeight':1,
        'ModelName':model_name,
        'VariantName':'AllTrafficVariant'
    }],
    DataCaptureConfig = data_capture_configuration) # This is where the new capture options are applied

print("Endpoint Config Arn: " + create_endpoint_config_response['EndpointConfigArn'])

Capture path:s3://sagemaker-us-west-2-555360056434/datacapture-xgboost-movie-recommendations
XGBoost-MovieRec-DataCaptureEndpointConfig-2019-11-27-18-03-11
Endpoint Config Arn: arn:aws:sagemaker:us-west-2:555360056434:endpoint-config/xgboost-movierec-datacaptureendpointconfig-2019-11-27-18-03-11


#### Create Endpoint
This step uses the endpoint configuration specified above to create an endpoint. This takes approximately 9 minutes to complete.

In [10]:
%%time
import time

endpoint_name = 'XGBoost-SM-MovieRec-DataCaptureEndpoint-' + strftime("%Y-%m-%d-%H-%M-%S", gmtime())
print(endpoint_name)
create_endpoint_response = sm_client.create_endpoint(
    EndpointName=endpoint_name,
    EndpointConfigName=endpoint_config_name)
print(create_endpoint_response['EndpointArn'])

resp = sm_client.describe_endpoint(EndpointName=endpoint_name)
status = resp['EndpointStatus']
print("Status: " + status)

while status=='Creating':
    time.sleep(60)
    resp = sm_client.describe_endpoint(EndpointName=endpoint_name)
    status = resp['EndpointStatus']
    print("Status: " + status)

print("Arn: " + resp['EndpointArn'])
print("Status: " + status)

XGBoost-SM-MovieRec-DataCaptureEndpoint-2019-11-27-18-03-13
arn:aws:sagemaker:us-west-2:555360056434:endpoint/xgboost-sm-movierec-datacaptureendpoint-2019-11-27-18-03-13
Status: Creating
Status: Creating
Status: Creating
Status: Creating
Status: Creating
Status: Creating
Status: Creating
Status: Creating
Status: Creating
Status: InService
Arn: arn:aws:sagemaker:us-west-2:555360056434:endpoint/xgboost-sm-movierec-datacaptureendpoint-2019-11-27-18-03-13
Status: InService
CPU times: user 129 ms, sys: 14 ms, total: 143 ms
Wall time: 9min 1s
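
The polling loop above exits as soon as the status is no longer `Creating`, so a failed deployment would pass silently, and the loop has no upper time limit. One possible way to factor the pattern out is sketched below. `wait_for_status` is a hypothetical helper written for this notebook; it takes any zero-argument `describe` callable, so it can wrap `describe_endpoint` or `describe_processing_job` equally well. The default timeout is an arbitrary assumption.

```python
import time


def wait_for_status(describe, status_key, in_progress=("Creating",),
                    poll_seconds=60, timeout_seconds=1800, sleep=time.sleep):
    """Poll describe() until its status leaves `in_progress`, then return that status.

    Raises TimeoutError if the status is still in progress after timeout_seconds.
    """
    waited = 0
    while True:
        status = describe()[status_key]
        print("Status: " + status)
        if status not in in_progress:
            return status
        if waited >= timeout_seconds:
            raise TimeoutError("still {} after {} seconds".format(status, waited))
        sleep(poll_seconds)
        waited += poll_seconds


# Usage in this notebook (commented out because it needs a live endpoint):
# final = wait_for_status(
#     lambda: sm_client.describe_endpoint(EndpointName=endpoint_name),
#     "EndpointStatus")
# if final != "InService":
#     raise RuntimeError("Endpoint ended in status " + final)
```

Checking the returned status explicitly makes a `Failed` endpoint visible before the invocation cells run.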


## Section III - Execute movie predictions and analyze data captured from the deployed movie recommendation model

### Invoke the Deployed Model

You can now send data to this endpoint to get inferences in real time. Because we enabled data capture in the previous steps, the request and response payloads, along with some additional metadata, are saved in the S3 location you specified.

In [13]:
##For data capturing we will perform predictions with a subset of test data.
runtime_client = boto3.client('runtime.sagemaker')

with open(LOCAL_DATA_DIR+"/"+MOVIE_RECOMMENDATION_TEST_DATA, 'r') as f:
    contents = f.readlines()
    
for i in range(0, 30):
    line = contents[i]
    split_data = line.split(',')
    #Remove the original rating value from data used for prediction
    original_value = split_data.pop(0)
    
    payload = ','.join(split_data)
    
    response = runtime_client.invoke_endpoint(EndpointName=endpoint_name,
                                              ContentType='text/csv', 
                                              Body=payload)
    prediction = response['Body'].read().decode('utf-8')

    print("Original Value ", original_value , "Prediction : ", float(prediction))
    

Original Value  4 Prediction :  3.5007636547088623
Original Value  4 Prediction :  3.7794837951660156
Original Value  4 Prediction :  3.9953973293304443
Original Value  4 Prediction :  2.6379570960998535
Original Value  5 Prediction :  4.199619293212891
Original Value  3 Prediction :  4.081928253173828
Original Value  4 Prediction :  3.7032620906829834
Original Value  5 Prediction :  3.0114622116088867
Original Value  4 Prediction :  4.1562089920043945
Original Value  4 Prediction :  3.8722405433654785
Original Value  1 Prediction :  3.506897211074829
Original Value  5 Prediction :  4.235185623168945
Original Value  3 Prediction :  3.904029369354248
Original Value  4 Prediction :  3.734802007675171
Original Value  4 Prediction :  3.9492900371551514
Original Value  3 Prediction :  3.069108724594116
Original Value  3 Prediction :  3.409313917160034
Original Value  3 Prediction :  3.2433159351348877
Original Value  5 Prediction :  4.112346649169922
Original Value  5 Prediction :  3.440938

TODO : CAN THIS BE DELETED?? This step invokes the endpoint with included sample data for ~2 minutes. Data is captured based on the sampling percentage specified. If your endpoint runs for a long time, the data from your endpoint will continue to be captured and saved.

### View Captured Data

Now let's list the data capture files stored in S3. You should expect to see different files from different time periods, organized by the hour in which the invocation occurred. Based on the listing this cell produces, the format of the S3 path is:

s3://{destination-prefix}/{endpoint-name}/{variant-name}/yyyy/mm/dd/hh/filename.jsonl

**NOTE:** This path is subject to change during the beta period

In [37]:
s3_client = boto3.Session().client('s3')
result = s3_client.list_objects(Bucket=bucket, Prefix=data_capture_sub_folder)
print("data_capture_sub_folder : " , data_capture_sub_folder)
capture_files = [capture_file.get("Key") for capture_file in result.get('Contents')]
print("Found Capture Files:")
print("\n ".join(capture_files))

data_capture_sub_folder :  datacapture-xgboost-movie-recommendations
Found Capture Files:
datacapture-xgboost-movie-recommendations/XGBoost-SM-MovieRec-DataCaptureEndpoint-2019-11-27-15-12-12/AllTrafficVariant/2019/11/27/16/04-18-145-823c7585-ada9-46c6-a30b-105f66fe8acb.jsonl
 datacapture-xgboost-movie-recommendations/XGBoost-SM-MovieRec-DataCaptureEndpoint-2019-11-27-15-12-12/AllTrafficVariant/2019/11/27/16/05-18-853-bd262a0f-2046-4541-a1a4-514879058c6f.jsonl
 datacapture-xgboost-movie-recommendations/XGBoost-SM-MovieRec-DataCaptureEndpoint-2019-11-27-15-12-12/AllTrafficVariant/2019/11/27/16/06-19-510-41dc05ca-6759-438c-997a-fac3cea1f7ff.jsonl
 datacapture-xgboost-movie-recommendations/XGBoost-SM-MovieRec-DataCaptureEndpoint-2019-11-27-15-12-12/AllTrafficVariant/2019/11/27/16/07-21-178-68d8b68a-4bdd-4805-9963-bca94681cfa8.jsonl
 datacapture-xgboost-movie-recommendations/XGBoost-SM-MovieRec-DataCaptureEndpoint-2019-11-27-15-12-12/AllTrafficVariant/2019/11/27/16/08-21-873-a9bfea1a-a267-
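
The keys listed above encode the endpoint, the variant, and the capture hour. A small parser makes it easy to group or filter capture files by those fields. This is a sketch under the assumption that keys keep the layout seen in the listing (prefix, endpoint name, variant name, year, month, day, hour, file name); `parse_capture_key` is a hypothetical helper, and treating the hour as UTC is also an assumption.

```python
from datetime import datetime, timezone


def parse_capture_key(key):
    """Split a capture object key into its parts, reading from the right."""
    parts = key.split("/")
    if len(parts) < 7:
        raise ValueError("unexpected capture key layout: " + key)
    year, month, day, hour = (int(p) for p in parts[-5:-1])
    return {
        "prefix": "/".join(parts[:-7]),   # may be empty if no prefix was used
        "endpoint_name": parts[-7],
        "variant_name": parts[-6],
        # Assumption: the hour folder is expressed in UTC
        "hour_utc": datetime(year, month, day, hour, tzinfo=timezone.utc),
        "filename": parts[-1],
    }


example_key = ("datacapture-xgboost-movie-recommendations/"
               "XGBoost-SM-MovieRec-DataCaptureEndpoint-2019-11-27-15-12-12/"
               "AllTrafficVariant/2019/11/27/16/"
               "04-18-145-823c7585-ada9-46c6-a30b-105f66fe8acb.jsonl")
print(parse_capture_key(example_key))
```

Because the path format may change during the beta period, the parser raises an error rather than guessing when a key has fewer segments than expected.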

Next, let's view the contents of a single capture file. Here you should see all the captured data in a JSON Lines file, with one JSON object per invocation.

In [15]:
def get_obj_body(obj_key):
    return s3_client.get_object(Bucket=bucket, Key=obj_key).get('Body').read().decode("utf-8")

capture_file = get_obj_body(capture_files[-1])

print(capture_file)

{"captureData":{"endpointInput":{"observedContentType":"text/csv","mode":"INPUT","data":"87,47,879876637,0.0,0.0,0.0,0.0,1.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,47.0,89503,0,1,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0\n","encoding":"CSV"},"endpointOutput":{"observedContentType":"text/csv; charset=utf-8","mode":"OUTPUT","data":"3.904029369354248","encoding":"CSV"}},"eventMetadata":{"eventId":"b755cb23-b578-4b7d-a04f-b6de5289e14b","inferenceTime":"2019-11-27T17:48:05Z"},"eventVersion":"0"}
{"captureData":{"endpointInput":{"observedContentType":"text/csv","mode":"INPUT","data":"886,1,876031433,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,20.0,61820,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0\n","encoding":"CSV"},"endpointOutput":{"observedContentType":"text/csv; charset=utf-8","mode":"OUTPUT","data":"3.734802007675171","encoding":"CSV"}},"eventMetadata":{"eventId":"c0ca9d2e-d381-41a0-92d6-e34900e9597e","inferenceTime":"2019-11-27T17:48:06Z"},"e

Finally, let's view the contents of a single line, pretty-printed. Each line is a JSON object that holds the captured input and output (the capture options enabled in the endpoint configuration) along with the event metadata.

In [16]:
import json
print(json.dumps(json.loads(capture_file.split('\n')[0]), indent=2))

{
  "captureData": {
    "endpointInput": {
      "observedContentType": "text/csv",
      "mode": "INPUT",
      "data": "87,47,879876637,0.0,0.0,0.0,0.0,1.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,47.0,89503,0,1,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0\n",
      "encoding": "CSV"
    },
    "endpointOutput": {
      "observedContentType": "text/csv; charset=utf-8",
      "mode": "OUTPUT",
      "data": "3.904029369354248",
      "encoding": "CSV"
    }
  },
  "eventMetadata": {
    "eventId": "b755cb23-b578-4b7d-a04f-b6de5289e14b",
    "inferenceTime": "2019-11-27T17:48:05Z"
  },
  "eventVersion": "0"
}
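
To work with captured records programmatically, each line can be reduced to the fields of interest. The sketch below assumes the record layout printed above (`captureData`, `endpointInput`, `endpointOutput`, `eventMetadata`) and CSV-encoded payloads; `parse_capture_line` is a hypothetical helper. Feature values are kept as strings because some columns, such as the zip code, may not be numeric. The sample line is a shortened, made-up payload with the same structure.

```python
import json


def parse_capture_line(line):
    """Extract the event id, time, input features, and prediction from one capture line."""
    record = json.loads(line)
    capture = record["captureData"]
    features = None
    prediction = None
    if "endpointInput" in capture:       # present only when Input capture is enabled
        features = capture["endpointInput"]["data"].strip().split(",")
    if "endpointOutput" in capture:      # present only when Output capture is enabled
        prediction = float(capture["endpointOutput"]["data"])
    return {
        "event_id": record["eventMetadata"]["eventId"],
        "inference_time": record["eventMetadata"]["inferenceTime"],
        "features": features,
        "prediction": prediction,
    }


# Shortened illustrative record (not a real capture)
sample_line = json.dumps({
    "captureData": {
        "endpointInput": {"observedContentType": "text/csv", "mode": "INPUT",
                          "data": "87,47,879876637\n", "encoding": "CSV"},
        "endpointOutput": {"observedContentType": "text/csv; charset=utf-8",
                           "mode": "OUTPUT", "data": "3.9", "encoding": "CSV"},
    },
    "eventMetadata": {"eventId": "example-id", "inferenceTime": "2019-11-27T17:48:05Z"},
    "eventVersion": "0",
})
print(parse_capture_line(sample_line))
```

In the notebook, the same function could be applied to every non-empty line of `capture_file`.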


## Section IV – Generate baseline statistics and constraints for continuous model monitoring

### Generate suggested constraints with baseline/training dataset

Let's ask SageMaker to suggest a set of baseline constraints from our training dataset and to generate descriptive statistics for exploring the data. But first, let's upload the training dataset. For convenience, the dataset that was used to train the XGBoost movie recommendation model is packaged with this example.

In [17]:
# copy over the training dataset to S3 
baseline_prefix = prefix + '/baselining'
baseline_data_prefix = baseline_prefix + '/data'
baseline_results_prefix = baseline_prefix + '/results'

baseline_data_uri = 's3://{}/{}'.format(bucket,baseline_data_prefix)
baseline_results_uri = 's3://{}/{}'.format(bucket, baseline_results_prefix)
print(baseline_data_uri)
print(baseline_results_uri)

s3://sagemaker-us-west-2-555360056434/sagemaker/Recommendations-ModelMonitor/baselining/data
s3://sagemaker-us-west-2-555360056434/sagemaker/Recommendations-ModelMonitor/baselining/results


In [18]:
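# S3 keys always use '/' as the separator, so build the key with string formatting
s3_key = '{}/{}'.format(baseline_data_prefix, 'baseline.csv')
# Open the file in a with block so that it is closed after the upload
with open(LOCAL_DATA_DIR+"/"+MOVIE_RECOMMENDATION_BASELINE_DATA, 'rb') as baseline_data_file:
    boto3.Session().resource('s3').Bucket(bucket).Object(s3_key).upload_fileobj(baseline_data_file)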

### Create a baselining job with baseline dataset
(This step will take approximately 6 min)

(CALL THIS OUT IN THE DECK)

In [19]:
%%time
import time

from processingjob_wrapper import ProcessingJob
from time import gmtime, strftime

job_name = 'MOVIE-REC-baseline-xgb-model-monitor-' + strftime("%Y-%m-%d-%H-%M-%S", gmtime())
processing_job = ProcessingJob(sm_client, role).create(job_name, baseline_data_uri, baseline_results_uri)

resp = sm_client.describe_processing_job(ProcessingJobName=job_name)
status = resp['ProcessingJobStatus']
print("Status: " + status)

while status=='InProgress':
    time.sleep(60)
    resp = sm_client.describe_processing_job(ProcessingJobName=job_name)
    status = resp['ProcessingJobStatus']
    print("Status: " + status)

if status=='Failed':
    print(resp)  

Creating ProcessingJob MOVIE-REC-baseline-xgb-model-monitor-2019-11-27-18-14-08...
Status: InProgress
Status: InProgress
Status: InProgress
Status: InProgress
Status: Completed
CPU times: user 72.7 ms, sys: 4.78 ms, total: 77.4 ms
Wall time: 4min


### Explore the generated constraints and statistics

In [20]:
s3_client = boto3.Session().client('s3')
result = s3_client.list_objects(Bucket=bucket, Prefix=baseline_results_prefix)
report_files = [report_file.get("Key") for report_file in result.get('Contents')]
print("Found Files:")
print("\n ".join(report_files))

def get_obj_body(obj_key):
    return s3_client.get_object(Bucket=bucket, Key=obj_key).get('Body').read().decode("utf-8")


Found Files:
sagemaker/Recommendations-ModelMonitor/baselining/results/constraints.json
 sagemaker/Recommendations-ModelMonitor/baselining/results/statistics.json


In [21]:
constraints_file = get_obj_body(baseline_results_prefix+'/constraints.json')
#print(constraints_file[:4000])
print(constraints_file)
##Point out the various inferred_type values: Fractional, Integral, String (ZipCode).
##TODO : Why are the genres treated as fractional?  
##TODO : Would making rating fractional make more sense?
##TODO : What are all the available inferred_types, and can we show them all?
##TODO : Set an age-level constraint? A gender-level constraint?
##TODO : ADD explanation of the constraints
###TODO : Better rendering from the pysdk sample notebook.  Can we render as histograms as well??

##Let's add details of these monitoring config settings.  Arun to forward the document.
##Feature-level override is also available.

##Baseline has 20 cols, but inference has 21 cols -- extra column check (what happens if we send one fewer column)
## How about if one col is null ?? Completeness constraint.


## Good to include in deck
## Other drift examples : loan application model.  Loan approved/not; how much.  Customers start using mobile and apply for high amounts.
## Could have caught this in the drift.

## DS retrained model based on a pivot in a single dimension.  Now data formats coming in new inference.
## New application inference traffic formats.  



{
  "version" : 0.0,
  "features" : [ {
    "name" : "Rating",
    "inferred_type" : "Integral",
    "completeness" : 1.0,
    "num_constraints" : {
      "is_non_negative" : true
    }
  }, {
    "name" : "User",
    "inferred_type" : "Integral",
    "completeness" : 1.0,
    "num_constraints" : {
      "is_non_negative" : true
    }
  }, {
    "name" : "Item",
    "inferred_type" : "Integral",
    "completeness" : 1.0,
    "num_constraints" : {
      "is_non_negative" : true
    }
  }, {
    "name" : "TimeStamp",
    "inferred_type" : "Integral",
    "completeness" : 1.0,
    "num_constraints" : {
      "is_non_negative" : true
    }
  }, {
    "name" : "unknown",
    "inferred_type" : "Fractional",
    "completeness" : 1.0,
    "num_constraints" : {
      "is_non_negative" : true
    }
  }, {
    "name" : "Action",
    "inferred_type" : "Fractional",
    "completeness" : 1.0,
    "num_constraints" : {
      "is_non_negative" : true
    }
  }, {
    "name" : "Adventure",
    "inferre
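
To get a feel for what the suggested constraints express, the sketch below checks a single record against them. This is only an illustration, not how the monitoring job evaluates constraints: `find_violations` is a hypothetical helper, it looks only at the `completeness` and `is_non_negative` fields shown above, and it treats `completeness` of 1.0 as "no missing values allowed", which is a simplification of a dataset-level fraction.

```python
def find_violations(constraints, row):
    """Return (feature, reason) pairs for a row given as {feature name: value or None}."""
    violations = []
    for feature in constraints["features"]:
        name = feature["name"]
        value = row.get(name)
        if value is None:
            # Simplification: full completeness in the baseline means no gaps are expected
            if feature.get("completeness", 0.0) == 1.0:
                violations.append((name, "missing value"))
            continue
        numeric = feature.get("num_constraints", {})
        if numeric.get("is_non_negative") and float(value) < 0:
            violations.append((name, "negative value"))
    return violations


# Two features copied from the constraints output above
sample_constraints = {
    "version": 0.0,
    "features": [
        {"name": "Rating", "inferred_type": "Integral", "completeness": 1.0,
         "num_constraints": {"is_non_negative": True}},
        {"name": "User", "inferred_type": "Integral", "completeness": 1.0,
         "num_constraints": {"is_non_negative": True}},
    ],
}
print(find_violations(sample_constraints, {"Rating": -1, "User": None}))
```

In the notebook, `json.loads(constraints_file)` would supply the full set of constraints in place of the two-feature sample.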

In [22]:
##TODO : ADD explanation of the statistics
statistics_file = get_obj_body(baseline_results_prefix+'/statistics.json')
#print(statistics_file[:4000])
print(statistics_file)

##Talk about how the constraints can be modified.  Generated version is a suggestion.  you can accept as is or choose to modify according to your business needs.

{
  "version" : 0.0,
  "dataset" : {
    "item_count" : 63398
  },
  "features" : [ {
    "name" : "Rating",
    "inferred_type" : "Integral",
    "numerical_statistics" : {
      "common" : {
        "num_present" : 63398,
        "num_missing" : 0
      },
      "mean" : 3.5196536168333385,
      "sum" : 223139.0,
      "std_dev" : 1.128394340517946,
      "min" : 1.0,
      "max" : 5.0,
      "distribution" : {
        "kll" : {
          "buckets" : [ {
            "lower_bound" : 1.0,
            "upper_bound" : 1.4,
            "count" : 3955.0
          }, {
            "lower_bound" : 1.4,
            "upper_bound" : 1.8,
            "count" : 0.0
          }, {
            "lower_bound" : 1.8,
            "upper_bound" : 2.2,
            "count" : 7260.0
          }, {
            "lower_bound" : 2.2,
            "upper_bound" : 2.6,
            "count" : 0.0
          }, {
            "lower_bound" : 2.6,
            "upper_bound" : 3.0,
            "count" : 0.0
          },
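
The raw statistics JSON is long, so a compact per-feature summary can be easier to scan. The sketch below assumes the structure printed above (`numerical_statistics` with `common`, `mean`, `std_dev`, `min`, `max`, and KLL `buckets`); both functions are hypothetical helpers. The sample uses only the first five Rating buckets shown in the truncated output, so the fractions it prints describe that excerpt, not the full distribution.

```python
def summarize_statistics(statistics):
    """Return one summary dict per feature that has numerical statistics."""
    rows = []
    for feature in statistics["features"]:
        stats = feature.get("numerical_statistics")
        if stats is None:          # for example, string-typed features
            continue
        rows.append({
            "name": feature["name"],
            "type": feature["inferred_type"],
            "mean": stats["mean"],
            "std_dev": stats["std_dev"],
            "min": stats["min"],
            "max": stats["max"],
            "missing": stats["common"]["num_missing"],
        })
    return rows


def bucket_fractions(feature):
    """Convert KLL bucket counts into fractions of the listed total."""
    buckets = feature["numerical_statistics"]["distribution"]["kll"]["buckets"]
    total = sum(b["count"] for b in buckets)
    return [((b["lower_bound"], b["upper_bound"]),
             b["count"] / total if total else 0.0) for b in buckets]


# Excerpt of the Rating feature from the output above (bucket list is truncated there)
sample_statistics = {"features": [{
    "name": "Rating", "inferred_type": "Integral",
    "numerical_statistics": {
        "common": {"num_present": 63398, "num_missing": 0},
        "mean": 3.5196536168333385, "sum": 223139.0,
        "std_dev": 1.128394340517946, "min": 1.0, "max": 5.0,
        "distribution": {"kll": {"buckets": [
            {"lower_bound": 1.0, "upper_bound": 1.4, "count": 3955.0},
            {"lower_bound": 1.4, "upper_bound": 1.8, "count": 0.0},
            {"lower_bound": 1.8, "upper_bound": 2.2, "count": 7260.0},
            {"lower_bound": 2.2, "upper_bound": 2.6, "count": 0.0},
            {"lower_bound": 2.6, "upper_bound": 3.0, "count": 0.0},
        ]}},
    },
}]}
print(summarize_statistics(sample_statistics))
print(bucket_fractions(sample_statistics["features"][0]))
```

With the full file, `json.loads(statistics_file)` would replace the sample, and the bucket fractions could feed a histogram plot.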

## Section V – Monitor and analyze model for data drift

Now that we have collected the data above, let's monitor and analyze it for quality issues with Monitoring Schedules.

### Create a schedule
Let's create a Monitoring Schedule for the previously created endpoint.

In [23]:
# first copy over some test scripts to the S3 bucket so that they can be used for pre and post processing
## HOW SHOULD WE USE preprocessor /postprocessor FOR THE WORKSHOP??
## IS This for baseline data? or inference data?

## OPTIONAL / ADVANCED SECTION
#multiple schedules
#BYOC
#inference data capture --> should match the baseline
#independent of the data capture
#Prebuilt rules assume certain data structure/format.

## Pre processing -- 
## Post processing -- write an additional file as a report?
boto3.Session().resource('s3').Bucket(bucket).Object(code_prefix+"/preprocessor.py").upload_file('preprocessor.py')
boto3.Session().resource('s3').Bucket(bucket).Object(code_prefix+"/postprocessor.py").upload_file('postprocessor.py')

In [24]:
from schedule_wrapper import MonitoringSchedule
from time import gmtime, strftime

# MonitoringSchedule is just a Python helper to hide the large CreateMonitoringSchedule input payload. You can find it
# in schedule_wrapper.py in this package

ms = MonitoringSchedule(sm_client, role)
mon_schedule_name = 'REC-SM-xgb-movie-rec-model-monitor-schedule-' + strftime("%Y-%m-%d-%H-%M-%S", gmtime())
schedule = ms.create(mon_schedule_name, endpoint_name, s3_report_path, 
                     record_preprocessor_source_uri=s3_code_preprocessor_uri, 
                     post_analytics_source_uri=s3_code_postprocessor_uri,
                     baseline_statistics_uri=baseline_results_uri + '/statistics.json',
                     baseline_constraints_uri=baseline_results_uri+ '/constraints.json')
schedule

Creating Monitoring Schedule REC-SM-xgb-movie-rec-model-monitor-schedule-2019-11-27-18-18-40...


{'MonitoringScheduleArn': 'arn:aws:sagemaker:us-west-2:555360056434:monitoring-schedule/rec-sm-xgb-movie-rec-model-monitor-schedule-2019-11-27-18-18-40',
 'ResponseMetadata': {'RequestId': 'cdf35616-4a2b-4180-a157-8dc389d03fcd',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': 'cdf35616-4a2b-4180-a157-8dc389d03fcd',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '152',
   'date': 'Wed, 27 Nov 2019 18:18:40 GMT'},
  'RetryAttempts': 0}}

In [25]:
desc_schedule_result = sm_client.describe_monitoring_schedule( MonitoringScheduleName=mon_schedule_name)
print('Schedule status: {}'.format(desc_schedule_result['MonitoringScheduleStatus']))

Schedule status: Scheduled


### Generate inference traffic 

Let's start generating some inference traffic. The block below starts a thread that sends traffic to the endpoint created earlier. The thread keeps running so that traffic continues to flow; you need to stop the kernel to terminate it. If there is no traffic, the monitoring jobs will start to fail later on.

In [26]:
from threading import Thread
from time import sleep
import time

runtime_client = boto3.client('runtime.sagemaker')

# (just repeating code from above for convenience/ able to run this section independently)
## CAN WE USE DIFFERENT TEST CSVs to FIND DIFFERENT DRIFT
def invoke_endpoint(ep_name, file_name, runtime_client):
    with open(file_name, 'r') as f:
        contents = f.readlines()

    # Send the first 30 rows, one request per second
    for i in range(0, 30):
        line = contents[i]
        split_data = line.split(',')
        # Remove the original rating value from the data used for prediction
        original_value = split_data.pop(0)

        payload = ','.join(split_data)

        # Use the ep_name argument rather than the global endpoint_name
        response = runtime_client.invoke_endpoint(EndpointName=ep_name,
                                                  ContentType='text/csv',
                                                  Body=payload)
        prediction = response['Body'].read().decode('utf-8')

        #print("Original Value ", original_value , "Prediction : ", float(prediction))

        time.sleep(1)
            

def invoke_endpoint_forever():
    while True:
        invoke_endpoint(endpoint_name, LOCAL_DATA_DIR+"/"+MOVIE_RECOMMENDATION_TEST_DATA, runtime_client)
        
thread = Thread(target = invoke_endpoint_forever)
thread.start()

# Note that you need to stop the kernel to stop the invocations
##TODO : CAN WE INVOKE THIS TRAFFIC IN A SCRIPT, that can be kicked off in with the cloudformation script??
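
If stopping the kernel is inconvenient, one alternative is to give the background thread a stop signal. The sketch below uses a `threading.Event` for that purpose. `start_traffic` is a hypothetical helper, and `send_batch` stands for any zero-argument callable, such as a lambda that wraps the `invoke_endpoint` function above.

```python
import threading


def start_traffic(send_batch, pause_seconds=1.0):
    """Call send_batch() repeatedly in a background thread until the stop event is set."""
    stop = threading.Event()

    def loop():
        while not stop.is_set():
            send_batch()
            # wait() returns early once stop is set, so shutdown is not delayed by the pause
            stop.wait(pause_seconds)

    thread = threading.Thread(target=loop, daemon=True)
    thread.start()
    return thread, stop


# Usage in this notebook (commented out because it needs a live endpoint):
# thread, stop = start_traffic(lambda: invoke_endpoint(
#     endpoint_name, LOCAL_DATA_DIR + "/" + MOVIE_RECOMMENDATION_TEST_DATA, runtime_client))
# ...
# stop.set()       # ask the loop to finish
# thread.join()    # returns after the batch in flight completes
```

Note that the stop request takes effect only between batches, so with 30 one-second requests per batch the thread may take up to about half a minute to exit.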

TODO : CAN THIS BE DELETED. Describe and inspect the schedule
Once you describe, see MonitoringScheduleStatus changes to Scheduled

### List executions
Once the schedule is in the Scheduled state, it kicks off jobs at the specified intervals. Here we list the latest 5 executions. If you run this right after creating the hourly schedule, the list of executions might be empty. You might have to wait until you cross the hour boundary (in UTC) to see executions kick off. The code below has the logic for waiting.

Note that you may see the first few executions of the monitoring schedule fail due to data unavailability in the S3 bucket.  This is because the execution is looking for data captured in the previous hour.  

Once the execution starts looking for the data in the current hour, you will start seeing execution completions.
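
To estimate how long the wait might be, we can compute the time left until the next UTC hour boundary. This sketch assumes an hourly schedule that fires on the hour, as described above; the actual schedule expression is set inside the wrapper and may differ. `seconds_until_next_hour` is a hypothetical helper.

```python
from datetime import datetime, timedelta, timezone


def seconds_until_next_hour(now=None):
    """Return the number of seconds from `now` (default: current UTC time) to the next full hour."""
    now = now or datetime.now(timezone.utc)
    next_hour = now.replace(minute=0, second=0, microsecond=0) + timedelta(hours=1)
    return (next_hour - now).total_seconds()


print("Next hour boundary in {:.0f} seconds".format(seconds_until_next_hour()))
```

The monitoring job also needs time to start after the boundary, so this value is a lower bound on the wait rather than an exact figure.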

##TODO : For now copying files to the right folder.  But should fix this.

Examples of drift.  Why is it important to monitor this?  What is the impact?  -- Deck.

In [59]:
mon_executions = sm_client.list_monitoring_executions(MonitoringScheduleName=mon_schedule_name.lower(), MaxResults=5)
latest_execution=None

# Wait till an execution occurs
while not mon_executions['MonitoringExecutionSummaries']:
    print("Waiting for the 1st execution to happen...")
    time.sleep(60)
    mon_executions = sm_client.list_monitoring_executions(MonitoringScheduleName=mon_schedule_name.lower(), MaxResults=5)


In [60]:
##TODO : THIS CELL GAVE ME ERRORS FIRST COUPLE OF TIMES. FIX 

#mon_executions = sm_client.list_monitoring_executions(MonitoringScheduleName=mon_schedule_name.lower(), MaxResults=5)
#latest_execution=None

# Wait till an execution occurs
#while not mon_executions['MonitoringExecutionSummaries']:
 #   print("Waiting for the 1st execution to happen...")
  #  time.sleep(60)
   # mon_executions = sm_client.list_monitoring_executions(MonitoringScheduleName=mon_schedule_name.lower(), MaxResults=5)

# if there is only one execution, wait for it to reach a terminal state
if len(mon_executions['MonitoringExecutionSummaries']) == 1:
    while True:
        # Refresh the summary on every pass; ProcessingJobArn may be absent until the job starts
        mon_executions = sm_client.list_monitoring_executions(MonitoringScheduleName=mon_schedule_name.lower(), MaxResults=5)
        execution = mon_executions['MonitoringExecutionSummaries'][0]
        if execution.get('ProcessingJobArn'):
            job_name = execution['ProcessingJobArn'].split('/')[1]
            resp = sm_client.describe_processing_job(ProcessingJobName=job_name)
            status = resp['ProcessingJobStatus']
            print("Status: " + status)
            if status != 'InProgress':
                break
        time.sleep(60)
        
print("1")        
    
# now get the latest execution details    
for execution_summary in mon_executions['MonitoringExecutionSummaries']:
    print("ProcessingJob: {}".format(execution_summary['ProcessingJobArn'].split('/')[1]))
    print('MonitoringExecutionStatus: {} \n'.format(execution_summary['MonitoringExecutionStatus']))
    if not latest_execution:
        exec_status = execution_summary['MonitoringExecutionStatus']
        if  exec_status == 'Completed' or exec_status == 'Failed' or exec_status == 'CompletedWithViolations':
            latest_execution = execution_summary
            
print("latest_executions is " , latest_execution)          

1
ProcessingJob: model-monitoring-201911271830-4279db19
MonitoringExecutionStatus: CompletedWithViolations 

ProcessingJob: model-monitoring-201911271820-1e668a5b
MonitoringExecutionStatus: Failed 

latest_executions is  {'MonitoringScheduleName': 'REC-SM-xgb-movie-rec-model-monitor-schedule-2019-11-27-18-18-40', 'ScheduledTime': datetime.datetime(2019, 11, 27, 18, 30, tzinfo=tzlocal()), 'CreationTime': datetime.datetime(2019, 11, 27, 18, 30, 39, 683000, tzinfo=tzlocal()), 'LastModifiedTime': datetime.datetime(2019, 11, 27, 18, 34, 44, 88000, tzinfo=tzlocal()), 'MonitoringExecutionStatus': 'CompletedWithViolations', 'ProcessingJobArn': 'arn:aws:sagemaker:us-west-2:555360056434:processing-job/model-monitoring-201911271830-4279db19', 'EndpointName': 'XGBoost-SM-MovieRec-DataCaptureEndpoint-2019-11-27-18-03-13'}


In [52]:
sm_client.list_monitoring_executions(MonitoringScheduleName=mon_schedule_name.lower(), MaxResults=5)


{'MonitoringExecutionSummaries': [{'MonitoringScheduleName': 'REC-SM-xgb-movie-rec-model-monitor-schedule-2019-11-27-18-18-40',
   'ScheduledTime': datetime.datetime(2019, 11, 27, 18, 30, tzinfo=tzlocal()),
   'CreationTime': datetime.datetime(2019, 11, 27, 18, 30, 39, 683000, tzinfo=tzlocal()),
   'LastModifiedTime': datetime.datetime(2019, 11, 27, 18, 34, 44, 88000, tzinfo=tzlocal()),
   'MonitoringExecutionStatus': 'CompletedWithViolations',
   'ProcessingJobArn': 'arn:aws:sagemaker:us-west-2:555360056434:processing-job/model-monitoring-201911271830-4279db19',
   'EndpointName': 'XGBoost-SM-MovieRec-DataCaptureEndpoint-2019-11-27-18-03-13'},
  {'MonitoringScheduleName': 'REC-SM-xgb-movie-rec-model-monitor-schedule-2019-11-27-18-18-40',
   'ScheduledTime': datetime.datetime(2019, 11, 27, 18, 20, tzinfo=tzlocal()),
   'CreationTime': datetime.datetime(2019, 11, 27, 18, 20, 39, 181000, tzinfo=tzlocal()),
   'LastModifiedTime': datetime.datetime(2019, 11, 27, 18, 23, 39, 39000, tzinfo=t

#### Inspect a specific execution (latest execution here)

In the previous cell, we picked up the latest scheduled execution that reached a terminal state. Let's explore what went right or wrong. Here are the possible terminal states and what each of them means:

* Completed - The monitoring execution completed and no issues were found in the violations report.
* CompletedWithViolations - The monitoring execution completed, but constraint violations were detected.
* Failed - The monitoring execution failed, possibly because of a client error (for example, role issues) or infrastructure issues. Examine FailureReason and ExitMessage to identify what exactly happened.
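
The selection of the latest finished execution can also be written as a small standalone function. The sketch below is illustrative only: the helper name `latest_terminal_execution` and the sample summaries are hypothetical, and the status strings are the ones that appear in the `MonitoringExecutionStatus` field of this notebook's output.

```python
from datetime import datetime

# Status values treated as terminal, matching the checks used in this notebook.
TERMINAL_STATES = ("Completed", "CompletedWithViolations", "Failed")


def latest_terminal_execution(summaries):
    """Return the most recently scheduled summary in a terminal state, or None."""
    finished = [
        s for s in summaries
        if s.get("MonitoringExecutionStatus") in TERMINAL_STATES
    ]
    if not finished:
        return None
    return max(finished, key=lambda s: s["ScheduledTime"])


# Hypothetical summaries shaped like entries of 'MonitoringExecutionSummaries'.
example_summaries = [
    {"ScheduledTime": datetime(2019, 11, 27, 18, 20),
     "MonitoringExecutionStatus": "Failed"},
    {"ScheduledTime": datetime(2019, 11, 27, 18, 30),
     "MonitoringExecutionStatus": "CompletedWithViolations"},
    {"ScheduledTime": datetime(2019, 11, 27, 18, 40),
     "MonitoringExecutionStatus": "InProgress"},
]

picked = latest_terminal_execution(example_summaries)
print(picked["MonitoringExecutionStatus"])
```

Sorting by `ScheduledTime` rather than relying on list order is a design choice of this sketch; it avoids depending on the order in which the API returns summaries.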

In [61]:
if latest_execution:
    job_name=latest_execution['ProcessingJobArn'].split('/')[1]
    job_status=latest_execution['MonitoringExecutionStatus']
    desc_analytics_job_result=sm_client.describe_processing_job(ProcessingJobName=job_name)
    
    if job_status == 'Completed' or job_status == 'CompletedWithViolations':
        report_uri=desc_analytics_job_result['ProcessingOutputConfig']['Outputs'][0]['S3Output']['S3Uri']
        print('Report Uri: {}'.format(report_uri))
    else:
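        # FailureReason and ExitMessage are only present when the job reports them
        print('Job failed.')
        print('FailureReason: {}'.format(desc_analytics_job_result.get('FailureReason')))
        print('ExitMessage: {}'.format(desc_analytics_job_result.get('ExitMessage')))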
else:
    print("====STOP==== \n No completed executions to inspect further. Please wait till an execution completes or investigate previously reported failures.")

Report Uri: s3://sagemaker-us-west-2-555360056434/sagemaker/Recommendations-ModelMonitor/reports/XGBoost-SM-MovieRec-DataCaptureEndpoint-2019-11-27-18-03-13/REC-SM-xgb-movie-rec-model-monitor-schedule-2019-11-27-18-18-40/2019/11/27/18


#### List the generated reports

In [62]:
from urllib.parse import urlparse
s3uri = urlparse(report_uri)
report_bucket = s3uri.netloc
report_key = s3uri.path.lstrip('/')
print('Report bucket: {}'.format(report_bucket))
print('Report key: {}'.format(report_key))

s3_client = boto3.Session().client('s3')
result = s3_client.list_objects(Bucket=report_bucket, Prefix=report_key)
# 'Contents' is missing from the response when no objects match the prefix
report_files = [report_file.get("Key") for report_file in result.get('Contents', [])]
print("Found Report Files:")
print("\n ".join(report_files))

Report bucket: sagemaker-us-west-2-555360056434
Report key: sagemaker/Recommendations-ModelMonitor/reports/XGBoost-SM-MovieRec-DataCaptureEndpoint-2019-11-27-18-03-13/REC-SM-xgb-movie-rec-model-monitor-schedule-2019-11-27-18-18-40/2019/11/27/18
Found Report Files:
sagemaker/Recommendations-ModelMonitor/reports/XGBoost-SM-MovieRec-DataCaptureEndpoint-2019-11-27-18-03-13/REC-SM-xgb-movie-rec-model-monitor-schedule-2019-11-27-18-18-40/2019/11/27/18/constraint_violations.json
 sagemaker/Recommendations-ModelMonitor/reports/XGBoost-SM-MovieRec-DataCaptureEndpoint-2019-11-27-18-03-13/REC-SM-xgb-movie-rec-model-monitor-schedule-2019-11-27-18-18-40/2019/11/27/18/constraints.json
 sagemaker/Recommendations-ModelMonitor/reports/XGBoost-SM-MovieRec-DataCaptureEndpoint-2019-11-27-18-03-13/REC-SM-xgb-movie-rec-model-monitor-schedule-2019-11-27-18-18-40/2019/11/27/18/statistics.json


#### Violations report

If the captured data violates any of the baseline constraints, the violations are recorded in constraint_violations.json. Let's list them.

TODO : Add explanation to one or two features : 'Age' and 'Gender'?
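
The violations report is a JSON document with a top-level `violations` list, where each entry carries a `feature_name`, a `constraint_check_type`, and a `description`. Once the report text has been read into a string, a quick way to get an overview is to group the affected features by check type. This is a minimal sketch under that assumption; the helper name `group_violations` is hypothetical and the sample text is a shortened stand-in for a real report.

```python
import json
from collections import defaultdict


def group_violations(report_text):
    """Group violating feature names by constraint_check_type."""
    report = json.loads(report_text)
    grouped = defaultdict(list)
    for violation in report.get("violations", []):
        grouped[violation["constraint_check_type"]].append(violation["feature_name"])
    return dict(grouped)


# Shortened sample in the same shape as constraint_violations.json.
sample_report = """
{
  "violations": [
    {"feature_name": "Zip Code",
     "constraint_check_type": "data_type_check",
     "description": "Data type match requirement is not met."},
    {"feature_name": "Rating",
     "constraint_check_type": "data_type_check",
     "description": "Data type match requirement is not met."},
    {"feature_name": "Occupation_doctor",
     "constraint_check_type": "baseline_drift_check",
     "description": "Numerical distance: 0.5 exceeds numerical threshold: 0.1"}
  ]
}
"""

for check_type, features in group_violations(sample_report).items():
    print("{}: {}".format(check_type, ", ".join(features)))
```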

In [63]:
def get_obj_body(obj_key):
    # Read from the bucket parsed out of the report URI in the previous cell
    return s3_client.get_object(Bucket=report_bucket, Key=obj_key).get('Body').read().decode("utf-8")

violations_file = get_obj_body(report_key+'/constraint_violations.json')
print(violations_file)

{
  "violations" : [ {
    "feature_name" : "Zip Code",
    "constraint_check_type" : "data_type_check",
    "description" : "Data type match requirement is not met. Expected data type: String, Expected match: 100.0%. Observed: Only 0.0% of data is String."
  }, {
    "feature_name" : "Rating",
    "constraint_check_type" : "data_type_check",
    "description" : "Data type match requirement is not met. Expected data type: Integral, Expected match: 100.0%. Observed: Only 0.0% of data is Integral."
  }, {
    "feature_name" : "Occupation_doctor",
    "constraint_check_type" : "baseline_drift_check",
    "description" : "Numerical distance: 0.5 exceeds numerical threshold: 0.1"
  }, {
    "feature_name" : "User",
    "constraint_check_type" : "baseline_drift_check",
    "description" : "Numerical distance: 0.11611157165845809 exceeds numerical threshold: 0.1"
  }, {
    "feature_name" : "Occupation_entertainment",
    "constraint_check_type" : "baseline_drift_check",
    "description" : "

#### Other commands

You can also list, stop, and start monitoring schedules.

In [None]:
sm_client.list_monitoring_schedules()

## Section VI - Retrigger Training

#### Create CloudWatch Alarms

Now that we know how to capture the violations, we can go a step further and automatically trigger retraining of the model. For this, we use CloudWatch metrics and alarms.

TODO : EXPLAIN.  What is the training data used for retraining?  

List all the available SNS topics.
Make a note of the topic whose name starts with 'SAGEMAKER-DEPLOYMENT-OPTIONS'. (Note: this prefix matches the CloudFormation stack name used in prep.)

In [64]:
client = boto3.client('sns')

client.list_topics()

{'Topics': [{'TopicArn': 'arn:aws:sns:us-west-2:555360056434:Default_CloudWatch_Alarms_Topic'},
  {'TopicArn': 'arn:aws:sns:us-west-2:555360056434:MC-TEST-RetrainSNSTopic-1GESYHRUIHRT4'},
  {'TopicArn': 'arn:aws:sns:us-west-2:555360056434:dynamodb'}],
 'ResponseMetadata': {'RequestId': '76151e1f-f368-5f65-914e-484736285234',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '76151e1f-f368-5f65-914e-484736285234',
   'content-type': 'text/xml',
   'content-length': '633',
   'date': 'Wed, 27 Nov 2019 18:40:00 GMT'},
  'RetryAttempts': 0}}
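
Instead of copying the ARN by hand, the topic can be picked out of the `list_topics` response programmatically. This is a minimal sketch: the helper name `find_topic_arns`, the sample response, and the account ID `123456789012` are made up for illustration. It relies only on the fact that the topic name is the last colon-separated segment of the ARN.

```python
def find_topic_arns(list_topics_response, name_fragment):
    """Return the ARNs of all topics whose name contains name_fragment."""
    matches = []
    for topic in list_topics_response.get("Topics", []):
        arn = topic["TopicArn"]
        topic_name = arn.rsplit(":", 1)[-1]  # the name is the last ARN segment
        if name_fragment in topic_name:
            matches.append(arn)
    return matches


# Hypothetical response in the same shape as the output of client.list_topics().
sample_response = {
    "Topics": [
        {"TopicArn": "arn:aws:sns:us-west-2:123456789012:Default_CloudWatch_Alarms_Topic"},
        {"TopicArn": "arn:aws:sns:us-west-2:123456789012:MY-STACK-RetrainSNSTopic-EXAMPLE"},
    ]
}

print(find_topic_arns(sample_response, "RetrainSNSTopic"))
```

Note that `list_topics` returns results in pages, so an account with many topics may need to follow `NextToken` before searching.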

In [65]:
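# ARN of the retrain SNS topic from the list above; replace it with the topic ARN from your own account
sns_notifications_topic='arn:aws:sns:us-west-2:555360056434:MC-TEST-RetrainSNSTopic-1GESYHRUIHRT4'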

In [66]:
##Threshold for cloudwatch is independent of violations threshold.
## Show the sequence of 0.1,0.2, but only trigger at 0.5

##Email notification -- Talk about this.  Add to additions or optional section.

cw_client = boto3.Session().client('cloudwatch')

alarm_name='BASELINE_DRIFT_FEATURE_ALARM_AGAIN'
alarm_desc='Trigger a CloudWatch alarm when the feature Age drifts away from the baseline'
feature_age_drift_threshold=0.1
metric_name='feature_baseline_drift_Age'
namespace='aws/sagemaker/Endpoints/data-metrics'

# endpoint_name and mon_schedule_name were defined earlier in the notebook
monitoring_schedule_name=mon_schedule_name

cw_client.put_metric_alarm(
    AlarmName=alarm_name,
    AlarmDescription=alarm_desc,
    ActionsEnabled=True,
    AlarmActions=[sns_notifications_topic],
    MetricName=metric_name,
    Namespace=namespace,
    Statistic='Average',
    Dimensions=[
        {
            'Name': 'Endpoint',
            'Value': endpoint_name
        },
        {
            'Name': 'MonitoringSchedule',
            'Value': monitoring_schedule_name
        }
    ],
    Period=600,
    EvaluationPeriods=1,
    DatapointsToAlarm=1,
    Threshold=feature_age_drift_threshold,
    ComparisonOperator='GreaterThanOrEqualToThreshold',
    TreatMissingData='breaching'
)


{'ResponseMetadata': {'RequestId': 'edaa8891-0474-4193-b2f1-8b3dc059dc88',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': 'edaa8891-0474-4193-b2f1-8b3dc059dc88',
   'content-type': 'text/xml',
   'content-length': '214',
   'date': 'Wed, 27 Nov 2019 18:40:44 GMT'},
  'RetryAttempts': 0}}
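
The alarm threshold is independent of the violation threshold used in the monitoring report. To see how the settings above interact, the sketch below models a deliberately simplified alarm: one datapoint per period, `EvaluationPeriods=1`, `DatapointsToAlarm=1`, the `GreaterThanOrEqualToThreshold` operator, and missing datapoints treated as breaching. It is not CloudWatch's actual evaluation logic, and the function name and drift values are hypothetical.

```python
def alarm_states(datapoints, threshold, treat_missing_as_breaching=True):
    """Return 'ALARM' or 'OK' for each period under the simplified model.

    A datapoint of None stands for a period in which no metric was published.
    """
    states = []
    for value in datapoints:
        if value is None:
            breaching = treat_missing_as_breaching
        else:
            breaching = value >= threshold
        states.append("ALARM" if breaching else "OK")
    return states


# Hypothetical drift distances for the Age feature over four periods.
drift = [0.1, 0.2, None, 0.5]

print(alarm_states(drift, threshold=0.5))
# ['OK', 'OK', 'ALARM', 'ALARM']

print(alarm_states(drift, threshold=0.1))
# ['ALARM', 'ALARM', 'ALARM', 'ALARM']
```

With the higher threshold, drift values of 0.1 and 0.2 do not trigger the alarm, but the period with no data does, because missing data is treated as breaching. If that is not the behavior you want, consider a different `TreatMissingData` setting.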

### (Optional) Delete the Resources

You can keep your endpoint running to continue capturing data. If you do not plan to collect more data or use this endpoint further, you should delete the endpoint to avoid incurring additional charges. Note that deleting your endpoint does not delete the data that was captured during the model invocations. That data is persisted in S3 until you delete it yourself.

In [None]:
#mon_schedule_name='REC-SM-xgb-movie-rec-model-monitor-schedule-2019-11-27-04-21-13'
sm_client.delete_monitoring_schedule(MonitoringScheduleName=mon_schedule_name)

In [None]:
sm_client.delete_endpoint(EndpointName=endpoint_name)

In [None]:
sm_client.delete_endpoint_config(EndpointConfigName=endpoint_config_name)

In [None]:
sm_client.delete_model(ModelName=model_name)
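
The four deletions above can also be combined into one helper that keeps going when a step fails and reports what went wrong. This is a sketch only: `delete_resources` and `_RecordingClient` are hypothetical names, and the recording client stands in for `sm_client` so that the example runs without touching any AWS resources.

```python
def delete_resources(sm_client, schedule_name, endpoint_name,
                     endpoint_config_name, model_name):
    """Delete the resources in the same order as the cells above.

    Returns a list of (label, error message) pairs for steps that failed.
    """
    steps = [
        ("monitoring schedule", sm_client.delete_monitoring_schedule,
         {"MonitoringScheduleName": schedule_name}),
        ("endpoint", sm_client.delete_endpoint,
         {"EndpointName": endpoint_name}),
        ("endpoint config", sm_client.delete_endpoint_config,
         {"EndpointConfigName": endpoint_config_name}),
        ("model", sm_client.delete_model,
         {"ModelName": model_name}),
    ]
    failures = []
    for label, call, kwargs in steps:
        try:
            call(**kwargs)
            print("Deleted {}".format(label))
        except Exception as err:  # keep going so later resources are still removed
            failures.append((label, str(err)))
    return failures


class _RecordingClient:
    """Stand-in client that records calls instead of contacting AWS."""

    def __init__(self):
        self.calls = []

    def __getattr__(self, name):
        def method(**kwargs):
            self.calls.append((name, kwargs))
            return {}
        return method


recorder = _RecordingClient()
failed = delete_resources(recorder, "example-schedule", "example-endpoint",
                          "example-endpoint-config", "example-model")
print("Failures:", failed)
```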