In [None]:
import json
import os
import re
import time
import warnings
from threading import Thread
from time import gmtime, strftime, sleep
from urllib.parse import urlparse

import boto3
import pandas as pd
import sagemaker
from sagemaker import get_execution_role, session
from sagemaker.image_uris import retrieve
from sagemaker.model import Model
from sagemaker.model_monitor import CronExpressionGenerator
from sagemaker.model_monitor import DefaultModelMonitor
from sagemaker.model_monitor.dataset_format import DatasetFormat
from sagemaker.predictor import Predictor
from sagemaker.serializers import CSVSerializer

from utils.processingjob_wrapper import ProcessingJob
import utils.monitor_render_utils as mu

warnings.simplefilter("ignore")

---
# Declarando Parâmetros iniciais

In [None]:
# SageMaker SDK session and low-level boto3 SageMaker client used by the cells below.
sm_session = sagemaker.Session()
sm_client = boto3.client("sagemaker")

# Root S3 prefix of the demo, with one sub-prefix per kind of artifact.
prefix = "demo-model-monitor"
code_prefix = f"{prefix}/code"
reports_prefix = f"{prefix}/reports"
data_capture_prefix = f"{prefix}/datacapture"

In [None]:
# Region configured for the default boto3 session.
region = boto3.Session().region_name
print(f"AWS Region: {region}\n")

# Execution role reported by the SageMaker SDK.
role = get_execution_role()
print(f"RoleArn: {role}\n")

# Default bucket of the SageMaker session; every S3 URI below is built on it.
bucket = sm_session.default_bucket()
print(f"Bucket: {bucket}\n")

In [None]:
# Destination of the endpoint's captured requests and responses.
s3_capture_upload_path = f"s3://{bucket}/{data_capture_prefix}"
print(f"Capture path: {s3_capture_upload_path}\n")

# Destination of the monitoring reports.
s3_report_path = f"s3://{bucket}/{reports_prefix}"
print(f"Report path: {s3_report_path}\n")

# Locations of the pre- and post-processing scripts under the code prefix.
s3_code_preprocessor_uri = f"s3://{bucket}/{code_prefix}/preprocessor.py"
print(f"Preproc Code path: {s3_code_preprocessor_uri}\n")

s3_code_postprocessor_uri = f"s3://{bucket}/{code_prefix}/postprocessor.py"
print(f"Postproc Code path: {s3_code_postprocessor_uri}\n")

---
## Criando o endpoint que vai ser monitorado

In [None]:
# Upload the local model archive to S3 under the demo prefix.
# S3 keys always use "/" regardless of the local OS, so the key is built as a plain
# string rather than with os.path.join.
s3_key = f"{prefix}/model/xgb-churn-prediction-model.tar.gz"

# The context manager closes the file handle even if the upload raises.
with open("model/xgb-churn-prediction-model.tar.gz", "rb") as model_file:
    boto3.Session().resource("s3").Bucket(bucket).Object(s3_key).upload_fileobj(model_file)

#### Registrando o modelo

In [None]:
model_name = "demo-model-monitor"

# Point at the artifact uploaded above with an s3:// URI. The previous
# "https://<bucket>.s3-<region>.amazonaws.com/..." form relies on the legacy dash-style
# regional endpoint, which is not available in every region.
model_url = f"s3://{bucket}/{prefix}/model/xgb-churn-prediction-model.tar.gz"

# Container image for the XGBoost framework, version 0.90-1, in the current region.
image_uri = retrieve("xgboost", region, "0.90-1")

# Container definition passed to create_model: the image plus the model artifact location.
primary_container = {
    "Image": image_uri,
    "ModelDataUrl": model_url,
}

In [None]:
# Register the model with SageMaker and show the ARN returned by the service.
create_model_response = sm_client.create_model(
    ModelName=model_name,
    ExecutionRoleArn=role,
    PrimaryContainer=primary_container,
)

print(create_model_response["ModelArn"])

#### Criando endpoint config

