In [5]:
import sagemaker
import numpy as np
import boto3
import os
import pandas as pd
from sklearn import datasets
from sagemaker import get_execution_role
from sagemaker.serializers import CSVSerializer
from sagemaker.model_monitor import DefaultModelMonitor
import logging

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Get execution role
try:
    role = get_execution_role()
    logger.info("Successfully retrieved execution role.")
except Exception as e:
    logger.error("Failed to retrieve execution role: %s", str(e))
    raise

# Load Iris dataset and prepare CSV
logger.info("Loading Iris dataset and preparing data...")
iris = datasets.load_iris()
X = iris.data
y = iris.target
dataset = np.insert(X, 0, y, axis=1)
csv_file = 'full_dataset.csv'
pd.DataFrame(data=dataset, columns=['iris_id'] + iris.feature_names).to_csv(csv_file, index=None)
logger.info(f"Dataset saved to {csv_file}.")

# Create SageMaker session and define bucket
try:
    sagemaker_session = sagemaker.Session()
    bucket = sagemaker_session.default_bucket()
    logger.info(f"Using default bucket: {bucket}")
except Exception as e:
    logger.error("Failed to create SageMaker session or retrieve default bucket: %s", str(e))
    raise

# Define prefixes and endpoint names
prefix = 'mlops/iris'
endpoint_name_file = 'endpoint_name.txt'
endpoint_name2_file = 'endpoint_name2.txt'

# Check for the first endpoint name file, then fall back to the second one
endpoint_name = None
if os.path.isfile(endpoint_name_file):
    endpoint_name = open(endpoint_name_file, 'r').read().strip()
if not endpoint_name and os.path.isfile(endpoint_name2_file):
    endpoint_name = open(endpoint_name2_file, 'r').read().strip()

# If neither file contains a valid endpoint name, raise an error
if not endpoint_name:
    logger.error("Both endpoint_name.txt and endpoint_name2.txt are missing or empty.")
    raise ValueError("Endpoint name is required to proceed.")

logger.info(f"Using endpoint: {endpoint_name}")

# Initialize the predictor
try:
    xgb_predictor = sagemaker.predictor.Predictor(endpoint_name=endpoint_name, sagemaker_session=sagemaker_session)
    xgb_predictor.serializer = CSVSerializer()
    logger.info(f"Predictor initialized with endpoint: {endpoint_name}")
except Exception as e:
    logger.error("Failed to initialize the predictor: %s", str(e))
    raise Exception("You must run Part 1 before this. Train and deploy a model first.")

# Placeholder for Model Monitor setup (if needed)
try:
    logger.info("Setting up Model Monitor...")
    monitor = DefaultModelMonitor(
        role=role,
        instance_count=1,
        instance_type="ml.m5.large",
        volume_size_in_gb=20,
        max_runtime_in_seconds=3600,
        sagemaker_session=sagemaker_session
    )
    logger.info("Model Monitor setup complete.")
except Exception as e:
    logger.warning("Failed to set up Model Monitor: %s", str(e))

logger.info("Script completed successfully.")


INFO:__main__:Successfully retrieved execution role.
INFO:__main__:Loading Iris dataset and preparing data...
INFO:__main__:Dataset saved to full_dataset.csv.
INFO:__main__:Using default bucket: sagemaker-us-east-1-853973692277
INFO:__main__:Using endpoint: sagemaker-xgboost-250111-1253-017-02260edf
INFO:__main__:Predictor initialized with endpoint: sagemaker-xgboost-250111-1253-017-02260edf
INFO:__main__:Setting up Model Monitor...
INFO:sagemaker.image_uris:Defaulting to the only supported framework/algorithm version: .
INFO:sagemaker.image_uris:Ignoring unnecessary instance type: None.
INFO:__main__:Model Monitor setup complete.
INFO:__main__:Script completed successfully.


In [6]:
from sagemaker.model_monitor import DefaultModelMonitor
from sagemaker.model_monitor.dataset_format import DatasetFormat

endpoint_monitor = DefaultModelMonitor(
    role=role,
    instance_count=1,
    instance_type='ml.m5.xlarge',
    volume_size_in_gb=20,
    max_runtime_in_seconds=3600,
)
endpoint_monitor.suggest_baseline(
    baseline_dataset='full_dataset.csv',
    dataset_format=DatasetFormat.csv(header=True),
    output_s3_uri='s3://{}/{}/monitoring/baseline'.format(bucket, prefix),
    wait=True,
    logs=False
)

INFO:sagemaker.image_uris:Defaulting to the only supported framework/algorithm version: .
INFO:sagemaker.image_uris:Ignoring unnecessary instance type: None.
INFO:sagemaker:Creating processing-job with name baseline-suggestion-job-2025-01-12-04-32-35-928


...........................................................!

<sagemaker.processing.ProcessingJob at 0x7f606be25e90>

