In [None]:
!pip install "sagemaker>=2.123.0"

In [None]:
import csv
import json
import time
from datetime import datetime, timedelta, timezone

import boto3
import pandas as pd
import sagemaker

# Shared SageMaker handles used by every later cell.
sagemaker_session = sagemaker.session.Session()
region = boto3.Session().region_name
role = sagemaker.get_execution_role()
default_bucket = sagemaker_session.default_bucket()

# Low-level clients taken from the session: control plane and runtime (invocations).
sagemaker_client, sagemaker_runtime_client = (
    sagemaker_session.sagemaker_client,
    sagemaker_session.sagemaker_runtime_client,
)

from sagemaker.predictor import Predictor
from sagemaker.serializers import CSVSerializer

from sagemaker.clarify import (
    BiasConfig,
    DataConfig,
    ModelConfig,
    ModelPredictedLabelConfig,
    SHAPConfig,
)

from sagemaker.model_monitor import (
    BiasAnalysisConfig,
    CronExpressionGenerator,
    DataCaptureConfig,
    EndpointInput,
    ExplainabilityAnalysisConfig,
    ModelBiasMonitor,
    ModelExplainabilityMonitor,
    DefaultModelMonitor,
    ModelQualityMonitor,
)

from sagemaker.model_monitor.dataset_format import DatasetFormat

from sagemaker.s3 import S3Downloader, S3Uploader

In [None]:
print(f"AWS region: {region}")
# A different bucket can be used, but make sure the role for this notebook has
# the s3:PutObject permissions. This is the bucket into which the data is captured.
print(f"S3 Bucket: {default_bucket}")

# Endpoint metadata.
# Note: you will use the staging endpoint from the previous lab, just as you would in a real
# scenario, to verify your monitoring setup before deploying it on production endpoints.
endpoint_name = "workshop-project-staging"
endpoint_instance_count = 1
endpoint_instance_type = "ml.m5.large"
print(f"Endpoint: {endpoint_name}")

prefix = "sagemaker/xgboost-dm-model-monitoring"
s3_key = f"s3://{default_bucket}/{prefix}"
print(f"S3 key: {s3_key}")

s3_capture_upload_path = f"{s3_key}/data_capture"
# UTC, like every other timestamp in this notebook (job names, ground-truth object keys).
s3_ground_truth_upload_path = f"{s3_key}/ground_truth_data/{datetime.now(timezone.utc):%Y-%m-%d-%H-%M-%S}"
s3_baseline_results_path = f"{s3_key}/baselines"
s3_report_path = f"{s3_key}/reports"

print(f"Capture path: {s3_capture_upload_path}")
print(f"Ground truth path: {s3_ground_truth_upload_path}")
print(f"Baselines path: {s3_baseline_results_path}")
print(f"Report path: {s3_report_path}")

# Reuse the session's SageMaker client from the setup cell instead of constructing a second,
# independently configured boto3 client. `sm_client` is kept as an alias for later cells.
sm_client = sagemaker_client

endpoint_config = sm_client.describe_endpoint(EndpointName=endpoint_name)["EndpointConfigName"]
# Takes the first production variant; assumes a single-variant staging endpoint.
model_name = sm_client.describe_endpoint_config(EndpointConfigName=endpoint_config)["ProductionVariants"][0]["ModelName"]

print("Model Name : ", model_name)

In [None]:
# Real-time Predictor for the staging endpoint; request payloads are serialized as CSV.
# API reference: https://sagemaker.readthedocs.io/en/stable/api/inference/predictors.html
predictor = Predictor(
    serializer=CSVSerializer(),
    endpoint_name=endpoint_name,
)

In [None]:
# Training data (with a header row), read for its schema and for feature-attribution baselines.
train_df = pd.read_csv(filepath_or_buffer="train-headers.csv")

