# Task 2: Model monitoring

In this notebook, you monitor and evaluate the data captured from the endpoint. You create a baseline against which you compare real-time traffic. Once the baseline is ready, you set up a schedule to continuously evaluate the captured data against it.

## Task 2.1: Environment setup

In this task, you set up your environment.

In [None]:
#install-dependencies
%matplotlib inline
from datetime import datetime, timedelta
import json
import boto3
import time
import pandas as pd
import matplotlib.pyplot as plt
from sagemaker import get_execution_role, session
from sagemaker.s3 import S3Uploader
from sagemaker.image_uris import retrieve
from sagemaker.predictor import Predictor
from sagemaker.serializers import CSVSerializer
from sagemaker.deserializers import CSVDeserializer
from time import sleep
from sagemaker.model_monitor import DefaultModelMonitor
from sagemaker.model_monitor.dataset_format import DatasetFormat
from sagemaker.model_monitor import CronExpressionGenerator

region = boto3.Session().region_name
role = get_execution_role()
sm_session = session.Session(boto3.Session())
sm = boto3.Session().client("sagemaker")
sm_runtime = boto3.Session().client("sagemaker-runtime")
cw = boto3.Session().client("cloudwatch")

bucket = sm_session.default_bucket()
prefix = 'sagemaker/abalone'
data_capture_prefix = "{}/datacapture".format(prefix)
s3_capture_upload_path = "s3://{}/{}".format(bucket, data_capture_prefix)
capture_modes = [ "Input",  "Output" ]
code_prefix = "{}/code".format(prefix)
s3_code_preprocessor_uri = "s3://{}/{}/{}".format(bucket, code_prefix, "preprocessor.py")
s3_code_postprocessor_uri = "s3://{}/{}/{}".format(bucket, code_prefix, "postprocessor.py")
reports_prefix = "{}/reports".format(prefix)
s3_report_path = "s3://{}/{}".format(bucket, reports_prefix)


## Task 2.2: Create a production endpoint with Data Capture enabled

To log the inputs to your endpoint and the inference outputs from your deployed model to Amazon S3, you can enable a feature called Data Capture. Data Capture records information that can be used for training, debugging, and monitoring. Amazon SageMaker Model Monitor automatically parses this captured data and compares metrics from this data with a baseline that you create for the model.

In this task, you upload the pre-trained model to the S3 bucket, create an Amazon SageMaker model object, configure an Amazon SageMaker real-time endpoint with Data Capture enabled, and create the real-time endpoint.

<i class="fas fa-sticky-note" style="color:#ff6633"></i> **Note:** Endpoint creation takes approximately 5 minutes to complete.

In [None]:
#create-production-endpoint
# Upload models
model_url = S3Uploader.upload(
    local_path="models/model.tar.gz", desired_s3_uri=f"s3://{bucket}/{prefix}"
)

# Create the model definitions
model_name = f"abalone-A-{datetime.now():%Y-%m-%d-%H-%M-%S}"
image_uri = retrieve("xgboost", boto3.Session().region_name, "1.5-1")

# Create production model object
predictor=sm_session.create_model(
    name=model_name, role=role, container_defs={"Image": image_uri, "ModelDataUrl": model_url}
)

# Create the endpoint configurations
variant_name = 'AllTraffic'

endpoint_config_name = f'Abalone-Endpoint-1-{datetime.now():%Y-%m-%d-%H-%M-%S}'
endpoint_config_response = sm.create_endpoint_config(
    EndpointConfigName = endpoint_config_name,
    ProductionVariants=[
        {
            'ModelName':model_name,
            'InstanceType':'ml.m5.xlarge',
            'InitialInstanceCount':1,
            'VariantName':variant_name
        }
    ],
    
        DataCaptureConfig= {
        'EnableCapture': True, # Whether data should be captured or not.
        'InitialSamplingPercentage' : 100,
        'CaptureContentTypeHeader': {'CsvContentTypes': [ 'text/csv' ]},
        'DestinationS3Uri': s3_capture_upload_path,
        'CaptureOptions': [{"CaptureMode" : capture_mode} for capture_mode in capture_modes] # Example - Use list comprehension to capture both Input and Output
    }
)
print(f"Created the Production Model Endpoint Config: {endpoint_config_name}")
time.sleep(5)

# Create the endpoint with the production model
endpoint_name = f"Abalone-{datetime.now():%Y-%m-%d-%H-%M-%S}"
endpoint_response = sm.create_endpoint(EndpointName=endpoint_name, EndpointConfigName=endpoint_config_name)

def wait_for_endpoint_creation_complete(endpoint):
    """Poll until the endpoint leaves the Creating state.

    `endpoint` is the create_endpoint response; polling uses the
    `endpoint_name` defined above.
    """
    endpoint_arn = endpoint["EndpointArn"]
    response = sm.describe_endpoint(EndpointName=endpoint_name)
    status = response.get("EndpointStatus")
    while status == "Creating":
        print("Waiting for Endpoint Creation")
        time.sleep(15)
        response = sm.describe_endpoint(EndpointName=endpoint_name)
        status = response.get("EndpointStatus")

    if status != "InService":
        print(f"Failed to create endpoint, response: {response}")
        failure_reason = response.get("FailureReason", "")
        raise SystemExit(
            f"Failed to create endpoint {endpoint_arn}, status: {status}, reason: {failure_reason}"
        )
    print(f"Endpoint {endpoint_arn} successfully created.")

wait_for_endpoint_creation_complete(endpoint=endpoint_response)

When the cell completes, an endpoint ARN is returned that looks like *arn:aws:sagemaker:us-west-2:012345678910:endpoint/abalone-2040-10-11-10-11-12*.

Your endpoint is currently configured with one variant, the production model. You can view the endpoint configuration using *describe_endpoint*.

In [None]:
#describe-the-endpoint
sm.describe_endpoint(EndpointName=endpoint_name)

## Task 2.3: View the captured data

In this task, you invoke the endpoint created above using production data. Since you already enabled Data Capture on the endpoint, the request payload, response, and additional metadata are saved in the S3 location you specified earlier in the notebook. After invoking the endpoint, you examine the data captured in the S3 bucket.
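
As a rough illustration of where captured records land, the following sketch builds the S3 key prefix for one hour of capture files. It assumes the layout `<destination prefix>/<endpoint name>/<variant name>/yyyy/mm/dd/hh/`; the helper name `capture_prefix_for_hour` and the sample endpoint name are hypothetical and used only for this example.

```python
from datetime import datetime


def capture_prefix_for_hour(base_prefix, endpoint_name, variant_name, when):
    """Return the S3 key prefix expected to hold capture files for the hour of `when`.

    Assumes the layout <base>/<endpoint>/<variant>/yyyy/mm/dd/hh/.
    """
    return f"{base_prefix}/{endpoint_name}/{variant_name}/{when:%Y/%m/%d/%H}/"


example_prefix = capture_prefix_for_hour(
    "sagemaker/abalone/datacapture",
    "Abalone-2023-09-20-17-05-28",  # hypothetical endpoint name
    "AllTraffic",
    datetime(2023, 9, 20, 18, 30),
)
print(example_prefix)
```

A prefix built this way could be passed to an S3 listing call to narrow the results to a single hour of traffic.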

First, use an initialized predictor configured with the endpoint name to invoke the endpoint. Invoking the endpoint with new records helps you confirm your Data Capture configuration is set up correctly. A predictor makes prediction requests to an Amazon SageMaker endpoint. Then, run inference by sending records to the endpoint.

In [None]:
#invoke-the-endpoint
predictor = Predictor(endpoint_name=endpoint_name,
                        serializer=CSVSerializer(),
                        deserializer=CSVDeserializer())

validate_dataset = "abalone_data_new_predictions.csv"

limit = 200  # Need at least 200 samples to compute standard deviations
i = 0
with open(f"data/{validate_dataset}", "w") as validation_file:
    validation_file.write("prediction,label\n")  # CSV header
    with open("data/abalone_data_new.csv", "r") as f:
        for row in f:
            (label, input_cols) = row.split(",", 1)
            prediction = predictor.predict(input_cols)[0][0]
            validation_file.write(f"{prediction},{label}\n")
            i += 1
            if i > limit:
                break
            print(".", end="", flush=True)
            sleep(0.5)

print("\nDone!")

Next, list the Data Capture files stored in Amazon S3.

In [None]:
#list-data-capture-files
s3_client = boto3.Session().client("s3")
current_endpoint_capture_prefix = "{}/{}".format(data_capture_prefix, endpoint_name)
result = s3_client.list_objects(Bucket=bucket, Prefix=current_endpoint_capture_prefix)
capture_files = [capture_file.get("Key") for capture_file in result.get("Contents")]
print("Found Capture Files:")
print("\n ".join(capture_files))

Then, view the contents of a single Data Capture file. You should see all the captured data in an Amazon SageMaker-specific JSON Lines file, with one JSON record per line. Take a moment to review the first few lines in the captured file.

In [None]:
#view-captured-file-lines
def get_obj_body(obj_key):
    return s3_client.get_object(Bucket=bucket, Key=obj_key).get("Body").read().decode("utf-8")


capture_file = get_obj_body(capture_files[-1])
print(capture_file[:2000])

Finally, view the contents of one captured endpoint input and output record.

In [None]:
#print-json-file
print(json.dumps(json.loads(capture_file.split("\n")[0]), indent=2))

Enabling Data Capture on your endpoints gives you more flexibility when you want to save information for training, debugging, and monitoring. Since Amazon SageMaker Model Monitor parses this captured data automatically, using Data Capture helps you compare new records to baseline data. 
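
If you want to work with captured records programmatically, a minimal sketch such as the following can pull the input features and the prediction out of each line. It assumes CSV-encoded records with `captureData.endpointInput`, `captureData.endpointOutput`, and `eventMetadata` keys; the sample line is hand-written for illustration, and `parse_capture_lines` is a hypothetical helper.

```python
import json

# Hand-written sample line in the Data Capture JSON Lines layout; values are illustrative only.
sample_line = json.dumps({
    "captureData": {
        "endpointInput": {"observedContentType": "text/csv", "mode": "INPUT",
                          "data": "0.455,0.365,0.095", "encoding": "CSV"},
        "endpointOutput": {"observedContentType": "text/csv; charset=utf-8", "mode": "OUTPUT",
                           "data": "9.7", "encoding": "CSV"},
    },
    "eventMetadata": {"eventId": "example-id", "inferenceTime": "2023-09-20T17:10:00Z"},
    "eventVersion": "0",
})


def parse_capture_lines(text):
    """Return (input features, output values, inference time) for each non-empty line."""
    records = []
    for line in text.splitlines():
        if not line.strip():
            continue
        event = json.loads(line)
        data = event["captureData"]
        features = [float(v) for v in data["endpointInput"]["data"].split(",")]
        outputs = [float(v) for v in data["endpointOutput"]["data"].split(",")]
        records.append((features, outputs, event["eventMetadata"]["inferenceTime"]))
    return records


parsed = parse_capture_lines(sample_line + "\n")
print(parsed)
```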

You have not configured a baseline yet. In the next task, you use SageMaker Model Monitor to generate baseline statistics and constraints. 

## Task 2.4: Generate baseline statistics and constraints

In this task, you create a baseline. Baseline statistics and constraints serve as a standard for detecting data drift and other data quality issues. 

The test dataset from training the model is often a good baseline dataset. The test dataset schema and the inference dataset schema should exactly match, including the number and order of the features. From the test dataset, you can ask Amazon SageMaker to suggest a set of baseline constraints and generate descriptive statistics to explore the data.
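
The schema requirement can be checked before you start a baseline job. The following sketch compares only the header rows of two CSV texts, so it catches missing, extra, or reordered columns but not type differences; `header_of` and `schemas_match` are hypothetical helper names, and the sample rows are made up.

```python
import csv
import io


def header_of(csv_text):
    """Return the header row of CSV text as a list of column names."""
    return next(csv.reader(io.StringIO(csv_text)))


def schemas_match(baseline_csv, inference_csv):
    """True only when both headers list the same columns in the same order."""
    return header_of(baseline_csv) == header_of(inference_csv)


baseline = "length,diameter,whole_weight\n0.455,0.365,0.514\n"
same_order = "length,diameter,whole_weight\n0.35,0.265,0.2255\n"
reordered = "diameter,length,whole_weight\n0.265,0.35,0.2255\n"

print(schemas_match(baseline, same_order))
print(schemas_match(baseline, reordered))
```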

First, configure the baseline prefixes and variables.

In [None]:
#configure-baseline-variables
baseline_prefix = prefix + "/baselining"
baseline_data_prefix = baseline_prefix + "/data"
baseline_results_prefix = baseline_prefix + "/results"

baseline_data_uri = "s3://{}/{}".format(bucket, baseline_data_prefix)
baseline_results_uri = "s3://{}/{}".format(bucket, baseline_results_prefix)
print("Baseline data uri: {}".format(baseline_data_uri))
print("Baseline results uri: {}".format(baseline_results_uri))

Next, start a job to suggest a baseline and constraints. *DefaultModelMonitor.suggest_baseline()* starts a **ProcessingJob** using an Amazon SageMaker provided Model Monitor container to generate the baseline and constraints.

<i class="fas fa-sticky-note" style="color:#ff6633"></i> **Note:** A baseline job takes approximately 10 minutes to complete.

<i class="fas fa-info-circle" style="color:#008296"></i> **Learn more:** Refer to [Create a Baseline](https://docs.aws.amazon.com/sagemaker/latest/dg/model-monitor-create-baseline.html) for more information about creating baseline calculations of statistics and constraints.

In [None]:
#create-baselining-job
my_default_monitor = DefaultModelMonitor(
    role=role,
    instance_count=1,
    instance_type="ml.t3.xlarge",
    volume_size_in_gb=20,
    max_runtime_in_seconds=3600,
)

my_default_monitor.suggest_baseline(
    baseline_dataset="data/abalone_data_new_withheader.csv",
    dataset_format=DatasetFormat.csv(header=True),
    output_s3_uri=baseline_results_uri,
    wait=True,
)

<i class="fas fa-sticky-note" style="color:#ec7211"></i> **Note:** This code returns a lengthy response. You can ignore any warnings or error messages.

When the cell completes, a message is returned that looks like *2025-10-11 12:13:14,156 - DefaultDataAnalyzer - INFO - Spark job completed*.

Now, search for the *constraints.json* and *statistics.json* files to see where they are located.

In [None]:
#explore-generated-constraints-and-statistics
s3_client = boto3.Session().client("s3")
result = s3_client.list_objects(Bucket=bucket, Prefix=baseline_results_prefix)
report_files = [report_file.get("Key") for report_file in result.get("Contents")]
print("Found Files:")
print("\n ".join(report_files))

Next, view the generated statistics.

In [None]:
#view-statistics
baseline_job = my_default_monitor.latest_baselining_job
schema_df = pd.json_normalize(baseline_job.baseline_statistics().body_dict["features"])
schema_df.head(10)

The statistics table shows each feature with its corresponding summary statistics, including the mean, standard deviation, min, max, and other important details.
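
For intuition about what these columns contain, the following sketch computes the same kind of summary values for one feature using only the Python standard library. The sample values are made up, `summarize` is a hypothetical helper, and the use of the sample standard deviation is an assumption; the baseline job may compute its statistics differently.

```python
import statistics


def summarize(values):
    """Compute a few summary statistics for one numeric feature."""
    return {
        "mean": statistics.mean(values),
        "std_dev": statistics.stdev(values),  # sample standard deviation (assumption)
        "min": min(values),
        "max": max(values),
    }


diameter_sample = [0.25, 0.5, 0.75, 1.0]  # made-up values
summary = summarize(diameter_sample)
print(summary)
```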

Finally, view the generated constraints.

In [None]:
#view-constraints
constraints_df = pd.json_normalize(
    baseline_job.suggested_constraints().body_dict["features"]
)
constraints_df.head(10)

The constraints table shows the inferred type of each feature, the record completeness (in this case 1.0 for all features because the file has no missing values), and whether each numeric feature is expected to contain only non-negative values.
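
The following sketch shows, in simplified form, how a constraint of this kind could be checked against new values for one feature. It assumes a constraint entry with `completeness` and `num_constraints.is_non_negative` fields; `check_feature` is a hypothetical helper and is not the logic Model Monitor runs.

```python
def check_feature(values, constraint):
    """Return a list of human-readable problems for one feature column.

    `values` may contain None for missing entries.
    """
    problems = []
    present = [v for v in values if v is not None]
    completeness = len(present) / len(values) if values else 0.0
    if completeness < constraint["completeness"]:
        problems.append(f"completeness {completeness:.2f} below {constraint['completeness']}")
    non_negative = constraint.get("num_constraints", {}).get("is_non_negative", False)
    if non_negative and any(v < 0 for v in present):
        problems.append("negative value found")
    return problems


# Illustrative constraint entry for a single feature.
diameter_constraint = {
    "name": "diameter",
    "inferred_type": "Fractional",
    "completeness": 1.0,
    "num_constraints": {"is_non_negative": True},
}

clean_result = check_feature([0.365, 0.265, 0.42], diameter_constraint)
dirty_result = check_feature([0.365, None, -0.1, 0.3], diameter_constraint)
print(clean_result)
print(dirty_result)
```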

Now that you have a baseline created and have viewed the statistics and constraints, create a Model Monitor data quality monitoring job to track new inference records against the baseline.

## Task 2.5: Create a Model Monitor data quality monitoring job

After you create your baseline, you can call the *create_monitoring_schedule()* method of the *DefaultModelMonitor* class instance to schedule an hourly data quality monitor.

In this task, you analyze and monitor the data with a data quality monitoring job.

First, use the *create_monitoring_schedule()* method to create an hourly data quality monitoring schedule.

In [None]:
#create-monitoring-schedule
# Use a separate name so that `bucket` keeps holding the bucket name string.
s3_bucket = boto3.Session().resource("s3").Bucket(bucket)
s3_bucket.Object(code_prefix + "/postprocessor.py").upload_file("python/postprocessor.py")

mon_schedule_name = f"model-monitor-schedule-{datetime.now():%Y-%m-%d-%H-%M-%S}"

my_default_monitor.create_monitoring_schedule(
    monitor_schedule_name=mon_schedule_name,
    endpoint_input=predictor.endpoint_name,
    post_analytics_processor_script=s3_code_postprocessor_uri,
    output_s3_uri=s3_report_path,
    statistics=my_default_monitor.baseline_statistics(),
    constraints=my_default_monitor.suggested_constraints(),
    schedule_cron_expression=CronExpressionGenerator.hourly(),
    enable_cloudwatch_metrics=True,
)

Next, send some artificial traffic to the endpoint so that the monitoring job can generate a violations report. To simulate data drift, use a set of skewed data. When the monitoring job compares the skewed data against the baseline, it reports violations that can trigger an automated alert.

In [None]:
#send-artificial-traffic
endpoint_name = predictor.endpoint_name
runtime_client = sm_session.sagemaker_runtime_client
limit = 200  # maximum number of records to send; read by invoke_endpoint below

# repeating code from above to run this section independently
def invoke_endpoint(ep_name, file_name, runtime_client):
    i = 0
    with open(file_name, "r") as f:
        for row in f:
            (label, payload) = row.strip("\n").split(",", 1)  

            response = runtime_client.invoke_endpoint(
                EndpointName=ep_name, ContentType="text/csv", Body=payload
            )
            response["Body"].read()
            i += 1
            if i > limit:
                break
            print(".", end="", flush=True)
            time.sleep(0.5)


invoke_endpoint(endpoint_name, "data/abalone_data_skewed.csv", runtime_client)
print("\nDone!")

Use *describe_schedule* to view the schedule you just created.

In [None]:
#model-monitor-schedule-status
desc_schedule_result = my_default_monitor.describe_schedule()
print("Schedule status: {}".format(desc_schedule_result["MonitoringScheduleStatus"]))

The monitor schedule starts jobs at the previously specified hourly interval. Even for an hourly schedule, Amazon SageMaker has a buffer period of 20 minutes to schedule your execution. You might see your execution start anywhere from 0 to 20 minutes from the hour boundary. This is expected and done for load balancing in the backend.
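
To estimate when the next execution might begin, you can compute the next hour boundary and add the buffer. The following sketch does this with plain `datetime` arithmetic; `next_execution_window` is a hypothetical helper, and the 20-minute default simply mirrors the buffer described above.

```python
from datetime import datetime, timedelta


def next_execution_window(now, buffer_minutes=20):
    """Return (earliest, latest) expected start times for the next hourly execution."""
    top_of_hour = now.replace(minute=0, second=0, microsecond=0)
    # If `now` is exactly on the hour, that boundary is the next one.
    earliest = top_of_hour if now == top_of_hour else top_of_hour + timedelta(hours=1)
    return earliest, earliest + timedelta(minutes=buffer_minutes)


earliest, latest = next_execution_window(datetime(2023, 9, 20, 17, 41, 22))
print(f"Next execution expected between {earliest} and {latest}")
```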

This execution takes approximately one hour to be able to generate the violations report. For the purpose of the lab, the next cells have code snippets for you to view and sample output is shared for reference. In the last step of this task, you view the violations report from a file that was generated and pre-loaded from an earlier monitoring run.

When the execution finishes, SageMaker reports the status of the latest completed or failed execution. 

Here are the possible terminal states:
- **Completed** - The monitoring execution completed and no issues were found in the violations report. 
- **CompletedWithViolations** - The execution completed, but constraint violations were detected. 
- **Failed** - The monitoring execution failed, possibly due to a client error (for example, incorrect role permissions) or an infrastructure issue. Examine *FailureReason* and *ExitMessage* to identify the cause.
- **Stopped** - The job exceeded max runtime or was manually stopped.
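
The following sketch shows one way to turn a described execution into a single outcome label. It assumes that a run with violations reports a *ProcessingJobStatus* of *Completed* together with an *ExitMessage* that starts with *CompletedWithViolations*; `summarize_execution` and its return labels are hypothetical.

```python
def summarize_execution(job_description):
    """Classify a monitoring execution from a describe()-style dictionary."""
    status = job_description.get("ProcessingJobStatus")
    exit_message = job_description.get("ExitMessage", "")
    if status == "Completed":
        # Assumption: violations are signalled through the exit message prefix.
        if exit_message.startswith("CompletedWithViolations"):
            return "violations"
        return "clean"
    if status in ("Failed", "Stopped"):
        return status.lower()
    return "in_progress"


sample_job = {
    "ProcessingJobStatus": "Completed",
    "ExitMessage": "CompletedWithViolations: Job completed successfully with 8 violations.",
}
print(summarize_execution(sample_job))
```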


If you want to list and view the current status of an execution, you can use code similar to this:

```python 
# list the current execution
mon_executions = my_default_monitor.list_executions()
print(
    "We created an hourly schedule above that begins executions ON the hour (plus 0-20 min buffer).\nWe will have to wait for an hour..."
)

while len(mon_executions) == 0:
    print("Waiting for the first execution to happen...")
    time.sleep(60)
    mon_executions = my_default_monitor.list_executions()
```



```python
# Latest execution status
latest_execution = mon_executions[-1]  # Latest execution's index is -1, second to last is -2, etc
time.sleep(60)
latest_execution.wait(logs=False)

print("Latest execution status: {}".format(latest_execution.describe()["ProcessingJobStatus"]))
print("Latest execution result: {}".format(latest_execution.describe()["ExitMessage"]))

latest_job = latest_execution.describe()
if latest_job["ProcessingJobStatus"] != "Completed":
    print(
        "====STOP==== \n No completed executions to inspect further. Please wait till an execution completes or investigate previously reported failures."
    )
```

The following is the expected output when the latest execution of the monitoring job completes.

```bash
Latest execution status: Completed

Latest execution result: CompletedWithViolations: Job completed successfully with 8 violations.
```

To list the generated violation report, you can use code similar to this:

```python

from urllib.parse import urlparse

report_uri = latest_execution.output.destination
s3uri = urlparse(report_uri)
report_bucket = s3uri.netloc
report_key = s3uri.path.lstrip("/")
s3_client = boto3.Session().client("s3")
result = s3_client.list_objects(Bucket=report_bucket, Prefix=report_key)
report_files = [report_file.get("Key") for report_file in result.get("Contents")]
print("Found Report Files:")
print("\n ".join(report_files))
```

The following is a sample listing of the report files.

```bash
Found Report Files:
sagemaker/abalone/reports/Abalone-2023-09-20-17-05-28/model-monitor-schedule-2023-09-20-17-41-22/2023/09/20/18/constraint_violations.json

sagemaker/abalone/reports/Abalone-2023-09-20-17-05-28/model-monitor-schedule-2023-09-20-17-41-22/2023/09/20/18/constraints.json

sagemaker/abalone/reports/Abalone-2023-09-20-17-05-28/model-monitor-schedule-2023-09-20-17-41-22/2023/09/20/18/statistics.json
```

To list the violations compared to the baseline, you can use code similar to this:

```python
violations = my_default_monitor.latest_monitoring_constraint_violations()
pd.set_option("display.max_colwidth", None)
constraints_df = pd.json_normalize(violations.body_dict["violations"])
constraints_df.head(10)
```

Since the execution of the monitoring job you started above will not finish for 60-80 minutes, view a violations report from a file that was generated and pre-loaded from an earlier monitoring run.

In [None]:
#print-violations-report
pd.set_option('display.max_colwidth', None)
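# Open the file in a context manager so that it is closed after loading.
with open('data/violations.json') as violations_file:
    violations = json.load(violations_file)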
constraints_df=pd.json_normalize(violations, record_path=['violations'])
constraints_df.head(10)

The violations report shows eight violations from the skewed data file, with four **data_type_check** violations for **rings**, **sex_f**, **sex_i**, and **sex_m**, and four **baseline_drift_check** violations for **length**, **diameter**, **whole_weight**, and **shell_weight**.

Take a moment to view the description for each violation. Notice that the data type matches were not correct for some of the features. Also, notice that the baseline drift distance exceeded the *0.1* threshold for the four reported features.
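
To summarize a report like this one, you can group the violated features by check type. The following sketch uses a hand-written dictionary shaped like the violations report, with descriptions omitted and only the `feature_name` and `constraint_check_type` fields kept; `features_by_check` is a hypothetical helper.

```python
from collections import defaultdict

# Hand-written stand-in for the violations report; only the fields used below are included.
report = {
    "violations": [
        {"feature_name": "rings", "constraint_check_type": "data_type_check"},
        {"feature_name": "sex_f", "constraint_check_type": "data_type_check"},
        {"feature_name": "sex_i", "constraint_check_type": "data_type_check"},
        {"feature_name": "sex_m", "constraint_check_type": "data_type_check"},
        {"feature_name": "length", "constraint_check_type": "baseline_drift_check"},
        {"feature_name": "diameter", "constraint_check_type": "baseline_drift_check"},
        {"feature_name": "whole_weight", "constraint_check_type": "baseline_drift_check"},
        {"feature_name": "shell_weight", "constraint_check_type": "baseline_drift_check"},
    ]
}


def features_by_check(violations_report):
    """Map each constraint check type to the list of features that violated it."""
    grouped = defaultdict(list)
    for violation in violations_report["violations"]:
        grouped[violation["constraint_check_type"]].append(violation["feature_name"])
    return dict(grouped)


grouped = features_by_check(report)
for check_type, features in grouped.items():
    print(f"{check_type}: {', '.join(features)}")
```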

<i class="fas fa-sticky-note" style="color:#ff6633" aria-hidden="true"></i> **Note:** When you created a baseline job, it produced *constraints.json* and *statistics.json* files. In the *constraints.json* file, the *comparison_threshold* is set to *0.1* by default. To learn more about the *constraints.json* file, refer to [Schema for Constraints](https://docs.aws.amazon.com/sagemaker/latest/dg/model-monitor-byoc-constraints.html).

## Task 2.6: Create a CloudWatch alarm

When data drift happens, it is helpful to get notifications so you can address any issues. A notification or alarm can also trigger automatic model retraining to address changes that might be occurring with your inference data.

In this task, you learn how to create an alarm and enable notifications so that you know when data drifts away from the baseline data.

First, find your current Amazon Simple Notification Service (Amazon SNS) topics by using **list_topics**.

In [None]:
#list-topics
client = boto3.client('sns')
topics_list= client.list_topics()
print(topics_list)

Next, set the **sns_notifications_topic** variable with the topic ARN value you found in the prior cell.

In [None]:
#set-variables
topic_details = pd.json_normalize(topics_list['Topics'])
topic_arn = topic_details['TopicArn']
print (topic_arn[0])
sns_notifications_topic = topic_arn[0]

Finally, create an alarm using **put_metric_alarm** that sends a notification when the diameter feature drifts away from the baseline. An alarm like this can also be used to start model retraining automatically.

You use the built-in Amazon SageMaker Model Monitor container for CloudWatch metrics. SageMaker emits the metrics for each feature observed in the dataset in the *aws/sagemaker/Endpoints/data-metrics* namespace with *Endpoint* and *MonitoringSchedule* dimensions, which are the values the following cell passes to **put_metric_alarm**.

In [None]:
#trigger-cloudwatch-alarm-when-it-drifts-from-baseline
previous_date = f'{datetime.today() - timedelta(days=1):%Y-%m-%d}'
print (previous_date)
cw_client = boto3.Session().client('cloudwatch')

alarm_name = 'BASELINE_DRIFT_FEATURE_DIAMETER'
alarm_desc = 'Trigger a CloudWatch alarm when the feature diameter drifts away from the baseline'
feature_diameter_drift_threshold = 0.1  # Setting this threshold purposefully low to see the alarm quickly.
metric_name = 'feature_baseline_drift_diameter'
namespace = 'aws/sagemaker/Endpoints/data-metrics'

endpoint_name = 'Abalone-' + previous_date
print (endpoint_name)
monitoring_schedule_name = 'model-monitor-schedule-' + previous_date
print (monitoring_schedule_name)

cw_client.put_metric_alarm(
    AlarmName=alarm_name,
    AlarmDescription=alarm_desc,
    ActionsEnabled=True,
    AlarmActions=[sns_notifications_topic],
    MetricName=metric_name,
    Namespace=namespace,
    Statistic='Sum',
    Dimensions=[
        {
            'Name': 'Endpoint',
            'Value': endpoint_name
        },
        {
            'Name': 'MonitoringSchedule',
            'Value': monitoring_schedule_name
        }
    ],
    Period=600,
    EvaluationPeriods=1,
    DatapointsToAlarm=1,
    Threshold=feature_diameter_drift_threshold,
    ComparisonOperator='GreaterThanOrEqualToThreshold',
    TreatMissingData='breaching'
)

You created a CloudWatch alarm. You can use this alarm to notify you of any data drift issues and trigger automatic model retraining.
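
The alarm settings used in this task (one evaluation period, *GreaterThanOrEqualToThreshold*, and missing data treated as breaching) can be approximated as follows. This is a simplified model for intuition, not CloudWatch's actual evaluation logic; `alarm_state` is a hypothetical helper.

```python
def alarm_state(latest_value, threshold=0.1):
    """Approximate the alarm state for a single evaluation period.

    `latest_value` is the drift metric for the period, or None when no
    datapoint was reported (treated as breaching, as configured above).
    """
    if latest_value is None:
        return "ALARM"
    return "ALARM" if latest_value >= threshold else "OK"


for value in (0.05, 0.1, 0.25, None):
    print(value, "->", alarm_state(value))
```

Because missing data counts as breaching in this configuration, the alarm can fire even before any monitoring execution has published a metric.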

### Cleanup

You have completed this notebook. To move to the next part of the lab, do the following:

- Close this notebook file.
- Return to the lab session and continue with **Task 3**.