In [2]:
import os
import boto3
import re
import time
from time import gmtime, strftime
import json
from datetime import datetime, timedelta
from sagemaker import get_execution_role, session
import pandas as pd

# Region of the default boto3 session; used later to look up the container image.
region = boto3.Session().region_name

# Execution role of this notebook; passed as `role` to every pipeline component below.
role = get_execution_role()
print(f"RoleArn: {role}")

RoleArn: arn:aws:iam::874163252636:role/service-role/AmazonSageMaker-ExecutionRole-20201201T202376


In [3]:
# Default SageMaker bucket of the current session; all demo artifacts live under it.
bucket = session.Session(boto3.Session()).default_bucket()
print(f"Demo Bucket: {bucket}")

# The prefix is pinned to the id of an earlier run so that the existing baseline
# data and model artifacts can be reused. For a fresh run use:
#   prefix = f"sagemaker/demo-preprocess-custom-monitor-batch-transform/{int(time.time())}"
prefix = "sagemaker/demo-preprocess-custom-monitor-batch-transform/1682420795"

# Training job whose artifacts are used for inference. It was previously printed
# below without ever being defined, which raised NameError on a fresh kernel.
training_job_name = "pipelines-pz947y174s62-ModelTraining-m818UPms0R"

# batch transform output
s3_inference_output_uri = f"s3://{bucket}/{prefix}/transform-outputs"

# monitoring results for violations
monitor_reports_prefix = f"{prefix}/monitor-reports"
s3_monitor_report_uri = f"s3://{bucket}/{monitor_reports_prefix}"

# baseline dataset paths
baseline_prefix = f"{prefix}/baseline"
baseline_data_prefix = baseline_prefix + "/data"
s3_baseline_data_uri = f"s3://{bucket}/{baseline_data_prefix}"  # this has to come from training pipeline

# model artifacts (same fixed location as before; only the job name is factored out)
s3_model_artifacts_uri = (
    "s3://sagemaker-ap-south-1-874163252636/"
    "sagemaker/demo-preprocess-custom-monitor-batch-transform/1682420795/"
    f"model-artifacts/{training_job_name}/output/model.tar.gz"
)

# test data
s3_inference_data_uri = f"s3://{bucket}/{prefix}/inference-data/test_sample.csv"  # needs to come from the developer

print(f"Inference Output uri: {s3_inference_output_uri}")
print(f"Baseline data uri: {s3_baseline_data_uri}")
print(f"Monitoring Report uri: {s3_monitor_report_uri}")
print(f"Training Job Name: {training_job_name}")
print(f"Model Artifacts: {s3_model_artifacts_uri}")
print(f"Inference Input Uri: {s3_inference_data_uri}")

Demo Bucket: sagemaker-ap-south-1-874163252636
Inference Output uri: s3://sagemaker-ap-south-1-874163252636/sagemaker/demo-preprocess-custom-monitor-batch-transform/1682420795/transform-outputs
Baseline data uri: s3://sagemaker-ap-south-1-874163252636/sagemaker/demo-preprocess-custom-monitor-batch-transform/1682420795/baseline/data
Monitoring Report uri: s3://sagemaker-ap-south-1-874163252636/sagemaker/demo-preprocess-custom-monitor-batch-transform/1682420795/monitor-reports
Training Job Name: pipelines-pz947y174s62-ModelTraining-m818UPms0R
Model Artifacts: s3://sagemaker-ap-south-1-874163252636/sagemaker/demo-preprocess-custom-monitor-batch-transform/1682420795/model-artifacts/pipelines-pz947y174s62-ModelTraining-m818UPms0R/output/model.tar.gz
Inference Input Uri: s3://sagemaker-ap-south-1-874163252636/sagemaker/demo-preprocess-custom-monitor-batch-transform/1682420795/inference-data/test_sample.csv


In [4]:
import sagemaker
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.workflow.parameters import ParameterString
from sagemaker.workflow.steps import TransformStep
from sagemaker.workflow.steps import ProcessingStep
from sagemaker.workflow.model_step import ModelStep

from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.model import Model
from sagemaker.sklearn.estimator import SKLearn
from sagemaker.image_uris import retrieve
from sagemaker.transformer import Transformer
from sagemaker.inputs import TransformInput
from sagemaker.inputs import CreateModelInput

