In [None]:
import os
import boto3
import re
import json
import sagemaker
from sagemaker import get_execution_role, session
from time import gmtime, strftime

# Create a single boto3 session and derive the region, the low-level clients and the
# SageMaker session from it, so they all share the same credentials/region settings.
boto_session = boto3.Session()
region = boto_session.region_name

role = get_execution_role()
print("RoleArn: {}".format(role))

sm_client = boto_session.client("sagemaker")
sm_runtime = boto_session.client("sagemaker-runtime")

# Default S3 bucket of the SageMaker Python SDK for this account/region.
bucket = session.Session(boto_session).default_bucket()
print("Demo Bucket: {}".format(bucket))

# Every artifact produced by this demo lives under this S3 prefix.
prefix = "sm-end-to-end-d96a4b92/demo-model-monitor"

# Prefix under which captured endpoint request/response data is looked up
# (capture files are listed per endpoint below).
data_capture_prefix = "{}/datacapture".format(prefix)
s3_capture_upload_path = "s3://{}/{}".format(bucket, data_capture_prefix)

# Output location handed to the monitoring schedule for its reports.
reports_prefix = "{}/reports".format(prefix)
s3_report_path = "s3://{}/{}".format(bucket, reports_prefix)

print("Capture path: {}".format(s3_capture_upload_path))
print("Report path: {}".format(s3_report_path))

In [3]:
# Name of the already-deployed endpoint to monitor. It is assumed to have been deployed
# elsewhere with data capture enabled (TODO confirm). Set the DEMO_ENDPOINT_NAME
# environment variable to point this notebook at a different endpoint.
endpoint_name = os.environ.get(
    "DEMO_ENDPOINT_NAME", "demo-e2e-model-monitor-endpoint-2021-07-19-17-40-22"
)

In [None]:
# Invoke the deployed model

from sagemaker.predictor import Predictor
from sagemaker.serializers import CSVSerializer
import time

predictor = Predictor(endpoint_name=endpoint_name, serializer=CSVSerializer())

print("Sending test traffic to the endpoint {}. \nPlease wait...".format(endpoint_name))

# Replay every row of the test set. The first CSV column is dropped before sending;
# presumably it holds the label, which the model does not take as input (TODO confirm).
n_requests_sent = 0
with open("data/test.csv", "r") as f:
    for row in f:
        line = row.rstrip("\n")
        if not line:
            # A blank line (e.g. at end of file) would otherwise become an empty request.
            continue
        payload = ",".join(line.split(",")[1:])
        predictor.predict(data=payload)
        n_requests_sent += 1

print("Done! Sent {} requests.".format(n_requests_sent))

In [None]:
s3_client = boto3.Session().client("s3")

# Capture files for this endpoint are looked up under <data_capture_prefix>/<endpoint_name>.
current_endpoint_capture_prefix = "{}/{}".format(data_capture_prefix, endpoint_name)

result = s3_client.list_objects(Bucket=bucket, Prefix=current_endpoint_capture_prefix)

# The response has no "Contents" key when nothing matches the prefix (e.g. captured
# data has not been written yet), so fall back to an empty list instead of None.
capture_files = [capture_file.get("Key") for capture_file in result.get("Contents", [])]

if capture_files:
    print("Found Capture Files:")
    print("\n ".join(capture_files[:5]))
else:
    print(
        "No capture files found under s3://{}/{} yet.".format(
            bucket, current_endpoint_capture_prefix
        )
    )

### Data Quality Model Monitor

#### Create baseline

In [None]:
# S3 layout for the data-quality baseline: the training data is uploaded under
# ".../data" in the next cell, and the baselining job writes its output under ".../results".
baseline_prefix = "{}/data_quality_baselining".format(prefix)
baseline_data_prefix = "{}/data".format(baseline_prefix)
baseline_results_prefix = "{}/results".format(baseline_prefix)

baseline_data_uri, baseline_results_uri = (
    "s3://{}/{}".format(bucket, key_prefix)
    for key_prefix in (baseline_data_prefix, baseline_results_prefix)
)

print("Baseline data uri: {}".format(baseline_data_uri))
print("Baseline results uri: {}".format(baseline_results_uri))

In [7]:
# Upload the local training dataset that the baselining job will profile.
# upload_file opens and closes the local file itself, so no file handle is left open.
# The key is joined with "/" explicitly: S3 keys always use forward slashes, whereas
# os.path.join would use the local OS separator.
training_data_path = "data/train.csv"
s3_key = "{}/training-dataset.csv".format(baseline_data_prefix)
boto3.Session().client("s3").upload_file(training_data_path, bucket, s3_key)

In [None]:
from sagemaker.model_monitor import DefaultModelMonitor
from sagemaker.model_monitor.dataset_format import DatasetFormat

# Processing resources for the monitor; this same object is used later to create
# the monitoring schedule.
data_quality_monitor = DefaultModelMonitor(
    role=role,
    instance_type="ml.m5.xlarge",
    instance_count=1,
    max_runtime_in_seconds=3600,
    volume_size_in_gb=20,
)

