# CloudWatch

In [1]:
%%time
import boto3
import pandas as pd
from datetime import datetime, timedelta, timezone
import io
import json
from time import sleep
from threading import Thread

import sagemaker
from sagemaker.session import Session
from sagemaker.feature_store.feature_group import FeatureGroup
from sagemaker import get_execution_role, session, Session, image_uris
from sagemaker.s3 import S3Downloader, S3Uploader
from sagemaker.processing import ProcessingJob
from sagemaker.serializers import CSVSerializer
from sagemaker.model_monitor import DataCaptureConfig
from sagemaker.huggingface.model import HuggingFaceModel

CPU times: user 685 ms, sys: 87.5 ms, total: 773 ms
Wall time: 734 ms


In [2]:
##S3 prefixes
bucket_name = 'aai-540-final-data'
region_name = 'us-west-2'
session = sagemaker.Session(boto3.Session(region_name=region_name))
s3 = session.boto_session.client('s3')
s3_path = 'athena/results/'

prefix = "sagemaker/AIEmotions-ModelQualityMonitor"
data_capture_prefix = f"{prefix}/datacapture"
s3_capture_upload_path = f"s3://{bucket_name}/{data_capture_prefix}"


ground_truth_upload_path = (
    f"s3://{bucket_name}/{prefix}/ground_truth_data/{datetime.now():%Y-%m-%d-%H-%M-%S}"
)

reports_prefix = f"{prefix}/reports"
s3_report_path = f"s3://{bucket_name}/{reports_prefix}"

##Get the model monitor image
monitor_image_uri = image_uris.retrieve(framework="model-monitor", region=region_name)

print("Image URI:", monitor_image_uri)
print(f"Capture path: {s3_capture_upload_path}")
print(f"Ground truth path: {ground_truth_upload_path}")
print(f"Report path: {s3_report_path}")

Image URI: 159807026194.dkr.ecr.us-west-2.amazonaws.com/sagemaker-model-monitor-analyzer
Capture path: s3://aai-540-final-data/sagemaker/AIEmotions-ModelQualityMonitor/datacapture
Ground truth path: s3://aai-540-final-data/sagemaker/AIEmotions-ModelQualityMonitor/ground_truth_data/2024-02-18-16-00-46
Report path: s3://aai-540-final-data/sagemaker/AIEmotions-ModelQualityMonitor/reports


In [4]:
# Access Queried Test Data
bucket_name = 'aai-540-final-data'
s3_test_query = 'athena/results/f4cf6706-1ba6-4538-ab86-13c8137facb5.csv'
test_data_obj = s3.get_object(Bucket=bucket_name, Key = s3_test_query)
df_test = pd.read_csv(io.BytesIO(test_data_obj['Body'].read()))

In [5]:
df_test_new = df_test[['text', 'emotions']]
del(df_test)
df_test_new.head()

Unnamed: 0,text,emotions
0,WE can't always have what we want. I see them ...,sadness
1,"I work 12 hour shifts, and agree. Tea is life-...",neutral
2,I don't even know who he is but I think he app...,neutral
3,COUNT YOUR CALORIES. EAT ABOVE YOUR TDEE EVERY...,neutral
4,Federal employee here. My amazing tenant offer...,affectionate


In [6]:
#Retrieve Model
role = sagemaker.get_execution_role()
s3 = boto3.client('s3')
model_dir = 'models/base_model/'
tar_file = 'model.tar.gz'


model_name = f"AIEmotion-base-model-monitor-{datetime.utcnow():%Y-%m-%d-%H%M}"

model_data = f's3://{bucket_name}/{model_dir}{tar_file}'
tensorflow_version = '2.6.3'
transformers_version='4.17.0'
py_version = 'py38'
huggingface_model = HuggingFaceModel(model_data=model_data,
                                     role=role,
                                     transformers_version=transformers_version,
                                     tensorflow_version=tensorflow_version,
                                     py_version=py_version,
                                     entry_point='inference.py'
                                   )

In [7]:
endpoint_name = f"AIEmotion-base-model-quality-monitor-{datetime.utcnow():%Y-%m-%d-%H%M}"
print("EndpointName =", endpoint_name)

data_capture_config = DataCaptureConfig(
    enable_capture=True, sampling_percentage=100, destination_s3_uri=s3_capture_upload_path
)

local_predictor = huggingface_model.deploy(
    initial_instance_count=1,
    instance_type='ml.c5.large',
    endpoint_name=endpoint_name,
    data_capture_config=data_capture_config,
)

EndpointName = AIEmotion-base-model-quality-monitor-2024-02-18-1600
-----------------!