In [5]:
# Shared session handed as `sagemaker_session` to the processors, model and
# transformer below; their run/create/transform results are used as step_args.
pipeline_session = PipelineSession()

### Inference / testing data


In [6]:
# Copy the local sample file to the inference-data location, then list it to confirm the upload.
!aws s3 cp test_data/test_sample.csv {s3_inference_data_uri}
!aws s3 ls {s3_inference_data_uri}

upload: test_data/test_sample.csv to s3://sagemaker-ap-south-1-874163252636/sagemaker/demo-preprocess-custom-monitor-batch-transform/1682420795/inference-data/test_sample.csv
2023-05-02 18:45:51      28734 test_sample.csv


In [7]:
# Pipeline parameters. Each one defaults to the S3 location computed in the
# configuration cell above.

# Baseline dataset read by the monitoring step.
baseline_input_data = ParameterString(name="BaselineData", default_value=s3_baseline_data_uri)

# model.tar.gz used to create the inference model.
model_location = ParameterString(name="ModelLocation", default_value=s3_model_artifacts_uri)

# Raw inference file fed to the preprocessing step.
inference_input_data = ParameterString(name="InferenceData", default_value=s3_inference_data_uri)

# Destination of the monitoring report.
monitor_output = ParameterString(name="MonitorOutput", default_value=s3_monitor_report_uri)

# Destination of the batch transform output.
inference_output = ParameterString(name="InferenceOutput", default_value=s3_inference_output_uri)

### Preprocessing

Create the script that does the preprocessing, then define the processor and the pipeline step that run it.

In [8]:
!mkdir -p inference_code

In [9]:
%%writefile inference_code/preprocess.py
import argparse
import boto3
import pandas as pd
import numpy as np
import sys
import subprocess

# Install the packages listed in the requirements file as soon as the script
# starts. check_call raises CalledProcessError if pip exits with a non-zero
# status, so the script stops before any data is read.
subprocess.check_call([
    sys.executable, "-m", "pip", "install", "-r",
    "/opt/ml/processing/input/code/custom_packages/requirements.txt",
])

# Root directory under which main() reads its input and writes its output.
base_dir = '/opt/ml/processing'

def preprocess(data):
    """Placeholder preprocessing hook.

    Currently a no-op: the object passed in is returned unchanged. Real
    preprocessing logic is meant to be added here.
    """
    processed = data
    return processed

def main(args):
    """Read the raw inference file, preprocess it and write the result.

    args: parsed command-line namespace; only ``args.file_name`` is used, as
        the name of the CSV file under ``{base_dir}/input``.

    The processed frame is written without header or index to
    ``{base_dir}/output/inference_processed_data.csv``.
    """
    source_path = f'{base_dir}/input/{args.file_name}'
    target_path = f'{base_dir}/output/inference_processed_data.csv'

    # The file is read without a header row, so columns are positional.
    raw = pd.read_csv(source_path, header=None)
    preprocess(raw).to_csv(target_path, header=False, index=False)
    
if __name__ == "__main__":
    # --file_name is mandatory. Without it args.file_name would be None and
    # main() would try to read "<base_dir>/input/None", failing with a
    # misleading FileNotFoundError instead of a clear usage error.
    parser = argparse.ArgumentParser()
    parser.add_argument('--file_name', required=True, help='file name for preprocessing')
    main(parser.parse_args())

Overwriting inference_code/preprocess.py


In [10]:
framework_version = "1.2-1"

# Processor that runs the preprocessing script.
sklearn_processor = SKLearnProcessor(
    role=role,
    framework_version=framework_version,
    instance_count=1,
    instance_type="ml.m5.xlarge",
    sagemaker_session=pipeline_session,
)

# NOTE: the file name passed via --file_name must match the object name at the
# end of the InferenceData parameter.
processor_args = sklearn_processor.run(
    code="inference_code/preprocess.py",
    arguments=["--file_name", "test_sample.csv"],
    inputs=[
        # Raw inference data.
        ProcessingInput(
            source=inference_input_data,
            destination="/opt/ml/processing/input",
        ),
        # requirements.txt that the script pip-installs on start-up.
        ProcessingInput(
            source="custom_packages/",
            destination="/opt/ml/processing/input/code/custom_packages/",
        ),
    ],
    outputs=[
        ProcessingOutput(
            output_name="output",
            source="/opt/ml/processing/output",
        ),
    ],
)