# Profile the uploaded training CSV (which has a header row) and write the suggested
# statistics/constraints under baseline_results_uri. wait=True blocks until the job ends.
training_dataset_uri = baseline_data_uri + "/training-dataset.csv"
data_quality_monitor.suggest_baseline(
    baseline_dataset=training_dataset_uri,
    output_s3_uri=baseline_results_uri,
    dataset_format=DatasetFormat.csv(header=True),
    wait=True,
)

In [None]:
s3_client = boto3.Session().client("s3")
result = s3_client.list_objects(Bucket=bucket, Prefix=baseline_results_prefix)
# "Contents" is missing from the response when nothing matches the prefix
# (e.g. the baselining job failed), so default to an empty list.
report_files = [report_file.get("Key") for report_file in result.get("Contents", [])]
if report_files:
    print("Found Files:")
    print("\n ".join(report_files))
else:
    print("No baseline results found under s3://{}/{}".format(bucket, baseline_results_prefix))

In [None]:
import pandas as pd

# Flatten the per-feature statistics produced by the latest baselining job into a table.
baseline_job = data_quality_monitor.latest_baselining_job
# pd.json_normalize is the public API; pd.io.json.json_normalize was deprecated in
# pandas 1.0 and removed in 2.0.
schema_df = pd.json_normalize(baseline_job.baseline_statistics().body_dict["features"])
schema_df.head(10)

In [None]:
# Flatten the per-feature constraints suggested by the baselining job into a table.
# pd.json_normalize replaces the deprecated pd.io.json.json_normalize.
constraints_df = pd.json_normalize(
    baseline_job.suggested_constraints().body_dict["features"]
)
constraints_df.head(10)

#### Analyzing collected data for data quality issues

Once the data above has been captured, analyze and monitor it on a recurring basis by creating a Monitoring Schedule.

In [None]:
from sagemaker.model_monitor import CronExpressionGenerator
from time import gmtime, strftime

# Suffix the current UTC time so each run of this cell gets a distinct schedule name.
mon_schedule_name = "demo-data-quality-monitor-" + strftime("%Y-%m-%d-%H-%M-%S", gmtime())

# Check the endpoint's captured traffic every hour against the baseline statistics and
# constraints, writing reports under s3_report_path and publishing CloudWatch metrics.
data_quality_monitor.create_monitoring_schedule(
    monitor_schedule_name=mon_schedule_name,
    # Predictor.endpoint is a deprecated alias in SageMaker Python SDK v2;
    # endpoint_name is the supported attribute and holds the same value.
    endpoint_input=predictor.endpoint_name,
    output_s3_uri=s3_report_path,
    statistics=data_quality_monitor.baseline_statistics(),
    constraints=data_quality_monitor.suggested_constraints(),
    schedule_cron_expression=CronExpressionGenerator.hourly(),
    enable_cloudwatch_metrics=True,
)

In [None]:
### Start generating artificial traffic

from threading import Thread
from time import sleep
import time

# Re-derive the endpoint name from the predictor so this section can run on its own.
# Predictor.endpoint is deprecated in SageMaker Python SDK v2; use endpoint_name.
endpoint_name = predictor.endpoint_name
runtime_client = boto3.client("runtime.sagemaker")

def invoke_endpoint(ep_name, file_name, runtime_client, delay_seconds=1):
    """Send each row of a local CSV file to a SageMaker endpoint, one request per row.

    This mirrors the earlier invocation cell so that this section can run on its own.
    The first column of every row is dropped before sending, and blank lines are skipped.

    Args:
        ep_name: Name of the endpoint to invoke.
        file_name: Path of the local CSV file to replay.
        runtime_client: A SageMaker runtime client, or any object exposing
            ``invoke_endpoint(EndpointName=..., ContentType=..., Body=...)`` that returns
            a mapping whose ``"Body"`` has a ``read()`` method.
        delay_seconds: Pause after each request (default 1 second).
    """
    with open(file_name, "r") as f:
        for row in f:
            line = row.rstrip("\n")
            if not line:
                # Avoid sending an empty request for blank/trailing lines.
                continue
            payload = ",".join(line.split(",")[1:])
            response = runtime_client.invoke_endpoint(
                EndpointName=ep_name, ContentType="text/csv", Body=payload
            )
            # Consume the prediction body; its value is not needed here.
            response["Body"].read()
            time.sleep(delay_seconds)


def invoke_endpoint_forever(stop_event=None):
    """Replay data/test.csv against ``endpoint_name`` in a loop to generate steady traffic.

    Args:
        stop_event: Optional ``threading.Event``. Once it is set, the loop exits after the
            current pass over the file finishes. With ``None`` the loop never ends.
    """
    while stop_event is None or not stop_event.is_set():
        invoke_endpoint(endpoint_name, "data/test.csv", runtime_client)


