In [3]:
import boto3
import sagemaker
import sagemaker.session
from sagemaker.workflow.pipeline_context import PipelineSession

In [4]:
region = boto3.Session().region_name
sagemaker_session = sagemaker.session.Session()
role = sagemaker.get_execution_role()
pipeline_session = PipelineSession()
default_bucket = sagemaker_session.default_bucket()
model_package_group_name = f"MyAbaloneModelPackageGroupName"

In [203]:
role

'arn:aws:iam::709891711940:role/service-role/SageMaker-MLOpsEngineer-poc'

In [101]:
!mkdir -p data

In [31]:
local_path = "data/abalone-dataset.csv"
s3 = boto3.resource("s3")
s3.Bucket(f"sagemaker-servicecatalog-seedcode-{region}").download_file("dataset/abalone-dataset.csv", local_path)

base_uri = f"s3://{default_bucket}/abalone"
# input_data_uri = sagemaker.s3.S3Uploader.upload(
#     local_path=local_path,
#     desired_s3_uri=base_uri
# )

print(input_data_uri)

s3://sagemaker-us-east-1-709891711940/abalone/abalone-dataset.csv


In [103]:
local_path = 'data/abalone-dataset-batch.csv'
s3 = boto3.resource("s3")
s3.Bucket(f"sagemaker-servicecatalog-seedcode-{region}").download_file("dataset/abalone-dataset-batch", local_path)

base_uri = f"s3://{default_bucket}/abalone"
batch_data_uri = sagemaker.s3.S3Uploader.upload(
    local_path=local_path,
    desired_s3_uri=base_uri
)

print(batch_data_uri)

s3://sagemaker-us-east-1-709891711940/abalone/abalone-dataset-batch.csv


In [104]:
from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
    ParameterFloat
)

processing_instance_count = ParameterInteger(
    name="ProcessingInstanceCount",
    default_value=1
)

model_approval_status = ParameterString(
    name="ModelApprovalStatus",
    default_value="PendingManualApproval"
)

input_data = ParameterString(
    name="InputData",
    default_value=input_data_uri
)

batch_data = ParameterString(
    name="BatchData",
    default_value=batch_data_uri
)

mse_threshold = ParameterFloat(name="MseThreshold", default_value=6.0)

In [105]:
!mkdir -p abalone

In [106]:
%%writefile abalone/preprocessing.py
import argparse
import os
import requests
import tempfile
import numpy as np
import pandas as pd

from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler, OneHotEncoder

# Because this is a headerless CSV file, specify the column name here.
feature_columns_names = [
    "sex",
    "length",
    "diameter",
    "height",
    "whole_weight",
    "shucked_weight",
    "viscera_weight",
    "shell_weight",
]

label_column = "rings"

feature_columns_dtype = {
    "sex": str,
    "length": np.float64,
    "diameter": np.float64,
    "height": np.float64,
    "whole_weight": np.float64,
    "shucked_weight": np.float64,
    "viscera_weight": np.float64,
    "shell_weight": np.float64
}
label_column_dtype = {"rings": np.float64}

def merge_two_dicts(x, y):
    z = x.copy()
    z.update(y)
    return z

if __name__ == "__main__":
    base_dir = "opt/ml/processing"
    df = pd.read_csv(
        f"{base_dir}/input/abalone-dataset.csv",
        header=None,
        names=feature_columns_names + [label_column],
        dtype=merge_two_dicts(feature_columns_dtype, label_column_dtype)
    )
    numeric_features = list(feature_columns_names)
    numeric_features.remove("sex")
    numeric_transformer = Pipeline(
        steps=[
            ("imputer", SimpleImputer(strategy="median")),
            ("scaler", StandardScaler())
        ]
    )
    
    categorical_features = ["sex"]
    categorical_transformer = Pipeline(
        steps=[
            ("imputer", SimpleImputer(strategy="constant", fill_value="missing")),
            ("onehot", OneHotEncoder(handle_unknown="ignore"))
        ]
    )
    
    preprocess = ColumnTransformer(
        transformers=[
            ("num", numeric_transformer, numeric_features),
            ("cat", categorical_transformer, categorical_features)
        ]
    )
    
    y = df.pop("rings")
    X_pre = preprocess.fit_transform(df)
    y_pre = y.to_numpy().reshape(len(y), 1)
    
    X = np.concatenate((y_pre, X_pre), axis=1)
    np.random.shuffle(X)
    train, validation, test = np.split(X, [int(.7*len(X)), int(.85*len(X))])
    
    pd.DataFrame(train).to_csv(f"{base_dir}/train/train.csv", header=False, index=False)
    pd.DataFrame(validation).to_csv(f"{base_dir}/validation/validation.csv", header=False, index=False)
    pd.DataFrame(test).to_csv(f"{base_dir}/test/test.csv", header=False, index=False)

Overwriting abalone/preprocessing.py


'us-east-1'

In [107]:
 from sagemaker.sklearn.processing import SKLearnProcessor

framework_version = "0.23-1"

sklearn_processor = SKLearnProcessor(
    framework_version=framework_version,
    instance_type="ml.m5.xlarge",
    instance_count=processing_instance_count,
    base_job_name="sklearn-abalone-process",
    sagemaker_session=pipeline_session,
    role=role
)

INFO:sagemaker.image_uris:Defaulting to only available Python version: py3


In [108]:
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep

processor_args = sklearn_processor.run(
    inputs=[
        ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),
    ],
    outputs=[
        ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
        ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"),
        ProcessingOutput(output_name="test", source="/opt/ml/processing/test")
    ],
    code="abalone/preprocessing.py",
)

step_process = ProcessingStep(name="MyAbaloneProcess", step_args=processor_args)



In [109]:
# from sagemaker.processing import ProcessingInput, ProcessingOutput
# from sagemaker.workflow.steps import ProcessingStep

# step_process = ProcessingStep(
#     name="MyAbaloneProcess",
#     processor=sklearn_processor,
#     inputs=[
#         ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),
#     ],
#     outputs=[
#         ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
#         ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"),
#         ProcessingOutput(output_name="test", source="/opt/ml/processing/test")
#     ],
#     code="abalone/preprocessing.py",
# )

In [110]:
input_data

ParameterString(name='InputData', parameter_type=<ParameterTypeEnum.STRING: 'String'>, default_value='s3://sagemaker-us-east-1-709891711940/abalone/abalone-dataset.csv')

In [111]:
model_path = f"s3://{default_bucket}/MyAbaloneTrain"

In [112]:
from sagemaker.estimator import Estimator