step_process = ProcessingStep(
    name="InferencePreProcessing",
    step_args=processor_args,
)



### Batch Transform

First we create the model object, then we use it to run the batch transform.

In [11]:
%%writefile inference_code/inference.py
import argparse
import os
import boto3
import json
import pandas as pd
import numpy as np
import xgboost as xgb
from io import StringIO

def model_fn(model_dir):
    """Load the XGBoost classifier stored as ``model.json`` inside ``model_dir``.

    Returns the ``XGBClassifier`` instance with the saved model loaded into it.
    """
    model_path = os.path.join(model_dir, "model.json")
    classifier = xgb.XGBClassifier()
    classifier.load_model(model_path)
    return classifier

"""
input_fn
    request_body: The body of the request sent to the model.
    request_content_type: (string) specifies the format/variable type of the request
"""
def input_fn(request_body, request_content_type):
    """Deserialize a CSV request payload into a pandas DataFrame.

    request_body: the CSV payload, either ``str`` or UTF-8 encoded
        ``bytes``/``bytearray``.
    request_content_type: content type of the request. Only ``text/csv`` is
        accepted; case and parameters such as ``; charset=utf-8`` are ignored.

    Returns a DataFrame read without a header row (positional columns).
    Raises ValueError for any other (or missing) content type.
    """
    # Compare the bare media type only, so "text/csv; charset=utf-8" is accepted.
    media_type = (request_content_type or "").split(";")[0].strip().lower()
    if media_type != "text/csv":
        raise ValueError("This model only supports text/csv input")

    # StringIO only accepts str; decode in case the payload is handed over as bytes.
    if isinstance(request_body, (bytes, bytearray)):
        request_body = request_body.decode("utf-8")

    return pd.read_csv(StringIO(request_body), header=None, index_col=False)

"""
predict_fn
    input_data: the DataFrame returned by input_fn above
    model: the XGBoost classifier returned by model_fn above
"""
def predict_fn(input_data, model):
    """Run the model on the deserialized input.

    input_data: the value produced by input_fn.
    model: the object produced by model_fn; its ``predict`` method is called.

    Returns whatever ``model.predict(input_data)`` returns.
    """
    predictions = model.predict(input_data)
    return predictions

"""
output_fn
    prediction: the returned value from predict_fn above
    content_type: the content type the endpoint expects to be returned. Ex: JSON, string
"""
def output_fn(prediction, content_type):
    """Serialize predictions as newline-separated text, one value per line.

    prediction: value returned by predict_fn; must provide ``tolist()``.
    content_type: requested response content type (not used by this function).
    """
    lines = map(str, prediction.tolist())
    return '\n'.join(lines)

Overwriting inference_code/inference.py


In [14]:
# Container image for the scikit-learn framework in the current region.
image_uri = retrieve(framework="sklearn", region=region, version="1.2-1")

# Model built from the artifacts at `model_location`, served with the custom
# handlers in inference_code/inference.py.
model = Model(
    role=role,
    image_uri=image_uri,
    model_data=model_location,
    source_dir="inference_code",
    entry_point="inference.py",
    dependencies=["custom_packages/requirements.txt"],
    sagemaker_session=pipeline_session,
)

load_model_step = ModelStep(
    name="LoadModelForInference",
    step_args=model.create(instance_type="ml.m5.xlarge"),
)

In [15]:
# Batch transformer bound to the model created by `load_model_step`; results
# are written to the InferenceOutput location.
transformer = Transformer(
    model_name=load_model_step.properties.ModelName,
    instance_type="ml.m5.xlarge",
    instance_count=1,
    strategy="MultiRecord",
    assemble_with="Line",
    accept="text/csv",
    output_path=inference_output,
    sagemaker_session=pipeline_session,
)