# Generate traffic in the background. daemon=True keeps this thread from holding the
# Python process open by itself. Call traffic_stop_event.set() to stop it cleanly.
# Re-running this cell starts an additional thread, so set the event first.
from threading import Event

traffic_stop_event = Event()
thread = Thread(target=invoke_endpoint_forever, args=(traffic_stop_event,), daemon=True)
thread.start()

In [14]:
# Current state of the monitoring schedule created above.
desc_schedule_result = data_quality_monitor.describe_schedule()
schedule_status = desc_schedule_result["MonitoringScheduleStatus"]
print("Schedule status: {}".format(schedule_status))

Schedule status: Pending


In [None]:
# Hourly schedules start executions on the hour (plus a 0-20 minute buffer), so the first
# execution can take over an hour to appear. Poll once a minute, but give up after two
# hours rather than waiting forever if the schedule is misconfigured or failed.
POLL_INTERVAL_SECONDS = 60
MAX_WAIT_SECONDS = 2 * 60 * 60

mon_executions = data_quality_monitor.list_executions()
print(
    "We created an hourly schedule above and it will kick off executions ON the hour (plus 0 - 20 min buffer).\nWe will have to wait till we hit the hour..."
)

deadline = time.monotonic() + MAX_WAIT_SECONDS
while not mon_executions:
    if time.monotonic() > deadline:
        raise TimeoutError(
            "No monitoring execution started within {} seconds; check the schedule with "
            "data_quality_monitor.describe_schedule().".format(MAX_WAIT_SECONDS)
        )
    print("Waiting for the 1st execution to happen...")
    time.sleep(POLL_INTERVAL_SECONDS)
    mon_executions = data_quality_monitor.list_executions()

In [None]:
# Inspect a specific execution: index -1 is the latest, -2 the one before it, and so on.
latest_execution = mon_executions[-1]

# Short grace period before waiting on the job. Presumably this lets the execution's
# processing job become describable (TODO confirm it is needed).
time.sleep(60)
latest_execution.wait(logs=False)

# Describe the finished job once and reuse the result for every check below.
latest_job = latest_execution.describe()
print("Latest execution status: {}".format(latest_job["ProcessingJobStatus"]))
# ExitMessage is optional in DescribeProcessingJob responses, so avoid a KeyError.
print("Latest execution result: {}".format(latest_job.get("ExitMessage")))

if latest_job["ProcessingJobStatus"] != "Completed":
    print(
        "====STOP==== \n No completed executions to inspect further. Please wait till an execution completes or investigate previously reported failures."
    )

In [None]:
# S3 destination the latest execution was configured to write its reports to.
execution_output = latest_execution.output
report_uri = execution_output.destination
print("Report Uri: {}".format(report_uri))

#### List the generated reports

In [None]:
from urllib.parse import urlparse

# Split the s3://bucket/key-prefix report URI into its bucket and key prefix.
s3uri = urlparse(report_uri)
report_bucket = s3uri.netloc
report_key = s3uri.path.lstrip("/")
print("Report bucket: {}".format(report_bucket))
print("Report key: {}".format(report_key))

s3_client = boto3.Session().client("s3")
result = s3_client.list_objects(Bucket=report_bucket, Prefix=report_key)
# "Contents" is absent when no report has been written under the prefix yet.
report_files = [report_file.get("Key") for report_file in result.get("Contents", [])]
if report_files:
    print("Found Report Files:")
    print("\n ".join(report_files))
else:
    print("No report files found under {}".format(report_uri))

#### Violations report

In [None]:
# Constraint violations of the most recent execution. The SDK can return None here
# (e.g. no executions yet, or no violations file was produced).
violations = data_quality_monitor.latest_monitoring_constraint_violations()
if violations is None:
    print("No constraint violations report is available for the latest execution.")
    violation_records = []
else:
    violation_records = violations.body_dict["violations"]

# Show full violation descriptions. None means "no limit"; -1 was deprecated in
# pandas 1.0 and is rejected by newer versions.
pd.set_option("display.max_colwidth", None)
# Use a dedicated name so the baseline constraints table (constraints_df) is not overwritten.
violations_df = pd.json_normalize(violation_records)
violations_df.head(10)

#### Stop Monitoring Schedule / Delete Resources

In [None]:
# Clean-up is opt-in: set RUN_CLEANUP = True to stop and delete the monitoring schedule
# created in this notebook and tear down the endpoint and its model.
# Deletion is irreversible. The background traffic thread will start failing once
# the endpoint is gone.
RUN_CLEANUP = False

if RUN_CLEANUP:
    # The monitor object tracks the schedule it created, so no hard-coded schedule
    # name is needed. NOTE(review): deletion may be refused while an execution is
    # still in progress — retry after it finishes.
    data_quality_monitor.stop_monitoring_schedule()
    data_quality_monitor.delete_monitoring_schedule()
    # Delete the model before the endpoint: Predictor.delete_model looks the model up
    # through the endpoint's configuration, which delete_endpoint removes.
    predictor.delete_model()
    predictor.delete_endpoint()