image_uri = sagemaker.image_uris.retrieve(
    framework="xgboost",
    region=region,
    version="1.0-1",
    py_version="py3",
    instance_type="ml.m5.xlarge"
)

xgb_train = Estimator(
    image_uri=image_uri,
    instance_type="ml.m5.xlarge",
    instance_count=1,
    output_path=model_path,
    sagemaker_session=pipeline_session,
    role=role
)

xgb_train.set_hyperparameters(
    objective="reg:linear",
    num_round=50,
    max_depth=5,
    eta=0.2,
    gamma=4,
    min_child_weight=6,
    subsample=0.7,
    silent=0
)

In [113]:
from sagemaker.inputs import TrainingInput
train_args = xgb_train.fit(
    inputs={
        "train": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri,
            content_type="text/csv"
        ),
        "validation": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs["validation"].S3Output.S3Uri,
            content_type="text/csv"
        ),
    }
)

In [114]:
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep

step_train = TrainingStep(
    name="MyAbaloneTrain",
    step_args=train_args
)



In [115]:
%%writefile abalone/evaluation.py
import json
import pathlib
import pickle
import tarfile
import joblib
import numpy as np
import pandas as pd
import xgboost

from sklearn.metrics import mean_squared_error

if __name__ == "__main__":
    model_path = f"/opt/ml/processing/model/model.tar.gz"
    with tarfile.open(model_path) as tar:
        tar.extractall(path=".")
        
    model = pickle.load(open("xgboost-model", "rb"))
    
    test_path = "/opt/ml/processing/test/test.csv"
    df = pd.read_csv(test_path, header=None)
    
    y_test = df.iloc[:, 0].to_numpy()
    df.drop(df.columns[0], axis=1, inplace=True)
    
    X_test = xgboost.DMatrix(df.values)
    predictions = model.predict(X_test)
    
    mse = mean_squared_error(y_test, predictions)
    std = np.std(y_test - predictions)
    
    report_dict = {
        "regression_metrics": {
            "mse": {
                "value": mse,
                "standard_deviation": std
            },
        },
    }
    
    output_dir = "/opt/ml/processing/evaluation"
    pathlib.Path(output_dir).mkdir(parents=True, exist_ok=True)
    
    evaluation_path = f"{output_dir}/evaluation.json"
    with open(evaluation_path, "w") as f:
        f.write(json.dumps(report_dict))

Overwriting abalone/evaluation.py


In [116]:
from sagemaker.processing import ScriptProcessor

script_eval = ScriptProcessor(
    image_uri=image_uri,
    command=["python3"],
    instance_type="ml.m5.xlarge",
    instance_count=1,
    base_job_name="my-script-abalone-eval",
    sagemaker_session=pipeline_session,
    role=role
)

In [117]:
eval_args = script_eval.run(
    inputs=[
        ProcessingInput(
            source=step_train.properties.ModelArtifacts.S3ModelArtifacts,
            destination="/opt/ml/processing/model",
        ),
        ProcessingInput(
            source=step_process.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri,
            destination="/opt/ml/processing/test"
        ),
    ],
    outputs=[
        ProcessingOutput(output_name="evaluation", source="/opt/ml/processing/evaluation"),
    ],
    code="abalone/evaluation.py"
)

In [118]:
from sagemaker.workflow.properties import PropertyFile

evaluation_report = PropertyFile(
    name="MyEvaluationReport",
    output_name="evaluation",
    path="evaluation.json"
)

step_eval = ProcessingStep(
    name="MyAbaloneEval",
    step_args=eval_args,
    property_files=[evaluation_report],
)

# step_eval = ProcessingStep(
#     name="MyAbaloneEval",
#     processor=script_eval,
#     inputs=[
#         ProcessingInput(
#             source=step_train.properties.ModelArtifacts.S3ModelArtifacts,
#             destination="/opt/ml/processing/model"
#         ),
#         ProcessingInput(
#             source=step_process.properties.ProcessingOutputConfig.Outputs[
#                 "test",
#             ].S3Output.S3Uri,
#             destination="/opt/ml/processing/test"
#         )
#     ],
#     outputs=[
#         ProcessingOutput(output_name="evaluation", source="/opt/ml/processing/evaluation"),
#     ],
#     code="abalone/evaluation.py",
#     property_files=[evaluation_report],
# )

In [119]:
from sagemaker.model import Model

model = Model(
    image_uri=image_uri,
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    sagemaker_session=pipeline_session,
    role=role
)

In [120]:
from sagemaker.inputs import CreateModelInput

model_args = model.create(
    instance_type="ml.m5.large",
    accelerator_type="ml.eia1.medium"
)
# inputs = CreateModelInput(
#     instance_type="ml.m5.xlarge",
#     accelerator_type="ml.eia1.medium"
# )

In [121]:
from sagemaker.inputs import CreateModelInput
from sagemaker.workflow.model_step import ModelStep

step_create_model = ModelStep(
    name="MyAbaloneCreateModel",
    step_args=model_args
)

# from sagemaker.workflow.steps import CreateModelStep

# step_create_model = CreateModelStep(
#     name="MyAbaloneCreateModel",
#     model=model,
#     inputs=inputs
# )

In [122]:
from sagemaker.transformer import Transformer

transformer = Transformer(
    model_name=step_create_model.properties.ModelName,
    instance_type="ml.m5.xlarge",
    instance_count=1,
    output_path=f"s3://{default_bucket}/MyAbaloneTransform"
)

In [123]:
from sagemaker.inputs import TransformInput
from sagemaker.workflow.steps import TransformStep

step_transform = TransformStep(
    name="MyAbaloneTransform",
    transformer=transformer,
    inputs=TransformInput(data=batch_data)
)

In [124]:
step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
# a + ["evaluation.json"]

# step_eval.arguments

's3://sagemaker-us-east-1-709891711940/my-script-abalone-eval-2023-05-18-13-09-00-765/output/evaluation'

In [142]:
from sagemaker.model_metrics import MetricsSource, ModelMetrics
from sagemaker.workflow.step_collections import RegisterModel
from sagemaker.workflow.model_step import ModelStep

model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri="{}/evaluation.json".format(
            step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
        ),
        content_type="application/json",
    )
)

register_args = model.register(
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=["ml.t2.medium", "ml.m5.xlarge"],
    transform_instances=["ml.m5.xlarge"],
    model_package_group_name=model_package_group_name,
    approval_status=model_approval_status,
    model_metrics=model_metrics
)

