## SageMaker Model Monitor with Batch Transform

In [None]:
%%time

# Handful of configuration

import os
import boto3
import re
import json
from sagemaker import get_execution_role, session

region = boto3.Session().region_name

role = get_execution_role()
print("RoleArn: {}".format(role))

# You can use a different bucket, but make sure the role you chose for this notebook
# has the s3:PutObject permissions. This is the bucket into which the data is captured
bucket = session.Session(boto3.Session()).default_bucket()
print("Demo Bucket: {}".format(bucket))
prefix = "sagemaker/DEMO-ModelMonitor"

data_capture_prefix = "{}/datacapture".format(prefix)
s3_capture_upload_path = "s3://{}/{}".format(bucket, data_capture_prefix)
reports_prefix = "{}/reports".format(prefix)
s3_report_path = "s3://{}/{}".format(bucket, reports_prefix)

print("Capture path: {}".format(s3_capture_upload_path))
print("Report path: {}".format(s3_report_path))

### 1) Deploy the model to Amazon SageMaker
Upload the pre-trained churn prediction model artifact to S3 and create a SageMaker Model from it.

In [None]:
# Upload the pre-trained model artifact to S3 so SageMaker can load it.
# S3 keys always use "/" separators, so build the key explicitly rather than with
# os.path.join (which would produce "\" separators on Windows).
s3_key = f"{prefix}/xgb-churn-prediction-model.tar.gz"
# `with` guarantees the local file handle is closed once the upload finishes.
with open("model/xgb-churn-prediction-model.tar.gz", "rb") as model_file:
    boto3.Session().resource("s3").Bucket(bucket).Object(s3_key).upload_fileobj(model_file)

In [None]:
from time import gmtime, strftime
from sagemaker.model import Model
from sagemaker.image_uris import retrieve

# Timestamped so repeated runs don't collide with an existing model name.
model_name = "DEMO-xgb-churn-pred-model-monitor-" + strftime("%Y-%m-%d-%H-%M-%S", gmtime())
# Canonical s3:// URI of the artifact uploaded in the previous cell, instead of a
# hand-built virtual-hosted HTTPS URL whose "s3-<region>" host form is region-dependent.
model_url = "s3://{}/{}/xgb-churn-prediction-model.tar.gz".format(bucket, prefix)

# Built-in XGBoost 0.90-1 container image for the notebook's region.
image_uri = retrieve("xgboost", region, "0.90-1")

# Pass model_name so the SageMaker model is actually registered under it.
model = Model(image_uri=image_uri, model_data=model_url, role=role, name=model_name)

### 2) Upload test data for batch inference that will be used as input for a Batch Transform Job

In [None]:
# Stage the test CSV in S3; the Batch Transform job below reads it from this location.
!aws s3 cp test_data/test-dataset-input-cols.csv s3://{bucket}/transform-input/test-dataset-input-cols.csv


### 3) Create the Batch Transform Job
The Transform Job is configured with `join_source="Input"` so that each output line contains the input record followed by its prediction.

In [None]:
# Batch transform over the staged CSV. With Line split/assemble each input line
# yields one output line, and join_source="Input" prefixes every prediction with
# the original input record.
# NOTE: the variable name `transfomer` (sic) is kept because the download cell uses it.
transfomer = model.transformer(
    instance_count=1,
    instance_type="ml.m4.xlarge",
    accept="text/csv",
    assemble_with="Line",
)

transfomer.transform(
    f"s3://{bucket}/transform-input/test-dataset-input-cols.csv",
    split_type="Line",
    join_source="Input",
    content_type="text/csv",
)

### 4) Download the Batch Transform output
Each output line contains the inference request followed by its prediction, which is the last column in the CSV.

In [None]:
! aws s3 cp $transfomer.output_path ./batch-output --recursive
! mkdir ./batch-output-jsonl
! head -1 batch-output/test-dataset-input-cols.csv.out

### 5) Convert the Transform Job's CSV output to JSONL "Data Capture" format
This makes the data match what Model Monitor expects (JSON Lines in the data-capture format). Alternatively, a SageMaker Processing Job could perform this conversion once the Transform Job completes.

In [None]:
import csv
import json
import os
import datetime as dt

