### Monitoring data quality in third-party models from the AWS Marketplace

**Overview:**

This notebook demonstrates how to configure an Amazon SageMaker Data Quality monitoring schedule for a pre-trained third-party model from the AWS Marketplace.

**Contents:**
- Pre-requisites
- Step 1. Initial setup
    - 1.1 [Import packages and modules](#section_1_1)
    - 1.2 [Define global variables](#section_1_2)
    - 1.3 [Uploading sample datasets to your S3 bucket](#section_1_3)
- Step 2. Create and deploy the model endpoint with data capture
    - 2.1 [Create the model](#section_2_1)
    - 2.2 [Create the endpoint configuration with DataCapture](#section_2_2)
    - 2.3 [Create the model endpoint](#section_2_3)
    - 2.4 [Periodically check if the model's endpoint has changed from 'Creating' to 'InService'](#section_2_4)
- Step 3. Create a baselining job to suggest a set of baseline constraints
    - 3.1 [Create a baselining job](#section_3_1)
    - 3.2 [Periodically check if the baseline processing job has changed from 'InProgress' to 'Completed'](#section_3_2)
- Step 4. Set up a monitoring schedule to monitor the data captured for the model's endpoint
    - 4.1 [Create a monitoring schedule](#section_4_1)
- Step 5. Invoking the inference endpoint with anomalous data
    - 5.1 [Initialize a Predictor to make prediction requests to the model's endpoint](#section_5_1)
    - 5.2 [Create data quality constraint violations](#section_5_2)


**Pre-requisites**

This sample notebook requires a subscription to the [Propensity-Planning to Buy a House](https://aws.amazon.com/marketplace/pp/prodview-vzofptk4lnxii) model, a pre-trained machine learning model package from AWS Marketplace.

<a id=section_1_1></a>
#### 1.1 Import packages and modules

In [None]:
import boto3
import pandas as pd
import requests
import time
import io

from time import gmtime, strftime

from sagemaker import get_execution_role
from sagemaker import session
from sagemaker import ModelPackage

from sagemaker.predictor import Predictor
from sagemaker.serializers import CSVSerializer

from sagemaker.model_monitor import CronExpressionGenerator
from sagemaker.model_monitor import DefaultModelMonitor
from sagemaker.model_monitor.dataset_format import DatasetFormat

<a id=section_1_2></a>
#### 1.2 Define global variables

In [None]:
# Get the execution role for the notebook instance.
role = get_execution_role()

# Stores configuration state and allows you to create service clients and resources
session = session.Session()

# Create a low-level client representing the Amazon S3 service
s3_client = boto3.client('s3')

# Create a low-level client representing the Amazon SageMaker service
sm_client = boto3.client('sagemaker')

# Create a low-level client representing the Amazon SageMaker Runtime
smr_client = boto3.client('sagemaker-runtime')

In [None]:
# S3
BUCKET = session.default_bucket() # Update as needed
PREFIX = 'third-party-model-seller-name' # Update as needed

DATASETS = ['train.csv', 'datadrift.csv']

S3_DATA_CAPTURE_URI = 's3://{}/{}/datacapture'.format(BUCKET, PREFIX)
S3_BASELINE_DATASET_URI = 's3://{}/{}/train/{}'.format(BUCKET, PREFIX, DATASETS[0])
S3_BASELINE_ANALYSIS_RESULTS_URI = 's3://{}/{}/baselining'.format(BUCKET, PREFIX)
S3_DATA_QUALITY_RPT_URI = 's3://{}/{}/reports'.format(BUCKET, PREFIX)

# Model
MODEL_PACKAGE_ARN = 'arn:aws:sagemaker:us-east-1:865070037744:model-package/planning-to-buy-house-basic-28fcb3ca751705854a7171b255d8ef43'  # Update as needed
MODEL_NAME = 'third-party-model-2'
MODEL_ENDPOINT = 'third-party-model-endpoint-2'
MODEL_ENDPOINT_CONFIG = 'third-party-model-endpoint-config-2'
MODEL_BASELINE_JOB = 'third-party-model-baseline-job-2'
MODEL_MONITOR_SCHEDULE_NAME = 'third-party-model-data-quality-schedule-2'
MODEL_MONITOR_INSTANCE_TYPE = 'ml.m4.xlarge'
MODEL_INFERENCE_INSTANCE_TYPE = 'ml.m4.xlarge'
MODEL_INSTANCE_COUNT = 1

# Base URL of the sample training and data drift datasets
TRAINING_DATASET_URI = "https://raw.githubusercontent.com/william-screen/model-monitor/main/third-party-models/prosper/data"

<a id=section_1_3></a>
#### 1.3 Uploading sample datasets to your S3 bucket

In [None]:
# Sample training datasets for this demo
for file_name in DATASETS:
   
    dataset = '{}/{}'.format(TRAINING_DATASET_URI, file_name)

    # Sends a GET request to the specified url
    response = requests.get(dataset, stream=True)

    # S3 folder prefix and key
    key = '{0}/train/{1}'.format(PREFIX, file_name)

    # Upload data to this session's default S3 bucket
    response = s3_client.put_object(Body = response.content,
                         Bucket = BUCKET,
                         Key = key,
                         ContentType = 'text/csv')
    print(response)

<a id=section_2_1></a>
#### 2.1 Create the model

Creates a model in Amazon SageMaker from a model package.

In [None]:
def create_model(model_name, model_package_name, execution_role):
    '''
    Creates a model in Amazon SageMaker from a model package

        Parameters:
            model_name (str): The name of the new model
            model_package_name (str): The name or ARN of the model package to use
            execution_role (str): The ARN of the IAM role that Amazon SageMaker can assume

        Returns:
            response (str): The ARN of the model created in Amazon SageMaker
    '''    
    
    # PrimaryContainer parameter
    model_container_params = { 
        "ModelPackageName": model_package_name
    }
    
    # Creates a model and returns the ModelArn
    response = sm_client.create_model(
        ModelName = model_name,
        PrimaryContainer = model_container_params,
        ExecutionRoleArn = execution_role,
        EnableNetworkIsolation = True)
    
    # Get the ARN
    response = response['ModelArn']    
    
    # Return
    return response

In [None]:
# Call function to create the model
model_arn = create_model(MODEL_NAME, MODEL_PACKAGE_ARN, role)
print('>> The ARN of the model is: {}'.format(model_arn))

<a id=section_2_2></a>
#### 2.2 Create the endpoint configuration with DataCapture

Creates an endpoint configuration that Amazon SageMaker hosting services uses to deploy models.

In [None]:
def create_model_endpoint_config(endpoint_config_name, model_name, s3_data_capture_uri, int_sampling_pct=100):
    '''
    Creates an endpoint configuration that Amazon SageMaker hosting services uses to deploy models.

        Parameters:
            endpoint_config_name (str): The name of the endpoint configuration
            model_name (str): The name of the model to deploy
            s3_data_capture_uri (str): S3 URI where captured data is stored
            int_sampling_pct (int): The percentage of requests to capture (0-100)

        Returns:
            response (str): The ARN of the endpoint configuration
    '''

    # Describes the resources that you want Amazon SageMaker to provision
    product_variant_params = {
        'VariantName': 'AllTraffic',
        'ModelName': model_name,
        'InitialInstanceCount': MODEL_INSTANCE_COUNT,
        'InstanceType': MODEL_INFERENCE_INSTANCE_TYPE
    }

    # Specifies the configuration of your endpoint for model monitor data capture.
    data_capture_params = {
        'EnableCapture': True,
        'InitialSamplingPercentage': int_sampling_pct,
        'DestinationS3Uri': s3_data_capture_uri,
        'CaptureOptions': [
            {
                'CaptureMode': 'Input'
            },
            {
                'CaptureMode': 'Output'
            }
        ],
        'CaptureContentTypeHeader': {
            'CsvContentTypes': ['text/csv']
        }
    }

    # Creates an endpoint configuration and returns the EndpointConfigArn
    response = sm_client.create_endpoint_config(
        EndpointConfigName = endpoint_config_name,
        ProductionVariants=[
            product_variant_params
        ],
        DataCaptureConfig=data_capture_params
    )
    
    # Get the ARN
    response = response['EndpointConfigArn']
    
    # Return
    return response

In [None]:
# Call function to create the endpoint config
endpoint_config_arn = create_model_endpoint_config(MODEL_ENDPOINT_CONFIG, MODEL_NAME, S3_DATA_CAPTURE_URI)
print('>> The ARN of the endpoint config is: {}'.format(endpoint_config_arn))
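
The data capture settings are the part of the endpoint configuration that the monitoring schedule depends on. As a minimal sketch, the same dictionary could be built and checked before `create_endpoint_config` is called. The helper name `build_data_capture_config` and its validation rules are assumptions added for illustration and are not part of the notebook.

```python
def build_data_capture_config(s3_uri, sampling_pct=100):
    """Build a DataCaptureConfig dict like the one above (hypothetical helper)."""
    # The sampling percentage is the share of requests to capture.
    if not 0 <= sampling_pct <= 100:
        raise ValueError('sampling_pct must be between 0 and 100')
    # Captured records are written to S3, so the destination must be an S3 URI.
    if not s3_uri.startswith('s3://'):
        raise ValueError('s3_uri must start with s3://')
    return {
        'EnableCapture': True,
        'InitialSamplingPercentage': sampling_pct,
        'DestinationS3Uri': s3_uri,
        # Capture both the request payload and the model response.
        'CaptureOptions': [{'CaptureMode': 'Input'}, {'CaptureMode': 'Output'}],
        'CaptureContentTypeHeader': {'CsvContentTypes': ['text/csv']},
    }


# Example with a placeholder bucket name
config = build_data_capture_config('s3://example-bucket/datacapture', sampling_pct=50)
print(config['InitialSamplingPercentage'], config['DestinationS3Uri'])
```

Capturing both input and output at 100% is convenient for a demo; a lower percentage may be preferable for endpoints with heavy traffic.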

<a id=section_2_3></a>
#### 2.3 Create the model endpoint

Creates an endpoint using the endpoint configuration.

In [None]:
def create_model_endpoint(endpoint_name, endpoint_config_name):
    '''
    Creates an endpoint using the endpoint configuration

        Parameters:
            endpoint_name (str): The name of the endpoint
            endpoint_config_name (str): The name of the endpoint configuration

        Returns:
            response (str): The ARN of the endpoint
    '''

    # Creates an endpoint and returns the EndpointArn
    response = sm_client.create_endpoint(
        EndpointName = endpoint_name,
        EndpointConfigName = endpoint_config_name
    )
    
    # Get the ARN
    response = response['EndpointArn']
    
    # Return
    return response    

In [None]:
# Call function to create the endpoint
endpoint_arn = create_model_endpoint(MODEL_ENDPOINT, MODEL_ENDPOINT_CONFIG)
print('>> The ARN of the endpoint is: {}'.format(endpoint_arn))

<a id=section_2_4></a>
#### 2.4 Periodically check if the model's endpoint has changed from 'Creating' to 'InService'

In [None]:
%%time

# Initialize
model_endpoint_status = None

# Get the model endpoint status descriptors
response = sm_client.describe_endpoint(
    EndpointName=MODEL_ENDPOINT
)

# Set the Endpoint Status value
model_endpoint_status = response['EndpointStatus']

# Check for status updates every 45 seconds
while model_endpoint_status == 'Creating':
    
    # Pause execution for 45 seconds
    time.sleep(45)
    
    # Get the model endpoint status descriptors
    response = sm_client.describe_endpoint(
        EndpointName=MODEL_ENDPOINT
    )

    # Set the Endpoint Status value
    model_endpoint_status = response['EndpointStatus']
    
    # Print the current status of model endpoint
    print('>> The current status of model endpoint "{0}" is {1}'.format(MODEL_ENDPOINT, model_endpoint_status))
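
The polling loop above runs for as long as the status stays `Creating`. One possible variation is a small reusable helper with a timeout, sketched below. The name `wait_while`, its parameters, and the 30-minute default are assumptions chosen for illustration. The demo uses a stand-in status function so it runs without an AWS account.

```python
import time


def wait_while(get_status, pending_status, interval_seconds=45,
               timeout_seconds=1800, sleep=time.sleep):
    """Poll get_status() until it stops returning pending_status (hypothetical helper)."""
    waited = 0
    status = get_status()
    while status == pending_status:
        if waited >= timeout_seconds:
            raise TimeoutError('Still {} after {} seconds'.format(pending_status, waited))
        sleep(interval_seconds)
        waited += interval_seconds
        status = get_status()
    return status


# Stand-in for: lambda: sm_client.describe_endpoint(EndpointName=MODEL_ENDPOINT)['EndpointStatus']
fake_statuses = iter(['Creating', 'Creating', 'InService'])
final_status = wait_while(lambda: next(fake_statuses), 'Creating', sleep=lambda seconds: None)
print('>> Final status: {}'.format(final_status))
```

The same helper could be reused for the baselining job in section 3.2 by passing `'InProgress'` as the pending status. boto3 also provides an `endpoint_in_service` waiter (`sm_client.get_waiter('endpoint_in_service')`) that covers the endpoint case.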

<a id=section_3_1></a>
#### 3.1 Create a baselining job

In [None]:
def create_baselining_job(job_name, wait_for_job_finish=False, show_logs=False):
    '''
    Asks SageMaker to suggest a set of baseline constraints and generate descriptive statistics
    (constraints.json and statistics.json) based on the baseline training dataset

        Parameters:
            job_name (str): Name of processing job
            wait_for_job_finish (bool): Whether the call should wait until the job completes
            show_logs (bool): Whether to show the logs produced by the job

        Returns:
            default_model_monitor (DefaultModelMonitor): The monitor object that ran the baselining job.
            job_name (str): job_name with appended timestamp.
    '''  

    # Initializes a Monitor instance
    default_model_monitor = DefaultModelMonitor(
        role=role,
        instance_count=MODEL_INSTANCE_COUNT,
        instance_type=MODEL_MONITOR_INSTANCE_TYPE
    )
    
    # Append timestamp to job name
    job_name = '{}-{}'.format(job_name, strftime("%Y-%m-%d-%H-%M-%S", gmtime()))

    # Suggest baselines for use with Amazon SageMaker Model Monitoring Schedules
    default_model_monitor.suggest_baseline(
        job_name = job_name,
        baseline_dataset=S3_BASELINE_DATASET_URI,
        dataset_format=DatasetFormat.csv(header=True),
        output_s3_uri=S3_BASELINE_ANALYSIS_RESULTS_URI,
        wait=wait_for_job_finish,
        logs=show_logs
    )
    
    # Return    
    return default_model_monitor, job_name

In [None]:
# Call function to create baseline job
default_model_monitor, job_name = create_baselining_job(MODEL_BASELINE_JOB)

<a id=section_3_2></a>
#### 3.2 Periodically check if the baseline processing job has changed from 'InProgress' to 'Completed'

In [None]:
%%time

# Initialize
processing_job_status = None

# Get the Processing Job Status descriptors
response = sm_client.describe_processing_job(
    ProcessingJobName=job_name
)

# Set the Processing Job Status value
processing_job_status = response['ProcessingJobStatus']

# Check for status updates every 45 seconds
while processing_job_status == 'InProgress':
    
    # Pause execution for 45 seconds
    time.sleep(45)
    
    # Get the Processing Job Status descriptors
    response = sm_client.describe_processing_job(
        ProcessingJobName=job_name
    )

    # Set the Processing Job Status value
    processing_job_status = response['ProcessingJobStatus']
    
    # Print the current status
    print('>> The current status of processing_job "{0}" is {1}'.format(job_name, processing_job_status))

<a id=section_4_1></a>
#### 4.1 Create a monitoring schedule

Note: Even for an hourly schedule, Amazon SageMaker has a buffer period of 20 minutes to schedule your execution. You might see your execution start anywhere within the first ~20 minutes after the hour boundary (i.e. 00:00 – 00:20). This is expected and is done for load balancing on the backend.
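
To make the buffer period concrete, the sketch below computes the window in which the next hourly execution could start for a given time. The function name `hourly_start_window` is hypothetical, and the 20-minute default simply restates the note above.

```python
from datetime import datetime, timedelta


def hourly_start_window(now, buffer_minutes=20):
    """Return (earliest, latest) start times of the next hourly execution."""
    # The next hour boundary strictly after `now`
    next_hour = now.replace(minute=0, second=0, microsecond=0) + timedelta(hours=1)
    return next_hour, next_hour + timedelta(minutes=buffer_minutes)


# Example: the schedule is created at 09:42
earliest, latest = hourly_start_window(datetime(2021, 1, 1, 9, 42))
print('Next execution may start between {} and {}'.format(earliest, latest))
```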

In [None]:
def create_monitoring_schedule(default_model_monitor, monitor_schedule_name):
    '''
        Creates a schedule that regularly starts Amazon SageMaker Processing Jobs
        to monitor the data captured for an Amazon SageMaker Endpoint.

        Parameters:
            default_model_monitor (DefaultModelMonitor): The monitor object that ran the baselining job.
            monitor_schedule_name (str): Schedule name
    '''  
    
    default_model_monitor.create_monitoring_schedule(
        monitor_schedule_name = monitor_schedule_name,
        endpoint_input = MODEL_ENDPOINT,
        output_s3_uri = S3_DATA_QUALITY_RPT_URI,
        statistics = default_model_monitor.baseline_statistics(),
        constraints = default_model_monitor.suggested_constraints(),
        schedule_cron_expression = CronExpressionGenerator.hourly()
    )

In [None]:
# Call function to create monitoring schedule
create_monitoring_schedule(default_model_monitor, MODEL_MONITOR_SCHEDULE_NAME)

# Allow time for processing
time.sleep(30)

# Print the current status
monitor_schedule_status = default_model_monitor.describe_schedule()['MonitoringScheduleStatus']
print('>> The current status of monitoring schedule "{0}" is {1}'.format(MODEL_MONITOR_SCHEDULE_NAME, monitor_schedule_status))

<a id=section_5_1></a>
#### 5.1 Initialize a Predictor

Make prediction requests to the model's endpoint.

In [None]:
# Create predictor endpoint
predictor = Predictor(endpoint_name=MODEL_ENDPOINT, 
                      sagemaker_session=None, 
                      serializer=CSVSerializer())   

In [None]:
def predict(sample, delay=0.5):
   
    # Defensive coding
    if(len(sample) > 0):    

        # Invoke the model's inference endpoint
        response = predictor.predict(data=sample)

        # Decode bytes to string
        response = response.decode('utf-8')
        
        # Suspends execution for `delay` seconds
        time.sleep(delay)        

        # Return 
        return response
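
Because the predictor is created with `CSVSerializer`, each sample passed to `predict` as a Python list is sent to the endpoint as a single comma-separated line. The sketch below approximates that payload with the standard `csv` module. It is an illustration under that assumption, not the SDK's own implementation, and the helper name `to_csv_row` and the sample values are hypothetical.

```python
import csv
import io


def to_csv_row(sample):
    """Render one sample (a flat list of values) as a CSV line without a trailing newline."""
    buffer = io.StringIO()
    csv.writer(buffer).writerow(sample)
    return buffer.getvalue().rstrip('\r\n')


# Example with made-up feature values
print(to_csv_row([35, 'M', 72000.5]))
```

Printing the payload this way can help when checking that the column order of a sample matches the baseline dataset.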

<a id=section_5_2></a>
#### 5.2 Create data quality constraint violations

In [None]:
# Set datadrift dataset key name
key = '{0}/train/{1}'.format(PREFIX, DATASETS[1])

# Download from S3
datadrift_file_obj = s3_client.get_object(Bucket=BUCKET, Key=key)

# Save to in-memory binary stream since file is relatively small (< 1 MB)
datadrift_file_buf = io.BytesIO(datadrift_file_obj['Body'].read())

# Convert to Dataframe
df = pd.read_csv(datadrift_file_buf, header=None)

# Convert dataframe samples to list
samples = df.values.tolist()

# Invoke real-time inference endpoint 
for index, sample in enumerate(samples):

    # Removes the open/close bracket from string -- not required
    #sample = str(sample)[0:-1] 
    
    # Get inference response
    response = predict(sample)
    
    # Display the model's prediction probability
    print('Sample {0} >> Input: {1}: >> Prediction: {2}'.format(index, sample, response))
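
Each request sent above is recorded by data capture as one JSON line in files under `S3_DATA_CAPTURE_URI`, usually after a short delay. The sketch below parses one such record. The sample line is illustrative only: the feature values, prediction, and event ID are made up, and the output content type of this particular model is an assumption.

```python
import json

# Illustrative capture record; all values are placeholders
sample_line = json.dumps({
    'captureData': {
        'endpointInput': {'observedContentType': 'text/csv', 'mode': 'INPUT',
                          'data': '35,1,72000.5', 'encoding': 'CSV'},
        'endpointOutput': {'observedContentType': 'text/csv', 'mode': 'OUTPUT',
                           'data': '0.87', 'encoding': 'CSV'},
    },
    'eventMetadata': {'eventId': 'example-event-id',
                      'inferenceTime': '2021-01-01T10:05:00Z'},
    'eventVersion': '0',
})


def parse_capture_line(line):
    """Extract the request features and the response from one capture record."""
    record = json.loads(line)
    capture = record['captureData']
    return {
        'event_id': record['eventMetadata']['eventId'],
        'inference_time': record['eventMetadata']['inferenceTime'],
        'input': capture['endpointInput']['data'].split(','),
        'output': capture['endpointOutput']['data'],
    }


parsed = parse_capture_line(sample_line)
print(parsed['event_id'], parsed['input'], parsed['output'])
```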

#### All set

Now that the monitoring schedule has been created and we've sent some sample anomalous data to trigger data drift detection, return to Amazon SageMaker Studio to check out the Monitoring Job Details.
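
When a monitoring execution detects problems, it writes a `constraint_violations.json` report under the schedule's output location. As a rough sketch, such a report could be summarized by check type as shown below. The sample report is illustrative: the feature names and descriptions are placeholders, not results from this model.

```python
import json
from collections import Counter

# Illustrative report; feature names and descriptions are placeholders
sample_report = json.dumps({
    'violations': [
        {'feature_name': 'feature_a', 'constraint_check_type': 'data_type_check',
         'description': 'illustrative description'},
        {'feature_name': 'feature_b', 'constraint_check_type': 'baseline_drift_check',
         'description': 'illustrative description'},
        {'feature_name': 'feature_c', 'constraint_check_type': 'data_type_check',
         'description': 'illustrative description'},
    ]
})


def summarize_violations(report_json):
    """Count violations per check type and list the affected features."""
    violations = json.loads(report_json).get('violations', [])
    by_type = Counter(v['constraint_check_type'] for v in violations)
    features = sorted({v['feature_name'] for v in violations})
    return by_type, features


by_type, features = summarize_violations(sample_report)
for check_type, count in by_type.items():
    print('{}: {}'.format(check_type, count))
print('Affected features: {}'.format(', '.join(features)))
```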

#### Cleanup Resources

In [None]:
'''
# Step 1.
print('Stopping monitoring schedule...')
!aws sagemaker stop-monitoring-schedule --monitoring-schedule-name 'third-party-model-data-quality-schedule-2'
time.sleep(30) # allow time for processing
!aws sagemaker list-monitoring-schedules --endpoint-name 'third-party-model-endpoint-2'

# Step 2.
print('Deleting monitoring schedule...')
!aws sagemaker delete-monitoring-schedule --monitoring-schedule-name 'third-party-model-data-quality-schedule-2'
time.sleep(30) # allow time for processing
!aws sagemaker list-monitoring-schedules --endpoint-name 'third-party-model-endpoint-2'

# Step 3.
print('Deleting model endpoint...')
!aws sagemaker delete-endpoint --endpoint-name 'third-party-model-endpoint-2'
time.sleep(30) # allow time for processing
!aws sagemaker list-endpoints --name-contains 'third-party-model-endpoint-2'

# Step 4.
print('Deleting model endpoint config...')
!aws sagemaker delete-endpoint-config --endpoint-config-name 'third-party-model-endpoint-config-2'
time.sleep(30) # allow time for processing
!aws sagemaker list-endpoint-configs --name-contains 'third-party-model-endpoint-config-2'

# Step 5.
print('Deleting model...')
!aws sagemaker delete-model --model-name 'third-party-model-2'
time.sleep(30) # allow time for processing
!aws sagemaker list-models --name-contains 'third-party-model-2'
'''
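
The CLI steps above could also be sketched with boto3, keeping the same order: the monitoring schedule is stopped and deleted before the endpoint, then the endpoint configuration and the model. The function name `cleanup_resources` and the `RecordingClient` stand-in are assumptions for illustration. The stand-in only records calls so the example runs without touching AWS. With a real client you would pass `sm_client` and the constants from section 1.2.

```python
import time


def cleanup_resources(sm_client, schedule_name, endpoint_name,
                      endpoint_config_name, model_name,
                      pause_seconds=30, sleep=time.sleep):
    """Delete the demo resources in dependency order (hypothetical helper)."""
    sm_client.stop_monitoring_schedule(MonitoringScheduleName=schedule_name)
    sleep(pause_seconds)  # allow time for processing
    sm_client.delete_monitoring_schedule(MonitoringScheduleName=schedule_name)
    sleep(pause_seconds)
    sm_client.delete_endpoint(EndpointName=endpoint_name)
    sleep(pause_seconds)
    sm_client.delete_endpoint_config(EndpointConfigName=endpoint_config_name)
    sm_client.delete_model(ModelName=model_name)


class RecordingClient:
    """Stand-in client that records calls instead of contacting AWS."""

    def __init__(self):
        self.calls = []

    def __getattr__(self, name):
        def record(**kwargs):
            self.calls.append((name, kwargs))
            return {}
        return record


client = RecordingClient()
cleanup_resources(client,
                  schedule_name='third-party-model-data-quality-schedule-2',
                  endpoint_name='third-party-model-endpoint-2',
                  endpoint_config_name='third-party-model-endpoint-config-2',
                  model_name='third-party-model-2',
                  sleep=lambda seconds: None)
for name, kwargs in client.calls:
    print(name, kwargs)
```

With a real client, deleting the schedule may fail while an execution is still running, so a longer pause or a retry may be needed.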