step_register = ModelStep(
    name="MyAbaloneRegisterModel",
    step_args=register_args
)
# step_register = RegisterModel(
#     name="MyAbaloneRegisterModel",
#     estimator=xgb_train,
#     model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
#     content_types=["text/csv"],
#     response_types=["text/csv"],
#     inference_instances=["ml.t2.medium", "ml.m5.xlarge"],
#     transform_instances=["ml.m5.xlarge"],
#     model_package_group_name=model_package_group_name,
#     model_metrics=model_metrics
# )



In [143]:
from sagemaker.workflow.fail_step import FailStep
from sagemaker.workflow.functions import Join

step_fail = FailStep(
    name="MyAbaloneMSEFail",
    error_message=Join(on=" ", values=["Execution failed due to MSE >", mse_threshold]),
)

In [144]:
step_eval.name

'MyAbaloneEval'

In [145]:
from sagemaker.workflow.conditions import ConditionLessThanOrEqualTo
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.functions import JsonGet

cond_lte = ConditionLessThanOrEqualTo(
    left=JsonGet(
        step_name=step_eval.name,
        property_file=evaluation_report,
        json_path="regression_metrics.mse.value",
    ),
    right=mse_threshold,
)

In [146]:
step_register.name, step_create_model.name, step_transform.name, step_fail.name

('MyAbaloneRegisterModel',
 'MyAbaloneCreateModel',
 'MyAbaloneTransform',
 'MyAbaloneMSEFail')

In [147]:
step_cond = ConditionStep(
    name="MyAbaloneMSECond",
    conditions=[cond_lte],
    if_steps=[step_register, step_create_model, step_transform],
    else_steps=[step_fail],
)

In [148]:
step_cond.name

'MyAbaloneMSECond'

In [149]:
from sagemaker.workflow.pipeline import Pipeline

pipeline_name = f"MyAbalonePipeline"
pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        processing_instance_count,
        model_approval_status,
        input_data,
        batch_data,
        mse_threshold,
    ],
    steps=[step_process
           , step_train,
    step_eval, 
    step_cond]
)

In [150]:
import json

json.loads(pipeline.definition())