def make_json(csv_filepath, jsonl_filepath):
    """Convert Batch Transform CSV output into Model Monitor data-capture JSON Lines.

    Each non-empty line of ``csv_filepath`` is treated as the model input columns
    followed by the prediction in the final column (the layout a transform job with
    ``join_source="Input"`` produces). Every line becomes one record shaped like an
    endpoint data-capture entry. The records are written to ``jsonl_filepath``
    (overwriting it), one compact JSON object per line.

    Args:
        csv_filepath: Path of the joined input+prediction CSV file to read.
        jsonl_filepath: Path of the JSON Lines file to write.
    """
    import uuid

    records = []
    with open(csv_filepath) as f:
        for raw_line in f:
            # Drop the line terminator (including any stray "\r") before splitting.
            line = raw_line.rstrip("\r\n")
            if not line:
                # Skip blank lines (e.g. a trailing newline) instead of emitting empty records.
                continue
            # The prediction is the last column; everything before it is the request payload.
            features, _, prediction = line.rpartition(",")

            records.append({
                "captureData": {
                    "endpointInput": {
                        "observedContentType": "text/csv",
                        "mode": "INPUT",
                        "data": features,
                        "encoding": "CSV",
                    },
                    "endpointOutput": {
                        "observedContentType": "text/csv; charset=utf-8",
                        "mode": "OUTPUT",
                        "data": prediction,
                        "encoding": "CSV",
                    },
                },
                "eventMetadata": {
                    # Give every record its own event id rather than one shared constant.
                    "eventId": str(uuid.uuid4()),
                    # The format ends in "Z" (UTC), so read the time from a UTC clock.
                    "inferenceTime": dt.datetime.now(dt.timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
                },
                "eventVersion": "0",
            })

    with open(jsonl_filepath, "w") as outfile:
        for entry in records:
            # Compact separators: one JSON object per line, no extra whitespace.
            json.dump(entry, outfile, separators=(",", ":"))
            outfile.write("\n")

root_dir = "./batch-output"
output_dir = "./batch-output-jsonl"
# Ensure the destination exists even if the download cell's mkdir was skipped.
os.makedirs(output_dir, exist_ok=True)

# Target data-capture layout: <capture prefix>/<endpoint>/<variant>/YYYY/MM/DD/HH, in UTC.
# Take a single UTC timestamp, because several now() calls could straddle an hour boundary.
# Zero-pad every component (e.g. day "05", not "5").
capture_hour = dt.datetime.now(dt.timezone.utc)
s3_batch_output_jsonl = "s3://{}/{}/churn-pred-model-monitor/AllTraffic/{}".format(
    bucket, data_capture_prefix, capture_hour.strftime("%Y/%m/%d/%H")
)

# Convert every downloaded transform output file into a matching .jsonl file.
for subdir, dirs, files in os.walk(root_dir):
    for file in files:
        csv_filepath = os.path.join(subdir, file)
        print("Converting {}".format(csv_filepath))
        jsonl_filepath = os.path.join(output_dir, file + ".jsonl")
        make_json(csv_filepath, jsonl_filepath)

### 6) Upload JSONL data to S3 for input to Model Monitor

In [None]:
# Upload the converted JSONL files under the hour-partitioned data-capture prefix built during conversion.
!aws s3 cp ./batch-output-jsonl $s3_batch_output_jsonl --recursive

### 7) Create a Baseline that will be used by Model Monitor
In general, this can be done in parallel with the Transform Job.

In [None]:
# S3 locations for baselining: the training data goes under <prefix>/baselining/data
# and the baselining job writes its results under <prefix>/baselining/results.
# (If the training dataset is already in S3, it could be reused instead of copied.)
baseline_prefix = f"{prefix}/baselining"
baseline_data_prefix = f"{baseline_prefix}/data"
baseline_results_prefix = f"{baseline_prefix}/results"

baseline_data_uri, baseline_results_uri = (
    f"s3://{bucket}/{key}" for key in (baseline_data_prefix, baseline_results_prefix)
)
print(f"Baseline data uri: {baseline_data_uri}")
print(f"Baseline results uri: {baseline_results_uri}")

In [None]:
# Upload the training CSV (with a header row) that the baselining job will profile.
# Build the S3 key with "/" explicitly; os.path.join would produce "\" separators on Windows.
s3_key = f"{baseline_data_prefix}/training-dataset-with-header.csv"
# `with` guarantees the local file handle is closed once the upload finishes.
with open("test_data/training-dataset-with-header.csv", "rb") as training_data_file:
    boto3.Session().resource("s3").Bucket(bucket).Object(s3_key).upload_fileobj(training_data_file)

In [None]:
from sagemaker.model_monitor import DefaultModelMonitor
from sagemaker.model_monitor.dataset_format import DatasetFormat

# The same monitor object computes the baseline here and later owns the schedule.
my_default_monitor = DefaultModelMonitor(
    role=role,
    instance_type="ml.m5.xlarge",
    instance_count=1,
    max_runtime_in_seconds=3600,
    volume_size_in_gb=20,
)

# Profile the header-bearing training CSV; wait=True blocks until the baselining
# job finishes and its results are under baseline_results_uri.
my_default_monitor.suggest_baseline(
    baseline_dataset=f"{baseline_data_uri}/training-dataset-with-header.csv",
    dataset_format=DatasetFormat.csv(header=True),
    output_s3_uri=baseline_results_uri,
    wait=True,
)


In [None]:
# List what the baselining job wrote. The S3 response has no "Contents" key when
# nothing matches the prefix, so default to an empty list instead of a TypeError.
s3_client = boto3.Session().client("s3")
result = s3_client.list_objects_v2(Bucket=bucket, Prefix=baseline_results_prefix)
report_files = [report_file.get("Key") for report_file in result.get("Contents", [])]
print("Found Files:")
print("\n ".join(report_files))

In [None]:
import pandas as pd

# Per-feature statistics computed by the latest baselining job, flattened into a table.
baseline_job = my_default_monitor.latest_baselining_job
# pd.json_normalize is the public API; pd.io.json.json_normalize is a deprecated alias (pandas 1.0+).
schema_df = pd.json_normalize(baseline_job.baseline_statistics().body_dict["features"])
schema_df.head(10)

In [None]:
# Suggested per-feature constraints from the same baselining job, flattened into a table.
# Uses pd.json_normalize rather than the deprecated pd.io.json.json_normalize alias.
constraints_df = pd.json_normalize(
    baseline_job.suggested_constraints().body_dict["features"]
)
constraints_df.head(10)

### 8) Launch the Model Monitor Processing Job (manual option)
While this is done manually here, one could use an EventBridge rule to trigger the Model Monitor Processing Job when the Transform Job completes (assuming baselining has already been done).

In [None]:
# Manually run a one-off Model Monitor processing job over the converted batch
# output, via the helper in the local monitoringjob_utils module.
import monitoringjob_utils

# NOTE(review): `reports` is assigned but not passed to the helper below.
reports="monitor-reports"
instance_type="ml.m5.xlarge"

# S3 prefix the JSONL "data capture" files were uploaded to in step 6.
data_capture_path=s3_batch_output_jsonl

# Baseline statistics/constraints under the suggest_baseline() output location (step 7).
statistics_path= "{}/statistics.json".format(baseline_results_uri)
constraints_path="{}/constraints.json".format(baseline_results_uri)
reports_path=s3_report_path
instance_count=1
# Pre-/post-processor paths are left unset.
preprocessor_path=None
postprocessor_path=None
publish_cloudwatch_metrics='Disabled'

# Arguments are positional, so their order must match the helper's signature
# (defined outside this notebook -- verify against monitoringjob_utils).
monitoringjob_utils.run_model_monitor_job_processor(region, 
                                                    instance_type, 
                                                    role, 
                                                    data_capture_path,
                                                    statistics_path,
                                                    constraints_path, 
                                                    reports_path,
                                                    instance_count,
                                                    preprocessor_path, 
                                                    postprocessor_path,
                                                    publish_cloudwatch_metrics
                                                   )

In [None]:
from urllib.parse import urlparse

# Split the reports S3 URI into the bucket and key prefix expected by the S3 API.
s3uri = urlparse(reports_path)
report_bucket = s3uri.netloc
report_key = s3uri.path.lstrip("/")
print("Report bucket: {}".format(report_bucket))
print("Report key: {}".format(report_key))

# List the report files. The response has no "Contents" key when nothing matches
# (e.g. the job wrote no reports), so default to an empty list instead of crashing.
s3_client = boto3.Session().client("s3")
result = s3_client.list_objects_v2(Bucket=report_bucket, Prefix=report_key)
report_files = [report_file.get("Key") for report_file in result.get("Contents", [])]
print("Found Report Files:")
print("\n ".join(report_files))

### 9) Dummy Endpoint option
For this option, we launch a dummy Endpoint configured with data capture; however, we point its data capture destination at the S3 location that holds our JSONL Batch Transform output.

Note: when the Monitoring schedule runs, the data must be under the S3 path for the correct UTC hour.

For example, if the Monitoring schedule runs just after the 22nd hour (UTC) on 25 March 2022, the path must look like:

`../datacapture/churn-pred-model-monitor/AllTraffic/2022/03/25/22/`

Specifically:

`s3://<YourBucket>/<YourKey>/datacapture/churn-pred-model-monitor/AllTraffic/2022/03/25/22/<YourDataFile(s)>`


In [None]:
# The dummy endpoint's capture destination must be the data-capture root; the
# JSONL files sit below it at <endpoint>/<variant>/YYYY/MM/DD/HH.
data_capture_path = s3_batch_output_jsonl

# Strip those six trailing path segments. Unlike splitting on the literal "/churn",
# this does not depend on the endpoint name.
spoof_data_capture = data_capture_path.rsplit("/", 6)[0]
spoof_data_capture

In [None]:
from sagemaker.model_monitor import DataCaptureConfig

# "Dummy" real-time endpoint whose only job is to own a monitoring schedule. Its
# data capture points at the spoofed capture root. The converted batch output was
# uploaded under churn-pred-model-monitor/AllTraffic/..., so the endpoint name must match.
dummy_endpoint_name = "churn-pred-model-monitor"

spoof_s3_capture_upload_path = data_capture_path  # NOTE(review): not referenced elsewhere in this notebook
dummy_data_capture_config = DataCaptureConfig(
    destination_s3_uri=spoof_data_capture,
    sampling_percentage=100,
    enable_capture=True,
)
dummy_predictor = model.deploy(
    endpoint_name=dummy_endpoint_name,
    data_capture_config=dummy_data_capture_config,
    initial_instance_count=1,
    instance_type="ml.t2.medium",
)

### 10) Monitoring Schedule


### Create a schedule

You can create a model monitoring schedule for the endpoint created earlier. Use the baseline resources (constraints and statistics) to compare against the real-time traffic.

In [None]:
from sagemaker.model_monitor import CronExpressionGenerator
from time import gmtime, strftime

# Baseline artifacts under the suggest_baseline() output location, passed as S3 paths.
statistics_path = f"{baseline_results_uri}/statistics.json"
constraints_path = f"{baseline_results_uri}/constraints.json"

# A timestamp suffix keeps schedule names unique across runs.
mon_schedule_name = "DEMO-xgb-churn-pred-model-monitor-schedule-" + strftime("%Y-%m-%d-%H-%M-%S", gmtime())

# Hourly schedule against the dummy endpoint; reports land in s3_report_path
# and CloudWatch metrics are enabled.
my_default_monitor.create_monitoring_schedule(
    monitor_schedule_name=mon_schedule_name,
    endpoint_input=dummy_endpoint_name,
    statistics=statistics_path,
    constraints=constraints_path,
    output_s3_uri=s3_report_path,
    schedule_cron_expression=CronExpressionGenerator.hourly(),
    enable_cloudwatch_metrics=True,
)

---

### Extras: Describe and inspect the schedule

After describing the schedule, observe that MonitoringScheduleStatus changes to `Scheduled`.

In [None]:
# The status should read "Scheduled" once the schedule is active.
desc_schedule_result = my_default_monitor.describe_schedule()
print(f"Schedule status: {desc_schedule_result['MonitoringScheduleStatus']}")

### List executions
The schedule starts jobs at the previously specified intervals. Here, you list the executions and wait for the first one to appear. Note that if you run this right after creating the hourly schedule, the list may be empty; you might have to wait until you cross the hour boundary (in UTC) to see executions kick off. The code below has the logic for waiting.

Note: Even for an hourly schedule, Amazon SageMaker has a buffer period of 20 minutes to schedule your execution. You might see your execution start in anywhere from zero to ~20 minutes from the hour boundary. This is expected and done for load balancing in the backend.

In [None]:
import time

# Poll for the first execution of the hourly schedule, but not forever.
# The first run should start within roughly 80 minutes (the next hour boundary
# plus the up-to-20-minute buffer), so 2 hours leaves slack without hanging the
# notebook indefinitely if the schedule never fires.
MAX_WAIT_SECONDS = 2 * 60 * 60
POLL_INTERVAL_SECONDS = 60

mon_executions = my_default_monitor.list_executions()
print(
    "We created a hourly schedule above and it will kick off executions ON the hour (plus 0 - 20 min buffer.\nWe will have to wait till we hit the hour..."
)

waited_seconds = 0
while len(mon_executions) == 0:
    if waited_seconds >= MAX_WAIT_SECONDS:
        raise TimeoutError(
            "No monitoring execution started within {} minutes; check the schedule "
            "status and its FailureReason.".format(MAX_WAIT_SECONDS // 60)
        )
    print("Waiting for the 1st execution to happen...")
    time.sleep(POLL_INTERVAL_SECONDS)
    waited_seconds += POLL_INTERVAL_SECONDS
    mon_executions = my_default_monitor.list_executions()

### Inspect a specific execution (latest execution)
In the next cell, you pick up the latest completed or failed scheduled execution. Here are the possible terminal states and what each of them means:
* Completed - This means the monitoring execution completed and no issues were found in the violations report.
* CompletedWithViolations - This means the execution completed, but constraint violations were detected.
* Failed - The monitoring execution failed, possibly due to a client error (perhaps incorrect role permissions) or infrastructure issues. Further examination of FailureReason and ExitMessage is necessary to identify exactly what happened.
* Stopped - job exceeded max runtime or was manually stopped.

In [None]:
# The latest execution is at index -1 (second to last is -2, and so on).
latest_execution = mon_executions[-1]
# Wait for the execution to finish before inspecting it.
latest_execution.wait(logs=False)

# Describe once and reuse the response instead of re-querying for every field.
latest_job = latest_execution.describe()
print("Latest execution status: {}".format(latest_job["ProcessingJobStatus"]))
# ExitMessage is optional in the processing-job description, so avoid a KeyError.
print("Latest execution result: {}".format(latest_job.get("ExitMessage")))

if latest_job["ProcessingJobStatus"] != "Completed":
    print(
        "====STOP==== \n No completed executions to inspect further. Please wait till an execution completes or investigate previously reported failures."
    )

In [None]:
# S3 location where this monitoring execution wrote its reports.
report_uri = latest_execution.output.destination
print(f"Report Uri: {report_uri}")

### List the generated reports

In [None]:
from urllib.parse import urlparse

# Split the execution's report URI into the bucket and key prefix expected by the S3 API.
s3uri = urlparse(report_uri)
report_bucket = s3uri.netloc
report_key = s3uri.path.lstrip("/")
print("Report bucket: {}".format(report_bucket))
print("Report key: {}".format(report_key))

# List the report files. The response has no "Contents" key when nothing matches
# (e.g. a failed execution wrote no reports), so default to an empty list.
s3_client = boto3.Session().client("s3")
result = s3_client.list_objects_v2(Bucket=report_bucket, Prefix=report_key)
report_files = [report_file.get("Key") for report_file in result.get("Contents", [])]
print("Found Report Files:")
print("\n ".join(report_files))

### Violations report

If there are any violations compared to the baseline, they will be listed here.

In [None]:
# Constraint violations reported by the latest monitoring execution, if any.
violations = my_default_monitor.latest_monitoring_constraint_violations()
# None means "no column-width limit"; passing -1 has been deprecated since pandas 1.0.
pd.set_option("display.max_colwidth", None)
# NOTE(review): the SDK may return None here (e.g. no executions found) -- guard against it.
if violations is None:
    print("No constraint violations report found for the latest execution.")
    violations_df = pd.DataFrame()
else:
    # Separate name so the baseline constraints_df shown earlier is not overwritten.
    violations_df = pd.json_normalize(violations.body_dict["violations"])
violations_df.head(10)

### Other commands
We can also start and stop the monitoring schedules.

In [None]:
# Pause or resume the schedule without deleting it (uncomment the call you need).
#my_default_monitor.stop_monitoring_schedule()
#my_default_monitor.start_monitoring_schedule()

## Delete the resources


In [None]:
# Stop and delete the monitoring schedule (uncomment to run); the sleep gives the deletion time to complete.
#my_default_monitor.stop_monitoring_schedule()
#my_default_monitor.delete_monitoring_schedule()
#time.sleep(60)  # actually wait for the deletion

In [None]:
# Clean up the model (uncomment to run).
# NOTE(review): this notebook never defines `predictor`; the deploy cell assigns `dummy_predictor`,
# and the model object is `model`. The dummy endpoint is also not deleted anywhere here --
# presumably it should be removed too so its instance is released.
#predictor.delete_model()