In [None]:
# Use test set to create a file without headers and labels to mirror data format at inference time.
test_df = pd.read_csv("test.csv", header=None)
# Drop the first column (the label, per the note above) without mutating in place.
test_df = test_df.drop(columns=test_df.columns[0])
# Sample 180 rows: upload_ground_truth generates labels for InferenceIds 0..179, so keep the two
# counts in sync. A fixed random_state makes the sample (and the captured traffic) reproducible.
test_df.sample(n=180, random_state=42).to_csv("test-samples-no-header.csv", header=False, index=False)

In [None]:
print(f"Sending test traffic to the endpoint {endpoint_name}. \nPlease wait...")

# Reload the sampled rows (no header) and send them all in a single predict() call.
test_sample_df = pd.read_csv("test-samples-no-header.csv", index_col=False, header=None)
payload_rows = test_sample_df.to_numpy()
response = predictor.predict(data=payload_rows)

print("Done!")

In [None]:
print("Waiting 60 seconds for captures to show up", end="")

# Poll the capture prefix roughly once per second, for up to 60 attempts.
for _ in range(60):
    capture_files = sorted(S3Downloader.list(f"{s3_capture_upload_path}/{endpoint_name}"))
    if capture_files:
        break
    print(".", end="", flush=True)
    time.sleep(1)

# Fail here with an actionable message instead of an IndexError in the next cell.
if not capture_files:
    raise RuntimeError(
        f"No capture files found under {s3_capture_upload_path}/{endpoint_name} after 60 attempts. "
        "Check that data capture is enabled on the endpoint and that test traffic was sent."
    )

print("\nFound Capture Files:")
print("\n ".join(capture_files[-10:]))

In [None]:
# Keep the last 9 non-empty JSON Lines records of the newest capture file. Filtering out
# empty lines (instead of slicing off a trailing "") keeps the final record even when
# the file does not end with a newline.
capture_records = [
    line for line in S3Downloader.read_file(capture_files[-1]).splitlines() if line.strip()
]
capture_file = capture_records[-9:]
print(capture_file[-1])

A single captured record is shown below, formatted as JSON.

In [None]:
# Pretty-print the most recent captured record.
latest_capture_record = json.loads(capture_file[-1])
print(json.dumps(latest_capture_record, indent=2))

In [None]:
import threading