In [7]:
baseline_job = endpoint_monitor.latest_baselining_job
schema_df = pd.json_normalize(baseline_job.baseline_statistics().body_dict["features"])
constraints_df = pd.json_normalize(baseline_job.suggested_constraints().body_dict["features"])
report_df = schema_df.merge(constraints_df)
report_df.drop([
    'numerical_statistics.distribution.kll.buckets',
    'numerical_statistics.distribution.kll.sketch.data',
    'numerical_statistics.distribution.kll.sketch.parameters.c'
], axis=1).head(10)

Unnamed: 0,name,inferred_type,numerical_statistics.common.num_present,numerical_statistics.common.num_missing,numerical_statistics.mean,numerical_statistics.sum,numerical_statistics.std_dev,numerical_statistics.min,numerical_statistics.max,numerical_statistics.approximate_num_distinct_values,numerical_statistics.completeness,numerical_statistics.distribution.kll.sketch.parameters.k,completeness,num_constraints.is_non_negative
0,iris_id,Fractional,150,0,1.0,150.0,0.816497,0.0,2.0,3,1.0,2048.0,1.0,True
1,sepal length (cm),Fractional,150,0,5.843333,876.5,0.825301,4.3,7.9,33,1.0,2048.0,1.0,True
2,sepal width (cm),Fractional,150,0,3.057333,458.6,0.434411,2.0,4.4,24,1.0,2048.0,1.0,True
3,petal length (cm),Fractional,150,0,3.758,563.7,1.759404,1.0,6.9,44,1.0,2048.0,1.0,True
4,petal width (cm),Fractional,150,0,1.199333,179.9,0.759693,0.1,2.5,22,1.0,2048.0,1.0,True


In [14]:
import boto3
import os
from time import sleep
from sagemaker.model_monitor import DefaultModelMonitor, CronExpressionGenerator
from sagemaker import get_execution_role
from sagemaker import Session

# Initialize necessary clients and variables
client = boto3.client('sagemaker')
s3_client = boto3.client('s3')  # Use boto3 client to interact with S3
role = get_execution_role()
sagemaker_session = Session()
bucket = s3_client.list_buckets()['Buckets'][0]['Name']  # Get the first S3 bucket name
prefix = 'mlops/iris'

# Read endpoint name from the correct file
endpoint_name_file = 'endpoint_name2.txt'  # Update to the correct file name
endpoint_name = None
if os.path.isfile(endpoint_name_file):
    with open(endpoint_name_file, 'r') as file:
        endpoint_name = file.read().strip()
else:
    raise ValueError(f"Endpoint name file '{endpoint_name_file}' does not exist.")

# Ensure the endpoint is in service
def wait_for_endpoint(endpoint_name):
    print(f"Checking status for endpoint: {endpoint_name}")
    while True:
        try:
            response = client.describe_endpoint(EndpointName=endpoint_name)
            status = response['EndpointStatus']
            if status == 'InService':
                print(f"Endpoint {endpoint_name} is InService")
                break
            elif status == 'Failed':
                raise Exception(f"Endpoint {endpoint_name} failed. Status: {status}")
            else:
                print(f"Endpoint {endpoint_name} is in {status} state. Waiting...")
        except client.exceptions.ResourceNotFound:
            print(f"Endpoint {endpoint_name} not found.")
            raise Exception(f"Endpoint {endpoint_name} does not exist or is invalid.")
        
        sleep(30)  # Wait for 30 seconds before checking again

# Check if the endpoint exists and is in service
wait_for_endpoint(endpoint_name)

# Initialize model monitor
endpoint_monitor = DefaultModelMonitor(
    role=role,
    sagemaker_session=sagemaker_session
)

# Create monitoring schedule
try:
    endpoint_monitor.create_monitoring_schedule(
        endpoint_input=endpoint_name,
        output_s3_uri=f's3://{bucket}/{prefix}/monitoring/reports',
        statistics=endpoint_monitor.baseline_statistics(),
        constraints=endpoint_monitor.suggested_constraints(),
        schedule_cron_expression=CronExpressionGenerator.hourly(),
        enable_cloudwatch_metrics=True
    )
    print(f"Monitoring schedule created successfully for endpoint {endpoint_name}")
except Exception as e:
    print(f"Error while creating monitoring schedule: {str(e)}")


INFO:sagemaker.image_uris:Defaulting to the only supported framework/algorithm version: .


Checking status for endpoint: sagemaker-xgboost-250111-1253-017-02260edf
Endpoint sagemaker-xgboost-250111-1253-017-02260edf is InService


INFO:sagemaker.image_uris:Ignoring unnecessary instance type: None.


Error while creating monitoring schedule: 'NoneType' object has no attribute 'baseline_statistics'


In [17]:
from sagemaker.model_monitor import CronExpressionGenerator
from time import gmtime, strftime

endpoint_monitor.create_monitoring_schedule(
    endpoint_input=endpoint_name,
    output_s3_uri='s3://{}/{}/monitoring/reports'.format(bucket, prefix),
    statistics=endpoint_monitor.baseline_statistics(),
    constraints=endpoint_monitor.suggested_constraints(),
    schedule_cron_expression=CronExpressionGenerator.hourly(),
    enable_cloudwatch_metrics=True,
)

AttributeError: 'NoneType' object has no attribute 'baseline_statistics'