In [None]:
# Display the S3 URI that the data-capture configuration uses as its destination.
s3_capture_upload_path

In [None]:
endpoint_config_name = "DataCaptureEndpointConfig"

# Capture 100% of the traffic, both responses ("Output") and requests ("Input"),
# and declare which content types are treated as CSV and as JSON.
data_capture_configuration = dict(
    EnableCapture=True,
    InitialSamplingPercentage=100,
    DestinationS3Uri=s3_capture_upload_path,
    CaptureOptions=[{"CaptureMode": mode} for mode in ("Output", "Input")],
    CaptureContentTypeHeader=dict(
        CsvContentTypes=["text/csv"],
        JsonContentTypes=["application/json"],
    ),
)
print(endpoint_config_name)

In [None]:
# Create the endpoint configuration: a single variant that receives all traffic,
# with data capture enabled through the configuration built above.
create_endpoint_config_response = sm_client.create_endpoint_config(
    EndpointConfigName=endpoint_config_name,
    ProductionVariants=[
        dict(
            InstanceType="ml.t2.medium",
            InitialInstanceCount=1,
            InitialVariantWeight=1,
            ModelName=model_name,
            VariantName="AllTrafficVariant",
        )
    ],
    DataCaptureConfig=data_capture_configuration,
)

print(f"Endpoint Config Arn: {create_endpoint_config_response['EndpointConfigArn']}")

#### Criando Endpoint

In [None]:
# Name of the endpoint that is created and monitored by the rest of the notebook.
endpoint_name = 'DEMO-DataCaptureEndpoint'
print(endpoint_name)

In [None]:
# Request creation of the endpoint from the configuration above and show its ARN.
create_endpoint_response = sm_client.create_endpoint(
    EndpointName=endpoint_name,
    EndpointConfigName=endpoint_config_name,
)

print(create_endpoint_response["EndpointArn"])

In [None]:
# Read the endpoint's current status once; the next cell polls it until it changes.
resp = sm_client.describe_endpoint(EndpointName=endpoint_name)
status = resp["EndpointStatus"]
print(f"Status: {status}")

In [None]:
# Poll once a minute for as long as the endpoint reports the "Creating" status.
while status == "Creating":
    time.sleep(60)
    resp = sm_client.describe_endpoint(EndpointName=endpoint_name)
    status = resp["EndpointStatus"]
    print("Status: " + status)

# Stop here when creation failed: every later cell needs a working endpoint, and
# continuing would only surface the problem as a less obvious error further down.
if status == "Failed":
    raise RuntimeError(
        "Endpoint {} creation failed: {}".format(
            endpoint_name, resp.get("FailureReason", "no failure reason reported")
        )
    )

In [None]:
# Summary of the endpoint taken from the last describe_endpoint response.
print(f"Arn: {resp['EndpointArn']}")
print(f"Status: {status}")

#### Gerando tráfego no endpoint

In [None]:
# Write the first 180 lines of the test dataset to a smaller sample file.
!head -180 test_data/test-dataset-input-cols.csv > test_data/test_sample.csv

In [None]:
# Predictor bound to the endpoint by name, configured with a CSV serializer for payloads.
predictor = Predictor(endpoint_name=endpoint_name, serializer=CSVSerializer())

In [None]:
print(f"Enviando trafego para o endpoint {endpoint_name}")

# Send the sample file to the endpoint one line at a time, pausing one second per request.
with open("test_data/test_sample.csv", "r") as sample_file:
    for line in sample_file:
        payload = line.rstrip("\n")
        response = predictor.predict(data=payload)
        time.sleep(1)

print("Concluido!")

#### Visualizando os dados capturados

In [None]:
s3_client = boto3.Session().client("s3")

# Captured data is written under <capture prefix>/<endpoint name>.
data_capture_sub_folder = f"{data_capture_prefix}/{endpoint_name}"
result = s3_client.list_objects(Bucket=bucket, Prefix=data_capture_sub_folder)

