# Penguins in Production - Session 5

This notebook creates a [SageMaker Pipeline](https://docs.aws.amazon.com/sagemaker/latest/dg/pipelines-sdk.html) to build an end-to-end Machine Learning system that classifies penguin species.

This example uses the [Penguins dataset](https://www.kaggle.com/parulpandey/palmer-archipelago-antarctica-penguin-data).

Amazon SageMaker is free to try. Your free tier starts from the first month you create your first SageMaker resource and lasts two months. Check the [Amazon SageMaker Pricing](https://aws.amazon.com/sagemaker/pricing/) page for more information. Also, we'll be working extensively with [boto3](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sagemaker.html) and the [SageMaker Python SDK](https://sagemaker.readthedocs.io/en/stable/). Keep their documentation handy.

This notebook was created by [Santiago L. Valdarrama](https://twitter.com/svpino) as part of the [Machine Learning School](https://www.ml.school) program.

Let's ensure we are running the latest version of the SageMaker SDK. **Restart the Kernel** after you run the following cell.

In [28]:
%load_ext autoreload
%autoreload 2

import sys
CODE_FOLDER = "code"
sys.path.append(f"./{CODE_FOLDER}")

Popping out 'ProcessingJobName' from the pipeline definition by default since it will be overridden at pipeline execution time. Please utilize the PipelineDefinitionConfig to persist this field in the pipeline definition if desired.


The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


# Session 5 - Data Monitoring

In this session we'll set up a monitoring process to analyze the quality of the data our endpoint receives in production. For this, we will have SageMaker capture and evaluate the data observed by the endpoint.

To enable this functionality, we need a couple of steps:

1. Create a baseline to compare the real-time traffic against.
2. Set up a schedule to continuously evaluate and compare against the baseline.

Notice that the Data Quality process uses the baseline dataset we generated during preprocessing. This baseline is the unprocessed train set in JSON format. We can't use the transformed train data here: the endpoint receives raw data, so the baseline must be raw as well for the comparison to make sense.
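
As a rough illustration of what a raw baseline in JSON format looks like, the sketch below serializes a couple of untransformed samples as JSON Lines, one record per line. The sample rows and the `to_json_lines` helper are hypothetical and only meant to show the layout; the actual baseline file is produced by the preprocessing step.

```python
import json


def to_json_lines(records):
    """Serialize a list of dicts as JSON Lines: one JSON object per line."""
    return "\n".join(json.dumps(record) for record in records)


# Hypothetical raw (untransformed) samples using the dataset's column names.
raw_samples = [
    {"island": "Torgersen", "culmen_length_mm": 39.1, "culmen_depth_mm": 18.7,
     "flipper_length_mm": 181.0, "body_mass_g": 3750.0, "species": "Adelie"},
    {"island": "Dream", "culmen_length_mm": 46.4, "culmen_depth_mm": 18.6,
     "flipper_length_mm": 190.0, "body_mass_g": 3450.0, "species": "Chinstrap"},
]

baseline = to_json_lines(raw_samples)
print(baseline)
```

Each line is a complete JSON document, so the file can be read record by record without loading it whole.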

Check [Amazon SageMaker Model Monitor](https://sagemaker.readthedocs.io/en/stable/amazon_sagemaker_model_monitoring.html) for a brief explanation of how to use SageMaker's Model Monitoring functionality. [Monitor models for data and model quality, bias, and explainability](https://docs.aws.amazon.com/sagemaker/latest/dg/model-monitor.html) is a much more extensive guide to monitoring in Amazon SageMaker.

Here is what the Pipeline will look like at the end of this session:

<img src='penguins/images/session5-pipeline.png' alt='Session 5 Pipeline' width="600">


In [38]:
%%writefile {CODE_FOLDER}/session5.py

from session1 import *
from session2 import *
from session3 import *
from session4 import *

import time

from sagemaker.workflow.check_job_config import CheckJobConfig
from sagemaker.workflow.quality_check_step import DataQualityCheckConfig, QualityCheckStep
from sagemaker.model_monitor.dataset_format import DatasetFormat

def delete_monitoring_schedule(schedule):
    """
    Deletes a monitoring schedule. This function waits
    for the job to finish before deleting it.
    """
    
    attempts = 30

    try:
        status = schedule.describe_schedule()["MonitoringScheduleStatus"]
    except Exception:
        print("Monitoring schedule deleted.")
        return
        
    while status in ("Pending", "InProgress") and attempts > 0:
        attempts -= 1
        print(f"Monitoring schedule status: {status}. Waiting for it to finish.")
        time.sleep(30)
        status = schedule.describe_schedule()["MonitoringScheduleStatus"]

    if status not in ("Pending", "InProgress"):
        schedule.delete_monitoring_schedule()
        print("Monitoring schedule deleted.")
    else:
        print("Waiting for monitoring schedule timed out")

Overwriting code/session5.py
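
The wait loop inside `delete_monitoring_schedule` is a bounded polling pattern: check a status, sleep, and give up after a fixed number of attempts. The sketch below isolates that pattern so it can be tried without AWS. The `wait_until_idle` helper and the fake status source are hypothetical names introduced only for illustration.

```python
import time


def wait_until_idle(get_status, attempts=30, delay=30.0,
                    busy=("Pending", "InProgress")):
    """Poll get_status() until it leaves the busy states or attempts run out.

    Returns the last observed status, which is still a busy one on timeout.
    """
    status = get_status()
    while status in busy and attempts > 0:
        attempts -= 1
        time.sleep(delay)
        status = get_status()
    return status


# Hypothetical stand-in for a schedule whose status changes over time.
statuses = iter(["Pending", "InProgress", "Scheduled"])
final_status = wait_until_idle(lambda: next(statuses), attempts=5, delay=0)
print(final_status)

# When the status never changes, the helper gives up and reports it.
timed_out_status = wait_until_idle(lambda: "Pending", attempts=2, delay=0)
print(timed_out_status)
```

Returning the last status leaves the decision to the caller, which mirrors how the function above only deletes the schedule when the final status is no longer busy.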


## Step 1 - Generating a Baseline

Let's now configure the [Quality Check Step](https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-steps.html#step-type-quality-check) and feed it the train set we generated in the preprocessing step.

We can configure the instance that will run the quality check using the [CheckJobConfig](https://sagemaker.readthedocs.io/en/v2.73.0/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.check_job_config.CheckJobConfig) class, and we can use the `DataQualityCheckConfig` class to configure the job.

In [39]:
%%writefile -a {CODE_FOLDER}/session5.py

data_quality_location = f"{S3_FILEPATH}/monitoring/data-quality"

data_quality_baseline_step = QualityCheckStep(
    name="generate-data-quality-baseline",
    
    check_job_config = CheckJobConfig(
        instance_type="ml.t3.xlarge",
        instance_count=1,
        volume_size_in_gb=20,
        sagemaker_session=sagemaker_session,
        role=role,
    ),
    
    quality_check_config = DataQualityCheckConfig(
        # We will use the train dataset we generated during the preprocessing 
        # step to generate the data quality baseline.
        baseline_dataset=preprocess_data_step.properties.ProcessingOutputConfig.Outputs["train-baseline"].S3Output.S3Uri,

        dataset_format=DatasetFormat.json(lines=True),
        output_s3_uri=data_quality_location
    ),
    
    skip_check=True,
    register_new_baseline=True,
    model_package_group_name=model_package_group_name,
    cache_config=cache_config
)

Appending to code/session5.py


## Step 2 - Running the Pipeline

We can now run the pipeline.

In [40]:
from session5 import *
from sagemaker.workflow.pipeline import Pipeline


pipeline = Pipeline(
    name="penguins-session5-pipeline",
    parameters=[
        dataset_location, 
        preprocessor_destination,
        train_dataset_baseline_destination,
        test_dataset_baseline_destination,
        data_capture_percentage,
        data_capture_destination,       
        accuracy_threshold,
    ],
    steps=[
        preprocess_data_step, 
        data_quality_baseline_step,
        train_model_step, 
        evaluate_model_step,
        condition_step
    ],
    pipeline_definition_config=pipeline_definition_config
)

Submit the pipeline definition to the SageMaker Pipelines service to create a pipeline if it doesn't exist or update it if it does.

In [8]:
pipeline.upsert(role_arn=role)
execution = pipeline.start()

Using provided s3_resource


Popping out 'CertifyForMarketplace' from the pipeline definition since it will be overridden in pipeline execution time.


Using provided s3_resource
Using provided s3_resource
Using provided s3_resource


## Step 3 - Statistics and Constraints

Our pipeline generated baseline statistics and constraints using our train set. We can inspect these values by downloading them from S3.

In [14]:
from IPython.display import JSON

statistics = f"{data_quality_location}/statistics.json"
JSON(json.loads(S3Downloader.read_file(statistics)))

<IPython.core.display.JSON object>

In [15]:
constraints = f"{data_quality_location}/constraints.json"
JSON(json.loads(S3Downloader.read_file(constraints)))

<IPython.core.display.JSON object>

## Step 4 - Generating Endpoint Traffic

Let's generate some traffic for our endpoint so we can test the monitoring functionality. We will repeatedly send every sample from the dataset to the endpoint to simulate real prediction requests.

In [16]:
%%writefile -a {CODE_FOLDER}/session5.py

import pandas as pd
from threading import Thread, Event
from time import sleep

def generate_traffic(predictor):
    
    def _predict(data, predictor, stop_traffic_thread):
        for index, row in data.iterrows():
            predictor.predict(row.to_dict(), inference_id=str(index))
            
            sleep(1)

            if stop_traffic_thread.is_set():
                break

    def _generate_prediction_data(data, predictor, stop_traffic_thread):
        while True:
            print(f"Generating {data.shape[0]} predictions...")
            _predict(data, predictor, stop_traffic_thread)
            
            if stop_traffic_thread.is_set():
                break

                
    stop_traffic_thread = Event()
    
    data = pd.read_csv(LOCAL_FILEPATH).dropna()
    data.drop(["sex", "species"], axis=1, inplace=True)
    
    traffic_thread = Thread(
        target=_generate_prediction_data,
        args=(data, predictor, stop_traffic_thread,)
    )
    
    traffic_thread.start()
    
    return stop_traffic_thread, traffic_thread


Appending to code/session5.py
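
`generate_traffic` sleeps for one second and then checks the stop flag, so stopping can lag by up to a second. A possible variation, sketched below, swaps `sleep` for `Event.wait`, which returns as soon as the event is set. The `start_traffic` and `fake_send` names are hypothetical, and `fake_send` stands in for `predictor.predict` so the example runs without an endpoint.

```python
import threading


def start_traffic(send, samples, interval=1.0):
    """Send samples in a loop on a background thread until stopped."""
    stop = threading.Event()

    def _run():
        while not stop.is_set():
            for sample in samples:
                send(sample)
                # wait() returns True as soon as the event is set, so the
                # loop exits without sleeping out the rest of the interval.
                if stop.wait(interval):
                    return

    thread = threading.Thread(target=_run, daemon=True)
    thread.start()
    return stop, thread


# Hypothetical stand-in for predictor.predict: record each request.
sent = []
first_request = threading.Event()


def fake_send(sample):
    sent.append(sample)
    first_request.set()


stop, thread = start_traffic(fake_send, [{"island": "Dream"}], interval=0.01)
first_request.wait(timeout=5)  # make sure at least one request went out
stop.set()
thread.join(timeout=5)
print(thread.is_alive(), len(sent) > 0)
```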


Let's start generating the traffic.

In [17]:
import session5
from session5 import *
from sagemaker.predictor import Predictor
from sagemaker.serializers import JSONSerializer
from sagemaker.deserializers import JSONDeserializer

waiter = sagemaker_client.get_waiter("endpoint_in_service")
waiter.wait(
    EndpointName=session5.endpoint_name,
    WaiterConfig={
        "Delay": 10,
        "MaxAttempts": 30
    }
)

predictor = Predictor(
    endpoint_name=session5.endpoint_name,
    serializer=JSONSerializer(),
    deserializer=JSONDeserializer()
)

stop_traffic_thread, traffic_thread = generate_traffic(predictor)

Generating 334 predictions...


## Step 5 - Checking the Captured Data

Let's check the S3 location where the endpoint stores the requests and responses that it receives.

Notice that it may take a few minutes for the first few files to show up in S3. Keep running the following line until you get some.

In [18]:
files = S3Downloader.list(data_capture_destination.default_value)[:3]
files

['s3://mlschool/penguins/monitoring/data-capture/penguins-endpoint/AllTraffic/2023/06/27/10/58-55-069-5a80dc2b-dafe-4be7-920a-eaece978479b.jsonl',
 's3://mlschool/penguins/monitoring/data-capture/penguins-endpoint/AllTraffic/2023/06/27/10/59-56-068-4bdc305d-5965-490e-b0e5-5e687dac7702.jsonl',
 's3://mlschool/penguins/monitoring/data-capture/penguins-endpoint/AllTraffic/2023/06/27/11/00-56-384-6decb274-dfde-459c-9bf9-e53036a12711.jsonl']

These files contain the data captured by the endpoint in a SageMaker-specific JSON-line format. Each inference request is captured in a single line in the `jsonl` file. The line contains both the input and output merged together.

Let's read the first line from the first file:

In [19]:
if len(files):
    lines = S3Downloader.read_file(files[0])
    print(json.dumps(json.loads(lines.split("\n")[0]), indent=2))

{
  "captureData": {
    "endpointInput": {
      "observedContentType": "application/json",
      "mode": "INPUT",
      "data": "{\"island\": \"Torgersen\", \"culmen_length_mm\": 39.1, \"culmen_depth_mm\": 18.7, \"flipper_length_mm\": 181.0, \"body_mass_g\": 3750.0}",
      "encoding": "JSON"
    },
    "endpointOutput": {
      "observedContentType": "application/json",
      "mode": "OUTPUT",
      "data": "{\"species\": \"Adelie\", \"prediction\": 0, \"confidence\": 0.809994876}",
      "encoding": "JSON"
    }
  },
  "eventMetadata": {
    "eventId": "705ebc5b-70c7-4eab-977a-807320b1b589",
    "inferenceId": "0",
    "inferenceTime": "2023-06-27T10:58:55Z"
  },
  "eventVersion": "0"
}
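
The request and response payloads in a captured record are stored as JSON strings inside the outer JSON document, so they need a second decode. The sketch below pulls both out of one line. It uses an abbreviated copy of the record printed above, and `parse_capture_line` is a hypothetical helper name.

```python
import json

# One captured line, abbreviated from the record shown above.
line = json.dumps({
    "captureData": {
        "endpointInput": {
            "data": json.dumps({"island": "Torgersen", "culmen_length_mm": 39.1}),
            "encoding": "JSON",
        },
        "endpointOutput": {
            "data": json.dumps({"species": "Adelie", "prediction": 0,
                                "confidence": 0.809994876}),
            "encoding": "JSON",
        },
    },
    "eventMetadata": {"inferenceId": "0", "inferenceTime": "2023-06-27T10:58:55Z"},
})


def parse_capture_line(line):
    """Return (inference_id, request, response) from one captured JSON line."""
    record = json.loads(line)
    capture = record["captureData"]
    # The payloads are JSON strings, so they are decoded a second time.
    request = json.loads(capture["endpointInput"]["data"])
    response = json.loads(capture["endpointOutput"]["data"])
    return record["eventMetadata"]["inferenceId"], request, response


inference_id, request, response = parse_capture_line(line)
print(inference_id, request["island"], response["species"])
```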


## Step 6 - Scheduling the Monitoring Job

We can now set up a schedule to continuously monitor data going into the endpoint and compare it to the baseline we generated before. This monitoring job will use the baseline statistics and constraints we generated during the Data Quality Check Step. Check [Schedule Data Quality Monitoring Jobs](https://docs.aws.amazon.com/sagemaker/latest/dg/model-monitor-schedule-data-monitor.html) for more information.

SageMaker looks for violations in the data captured by the endpoint. By default, it combines the input data with the endpoint output and compares the result with the baseline we generated. If we let SageMaker do this, we will get a few violations. For example, we will get an "extra column check" violation because the fields `confidence` and `prediction` don't exist in the baseline data.

We can fix these violations by creating a preprocessing script that selects the data we want the monitoring job to use.


In [20]:
DATA_QUALITY_PREPROCESSOR = "data_quality_preprocessor.py"

Here is the preprocessing script for the Data Quality Monitoring Job. Check [Preprocessing and Postprocessing](https://docs.aws.amazon.com/sagemaker/latest/dg/model-monitor-pre-and-post-processing.html) for more information about how to configure these scripts.

In [21]:
%%writefile {CODE_FOLDER}/{DATA_QUALITY_PREPROCESSOR}

import json

def preprocess_handler(inference_record):
    input_data = inference_record.endpoint_input.data
    output_data = json.loads(inference_record.endpoint_output.data)
    
    response = json.loads(input_data)
    response["species"] = output_data["species"]

    # The `response` variable contains the data that we want the
    # monitoring job to use to compare with the baseline.
    return response

Overwriting code/data_quality_preprocessor.py
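
Before uploading the script, it can help to try the handler locally. The sketch below repeats the handler logic and calls it with a stand-in record built from `SimpleNamespace`. That stand-in is an assumption: it only mimics the two attributes the handler reads and is not the object SageMaker passes at runtime.

```python
import json
from types import SimpleNamespace


def preprocess_handler(inference_record):
    # Same logic as the preprocessing script above.
    input_data = inference_record.endpoint_input.data
    output_data = json.loads(inference_record.endpoint_output.data)

    response = json.loads(input_data)
    response["species"] = output_data["species"]
    return response


# Hypothetical stand-in exposing only the attributes the handler uses.
record = SimpleNamespace(
    endpoint_input=SimpleNamespace(data=json.dumps({
        "island": "Torgersen", "culmen_length_mm": 39.1,
        "culmen_depth_mm": 18.7, "flipper_length_mm": 181.0,
        "body_mass_g": 3750.0,
    })),
    endpoint_output=SimpleNamespace(data=json.dumps({
        "species": "Adelie", "prediction": 0, "confidence": 0.809994876,
    })),
)

result = preprocess_handler(record)
print(sorted(result))
```

The result keeps the input fields plus `species` and drops `prediction` and `confidence`, which is what avoids the extra column violations.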


The monitoring schedule expects an S3 location pointing to the preprocessing script. Let's upload the script to the default bucket.

In [22]:
import os

bucket = boto3.Session().resource("s3").Bucket(sagemaker_session.default_bucket())
prefix = "penguins-monitoring"
bucket.Object(os.path.join(prefix, DATA_QUALITY_PREPROCESSOR)).upload_file(str(Path(CODE_FOLDER) / DATA_QUALITY_PREPROCESSOR))
data_quality_preprocessor = f"s3://{os.path.join(bucket.name, prefix, DATA_QUALITY_PREPROCESSOR)}"
data_quality_preprocessor

's3://sagemaker-us-east-1-325223348818/penguins-monitoring/data_quality_preprocessor.py'
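
One detail worth noting: `os.path.join` uses the separator of the local operating system, so the cell above builds a valid S3 URI on Linux and macOS but would insert backslashes on Windows. A minimal sketch of a portable alternative uses `posixpath.join`, which always joins with forward slashes. The `s3_uri` helper and the bucket name are hypothetical.

```python
import posixpath


def s3_uri(bucket, *parts):
    """Build an s3:// URI, always joining with forward slashes."""
    return f"s3://{posixpath.join(bucket, *parts)}"


uri = s3_uri("my-bucket", "penguins-monitoring", "data_quality_preprocessor.py")
print(uri)
```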

We can now set up the Data Quality Monitoring Job using the [DefaultModelMonitor](https://sagemaker.readthedocs.io/en/stable/api/inference/model_monitor.html#sagemaker.model_monitor.model_monitoring.DefaultModelMonitor) class. Notice how we specify the `record_preprocessor_script` using the S3 location where we uploaded our script.

In [95]:
from sagemaker.model_monitor import CronExpressionGenerator, DefaultModelMonitor

data_monitor = DefaultModelMonitor(
    instance_type="ml.m5.xlarge",
    instance_count=1,
    max_runtime_in_seconds=3600,
    role=role,
)

data_monitor.create_monitoring_schedule(
    monitor_schedule_name="penguins-data-monitoring-schedule",
    endpoint_input=predictor.endpoint_name,
    record_preprocessor_script=data_quality_preprocessor,
    statistics=f"{data_quality_location}/statistics.json",
    constraints=f"{data_quality_location}/constraints.json",
    schedule_cron_expression=CronExpressionGenerator.hourly(),
)

INFO:sagemaker.image_uris:Defaulting to the only supported framework/algorithm version: .
INFO:sagemaker.image_uris:Ignoring unnecessary instance type: None.
INFO:sagemaker.model_monitor.model_monitoring:Creating Monitoring Schedule with name: penguins-data-monitoring-schedule


You can describe the schedule to see more information about the Data Quality Monitoring Job.

## Step 7 - Introducing a Violation

Let's make a prediction for a penguin and include extra fields in the request. This should be flagged by the monitoring job.

In [23]:
predictor.predict({
    "island": "Dream",
    "culmen_length_mm": 46.4,
    "culmen_depth_mm": 18.6,
    "flipper_length_mm": 190.0,
    "body_mass_g": 5608.0,
    
    # These two columns are not in the baseline data,
    # so they will be reported by the monitoring job
    # as a violation.
    "name": "Johnny",
    "height": 28.0
})

{'species': 'Adelie', 'prediction': 0, 'confidence': 0.669251382}
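
To see why this request should be flagged, here is a simplified sketch of an extra column check: any field in the record that is absent from the baseline columns is reported. This is not SageMaker's implementation, only an illustration of the idea. The `extra_columns` helper is hypothetical, and the baseline column set is assumed from the fields used in this notebook.

```python
# Assumed baseline columns: the raw features plus the species label.
baseline_columns = {
    "island", "culmen_length_mm", "culmen_depth_mm",
    "flipper_length_mm", "body_mass_g", "species",
}


def extra_columns(record, baseline_columns):
    """Return the fields in record that the baseline does not know about."""
    return sorted(set(record) - set(baseline_columns))


request = {
    "island": "Dream",
    "culmen_length_mm": 46.4,
    "culmen_depth_mm": 18.6,
    "flipper_length_mm": 190.0,
    "body_mass_g": 5608.0,
    "name": "Johnny",
    "height": 28.0,
}

unexpected = extra_columns(request, baseline_columns)
print(unexpected)  # ['height', 'name']
```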

## Step 8 - Checking Monitoring Violations

We can check the results of the monitoring job by looking at whether it generated any violations.

In [115]:
description = data_monitor.describe_schedule()
description

{'MonitoringScheduleArn': 'arn:aws:sagemaker:us-east-1:325223348818:monitoring-schedule/penguins-data-monitoring-schedule',
 'MonitoringScheduleName': 'penguins-data-monitoring-schedule',
 'MonitoringScheduleStatus': 'Scheduled',
 'MonitoringType': 'DataQuality',
 'CreationTime': datetime.datetime(2023, 6, 28, 10, 25, 47, 459000, tzinfo=tzlocal()),
 'LastModifiedTime': datetime.datetime(2023, 6, 28, 11, 18, 42, 954000, tzinfo=tzlocal()),
 'MonitoringScheduleConfig': {'ScheduleConfig': {'ScheduleExpression': 'cron(0 * ? * * *)'},
  'MonitoringJobDefinitionName': 'data-quality-job-definition-2023-06-28-10-25-46-656',
  'MonitoringType': 'DataQuality'},
 'EndpointName': 'penguins-endpoint',
 'LastMonitoringExecutionSummary': {'MonitoringScheduleName': 'penguins-data-monitoring-schedule',
  'ScheduledTime': datetime.datetime(2023, 6, 28, 11, 0, tzinfo=tzlocal()),
  'CreationTime': datetime.datetime(2023, 6, 28, 11, 5, 38, 386000, tzinfo=tzlocal()),
  'LastModifiedTime': datetime.datetime(2

Generating 334 predictions...
Generating 334 predictions...
Generating 334 predictions...


In [104]:
from sagemaker.model_monitor import MonitoringExecution

status = description["LastMonitoringExecutionSummary"]["MonitoringExecutionStatus"]
print(f"Status: {status}")

if status == "CompletedWithViolations":
    processing_job_arn = description["LastMonitoringExecutionSummary"]["ProcessingJobArn"]
    execution = MonitoringExecution.from_processing_arn(sagemaker_session=sagemaker_session, processing_job_arn=processing_job_arn)
    execution_destination = execution.output.destination
    
    violations_filepath = os.path.join(execution_destination, "constraint_violations.json")
    violations = json.loads(S3Downloader.read_file(violations_filepath))["violations"]
    
    print(json.dumps(violations, indent=2))

Status: Completed
Generating 334 predictions...
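
When a run does finish with violations, the list can be long, so grouping it by check type makes it easier to scan. The sketch below assumes each entry carries `feature_name`, `constraint_check_type`, and `description` fields. The sample entries and the `group_by_check` helper are hypothetical and do not come from an actual monitoring run.

```python
from collections import defaultdict

# Hypothetical entries shaped like the "violations" list; not real output.
sample_violations = [
    {"feature_name": "body_mass_g",
     "constraint_check_type": "data_type_check",
     "description": "(illustrative)"},
    {"feature_name": "island",
     "constraint_check_type": "completeness_check",
     "description": "(illustrative)"},
    {"feature_name": "culmen_depth_mm",
     "constraint_check_type": "data_type_check",
     "description": "(illustrative)"},
]


def group_by_check(violations):
    """Map each check type to the list of features that failed it."""
    grouped = defaultdict(list)
    for violation in violations:
        grouped[violation["constraint_check_type"]].append(violation["feature_name"])
    return dict(grouped)


summary = group_by_check(sample_violations)
for check, features in summary.items():
    print(f"{check}: {', '.join(features)}")
```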


## Step 9 - Cleaning up

Let's stop the monitoring jobs by deleting the monitoring schedule we created before. 

In [24]:
delete_monitoring_schedule(data_monitor)

NameError: name 'data_monitor' is not defined

Let's now stop the thread generating traffic.

In [25]:
stop_traffic_thread.set()
traffic_thread.join()

Finally, we can delete the endpoint.

In [26]:
predictor.delete_endpoint()