In [206]:
import boto3
import pandas as pd
import numpy as np
from sagemaker import get_execution_role
import sagemaker
from sklearn.model_selection import train_test_split
from sklearn.datasets import fetch_california_housing

sm_boto3 = boto3.client("sagemaker")

sess = sagemaker.Session()

region = sess.boto_session.region_name

bucket = sess.default_bucket()  # this could also be a hard-coded bucket name

prefix = "nextera/monitoring"

print("Using bucket " + bucket)

Using bucket sagemaker-us-west-2-376678947624


### Prepare data

In [207]:
from sklearn.preprocessing import LabelEncoder
encoder = LabelEncoder()

df=pd.read_csv("data/X.csv")

df['Categorical'] = encoder.fit_transform(df['Categorical'])
X = df[list(df)[:-1]]
Y = df[list(df)[-1]]

X_train, X_test, y_train, y_test = train_test_split(
    X, Y, test_size=0.25, random_state=42
)

trainX = pd.DataFrame(X_train, columns=list(X))
trainX["Categorical"] = y_train

testX = pd.DataFrame(X_test, columns=list(X))
testX["Categorical"] = y_test

trainX

Unnamed: 0,Feature_1,Feature_2,Feature_3,Feature_4,Feature_5,Categorical
82,0.214578,0.763897,0.737818,0.502509,0.021399,1
991,0.479547,0.503150,0.436058,0.025125,0.224305,2
789,0.889155,0.458825,0.061542,0.884091,0.112252,0
894,0.628752,0.162416,0.506247,0.493969,0.096360,2
398,0.043922,0.682004,0.977617,0.784269,0.583460,1
...,...,...,...,...,...,...
106,0.542415,0.760993,0.731476,0.594218,0.593856,0
270,0.792883,0.758514,0.230150,0.093366,0.340973,1
860,0.622484,0.578172,0.418298,0.450728,0.827166,1
435,0.140280,0.507312,0.089126,0.042398,0.550013,1


In [208]:
trainX.to_csv("X_train.csv", index=False)
testX.to_csv("X_test.csv", index=False)

In [209]:
# send data to S3. SageMaker will take training data from s3
trainpath = sess.upload_data(
    path="X_train.csv", bucket=bucket, key_prefix=prefix
)

testpath = sess.upload_data(
    path="X_test.csv", bucket=bucket, key_prefix=prefix
)

### Training script

In [210]:
%%writefile script.py

import argparse
import joblib
import os

import numpy as np
import pandas as pd
from sklearn.ensemble import RandomForestRegressor


# inference functions ---------------
def model_fn(model_dir):
    clf = joblib.load(os.path.join(model_dir, "model.joblib"))
    return clf


if __name__ == "__main__":
    print("extracting arguments")
    parser = argparse.ArgumentParser()

    # hyperparameters sent by the client are passed as command-line arguments to the script.
    # to simplify the demo we don't use all sklearn RandomForest hyperparameters
    parser.add_argument("--n-estimators", type=int, default=10)
    parser.add_argument("--min-samples-leaf", type=int, default=3)

    # Data, model, and output directories
    parser.add_argument("--model-dir", type=str, default=os.environ.get("SM_MODEL_DIR"))
    parser.add_argument("--train", type=str, default=os.environ.get("SM_CHANNEL_TRAIN"))
    parser.add_argument("--test", type=str, default=os.environ.get("SM_CHANNEL_TEST"))
    parser.add_argument("--train-file", type=str, default="X_train.csv")
    parser.add_argument("--test-file", type=str, default="X_test.csv")
    parser.add_argument(
        "--features", type=str
    )  # in this script we ask user to explicitly name features
    parser.add_argument(
        "--target", type=str
    )  # in this script we ask user to explicitly name the target

    args, _ = parser.parse_known_args()

    print("reading data")
    train_df = pd.read_csv(os.path.join(args.train, args.train_file))
    test_df = pd.read_csv(os.path.join(args.test, args.test_file))

    print("building training and testing datasets")
    
    X_train = train_df[args.features.split()]
    X_test = test_df[args.features.split()]
    y_train = train_df[args.target]
    y_test = test_df[args.target]

    # train
    print("training model")
    model = RandomForestRegressor(
        n_estimators=args.n_estimators, min_samples_leaf=args.min_samples_leaf, n_jobs=-1
    )

    model.fit(X_train, y_train)

    # print abs error
    print("validating model")
    abs_err = np.abs(model.predict(X_test) - y_test)

    # print couple perf metrics
    for q in [10, 50, 90]:
        print("AE-at-" + str(q) + "th-percentile: " + str(np.percentile(a=abs_err, q=q)))

    # persist model
    path = os.path.join(args.model_dir, "model.joblib")
    joblib.dump(model, path)
    print("model persisted at " + path)
    print(args.min_samples_leaf)