# The response has no "Contents" key when nothing matches the prefix, so default to an
# empty list instead of iterating over None.
capture_files = [capture_file.get("Key") for capture_file in result.get("Contents", [])]

if not capture_files:
    print(
        "No capture files found under s3://{}/{} yet; wait a few minutes and re-run this cell.".format(
            bucket, data_capture_sub_folder
        )
    )

In [None]:
# List the keys of the capture files, one per line.
print("Localizacao dos dados capturados:\n")
print(*capture_files, sep="\n ")

In [None]:
def get_obj_body(obj_key):
    """Return the content of an S3 object as text.

    The object is read from the notebook-level ``bucket`` through the notebook-level
    ``s3_client``; its whole body is read and decoded as UTF-8.

    Args:
        obj_key: Key of the object inside ``bucket``.

    Returns:
        The decoded object body as a string.
    """
    s3_object = s3_client.get_object(Bucket=bucket, Key=obj_key)
    raw_bytes = s3_object.get("Body").read()
    return raw_bytes.decode("utf-8")

In [None]:
# Download the last listed capture file and pretty-print its first line as JSON.
capture_file = get_obj_body(capture_files[-1])
first_record = capture_file.split("\n", 1)[0]
print(json.dumps(json.loads(first_record), indent=2))

---
## Criando a baseline e constraints

In [None]:
# S3 layout of the baselining step: input data and generated results live side by side.
baseline_prefix = f"{prefix}/baselining"
baseline_data_prefix = f"{baseline_prefix}/data"
baseline_results_prefix = f"{baseline_prefix}/results"

baseline_data_uri = f"s3://{bucket}/{baseline_data_prefix}"
baseline_results_uri = f"s3://{bucket}/{baseline_results_prefix}"
print(f"Baseline data uri: {baseline_data_uri}")
print(f"Baseline results uri: {baseline_results_uri}")

In [None]:
# Upload the training dataset (with header) that the baseline is computed from.
# S3 keys always use "/" regardless of the local OS, so the key is built as a plain
# string rather than with os.path.join.
s3_key = f"{baseline_prefix}/data/training-dataset-with-header.csv"

# The context manager closes the file handle even if the upload raises.
with open("test_data/training-dataset-with-header.csv", "rb") as training_data_file:
    boto3.Session().resource("s3").Bucket(bucket).Object(s3_key).upload_fileobj(training_data_file)

In [None]:
# DefaultModelMonitor and DatasetFormat are imported in the notebook's first cell,
# together with every other dependency.

# Monitor object that runs both the baselining job and the scheduled monitoring jobs.
my_default_monitor = DefaultModelMonitor(
    role=role,
    instance_count=1,
    instance_type="ml.m5.xlarge",
    volume_size_in_gb=20,
    max_runtime_in_seconds=3600,
)

# Run the baselining job on the uploaded training CSV (which has a header row) and
# block until it finishes; the results are written under baseline_results_uri.
my_default_monitor.suggest_baseline(
    baseline_dataset=baseline_data_uri + "/training-dataset-with-header.csv",
    dataset_format=DatasetFormat.csv(header=True),
    output_s3_uri=baseline_results_uri,
    wait=True,
)

#### Explorando as estatísticas geradas

In [None]:
s3_client = boto3.Session().client("s3")
result = s3_client.list_objects(Bucket=bucket, Prefix=baseline_results_prefix)

# The response has no "Contents" key when nothing matches the prefix, so default to an
# empty list instead of iterating over None.
report_files = [report_file.get("Key") for report_file in result.get("Contents", [])]
print("Found Files:")
print("\n ".join(report_files))

In [None]:
# Flatten the per-feature statistics of the latest baselining job into a DataFrame.
baseline_job = my_default_monitor.latest_baselining_job
# pd.json_normalize is the supported top-level function; pd.io.json.json_normalize
# is deprecated.
schema_df = pd.json_normalize(baseline_job.baseline_statistics().body_dict["features"])
schema_df.head(10)