{'Version': '2020-12-01',
 'Metadata': {},
 'Parameters': [{'Name': 'ProcessingInstanceCount',
   'Type': 'Integer',
   'DefaultValue': 1},
  {'Name': 'ModelApprovalStatus',
   'Type': 'String',
   'DefaultValue': 'PendingManualApproval'},
  {'Name': 'InputData',
   'Type': 'String',
   'DefaultValue': 's3://sagemaker-us-east-1-709891711940/abalone/abalone-dataset.csv'},
  {'Name': 'BatchData',
   'Type': 'String',
   'DefaultValue': 's3://sagemaker-us-east-1-709891711940/abalone/abalone-dataset-batch.csv'},
  {'Name': 'MseThreshold', 'Type': 'Float', 'DefaultValue': 6.0}],
 'PipelineExperimentConfig': {'ExperimentName': {'Get': 'Execution.PipelineName'},
  'TrialName': {'Get': 'Execution.PipelineExecutionId'}},
 'Steps': [{'Name': 'MyAbaloneProcess',
   'Type': 'Processing',
   'Arguments': {'ProcessingResources': {'ClusterConfig': {'InstanceType': 'ml.m5.xlarge',
      'InstanceCount': {'Get': 'Parameters.ProcessingInstanceCount'},
      'VolumeSizeInGB': 30}},
    'AppSpecification'

In [151]:
pipeline.upsert(role_arn=role)

{'PipelineArn': 'arn:aws:sagemaker:us-east-1:709891711940:pipeline/myabalonepipeline',
 'ResponseMetadata': {'RequestId': '1b588229-2bb3-4d50-a57e-79227148ee73',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '1b588229-2bb3-4d50-a57e-79227148ee73',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '85',
   'date': 'Thu, 18 May 2023 13:16:03 GMT'},
  'RetryAttempts': 0}}

In [152]:
execution = pipeline.start()

In [153]:
execution.describe()

{'PipelineArn': 'arn:aws:sagemaker:us-east-1:709891711940:pipeline/myabalonepipeline',
 'PipelineExecutionArn': 'arn:aws:sagemaker:us-east-1:709891711940:pipeline/myabalonepipeline/execution/oxnwnmq2numt',
 'PipelineExecutionDisplayName': 'execution-1684415767214',
 'PipelineExecutionStatus': 'Executing',
 'PipelineExperimentConfig': {'ExperimentName': 'myabalonepipeline',
  'TrialName': 'oxnwnmq2numt'},
 'CreationTime': datetime.datetime(2023, 5, 18, 13, 16, 7, 98000, tzinfo=tzlocal()),
 'LastModifiedTime': datetime.datetime(2023, 5, 18, 13, 16, 7, 98000, tzinfo=tzlocal()),
 'CreatedBy': {'UserProfileArn': 'arn:aws:sagemaker:us-east-1:709891711940:user-profile/d-odvpujw44yfo/default-1681294514475',
  'UserProfileName': 'default-1681294514475',
  'DomainId': 'd-odvpujw44yfo'},
 'LastModifiedBy': {'UserProfileArn': 'arn:aws:sagemaker:us-east-1:709891711940:user-profile/d-odvpujw44yfo/default-1681294514475',
  'UserProfileName': 'default-1681294514475',
  'DomainId': 'd-odvpujw44yfo'},
 

In [154]:
execution.wait()

In [None]:
execution.list_steps()

In [9]:
sm_client = boto3.client("sagemaker")

In [12]:
model_package_arn = sm_client.list_model_packages(ModelPackageGroupName=model_package_group_name)["ModelPackageSummaryList"][0]["ModelPackageArn"]

In [13]:
model_package_arn

'arn:aws:sagemaker:us-east-1:709891711940:model-package/myabalonemodelpackagegroupname/1'

In [14]:
model_package_update_input_dict = {
    "ModelPackageArn": model_package_arn,
    "ModelApprovalStatus": "Approved"
}

In [15]:
model_package_update_response = sm_client.update_model_package(**model_package_update_input_dict)

In [16]:
model_package_update_response

{'ModelPackageArn': 'arn:aws:sagemaker:us-east-1:709891711940:model-package/myabalonemodelpackagegroupname/1',
 'ResponseMetadata': {'RequestId': '7e5d1d91-9d8e-48c3-9e35-bf74c952a537',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '7e5d1d91-9d8e-48c3-9e35-bf74c952a537',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '109',
   'date': 'Fri, 26 May 2023 10:20:42 GMT'},
  'RetryAttempts': 0}}

In [17]:
from sagemaker import ModelPackage
from time import gmtime, strftime

model = ModelPackage(
    role=role,
    model_package_arn=model_package_arn,
    sagemaker_session=sagemaker_session
)

model.deploy(initial_instance_count=1, instance_type='ml.m5.xlarge')

----!

# Amazon SageMaker Drift Detection

This sample demonstrates how to setup an Amazon SageMaker MLOps deployment pipeline for Drift detection

![Solution Architecture](docs/drift-solution-architecture.png)

The following are the high-level steps to deploy this solution:

1. Publish the SageMaker [MLOps Project template](https://docs.aws.amazon.com/sagemaker/latest/dg/sagemaker-projects-templates.html) in the [AWS Service Catalog](https://aws.amazon.com/servicecatalog/)
2. Create a new Project in [Amazon SageMaker Studio](https://docs.aws.amazon.com/sagemaker/latest/dg/sagemaker-projects-create.html)

Once complete, you can Train and Deploy machine learning models, and send traffic to the Endpoint to cause the Model Monitor to raise a drift alert.

## Get Started

Use this following AWS CloudFormation quick start to create a custom [SageMaker MLOps project](https://docs.aws.amazon.com/sagemaker/latest/dg/sagemaker-projects-templates-custom.html) template in the [AWS Service Catalog](https://aws.amazon.com/servicecatalog/) and configure the portfolio and products so you can launch the project from within your Studio domain.

[![Launch Stack](https://s3.amazonaws.com/cloudformation-examples/cloudformation-launch-stack.png)](https://console.aws.amazon.com/cloudformation/home?region=us-east-1#/stacks/quickcreate?templateUrl=https%3A%2F%2Faws-ml-blog.s3.amazonaws.com%2Fartifacts%2Famazon-sagemaker-drift-detection%2Fdrift-service-catalog.yml&stackName=drift-pipeline&param_ExecutionRoleArn=&param_PortfolioName=SageMaker%20Organization%20Templates&param_PortfolioOwner=administrator&param_ProductVersion=1.0)

Follow are the list of the parameters. 

| Parameters         | Description                                    |
| ------------------ | ---------------------------------------------- |
| ExecutionRoleArn   | The SageMaker Studio execution role (required) |
| PortfolioName      | The name of the portfolio                      |
| PortfolioOwner     | The owner of the portfolio                     |
| ProductVersion     | The product version to deploy                  |

You can copy the the required `ExecutionRoleArn` role from your **User Details** in the SageMaker Studio dashboard.

![Execution Role](docs/studio-execution-role.png)

Alternatively see [BUILD.md](BUILD.md) for instructions on how to build the MLOps template from source.

## Creating a new Project in Amazon SageMaker Studio

Once your MLOps project template is registered in **AWS Service Catalog** you can create a project using your new template.

1. Switch back to the Launcher
2. Click **New Project** from the **ML tasks and components** section.

On the Create project page, SageMaker templates is chosen by default. This option lists the built-in templates. However, you want to use the template you published for Amazon SageMaker drift detection.

3. Choose **Organization templates**.
4. Choose **Amazon SageMaker drift detection template for real-time deployment**.
5. Choose **Select project template**.

![Select Template](docs/drift-select-template.png)

`NOTE`: If you have recently updated your AWS Service Catalog Project, you may need to refresh SageMaker Studio to ensure it picks up the latest version of your template.

6. In the **Project details** section, for **Name**, enter **drift-pipeline**.
  - The project name must have 32 characters or fewer.
7. In the Project template parameter, for **RetrainSchedule**, input a validate [Cron Schedule](https://docs.aws.amazon.com/sagemaker/latest/dg/model-monitor-schedule-expression.html)
  - This defaults to `cron(0 12 1 * ? *)` which is the first day of every month.
8. Choose **Create project**.

![Create Project](docs/drift-create-project.png)

`NOTE`: If the **Create project** button is not enabled, touch the value in the **RetrainSchedule** to allow continuing.

### Project Resources

The MLOps Drift Detection template will create the following AWS services and resources:

1. An [Amazon Simple Storage Service](https://aws.amazon.com/s3/) (Amazon S3) bucket is created for output model artifacts generated from the pipeline.

2. Two repositories are added to [AWS CodeCommit](https://aws.amazon.com/codecommit/):
  -  The first repository provides code to create a multi-step model building pipeline using [AWS CloudFormation](https://aws.amazon.com/cloudformation/).  The pipeline includes the following steps: data processing, model baseline, model training, model evaluation, and conditional model registration based on accuracy. The pipeline trains a linear regression model using the XGBoost algorithm on trip data from the [NYC Taxi Dataset](https://registry.opendata.aws/nyc-tlc-trip-records-pds/). This repository also includes the [build-pipeline.ipynb](build_pipeline/build-pipeline.ipynb) notebook to [Run the Pipeline](#run-the-pipeline) (see below)
  - The second repository contains code and configuration files for model deployment and monitoring. This repo also uses [AWS CodePipeline](https://aws.amazon.com/codepipeline/) and [CodeBuild](https://aws.amazon.com/codebuild/), which run an [AWS CloudFormation](https://aws.amazon.com/cloudformation/) template to create model endpoints for staging and production.  This repository includes the [prod-config.json](deployment_pipeline/prod-config.json) configure to set metrics and threshold for drift detection.

3. Two AWS CodePipeline pipelines:
  - The [model build pipeline](build_pipeline) creates or updates the pipeline definition and then starts a new execution with a custom [AWS Lambda](https://aws.amazon.com/lambda/) function whenever a new commit is made to the ModelBuild CodeCommit repository. The first time the CodePipeline is started, it will fail to complete expects input data to be uploaded to the Amazon S3 artifact bucket.
  - The [deployment pipeline](deployment_pipeline/README.md) automatically triggers whenever a new model version is added to the model registry and the status is marked as Approved. Models that are registered with Pending or Rejected statuses aren’t deployed.

4. [SageMaker Pipelines](https://aws.amazon.com/sagemaker/pipelines) uses the following resources:
  - This workflow contains the directed acyclic graph (DAG) that creates a baseline and training job in parallel following up with a step to evaluate the model.  Each step in the pipeline keeps track of the lineage and steps are cached for quickly re-running the pipeline.  
  - Within SageMaker Pipelines, the [SageMaker Model Registry](https://docs.aws.amazon.com/sagemaker/latest/dg/model-registry.html) tracks the model versions and respective artifacts, including the lineage and metadata for how they were created. Different model versions are grouped together under a model group, and new models registered to the registry are automatically versioned. The model registry also provides an approval workflow for model versions and supports deployment of models in different accounts. You can also use the model registry through the boto3 package.

5. Two SageMaker Endpoints:
  - After a model is approved in the registry, the artifact is automatically deployed to a staging endpoint followed by a manual approval step.
  - If approved, it’s deployed to a production endpoint in the same AWS account along with a Model Monitoring schedule configured to detect drift compared against the baseline.

6. Two [Amazon Event Bridge](https://aws.amazon.com/eventbridge/) Rules and [CloudWatch](https://aws.amazon.com/cloudwatch/) Alarm:
  - One scheduled rule configured to re-train the model on a regular schedule. 
  - One CloudWatch alarm that will trigger when drift is detected in the Model Monitor and trigger a rule to re-train the model.

You will see a summary of these resources in the project page including the Repositories and Pipelines.  The Model groups and Endpoints will become visible after we have completed running the pipeline.

## Run the Pipeline

Once your project is created, following the instructions to [Clone the Code Repository](https://docs.aws.amazon.com/sagemaker/latest/dg/sagemaker-projects-walkthrough.html#sagemaker-proejcts-walkthrough-clone)

![Solution Architecture](docs/drift-clone-repository.png)

1. Choose **Repositories**, and in the **Local path** column for the repository that ends with *build*, choose **clone repo....**
2. In the dialog box that appears, accept the defaults and choose **Clone repository**
3. When clone of the repository is complete, the local path appears in the **Local path** column. Click on the path to open the local folder that contains the repository code in SageMaker Studio.
4. Click on the [build-pipeline.ipynb](build_pipeline/build-pipeline.ipynb) file to open the notebook.

In the notebook, provide the **Project Name** in the first cell to get started:

```
project_name = "<<project_name>>"  # << Update this drift detection project
```

Then follow the series of steps in the notebook to run through the sample:

1. Fetch the [NYC Taxi Dataset](https://registry.opendata.aws/nyc-tlc-trip-records-pds/) and upload to S3
2. Start the model build pipeline
3. Review the training job performance
4. Update the [Model Registry](https://docs.aws.amazon.com/sagemaker/latest/dg/model-registry-approve.html) status to `Approved`
5. Deploy the model to Staging
6. Make predictions against the Staging Endpoint
7. Manually Approve the Staging endpoint in the [deployment pipeline](deployment_pipeline/README.md)
8. Deploy the model to Production
9. Make predictions against the Production Endpoint to cause the the Model Monitor to alarm on drift detection.

### Model Monitor

To [visualize the results](https://docs.aws.amazon.com/sagemaker/latest/dg/model-monitor-interpreting-visualize-results.html) of Model Monitoring in Amazon SageMaker Studio select the Production Endpoint from the **Components and registries** left navigation pane or from the **Endpoints** tab in the project summary.

![Solution Architecture](docs/drift-model-monitor.png)

Once the Model Monitor **Data Quality** schedule has completed its execution (usually about 10 minutes past the hour) you will be able to navigate to the **Monitoring job history** tab to see that *issue found* will be identified in the **Monitoring status** column.

## Running Costs

This section outlines cost considerations for running the Drift Detection Pipeline. Completing the pipeline will deploy an endpoint with 2 production variants which will cost less than $8 per day. Further cost breakdowns are below.

- **CodeBuild** – Charges per minute used. First 100 minutes each month come at no charge. For information on pricing beyond the first 100 minutes, see [AWS CodeBuild Pricing](https://aws.amazon.com/codebuild/pricing/).
- **CodeCommit** – $1/month if you didn't opt to use your own GitHub repository.
- **CodePipeline** – CodePipeline costs $1 per active pipeline* per month. Pipelines are free for the first 30 days after creation. More can be found at [AWS CodePipeline Pricing](https://aws.amazon.com/codepipeline/pricing/).
- **SageMaker** – Prices vary based on EC2 instance usage for the Notebook Instances, Model Hosting, Model Training and Model Monitoring; each charged per hour of use. For more information, see [Amazon SageMaker Pricing](https://aws.amazon.com/sagemaker/pricing/).
  - The three `mml.m5.xlarge` *baseline, training and evaluation jobs* run for approx 20 minutes at $0.23 an hour, and cost less than $1.
  - The one `ml.t2.medium` instance for staging *hosting* endpoint costs $0.056 per hour, or $1.34 per day.
  - The two `ml.m5.large` instances for production *hosting* endpoint at 2 x $0.115 per hour, or $5.52 per day.
  - The one `ml.m5.xlarge` instance for *model monitor* schedule at $0.23 an hour, and cost less than $1 per day.
- **S3** – Low cost, prices will vary depending on the size of the models/artifacts stored. The first 50 TB each month will cost only $0.023 per GB stored. For more information, see [Amazon S3 Pricing](https://aws.amazon.com/s3/pricing/).
- **Lambda** - Low cost, $0.20 per 1 million request see [AWS Lambda Pricing](https://aws.amazon.com/lambda/pricing/).

## Cleaning Up

The [build-pipeline.ipynb](build_pipeline/build-pipeline.ipynb) notebook includes cells that you can run to cleanup the resources.

1. SageMaker prod endpoint
2. SageMaker staging endpoint
3. SageMaker Pipeline Workflow and Model Package Group

You can also clean up resources using the [AWS Command Line Interface](http://aws.amazon.com/cli) (AWS CLI):

1. Delete the CloudFormation stack created to provision the Production endpoint:

```
aws cloudformation delete-stack --stack-name sagemaker-<<project_name>>-deploy-prod
```

2. Delete the CloudFormation stack created to provision the Staging endpoint:

```
aws cloudformation delete-stack --stack-name sagemaker-<<project_name>>-deploy-staging
```

3. Delete the CloudFormation stack created to provision the SageMaker Pipeline and Model Package Group:

```
aws cloudformation delete-stack --stack-name sagemaker-<<project_name>>-deploy-pipeline
```

4. Empty the S3 bucket containing the artifacts output from the drift deployment pipeline:

```
aws s3 rm --recursive s3://sagemaker-project-<<project_id>>-<<region_name>>
```

5. Delete the project, which removes the CloudFormation stack that created the deployment pipeline:

```
aws sagemaker delete-project --project-name <<project_name>>
```

6. Delete the AWS Service Catalog project template:

```
aws cloudformation delete-stack --stack-name <<drift-pipeline>>
```

## Security

See [CONTRIBUTING](CONTRIBUTING.md#security-issue-notifications) for more information.

## License

This library is licensed under the MIT-0 License. See the [LICENSE](LICENSE) file.


In [21]:
role

'arn:aws:iam::709891711940:role/service-role/SageMaker-MLOpsEngineer-poc'

In [26]:
from sagemaker.model_monitor import DataCaptureConfig

# Set to True to enable data capture
enable_capture = True

# Optional - Sampling percentage. Choose an integer value between 0 to 100 
# sampling_percentage = int
# Sampling percentage = 30 # Example 30%

# Optional - The S3 URI of stored captured-data location
s3_capture_upload_path = f"s3://{default_bucket}/my-abalone-capture_data"

# Specify either Input, Output or both.
capture_modes = ["REQUEST", "RESPONSE"] # In this example, we specify both
# capture mode = ["REQUEST"] # Example - If you want to only capture input.

# Configuration object passed in when deploying Models to SM endpoints
data_capture_config = DataCaptureConfig(
    enable_capture=enable_capture,
    # sampling_percentage=sampling_percentage, # Optional
    destination_s3_uri=s3_capture_upload_path, # Optional
    capture_options=capture_modes
)

In [41]:
s3_capture_upload_path

's3://sagemaker-us-east-1-709891711940/my-abalone-capture_data'

In [27]:
from datetime import datetime

endpoint_name = '1-2023-05-26-10-26-28-921'
# endpoint_name = f'my-abalone-endpoint-{datetime.utcnow():%Y-%m-%d-%H%M}'
print(endpoint_name)

1-2023-05-26-10-26-28-921


In [29]:
initial_instance_count = 1
instance_type = "ml.m5.xlarge"

model.deploy(
    initial_instance_count=initial_instance_count,
    instance_type=instance_type,
    endpoint_name=endpoint_name,
    data_capture_config=data_capture_config
)

----!

In [36]:
region

'us-east-1'

In [37]:
from sagemaker.predictor import Predictor
from sagemaker.serializers import CSVSerializer
from sagemaker.deserializers import CSVDeserializer

predictor = Predictor(endpoint_name=endpoint_name,
                      serializer=CSVSerializer,
                      deserializer=CSVDeserializer)

In [78]:
predictor.endpoint

The endpoint attribute has been renamed in sagemaker>=2.
See: https://sagemaker.readthedocs.io/en/stable/v2.html for details.


'1-2023-05-26-10-26-28-921'

# Data Capture

In [110]:
    from time import sleep

    validate_dataset = "validation_with_predictions.csv"
    sm_client = boto3.client("sagemaker-runtime")
    # Cut off threshold of 80%
    cutoff = 0.8

    limit = 200 # Need at least 200 samples to compute standard deviations
    i = 0
    with open(f"test_data/{validate_dataset}", "w") as validation_file:
        validation_file.write("prediction,label\n") # CSV header
        with open("test_data/validation.csv", "r") as f:
            for row in f:
                (label, input_cols) = row.split(",", 1)
                res = sm_client.invoke_endpoint(EndpointName=endpoint_name,
                              ContentType='text/csv',
                              Body=input_cols)
                prediction = res["Body"].read().decode()
                # prediction = "1" if probability > cutoff else "0"
                validation_file.write(f"{prediction},{label}\n")
                i += 1
                if i > limit:
                    break
                print(".", end="", flush=True)
                sleep(0.5)
    print()
    print("Done!")

........................................................................................................................................................................................................
Done!


In [134]:
local_path_validation_with_prediction = "test_data/validation_with_predictions.csv"
base_uri_validation_with_prediction = f"s3://{default_bucket}/MyAbalonePipeline/7p6mevksvmgc/MyAbaloneProcess/output/validation"

input_data_uri = sagemaker.s3.S3Uploader.upload(
    local_path=local_path_validation_with_prediction,
    desired_s3_uri=base_uri_validation_with_prediction
)

In [135]:
input_data_uri

's3://sagemaker-us-east-1-709891711940/MyAbalonePipeline/7p6mevksvmgc/MyAbaloneProcess/output/validation/validation_with_predictions.csv'

In [50]:
role

'arn:aws:iam::709891711940:role/service-role/SageMaker-MLOpsEngineer-poc'

# Create a baseline

In [103]:
from sagemaker.model_monitor import DefaultModelMonitor
from sagemaker.model_monitor import DatasetFormat

baseline_data_uri = "s3://sagemaker-us-east-1-709891711940/MyAbalonePipeline/7p6mevksvmgc/MyAbaloneProcess/output/train/train.csv"

my_default_monitor = DefaultModelMonitor(
    role=role,
    instance_count=1,
    instance_type="ml.m5.xlarge",
    volume_size_in_gb=20,
    max_runtime_in_seconds=3600
)

my_default_monitor.suggest_baseline(
    baseline_dataset=baseline_data_uri,
    dataset_format=DatasetFormat.csv(header=False),
    output_s3_uri=f"s3://{default_bucket}/my-abalone-baseline-results",
    wait=True
)

INFO:sagemaker.image_uris:Defaulting to the only supported framework/algorithm version: .
INFO:sagemaker.image_uris:Ignoring unnecessary instance type: None.
INFO:sagemaker:Creating processing-job with name baseline-suggestion-job-2023-06-13-14-59-49-342


..........................[34m2023-06-13 15:04:11,325 - matplotlib.font_manager - INFO - Generating new fontManager, this may take some time...[0m
[34m2023-06-13 15:04:11.863276: W tensorflow/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libcudart.so.11.0'; dlerror: libcudart.so.11.0: cannot open shared object file: No such file or directory[0m
[34m2023-06-13 15:04:11.863306: I tensorflow/stream_executor/cuda/cudart_stub.cc:29] Ignore above cudart dlerror if you do not have a GPU set up on your machine.[0m
[34m2023-06-13 15:04:13.397810: W tensorflow/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libcuda.so.1'; dlerror: libcuda.so.1: cannot open shared object file: No such file or directory[0m
[34m2023-06-13 15:04:13.397840: W tensorflow/stream_executor/cuda/cuda_driver.cc:269] failed call to cuInit: UNKNOWN ERROR (303)[0m
[34m2023-06-13 15:04:13.397860: I tensorflow/stream_executor/cuda/cuda_diagnostic

<sagemaker.processing.ProcessingJob at 0x7f2b8ea8a2d0>

# Monitoring Data Qality

In [108]:
from sagemaker.model_monitor import CronExpressionGenerator
from sagemaker.model_monitor import EndpointInput

my_default_monitor.create_monitoring_schedule(
    monitor_schedule_name="MyAbaloneModelMonitorSchedule",
    output_s3_uri=f"s3://{default_bucket}/my-abalone-model-monitoring-report",
    schedule_cron_expression=CronExpressionGenerator.hourly(),
    statistics=my_default_monitor.baseline_statistics(),
    constraints=my_default_monitor.suggested_constraints(),
    enable_cloudwatch_metrics=True,
    endpoint_input=EndpointInput(
        endpoint_name=endpoint_name,
        destination="/opt/ml/processing/input/endpoint",
    )
)

INFO:sagemaker.model_monitor.model_monitoring:Creating Monitoring Schedule with name: MyAbaloneModelMonitorSchedule


In [None]:
status_default_monitor_schedule = my_default_monitor.stop_monitoring_schedule()

In [112]:
import boto3
boto3_sm_client = boto3.client('sagemaker')
boto3_sm_client.describe_monitoring_schedule(MonitoringScheduleName='MyAbaloneModelMonitorSchedule')

{'MonitoringScheduleArn': 'arn:aws:sagemaker:us-east-1:709891711940:monitoring-schedule/MyAbaloneModelMonitorSchedule',
 'MonitoringScheduleName': 'MyAbaloneModelMonitorSchedule',
 'MonitoringScheduleStatus': 'Scheduled',
 'MonitoringType': 'DataQuality',
 'CreationTime': datetime.datetime(2023, 6, 14, 6, 45, 9, 514000, tzinfo=tzlocal()),
 'LastModifiedTime': datetime.datetime(2023, 6, 14, 10, 8, 12, 93000, tzinfo=tzlocal()),
 'MonitoringScheduleConfig': {'ScheduleConfig': {'ScheduleExpression': 'cron(0 * ? * * *)'},
  'MonitoringJobDefinitionName': 'data-quality-job-definition-2023-06-14-06-45-08-925',
  'MonitoringType': 'DataQuality'},
 'EndpointName': '1-2023-05-26-10-26-28-921',
 'LastMonitoringExecutionSummary': {'MonitoringScheduleName': 'MyAbaloneModelMonitorSchedule',
  'ScheduledTime': datetime.datetime(2023, 6, 14, 10, 0, tzinfo=tzlocal()),
  'CreationTime': datetime.datetime(2023, 6, 14, 10, 1, 31, 259000, tzinfo=tzlocal()),
  'LastModifiedTime': datetime.datetime(2023, 6, 

# Create a Model Quality Baseline

In [114]:
role, sagemaker_session

('arn:aws:iam::709891711940:role/service-role/SageMaker-MLOpsEngineer-poc',
 <sagemaker.session.Session at 0x7f2b96033d50>)

In [115]:
from sagemaker import get_execution_role, session, Session
from sagemaker.model_monitor import ModelQualityMonitor

# role = get_execution_role()
# session = Session()

model_quality_monitor = ModelQualityMonitor(
    role=role,
    instance_count=1,
    instance_type="ml.m5.xlarge",
    volume_size_in_gb=20,
    max_runtime_in_seconds=1800,
    sagemaker_session=sagemaker_session
)

INFO:sagemaker.image_uris:Defaulting to the only supported framework/algorithm version: .
INFO:sagemaker.image_uris:Ignoring unnecessary instance type: None.


In [146]:
from datetime import datetime

baseline_model_quality_job_name = "MyAbaloneModelQualityMonitorBaselineJob-" + datetime.now().strftime("%m-%d-%Y-%H-%M-%S")
job = model_quality_monitor.suggest_baseline(
    job_name=baseline_model_quality_job_name,
    baseline_dataset=f"s3://{default_bucket}/MyAbalonePipeline/7p6mevksvmgc/MyAbaloneProcess/output/validation/validation_with_predictions.csv",
    dataset_format=DatasetFormat.csv(header=True),
    output_s3_uri=f"s3://{default_bucket}/my-abalone-model-quality-monitoring-report",
    problem_type="Regression",
    inference_attribute="prediction",
    # probability_attribute="probability",
    ground_truth_attribute="label",
)
job.wait(logs=True)


INFO:sagemaker:Creating processing-job with name MyAbaloneModelQualityMonitorBaselineJob-06-15-2023-09-57-58


..........................[34m2023-06-15 10:02:16,519 - matplotlib.font_manager - INFO - Generating new fontManager, this may take some time...[0m
[34m2023-06-15 10:02:17.068724: W tensorflow/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libcudart.so.11.0'; dlerror: libcudart.so.11.0: cannot open shared object file: No such file or directory[0m
[34m2023-06-15 10:02:17.068753: I tensorflow/stream_executor/cuda/cudart_stub.cc:29] Ignore above cudart dlerror if you do not have a GPU set up on your machine.[0m
[34m2023-06-15 10:02:18.656970: W tensorflow/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libcuda.so.1'; dlerror: libcuda.so.1: cannot open shared object file: No such file or directory[0m
[34m2023-06-15 10:02:18.657000: W tensorflow/stream_executor/cuda/cuda_driver.cc:269] failed call to cuInit: UNKNOWN ERROR (303)[0m
[34m2023-06-15 10:02:18.657020: I tensorflow/stream_executor/cuda/cuda_diagnostic

In [147]:
baseline_job_model_quality = model_quality_monitor.latest_baselining_job

In [149]:
baseline_job_model_quality.describe()

{'ProcessingInputs': [{'InputName': 'baseline_dataset_input',
   'AppManaged': False,
   'S3Input': {'S3Uri': 's3://sagemaker-us-east-1-709891711940/MyAbalonePipeline/7p6mevksvmgc/MyAbaloneProcess/output/validation/validation_with_predictions.csv',
    'LocalPath': '/opt/ml/processing/input/baseline_dataset_input',
    'S3DataType': 'S3Prefix',
    'S3InputMode': 'File',
    'S3DataDistributionType': 'FullyReplicated',
    'S3CompressionType': 'None'}}],
 'ProcessingOutputConfig': {'Outputs': [{'OutputName': 'monitoring_output',
    'S3Output': {'S3Uri': 's3://sagemaker-us-east-1-709891711940/my-abalone-model-quality-monitoring-report',
     'LocalPath': '/opt/ml/processing/output',
     'S3UploadMode': 'EndOfJob'},
    'AppManaged': False}]},
 'ProcessingJobName': 'MyAbaloneModelQualityMonitorBaselineJob-06-15-2023-09-57-58',
 'ProcessingResources': {'ClusterConfig': {'InstanceCount': 1,
   'InstanceType': 'ml.m5.xlarge',
   'VolumeSizeInGB': 20}},
 'StoppingCondition': {'MaxRuntimeIn

In [152]:
import pandas as pd

pd.DataFrame(baseline_job_model_quality.suggested_constraints().body_dict["regression_constraints"]).T

Unnamed: 0,threshold,comparison_operator
mae,1.186433,GreaterThanThreshold
mse,2.700755,GreaterThanThreshold
rmse,1.643397,GreaterThanThreshold
r2,0.756449,LessThanThreshold


In [221]:
baseline_job_model_quality1 = model_quality_monitor.latest_baselining_job

In [222]:
baseline_job_model_quality1.describe()

{'ProcessingInputs': [{'InputName': 'baseline_dataset_input',
   'AppManaged': False,
   'S3Input': {'S3Uri': 's3://sagemaker-us-east-1-709891711940/MyAbalonePipeline/7p6mevksvmgc/MyAbaloneProcess/output/validation/validation_with_predictions.csv',
    'LocalPath': '/opt/ml/processing/input/baseline_dataset_input',
    'S3DataType': 'S3Prefix',
    'S3InputMode': 'File',
    'S3DataDistributionType': 'FullyReplicated',
    'S3CompressionType': 'None'}}],
 'ProcessingOutputConfig': {'Outputs': [{'OutputName': 'monitoring_output',
    'S3Output': {'S3Uri': 's3://sagemaker-us-east-1-709891711940/my-abalone-model-quality-monitoring-report',
     'LocalPath': '/opt/ml/processing/output',
     'S3UploadMode': 'EndOfJob'},
    'AppManaged': False}]},
 'ProcessingJobName': 'MyAbaloneModelQualityMonitorBaselineJob-06-15-2023-09-57-58',
 'ProcessingResources': {'ClusterConfig': {'InstanceCount': 1,
   'InstanceType': 'ml.m5.xlarge',
   'VolumeSizeInGB': 20}},
 'StoppingCondition': {'MaxRuntimeIn

In [223]:
import pandas as pd

pd.DataFrame(baseline_job_model_quality1.suggested_constraints().body_dict["regression_constraints"]).T

Unnamed: 0,threshold,comparison_operator
mae,1.186433,GreaterThanThreshold
mse,2.700755,GreaterThanThreshold
rmse,1.643397,GreaterThanThreshold
r2,0.756449,LessThanThreshold


In [179]:
df = pd.read_csv("test_data/validation.csv", header=None)

In [180]:
df

Unnamed: 0,0,1,2,3,4,5,6,7,8,9,10
0,5.0,-2.115216,-2.145375,-1.901303,-1.541292,-1.490821,-1.433321,-1.500373,0.0,1.0,0.0
1,13.0,0.258230,0.172519,-0.227545,-0.126939,-0.429706,-0.114904,0.259862,0.0,0.0,1.0
2,8.0,-0.449640,-0.482538,-0.944870,-0.730614,-0.882539,-0.835699,-0.720840,1.0,0.0,0.0
3,7.0,-2.031938,-2.044597,-2.020857,-1.458695,-1.416476,-1.396825,-1.475226,0.0,1.0,0.0
4,11.0,0.966100,0.877965,0.609334,0.975379,1.435693,1.016471,0.195200,0.0,0.0,1.0
...,...,...,...,...,...,...,...,...,...,...,...
622,10.0,-1.490625,-1.540707,-1.423087,-1.345506,-1.348889,-1.259965,-1.327941,0.0,0.0,1.0
623,8.0,0.508067,0.273297,0.011563,-0.271740,-0.229199,-0.470740,-0.102962,1.0,0.0,0.0
624,8.0,-0.449640,-0.432149,-0.466653,-0.442033,-0.145841,-0.279136,-0.710063,0.0,0.0,1.0
625,8.0,0.216591,0.172519,0.250672,-0.126939,-0.091772,-0.041912,-0.207139,0.0,1.0,0.0


In [None]:
len(df.columns)
for i in range(1, len(df.columns)):
    if i < 8:
        df[i] = np.random.uniform(df[i].min(), df[i].max(), len(df))
    else:
        df[i] = np.random.randint(df[i].min(), 2, len(df), )

In [197]:
df.to_csv("test_data/validation_with_data_drift.csv", header=False, index=False)

In [198]:
from time import sleep

validate_dataset = "validation_with_predictions_data_drift.csv"
sm_client = boto3.client("sagemaker-runtime")
# Cut off threshold of 80%
cutoff = 0.8

limit = 200 # Need at least 200 samples to compute standard deviations
i = 0
with open(f"test_data/{validate_dataset}", "w") as validation_file:
    validation_file.write("prediction,label\n") # CSV header
    with open("test_data/validation_with_data_drift.csv", "r") as f:
        for row in f:
            (label, input_cols) = row.split(",", 1)
            res = sm_client.invoke_endpoint(EndpointName=endpoint_name,
                          ContentType='text/csv',
                          Body=input_cols)
            prediction = res["Body"].read().decode()
            # prediction = "1" if probability > cutoff else "0"
            validation_file.write(f"{prediction},{label}\n")
            i += 1
            if i > limit:
                break
            print(".", end="", flush=True)
            sleep(0.5)
print()
print("Done!")

........................................................................................................................................................................................................
Done!


In [206]:
a = res

b''

# Schedule Model Quality Monitoring Jobs

In [217]:
from sagemaker.model_monitor import CronExpressionGenerator
from sagemaker.model_monitor import EndpointInput

model_quality_schedule_name = "MyAbaloneModelQualityMonitorSchedule"
gt_s3_uri = f"s3://{default_bucket}/my-abalone-capture_data/1-2023-05-26-10-26-28-921/AllTraffic/2023/06/15/12/"

model_quality_monitor_schedule = model_quality_monitor.create_monitoring_schedule(
    monitor_schedule_name=model_quality_schedule_name,
    output_s3_uri=f"s3://{default_bucket}/my-abalone-model-quality-monitoring-report",
    schedule_cron_expression=CronExpressionGenerator.hourly(),
    # statistics=model_quality_monitor.baseline_statistics(),
    ground_truth_input=gt_s3_uri,
    problem_type="Regression",
    constraints=model_quality_monitor.suggested_constraints(),
    enable_cloudwatch_metrics=True,
    endpoint_input=EndpointInput(
        endpoint_name=endpoint_name,
        destination="/opt/ml/processing/input/endpoint",
        start_time_offset="-PT2H",
        end_time_offset="-PT1H",
        inference_attribute="prediction"
    )
)

INFO:sagemaker.model_monitor.model_monitoring:Creating Monitoring Schedule with name: MyAbaloneModelQualityMonitorSchedule


In [220]:
model_quality_monitor.stop_monitoring_schedule()


Stopping Monitoring Schedule with name: MyAbaloneModelQualityMonitorSchedule
