# Amazon SageMaker Model Quality Monitor


#### Host a trained machine learning model in Amazon SageMaker. Monitor and detect machine learning model quality drift


This notebook shows how to:
* Host a machine learning model in Amazon SageMaker and capture inference requests, results, and metadata
* Generate a baseline of model quality and suggested constraints
* Monitor a live endpoint for violations against the suggested constraints
* Generate CloudWatch Alarms on model quality drift


**Table of Contents**

1. [Introduction](#intro)
2. [Section 1 - Setup](#setup)
3. [Section 2 - Deploy pre-trained model with data capture enabled](#deploy)
4. [Section 3 - Generate baseline for model quality performance](#generate-baseline)
5. [Section 4 - Set up continuous model monitoring to identify model quality drift](#analyze-model-quality-drift)
6. [Section 5 - Analyze model quality CloudWatch metrics](#analyze-cloudwatch-metrics)
7. [Clean up](#cleanup)



## Introduction <a id='intro'></a>    

Amazon SageMaker provides every developer and data scientist with the ability to build, train, and deploy machine learning (ML) models quickly by bringing together a broad set of capabilities purpose-built for ML. Amazon SageMaker is a fully managed service that encompasses the entire ML workflow. You can label and prepare your data, choose an algorithm, train a model, and then tune and optimize it for deployment. You can deploy your models to production with Amazon SageMaker to make predictions at lower cost than was previously possible.

Amazon SageMaker Model Monitor allows you to maintain high quality ML models by automatically detecting and helping you remediate inaccuracies in model predictions. Model Monitor helps
you detect changes in properties of independent variables to help maintain data quality, and
monitors model performance characteristics such as accuracy and precision in real-time to
help maintain model quality.

In this notebook, you learn how to use the Amazon SageMaker model quality monitoring capability to monitor the performance characteristics of your in-production ML models. You also learn how to configure Amazon CloudWatch alarms to get notified if model quality degrades relative to the configured baseline.

## Section 1 - Setup <a id='setup'></a>

In this section, you will import the necessary libraries, set up variables, and examine the data that was used to train the XGBoost customer churn model provided with this notebook.

Let's start by specifying:

* The AWS region used to host your model.
* The IAM role associated with this SageMaker notebook instance.
* The S3 bucket used to store the data used to train your model, any additional model data, and the data captured from model invocations.

#### 1.1 Import necessary libraries

In [None]:
%%time

from datetime import datetime, timedelta, timezone
import json
import os
import re
import boto3
from time import sleep
from threading import Thread

import pandas as pd

from sagemaker import get_execution_role, Session, image_uris
from sagemaker.s3 import S3Downloader, S3Uploader
from sagemaker.processing import ProcessingJob
from sagemaker.serializers import CSVSerializer

from sagemaker.model import Model
from sagemaker.model_monitor import DataCaptureConfig

session = Session()

sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /root/.config/sagemaker/config.yaml
CPU times: user 21.8 ms, sys: 9.18 ms, total: 31 ms
Wall time: 56 ms


#### 1.2 AWS region and IAM role

In [None]:
# Get Execution role
role = get_execution_role()
print("RoleArn:", role)

region = session.boto_region_name
print("Region:", region)

sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /root/.config/sagemaker/config.yaml
RoleArn: arn:aws:iam::752847213914:role/service-role/AmazonSageMakerServiceCatalogProductsExecutionRole
Region: us-east-1


#### 1.3 S3 bucket and prefixes

In [None]:
# Setup S3 bucket
# You can use a different bucket, but make sure the role you chose for this notebook
# has the s3:PutObject permissions. This is the bucket into which the data is captured
bucket = session.default_bucket()
print("Demo Bucket:", bucket)
prefix = 'mlops-data'

##S3 prefixes
data_capture_prefix = f"{prefix}/datacapture"
s3_capture_upload_path = f"s3://{bucket}/{data_capture_prefix}"

ground_truth_upload_path = (
    f"s3://{bucket}/{prefix}/ground_truth_data/{datetime.now():%Y-%m-%d-%H-%M-%S}"
)

reports_prefix = f"{prefix}/reports"
s3_report_path = f"s3://{bucket}/{reports_prefix}"

##Get the model monitor image
monitor_image_uri = image_uris.retrieve(framework="model-monitor", region=region)

print("Image URI:", monitor_image_uri)
print(f"Capture path: {s3_capture_upload_path}")
print(f"Ground truth path: {ground_truth_upload_path}")
print(f"Report path: {s3_report_path}")

Demo Bucket: sagemaker-us-east-1-752847213914
Image URI: 156813124566.dkr.ecr.us-east-1.amazonaws.com/sagemaker-model-monitor-analyzer
Capture path: s3://sagemaker-us-east-1-752847213914/mlops-data/datacapture
Ground truth path: s3://sagemaker-us-east-1-752847213914/mlops-data/ground_truth_data/2023-11-06-14-16-55
Report path: s3://sagemaker-us-east-1-752847213914/mlops-data/reports


1. `ground_truth_upload_path` is the S3 prefix where ground truth labels are uploaded. The monitoring jobs merge these labels with the captured predictions to compute model quality metrics.

2. `s3_report_path` is the S3 prefix where model monitoring reports are written.

3. `monitor_image_uri = image_uris.retrieve(framework="model-monitor", region=region)` retrieves the URI of the Model Monitor analyzer container image for the current region. Model Monitor runs this container in processing jobs, either once (baselining) or on a schedule, to check a deployed model for issues such as data quality or model quality drift.

## Section 2 - Deploy pre-trained model with data capture enabled <a id='deploy'></a>

In this section, you will reference a pretrained model artifact stored in the S3 bucket, create an Amazon SageMaker model, create an Amazon SageMaker real-time endpoint, and enable data capture on the endpoint to capture endpoint invocations, predictions, and metadata.

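#### 2.1 Use Model Registry

Specify the S3 location of the trained model artifact (`model.tar.gz`) produced by an earlier XGBoost training job.
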
In [None]:
model_url = f's3://{bucket}/{prefix}/xgb-built-in-algo/output/sagemaker-xgboost-2023-11-06-16-00-35-408/output/model.tar.gz'


#### 2.2 Create SageMaker Model entity

This step creates an Amazon SageMaker model from the model artifact stored in S3.

In [None]:
#model_name = f"DEMO-xgb-churn-pred-model-monitor-{datetime.utcnow():%Y-%m-%d-%H%M}"

image_uri = image_uris.retrieve(framework="xgboost", version="1.7-1", region=region)

model = Model(image_uri=image_uri, model_data=model_url, role=role, sagemaker_session=session)

INFO:sagemaker.image_uris:Ignoring unnecessary instance type: None.


#### 2.3 Deploy the model with data capture enabled.
Next, deploy the SageMaker model on a specific instance with data capture enabled.

In [None]:
endpoint_name = f"DEMO-mlops-xgboost-quality-monitor-{datetime.utcnow():%Y-%m-%d-%H%M}"
print("EndpointName =", endpoint_name)

data_capture_config = DataCaptureConfig(
    enable_capture=True, sampling_percentage=100, destination_s3_uri=s3_capture_upload_path
)

model.deploy(
    initial_instance_count=1,
    instance_type="ml.m5.large",
    endpoint_name=endpoint_name,
    data_capture_config=data_capture_config,
)

EndpointName = DEMO-mlops-xgboost-quality-monitor-2023-11-06-1605
sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /root/.config/sagemaker/config.yaml


INFO:sagemaker:Creating model with name: sagemaker-xgboost-2023-11-06-16-05-16-281
INFO:sagemaker:Creating endpoint-config with name DEMO-mlops-xgboost-quality-monitor-2023-11-06-1605
INFO:sagemaker:Creating endpoint with name DEMO-mlops-xgboost-quality-monitor-2023-11-06-1605


----!

#### 2.4 Create the SageMaker Predictor object from the endpoint to be used for invoking the model

In [None]:
endpoint_name = 'Xgboost-Inference-endpoint-2023-11-07-1116'

This overrides `endpoint_name` with the name of the already-deployed endpoint that the rest of this notebook monitors.

In [None]:
from sagemaker.predictor import Predictor

predictor = Predictor(
    endpoint_name=endpoint_name, sagemaker_session=session, serializer=CSVSerializer()
)

The code above imports the `Predictor` class from the SageMaker Python SDK and creates a `Predictor` object, which sends inference requests to a deployed endpoint. Its parameters are:

* `endpoint_name`: the name of the deployed endpoint that receives the requests (set in the previous cell).
* `sagemaker_session`: the `sagemaker.Session` instance that manages interactions with AWS services.
* `serializer=CSVSerializer()`: how input data is serialized before it is sent to the endpoint. `CSVSerializer` converts the input to comma-separated values with the `text/csv` content type, which this endpoint expects.
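To make the serializer's role concrete, the helper below shows what CSV serialization amounts to for a single flat list of numeric features: the values are joined with commas into one `text/csv` request body. This is a minimal sketch under that assumption; `to_csv_payload` is a hypothetical helper, not the SDK's `CSVSerializer`, which also accepts other input types.

```python
def to_csv_payload(features):
    """Serialize a flat sequence of feature values into one CSV line (no trailing newline)."""
    return ",".join(str(value) for value in features)


# A single row of three features becomes one comma-separated request body.
payload = to_csv_payload([0.5, 1, -2.25])
print(payload)  # 0.5,1,-2.25
```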

## Section 3 - Generate a baseline for model quality performance <a id='generate-baseline'></a>

In this section, you will invoke the endpoint created above using validation data. The deployed model's predictions on this validation data form the baseline dataset. You will then use SageMaker Model Monitor to run a baselining job that computes model performance metrics and suggests model quality constraints based on the baseline dataset.

#### 3.1 Execute predictions using the validation dataset.

The deployed model returns probability that a customer will churn. Let's choose an arbitrary 0.8 cutoff to consider that a customer will churn.

In [None]:
churn_cutoff = 0.8
validate_dataset = "validation_with_predictions.csv"


In [None]:
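validation_data = "./data/validation.csv"
os.makedirs("data", exist_ok=True)  # local folder for the validation and prediction files
# The file is assumed to have no header row (label in the first column), so read and write it without one
df = pd.read_csv(f's3://{bucket}/{prefix}/validation/validation.csv', header=None)


In [None]:
df.to_csv(validation_data, index=False, header=False)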


1. The churn cutoff is set to 0.8.

2. The validation dataset is read from the S3 bucket and saved locally as `data/validation.csv`, which is then used to invoke the endpoint row by row.

In [None]:
limit = 1000 # Need at least 200 samples to compute standard deviations
i = 0
with open(f"data/{validate_dataset}", "w") as baseline_file:
    baseline_file.write("probability,prediction,label\n")  # our header
    with open(validation_data, "r") as f:
        for row in f:
            (label, input_cols) = row.split(",", 1)
            probability = float(predictor.predict(input_cols))
            prediction = int(probability)
            baseline_file.write(f"{probability},{prediction},{label}\n")
            i += 1
            if i >= limit:
                break
            print(".", end="", flush=True)
            sleep(0.5)
print("Done!")


........................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................

The code above sends the first `limit` rows (about 1,000) of the validation dataset to the endpoint, one at a time. For each row, it writes the model output (`probability`), the derived `prediction`, and the actual `label` to `data/validation_with_predictions.csv`, as shown below.
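The row-by-row logic of that loop can be sketched without an endpoint. In the sketch below, `build_baseline_rows`, `rows_to_csv`, and `fake_predict` are hypothetical names; `fake_predict` stands in for `predictor.predict`, and each input line is assumed to be `label,feature1,feature2,...` as in `validation.csv`.

```python
import csv
import io


def build_baseline_rows(lines, predict, limit=1000):
    """Turn 'label,feature1,feature2,...' lines into (probability, prediction, label) rows.

    `predict` is any callable that takes the CSV feature string and returns the
    model output; it stands in for `predictor.predict`.
    """
    rows = []
    for line in lines:
        if len(rows) >= limit:
            break
        label, input_cols = line.rstrip("\n").split(",", 1)
        probability = float(predict(input_cols))
        prediction = int(probability)  # same truncation as the notebook cell
        rows.append((probability, prediction, label))
    return rows


def rows_to_csv(rows):
    """Render rows with the header used for the baseline dataset."""
    buffer = io.StringIO()
    writer = csv.writer(buffer, lineterminator="\n")
    writer.writerow(["probability", "prediction", "label"])
    writer.writerows(rows)
    return buffer.getvalue()


def fake_predict(features):
    """Hypothetical model: returns 1.0 when the first feature is positive, else 0.0."""
    return 1.0 if float(features.split(",")[0]) > 0 else 0.0


sample = ["1,0.7,0.2\n", "0,-0.3,0.9\n", "4,-1.2,0.1\n"]
print(rows_to_csv(build_baseline_rows(sample, fake_predict)), end="")
```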

#### 3.2 Examine the predictions from the model


In [None]:
!head data/validation_with_predictions.csv


probability,prediction,label
0.0,0,0
0.0,0,4
0.0,0,4
1.0,1,2
0.0,0,4
1.0,1,1
0.0,0,4
1.0,1,1
0.0,0,3


#### 3.3 Upload the predictions as a baseline dataset.
Now upload the predictions made on the validation dataset to S3. They are used to create the model quality baseline statistics and constraints.

In [None]:
baseline_prefix = prefix + "/baselining"
baseline_data_prefix = baseline_prefix + "/data"
baseline_results_prefix = baseline_prefix + "/results"

baseline_data_uri = f"s3://{bucket}/{baseline_data_prefix}"
baseline_results_uri = f"s3://{bucket}/{baseline_results_prefix}"
print(f"Baseline data uri: {baseline_data_uri}")
print(f"Baseline results uri: {baseline_results_uri}")


Baseline data uri: s3://sagemaker-us-east-1-752847213914/mlops-data/baselining/data
Baseline results uri: s3://sagemaker-us-east-1-752847213914/mlops-data/baselining/results


In [None]:
baseline_dataset_uri = S3Uploader.upload(f"data/{validate_dataset}", baseline_data_uri)
baseline_dataset_uri


sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /root/.config/sagemaker/config.yaml


's3://sagemaker-us-east-1-752847213914/mlops-data/baselining/data/validation_with_predictions.csv'

1. The generated `validation_with_predictions.csv` file is uploaded to the `baselining/data` prefix.

2. The model's performance on this file serves as the baseline. If the model's performance on live traffic falls below the baseline (according to the suggested constraints on the defined metrics), the monitoring job reports a violation, which can be used to raise an alert.

#### 3.4 Create a baselining job with validation dataset predictions
Define the model quality monitoring object and execute the model quality monitoring baseline job. Model monitor will automatically generate baseline statistics and constraints based on the validation dataset provided.

In [None]:
from sagemaker.model_monitor import ModelQualityMonitor
from sagemaker.model_monitor import EndpointInput
from sagemaker.model_monitor.dataset_format import DatasetFormat


In [None]:
# Create the model quality monitoring object
mlops_model_quality_monitor = ModelQualityMonitor(
    role=role,
    instance_count=1,
    instance_type="ml.m5.large",
    volume_size_in_gb=20,
    max_runtime_in_seconds=1800,
    sagemaker_session=session,
)


INFO:sagemaker.image_uris:Defaulting to the only supported framework/algorithm version: .
INFO:sagemaker.image_uris:Ignoring unnecessary instance type: None.


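This creates a `ModelQualityMonitor` object that runs its processing jobs on one `ml.m5.large` instance with a 20 GB volume and a maximum runtime of 1,800 seconds (30 minutes).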

In [None]:
# Name of the model quality baseline job
baseline_job_name = f"DEMO-mlops-xgboost-baseline-job-{datetime.utcnow():%Y-%m-%d-%H%M}"


This defines a unique, timestamped name for the baselining job.

In [None]:
# Execute the baseline suggestion job.
# You will specify the problem type, in this case multiclass classification, and provide other required attributes.
job = mlops_model_quality_monitor.suggest_baseline(
    job_name=baseline_job_name,
    baseline_dataset=baseline_dataset_uri,
    dataset_format=DatasetFormat.csv(header=True),
    output_s3_uri=baseline_results_uri,
    problem_type="MulticlassClassification",
    inference_attribute="prediction",
    probability_attribute="probability",
    ground_truth_attribute="label",
)

job.wait(logs=False)


INFO:sagemaker:Creating processing-job with name DEMO-mlops-xgboost-baseline-job-2023-11-07-1754


.......................................................................................!

1. The block above starts a SageMaker processing job for baselining.

2. The job compares the `prediction` and `label` columns of the baseline dataset (the inference results on the validation data, or any other dataset you provide) to compute evaluation metrics such as accuracy, precision, recall, and F-scores, and uses them to suggest baseline constraints.
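The sketch below shows how such metrics can be computed from the `prediction` and `label` columns. It assumes sklearn-style support weighting (each class weighted by its share of the true labels); Model Monitor's exact formulas are not shown in this notebook, so treat this as an illustration rather than a reimplementation of the baselining job. `multiclass_metrics` and `fbeta` are hypothetical helpers.

```python
from collections import Counter


def fbeta(precision, recall, beta):
    """F-beta score for one class; defined as 0 when precision and recall are both 0."""
    denom = beta * beta * precision + recall
    return 0.0 if denom == 0 else (1 + beta * beta) * precision * recall / denom


def multiclass_metrics(y_true, y_pred):
    """Accuracy plus support-weighted precision, recall and F-beta scores."""
    n = len(y_true)
    support = Counter(y_true)     # number of true labels per class
    predicted = Counter(y_pred)   # number of predictions per class
    tp = Counter(t for t, p in zip(y_true, y_pred) if t == p)
    metrics = {"accuracy": sum(tp.values()) / n}
    totals = dict.fromkeys(
        ["weighted_precision", "weighted_recall", "weighted_f0_5", "weighted_f1", "weighted_f2"], 0.0
    )
    for cls in set(y_true) | set(y_pred):
        weight = support[cls] / n
        precision = tp[cls] / predicted[cls] if predicted[cls] else 0.0
        recall = tp[cls] / support[cls] if support[cls] else 0.0
        totals["weighted_precision"] += weight * precision
        totals["weighted_recall"] += weight * recall
        totals["weighted_f0_5"] += weight * fbeta(precision, recall, 0.5)
        totals["weighted_f1"] += weight * fbeta(precision, recall, 1.0)
        totals["weighted_f2"] += weight * fbeta(precision, recall, 2.0)
    metrics.update(totals)
    return metrics


m = multiclass_metrics([0, 0, 1, 1, 2], [0, 1, 1, 1, 0])
print({name: round(value, 4) for name, value in m.items()})
```

With support weighting, weighted recall always equals accuracy, because each class's recall is multiplied by that class's share of the true labels.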


#### 3.5 Explore the results of the baselining job
Load the latest baselining job to inspect the baseline statistics and constraints files it uploaded to S3.

In [None]:
baseline_job = mlops_model_quality_monitor.latest_baselining_job


##### 3.5.1 View the metrics generated
The statistics file contains the multiclass classification metrics computed on the baseline dataset, including the confusion matrix.

In [None]:
multiclass_metrics = baseline_job.baseline_statistics().body_dict["multiclass_classification_metrics"]
pd.json_normalize(multiclass_metrics).T

| Metric | Value |
|---|---|
| confusion_matrix.4.4 | 0.0 |
| confusion_matrix.4.5 | 24.0 |
| confusion_matrix.4.1 | 0.0 |
| confusion_matrix.4.0 | 148.0 |
| confusion_matrix.4.2 | 0.0 |
| confusion_matrix.4.3 | 0.0 |
| confusion_matrix.5.4 | 0.0 |
| confusion_matrix.5.5 | 1.0 |
| confusion_matrix.5.1 | 5.0 |
| confusion_matrix.5.0 | 156.0 |


After the baselining job completes, its results are stored in the S3 location given by `baseline_results_uri`.

##### 3.5.2 View the constraints generated

In [None]:
pd.DataFrame(baseline_job.suggested_constraints().body_dict["multiclass_classification_constraints"]).T


| Metric | threshold | comparison_operator |
|---|---|---|
| accuracy | 0.12987 | LessThanThreshold |
| weighted_recall | 0.12987 | LessThanThreshold |
| weighted_precision | 0.071049 | LessThanThreshold |
| weighted_f0_5 | 0.07492 | LessThanThreshold |
| weighted_f1 | 0.087333 | LessThanThreshold |
| weighted_f2 | 0.107676 | LessThanThreshold |


In the example above, the model quality monitor suggested, for instance, that the weighted F2 score should not drop below about 0.108. The suggested thresholds track the baseline's own performance, which is weak here (the accuracy threshold is only about 0.13), so these constraints would flag only severe degradation. In other cases, generated constraints _may_ be too aggressive. It is recommended to review and modify the constraints file as necessary before using it for monitoring.
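Conceptually, each suggested constraint pairs a metric with a threshold and a comparison operator, and a violation is reported when the live metric satisfies that comparison. The sketch below checks a dictionary of metrics against constraints shaped like the `multiclass_classification_constraints` shown above; `find_violations` is a hypothetical helper, and only the `LessThanThreshold` operator seen in this notebook is handled.

```python
import operator

# Only the operator that appears in the suggested constraints above is mapped;
# add entries here if your constraints file uses other comparison operators.
OPERATORS = {"LessThanThreshold": operator.lt}


def find_violations(metrics, constraints):
    """Return names of metrics whose value meets the constraint's violation condition."""
    violations = []
    for name, rule in constraints.items():
        if name not in metrics:
            continue  # metric not computed in this run
        compare = OPERATORS[rule["comparison_operator"]]
        if compare(metrics[name], rule["threshold"]):
            violations.append(name)
    return violations


suggested = {
    "accuracy": {"threshold": 0.12987, "comparison_operator": "LessThanThreshold"},
    "weighted_f2": {"threshold": 0.107676, "comparison_operator": "LessThanThreshold"},
}
print(find_violations({"accuracy": 0.10, "weighted_f2": 0.2}, suggested))  # ['accuracy']
```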

## Section 4 - Set up continuous model monitoring to identify model quality drift <a id='analyze-model-quality-drift'></a>

In this section, you will set up a continuous model monitoring job that monitors the quality of the deployed model against the baseline generated in the previous section, to ensure that the quality does not degrade over time.

In addition to the generated baseline, Amazon SageMaker Model Quality Monitoring needs two additional inputs: predictions made by the deployed model endpoint, and ground truth data provided by the application that consumes the model. Since you already enabled data capture on the endpoint, prediction data is captured in S3. The ground truth data depends on what your model is predicting and what the business use case is. In this case, since the model is predicting customer churn, ground truth data may indicate whether the customer actually left the company. For the purposes of this notebook, you will generate synthetic data as ground truth.

You will create a continuous monitoring job that checks the model's performance on live traffic.

#### 4.1 Generate prediction data for Model Quality Monitoring

Start generating some artificial traffic.  The cell below starts a thread to send some traffic to the endpoint. Note that you need to stop the kernel to terminate this thread. If there is no traffic, the monitoring jobs are marked as `Failed` since there is no data to process.

In [None]:
endpoint_name

'Xgboost-Inference-endpoint-2023-11-07-1116'

In [None]:
def invoke_endpoint(ep_name, file_name):
    with open(file_name, "r") as f:
        i = 0
        for row in f:
            payload = row.rstrip("\n")
            response = session.sagemaker_runtime_client.invoke_endpoint(
                EndpointName=ep_name,
                ContentType="text/csv",
                Body=payload,
                InferenceId=str(i),  # unique ID per row
            )["Body"].read()
            i += 1
            sleep(1)


def invoke_endpoint_forever():
    while True:
        try:
            invoke_endpoint(endpoint_name, "data/test_x.csv")
        except session.sagemaker_runtime_client.exceptions.ValidationError:
            pass


thread = Thread(target=invoke_endpoint_forever)
thread.start()

Notice the `InferenceId` parameter set on each `invoke_endpoint` call. It is recorded as `inferenceId` in the captured data and is used to join the prediction data with the ground truth data.
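As noted above, the traffic thread only stops when the kernel is stopped. One alternative, sketched here under the assumption that you wrap the invocation in a callable, is to pass a `threading.Event` that the loop checks between requests. `send_traffic` and `fake_invoke` are hypothetical names; `fake_invoke` stands in for the `invoke_endpoint` call so the sketch runs without AWS. Like the notebook, it restarts inference IDs at 0 on each pass over the rows.

```python
import threading


def send_traffic(rows, invoke, stop_event, delay=1.0):
    """Call invoke(payload, inference_id) for each row, looping until stop_event is set."""
    while not stop_event.is_set():
        for inference_id, row in enumerate(rows):
            if stop_event.is_set():
                return
            invoke(row.rstrip("\n"), str(inference_id))
            stop_event.wait(delay)  # sleep between requests, but wake early when stopped


sent = []
stop = threading.Event()


def fake_invoke(payload, inference_id):
    """Stand-in for invoke_endpoint: records the request and stops after three calls."""
    sent.append((inference_id, payload))
    if len(sent) >= 3:
        stop.set()  # in a notebook you would call stop.set() from another cell


worker = threading.Thread(
    target=send_traffic, args=(["1,2\n", "3,4\n"], fake_invoke, stop, 0.01), daemon=True
)
worker.start()
worker.join(timeout=5)
print(worker.is_alive(), sent)
```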

#### 4.2 View captured data

Now list the data capture files stored in Amazon S3. You should expect to see different files from different time periods organized based on the hour in which the invocation occurred. The format of the Amazon S3 path is:

`s3://{destination-bucket-prefix}/{endpoint-name}/{variant-name}/yyyy/mm/dd/hh/filename.jsonl`
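Because the destination prefix before the endpoint name can itself contain slashes, it is easiest to parse a capture URI from the right. The sketch below is a hypothetical helper based only on the path format shown above; the bucket name `my-bucket` and file name `example.jsonl` are placeholders, not real capture names.

```python
from datetime import datetime


def parse_capture_key(s3_uri):
    """Split a data-capture URI into endpoint, variant, capture hour and file name.

    Parses from the right, so the destination prefix may itself contain slashes.
    """
    parts = s3_uri.rstrip("/").split("/")
    endpoint, variant, yyyy, mm, dd, hh, filename = parts[-7:]
    hour = datetime(int(yyyy), int(mm), int(dd), int(hh))
    return {"endpoint": endpoint, "variant": variant, "hour": hour, "file": filename}


info = parse_capture_key(
    "s3://my-bucket/mlops-data/datacapture/"
    "Xgboost-Inference-endpoint-2023-11-07-1116/AllTraffic/2023/11/07/18/example.jsonl"
)
print(info["endpoint"], info["variant"], info["hour"].isoformat())
```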

In [None]:
print("Waiting for captures to show up", end="")
for _ in range(120):
    capture_files = sorted(S3Downloader.list(f"{s3_capture_upload_path}/{endpoint_name}"))
    if capture_files:
        capture_file = S3Downloader.read_file(capture_files[-1]).split("\n")
        capture_record = json.loads(capture_file[0])
        if "inferenceId" in capture_record["eventMetadata"]:
            break
    print(".", end="", flush=True)
    sleep(1)
print()
print("Found Capture Files:")
print("\n ".join(capture_files[-3:]))

Waiting for captures to show upsagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /root/.config/sagemaker/config.yaml
.sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /root/.config/sagemaker/config.yaml
.sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /root/.config/sagemaker/config.yaml
.sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /root/.config/sagemaker/config.yaml
.sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /root/.config/sa

Next, view the contents of a single capture file. Here you should see all the data captured in an Amazon SageMaker specific JSON-line formatted file. Take a quick peek at the first few lines in the captured file.

In [None]:
print("\n".join(capture_file[-3:-1]))




Finally, the first line of the capture file is shown below as formatted JSON so that you can examine it more easily.

Again, notice the `inferenceId` attribute that is set as part of the invoke_endpoint call.  If this is present, it will be used to join with ground truth data (otherwise `eventId` will be used):
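The join-key rule described above (use `inferenceId` when present, otherwise `eventId`) can be sketched as follows. `capture_join_key` is a hypothetical helper, and the sample line keeps only the fields it reads from the record printed below.

```python
import json


def capture_join_key(line):
    """Return (join_key, model_output) for one JSON line of captured data."""
    record = json.loads(line)
    metadata = record["eventMetadata"]
    # Prefer the caller-supplied inferenceId; fall back to the generated eventId.
    key = metadata.get("inferenceId", metadata["eventId"])
    output = record["captureData"]["endpointOutput"]["data"].strip()
    return key, output


line = json.dumps({
    "captureData": {"endpointOutput": {"data": "0.0\n"}},
    "eventMetadata": {"eventId": "63e722b3-ed3f-4450-86e9-d1d308bda1e3", "inferenceId": "5998"},
})
print(capture_join_key(line))  # ('5998', '0.0')
```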

In [None]:
print(json.dumps(capture_record, indent=2))

{
  "captureData": {
    "endpointInput": {
      "observedContentType": "text/csv",
      "mode": "INPUT",
      "data": "0.2775108399892457,0.4213949623176243,0.5776854083906318,0.7453982733885129,-0.6419521205443532,-0.18389844555064858,-0.0887819772092396,-0.06686049299075673,-0.09097838412590584,0.0939703335680906,-0.0021291967892783903,0.008632794314209756",
      "encoding": "CSV"
    },
    "endpointOutput": {
      "observedContentType": "text/csv; charset=utf-8",
      "mode": "OUTPUT",
      "data": "0.0\n",
      "encoding": "CSV"
    }
  },
  "eventMetadata": {
    "eventId": "63e722b3-ed3f-4450-86e9-d1d308bda1e3",
    "inferenceId": "5998",
    "inferenceTime": "2023-11-06T18:01:40Z"
  },
  "eventVersion": "0"
}


#### 4.3 Generate synthetic ground truth

Next, start generating ground truth data. The model quality job will fail if there's no ground truth data to merge.

In [None]:
import random


def ground_truth_with_id(inference_id):
    random.seed(inference_id)  # to get consistent results
    rand = random.random()
    return {
        "groundTruthData": {
            "data": "1" if rand < 0.7 else "0",  # randomly generate positive labels 70% of the time
            "encoding": "CSV",
        },
        "eventMetadata": {
            "eventId": str(inference_id),
        },
        "eventVersion": "0",
    }


def upload_ground_truth(records, upload_time):
    fake_records = [json.dumps(r) for r in records]
    data_to_upload = "\n".join(fake_records)
    target_s3_uri = f"{ground_truth_upload_path}/{upload_time:%Y/%m/%d/%H/%M%S}.jsonl"
    print(f"Uploading {len(fake_records)} records to", target_s3_uri)
    S3Uploader.upload_string_as_file_body(data_to_upload, target_s3_uri)
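`upload_ground_truth` partitions files by upload hour. The sketch below, assuming the same `%Y/%m/%d/%H/%M%S` naming, shows that a file uploaded at 18:05:39 lands under the `.../2023/11/07/18` prefix that holds all ground truth uploaded during that hour. `ground_truth_uri`, `hour_prefix`, and the `my-bucket` base URI are hypothetical.

```python
from datetime import datetime


def ground_truth_uri(base_uri, upload_time):
    """Object URI in the same layout that upload_ground_truth uses."""
    return f"{base_uri}/{upload_time:%Y/%m/%d/%H/%M%S}.jsonl"


def hour_prefix(base_uri, when):
    """Prefix that holds every ground truth file uploaded during the hour of `when`."""
    return f"{base_uri}/{when:%Y/%m/%d/%H}"


base = "s3://my-bucket/mlops-data/ground_truth_data/2023-11-06-14-16-55"
uri = ground_truth_uri(base, datetime(2023, 11, 7, 18, 5, 39))
print(uri)
print(uri.startswith(hour_prefix(base, datetime(2023, 11, 7, 18))))  # True
```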

In [None]:
NUM_GROUND_TRUTH_RECORDS = 334  # 334 are the number of rows in data we're sending for inference


def generate_fake_ground_truth_forever():
    j = 0
    while True:
        fake_records = [ground_truth_with_id(i) for i in range(NUM_GROUND_TRUTH_RECORDS)]
        upload_ground_truth(fake_records, datetime.utcnow())
        j = (j + 1) % 5
        sleep(60 * 60)  # do this once an hour


gt_thread = Thread(target=generate_fake_ground_truth_forever)
gt_thread.start()

Uploading 334 records to s3://sagemaker-us-east-1-752847213914/mlops-data/ground_truth_data/2023-11-06-14-16-55/2023/11/07/18/0539.jsonl
sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /root/.config/sagemaker/config.yaml


Both model predictions (captured from the endpoint) and ground truth labels are now being written to S3.
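To illustrate what joining predictions with ground truth means, the sketch below pairs captured outputs and ground truth records that share an ID. It is a conceptual stand-in for the merge step that Model Monitor performs internally, not its implementation; `merge_predictions` and the sample data are hypothetical, with ground truth records shaped like those built by `ground_truth_with_id`.

```python
def merge_predictions(captured, ground_truth):
    """Pair captured outputs with ground truth labels that share the same ID.

    captured:     dict of inference ID -> model output string
    ground_truth: list of records shaped like those built by ground_truth_with_id
    IDs that are missing on either side are dropped.
    """
    labels = {r["eventMetadata"]["eventId"]: r["groundTruthData"]["data"] for r in ground_truth}
    return {key: (output, labels[key]) for key, output in captured.items() if key in labels}


captured = {"0": "1.0", "1": "0.0", "5998": "0.0"}
ground_truth = [
    {"groundTruthData": {"data": "1", "encoding": "CSV"}, "eventMetadata": {"eventId": "0"}, "eventVersion": "0"},
    {"groundTruthData": {"data": "0", "encoding": "CSV"}, "eventMetadata": {"eventId": "1"}, "eventVersion": "0"},
]
print(merge_predictions(captured, ground_truth))  # {'0': ('1.0', '1'), '1': ('0.0', '0')}
```

In this sketch, captured IDs without a matching label are dropped. With `NUM_GROUND_TRUTH_RECORDS = 334`, only inference IDs `0` to `333` receive synthetic labels.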

#### 4.4 Create a monitoring schedule

Now that you have the baseline information and ground truth labels, create a monitoring schedule to run model quality monitoring job.

In [None]:
##Monitoring schedule name
mlops_monitor_schedule_name = (
    f"DEMO-mlops-Xgboost-monitoring-schedule-{datetime.utcnow():%Y-%m-%d-%H%M}"
)

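This defines the name of the monitoring schedule.

Once created, the schedule runs continuously: each execution merges the captured predictions with the ground truth data, computes model quality metrics, and generates a report that compares them against the baseline constraints.

If model performance degrades past those constraints, the execution reports violations, and the CloudWatch metrics it emits can be used to raise an alert.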

For the monitoring schedule, you need to specify how to interpret an endpoint's output. Given that the endpoint in this notebook outputs CSV data, the code below specifies that the first column of the output, `0`, contains a probability (of churn in this example). You will further specify `0.5` as the cutoff used to determine a positive label (that is, predict that a customer will churn).
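The interpretation described above (read the probability from column `0` of the CSV output and compare it with `0.5`) can be sketched as follows. `label_from_output` is a hypothetical helper; in particular, whether a probability exactly equal to the threshold counts as positive is an assumption here (`>=`).

```python
def label_from_output(output, probability_column=0, threshold=0.5):
    """Return 1 if the probability in the given CSV column meets the threshold, else 0."""
    columns = output.strip().split(",")
    probability = float(columns[probability_column])
    return 1 if probability >= threshold else 0  # ">=" at the boundary is an assumption


print(label_from_output("0.0\n"))   # 0
print(label_from_output("0.73\n"))  # 1
```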

In [None]:
# Create an EndpointInput
endpointInput = EndpointInput(
    endpoint_name=predictor.endpoint_name,
    probability_attribute="0",
    probability_threshold_attribute=0.5,
    inference_attribute="0",
    destination="/opt/ml/processing/input_data",
)

In [None]:
# Create the monitoring schedule to execute every hour.
from sagemaker.model_monitor import CronExpressionGenerator

response = mlops_model_quality_monitor.create_monitoring_schedule(
    monitor_schedule_name=mlops_monitor_schedule_name,
    endpoint_input=endpointInput,
    output_s3_uri=s3_report_path,  # monitoring reports go to the reports prefix from step 1.3
    problem_type="MulticlassClassification",
    ground_truth_input=ground_truth_upload_path,
    constraints=baseline_job.suggested_constraints(),
    schedule_cron_expression=CronExpressionGenerator.hourly(),
    enable_cloudwatch_metrics=True,
)

INFO:sagemaker.model_monitor.model_monitoring:Creating Monitoring Schedule with name: DEMO-mlops-Xgboost-monitoring-schedule-2023-11-07-1806


In [None]:
# Describe the monitoring schedule
# Its status starts as 'Pending' and changes to 'Scheduled' once the schedule is ready
mlops_model_quality_monitor.describe_schedule()

{'MonitoringScheduleArn': 'arn:aws:sagemaker:us-east-1:752847213914:monitoring-schedule/DEMO-mlops-Xgboost-monitoring-schedule-2023-11-07-1806',
 'MonitoringScheduleName': 'DEMO-mlops-Xgboost-monitoring-schedule-2023-11-07-1806',
 'MonitoringScheduleStatus': 'Pending',
 'MonitoringType': 'ModelQuality',
 'CreationTime': datetime.datetime(2023, 11, 7, 18, 6, 30, 211000, tzinfo=tzlocal()),
 'LastModifiedTime': datetime.datetime(2023, 11, 7, 18, 6, 30, 296000, tzinfo=tzlocal()),
 'MonitoringScheduleConfig': {'ScheduleConfig': {'ScheduleExpression': 'cron(0 * ? * * *)'},
  'MonitoringJobDefinitionName': 'model-quality-job-definition-2023-11-07-18-06-29-593',
  'MonitoringType': 'ModelQuality'},
 'EndpointName': 'Xgboost-Inference-endpoint-2023-11-07-1116',
 'ResponseMetadata': {'RequestId': '91500c02-e015-4e1c-b716-af3fac10e81c',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '91500c02-e015-4e1c-b716-af3fac10e81c',
   'content-type': 'application/x-amz-json-1.1',
   'conten

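Each execution of the schedule runs as processing jobs at the interval set by the cron expression. `CronExpressionGenerator.hourly()` produces `cron(0 * ? * * *)`, as shown in the schedule description above, so a new execution starts once every hour.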
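To estimate when the first execution will start, the sketch below computes the next top-of-hour time that `cron(0 * ? * * *)` matches, using the schedule's creation time (18:06:30) from the description above. `next_hourly_run` is a hypothetical helper, and it ignores the launch delay noted in the next cell, which can push the actual start later.

```python
from datetime import datetime, timedelta


def next_hourly_run(now):
    """Next start time for cron(0 * ? * * *): minute 0 of the following hour."""
    top_of_hour = now.replace(minute=0, second=0, microsecond=0)
    return top_of_hour + timedelta(hours=1)


created = datetime(2023, 11, 7, 18, 6, 30)
first_run = next_hourly_run(created)
print(first_run)  # 2023-11-07 19:00:00
```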

#### 4.5 Examine monitoring schedule executions

In [None]:
# Initially there will be no executions since the first execution happens at the top of the hour
# Note that it is common for the execution to launch up to 20 minutes after the hour.
executions = mlops_model_quality_monitor.list_executions()
executions

No executions found for schedule. monitoring_schedule_name: DEMO-mlops-Xgboost-monitoring-schedule-2023-11-07-1806


[]

In [None]:
# Wait for the first execution of the monitoring_schedule
print("Waiting for first execution", end="")
while True:
    execution = mlops_model_quality_monitor.describe_schedule().get(
        "LastMonitoringExecutionSummary"
    )
    if execution:
        break
    print(".", end="", flush=True)
    sleep(1)
print()
print("Execution found!")

Waiting for first execution................................................................................................................................................................................................................................................................................................................................................................................................................................Uploading 334 records to s3://sagemaker-us-east-1-752847213914/mlops-data/ground_truth_data/2023-11-06-14-16-55/2023/11/07/19/0305.jsonl
sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /root/.config/sagemaker/config.yaml
..
Execution found!


In [None]:
while not executions:
    executions = mlops_model_quality_monitor.list_executions()
    sleep(1)
latest_execution = executions[-1]
latest_execution.describe()

Uploading 334 records to s3://sagemaker-us-east-1-752847213914/mlops-data/ground_truth_data/2023-11-06-14-16-55/2023/11/07/19/0539.jsonl
sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /root/.config/sagemaker/config.yaml


{'ProcessingInputs': [{'InputName': 'groundtruth_input_1',
   'AppManaged': False,
   'S3Input': {'S3Uri': 's3://sagemaker-us-east-1-752847213914/mlops-data/ground_truth_data/2023-11-06-14-16-55/2023/11/07/18',
    'LocalPath': '/opt/ml/processing/groundtruth/2023/11/07/18',
    'S3DataType': 'S3Prefix',
    'S3InputMode': 'File',
    'S3DataDistributionType': 'FullyReplicated',
    'S3CompressionType': 'None'}},
  {'InputName': 'endpoint_input_1',
   'AppManaged': False,
   'S3Input': {'S3Uri': 's3://sagemaker-us-east-1-752847213914/mlops-level-1-demo/datacapture/Xgboost-Inference-endpoint-2023-11-07-1116/AllTraffic/2023/11/07/18',
    'LocalPath': '/opt/ml/processing/input_data/Xgboost-Inference-endpoint-2023-11-07-1116/AllTraffic/2023/11/07/18',
    'S3DataType': 'S3Prefix',
    'S3InputMode': 'File',
    'S3DataDistributionType': 'FullyReplicated',
    'S3CompressionType': 'None'}}],
 'ProcessingOutputConfig': {'Outputs': [{'OutputName': 'result',
    'S3Output': {'S3Uri': 's3://sa

##### Inspect a specific execution (latest execution)
In the previous cell, you picked up the latest completed or failed scheduled execution. The possible terminal states and what each of them means are:
* Completed - The monitoring execution completed and no issues were found in the violations report.
* CompletedWithViolations - The execution completed, but constraint violations were detected.
* Failed - The monitoring execution failed, possibly because of a client error (for example, incorrect role permissions) or an infrastructure issue. Examine FailureReason and ExitMessage to identify what exactly happened.
* Stopped - The job exceeded its maximum runtime or was stopped manually.
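The status handling described above can be expressed as a small lookup. This is a minimal sketch in plain Python with no SageMaker calls; the helper names `is_terminal` and `next_step` are hypothetical, and the status strings are the four terminal states listed above plus the `Pending` and `InProgress` states that the waiting loop in the next cell polls on.

```python
# Terminal values of MonitoringExecutionStatus, as described above.
# "Pending" and "InProgress" are the non-terminal states the waiting loop polls on.
TERMINAL_STATES = {"Completed", "CompletedWithViolations", "Failed", "Stopped"}


def is_terminal(status: str) -> bool:
    """Return True once an execution has reached a terminal state."""
    return status in TERMINAL_STATES


def next_step(status: str) -> str:
    """Suggest what to look at next for a given execution status (hypothetical helper)."""
    if not is_terminal(status):
        return "wait"
    if status == "Completed":
        return "no action: no violations found"
    if status == "CompletedWithViolations":
        return "inspect the constraint violations report"
    if status == "Failed":
        return "check FailureReason and ExitMessage"
    return "check whether the job hit its max runtime or was stopped manually"


for s in ["InProgress", "CompletedWithViolations", "Failed"]:
    print(s, "->", next_step(s))
```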

In [None]:
status = execution["MonitoringExecutionStatus"]

while status in ["Pending", "InProgress"]:
    print("Waiting for execution to finish", end="")
    latest_execution.wait(logs=False)
    latest_job = latest_execution.describe()
    print()
    print(f"{latest_job['ProcessingJobName']} job status:", latest_job["ProcessingJobStatus"])
    print(
        f"{latest_job['ProcessingJobName']} job exit message, if any:",
        latest_job.get("ExitMessage"),
    )
    print(
        f"{latest_job['ProcessingJobName']} job failure reason, if any:",
        latest_job.get("FailureReason"),
    )
    sleep(
        30
    )  # model quality executions consist of two Processing jobs, wait for second job to start
    latest_execution = mlops_model_quality_monitor.list_executions()[-1]
    execution = mlops_model_quality_monitor.describe_schedule()["LastMonitoringExecutionSummary"]
    status = execution["MonitoringExecutionStatus"]

print("Execution status is:", status)

if status != "Completed":
    print(execution)
    print(
        "====STOP==== \n No completed executions to inspect further. Please wait till an execution completes or investigate previously reported failures."
    )

Waiting for execution to finish.........................................................................!
groundtruth-merge-202311071900-fd1b0ba271556a4dccb99c93 job status: Completed
groundtruth-merge-202311071900-fd1b0ba271556a4dccb99c93 job exit message, if any: None
groundtruth-merge-202311071900-fd1b0ba271556a4dccb99c93 job failure reason, if any: None
..........................................................!
model-quality-monitoring-202311071900-fd1b0ba271556a4dccb99c93 job status: Completed
model-quality-monitoring-202311071900-fd1b0ba271556a4dccb99c93 job exit message, if any: CompletedWithViolations: Job completed successfully with 5 violations.
model-quality-monitoring-202311071900-fd1b0ba271556a4dccb99c93 job failure reason, if any: None
Execution status is: CompletedWithViolations
{'MonitoringScheduleName': 'DEMO-mlops-Xgboost-monitoring-schedule-2023-11-07-1806', 'ScheduledTime': datetime.datetime(2023, 11, 7, 19, 0, tzinfo=tzlocal()), 'CreationTime': datetime.datetime(2

In [None]:
latest_execution = mlops_model_quality_monitor.list_executions()[-1]
report_uri = latest_execution.describe()["ProcessingOutputConfig"]["Outputs"][0]["S3Output"][
    "S3Uri"
]
print("Report Uri:", report_uri)


Report Uri: s3://sagemaker-us-east-1-752847213914/mlops-data/baselining/results/Xgboost-Inference-endpoint-2023-11-07-1116/DEMO-mlops-Xgboost-monitoring-schedule-2023-11-07-1806/2023/11/07/19


#### 4.5 View violations generated by monitoring schedule

If there are any violations compared to the baseline, they will be listed in the reports uploaded to S3.

In [None]:
pd.options.display.max_colwidth = None
violations = latest_execution.constraint_violations().body_dict["violations"]
violations_df = pd.json_normalize(violations)
violations_df.head(10)


|   | constraint_check_type | description | metric_name |
|---|---|---|---|
| 0 | LessThanThreshold | Metric weightedF2 with 0.0 +/- 0.0 was LessThanThreshold '0.10767589126519939' | weightedF2 |
| 1 | LessThanThreshold | Metric accuracy with 0.0 +/- 0.0 was LessThanThreshold '0.12987012987012986' | accuracy |
| 2 | LessThanThreshold | Metric weightedRecall with 0.0 +/- 0.0 was LessThanThreshold '0.12987012987012986' | weightedRecall |
| 3 | LessThanThreshold | Metric weightedPrecision with 0.0 +/- 0.0 was LessThanThreshold '0.07104887909209637' | weightedPrecision |
| 4 | LessThanThreshold | Metric weightedF1 with 0.0 +/- 0.0 was LessThanThreshold '0.08733336927609615' | weightedF1 |


1. In the endpoint details section, you can check the model quality and monitoring details of each monitoring job execution.

2. If model performance degrades, this is logged there as well.

3. If no data is present when a monitoring job runs, the job failure is logged.



Here you can see that one of the generated violations is that the weighted F2 score (`weightedF2`) is less than the threshold value set during baselining.
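To work with these violations programmatically, the observed value and the threshold can be pulled out of each `description` string. This is a minimal sketch that assumes the report keeps the wording shown in the table above (`Metric <name> with <value> +/- <std> was <check> '<threshold>'`); the function `parse_violation` and its `shortfall` field are hypothetical names, and the pattern may need adjusting if the report format differs.

```python
import re

# Matches descriptions such as
# "Metric weightedF2 with 0.0 +/- 0.0 was LessThanThreshold '0.10767589126519939'".
# Assumption: the violations report keeps this wording.
DESCRIPTION_RE = re.compile(
    r"Metric (?P<metric>\S+) with (?P<value>[-+\d.eE]+) \+/- (?P<std>[-+\d.eE]+) "
    r"was (?P<check>\w+) '(?P<threshold>[-+\d.eE]+)'"
)


def parse_violation(description: str) -> dict:
    """Extract metric name, check type, observed value, and threshold from a description."""
    match = DESCRIPTION_RE.search(description)
    if match is None:
        raise ValueError(f"Unrecognized description: {description!r}")
    value = float(match["value"])
    threshold = float(match["threshold"])
    return {
        "metric": match["metric"],
        "check": match["check"],
        "value": value,
        "threshold": threshold,
        # How far the observed metric fell short of the baseline threshold
        "shortfall": threshold - value,
    }


# Two descriptions copied from the violations table above
examples = [
    "Metric weightedF2 with 0.0 +/- 0.0 was LessThanThreshold '0.10767589126519939'",
    "Metric accuracy with 0.0 +/- 0.0 was LessThanThreshold '0.12987012987012986'",
]
for d in examples:
    v = parse_violation(d)
    print(f"{v['metric']}: {v['value']} < {v['threshold']:.4f} (shortfall {v['shortfall']:.4f})")
```

In the notebook, the same function could be applied to every row with `[parse_violation(d) for d in violations_df["description"]]`.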

## Section 5 - Analyze model quality CloudWatch metrics <a id='analyze-cloudwatch-metrics'></a>

In addition to the violations, the monitoring schedule also emits CloudWatch metrics. In this section, you will view the generated metrics and set up a CloudWatch alarm that is triggered when the model quality drifts from the baseline thresholds. You could use CloudWatch alarms to trigger remedial actions such as retraining your model or updating the training dataset.

#### 5.1 List the generated CloudWatch metrics

In [None]:
import boto3

# Create a CloudWatch client
cw_client = boto3.Session().client("cloudwatch")

namespace = "aws/sagemaker/Endpoints/model-metrics"

# Dimensions that identify this endpoint's model quality monitoring schedule
cw_dimensions = [
    {"Name": "Endpoint", "Value": endpoint_name},
    {"Name": "MonitoringSchedule", "Value": mlops_monitor_schedule_name},
]

In [None]:
# List metrics through the pagination interface
paginator = cw_client.get_paginator("list_metrics")

for response in paginator.paginate(Dimensions=cw_dimensions, Namespace=namespace):
    model_quality_metrics = response["Metrics"]
    for metric in model_quality_metrics:
        print(metric["MetricName"])

#### 5.2 Create a CloudWatch Alarm

Based on the CloudWatch metrics, you can create a CloudWatch alarm that fires when a specific metric does not meet the configured threshold. Here you will create an alarm that fires if the model's F2 score falls below the threshold suggested by the baseline constraints.

In [None]:
alarm_name = "MODEL_QUALITY_F2_SCORE"
alarm_desc = (
    "Trigger a CloudWatch alarm when the f2 score drifts away from the baseline constraints"
)
model_quality_f2_drift_threshold = (
    0.625  # Setting this threshold purposefully low to see the alarm quickly.
)
# The metric name should match one of the names printed by the list_metrics cell in 5.1
metric_name = "f2"
namespace = "aws/sagemaker/Endpoints/model-metrics"

cw_client.put_metric_alarm(
    AlarmName=alarm_name,
    AlarmDescription=alarm_desc,
    ActionsEnabled=True,
    # AlarmActions=[sns_notifications_topic],  # requires a full SNS topic ARN, not a topic name
    MetricName=metric_name,
    Namespace=namespace,
    Statistic="Average",
    Dimensions=[
        {"Name": "Endpoint", "Value": endpoint_name},
        {"Name": "MonitoringSchedule", "Value": mlops_monitor_schedule_name},
    ],
    Period=600,
    EvaluationPeriods=1,
    DatapointsToAlarm=1,
    Threshold=model_quality_f2_drift_threshold,
    ComparisonOperator="LessThanOrEqualToThreshold",
    TreatMissingData="breaching",
)

{'ResponseMetadata': {'RequestId': '8fbb3d95-9aa1-46fb-b29b-9215decc1ee4',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '8fbb3d95-9aa1-46fb-b29b-9215decc1ee4',
   'content-type': 'text/xml',
   'content-length': '214',
   'date': 'Tue, 07 Nov 2023 19:22:44 GMT'},
  'RetryAttempts': 0}}
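To see when the alarm created above would fire, here is a simplified model of CloudWatch's "M out of N" evaluation for the settings passed to `put_metric_alarm` (`ComparisonOperator="LessThanOrEqualToThreshold"`, `EvaluationPeriods`, `DatapointsToAlarm`, `TreatMissingData="breaching"`). This is an illustrative sketch, not CloudWatch's actual implementation: it ignores partial windows and models only the `breaching` and `notBreaching` treatments of missing data (CloudWatch also supports `ignore` and `missing`). The function name `alarm_state` is hypothetical.

```python
from typing import Optional, Sequence


def alarm_state(
    datapoints: Sequence[Optional[float]],
    threshold: float,
    evaluation_periods: int = 1,
    datapoints_to_alarm: int = 1,
    treat_missing_as_breaching: bool = True,
) -> str:
    """Simplified M-out-of-N evaluation for LessThanOrEqualToThreshold.

    datapoints holds one aggregated value per period (oldest first);
    None marks a period in which no metric was published.
    treat_missing_as_breaching=True mimics TreatMissingData="breaching",
    False mimics "notBreaching".
    """
    window = list(datapoints)[-evaluation_periods:]
    breaching = 0
    for value in window:
        if value is None:
            # Missing data: counted as breaching only under "breaching"
            breaching += int(treat_missing_as_breaching)
        elif value <= threshold:
            breaching += 1
    return "ALARM" if breaching >= datapoints_to_alarm else "OK"


# With Period=600, EvaluationPeriods=1, DatapointsToAlarm=1 as above:
print(alarm_state([0.7], 0.625))   # F2 above the threshold
print(alarm_state([0.0], 0.625))   # F2 collapsed, as in the violations report
print(alarm_state([None], 0.625))  # no metric published in the period
```

Because `TreatMissingData="breaching"` is used, a period in which the monitoring job publishes no metric (for example, because no ground truth arrived) also puts the alarm into the `ALARM` state under this model.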

1. The different metrics are logged in the CloudWatch console. In the Alarms section, an alarm is raised if the F2 score is less than or equal to the configured threshold.

2. Based on this alarm, you can trigger a new model training job (on newly labeled data) and deploy the retrained model to production.
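One common way to act on the alarm is to subscribe an AWS Lambda function to the alarm's SNS topic. The sketch below assumes that setup, which this notebook does not create: it reads the standard SNS-to-Lambda event shape (`Records[].Sns.Message`), parses the CloudWatch alarm notification JSON, and decides whether to retrain based on `NewStateValue`. The function names are hypothetical, and the retraining call itself is left as a placeholder.

```python
import json


def should_retrain(alarm_message: dict) -> bool:
    """Return True when a CloudWatch alarm notification reports the ALARM state."""
    return alarm_message.get("NewStateValue") == "ALARM"


def lambda_handler(event, context):
    """Hypothetical Lambda entry point subscribed to the alarm's SNS topic."""
    decisions = []
    for record in event.get("Records", []):
        # SNS delivers the CloudWatch alarm payload as a JSON string
        message = json.loads(record["Sns"]["Message"])
        retrain = should_retrain(message)
        if retrain:
            # Placeholder (assumption): start a retraining job or pipeline here,
            # for example with boto3. This step is not part of the notebook.
            pass
        decisions.append({"alarm": message.get("AlarmName"), "retrain": retrain})
    return {"decisions": decisions}
```

Keeping the decision in `should_retrain` separate from the handler makes it easy to test locally with a hand-built event before wiring up the actual retraining step.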

In [None]:
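import boto3

# List the ARNs of the SNS topics in this Region; CloudWatch AlarmActions expects a full topic ARN
sns_client = boto3.client("sns", region_name=region)
response = sns_client.list_topics()

for topic in response["Topics"]:
    print(topic["TopicArn"])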



In [None]:
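alarm_name = "MODEL_QUALITY_F2_SCORE"
alarm_desc = (
    "Trigger a CloudWatch alarm when the f2 score drifts away from the baseline constraints"
)
model_quality_f2_drift_threshold = (
    0.625  # Setting this threshold purposefully low to see the alarm quickly.
)
# The metric name should match one of the names printed by the list_metrics cell in 5.1
metric_name = "f2"
namespace = "aws/sagemaker/Endpoints/model-metrics"

# AlarmActions requires a full SNS topic ARN, not a bare topic name such as
# "RetrainSNSTopic". Look up the ARN of the topic whose name matches
# sns_notifications_topic (list_topics returns up to 100 topics per call).
sns_client = boto3.client("sns", region_name=region)
sns_topic_arn = next(
    (
        topic["TopicArn"]
        for topic in sns_client.list_topics()["Topics"]
        if topic["TopicArn"].split(":")[-1] == sns_notifications_topic.split(":")[-1]
    ),
    None,
)
if sns_topic_arn is None:
    raise ValueError(f"No SNS topic named {sns_notifications_topic!r} found in {region}")

cw_client.put_metric_alarm(
    AlarmName=alarm_name,
    AlarmDescription=alarm_desc,
    ActionsEnabled=True,
    AlarmActions=[sns_topic_arn],
    MetricName=metric_name,
    Namespace=namespace,
    Statistic="Average",
    Dimensions=[
        {"Name": "Endpoint", "Value": endpoint_name},
        {"Name": "MonitoringSchedule", "Value": mlops_monitor_schedule_name},
    ],
    Period=600,
    EvaluationPeriods=1,
    DatapointsToAlarm=1,
    Threshold=model_quality_f2_drift_threshold,
    ComparisonOperator="LessThanOrEqualToThreshold",
    TreatMissingData="breaching",
)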

ClientError: An error occurred (ValidationError) when calling the PutMetricAlarm operation: Invalid arn syntax: RetrainSNSTopic
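The `ClientError` above occurs because `AlarmActions` expects a full SNS topic ARN, of the form `arn:<partition>:sns:<region>:<account-id>:<topic-name>`, while `sns_notifications_topic` held only the topic name. As an alternative to looking the topic up, the ARN can be assembled from its parts. This is a minimal sketch; the helper `sns_topic_arn` is a hypothetical name, and its regular expression is a simplified check rather than the full AWS validation rules.

```python
import re

# Simplified check for arn:<partition>:sns:<region>:<12-digit account>:<topic name>[.fifo]
SNS_ARN_RE = re.compile(r"^arn:aws[\w-]*:sns:[a-z0-9-]+:\d{12}:[\w-]+(\.fifo)?$")


def sns_topic_arn(topic: str, region: str, account_id: str, partition: str = "aws") -> str:
    """Return topic unchanged if it is already an ARN, otherwise build one from its parts."""
    if SNS_ARN_RE.match(topic):
        return topic
    arn = f"arn:{partition}:sns:{region}:{account_id}:{topic}"
    if not SNS_ARN_RE.match(arn):
        raise ValueError(f"Could not build a valid SNS topic ARN from {topic!r}")
    return arn


# 123456789012 is a placeholder account ID
print(sns_topic_arn("RetrainSNSTopic", "us-east-1", "123456789012"))
```

In the notebook, the account ID could come from `boto3.client("sts").get_caller_identity()["Account"]`, and the result passed as `AlarmActions=[sns_topic_arn(sns_notifications_topic, region, account_id)]`.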

In [None]:
cw_client = boto3.Session().client("cloudwatch")

alarm_name = "BASELINE_DRIFT_FEATURE_AGE"
alarm_desc = "Trigger a CloudWatch alarm when the feature Age drifts away from the baseline"
feature_age_drift_threshold = 0.1  # Setting this threshold purposefully low to see the alarm quickly.
metric_name = "feature_baseline_drift_Age"
# feature_baseline_drift_* metrics are published by data quality monitoring, so
# mon_schedule_name must name a data quality (not model quality) monitoring schedule
namespace = "aws/sagemaker/Endpoints/data-metrics"

monitoring_schedule_name = mon_schedule_name

cw_client.put_metric_alarm(
    AlarmName=alarm_name,
    AlarmDescription=alarm_desc,
    ActionsEnabled=True,
    # AlarmActions requires a full SNS topic ARN; a bare topic name raises a ValidationError
    AlarmActions=[sns_notifications_topic],
    MetricName=metric_name,
    Namespace=namespace,
    Statistic="Average",
    Dimensions=[
        {"Name": "Endpoint", "Value": endpoint_name},
        {"Name": "MonitoringSchedule", "Value": monitoring_schedule_name},
    ],
    Period=600,
    EvaluationPeriods=1,
    DatapointsToAlarm=1,
    Threshold=feature_age_drift_threshold,
    ComparisonOperator="GreaterThanOrEqualToThreshold",
    TreatMissingData="breaching",
)

## Clean up <a id='cleanup'></a>  

You can keep your endpoint running to continue capturing data. If you do not plan to collect more data or use this endpoint further, you should delete the endpoint to avoid incurring additional charges. Note that deleting your endpoint does not delete the data that was captured during the model invocations. That data persists in Amazon S3 until you delete it yourself.

Before deleting the endpoint, you must first delete the monitoring schedule.
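The cleanup cell below waits a fixed 60 seconds after deleting the schedule. A polling helper is an alternative that returns as soon as a condition holds and gives up after a timeout. This is a generic sketch; the name `wait_until` is hypothetical, and what the condition should check is an assumption left to you.

```python
import time
from typing import Callable


def wait_until(condition: Callable[[], bool], timeout: float = 300.0, interval: float = 10.0) -> bool:
    """Poll condition() until it returns True (-> True) or timeout seconds pass (-> False)."""
    deadline = time.monotonic() + timeout
    while True:
        if condition():
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval)
```

For example, the condition could call the SageMaker `list_monitoring_schedules` API with `NameContains=mlops_monitor_schedule_name` and return `True` once no `MonitoringScheduleSummaries` come back. How quickly a deleted schedule disappears from that listing has not been verified here.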

In [None]:
mlops_model_quality_monitor.delete_monitoring_schedule()
sleep(60)  # wait for the schedule deletion to complete before deleting the endpoint

In [None]:
predictor.delete_model()
predictor.delete_endpoint()

## Summary

1. Once the model is deployed, the data sent to the endpoint for prediction is passed through the model, and the requests and predictions are captured and stored in S3.

2. Ground truth data is also stored in S3 as it is collected.

3. A validation dataset (or any other labeled dataset) is used to create the baseline:
    - The model makes predictions on the validation dataset and its performance is evaluated; this performance is treated as the model's baseline performance.
    - These baseline evaluation metrics are used as thresholds when checking the model's performance.

4. A continuous model quality monitoring job is then created. This job checks whether prediction data and ground truth data are available in their respective S3 locations, merges them, and evaluates model performance and drift. The job runs periodically on a schedule.

5. For each monitoring job execution, the model's performance is logged and the reports are saved to S3.

6. If the model's performance does not meet expectations (that is, it violates a particular evaluation-metric constraint), the violation is logged and a CloudWatch alarm is raised.

7. Based on this alarm, a model retraining job is triggered and the model is retrained on new data. The retrained model is then deployed and used for prediction.
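Steps 3 and 6 boil down to computing a metric once on the validation set, then comparing the live value against it. The sketch below illustrates this for the F2 score, using the standard F-beta formula $F_\beta = (1+\beta^2)\,PR / (\beta^2 P + R)$ with $\beta = 2$. The precision and recall numbers are hypothetical, the function names are illustrative, and Model Monitor's suggested constraint thresholds are not necessarily equal to the raw baseline value used here.

```python
def f_beta(precision: float, recall: float, beta: float = 2.0) -> float:
    """F-beta score; beta=2 (F2) weights recall more heavily than precision."""
    b2 = beta ** 2
    denom = b2 * precision + recall
    return 0.0 if denom == 0 else (1 + b2) * precision * recall / denom


def violates_baseline(live_value: float, baseline_threshold: float) -> bool:
    """LessThanThreshold-style check: flag the live metric if it drops below the threshold."""
    return live_value < baseline_threshold


# Hypothetical numbers, for illustration only
baseline_f2 = f_beta(precision=0.8, recall=0.6)  # computed on the validation set (step 3)
live_f2 = f_beta(precision=0.7, recall=0.4)      # computed from captured predictions + ground truth
print(round(baseline_f2, 4), round(live_f2, 4), violates_baseline(live_f2, baseline_f2))
# prints: 0.6316 0.4375 True
```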