In [None]:
# Flatten the per-feature suggested constraints of the latest baselining job into a DataFrame.
baseline_job = my_default_monitor.latest_baselining_job
# pd.json_normalize is the supported top-level function; pd.io.json.json_normalize
# is deprecated.
constraints_df = pd.json_normalize(
    baseline_job.suggested_constraints().body_dict["features"]
)
constraints_df.head(10)

In [None]:
# Load the statistics.json file from the baseline results prefix and parse it.
statistics_key = f"{baseline_results_prefix}/statistics.json"
statistics_file = get_obj_body(statistics_key)
statistics_file_json = json.loads(statistics_file)

In [None]:
# Hand the parsed baseline statistics to the project's rendering helpers.
# NOTE(review): get_features / show_distributions live in utils.monitor_render_utils,
# which is not visible here; presumably they extract per-feature statistics and plot
# their distributions. Confirm against that module.
feature_baselines = mu.get_features(statistics_file_json)
mu.show_distributions(feature_baselines)

#### Criando o agendamento

In [None]:
# Name of the monitoring schedule; later cells use it to create and query the schedule.
mon_schedule_name = "demo-model-monitor-schedule"

In [None]:
# Schedule an hourly monitoring job for the endpoint, comparing captured traffic against
# the statistics and constraints produced by the baselining job.
my_default_monitor.create_monitoring_schedule(
    monitor_schedule_name=mon_schedule_name,
    # `endpoint_name` is the current Predictor attribute; `endpoint` is a deprecated alias.
    endpoint_input=predictor.endpoint_name,
    output_s3_uri=s3_report_path,
    statistics=my_default_monitor.baseline_statistics(),
    constraints=my_default_monitor.suggested_constraints(),
    # Same hourly expression as the former hand-written "cron(0 * ? * * *)", generated
    # by the SDK helper that this notebook already imports.
    schedule_cron_expression=CronExpressionGenerator.hourly(),
    enable_cloudwatch_metrics=True,
)

In [None]:
# Look up the schedule that was just created and report its status.
desc_schedule_result = sm_client.describe_monitoring_schedule(
    MonitoringScheduleName=mon_schedule_name
)
print(f"Schedule status: {desc_schedule_result['MonitoringScheduleStatus']}")

#### Gerando tráfego

In [None]:
# `endpoint_name` is the current Predictor attribute; `endpoint` is a deprecated alias.
endpoint_name = predictor.endpoint_name
# Low-level runtime client of the SageMaker session, used to invoke the endpoint directly.
runtime_client = sm_session.sagemaker_runtime_client

# (just repeating code from above for convenience/ able to run this section independently)
def invoke_endpoint(ep_name, file_name, runtime_client, delay_seconds=1):
    """Replay a CSV file against an endpoint, one row per request.

    Args:
        ep_name: Name of the endpoint to invoke.
        file_name: Path of a text file holding one CSV record per line.
        runtime_client: Client exposing ``invoke_endpoint`` (the SageMaker runtime
            client in this notebook).
        delay_seconds: Pause after each request, in seconds. Defaults to 1, the
            value that was previously hard-coded.

    Returns:
        None. Each response body is read and discarded.
    """
    with open(file_name, "r") as f:
        for row in f:
            payload = row.rstrip("\n")
            if not payload.strip():
                # A blank line carries no record; sending it would post an empty body.
                continue
            response = runtime_client.invoke_endpoint(
                EndpointName=ep_name, ContentType="text/csv", Body=payload
            )
            # Read the response body to completion; the prediction itself is not used.
            response["Body"].read()
            time.sleep(delay_seconds)


def invoke_endpoint_forever():
    """Replay the full test dataset against the endpoint in an endless loop.

    Uses the notebook-level ``endpoint_name`` and ``runtime_client``. A
    ``ValidationError`` from the runtime client is treated as non-fatal: it is
    reported and the replay restarts after a pause. Any other exception propagates
    and ends the loop.
    """
    while True:
        try:
            invoke_endpoint(endpoint_name, "test_data/test-dataset-input-cols.csv", runtime_client)
        except runtime_client.exceptions.ValidationError as err:
            # Keep the best-effort retry, but report the error and back off. Swallowing
            # it silently would retry immediately and hammer the API in a tight loop.
            print("Endpoint invocation rejected, retrying in 30 seconds: {}".format(err))
            time.sleep(30)


