In [5]:
# Handful of configuration

import os
import boto3
import json
from sagemaker import get_execution_role, session

# One boto3 session is shared by the region lookup and the SageMaker session below.
boto_session = boto3.Session()
region = boto_session.region_name

sm_client = boto3.client('sagemaker')

role = get_execution_role()
print(f"RoleArn: {role}")

# Default SageMaker bucket for this account/region. Any other bucket works as
# long as the notebook's role has s3:PutObject on it -- captured endpoint data
# is written here.
bucket = session.Session(boto_session).default_bucket()
print(f"Demo Bucket: {bucket}")

prefix = 'sagemaker/DEMO-ModelMonitor'

# Where the endpoint's data capture and the monitoring reports live in S3.
s3_capture_upload_path = f's3://{bucket}/{prefix}/datacapture'
s3_report_path = f's3://{bucket}/{prefix}/reports'

print(f"Capture path: {s3_capture_upload_path}")
print(f"Report path: {s3_report_path}")

RoleArn: arn:aws:iam::806174985048:role/service-role/AmazonSageMaker-ExecutionRole-20201218T151409
Demo Bucket: sagemaker-ap-northeast-2-806174985048
Capture path: s3://sagemaker-ap-northeast-2-806174985048/sagemaker/DEMO-ModelMonitor/datacapture
Report path: s3://sagemaker-ap-northeast-2-806174985048/sagemaker/DEMO-ModelMonitor/reports


In [6]:
# Upload the model artifact and the train/test datasets to S3.
# S3 keys always use '/' separators, so build them explicitly instead of with
# os.path.join (which would produce '\\' separators on Windows).
s3_model_key = f'{prefix}/model.tar.gz'
s3_train_key = f'{prefix}/train.csv'
s3_test_key = f'{prefix}/test.csv'

s3_bucket = boto3.Session().resource('s3').Bucket(bucket)
for local_path, s3_key in (
    ("model/model.tar.gz", s3_model_key),
    ("data/train.csv", s3_train_key),
    ("data/test.csv", s3_test_key),
):
    # The context manager closes each local file once its upload finishes.
    with open(local_path, 'rb') as fileobj:
        s3_bucket.Object(s3_key).upload_fileobj(fileobj)

print("Success! You are all set to proceed.")

Success! You are all set to proceed.


In [5]:
# Display the Dockerfile for the custom drift-detection container (entrypoint: drift_detector.py).
!pygmentize Dockerfile

FROM python:3.8-slim-buster

RUN pip3 install pandas==1.1.4 numpy==1.19.4 scikit-learn==0.23.2 scipy==1.5.4 boto3==1.17.12

WORKDIR /home

