# AAI540 - Module 5 Assignment

Victor Hugo Germano

## ML System Observability 

In [None]:
import io
from datetime import datetime
from time import sleep

import boto3
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import sagemaker
from sagemaker import image_uris
from sagemaker.clarify import BiasConfig, DataConfig, ModelConfig, ModelPredictedLabelConfig
from sagemaker.model import Model
from sagemaker.model_monitor import DataCaptureConfig
from sagemaker.predictor import Predictor
from sagemaker.s3 import S3Uploader
from sagemaker.serializers import CSVSerializer
from sagemaker.session import Session, get_execution_role

# Defining model and quality monitoring

From m5

In [None]:

# Create the SageMaker session, then read the execution role, region and
# default bucket used by the rest of the notebook.
sess = Session()
role = get_execution_role()
region = sess.boto_region_name
bucket = sess.default_bucket()

# Key prefix under which this notebook's S3 objects are grouped.
prefix = "model-bias-monitoring"

# Echo the resolved settings.
print(f"Bucket: {bucket}")
print(f"Role: {role}")
print(f"Region: {region}")

In [None]:
# Setup S3 bucket
# You can use a different bucket, but make sure the role you chose for this notebook
# has the s3:PutObject permissions. This is the bucket into which the data is captured.
# The SageMaker session in this notebook is named `sess`; the name `session`
# is never defined, so it must not be used here.
bucket = sess.default_bucket()
print("Demo Bucket:", bucket)
# From here on, `prefix` points at the model-quality-monitor location and
# replaces the value assigned in the setup cell.
prefix = "sagemaker/Churn-ModelQualityMonitor-20201201"

## S3 prefixes
# Where the endpoint writes captured requests/responses.
data_capture_prefix = f"{prefix}/datacapture"
s3_capture_upload_path = f"s3://{bucket}/{data_capture_prefix}"

# Ground-truth location, suffixed with the local time at which this cell runs.
ground_truth_upload_path = (
    f"s3://{bucket}/{prefix}/ground_truth_data/{datetime.now():%Y-%m-%d-%H-%M-%S}"
)

# Where monitoring reports are written.
reports_prefix = f"{prefix}/reports"
s3_report_path = f"s3://{bucket}/{reports_prefix}"

## Get the model monitor image
monitor_image_uri = image_uris.retrieve(framework="model-monitor", region=region)

print("Image URI:", monitor_image_uri)
print(f"Capture path: {s3_capture_upload_path}")
print(f"Ground truth path: {ground_truth_upload_path}")
print(f"Report path: {s3_report_path}")

In [None]:
# Smoke test: upload one small local file to the bucket before going further.
test_file_path = "test_data/upload-test-file.txt"
test_upload_uri = f"s3://{bucket}/test_upload"
S3Uploader.upload(test_file_path, test_upload_uri)
print("Success! You are all set to proceed.")

In [None]:
## Upload the pretrained model archive to S3 and display the resulting URI
local_model_archive = "model/xgb-churn-prediction-model.tar.gz"
s3_key = f"s3://{bucket}/{prefix}"
model_url = S3Uploader.upload(local_model_archive, s3_key)
model_url

In [None]:
# Model name carrying the current UTC time (minute resolution).
model_name = f"DEMO-xgb-churn-pred-model-monitor-{datetime.utcnow():%Y-%m-%d-%H%M}"

# Container image for the XGBoost 0.90-1 framework in the session's region.
image_uri = image_uris.retrieve(framework="xgboost", version="0.90-1", region=region)

# Two fixes compared with the previous call:
#  * `name=model_name` - the name above was computed but never handed to Model.
#  * `sagemaker_session=sess` - `session` is not defined in this notebook.
model = Model(
    image_uri=image_uri,
    model_data=model_url,
    role=role,
    name=model_name,
    sagemaker_session=sess,
)

In [None]:
# Endpoint name carrying the current UTC time (minute resolution).
endpoint_name = f"DEMO-xgb-churn-model-quality-monitor-{datetime.utcnow():%Y-%m-%d-%H%M}"
print("EndpointName =", endpoint_name)

# Capture every request (100% sampling) into the data-capture S3 location.
data_capture_config = DataCaptureConfig(
    destination_s3_uri=s3_capture_upload_path,
    sampling_percentage=100,
    enable_capture=True,
)

# Deploy the model behind a single ml.m5.large instance with capture enabled.
model.deploy(
    endpoint_name=endpoint_name,
    instance_type="ml.m5.large",
    initial_instance_count=1,
    data_capture_config=data_capture_config,
)


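Model monitor data: invoke the deployed endpoint on the validation set and save the predictions, which become the baseline dataset.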

In [None]:
# File name (inside test_data/) of the CSV that holds predictions plus labels.
validate_dataset = "validation_with_predictions.csv"
# Probabilities strictly above this cutoff are recorded as prediction "1".
churn_cutoff = 0.8

In [None]:
limit = 200  # Need at least 200 samples to compute standard deviations

# `predictor` was never created in this notebook: the Model is built without a
# predictor class and the return value of `model.deploy(...)` is not kept.
# Build a predictor for the deployed endpoint; rows are sent as CSV text.
predictor = Predictor(
    endpoint_name=endpoint_name,
    sagemaker_session=sess,
    serializer=CSVSerializer(),
)

i = 0  # number of rows processed; stays 0 if the validation file is empty
with open(f"test_data/{validate_dataset}", "w") as baseline_file:
    baseline_file.write("probability,prediction,label\n")  # our header
    with open("test_data/validation.csv", "r") as f:
        for i, row in enumerate(f, start=1):
            # The first column is the label; everything after it is the model input.
            (label, input_cols) = row.split(",", 1)
            probability = float(predictor.predict(input_cols))
            prediction = "1" if probability > churn_cutoff else "0"
            baseline_file.write(f"{probability},{prediction},{label}\n")
            if i > limit:
                break
            print(".", end="", flush=True)
            sleep(0.5)  # pause between endpoint invocations
print()
print("Done!")

In [None]:
# IPython shell escape: show the first lines of the generated predictions CSV.
!head test_data/validation_with_predictions.csv

In [None]:
# S3 layout for baselining: input data and job results live side by side
# under "<prefix>/baselining".
baseline_prefix = f"{prefix}/baselining"
baseline_data_prefix = f"{baseline_prefix}/data"
baseline_results_prefix = f"{baseline_prefix}/results"

baseline_data_uri, baseline_results_uri = (
    f"s3://{bucket}/{baseline_data_prefix}",
    f"s3://{bucket}/{baseline_results_prefix}",
)
print(f"Baseline data uri: {baseline_data_uri}")
print(f"Baseline results uri: {baseline_results_uri}")

In [None]:
# Upload the local predictions CSV to the baseline data location and display
# the S3 URI returned by the upload.
local_baseline_path = f"test_data/{validate_dataset}"
baseline_dataset_uri = S3Uploader.upload(local_baseline_path, baseline_data_uri)
baseline_dataset_uri

Model quality monitor: create the monitor and run the baseline suggestion job on the uploaded predictions.

In [None]:
from sagemaker.model_monitor import ModelQualityMonitor
from sagemaker.model_monitor import EndpointInput
from sagemaker.model_monitor.dataset_format import DatasetFormat

In [None]:
# Create the model quality monitoring object.
# `sess` is the SageMaker session created in the setup cell; the name
# `session` is never defined in this notebook.
churn_model_quality_monitor = ModelQualityMonitor(
    role=role,
    instance_count=1,
    instance_type="ml.m5.large",
    volume_size_in_gb=20,
    max_runtime_in_seconds=1800,
    sagemaker_session=sess,
)

In [None]:
# Baseline job name: fixed stem followed by the current UTC time (minute resolution).
baseline_job_name = "DEMO-xgb-churn-model-baseline-job-" + datetime.utcnow().strftime(
    "%Y-%m-%d-%H%M"
)

In [None]:
# Run the baseline suggestion job for a binary classification problem.
# The three *_attribute arguments are the column names written to the header
# of the baseline CSV.
csv_with_header = DatasetFormat.csv(header=True)
job = churn_model_quality_monitor.suggest_baseline(
    baseline_dataset=baseline_dataset_uri,
    dataset_format=csv_with_header,
    problem_type="BinaryClassification",
    probability_attribute="probability",
    inference_attribute="prediction",
    ground_truth_attribute="label",
    output_s3_uri=baseline_results_uri,
    job_name=baseline_job_name,
)
# Block until the job finishes, without streaming its logs.
job.wait(logs=False)

In [None]:
# Keep a reference to the monitor's `latest_baselining_job`; the result cells
# below read its statistics and suggested constraints.
baseline_job = churn_model_quality_monitor.latest_baselining_job

In [None]:
# Extract the binary-classification section of the baseline statistics and
# display it flattened, one metric per row.
baseline_stats = baseline_job.baseline_statistics()
binary_metrics = baseline_stats.body_dict["binary_classification_metrics"]
metrics_table = pd.json_normalize(binary_metrics)
metrics_table.T

In [None]:
# Display the suggested binary-classification constraints, one constraint per row.
constraints_body = baseline_job.suggested_constraints().body_dict
binary_constraints = constraints_body["binary_classification_constraints"]
pd.DataFrame(binary_constraints).T

## Model Bias Monitor
In this section, we will set up a Model Bias Monitor to check for bias in our model's predictions. We need to prepare a dataset that includes both the model predictions and the features (converted to a format with headers) so that the monitor can identify the sensitive attributes (facets).

In [None]:
from sagemaker.model_monitor import ModelBiasMonitor, BiasAnalysisConfig

# Load validation data (ground truth + features); the file has no header row.
val_df = pd.read_csv("test_data/validation.csv", header=None)

# First column is the label; the remaining columns get generic feature names.
feature_names = [f"Feature_{i}" for i in range(1, val_df.shape[1])]
val_df.columns = ["label"] + feature_names

pred_df = pd.read_csv("test_data/validation_with_predictions.csv")

# Create a combined dataset.
# Predictions are generated only for the leading rows of validation.csv (the
# prediction loop stops after `limit` rows), so the two frames can differ in
# length. Concatenating them as-is would pad the extra validation rows with
# NaN in the probability/prediction columns, so both sides are cut to the
# common row count and re-indexed before joining column-wise.
row_count = min(len(pred_df), len(val_df))
bias_df = pd.concat(
    [
        pred_df[["probability", "prediction"]].iloc[:row_count].reset_index(drop=True),
        val_df.iloc[:row_count].reset_index(drop=True),
    ],
    axis=1,
)

bias_dataset_file = "test_data/validation_for_bias.csv"
bias_df.to_csv(bias_dataset_file, index=False)

print(f"Created bias dataset with shape: {bias_df.shape}")
print(f"Columns: {list(bias_df.columns)[:5]}...")

In [None]:
# S3 locations for the bias baselining job: input data and results.
bias_data_prefix = f"{prefix}/bias-baselining/data"
bias_results_prefix = f"{prefix}/bias-baselining/results"
bias_data_uri = f"s3://{bucket}/{bias_data_prefix}"
bias_results_uri = f"s3://{bucket}/{bias_results_prefix}"

# Upload the bias dataset to S3
bias_dataset_s3_uri = S3Uploader.upload(bias_dataset_file, bias_data_uri)
print(f"Uploaded bias dataset to: {bias_dataset_s3_uri}")

In [None]:
# Initialize the Model Bias Monitor.
# `sess` is the SageMaker session created in the setup cell; the name
# `session` is never defined in this notebook.
churn_model_bias_monitor = ModelBiasMonitor(
    role=role,
    sagemaker_session=sess,
    max_runtime_in_seconds=1800,
    instance_count=1,
    instance_type="ml.m5.large",
    base_job_name="churn-bias-monitor",
)

facet_name = "Feature_1"  # In a real scenario, you would map this to 'Age', 'Gender', or 'Area Code'

# Probability cutoff for turning model scores into predicted labels.
# BiasAnalysisConfig has no probability parameters, so the value is kept
# here on its own instead of being passed to it.
bias_probability_threshold = 0.5

# BiasAnalysisConfig expects a sagemaker.clarify.BiasConfig object (it calls
# the object's get_config()), not a plain dict.
facet_bias_config = BiasConfig(
    label_values_or_threshold=[1],
    facet_name=facet_name,
    facet_values_or_threshold=[1],
)

bias_analysis_config = BiasAnalysisConfig(
    facet_bias_config,
    headers=list(bias_df.columns),
    label="label",
)

In [None]:
# Start the Bias Baseline Job.
# ModelBiasMonitor.suggest_baseline is driven by SageMaker Clarify config
# objects (data, bias, model); it does not accept the baseline_dataset /
# dataset_format / output_s3_uri keywords used by the quality monitor.
bias_data_config = DataConfig(
    s3_data_input_path=bias_dataset_s3_uri,
    s3_output_path=bias_results_uri,
    label="label",
    headers=list(bias_df.columns),
    dataset_type="text/csv",
    # These two columns are model outputs, not inputs, so they must not be
    # sent to the model. NOTE(review): `excluded_columns` needs a reasonably
    # recent SageMaker SDK - confirm against the installed version.
    excluded_columns=["probability", "prediction"],
)
baseline_bias_config = BiasConfig(
    label_values_or_threshold=[1],
    facet_name=facet_name,
    facet_values_or_threshold=[1],
)
# NOTE(review): relies on `model.deploy(...)` having run so that `model.name`
# holds the name of the created SageMaker model - confirm.
bias_model_config = ModelConfig(
    model_name=model.name,
    instance_count=1,
    instance_type="ml.m5.large",
    content_type="text/csv",
    accept_type="text/csv",
)
# Scores above 0.5 count as the positive predicted label.
bias_predicted_label_config = ModelPredictedLabelConfig(probability_threshold=0.5)

bias_job = churn_model_bias_monitor.suggest_baseline(
    data_config=bias_data_config,
    bias_config=baseline_bias_config,
    model_config=bias_model_config,
    model_predicted_label_config=bias_predicted_label_config,
    job_name=f"bias-baseline-{datetime.utcnow():%Y-%m-%d-%H%M}",
)

print("Started Bias Baseline Job. Waiting for completion...")
# Wait on the monitor's own job handle, without streaming logs.
churn_model_bias_monitor.latest_baselining_job.wait(logs=False)

In [None]:
# View the results
latest_bias_job = churn_model_bias_monitor.latest_baselining_job
# Clarify bias baselining jobs do not support baseline_statistics(); the
# job's results are exposed through its suggested constraints instead.
bias_metrics = latest_bias_job.suggested_constraints().body_dict
print("Bias Metrics:")
pd.json_normalize(bias_metrics)