# Run the endless traffic generator on a background thread so the notebook stays usable.
# NOTE(review): the target loops forever and nothing here stops the thread; presumably it
# only ends when the kernel is restarted. Confirm that this is intended.
thread = Thread(target=invoke_endpoint_forever)
thread.start()

#### Inspecionando o endpoint

In [None]:
# List up to five executions of the monitoring schedule and display the raw response.
mon_executions = sm_client.list_monitoring_executions(MonitoringScheduleName=mon_schedule_name.lower(), MaxResults=5)
mon_executions

In [None]:
latest_execution = None

# Wait till an execution occurs, checking the schedule once a minute.
while True:
    mon_executions = sm_client.list_monitoring_executions(
        MonitoringScheduleName=mon_schedule_name.lower(), MaxResults=5
    )
    if mon_executions["MonitoringExecutionSummaries"]:
        break
    print("Waiting for the 1st execution to happen...")
    time.sleep(60)
    

In [None]:
mon_executions = sm_client.list_monitoring_executions(
    MonitoringScheduleName=mon_schedule_name.lower(), MaxResults=5
)

# Only wait when the schedule has produced exactly one execution so far.
if len(mon_executions["MonitoringExecutionSummaries"]) == 1:
    while True:
        # Re-read the summary on every pass. A summary fetched once never changes, so an
        # execution that had no processing job yet would keep this loop running forever.
        mon_executions = sm_client.list_monitoring_executions(
            MonitoringScheduleName=mon_schedule_name.lower(), MaxResults=5
        )
        execution = mon_executions["MonitoringExecutionSummaries"][0]

        # The ARN may be absent while no processing job has been started.
        processing_job_arn = execution.get("ProcessingJobArn")
        if processing_job_arn:
            job_name = processing_job_arn.split("/")[1]
            resp = sm_client.describe_processing_job(ProcessingJobName=job_name)
            status = resp["ProcessingJobStatus"]
            print("Processing Job Status: " + status)
            if status != "InProgress":
                break
        elif execution["MonitoringExecutionStatus"] in (
            "Completed",
            "CompletedWithViolations",
            "Failed",
            "Stopped",
        ):
            # The execution ended without ever starting a processing job.
            print("Monitoring execution ended with status: " + execution["MonitoringExecutionStatus"])
            break
        time.sleep(60)

In [None]:
# Now get the latest execution details
mon_executions = sm_client.list_monitoring_executions(
    MonitoringScheduleName=mon_schedule_name.lower(), MaxResults=5
)

# Start from scratch so that re-running this cell selects from the fresh listing instead
# of keeping whatever an earlier run selected.
latest_execution = None
finished_statuses = ("Completed", "Failed", "CompletedWithViolations")

for execution_summary in mon_executions["MonitoringExecutionSummaries"]:
    # The ARN may be absent when no processing job was started for the execution.
    processing_job_arn = execution_summary.get("ProcessingJobArn")
    job_label = processing_job_arn.split("/")[1] if processing_job_arn else "<no processing job>"
    exec_status = execution_summary["MonitoringExecutionStatus"]
    print("ProcessingJob: {}".format(job_label))
    print("MonitoringExecutionStatus: {} \n".format(exec_status))

    # Keep the first finished execution in listing order.
    if latest_execution is None and exec_status in finished_statuses:
        latest_execution = execution_summary

print("latest_executions is " , latest_execution)

In [None]:
# Display the raw listing of up to five executions of the schedule.
sm_client.list_monitoring_executions(MonitoringScheduleName=mon_schedule_name.lower(), MaxResults=5)