COPY src/* /home/

ENTRYPOINT ["python3", "drift_detector.py"]


In [1]:
from docker_utils import build_and_push_docker_image

# Image/repository name for the custom drift-detection container.
repository_short_name = 'custom-model-monitor'

# Build the image from ./Dockerfile and push it. The returned image name is
# used as the monitoring job's ImageUri.
# NOTE(review): the push target (presumably ECR) is defined in docker_utils -- confirm.
image_name = build_and_push_docker_image(repository_short_name)

Building docker image custom-model-monitor from Dockerfile
$ docker build -t custom-model-monitor -f Dockerfile .
Sending build context to Docker daemon  38.43MB
Step 1/5 : FROM python:3.8-slim-buster
3.8-slim-buster: Pulling from library/python
f6e04ba65310: Pulling fs layer
b87859229fd4: Pulling fs layer
598a5d7238e9: Pulling fs layer
be1d88d97f44: Pulling fs layer
22a630315c5a: Pulling fs layer
22a630315c5a: Waiting
be1d88d97f44: Waiting
b87859229fd4: Verifying Checksum
b87859229fd4: Download complete
598a5d7238e9: Verifying Checksum
598a5d7238e9: Download complete
f6e04ba65310: Verifying Checksum
f6e04ba65310: Download complete
be1d88d97f44: Verifying Checksum
be1d88d97f44: Download complete
22a630315c5a: Verifying Checksum
22a630315c5a: Download complete
f6e04ba65310: Pull complete
b87859229fd4: Pull complete
598a5d7238e9: Pull complete
be1d88d97f44: Pull complete
22a630315c5a: Pull complete
Digest: sha256:03c12f7bbd977120133b73e4b3ef5c5707ca09be338156dc02306d41633db4c0
Status: Do

In [3]:
# Display the custom inference handler that XGBoostModel loads via entry_point='inference.py'.
!pygmentize src/inference.py

#  Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
#  Licensed under the Apache License, Version 2.0 (the "License").
#  You may not use this file except in compliance with the License.
#  A copy of the License is located at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  or in the "license" file accompanying this file. This file is distributed
#  on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
#  express or implied. See the License for the specific language governing
#  permissions and limitations under the License.

import os
import pickle
import pathlib

from io import Stri

In [None]:
from sagemaker.model_monitor import DataCaptureConfig
from sagemaker.serializers import CSVSerializer
from sagemaker.xgboost.model import XGBoostModel

# Capture 100% of the endpoint's traffic into the S3 capture path.
data_capture_config = DataCaptureConfig(
    enable_capture=True,
    sampling_percentage=100,
    destination_s3_uri=s3_capture_upload_path,
)

# Wrap the uploaded model artifact with the custom handler src/inference.py.
model_url = f's3://{bucket}/{s3_model_key}'
xgb_inference_model = XGBoostModel(
    model_data=model_url,
    role=role,
    entry_point='inference.py',
    source_dir='src',
    framework_version='1.2-1',
)

# Real-time endpoint that receives CSV payloads, with data capture enabled.
predictor = xgb_inference_model.deploy(
    initial_instance_count=1,
    instance_type="ml.c5.xlarge",
    serializer=CSVSerializer(),
    data_capture_config=data_capture_config,
)

--

In [None]:
s3_train_path = f's3://{bucket}/{s3_train_key}'
s3_test_path = f's3://{bucket}/{s3_test_key}'
# Each monitoring run writes its results under a per-endpoint prefix.
s3_result_path = f's3://{bucket}/{prefix}/result/{predictor.endpoint_name}'

# Schedule the custom drift-detection container (image_name) to run hourly
# against the endpoint's captured data. The container is configured entirely
# through ContainerArguments, so no 'Environment' variables are passed.
sm_client.create_monitoring_schedule(
    MonitoringScheduleName=predictor.endpoint_name,
    MonitoringScheduleConfig={
        'ScheduleConfig': {
            # Top of every hour.
            'ScheduleExpression': 'cron(0 * ? * * *)'
        },
        'MonitoringJobDefinition': {
            'MonitoringInputs': [
                {
                    'EndpointInput': {
                        'EndpointName': predictor.endpoint_name,
                        'LocalPath': '/opt/ml/processing/endpointdata'
                    }
                },
            ],
            'MonitoringOutputConfig': {
                'MonitoringOutputs': [
                    {
                        'S3Output': {
                            'S3Uri': s3_result_path,
                            'LocalPath': '/opt/ml/processing/resultdata',
                            'S3UploadMode': 'EndOfJob'
                        }
                    },
                ]
            },
            'MonitoringResources': {
                'ClusterConfig': {
                    'InstanceCount': 1,
                    'InstanceType': 'ml.c5.xlarge',
                    'VolumeSizeInGB': 10
                }
            },
            'MonitoringAppSpecification': {
                'ImageUri': image_name,
                'ContainerArguments': [
                    '--train_s3_uri',
                    s3_train_path,
                    '--test_s3_uri',
                    s3_test_path,
                    '--target_label',
                    'income'
                ]
            },
            'StoppingCondition': {
                'MaxRuntimeInSeconds': 600
            },
            'RoleArn': role
        }
    }
)

In [None]:
from threading import Thread
from time import time, sleep

def invoke_endpoint(ep_name, file_name, runtime_client,
                    duration_sec=10*60*60, report_interval_sec=900):
    """Replay the rows of a CSV file against an endpoint at a constant rate.

    The header line is skipped and every remaining non-blank line is sent as
    one request. Requests are spaced so that all rows are spread evenly over
    ``duration_sec`` seconds.

    Args:
        ep_name: Endpoint name. Not used by the body; kept so existing callers
            keep working.
        file_name: Path to a CSV file whose first line is a header.
        runtime_client: Callable invoked as ``runtime_client(data=row)`` for
            each row (e.g. ``predictor.predict``).
        duration_sec: Total time over which to spread the requests
            (default: 10 hours).
        report_interval_sec: Approximate wall-clock interval between progress
            messages (default: 15 minutes).
    """
    with open(file_name) as f:
        next(f, None)  # skip header
        # Blank lines (e.g. a trailing empty line) would be sent as empty payloads.
        rows = [line.rstrip('\n') for line in f if line.strip()]

    if not rows:
        print("Done!")
        return

    count = len(rows)
    # Target spacing between the starts of consecutive requests.
    sleep_time = duration_sec / count
    # Records per progress message. Clamp to >= 1 so small files (or a zero
    # duration) never cause a modulo-by-zero below.
    if duration_sec > 0:
        report_every = max(1, int(count / duration_sec * report_interval_sec))
    else:
        report_every = count

    for ind, payload in enumerate(rows, start=1):
        start_time = time()
        runtime_client(data=payload)

        if ind % report_every == 0:
            print(f'Finished sending {ind} records.')

        # Keep a constant rate: subtract the time the request itself took.
        sleep(max(sleep_time - (time() - start_time), 0))

    print("Done!")
    
print(f"Sending test traffic to the endpoint {predictor.endpoint_name}. \nPlease wait...")

# Replay data/infer.csv on a background thread so the notebook stays usable.
# `predictor.endpoint` is a deprecated alias in SageMaker Python SDK v2;
# `endpoint_name` is the supported attribute.
thread = Thread(
    target=invoke_endpoint,
    args=(predictor.endpoint_name, 'data/infer.csv', predictor.predict),
)
thread.start()

In [None]:
# Reload the helper modules from src/ automatically when they are edited.
%load_ext autoreload

# Mode 1: before each cell runs, reload only the modules registered via %aimport.
%autoreload 1

import sys
# Timer is used by the plotting cells to schedule periodic refreshes.
from threading import Timer

# Make src/ importable. Re-running this cell appends a duplicate entry.
sys.path.append('src')

%aimport drift_visualizer
%aimport utils

In [None]:
def plot_accuracy(interval_sec=3600):
    """Plot accuracy from the monitoring results and schedule a refresh.

    Loads the results under ``s3_result_path`` with
    ``utils.construct_df_from_result``. If that returns a frame, it is plotted
    with ``drift_visualizer.plot_accuracy``.

    Args:
        interval_sec: Seconds until this function runs again (default 3600,
            matching the hourly monitoring schedule). Pass ``None`` or ``0`` to
            plot once without scheduling a refresh.

    NOTE(review): the refresh runs on a background thread, so its figure may
    render into whichever cell output is active at the time (or not appear) --
    confirm this is acceptable. Each call starts its own refresh chain, so
    re-running this cell adds another one.
    """
    df = utils.construct_df_from_result(s3_result_path)
    if df is not None:
        drift_visualizer.plot_accuracy(df)
    if interval_sec:
        # A Timer does nothing until start() is called. daemon=True keeps it
        # from blocking interpreter shutdown.
        refresh = Timer(interval_sec, plot_accuracy, kwargs={'interval_sec': interval_sec})
        refresh.daemon = True
        refresh.start()

plot_accuracy()


In [None]:
def plot_drift_score(interval_sec=3600):
    """Plot the drift score from the monitoring results and schedule a refresh.

    Loads the results under ``s3_result_path`` with
    ``utils.construct_df_from_result``. If that returns a frame, it is plotted
    with ``drift_visualizer.plot_drift_score``.

    Args:
        interval_sec: Seconds until this function runs again (default 3600,
            matching the hourly monitoring schedule). Pass ``None`` or ``0`` to
            plot once without scheduling a refresh.

    NOTE(review): the refresh runs on a background thread, so its figure may
    render into whichever cell output is active at the time (or not appear) --
    confirm this is acceptable. Each call starts its own refresh chain, so
    re-running this cell adds another one.
    """
    df = utils.construct_df_from_result(s3_result_path)
    if df is not None:
        drift_visualizer.plot_drift_score(df)
    if interval_sec:
        # A Timer does nothing until start() is called. daemon=True keeps it
        # from blocking interpreter shutdown.
        refresh = Timer(interval_sec, plot_drift_score, kwargs={'interval_sec': interval_sec})
        refresh.daemon = True
        refresh.start()

plot_drift_score()

In [None]:
def plot_p_values(interval_sec=3600):
    """Plot p-values from the monitoring results and schedule a refresh.

    Loads the results under ``s3_result_path`` with
    ``utils.construct_df_from_result``. If that returns a frame, it is plotted
    with ``drift_visualizer.plot_p_values``.

    Args:
        interval_sec: Seconds until this function runs again (default 3600,
            matching the hourly monitoring schedule). Pass ``None`` or ``0`` to
            plot once without scheduling a refresh.

    NOTE(review): the refresh runs on a background thread, so its figure may
    render into whichever cell output is active at the time (or not appear) --
    confirm this is acceptable. Each call starts its own refresh chain, so
    re-running this cell adds another one.
    """
    df = utils.construct_df_from_result(s3_result_path)
    if df is not None:
        drift_visualizer.plot_p_values(df)
    if interval_sec:
        # A Timer does nothing until start() is called. daemon=True keeps it
        # from blocking interpreter shutdown.
        refresh = Timer(interval_sec, plot_p_values, kwargs={'interval_sec': interval_sec})
        refresh.daemon = True
        refresh.start()

plot_p_values()