# Input is the "output" channel of the preprocessing step, which also makes
# this step depend on it.
transform_arg = transformer.transform(
    step_process.properties.ProcessingOutputConfig.Outputs["output"].S3Output.S3Uri,
    content_type="text/csv",
    split_type="Line",
    join_source="Input",  # join each input record with its prediction
    input_filter="$",
    output_filter="$",
)

step_transform = TransformStep(name="BatchTransform", step_args=transform_arg)

### Monitoring Data and Predictions

Processing script that monitors the data and the predictions; it runs after the batch transform.

In [16]:
%%writefile inference_code/monitor.py
import boto3
import pandas as pd
import numpy as np
import sys
import subprocess

# Install the packages listed in the requirements file as soon as the script
# starts. check_call raises CalledProcessError if pip exits with a non-zero
# status, so the script stops before any data is read.
subprocess.check_call([
    sys.executable, "-m", "pip", "install", "-r",
    "/opt/ml/processing/input/code/custom_packages/requirements.txt",
])

# Root directory under which main() reads its inputs and writes its report.
base_dir = '/opt/ml/processing'

def monitor_data_and_model(baseline_data, inference_data):
    """Placeholder monitoring hook.

    Both arguments are currently ignored and the fixed status string
    "ALL GOOD" is returned. Real data/model checks are meant to be added here.
    """
    status = "ALL GOOD"
    return status

def main():
    """Load baseline and inference data, run the monitor and save its result.

    Reads ``input/baseline/training_processed_data.csv`` and
    ``input/inference/inference_processed_data.csv.out`` under ``base_dir``
    (both without a header row) and writes the monitor's result string to
    ``output/monitoring_result.txt``.
    """
    baseline_path = f'{base_dir}/input/baseline/training_processed_data.csv'
    inference_path = f'{base_dir}/input/inference/inference_processed_data.csv.out'
    report_path = f'{base_dir}/output/monitoring_result.txt'

    baseline_data = pd.read_csv(baseline_path, header=None)
    inference_data = pd.read_csv(inference_path, header=None)

    result = monitor_data_and_model(baseline_data, inference_data)

    # Persist the monitoring verdict as plain text.
    with open(report_path, 'w') as report_file:
        report_file.write(result)
    
# Run the monitoring job only when the file is executed as a script.
if __name__ == "__main__":
    main()

Overwriting inference_code/monitor.py


In [17]:
framework_version = "1.2-1"

# Processor that runs the monitoring script.
monitor_sklearn_processor = SKLearnProcessor(
    role=role,
    framework_version=framework_version,
    instance_count=1,
    instance_type="ml.m5.xlarge",
    sagemaker_session=pipeline_session,
)

monitor_args = monitor_sklearn_processor.run(
    code="inference_code/monitor.py",
    inputs=[
        # Batch transform results.
        ProcessingInput(
            source=inference_output,
            destination="/opt/ml/processing/input/inference",
        ),
        # Baseline data to compare against.
        ProcessingInput(
            source=baseline_input_data,
            destination="/opt/ml/processing/input/baseline",
        ),
        # requirements.txt that the script pip-installs on start-up.
        ProcessingInput(
            source="custom_packages/",
            destination="/opt/ml/processing/input/code/custom_packages/",
        ),
    ],
    outputs=[
        ProcessingOutput(
            source="/opt/ml/processing/output",
            destination=monitor_output,
        ),
    ],
)

# The inference input is a plain pipeline parameter rather than a property of
# the transform step, so the ordering is declared explicitly via depends_on.
step_monitor = ProcessingStep(
    name="DataAndModelMonitoring",
    step_args=monitor_args,
    depends_on=[step_transform],
)

In [20]:
pipeline_name = "InferencePipeline"

# Parameters exposed by the pipeline.
pipeline_parameters = [
    baseline_input_data,
    model_location,
    inference_input_data,
    monitor_output,
    inference_output,
]

# Steps: preprocess -> create model -> batch transform -> monitor.
pipeline_steps = [step_process, load_model_step, step_transform, step_monitor]

pipeline = Pipeline(
    name=pipeline_name,
    parameters=pipeline_parameters,
    steps=pipeline_steps,
)

In [21]:
# Register the pipeline definition under the notebook's execution role, then
# start a run; no parameter overrides are passed to start() here.
pipeline.upsert(role_arn=role)
execution = pipeline.start()