In [None]:
# Always define report_uri, so a missing report shows up as None rather than as an
# undefined name or a stale value left over from an earlier run.
report_uri = None

if latest_execution:
    job_status = latest_execution['MonitoringExecutionStatus']
    # The ARN may be absent when no processing job was started for the execution.
    processing_job_arn = latest_execution.get('ProcessingJobArn')

    if not processing_job_arn:
        print('Execution ended with status {} without a processing job: {}'.format(
            job_status, latest_execution.get('FailureReason')))
    else:
        job_name = processing_job_arn.split('/')[1]
        print('Processing job ARN ', job_name)
        desc_analytics_job_result = sm_client.describe_processing_job(ProcessingJobName=job_name)

        if job_status in ('Completed', 'CompletedWithViolations'):
            # The report location is the S3 URI of the job's first configured output.
            report_uri = desc_analytics_job_result['ProcessingOutputConfig']['Outputs'][0]['S3Output']['S3Uri']
            print('Report Uri: {}'.format(report_uri))
        else:
            print('Job failed.')
else:
    print("====STOP==== \n No completed executions to inspect further. Please wait till an execution completes or investigate previously reported failures.")

In [None]:
# urlparse is imported in the notebook's first cell, together with every other dependency.

# report_uri is only assigned once a monitoring execution has completed. Look it up
# defensively so a missing report gives a clear message instead of a NameError.
report_uri = globals().get("report_uri")
if not report_uri:
    raise RuntimeError(
        "No monitoring report is available yet: wait for an execution to complete "
        "and re-run the previous cells."
    )

# Split the s3://bucket/key URI into its bucket and key prefix.
s3uri = urlparse(report_uri)
report_bucket = s3uri.netloc
report_key = s3uri.path.lstrip('/')
print('Report bucket: {}'.format(report_bucket))
print('Report key: {}'.format(report_key))

result = s3_client.list_objects(Bucket=report_bucket, Prefix=report_key)
# The response has no "Contents" key when nothing matches the prefix, so default to an
# empty list instead of iterating over None.
report_files = [report_file.get("Key") for report_file in result.get('Contents', [])]
print("Found Report Files:")
print("\n ".join(report_files))

In [None]:
# Fetch the constraint_violations.json file from under the report key and print it as text.
violations_key = f"{report_key}/constraint_violations.json"
violations_file = get_obj_body(violations_key)
print(violations_file)

# Criando Alarme

In [None]:
cw_client = boto3.Session().client("cloudwatch")

# Alarm settings.
# NOTE(review): 'DRIFT_TOPIC_ARN' looks like a placeholder for a real SNS topic ARN; confirm
# it is replaced before this cell is run.
sns_notifications_topic = "DRIFT_TOPIC_ARN"
alarm_name = "BASELINE_DRIFT_FEATURE_Night_Calls"
alarm_desc = "Alarme para drift no atributo Night Calls"
feature_drift_threshold = 0.1
metric_name = "feature_baseline_drift_Night Calls"
namespace = "aws/sagemaker/Endpoints/data-metrics"

monitoring_schedule_name = mon_schedule_name

# Create or update the alarm: it fires when the average of the drift metric over one
# 600-second period is at or above the threshold, and missing data counts as breaching.
cw_client.put_metric_alarm(
    AlarmName=alarm_name,
    AlarmDescription=alarm_desc,
    ActionsEnabled=True,
    AlarmActions=[sns_notifications_topic],
    MetricName=metric_name,
    Namespace=namespace,
    Statistic="Average",
    Dimensions=[
        {"Name": dimension_name, "Value": dimension_value}
        for dimension_name, dimension_value in (
            ("Endpoint", endpoint_name),
            ("MonitoringSchedule", monitoring_schedule_name),
        )
    ],
    Period=600,
    EvaluationPeriods=1,
    DatapointsToAlarm=1,
    Threshold=feature_drift_threshold,
    ComparisonOperator="GreaterThanOrEqualToThreshold",
    TreatMissingData="breaching",
)