<sagemaker.huggingface.model.HuggingFacePredictor at 0x7f90203407c0>

In [8]:
from sagemaker.predictor import Predictor

predictor = Predictor(
    endpoint_name=endpoint_name, sagemaker_session=session, serializer=CSVSerializer()
)

In [9]:
# Predictions as baseline dataset
baseline_prefix = prefix + "/baselining"
baseline_data_prefix = baseline_prefix + "/data"
baseline_results_prefix = baseline_prefix + "/results"

baseline_data_uri = f"s3://{bucket_name}/{baseline_data_prefix}"
baseline_results_uri = f"s3://{bucket_name}/{baseline_results_prefix}"
print(f"Baseline data uri: {baseline_data_uri}")
print(f"Baseline results uri: {baseline_results_uri}")

Baseline data uri: s3://aai-540-final-data/sagemaker/AIEmotions-ModelQualityMonitor/baselining/data
Baseline results uri: s3://aai-540-final-data/sagemaker/AIEmotions-ModelQualityMonitor/baselining/results


In [10]:
# Grab a sample of the dataset
df_test_subset = df_test_new.sample(500)
df_test_subset = df_test_subset.reset_index(drop=True)

In [11]:
def calculate_predictions(results):
    df_results = pd.DataFrame(columns = ['pred_labels', 'probabilities'])
    for result in results:
        res = json.loads(result[0])
        pred_labels = list(res.keys())
        prob = list(res.values())
        row = {'pred_labels': pred_labels, 'probabilities': prob}
        df_results = df_results.append(row, ignore_index = True)
    return df_results

In [12]:
predictions = df_test_subset.apply(lambda x: local_predictor.predict({'text': x['text']}), axis = 1)
predictions