Overwriting script.py


In [211]:
def list_to_string(input_list):
    string = ''
    for item in input_list:
        string += f"{item} "
    return string.strip()


features = list_to_string(list(df)[:-1])

target = list(df)[-1]
features

'Feature_1 Feature_2 Feature_3 Feature_4 Feature_5'

In [212]:
! python script.py --n-estimators 100 \
                   --min-samples-leaf 2 \
                   --model-dir ./ \
                   --train ./ \
                   --test ./ \
                   --features 'Feature_1 Feature_2 Feature_3 Feature_4 Feature_5' \
                   --target {target}

extracting arguments
reading data
building training and testing datasets
training model
validating model
AE-at-10th-percentile: 0.13623333333333337
AE-at-50th-percentile: 0.8872361111111111
AE-at-90th-percentile: 1.2147714285714284
model persisted at ./model.joblib
2


### SageMaker Training

In [213]:
# We use the Estimator from the SageMaker Python SDK
from sagemaker.sklearn.estimator import SKLearn

FRAMEWORK_VERSION = "0.23-1"

sklearn_estimator = SKLearn(
    entry_point="script.py",
    role=get_execution_role(),
    instance_count=1,
    instance_type="ml.c5.xlarge",
    framework_version=FRAMEWORK_VERSION,
    base_job_name="rf-scikit",
    metric_definitions=[{"Name": "median-AE", "Regex": "AE-at-50th-percentile: ([0-9.]+).*$"}],
    hyperparameters={
        "n-estimators": 100,
        "min-samples-leaf": 3,
        "features": features,
        "target": target,
    },
)

# launch training job, with asynchronous call
sklearn_estimator.fit({"train": trainpath, "test": testpath}, wait=True)

INFO:sagemaker:Creating training-job with name: rf-scikit-2023-10-06-01-20-08-930