class WorkerThread(threading.Thread):
    """Thread that calls ``do_run(terminate_event)`` repeatedly until ``terminate()`` is called.

    ``do_run`` receives the thread's termination ``threading.Event``, so a long-running
    callback can check it and return early. Extra positional/keyword arguments are passed
    through to ``threading.Thread``.
    """

    def __init__(self, do_run, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.__callback = do_run
        self.__stop_requested = threading.Event()

    def terminate(self):
        """Request a stop; the loop exits once the current ``do_run`` call returns."""
        self.__stop_requested.set()

    def run(self):
        stop_requested = self.__stop_requested
        while True:
            if stop_requested.is_set():
                break
            self.__callback(stop_requested)

In [None]:
def invoke_endpoint(terminate_event, input_path="test-samples-no-header.csv", delay_seconds=1):
    """Send each row of the sample CSV to the endpoint as its own request.

    The row's 0-based position is passed as ``InferenceId`` so captured predictions can be
    joined with the synthetic ground truth, whose ``eventId`` values are ``str(0..N-1)``.
    Waits ``delay_seconds`` between requests and returns as soon as ``terminate_event``
    is set.

    Args:
        terminate_event: ``threading.Event`` set by ``WorkerThread.terminate()``.
        input_path: Header-less CSV file, one inference payload per line.
        delay_seconds: Pause between consecutive requests.
    """
    with open(input_path, "r") as f:
        for inference_id, row in enumerate(f):
            # Strip CR as well as LF so a CRLF-terminated file does not leak "\r" into the payload.
            payload = row.rstrip("\r\n")
            response = sagemaker_runtime_client.invoke_endpoint(
                EndpointName=endpoint_name,
                ContentType="text/csv",
                Body=payload,
                InferenceId=str(inference_id),  # row position; matches the ground-truth eventId
            )
            # Read (and discard) the response body.
            response["Body"].read()
            # Event.wait returns True as soon as terminate() is called, so shutdown does not
            # have to wait out the full delay.
            if terminate_event.wait(timeout=delay_seconds):
                break


# Keep invoking the endpoint with test data
invoke_endpoint_thread = WorkerThread(do_run=invoke_endpoint)
invoke_endpoint_thread.start()

In [None]:
import random

def ground_truth_with_id(inference_id, positive_rate=0.7):
    """Build one synthetic ground-truth record in the format required by the merge container.

    The label is derived deterministically from ``inference_id``: "1" when a value drawn
    from ``random.Random(inference_id)`` is below ``positive_rate`` (0.7 by default),
    otherwise "0". A private ``Random`` instance yields the same value as
    ``random.seed(inference_id); random.random()`` without resetting the module-level
    ``random`` state that other code may rely on.

    Args:
        inference_id: ID matching the ``InferenceId`` sent with the prediction request.
        positive_rate: Threshold on the uniform draw below which the label is "1".

    Returns:
        dict with ``groundTruthData``, ``eventMetadata`` and ``eventVersion`` keys.
    """
    rand = random.Random(inference_id).random()
    # format required by the merge container.
    return {
        "groundTruthData": {
            "data": "1" if rand < positive_rate else "0",
            "encoding": "CSV",
        },
        "eventMetadata": {
            "eventId": str(inference_id),
        },
        "eventVersion": "0",
    }


def upload_ground_truth(upload_time, num_records=180):
    """Upload synthetic ground-truth records with IDs ``0..num_records-1`` as one JSON Lines object.

    ``num_records`` defaults to 180, the number of rows sent to the endpoint with matching
    ``InferenceId`` values. The object key is ``YYYY/MM/DD/HH/MMSS.jsonl``, formatted from
    ``upload_time``, under ``s3_ground_truth_upload_path``.

    Args:
        upload_time: ``datetime`` that determines the object key.
        num_records: Number of records (and therefore inference IDs) to generate.
    """
    records = [ground_truth_with_id(i) for i in range(num_records)]
    fake_records = [json.dumps(r) for r in records]
    data_to_upload = "\n".join(fake_records)
    target_s3_uri = f"{s3_ground_truth_upload_path}/{upload_time:%Y/%m/%d/%H/%M%S}.jsonl"
    print(f"Uploading {len(fake_records)} records to", target_s3_uri)
    S3Uploader.upload_string_as_file_body(data_to_upload, target_s3_uri)

In [None]:
# Generate data for the last hour (UTC; datetime.utcnow() is deprecated since Python 3.12).
upload_ground_truth(datetime.now(timezone.utc) - timedelta(hours=1))

In [None]:
# You can also use the WorkerThread class to continue generating synthetic ground truth data once an hour.
def generate_fake_ground_truth(terminate_event, interval_seconds=60 * 60):
    """Upload one batch of synthetic ground truth, then wait up to ``interval_seconds``.

    Waiting on ``terminate_event`` (instead of sleeping in one-minute steps) lets
    ``WorkerThread.terminate()`` stop the worker immediately rather than up to a minute
    later. ``WorkerThread.run`` calls this again after each wait unless terminated.
    """
    upload_ground_truth(datetime.now(timezone.utc))
    terminate_event.wait(timeout=interval_seconds)


ground_truth_thread = WorkerThread(do_run=generate_fake_ground_truth)
ground_truth_thread.start()

In [None]:
# Processing resources used by the model-quality baselining and monitoring jobs.
model_quality_monitor = ModelQualityMonitor(
    role=role,
    sagemaker_session=sagemaker_session,
    instance_type="ml.m5.xlarge",
    instance_count=1,
    volume_size_in_gb=20,
    max_runtime_in_seconds=30 * 60,  # 1800 seconds
)

In [None]:
# Include seconds in the timestamp: processing job names must be unique, so re-running this
# cell within the same minute would otherwise fail on a duplicate name.
model_quality_baseline_job_name = f"ModelQualityBaselineJob-{datetime.now(timezone.utc):%Y-%m-%d-%H%M%S}"
model_quality_baseline_job_result_uri = f"{s3_baseline_results_path}/model_quality"

model_quality_baseline_job = model_quality_monitor.suggest_baseline(
    job_name=model_quality_baseline_job_name,
    # Local CSV with a header row; suggest_baseline accepts a local path or an S3 URI.
    baseline_dataset="validation-with-predictions.csv",
    dataset_format=DatasetFormat.csv(header=True),
    output_s3_uri=model_quality_baseline_job_result_uri,  # S3 location to store the results.
    problem_type="BinaryClassification",
    inference_attribute="prediction",  # Column that contains predictions.
    probability_attribute="probability",  # Column that contains probabilities.
    ground_truth_attribute="label",  # Column that contains ground-truth labels.
)

model_quality_baseline_job.wait(logs=False)

In [None]:
# Show the suggested binary-classification constraints, one metric per row.
latest_model_quality_baseline_job = model_quality_monitor.latest_baselining_job
model_quality_constraints = latest_model_quality_baseline_job.suggested_constraints()
pd.DataFrame(model_quality_constraints.body_dict["binary_classification_constraints"]).T

In [None]:
# Timestamped (UTC, minute resolution) name for the monitoring schedule.
model_quality_monitor_schedule_name = "xgboost-dm-model-monitoring-schedule-" + datetime.utcnow().strftime(
    "%Y-%m-%d-%H%M"
)

In [None]:
# Create an EndpointInput configuration: where the monitor reads captured endpoint data,
# and how captured probabilities are turned into binary predictions.
endpointInput = EndpointInput(
    destination="/opt/ml/processing/input_data",
    endpoint_name=predictor.endpoint_name,
    # Index of the probability in the captured CSV output (presumably a single-value response; confirm).
    probability_attribute="0",
    # Threshold used to convert probabilities into binary labels.
    probability_threshold_attribute=0.5,
)

In [None]:
# Define a monitoring schedule.
# Execution reports go under the dedicated reports prefix (s3_report_path) rather than being
# mixed into the baseline-results prefix that holds the suggested constraints/statistics.
response = model_quality_monitor.create_monitoring_schedule(
    monitor_schedule_name=model_quality_monitor_schedule_name,
    endpoint_input=endpointInput,
    output_s3_uri=f"{s3_report_path}/model_quality",
    problem_type="BinaryClassification",
    ground_truth_input=s3_ground_truth_upload_path,  # prefix the ground-truth uploader writes under
    constraints=latest_model_quality_baseline_job.suggested_constraints(),
    schedule_cron_expression=CronExpressionGenerator.hourly(),
    enable_cloudwatch_metrics=True,
)

In [None]:
# Check the model monitor was created.
# Lists the monitors attached to this predictor's endpoint; the new schedule should appear here.
predictor.list_monitors()

In [None]:
# You will see the monitoring schedule in the 'Scheduled' status.
# describe_schedule() returns the schedule description reported by SageMaker (shown as cell output).
model_quality_monitor.describe_schedule()

In [None]:
# Initially there will be no executions since the first execution happens at the top of the hour
# (the schedule uses CronExpressionGenerator.hourly()).
# Note that it is common for the execution to launch up to 20 min after the hour.
# Only the first five entries are displayed to keep the output short.
executions = model_quality_monitor.list_executions()
executions[:5]

In [None]:
# Signal both workers first so they wind down in parallel, then wait for them to exit so no
# endpoint calls or ground-truth uploads are still in flight during the cleanup below.
worker_threads = (invoke_endpoint_thread, ground_truth_thread)
for worker in worker_threads:
    worker.terminate()
for worker in worker_threads:
    # Bounded wait: a worker exits only after its current request/upload (or sleep) finishes.
    worker.join(timeout=120)
    if worker.is_alive():
        print(f"Warning: {worker.name} is still running after 120 seconds.")

In [None]:
# Stop, then delete, every monitoring schedule attached to the staging endpoint.
model_monitors = predictor.list_monitors()

for attached_monitor in model_monitors:
    attached_monitor.stop_monitoring_schedule()
    attached_monitor.delete_monitoring_schedule()