0      [{"neutral": 0.690695583820343, "happy": 0.246...
1      [{"anger": 0.5461097955703735, "sad": 0.338331...
2      [{"surprised": 0.754666805267334, "neutral": 0...
3      [{"neutral": 0.4230937957763672, "optimistic":...
4      [{"happy": 0.9121904969215393, "neutral": 0.05...
                             ...                        
495    [{"sad": 0.9500253796577454, "neutral": 0.0329...
496    [{"neutral": 0.9266665577888489, "happy": 0.03...
497    [{"affectionate": 0.9746029376983643, "neutral...
498    [{"happy": 0.831647515296936, "affectionate": ...
499    [{"happy": 0.4278762936592102, "optimistic": 0...
Length: 500, dtype: object

In [12]:
import warnings
warnings.simplefilter(action='ignore', category=FutureWarning)

df_preds = calculate_predictions(predictions)
df_preds

Unnamed: 0,pred_labels,probabilities
0,"[neutral, anger, sad]","[0.9746280312538147, 0.0172532107681036, 0.002..."
1,"[neutral, surprised, anger]","[0.671329915523529, 0.17627429962158203, 0.055..."
2,"[happy, affectionate, sad]","[0.9748302102088928, 0.006392994895577431, 0.0..."
3,"[affectionate, happy, optimistic]","[0.45750537514686584, 0.4294200539588928, 0.07..."
4,"[neutral, affectionate, anger]","[0.4965571463108063, 0.39938440918922424, 0.02..."
...,...,...
495,"[optimistic, neutral, surprised]","[0.9062561988830566, 0.051451414823532104, 0.0..."
496,"[happy, affectionate, optimistic]","[0.5446568131446838, 0.4242367148399353, 0.014..."
497,"[happy, neutral, anger]","[0.8061304688453674, 0.15399974584579468, 0.02..."
498,"[neutral, happy, surprised]","[0.41303831338882446, 0.36112791299819946, 0.1..."


In [13]:
# Extract the first element of each tuple
df_preds['pred_labels'] = df_preds['pred_labels'].apply(lambda x: x[0])
df_preds['probabilities'] = df_preds['probabilities'].apply(lambda x: x[0])
df_preds

Unnamed: 0,pred_labels,probabilities
0,neutral,0.974628
1,neutral,0.671330
2,happy,0.974830
3,affectionate,0.457505
4,neutral,0.496557
...,...,...
495,optimistic,0.906256
496,happy,0.544657
497,happy,0.806130
498,neutral,0.413038


In [14]:
# Concat datasets
validate_dataset = pd.concat([df_test_subset, df_preds], ignore_index=True, axis = 1)
validate_dataset = validate_dataset.rename(columns = {0: 'text', 
                                                      1: 'true_label', 
                                                      2: 'pred_label', 
                                                      3: 'probability'})

validate_dataset['pred_label'] = validate_dataset['pred_label'].replace({'sad': 'sadness', 'surprised': 'surprise'})
validate_dataset

Unnamed: 0,text,true_label,pred_label,probability
0,[NAME] throws the puck to the slot for a cutti...,neutral,neutral,0.974628
1,Looks like the other driver to the right was p...,happy,neutral,0.671330
2,Thank you & my heartfelt condolences. Snap in ...,happy,happy,0.974830
3,Nice glad to hear man!,affectionate,affectionate,0.457505
4,What a unit,affectionate,neutral,0.496557
...,...,...,...,...
495,I hope google does this so that everyone would...,optimistic,optimistic,0.906256
496,I love how happy he looks while listening to t...,affectionate,happy,0.544657
497,Not even going to open the link. Already know ...,happy,happy,0.806130
498,Well...to be fair then. :),happy,neutral,0.413038


In [15]:
# Export to CSV
validate_dataset_name = 'validate_dataset.csv'
validate_dataset.to_csv(validate_dataset_name, index=False) 

In [16]:
# Upload baseline
baseline_dataset_uri = S3Uploader.upload(f"{validate_dataset_name}", baseline_data_uri)
baseline_dataset_uri

's3://aai-540-final-data/sagemaker/AIEmotions-ModelQualityMonitor/baselining/data/validate_dataset.csv'

## Setup Baseline

In [10]:
from sagemaker.model_monitor import ModelQualityMonitor
from sagemaker.model_monitor import EndpointInput
from sagemaker.model_monitor.dataset_format import DatasetFormat

baseline_dataset_uri = 's3://aai-540-final-data/sagemaker/AIEmotions-ModelQualityMonitor/baselining/data/validate_dataset.csv'

emotions_base_model_quality_monitor = ModelQualityMonitor(
    role=role,
    instance_count=1,
    instance_type='ml.m5.large',
    volume_size_in_gb=20,
    max_runtime_in_seconds=1800,
    sagemaker_session=session,
)

# Name of the model quality baseline job
baseline_job_name = f"AIEmotions-base-model-baseline-job-{datetime.utcnow():%Y-%m-%d-%H%M}"


# Execute the baseline suggestion job.
job = emotions_base_model_quality_monitor.suggest_baseline(
    job_name=baseline_job_name,
    baseline_dataset=baseline_dataset_uri,
    dataset_format=DatasetFormat.csv(header=True),
    output_s3_uri=baseline_results_uri,
    problem_type="MulticlassClassification",
    inference_attribute="pred_label",
    probability_attribute="probability",
    ground_truth_attribute="true_label",
)
job.wait(logs=False)

INFO:sagemaker:Creating processing-job with name AIEmotions-base-model-baseline-job-2024-02-18-1442



Job Name:  AIEmotions-base-model-baseline-job-2024-02-18-1442
Inputs:  [{'InputName': 'baseline_dataset_input', 'AppManaged': False, 'S3Input': {'S3Uri': 's3://aai-540-final-data/sagemaker/AIEmotions-ModelQualityMonitor/baselining/data/validate_dataset.csv', 'LocalPath': '/opt/ml/processing/input/baseline_dataset_input', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}]
Outputs:  [{'OutputName': 'monitoring_output', 'AppManaged': False, 'S3Output': {'S3Uri': 's3://aai-540-final-data/sagemaker/AIEmotions-ModelQualityMonitor/baselining/results', 'LocalPath': '/opt/ml/processing/output', 'S3UploadMode': 'EndOfJob'}}]
...........................................................................!

In [11]:
baseline_job = emotions_base_model_quality_monitor.latest_baselining_job

In [12]:
multiclass_metrics = baseline_job.baseline_statistics().body_dict["multiclass_classification_metrics"]
pd.json_normalize(multiclass_metrics).T

Unnamed: 0,0
confusion_matrix.disgust.disgust,6.000000
confusion_matrix.disgust.surprise,0.000000
"confusion_matrix.disgust._we'll_have_more_occasions_of_hearing_it_again_though_:)""",0.000000
confusion_matrix.disgust.happy,0.000000
confusion_matrix.disgust.optimistic,0.000000
...,...
weighted_f0_5_best_constant_classifier.standard_deviation,0.006162
weighted_f1_best_constant_classifier.value,0.117859
weighted_f1_best_constant_classifier.standard_deviation,0.007611
weighted_f2_best_constant_classifier.value,0.179094


In [13]:
pd.DataFrame(baseline_job.suggested_constraints().body_dict["multiclass_classification_constraints"]).T

Unnamed: 0,threshold,comparison_operator
accuracy,0.588,LessThanThreshold
weighted_recall,0.588,LessThanThreshold
weighted_precision,0.595712,LessThanThreshold
weighted_f0_5,0.589928,LessThanThreshold
weighted_f1,0.585759,LessThanThreshold
weighted_f2,0.585843,LessThanThreshold


## Setup continuous model monitoring

In [13]:
# Generate a small sample from our testing data
synthetic_sample = df_test_new['text'].sample(200)
synthetic_sample.to_csv('synthetic_data.csv', index=False)

In [16]:
def invoke_endpoint(ep_name, file_name):
    with open(file_name, "r") as f:
        i = 0
        for row in f:
            payload = row.rstrip("\n")
            response = session.sagemaker_runtime_client.invoke_endpoint(
                EndpointName=endpoint_name,
                ContentType="text/csv",
                Body=payload,
                InferenceId=str(i),  # unique ID per row
            )["Body"].read()
            i += 1
            sleep(1)


def invoke_endpoint_forever():
    while True:
        try:
            invoke_endpoint(endpoint_name, 'synthetic_data.csv')
        except session.sagemaker_runtime_client.exceptions.ValidationError:
            pass


thread = Thread(target=invoke_endpoint_forever)
thread.start()

In [17]:
print("Waiting for captures to show up", end="")
for _ in range(120):
    capture_files = sorted(S3Downloader.list(f"{s3_capture_upload_path}/{endpoint_name}"))
    if capture_files:
        capture_file = S3Downloader.read_file(capture_files[-1]).split("\n")
        capture_record = json.loads(capture_file[0])
        if "inferenceId" in capture_record["eventMetadata"]:
            break
    print(".", end="", flush=True)
    sleep(1)
print()
print("Found Capture Files:")
print("\n ".join(capture_files[-3:]))

Waiting for captures to show up........................................................................................................................
Found Capture Files:



In [18]:
print("\n".join(capture_file[-3:-1]))

NameError: name 'capture_file' is not defined

In [None]:
print(json.dumps(capture_record, indent=2))

In [None]:
import random


def ground_truth_with_id(inference_id):
    random.seed(inference_id)  # to get consistent results
    label_int = random.randint(0, 9)
    emotions = ['sadness', 'neutral', 'affectionate', 'surprise', 'happy', 'anger',
       'optimistic', 'fear', 'disgust']
    return {
        "groundTruthData": {
            "data": emotions[label_int],
            "encoding": "CSV",
        },
        "eventMetadata": {
            "eventId": str(inference_id),
        },
        "eventVersion": "0",
    }


def upload_ground_truth(records, upload_time):
    fake_records = [json.dumps(r) for r in records]
    data_to_upload = "\n".join(fake_records)
    target_s3_uri = f"{ground_truth_upload_path}/{upload_time:%Y/%m/%d/%H/%M%S}.jsonl"
    print(f"Uploading {len(fake_records)} records to", target_s3_uri)
    S3Uploader.upload_string_as_file_body(data_to_upload, target_s3_uri)

In [None]:
NUM_GROUND_TRUTH_RECORDS = 200 


def generate_fake_ground_truth_forever():
    j = 0
    while True:
        fake_records = [ground_truth_with_id(i) for i in range(NUM_GROUND_TRUTH_RECORDS)]
        upload_ground_truth(fake_records, datetime.utcnow())
        j = (j + 1) % 5
        sleep(60 * 60)  # do this once an hour


gt_thread = Thread(target=generate_fake_ground_truth_forever)
gt_thread.start()

In [None]:
##Monitoring schedule name
model_monitor_schedule_name = (f"AIEmotion-base-model-monitoring-schedule-{datetime.utcnow():%Y-%m-%d-%H%M}")

In [None]:
# Create an enpointInput
endpointInput = EndpointInput(
    endpoint_name=predictor.endpoint_name,
    probability_attribute="0",
    probability_threshold_attribute=0.5,
    destination="/opt/ml/processing/input_data",
)

In [None]:
# Create the monitoring schedule to execute every hour.
from sagemaker.model_monitor import CronExpressionGenerator

response = emotions_base_model_quality_monitor.create_monitoring_schedule(
    monitor_schedule_name=model_monitor_schedule_name,
    endpoint_input=endpointInput,
    output_s3_uri=baseline_results_uri,
    problem_type="MulticlassClassification",
    ground_truth_input=ground_truth_upload_path,
    constraints=baseline_job.suggested_constraints(),
    schedule_cron_expression=CronExpressionGenerator.hourly(),
    enable_cloudwatch_metrics=True,
)

In [None]:
# Create the monitoring schedule
# You will see the monitoring schedule in the 'Scheduled' status
emotions_base_model_quality_monitor.describe_schedule()

In [None]:
executions = churn_model_quality_monitor.list_executions()
executions

In [None]:
# Wait for the first execution of the monitoring_schedule
print("Waiting for first execution", end="")
while True:
    execution = churn_model_quality_monitor.describe_schedule().get(
        "LastMonitoringExecutionSummary"
    )
    if execution:
        break
    print(".", end="", flush=True)
    sleep(10)
print()
print("Execution found!")

In [None]:
while not executions:
    executions = churn_model_quality_monitor.list_executions()
    print(".", end="", flush=True)
    sleep(10)
latest_execution = executions[-1]
latest_execution.describe()

In [None]:
status = execution["MonitoringExecutionStatus"]

while status in ["Pending", "InProgress"]:
    print("Waiting for execution to finish", end="")
    latest_execution.wait(logs=False)
    latest_job = latest_execution.describe()
    print()
    print(f"{latest_job['ProcessingJobName']} job status:", latest_job["ProcessingJobStatus"])
    print(
        f"{latest_job['ProcessingJobName']} job exit message, if any:",
        latest_job.get("ExitMessage"),
    )
    print(
        f"{latest_job['ProcessingJobName']} job failure reason, if any:",
        latest_job.get("FailureReason"),
    )
    sleep(
        30
    )  # model quality executions consist of two Processing jobs, wait for second job to start
    latest_execution = churn_model_quality_monitor.list_executions()[-1]
    execution = churn_model_quality_monitor.describe_schedule()["LastMonitoringExecutionSummary"]
    status = execution["MonitoringExecutionStatus"]

print("Execution status is:", status)

if status != "Completed":
    print(execution)
    print(
        "====STOP==== \n No completed executions to inspect further. Please wait till an execution completes or investigate previously reported failures."
    )

In [None]:
latest_execution = churn_model_quality_monitor.list_executions()[-1]
report_uri = latest_execution.describe()["ProcessingOutputConfig"]["Outputs"][0]["S3Output"][
    "S3Uri"
]
print("Report Uri:", report_uri)

## Analyze Model Quality

In [None]:
# Create CloudWatch client
cw_client = boto3.Session().client("cloudwatch")

namespace = "aws/sagemaker/Endpoints/model-metrics"

cw_dimensions = [
    {"Name": "Endpoint", "Value": endpoint_name},
    {"Name": "MonitoringSchedule", "Value": model_monitor_schedule_name},
]

In [None]:
# List metrics through the pagination interface
paginator = cw_client.get_paginator("list_metrics")

for response in paginator.paginate(Dimensions=cw_dimensions, Namespace=namespace):
    model_quality_metrics = response["Metrics"]
    for metric in model_quality_metrics:
        print(metric["MetricName"])

In [None]:
alarm_name = "MODEL_QUALITY_F2_SCORE"
alarm_desc = (
    "Trigger an CloudWatch alarm when the f2 score drifts away from the baseline constraints"
)
mdoel_quality_f2_drift_threshold = (
    0.55  ##Setting this threshold purposefully low to see the alarm quickly.
)
metric_name = "f2"
namespace = "aws/sagemaker/Endpoints/model-metrics"

cw_client.put_metric_alarm(
    AlarmName=alarm_name,
    AlarmDescription=alarm_desc,
    ActionsEnabled=True,
    MetricName=metric_name,
    Namespace=namespace,
    Statistic="Average",
    Dimensions=[
        {"Name": "Endpoint", "Value": endpoint_name},
        {"Name": "MonitoringSchedule", "Value": churn_monitor_schedule_name},
    ],
    Period=600,
    EvaluationPeriods=1,
    DatapointsToAlarm=1,
    Threshold=mdoel_quality_f2_drift_threshold,
    ComparisonOperator="LessThanOrEqualToThreshold",
    TreatMissingData="breaching",
)

## Release Resources

In [29]:
local_predictor.delete_endpoint()
predictor.delete_endpoint()

INFO:sagemaker:Deleting endpoint configuration with name: AIEmotion-base-model-quality-monitor-2024-02-16-2146
INFO:sagemaker:Deleting endpoint with name: AIEmotion-base-model-quality-monitor-2024-02-16-2146


In [30]:
emotions_base_model_quality_monitor.delete_monitoring_schedule()
sleep(60)  # actually wait for the deletion


Deleting Monitoring Schedule with name: None


ParamValidationError: Parameter validation failed:
Invalid type for parameter MonitoringScheduleName, value: None, type: <class 'NoneType'>, valid types: <class 'str'>