Using provided s3_resource
2023-10-06 01:20:09 Starting - Starting the training job...
2023-10-06 01:20:26 Starting - Preparing the instances for training.........
2023-10-06 01:21:40 Downloading - Downloading input data...
2023-10-06 01:22:36 Training - Training image download completed. Training in progress.
2023-10-06 01:22:36 Uploading - Uploading generated training model[34m2023-10-06 01:22:29,501 sagemaker-containers INFO     Imported framework sagemaker_sklearn_container.training[0m
[34m2023-10-06 01:22:29,504 sagemaker-training-toolkit INFO     No GPUs detected (normal if no gpus installed)[0m
[34m2023-10-06 01:22:29,542 sagemaker_sklearn_container.training INFO     Invoking user training script.[0m
[34m2023-10-06 01:22:29,692 sagemaker-training-toolkit INFO     No GPUs detected (normal if no gpus installed)[0m
[34m2023-10-06 01:22:29,702 sagemaker-training-toolkit INFO     No GPUs detected (normal if no gpus installed)[0m
[34m2023-10-06 01:22:29,712 sagemaker-traini

In [214]:
sklearn_estimator.latest_training_job.wait(logs="None")
artifact = sm_boto3.describe_training_job(
    TrainingJobName=sklearn_estimator.latest_training_job.name
)["ModelArtifacts"]["S3ModelArtifacts"]

print("Model artifact persisted at " + artifact)


2023-10-06 01:22:47 Starting - Preparing the instances for training
2023-10-06 01:22:47 Downloading - Downloading input data
2023-10-06 01:22:47 Training - Training image download completed. Training in progress.
2023-10-06 01:22:47 Uploading - Uploading generated training model
2023-10-06 01:22:47 Completed - Training job completed
Model artifact persisted at s3://sagemaker-us-west-2-376678947624/rf-scikit-2023-10-06-01-20-08-930/output/model.tar.gz


In [215]:
from sagemaker.sklearn.model import SKLearnModel

model = SKLearnModel(
    model_data=artifact,
    role=get_execution_role(),
    entry_point="inference_handler.py",
    framework_version=FRAMEWORK_VERSION,
)

In [216]:
from sagemaker.model_monitor import DataCaptureConfig
from sagemaker.utils import name_from_base

endpoint_name = name_from_base(prefix.replace('/', '-'))

s3_capture_upload_path = f"s3://{bucket}/{prefix}/data-capture"

# Specify either Input, Output or both. 
capture_modes = ['REQUEST','RESPONSE']

data_capture_config = DataCaptureConfig(
    enable_capture = True, 
    sampling_percentage = 100, # Optional
    destination_s3_uri = s3_capture_upload_path, # Optional
    capture_options = ["REQUEST", "RESPONSE"],
)

predictor = model.deploy(
    instance_type="ml.c5.large",
    initial_instance_count=1,
    endpoint_name=endpoint_name,
    data_capture_config=data_capture_config
)

INFO:sagemaker:Creating model with name: sagemaker-scikit-learn-2023-10-06-01-23-29-105
INFO:sagemaker:Creating endpoint-config with name nextera-monitoring-2023-10-06-01-23-28-768
INFO:sagemaker:Creating endpoint with name nextera-monitoring-2023-10-06-01-23-28-768


----!

In [217]:
import io
import json

# authenticating with AWS
runtime_sm_client = boto3.client(service_name="sagemaker-runtime")

data = testX[list(df)[:-1]]

csv_file = io.StringIO()

#loop over each row of pandas df and convert each row to json
for index, row in data.iterrows():
    row = row.to_list()
    payload = ",".join(str(x) for x in row)

    # invoking endpoint
    response = runtime_sm_client.invoke_endpoint(
        EndpointName=endpoint_name,
        Body=payload,
        Accept="text/csv",
        ContentType="text/csv",  # for csv 'application/x-npy' for numpy
    )
    
    result = json.loads(response["Body"].read())
    print(result)

0.8393766233766238
0.7136541514041512
0.8725263347763347
1.0258849206349208
0.7723917748917748
0.9773885281385287
1.0824635642135643
0.9626071428571428
0.9885555555555555
0.9933098845598843
0.8257582972582972
1.3816165223665229
1.0040101010101012
0.761507575757576
0.9654436396936399
1.2250515873015875
1.0581507936507937
1.0840310245310245
1.4100833333333334
0.8240873015873018
0.9297171717171716
0.9797936507936508
0.6287420634920635
0.7151111111111113
1.3134523809523808
1.1718492063492063
1.2059372294372295
0.968468253968254
1.1405588023088022
1.010085137085137
1.0578466810966807
1.058455988455988
1.046724025974026
1.1378809523809528
1.0224051226551225
1.14041341991342
1.2408690476190474
1.052861111111111
1.2428997113997113
1.0425588023088024
1.161949134199134
1.1260984848484845
0.6115075757575759
1.2074761904761906
1.222797619047619
1.054289294039294
1.1908881673881675
0.6137218614718614
1.0426139971139974
1.0446334776334776
1.2017032134532135
1.1342817460317463
0.8156969141969141
0.99

### View captured data

In [219]:
import time

# the data capture may take a few seconds to appear
time.sleep(60)

s3_client = boto3.Session().client("s3")
current_endpoint_capture_prefix = f"{prefix}/data-capture/{endpoint_name}"

result = s3_client.list_objects(Bucket=bucket, Prefix=current_endpoint_capture_prefix)
capture_files = [capture_file.get("Key") for capture_file in result.get("Contents")]
print("Found Capture Files:")
print("\n ".join(capture_files))

INFO:botocore.credentials:Found credentials from IAM Role: BaseNotebookInstanceEc2InstanceRole


Found Capture Files:
nextera/monitoring/data-capture/nextera-monitoring-2023-10-06-01-23-28-768/AllTraffic/2023/10/06/01/26-00-831-fe893b9d-5dd8-4f42-a611-b8195684bafa.jsonl


In [220]:
def get_obj_body(obj_key):
    return s3_client.get_object(Bucket=bucket, Key=obj_key).get("Body").read().decode("utf-8")

capture_file = get_obj_body(capture_files[-1])
print(capture_file)

{"captureData":{"endpointInput":{"observedContentType":"text/csv","mode":"INPUT","data":"0.0280255740907158,0.5911467324138753,0.0599007932290646,0.569113320570193,0.5429393475493252","encoding":"CSV"},"endpointOutput":{"observedContentType":"text/csv; charset=utf-8","mode":"OUTPUT","data":"0.8393766233766238","encoding":"CSV"}},"eventMetadata":{"eventId":"be8b18d4-a5b3-45cf-ae8e-19e3e78ebff3","inferenceTime":"2023-10-06T01:26:00Z"},"eventVersion":"0"}
{"captureData":{"endpointInput":{"observedContentType":"text/csv","mode":"INPUT","data":"0.7712715537818506,0.3593302345612462,0.1456648117288652,0.2220712371942508,0.1637907072893926","encoding":"CSV"},"endpointOutput":{"observedContentType":"text/csv; charset=utf-8","mode":"OUTPUT","data":"0.7136541514041512","encoding":"CSV"}},"eventMetadata":{"eventId":"986d5f3f-18af-413c-afb2-cca026db67c8","inferenceTime":"2023-10-06T01:26:00Z"},"eventVersion":"0"}
{"captureData":{"endpointInput":{"observedContentType":"text/csv","mode":"INPUT","dat

In [221]:
import json

print(json.dumps(json.loads(capture_file.split("\n")[0]), indent=2))

{
  "captureData": {
    "endpointInput": {
      "observedContentType": "text/csv",
      "mode": "INPUT",
      "data": "0.0280255740907158,0.5911467324138753,0.0599007932290646,0.569113320570193,0.5429393475493252",
      "encoding": "CSV"
    },
    "endpointOutput": {
      "observedContentType": "text/csv; charset=utf-8",
      "mode": "OUTPUT",
      "data": "0.8393766233766238",
      "encoding": "CSV"
    }
  },
  "eventMetadata": {
    "eventId": "be8b18d4-a5b3-45cf-ae8e-19e3e78ebff3",
    "inferenceTime": "2023-10-06T01:26:00Z"
  },
  "eventVersion": "0"
}


### Model Monitoring

In [222]:
from sagemaker.model_monitor import DefaultModelMonitor
from sagemaker.model_monitor.dataset_format import DatasetFormat
from sagemaker.model_monitor import CronExpressionGenerator
from monitoringjob_utils import run_model_monitor_job_processor

#Create a monitoring object
my_default_monitor = DefaultModelMonitor(
    role=get_execution_role(),
    instance_count=1,
    instance_type="ml.m5.xlarge",
    volume_size_in_gb=20,
    max_runtime_in_seconds=3600,
)

INFO:sagemaker.image_uris:Defaulting to the only supported framework/algorithm version: .
INFO:sagemaker.image_uris:Ignoring unnecessary instance type: None.


In [223]:
baseline_results_uri = f"s3://{bucket}/{prefix}/baseline"

In [224]:
#Start baseline job
my_default_monitor.suggest_baseline(
    baseline_dataset="X_train.csv",
    dataset_format=DatasetFormat.csv(header=True),
    output_s3_uri=baseline_results_uri,
    wait=True,
)

INFO:sagemaker:Creating processing-job with name baseline-suggestion-job-2023-10-06-03-03-49-833


..............................[34m2023-10-06 03:08:51.308145: W tensorflow/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libcudart.so.11.0'; dlerror: libcudart.so.11.0: cannot open shared object file: No such file or directory[0m
[34m2023-10-06 03:08:51.308176: I tensorflow/stream_executor/cuda/cudart_stub.cc:29] Ignore above cudart dlerror if you do not have a GPU set up on your machine.[0m
[34m2023-10-06 03:08:52.806948: W tensorflow/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libcuda.so.1'; dlerror: libcuda.so.1: cannot open shared object file: No such file or directory[0m
[34m2023-10-06 03:08:52.806975: W tensorflow/stream_executor/cuda/cuda_driver.cc:269] failed call to cuInit: UNKNOWN ERROR (303)[0m
[34m2023-10-06 03:08:52.806995: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:156] kernel driver does not appear to be running on this host (ip-10-0-66-163.us-west-2.compute.internal): /proc/d

[34m2023-10-06 03:08:59,777 - bootstrap - INFO - Failed to run /usr/hadoop-3.0.0/bin/hdfs --daemon start datanode, return code 1[0m
[34m2023-10-06 03:08:59,777 - bootstrap - INFO - Running command: /usr/hadoop-3.0.0/bin/yarn --daemon start resourcemanager[0m
[34m2023-10-06 03:09:01,844 - bootstrap - INFO - Failed to run /usr/hadoop-3.0.0/bin/yarn --daemon start resourcemanager, return code 1[0m
[34m2023-10-06 03:09:01,844 - bootstrap - INFO - Running command: /usr/hadoop-3.0.0/bin/yarn --daemon start nodemanager[0m
[34m2023-10-06 03:09:03,974 - bootstrap - INFO - Failed to run /usr/hadoop-3.0.0/bin/yarn --daemon start nodemanager, return code 1[0m
[34m2023-10-06 03:09:03,974 - bootstrap - INFO - Running command: /usr/hadoop-3.0.0/bin/yarn --daemon start proxyserver[0m
[34m2023-10-06 03:09:06,183 - bootstrap - INFO - Failed to run /usr/hadoop-3.0.0/bin/yarn --daemon start proxyserver, return code 1[0m
[34m2023-10-06 03:09:06,184 - DefaultDataAnalyzer - INFO - Total number

[34m2023-10-06 03:09:36,229 INFO cluster.YarnSchedulerBackend$YarnDriverEndpoint: Registered executor NettyRpcEndpointRef(spark-client://Executor) (10.0.66.163:54680) with ID 1,  ResourceProfileId 0[0m
[34m2023-10-06 03:09:36,468 INFO storage.BlockManagerMasterEndpoint: Registering block manager algo-1:46567 with 5.8 GiB RAM, BlockManagerId(1, algo-1, 46567, None)[0m
[34m2023-10-06 03:09:49,765 INFO cluster.YarnClientSchedulerBackend: SchedulerBackend is ready for scheduling beginning after waiting maxRegisteredResourcesWaitingTime: 30000000000(ns)[0m
[34m2023-10-06 03:09:49,945 WARN spark.SparkContext: Spark is not running in local mode, therefore the checkpoint directory must not be on the local filesystem. Directory '/tmp' appears to be on the local filesystem.[0m
[34m2023-10-06 03:09:49,998 INFO internal.SharedState: Setting hive.metastore.warehouse.dir ('null') to the value of spark.sql.warehouse.dir.[0m
[34m2023-10-06 03:09:50,003 INFO internal.SharedState: Warehouse p

[34m2023-10-06 03:10:01,309 WARN util.package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.[0m
[34m2023-10-06 03:10:01,447 INFO scheduler.DAGScheduler: Registering RDD 16 (collect at AnalysisRunner.scala:326) as input to shuffle 0[0m
[34m2023-10-06 03:10:01,451 INFO scheduler.DAGScheduler: Got map stage job 2 (collect at AnalysisRunner.scala:326) with 1 output partitions[0m
[34m2023-10-06 03:10:01,451 INFO scheduler.DAGScheduler: Final stage: ShuffleMapStage 2 (collect at AnalysisRunner.scala:326)[0m
[34m2023-10-06 03:10:01,452 INFO scheduler.DAGScheduler: Parents of final stage: List()[0m
[34m2023-10-06 03:10:01,454 INFO scheduler.DAGScheduler: Missing parents: List()[0m
[34m2023-10-06 03:10:01,455 INFO scheduler.DAGScheduler: Submitting ShuffleMapStage 2 (MapPartitionsRDD[16] at collect at AnalysisRunner.scala:326), which has no missing parents[0m
[34m2023-10-06 03:10




<sagemaker.processing.ProcessingJob at 0x7fd53bf6e590>

### Explore the generated constraints and statistics

In [225]:
s3_client = boto3.Session().client("s3")
result = s3_client.list_objects(Bucket=bucket, Prefix=f"{prefix}/baseline")
report_files = [report_file.get("Key") for report_file in result.get("Contents")]
print("Found Files:")
print("\n ".join(report_files))

INFO:botocore.credentials:Found credentials from IAM Role: BaseNotebookInstanceEc2InstanceRole


Found Files:
nextera/monitoring/baseline/constraints.json
 nextera/monitoring/baseline/statistics.json


In [226]:
import pandas as pd

baseline_job = my_default_monitor.latest_baselining_job
schema_df = pd.json_normalize(baseline_job.baseline_statistics().body_dict["features"])
schema_df.head(10)

Unnamed: 0,name,inferred_type,numerical_statistics.common.num_present,numerical_statistics.common.num_missing,numerical_statistics.mean,numerical_statistics.sum,numerical_statistics.std_dev,numerical_statistics.min,numerical_statistics.max,numerical_statistics.distribution.kll.buckets,numerical_statistics.distribution.kll.sketch.parameters.c,numerical_statistics.distribution.kll.sketch.parameters.k,numerical_statistics.distribution.kll.sketch.data
0,Feature_1,Fractional,750,0,0.490422,367.816444,0.292162,0.000337,0.998874,"[{'lower_bound': 0.0003366049931977, 'upper_bo...",0.64,2048.0,"[[0.2145784840765139, 0.4795467028635293, 0.88..."
1,Feature_2,Fractional,750,0,0.498001,373.500676,0.286613,0.000427,0.995704,"[{'lower_bound': 0.0004266333575733, 'upper_bo...",0.64,2048.0,"[[0.7638973904193592, 0.5031499258174436, 0.45..."
2,Feature_3,Fractional,750,0,0.490931,368.198324,0.298426,0.006519,0.998166,"[{'lower_bound': 0.0065188267934968, 'upper_bo...",0.64,2048.0,"[[0.7378183417305062, 0.4360578982202232, 0.06..."
3,Feature_4,Fractional,750,0,0.494117,370.587769,0.289748,0.000117,0.996621,"[{'lower_bound': 0.0001166676378706, 'upper_bo...",0.64,2048.0,"[[0.5025094457763358, 0.0251246396213712, 0.88..."
4,Feature_5,Fractional,750,0,0.500548,375.411193,0.295853,6e-06,0.99972,"[{'lower_bound': 5.574711805622634e-06, 'upper...",0.64,2048.0,"[[0.0213988488146308, 0.2243047361588681, 0.11..."
5,Categorical,Integral,750,0,1.037333,778.0,0.792426,0.0,2.0,"[{'lower_bound': 0.0, 'upper_bound': 0.2, 'cou...",0.64,2048.0,"[[1.0, 2.0, 0.0, 2.0, 1.0, 1.0, 0.0, 1.0, 0.0,..."


In [227]:
constraints_df = pd.json_normalize(
    baseline_job.suggested_constraints().body_dict["features"]
)
constraints_df.head(10)

Unnamed: 0,name,inferred_type,completeness,num_constraints.is_non_negative
0,Feature_1,Fractional,1.0,True
1,Feature_2,Fractional,1.0,True
2,Feature_3,Fractional,1.0,True
3,Feature_4,Fractional,1.0,True
4,Feature_5,Fractional,1.0,True
5,Categorical,Integral,1.0,True


### Trigger job instantly

In [228]:
s3_stats = ''
s3_const = ""

for file in report_files:
    if "statistics" in file:
        s3_stats=f"s3://{bucket}/{file}"
    else:
        s3_const=f"s3://{bucket}/{file}"
print(s3_stats)
print(s3_const)

s3://sagemaker-us-west-2-376678947624/nextera/monitoring/baseline/statistics.json
s3://sagemaker-us-west-2-376678947624/nextera/monitoring/baseline/constraints.json


In [229]:
preprocess_prefix = f"{prefix}/preprocess"
preprocess_file = "preprocess_v7.py"

trainpath = sess.upload_data(
    path=preprocess_file, bucket=bucket, key_prefix=preprocess_prefix
)

preprocess_path = f"s3://{bucket}/{preprocess_prefix}/{preprocess_file}"
print(preprocess_path)

s3://sagemaker-us-west-2-376678947624/nextera/monitoring/preprocess/preprocess_v7.py


In [230]:
reports_path = name_from_base('reports')


processor = run_model_monitor_job_processor(
    region = region,
    instance_type = "ml.m5.xlarge",
    role = get_execution_role(),
    data_capture_path = f"{s3_capture_upload_path}/{endpoint_name}",
    statistics_path = s3_stats,
    constraints_path = s3_const,
    reports_path = f"s3://{bucket}/{prefix}/{reports_path}",
#     preprocessor_path=preprocess_path,
)

INFO:sagemaker:Creating processing-job with name sagemaker-model-monitor-analyzer-2023-10-06-03-10-50-923


................................[34m2023-10-06 03:16:04.833137: W tensorflow/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libcudart.so.11.0'; dlerror: libcudart.so.11.0: cannot open shared object file: No such file or directory[0m
[34m2023-10-06 03:16:04.833175: I tensorflow/stream_executor/cuda/cudart_stub.cc:29] Ignore above cudart dlerror if you do not have a GPU set up on your machine.[0m
[34m2023-10-06 03:16:06.343938: W tensorflow/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libcuda.so.1'; dlerror: libcuda.so.1: cannot open shared object file: No such file or directory[0m
[34m2023-10-06 03:16:06.343966: W tensorflow/stream_executor/cuda/cuda_driver.cc:269] failed call to cuInit: UNKNOWN ERROR (303)[0m
[34m2023-10-06 03:16:06.343985: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:156] kernel driver does not appear to be running on this host (ip-10-0-177-84.us-west-2.compute.internal): /proc

[34m2023-10-06 03:16:19,920 - bootstrap - INFO - Failed to run /usr/hadoop-3.0.0/bin/yarn --daemon start proxyserver, return code 1[0m
[34m2023-10-06 03:16:19,921 - DefaultDataAnalyzer - INFO - Total number of hosts in the cluster: 1[0m
[34m2023-10-06 03:16:29,926 - DefaultDataAnalyzer - INFO - Running command: bin/spark-submit --master yarn --deploy-mode client --conf spark.hadoop.fs.s3a.aws.credentials.provider=org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider --conf spark.serializer=org.apache.spark.serializer.KryoSerializer /opt/amazon/sagemaker-data-analyzer-1.0-jar-with-dependencies.jar --analytics_input /tmp/spark_job_config.json[0m
[34m2023-10-06 03:16:31,543 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable[0m
[34m2023-10-06 03:16:31,933 INFO Main: Start analyzing with args: --analytics_input /tmp/spark_job_config.json[0m
[34m2023-10-06 03:16:31,969 INFO Main: Analytics input path: D

[34m2023-10-06 03:16:48,866 INFO cluster.YarnSchedulerBackend$YarnDriverEndpoint: Registered executor NettyRpcEndpointRef(spark-client://Executor) (10.0.177.84:34014) with ID 1,  ResourceProfileId 0[0m
[34m2023-10-06 03:16:49,059 INFO storage.BlockManagerMasterEndpoint: Registering block manager algo-1:35715 with 5.8 GiB RAM, BlockManagerId(1, algo-1, 35715, None)[0m
[34m2023-10-06 03:17:03,659 INFO cluster.YarnClientSchedulerBackend: SchedulerBackend is ready for scheduling beginning after waiting maxRegisteredResourcesWaitingTime: 30000000000(ns)[0m
[34m2023-10-06 03:17:03,856 WARN spark.SparkContext: Spark is not running in local mode, therefore the checkpoint directory must not be on the local filesystem. Directory '/tmp' appears to be on the local filesystem.[0m
[34m2023-10-06 03:17:03,922 INFO internal.SharedState: Setting hive.metastore.warehouse.dir ('null') to the value of spark.sql.warehouse.dir.[0m
[34m2023-10-06 03:17:03,928 INFO internal.SharedState: Warehouse p

[34m2023-10-06 03:17:12,867 INFO codegen.CodeGenerator: Code generated in 15.423343 ms[0m
[34m2023-10-06 03:17:12,895 INFO spark.SparkContext: Starting job: head at DataAnalyzer.scala:124[0m
[34m2023-10-06 03:17:12,897 INFO scheduler.DAGScheduler: Got job 3 (head at DataAnalyzer.scala:124) with 1 output partitions[0m
[34m2023-10-06 03:17:12,897 INFO scheduler.DAGScheduler: Final stage: ResultStage 3 (head at DataAnalyzer.scala:124)[0m
[34m2023-10-06 03:17:12,897 INFO scheduler.DAGScheduler: Parents of final stage: List()[0m
[34m2023-10-06 03:17:12,900 INFO scheduler.DAGScheduler: Missing parents: List()[0m
[34m2023-10-06 03:17:12,906 INFO scheduler.DAGScheduler: Submitting ResultStage 3 (MapPartitionsRDD[34] at head at DataAnalyzer.scala:124), which has no missing parents[0m
[34m2023-10-06 03:17:12,987 INFO memory.MemoryStore: Block broadcast_8 stored as values in memory (estimated size 49.6 KiB, free 1458.1 MiB)[0m
[34m2023-10-06 03:17:12,989 INFO memory.MemoryStore: 


[34m2023-10-06 03:17:22,853 INFO spark.SparkContext: Successfully stopped SparkContext[0m
[34m2023-10-06 03:17:22,853 INFO Main: CompletedWithViolations: Job completed successfully with 2 violations.[0m
[34m2023-10-06 03:17:22,853 INFO Main: Write to file /opt/ml/output/message.[0m
[34m2023-10-06 03:17:22,889 INFO util.ShutdownHookManager: Shutdown hook called[0m
[34m2023-10-06 03:17:22,891 INFO util.ShutdownHookManager: Deleting directory /tmp/spark-b3862b4a-5854-4368-a5e1-6fe824e32756[0m
[34m2023-10-06 03:17:22,907 INFO util.ShutdownHookManager: Deleting directory /tmp/spark-3d7d0646-5b12-4015-bd5c-cc40adfcc61c[0m
[34m2023-10-06 03:17:22,979 - DefaultDataAnalyzer - INFO - Completed spark-submit with return code : 0[0m
[34m2023-10-06 03:17:22,979 - DefaultDataAnalyzer - INFO - Spark job completed.[0m


In [231]:
def get_latest_model_monitor_processing_job_name(base_job_name):
    client = boto3.client("sagemaker")
    response = client.list_processing_jobs(
        NameContains=base_job_name,
        SortBy="CreationTime",
        SortOrder="Descending",
        StatusEquals="Completed",
    )
    if len(response["ProcessingJobSummaries"]) > 0:
        return response["ProcessingJobSummaries"][0]["ProcessingJobName"]
    else:
        raise Exception("Processing job not found.")


def get_model_monitor_processing_job_s3_report(job_name):
    client = boto3.client("sagemaker")
    response = client.describe_processing_job(ProcessingJobName=job_name)
    s3_report_path = response["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
    return s3_report_path


MODEL_MONITOR_JOB_NAME = "sagemaker-model-monitor-analyzer"
latest_model_monitor_processing_job_name = get_latest_model_monitor_processing_job_name(
    MODEL_MONITOR_JOB_NAME
)
print(latest_model_monitor_processing_job_name)
report_path = get_model_monitor_processing_job_s3_report(latest_model_monitor_processing_job_name)
print(report_path)

sagemaker-model-monitor-analyzer-2023-10-06-03-10-50-923
s3://sagemaker-us-west-2-376678947624/nextera/monitoring/reports-2023-10-06-03-10-50-800/8


In [232]:
result = s3_client.list_objects(Bucket=bucket, Prefix=f"{prefix}/{reports_path}")
report_files = [report_file.get("Key") for report_file in result.get("Contents")]
print("Found Files:")
print("\n ".join(report_files))

Found Files:
nextera/monitoring/reports-2023-10-06-03-10-50-800/8/constraint_violations.json
 nextera/monitoring/reports-2023-10-06-03-10-50-800/8/constraints.json
 nextera/monitoring/reports-2023-10-06-03-10-50-800/8/statistics.json


In [233]:
capture_file = get_obj_body(report_files[0])
print(capture_file)

{
  "violations" : [ {
    "feature_name" : "Categorical",
    "constraint_check_type" : "data_type_check",
    "description" : "Data type match requirement is not met. Expected data type: Integral, Expected match: 100.0%. Observed: Only 0.0% of data is Integral."
  }, {
    "feature_name" : "Feature_1",
    "constraint_check_type" : "baseline_drift_check",
    "description" : "Baseline drift distance: 0.5498799195320935 exceeds threshold: 0.1"
  } ]
}
