# Lab 10 : SageMaker Model Monitoring

Ce notebook montre comment configurer SageMaker Model Monitor sur un endpoint déjà déployé (issu du lab 5).

**Étapes principales :**
1. Récupérer le nom de l'endpoint du lab 5
2. Configurer un job de monitoring (baseline, schedule)
3. Déployer un moniteur de données
4. Visualiser les résultats


In [None]:
# 1. Prérequis et configuration
import boto3
import sagemaker
from sagemaker import get_execution_role
from sagemaker.model_monitor import DefaultModelMonitor, CronExpressionGenerator
import time
import os

# Configuration universelle
try:
    from utils.sagemaker_config import get_sagemaker_config
    config = get_sagemaker_config(s3_prefix='lab10-ml-monitoring')
    role = config['role']
    session = config['session']
    bucket = config['bucket']
    region = config['region']
except ImportError:
    role = get_execution_role()
    session = sagemaker.Session()
    bucket = session.default_bucket()
    region = session.boto_region_name

print(f"S3 Bucket: {bucket}")
print(f"Region: {region}")

In [None]:
# 2. Récupérer le nom de l'endpoint du lab 5
# Remplacez par le nom réel de l'endpoint déployé lors du lab 5
endpoint_name = 'realtime-endpoint-REPLACE_WITH_YOURS'  # À adapter !

In [None]:
# 3. Configurer le Model Monitor
monitor = DefaultModelMonitor(
    role=role,
    instance_count=1,
    instance_type='ml.m5.large',
    volume_size_in_gb=20,
    max_runtime_in_seconds=3600,
    sagemaker_session=session
)

baseline_data_uri = f's3://{bucket}/lab5-monitoring/baseline.csv'  # À adapter si besoin
output_s3_uri = f's3://{bucket}/lab10-monitoring-results/'

# Générer un baseline (exemple, à adapter selon vos données)
import pandas as pd
import numpy as np

baseline_data = pd.DataFrame(np.random.randn(100, 20))
baseline_data.to_csv('baseline.csv', header=False, index=False)
session.upload_data('baseline.csv', bucket=bucket, key_prefix='lab5-monitoring')

# Créer le baseline job
monitor.suggest_baseline(
    baseline_dataset=baseline_data_uri,
    dataset_format={'csv': {'header': False}},
    output_s3_uri=output_s3_uri,
    wait=True
)

In [None]:
# 4. Planifier le monitoring sur l'endpoint
schedule_expression = CronExpressionGenerator.hourly()

monitor.create_monitoring_schedule(
    endpoint_input=endpoint_name,
    output_s3_uri=output_s3_uri,
    schedule_cron_expression=schedule_expression
)
print(f"Monitoring schedule created for endpoint: {endpoint_name}")

In [None]:
# 5. Visualiser les résultats
# Lister les jobs de monitoring
schedules = session.sagemaker_client.list_monitoring_schedules()
for sched in schedules['MonitoringScheduleSummaries']:
    print(sched['MonitoringScheduleName'], sched['MonitoringScheduleStatus'])

# Télécharger les résultats depuis S3 (à faire après exécution d'un job de monitoring)
# Utilisez AWS Console ou boto3 pour explorer les résultats dans output_s3_uri

## Monitoring avancé : détection de dérive, analyse des features et alertes

Nous allons enrichir le monitoring pour :
- Générer un baseline statistique détaillé (features, distributions, quantiles)
- Activer la capture d’inférences (inputs/outputs) sur l’endpoint
- Planifier un monitoring de dérive de données et de qualité des prédictions
- Visualiser les rapports de dérive et de violations
- (Optionnel) Déclencher des alertes CloudWatch en cas de dérive


In [None]:
# 6. Activer la capture d’inférences sur l’endpoint
from sagemaker.model_monitor import DataCaptureConfig

# Capture 100% des requêtes (inputs/outputs)
data_capture_config = DataCaptureConfig(
    enable_capture=True,
    sampling_percentage=100,
    destination_s3_uri=f's3://{bucket}/lab10-monitoring-capture/'
)

# (À exécuter lors du déploiement de l’endpoint, déjà fait dans lab 5)
# Pour activer sur un endpoint existant :
sm_client = boto3.client('sagemaker', region_name=region)
sm_client.update_endpoint(
    EndpointName=endpoint_name,
    EndpointConfigName=sm_client.describe_endpoint(EndpointName=endpoint_name)['EndpointConfigName'],
    DataCaptureConfig=data_capture_config._to_request_dict()
)
print(f"Data capture activée sur l'endpoint {endpoint_name}")

In [None]:
# 7. Générer un baseline statistique détaillé
from sagemaker.model_monitor import DatasetFormat

# Utiliser le même baseline_data que précédemment ou un vrai jeu de données de référence
monitor.suggest_baseline(
    baseline_dataset=baseline_data_uri,
    dataset_format=DatasetFormat.csv(header=False),
    output_s3_uri=output_s3_uri,
    wait=True
)

# Le rapport baseline inclura :
# - Statistiques descriptives (moyenne, quantiles, min/max, etc.)
# - Distributions des features
# - Schéma de données (types, nulls, etc.)
print(f"Rapport baseline généré dans : {output_s3_uri}")

In [None]:
# 8. Planifier un monitoring de dérive avancé
from sagemaker.model_monitor import MonitoringOutput

monitor.create_monitoring_schedule(
    endpoint_input=endpoint_name,
    output_s3_uri=output_s3_uri,
    schedule_cron_expression=CronExpressionGenerator.hourly(),
    statistics=output_s3_uri + 'statistics.json',
    constraints=output_s3_uri + 'constraints.json',
    monitoring_outputs=[MonitoringOutput(s3_output=output_s3_uri)]
)
print(f"Monitoring avancé planifié pour l'endpoint : {endpoint_name}")

In [None]:
# 9. Visualiser les rapports de dérive et violations
import json
import boto3

# Télécharger le rapport de dérive depuis S3
def download_s3_file(s3_uri, local_path):
    s3 = boto3.client('s3')
    bucket = s3_uri.split('/')[2]
    key = '/'.join(s3_uri.split('/')[3:])
    s3.download_file(bucket, key, local_path)

# Exemple : télécharger le rapport de statistiques
statistics_report = output_s3_uri + 'statistics.json'
constraints_report = output_s3_uri + 'constraints.json'

try:
    download_s3_file(statistics_report, 'statistics.json')
    with open('statistics.json') as f:
        stats = json.load(f)
    print('Statistiques du baseline:')
    print(json.dumps(stats, indent=2))
except Exception as e:
    print(f"Impossible de lire le rapport de statistiques : {e}")

try:
    download_s3_file(constraints_report, 'constraints.json')
    with open('constraints.json') as f:
        constraints = json.load(f)
    print('Contraintes du baseline:')
    print(json.dumps(constraints, indent=2))
except Exception as e:
    print(f"Impossible de lire le rapport de contraintes : {e}")

In [None]:
# 10. (Optionnel) Déclencher des alertes CloudWatch en cas de dérive
# Exemple : créer une alarme sur le nombre de violations détectées
import boto3

cloudwatch = boto3.client('cloudwatch', region_name=region)

alarm_name = 'SageMaker-ModelMonitor-Drift-Alarm'
metric_name = 'Violations'
namespace = '/aws/sagemaker/Endpoints'

cloudwatch.put_metric_alarm(
    AlarmName=alarm_name,
    MetricName=metric_name,
    Namespace=namespace,
    Statistic='Sum',
    Period=3600,
    EvaluationPeriods=1,
    Threshold=1,
    ComparisonOperator='GreaterThanOrEqualToThreshold',
    AlarmActions=[],  # Ajouter un ARN SNS pour notification
    AlarmDescription='Détecte une dérive de données sur l’endpoint',
    Dimensions=[{'Name': 'EndpointName', 'Value': endpoint_name}]
)
print(f"Alarme CloudWatch créée pour la dérive sur l'endpoint {endpoint_name}")