# MLOps Deployment Pipeline


## Overview

In this notebook, you will step through an MLOps pipeline to build, train, deploy, and monitor an XGBoost regression model that predicts the expected taxi fare using the New York City Taxi [dataset](https://registry.opendata.aws/nyc-tlc-trip-records-pds/). The pipeline features a [canary deployment](https://docs.aws.amazon.com/wellarchitected/latest/machine-learning-lens/canary-deployment.html) strategy with rollback on error. You will learn how to trigger and monitor the pipeline, inspect the training workflow, use Model Monitor to set up alerts, and create a canary deployment.

### Contents

This notebook contains the following sections:

1. [Data Prep](#Data-Prep)
2. [Build](#Build)
3. [Train Model](#Train-Model)
4. [Deploy Dev](#Deploy-Dev)
5. [Deploy Prod](#Deploy-Prod)
6. [Monitor](#Monitor)
7. [Cleanup](#Cleanup)

### Architecture

The architecture diagram below shows the entire MLOps pipeline at a high level.

Use the CloudFormation template provided in this repository (`pipeline.yml`) to create a demo in your own AWS account. CloudFormation deploys several resources:

1. A customer-managed encryption key in AWS KMS for encrypting data and artifacts.
1. A secret in AWS Secrets Manager to securely store your GitHub Access Token.
1. Several AWS IAM roles so CloudFormation, SageMaker, and other AWS services can perform actions in your AWS account, following the principle of [least privilege](https://docs.aws.amazon.com/IAM/latest/UserGuide/best-practices.html#grant-least-privilege)⇗.
1. A messaging service in Amazon SNS to notify you when CodeDeploy has successfully deployed the API, and to receive alerts for retraining and drift detection (signing up for these notifications is optional).
1. Two Amazon CloudWatch event rules: one which schedules the pipeline to run every month, and one which triggers the pipeline to run when SageMaker Model Monitor detects certain metrics.
1. An Amazon SageMaker Jupyter notebook with this workshop content pre-loaded.
1. An Amazon S3 bucket for storing model artifacts.
1. An AWS CodePipeline instance with several pre-defined stages. 

Take a moment to look at all of these resources now deployed in your account. 

![MLOps pipeline architecture](../docs/mlops-architecture.png)

In this notebook, you will work through the CodePipeline instance created by the CloudFormation template. It has several stages:

1. **Source** - The pipeline is already configured with two sources. If you upload a new dataset to a specific location in the S3 data bucket, this will trigger the pipeline to run. The Git source can be GitHub, or CodeCommit if you don’t supply your access token. If you commit new code to your repository, this will trigger the pipeline to run.
1. **Build** - In this stage, CodeBuild, configured by the `model/buildspec.yml` file, runs `model/run.py` to generate the AWS CloudFormation templates that create the Step Functions workflow (including the AWS Lambda custom resources) and the deployment templates used in the following stages, based on the datasets and hyperparameters specified for this pipeline run.
1. **Train** - The Step Functions workflow created in the Build stage is run in this stage. The workflow creates a baseline for the model monitor using a SageMaker processing job, and trains an XGBoost model on the taxi ride dataset using a SageMaker training job.
1. **Deploy Dev** - In this stage, a CloudFormation template created in the build stage (from `assets/deploy-model-dev.yml`) deploys a dev endpoint. This will allow you to run tests on the model and decide if the model is of sufficient quality to deploy into production.
1. **Deploy Production** - The final stage of the pipeline is the only stage which does not run automatically as soon as the previous stage is complete. It waits for a user to manually approve the model which was previously deployed to dev. As soon as the model is approved, a CloudFormation template (packaged from `assets/deploy-model-prod.yml` to include the Lambda functions saved and uploaded as ZIP files in S3) deploys the production endpoint. It configures autoscaling and enables data capture. It creates a model monitoring schedule and sets CloudWatch alarms for certain metrics. It also sets up an AWS CodeDeploy instance which deploys a set of AWS Lambda functions and an Amazon API Gateway to sit in front of the SageMaker endpoint. This stage can make use of canary deployment to safely switch from an old model to a new model.
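
To make the canary idea in the last stage more concrete, here is a minimal simulation in plain Python. It is not how CodeDeploy implements traffic shifting: the function name `canary_rollout`, the 10% canary weight, and the 5% error threshold are all assumptions chosen for illustration.

```python
import random

def canary_rollout(new_model_fails, canary_weight=0.1, requests=1000,
                   max_error_rate=0.05, seed=0):
    """Simulate one canary phase and decide whether to promote or roll back.

    new_model_fails: callable returning True when a request to the new model errors.
    """
    rng = random.Random(seed)
    canary_requests = 0
    canary_errors = 0
    for _ in range(requests):
        # Route a small share of the traffic to the new model version
        if rng.random() < canary_weight:
            canary_requests += 1
            if new_model_fails():
                canary_errors += 1
    error_rate = canary_errors / canary_requests if canary_requests else 0.0
    # Promote only if the canary stayed at or under the error threshold
    decision = 'promote' if error_rate <= max_error_rate else 'rollback'
    return decision, canary_requests, error_rate

healthy = canary_rollout(lambda: False)  # new model never errors
broken = canary_rollout(lambda: True)    # new model errors on every request
print(healthy[0], broken[0])
```

Only the canary share of the traffic is exposed to a faulty model before the rollback decision, which is the property that makes this strategy safer than switching all traffic at once.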

In [None]:
# Install the required libraries
import sys
!{sys.executable} -m pip install --upgrade pip
!{sys.executable} -m pip install -qU awscli boto3 "sagemaker>=2.1.0,<3" tqdm
!{sys.executable} -m pip install -qU "stepfunctions==2.0.0"
!{sys.executable} -m pip show sagemaker stepfunctions

You may need to restart the SageMaker notebook kernel before continuing.

## Data Prep

In this section of the notebook, you will download the publicly available dataset in preparation for uploading it to S3.

### Download Dataset

First, download a sample of the New York City Taxi [dataset](https://registry.opendata.aws/nyc-tlc-trip-records-pds/)⇗. This dataset contains information on trips taken by taxis and for-hire vehicles in New York City, including pick-up and drop-off times and locations, fares, distance traveled, and more.

In [None]:
!aws s3 cp 's3://nyc-tlc/trip data/green_tripdata_2018-02.csv' 'nyc-tlc.csv'

Load the dataset into a pandas data frame, taking care to parse the dates correctly.

In [None]:
import pandas as pd

parse_dates = ['lpep_dropoff_datetime', 'lpep_pickup_datetime']
trip_df = pd.read_csv('nyc-tlc.csv', parse_dates=parse_dates)

trip_df.head()

### Data manipulation

Instead of using the raw pick-up and drop-off date-times, use these features to calculate the total trip duration in minutes, which is easier for the model to work with.

In [None]:
trip_df['duration_minutes'] = (trip_df['lpep_dropoff_datetime'] - trip_df['lpep_pickup_datetime']).dt.seconds/60
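
One subtlety in the cell above: pandas' `.dt.seconds` follows the same convention as `timedelta.seconds` in the Python standard library. It returns only the seconds component (0 to 86,399) and ignores whole days. The sketch below uses plain `timedelta` objects with made-up durations to show the effect; `total_seconds()` is the variant that preserves the full signed duration.

```python
from datetime import timedelta

# `.seconds` only holds the seconds component and ignores whole days
long_trip = timedelta(days=1, minutes=5)
long_component = long_trip.seconds / 60        # 5.0
long_total = long_trip.total_seconds() / 60    # 1445.0

# A negative duration (drop-off recorded before pick-up) wraps around
bad_trip = timedelta(minutes=-5)
bad_component = bad_trip.seconds / 60          # 1435.0
bad_total = bad_trip.total_seconds() / 60      # -5.0

print(long_component, long_total)
print(bad_component, bad_total)
```

In this notebook, small negative durations wrap to large values, which the `duration_minutes < 120` filter applied later removes. A trip spanning more than a day, however, would keep only its sub-day remainder.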

The dataset contains many columns that you don't need, so select a subset of columns for the ML model. Keep only `total_amount` (fare), `duration_minutes`, `passenger_count`, and `trip_distance`.

In [None]:
cols = ['total_amount', 'duration_minutes', 'passenger_count', 'trip_distance']
data_df = trip_df[cols]
print(data_df.shape)
data_df.head()

Generate some quick statistics for the dataset to understand its quality.

In [None]:
data_df.describe()

The table above shows some clear outliers, for example a fare of -400 or 2626, or 0 passengers. There are many sophisticated methods for identifying and removing outliers, but data cleaning is not the focus of this notebook, so simply remove the outliers by setting minimum and maximum values that seem more reasonable. Removing the outliers leaves 754,671 rows in the final dataset.

In [None]:
data_df = data_df[(data_df.total_amount > 0) & (data_df.total_amount < 200) & 
                  (data_df.duration_minutes > 0) & (data_df.duration_minutes < 120) & 
                  (data_df.trip_distance > 0) & (data_df.trip_distance < 121) & 
                  (data_df.passenger_count > 0)].dropna()
print(data_df.shape)
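
The fixed thresholds above are a deliberate shortcut. As one example of a data-driven alternative (not what this notebook uses), the interquartile range (IQR) rule flags values far outside the middle 50% of the data. The sketch below is a minimal version using only the standard library; the sample fares and the conventional multiplier of 1.5 are assumptions for illustration.

```python
from statistics import quantiles

def iqr_bounds(values, k=1.5):
    """Return (lower, upper) bounds; values outside them are treated as outliers."""
    q1, _, q3 = quantiles(values, n=4)
    iqr = q3 - q1
    return q1 - k * iqr, q3 + k * iqr

# Made-up fares, including the two extreme values mentioned above
fares = [7.5, 9.0, 10.2, 11.0, 12.5, 13.0, 14.8, 16.0, -400.0, 2626.0]
low, high = iqr_bounds(fares)
kept = [fare for fare in fares if low <= fare <= high]
print(kept)
```

Unlike hard-coded limits, the bounds here adapt to the data, at the cost of being harder to reason about when the distribution is heavily skewed.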

### Data visualization

Since this notebook will build a regression model for the taxi data, it's a good idea to check if there is any correlation between the variables in our data. Use scatter plots on a sample of the data to compare trip distance with duration in minutes, and total amount (fare) with duration in minutes.

In [None]:
import seaborn as sns 

sample_df = data_df.sample(1000)
sns.scatterplot(data=sample_df, x='duration_minutes', y='trip_distance')

In [None]:
sns.scatterplot(data=sample_df, x='duration_minutes', y='total_amount')

These scatter plots look fine and show at least some correlation between our variables. 

### Data splitting and saving

We are now ready to split the dataset into train, validation, and test sets. 

In [None]:
from sklearn.model_selection import train_test_split
train_df, val_df = train_test_split(data_df, test_size=0.20, random_state=42)
val_df, test_df = train_test_split(val_df, test_size=0.05, random_state=42)

# Reset the index for our test dataframe
test_df.reset_index(inplace=True, drop=True)

print('Size of\n train: {},\n val: {},\n test: {} '.format(train_df.shape[0], val_df.shape[0], test_df.shape[0]))
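
The two calls above produce roughly an 80% / 19% / 1% split, because the second call carves 5% of the 20% hold-out into the test set. The arithmetic can be checked with the 754,671 rows quoted earlier. This sketch assumes the test share is rounded up at each step, so treat the exact counts as approximate to within a row.

```python
from math import ceil

def split_sizes(n_rows, holdout=0.20, test_share=0.05):
    """Mirror the two-step split: hold out 20%, then take 5% of that as test."""
    n_holdout = ceil(holdout * n_rows)      # assumed rounding (see lead-in)
    n_train = n_rows - n_holdout
    n_test = ceil(test_share * n_holdout)
    n_val = n_holdout - n_test
    return n_train, n_val, n_test

train_n, val_n, test_n = split_sizes(754_671)
print(train_n, val_n, test_n)
```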

Save the train, validation, and test files as CSV locally on this notebook instance. Notice that you save the train file twice - once as the training data file and once as the baseline data file. The baseline data file will be used by [SageMaker Model Monitor](https://docs.aws.amazon.com/sagemaker/latest/dg/model-monitor.html)⇗ to detect data drift. Data drift occurs when the statistical nature of the data that your model receives while in production drifts away from the nature of the baseline data it was trained on, which means the model begins to lose accuracy in its predictions.

In [None]:
train_cols = ['total_amount', 'duration_minutes','passenger_count','trip_distance']
train_df.to_csv('train.csv', index=False, header=False)
val_df.to_csv('validation.csv', index=False, header=False)
test_df.to_csv('test.csv', index=False, header=False)

# Save the baseline (a copy of the training data) with headers
train_df.to_csv('baseline.csv', index=False, header=True)
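
To make the notion of data drift described above more concrete, the sketch below compares the mean of one feature in hypothetical production traffic against the baseline, measured in baseline standard deviations. This is only an illustration, not how SageMaker Model Monitor computes its statistics; the function name, sample values, and threshold are assumptions.

```python
from statistics import mean, stdev

def mean_shift(baseline, production):
    """Shift of the production mean, measured in baseline standard deviations."""
    return abs(mean(production) - mean(baseline)) / stdev(baseline)

# Made-up trip distances
baseline_distance = [1.2, 2.5, 3.1, 0.8, 4.0, 2.2, 1.9, 3.3]
similar_traffic = [1.5, 2.4, 2.9, 1.1, 3.8, 2.0]
drifted_traffic = [9.5, 12.0, 10.4, 11.1, 8.9, 10.7]

THRESHOLD = 2.0  # hypothetical alerting threshold
similar_shift = mean_shift(baseline_distance, similar_traffic)
drifted_shift = mean_shift(baseline_distance, drifted_traffic)
for name, shift in [('similar', similar_shift), ('drifted', drifted_shift)]:
    print(name, round(shift, 2), 'DRIFT' if shift > THRESHOLD else 'ok')
```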

Now upload these CSV files to your default SageMaker S3 bucket. 

In [None]:
import sagemaker

# Get the session and default bucket
session = sagemaker.session.Session()
bucket = session.default_bucket()

# Specify data prefix and version
prefix = 'nyc-tlc/v1'

s3_train_uri = session.upload_data('train.csv', bucket, prefix + '/data/training')
s3_val_uri = session.upload_data('validation.csv', bucket, prefix + '/data/validation')
s3_test_uri = session.upload_data('test.csv', bucket, prefix + '/data/test')
s3_baseline_uri = session.upload_data('baseline.csv', bucket, prefix + '/data/baseline')

You will use the datasets which you have prepared and saved in this section to trigger the pipeline to train and deploy a model in the next section.

## Build

If you navigate to CodePipeline, you will notice that the "Source" stage is initially in a "Failed" state. This happens because the dataset, which is one of the sources that can trigger the pipeline, has not yet been uploaded to the expected S3 location.

![Failed code pipeline](../docs/pipeline_failed.png)

### Trigger Build
 
In this section, you will start a model build and deployment pipeline by packaging up the datasets you prepared in the previous section and uploading them to the S3 source location that triggers the CodePipeline instance created for this demo.

First, import some libraries and load some environment variables that you will need. These environment variables have been set through a [lifecycle configuration](https://docs.aws.amazon.com/sagemaker/latest/dg/notebook-lifecycle-config.html)⇗ script attached to this notebook.

In [None]:
import boto3
from botocore.exceptions import ClientError
import os
import time

region = boto3.Session().region_name
artifact_bucket = os.environ['ARTIFACT_BUCKET']
pipeline_name = os.environ['PIPELINE_NAME']
model_name = os.environ['MODEL_NAME']
workflow_pipeline_arn = os.environ['WORKFLOW_PIPELINE_ARN']

print('region: {}'.format(region))
print('artifact bucket: {}'.format(artifact_bucket))
print('pipeline: {}'.format(pipeline_name))
print('model name: {}'.format(model_name))
print('workflow: {}'.format(workflow_pipeline_arn))

From the AWS CodePipeline [documentation](https://docs.aws.amazon.com/codepipeline/latest/userguide/tutorials-simple-s3.html)⇗:

> When Amazon S3 is the source provider for your pipeline, you may zip your source file or files into a single .zip and upload the .zip to your source bucket. You may also upload a single unzipped file; however, downstream actions that expect a .zip file will fail.

To train a model, you need multiple datasets (training, validation, and test) along with a file specifying the hyperparameters. In this example, you will create one JSON file that contains the S3 dataset locations and one JSON file that contains the hyperparameter values. Then you will compress both files into a zip package to be used as input for the pipeline run.

In [None]:
from io import BytesIO
import zipfile
import json

input_data = {
    'TrainingUri': s3_train_uri,
    'ValidationUri': s3_val_uri,
    'TestUri': s3_test_uri,
    'BaselineUri': s3_baseline_uri
}

hyperparameters = {
    'num_round': 50
}

zip_buffer = BytesIO()
with zipfile.ZipFile(zip_buffer, 'a') as zf:
    zf.writestr('inputData.json', json.dumps(input_data))
    zf.writestr('hyperparameters.json', json.dumps(hyperparameters))
zip_buffer.seek(0)

data_source_key = '{}/data-source.zip'.format(pipeline_name)
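
Before uploading, it can be reassuring to confirm that an in-memory archive built this way contains what you expect. The sketch below builds a small package the same way and reads it back. The S3 URI is a placeholder, not a value from this notebook.

```python
from io import BytesIO
import json
import zipfile

# Placeholder payloads; the notebook uses the real S3 URIs and hyperparameters
package = {
    'inputData.json': {'TrainingUri': 's3://example-bucket/train.csv'},
    'hyperparameters.json': {'num_round': 50},
}

buffer = BytesIO()
with zipfile.ZipFile(buffer, 'w') as zf:
    for name, payload in package.items():
        zf.writestr(name, json.dumps(payload))
buffer.seek(0)

# Read the archive back and confirm both members round-trip as valid JSON
with zipfile.ZipFile(buffer) as zf:
    names = zf.namelist()
    restored = {name: json.loads(zf.read(name)) for name in names}

print(names)
print(restored == package)
```

If you run a check like this on the buffer you intend to upload, call `seek(0)` again afterwards, since reading the archive moves the buffer position.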

Now upload the zip package to your artifact S3 bucket - this action will trigger the pipeline to train and deploy a model.

In [None]:
s3 = boto3.client('s3')
s3.put_object(Bucket=artifact_bucket, Key=data_source_key, Body=bytearray(zip_buffer.read()))

Click the link below to open the AWS console at the Code Pipeline if you don't have it open in another tab.

<div class="alert alert-block alert-info">
    Tip: You may need to wait a minute to see the DataSource stage turn green. The page will refresh automatically.
</div>

![Source Green](../docs/datasource-after.png)

In [None]:
from IPython.core.display import HTML

HTML('<a target="_blank" href="https://{0}.console.aws.amazon.com/codesuite/codepipeline/pipelines/{1}/view?region={0}">Code Pipeline</a>'.format(region, pipeline_name))

### Inspect Build Logs

Once the build stage is running, you will see the AWS CodeBuild job turn blue with a status of **In progress**.

![CodeBuild in progress](../docs/codebuild-inprogress.png)

You can click on the **Details** link displayed in the CodePipeline UI or click the link below to jump directly to the CodeBuild logs.

<div class="alert alert-block alert-info">
    Tip: You may need to wait a few seconds for the pipeline to transition into the active (blue) state and for the build to start.
</div>

In [None]:
codepipeline = boto3.client('codepipeline')

def get_pipeline_stage(pipeline_name, stage_name):
    response = codepipeline.get_pipeline_state(name=pipeline_name)
    for stage in response['stageStates']:
        if stage['stageName'] == stage_name:
            return stage

# Get the latest execution of the Build stage
build_stage = get_pipeline_stage(pipeline_name, 'Build')
if build_stage is None or 'latestExecution' not in build_stage:
    raise Exception('Please wait.  Build not started')

build_url = build_stage['actionStates'][0]['latestExecution']['externalExecutionUrl']

# Output a link to the CodeBuild logs
HTML('<a target="_blank" href="{0}">Code Build Logs</a>'.format(build_url))

The AWS CodeBuild process is responsible for creating a number of AWS CloudFormation templates which we will explore in more detail in the next section. Two of these templates are used to set up the **Train** step by creating the AWS Step Functions workflow and the custom AWS Lambda functions used within this workflow.

## Train Model

### Inspect Training Job

Wait until the pipeline has started running the Train step (see screenshot) before continuing with the next cells in this notebook. 

![Training in progress](../docs/train-in-progress.png)

When the pipeline has started running the train step, you can click on the **Details** link displayed in the CodePipeline UI (see screenshot above) to view the Step Functions workflow which is running the training job. 

Alternatively, you can click on the Workflow link from the cell output below once it's available.

In [None]:
from stepfunctions.workflow import Workflow
while True:
    try:
        workflow = Workflow.attach(workflow_pipeline_arn)
        break
    except ClientError as e:
        print(e.response["Error"]["Message"])
    time.sleep(10)

workflow

Or simply run the cell below to display the Step Functions workflow, and re-run it after a few minutes to see the progress.

In [None]:
executions = workflow.list_executions()
if not executions:
    raise(Exception('Please wait.  Training not started'))
    
executions[0].render_progress()

### Review the Build Script

While you wait for the training job to complete, take a look at the `run.py` code that was used by the AWS CodeBuild process.

This script takes all of the input parameters, including the dataset locations and hyperparameters that you saved to JSON files earlier in this notebook, and uses them to generate the templates that the pipeline needs to run the training job. It does *not* create the actual Step Functions instance; it only generates the templates that define the Step Functions workflow, as well as the CloudFormation input templates that CodePipeline uses to instantiate the Step Functions instance.

Step by step, the script does the following:

1. It collects all the input parameters it needs to generate the templates. This includes information about the environment container needed to run the training job, the input and output data locations, the IAM roles needed by various components, encryption keys, and more. It then sets up some basic parameters such as the AWS region and the function names.
1. If the input parameters specify an environment container stored in ECR, it fetches that container. Otherwise, it fetches the URI of the AWS managed environment container needed for the training job.
1. It reads the input data JSON file that you generated earlier in this notebook (and which was included in the zip source for the pipeline), thereby fetching the locations of the training, validation, and baseline data files. It then formats more parameters which will be needed later in the script, including version IDs and output data locations.
1. It reads the hyperparameter JSON file that you generated earlier in this notebook.
1. It defines the Step Functions workflow, starting with the input schema, followed by each step of the workflow (i.e. Create Experiment, Baseline Job, Training Job), and finally combines those steps into a workflow graph.
1. It saves the workflow graph to a file, along with a file containing all the input parameters saved according to the schema defined in the workflow.
1. It saves the parameters to a file which CloudFormation will use to instantiate the Step Functions workflow.

In [None]:
!pygmentize ../model/run.py

### Customize Workflow (Optional)

If you are interested in customising the workflow used in the Build Script, store the `input_data` to be used within the local [workflow.ipynb](workflow.ipynb) notebook. The workflow notebook can be used to experiment with the Step Functions workflow and training job definitions for your model.

In [None]:
%store input_data

### Training Analytics

Once the training and baseline jobs are complete (both turn green in the Step Functions workflow, which takes around 5 minutes), you can inspect the experiment metrics. The code below will display all experiments in a table. Note that the baseline processing job won't have RMSE metrics - it calculates metrics based on the training data, but does not train a machine learning model.

You will [explore the baseline](#Explore-Baseline) results later in this notebook. <a id="validation-results"></a>

In [None]:
from sagemaker import analytics
experiment_name = 'mlops-{}'.format(model_name)
model_analytics = analytics.ExperimentAnalytics(experiment_name=experiment_name)
analytics_df = model_analytics.dataframe()

if analytics_df.shape[0] == 0:
    raise Exception('Please wait.  No training or baseline jobs')

pd.set_option('display.max_colwidth', 100) # Increase column width to show full component name
cols = ['TrialComponentName', 'DisplayName', 'SageMaker.InstanceType', 
        'train:rmse - Last', 'validation:rmse - Last'] # return the last rmse for training and validation
# Keep only the requested columns that are present (`&` on an Index is deprecated for set operations)
analytics_df[analytics_df.columns.intersection(cols)].head(2)

## Deploy Dev

### Test Dev Deployment

When the pipeline has finished training a model, it automatically moves to the next step, where the model is deployed as a SageMaker endpoint. This endpoint is part of the dev deployment, so in this section you will run some tests on the endpoint to decide whether to deploy this model into production.

First, run the cell below to fetch the name of the SageMaker endpoint.

In [None]:
codepipeline = boto3.client('codepipeline')

deploy_dev = get_pipeline_stage(pipeline_name, 'DeployDev')
if 'latestExecution' not in deploy_dev:
    raise Exception('Please wait.  Deploy dev not started')
    
execution_id = deploy_dev['latestExecution']['pipelineExecutionId']
dev_endpoint_name = 'mlops-{}-dev-{}'.format(model_name, execution_id)

print('endpoint name: {}'.format(dev_endpoint_name))

If you moved through the previous section very quickly, you will need to wait until the dev endpoint has been successfully deployed and the pipeline is waiting for approval to deploy to production (see screenshot). It can take up to 10 minutes for SageMaker to create an endpoint.

![Deploying dev endpoint in code pipeline](../docs/dev-deploy-ready.png)

Alternatively, run the code below to check the status of the endpoint. Wait until the status is 'InService'.

In [None]:
sm = boto3.client('sagemaker')

while True:
    try:
        response = sm.describe_endpoint(EndpointName=dev_endpoint_name)
        print("Endpoint status: {}".format(response['EndpointStatus']))
        if response['EndpointStatus'] == 'InService':
            break
    except ClientError as e:
        print(e.response["Error"]["Message"])
    time.sleep(10)
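
The polling loops in this notebook run until the condition is met, with no upper bound on the waiting time. As a hedged alternative, the helper below adds a timeout. `wait_until` is a hypothetical name, not part of boto3 or the SageMaker SDK, and the status check is stubbed with an iterator so the sketch runs without AWS access.

```python
import time

def wait_until(check, timeout=900, interval=10):
    """Call `check()` until it returns True or `timeout` seconds have passed."""
    deadline = time.monotonic() + timeout
    while True:
        if check():
            return True
        if time.monotonic() >= deadline:
            raise TimeoutError('condition not met within {} seconds'.format(timeout))
        time.sleep(interval)

# Stand-in for a status check such as describe_endpoint; in service on the third call
statuses = iter(['Creating', 'Creating', 'InService'])

def endpoint_in_service():
    return next(statuses) == 'InService'

ready = wait_until(endpoint_in_service, timeout=5, interval=0)
print(ready)
```

In the notebook, the `check` callable would wrap the `describe_endpoint` call and return whether the status equals 'InService'.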

Now that the endpoint is ready, write some code to run the test data (which you split off from the dataset and saved to a file at the start of this notebook) through the endpoint for inference. The code below supports both v1 and v2 of the SageMaker SDK, but we recommend using v2 of the SDK in all of your future projects.

In [None]:
import numpy as np
from tqdm import tqdm

try:
    # Support SageMaker v2 SDK: https://sagemaker.readthedocs.io/en/stable/v2.html
    from sagemaker.predictor import Predictor
    from sagemaker.serializers import CSVSerializer
    def get_predictor(endpoint_name):
        xgb_predictor = Predictor(endpoint_name)
        xgb_predictor.serializer = CSVSerializer()
        return xgb_predictor
except ImportError:
    # Fallback to SageMaker v1.70 SDK
    from sagemaker.predictor import RealTimePredictor, csv_serializer
    def get_predictor(endpoint_name):
        xgb_predictor = RealTimePredictor(endpoint_name)
        xgb_predictor.content_type = 'text/csv'
        xgb_predictor.serializer = csv_serializer
        return xgb_predictor

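def predict(predictor, data, rows=500):
    # Split the data into batches of roughly `rows` records (always at least one batch)
    split_array = np.array_split(data, max(1, round(data.shape[0] / float(rows))))
    predictions = ''
    for array in tqdm(split_array):
        # Each response is a comma-separated string of predictions
        predictions = ','.join([predictions, predictor.predict(array).decode('utf-8')])
    # Drop the leading comma and parse the values into a NumPy array
    return np.fromstring(predictions[1:], sep=',')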
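
To see the batching logic in isolation, the sketch below replaces the endpoint with a stub. `EchoPredictor` and `predict_in_batches` are hypothetical names for illustration: the stub "predicts" the sum of each row and returns comma-separated bytes, mimicking the response format that the `predict` function above parses. No SageMaker endpoint is called.

```python
class EchoPredictor:
    """Hypothetical stand-in for a SageMaker predictor; no endpoint is called."""

    def predict(self, rows):
        # Pretend the "model" predicts the sum of each row's features
        return ','.join(str(float(sum(row))) for row in rows).encode('utf-8')

def predict_in_batches(predictor, rows, batch_size=2):
    """Send `rows` in batches and collect one float prediction per row."""
    predictions = []
    for start in range(0, len(rows), batch_size):
        response = predictor.predict(rows[start:start + batch_size])
        predictions.extend(float(value) for value in response.decode('utf-8').split(','))
    return predictions

features = [[10.0, 1, 2.5], [5.0, 2, 1.0], [30.0, 1, 8.0]]
batched = predict_in_batches(EchoPredictor(), features)
print(batched)
```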

Now use the `predict` function, which was defined in the code above, to run the test data through the endpoint and generate the predictions.

In [None]:
dev_predictor = get_predictor(dev_endpoint_name)
predictions = predict(dev_predictor, test_df[test_df.columns[1:]].values)

Next, load the predictions into a data frame, and join it with your test data. Then, calculate absolute error as the difference between the actual taxi fare and the predicted taxi fare. Display the results in a table, sorted by the highest absolute error values.

In [None]:
pred_df = pd.DataFrame({'total_amount_predictions': predictions })
pred_df = test_df.join(pred_df) # Join on all
pred_df['error'] = abs(pred_df['total_amount']-pred_df['total_amount_predictions'])

pred_df.sort_values('error', ascending=False).head()

From this table, we note that some short trip distances have large errors because the low predicted fare does not match the high actual fare. This could be the result of a generous tip which we haven't included in this dataset.

You can also analyze the results by plotting the absolute error to visualize outliers. In this graph, we see that most of the outliers are cases where the model predicted a much lower fare than the actual fare. There are only a few outliers where the model predicted a higher fare than the actual fare.

In [None]:
sns.scatterplot(data=pred_df, x='total_amount_predictions', y='total_amount', hue='error')

If you want one overall measure of quality for the model, you can calculate the root mean square error (RMSE) for the predicted fares compared to the actual fares. Compare this to the [results calculated on the validation set](#validation-results) in the 'Training Analytics' section.

In [None]:
from math import sqrt
from sklearn.metrics import mean_squared_error

def rmse(pred_df):
    return sqrt(mean_squared_error(pred_df['total_amount'], pred_df['total_amount_predictions']))

print('RMSE: {}'.format(rmse(pred_df)))
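
For reference, RMSE is the square root of the mean of the squared errors, so a single large miss weighs more heavily than it does in the mean absolute error (MAE). The sketch below shows this on made-up values with plain Python; `rmse_from_lists` and `mae_from_lists` are illustrative helper names, not part of scikit-learn.

```python
from math import sqrt

def rmse_from_lists(actual, predicted):
    """Root mean square error of two equally long sequences."""
    squared = [(a - p) ** 2 for a, p in zip(actual, predicted)]
    return sqrt(sum(squared) / len(squared))

def mae_from_lists(actual, predicted):
    """Mean absolute error of two equally long sequences."""
    return sum(abs(a - p) for a, p in zip(actual, predicted)) / len(actual)

actual_fares = [10.0, 20.0, 30.0]
predicted_fares = [10.0, 20.0, 39.0]  # two exact predictions, one miss of 9

toy_rmse = rmse_from_lists(actual_fares, predicted_fares)
toy_mae = mae_from_lists(actual_fares, predicted_fares)
print(round(toy_rmse, 3), toy_mae)
```

Here the MAE is 3.0 while the RMSE is about 5.2, which is why a few badly predicted fares can dominate the RMSE reported above.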

## Deploy Prod

### Approve Deployment to Production

If you are happy with the results of the model, you can go ahead and approve the model to be deployed into production. You can do so by clicking the **Review** button in the CodePipeline UI, leaving a comment to explain why you approve this model, and clicking on **Approve**. 

Alternatively, you can create a Jupyter widget which (when enabled) allows you to comment and approve the model directly from this notebook. Run the cell below to see this in action.

In [None]:
import ipywidgets as widgets

def on_click(obj):
    result = { 'summary': approval_text.value, 'status': obj.description }
    response = codepipeline.put_approval_result(
      pipelineName=pipeline_name,
      stageName='DeployDev',
      actionName='ApproveDeploy',
      result=result,
      token=approval_action['token']
    )
    button_box.close()
    print(result)
    
# Create the widget if we are ready for approval
deploy_dev = get_pipeline_stage(pipeline_name, 'DeployDev')
if 'latestExecution' not in deploy_dev['actionStates'][-1]:
    raise Exception('Please wait.  Deploy dev not complete')

approval_action = deploy_dev['actionStates'][-1]['latestExecution']
if approval_action['status'] == 'Succeeded':
    print('Dev approved: {}'.format(approval_action['summary']))
elif 'token' in approval_action:
    approval_text = widgets.Text(placeholder='Optional approval message')   
    approve_btn = widgets.Button(description="Approved", button_style='success', icon='check')
    reject_btn = widgets.Button(description="Rejected", button_style='danger', icon='close')
    approve_btn.on_click(on_click)
    reject_btn.on_click(on_click)
    button_box = widgets.HBox([approval_text, approve_btn, reject_btn])
    display(button_box)
else:
    raise(Exception('Please wait. No dev approval'))

### Test Production Deployment

About a minute after approving the model deployment, you should see the pipeline start on the final step: deploying the model into production. In this section, you will check the deployment status and test the production endpoint after it has been deployed.

![Deploy production endpoint in code pipeline](../docs/deploy-production.png)

This step of the pipeline uses CloudFormation to deploy a number of resources on your behalf. In particular, it creates:

1. A production-ready SageMaker endpoint with [data capture](https://docs.aws.amazon.com/sagemaker/latest/dg/model-monitor-data-capture.html)⇗ (used by SageMaker Model Monitor) and [autoscaling](https://docs.aws.amazon.com/sagemaker/latest/dg/endpoint-auto-scaling.html)⇗ enabled.
1. A [model monitoring schedule](https://docs.aws.amazon.com/sagemaker/latest/dg/model-monitor-scheduling.html)⇗ which outputs its results to CloudWatch Metrics, along with a [CloudWatch Alarm](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/AlarmThatSendsEmail.html)⇗ which will notify you when a violation occurs.
1. A CodeDeploy instance which creates a simple app by deploying API Gateway, three Lambda functions, and an alarm to notify of the success or failure of this deployment. The code for the Lambda functions can be found in `api/app.py`, `api/pre_traffic_hook.py`, and `api/post_traffic_hook.py`. These functions update the endpoint to enable data capture, format and submit incoming traffic to the SageMaker endpoint, and capture the data logs.

![Components of production deployment](../docs/cloud-formation.png)

Let's check how the deployment is progressing. Use the code below to fetch the execution ID of the deployment step. Then generate a table which lists the resources created by the CloudFormation stack and their creation status. You can re-run the cell after a few minutes to see how the steps are progressing.

In [None]:
deploy_prd = get_pipeline_stage(pipeline_name, 'DeployPrd')
if 'latestExecution' not in deploy_prd or 'latestExecution' not in deploy_prd['actionStates'][0]:
    raise Exception('Please wait.  Deploy prd not started')
    
execution_id = deploy_prd['latestExecution']['pipelineExecutionId']

In [None]:
from datetime import datetime, timedelta
from dateutil.tz import tzlocal

def get_event_dataframe(events):
    stack_cols = ['LogicalResourceId', 'ResourceStatus', 'ResourceStatusReason', 'Timestamp']
    stack_event_df = pd.DataFrame(events)[stack_cols].fillna('')
    stack_event_df['TimeAgo'] = (datetime.now(tzlocal())-stack_event_df['Timestamp'])
    return stack_event_df.drop('Timestamp', axis=1)

cfn = boto3.client('cloudformation')

stack_name = '{}-deploy-prd'.format(pipeline_name)
print('stack name: {}'.format(stack_name))

# Get latest stack events
while True:
    try:
        response = cfn.describe_stack_events(StackName=stack_name)
        break
    except ClientError as e:
        print(e.response["Error"]["Message"])
    time.sleep(10)
    
get_event_dataframe(response['StackEvents']).head()
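
When a deployment does not progress, the events of most interest are usually the failed ones. The sketch below filters a list shaped like `response['StackEvents']` for statuses ending in `FAILED`. The sample events, resource names, and failure reason are hand-written for illustration, and `failed_events` is a hypothetical helper.

```python
def failed_events(events):
    """Return (resource, reason) pairs for events whose status ends in FAILED."""
    return [(event['LogicalResourceId'], event.get('ResourceStatusReason', ''))
            for event in events
            if event['ResourceStatus'].endswith('FAILED')]

# Hand-written sample in the shape of `response['StackEvents']`
sample_events = [
    {'LogicalResourceId': 'Endpoint', 'ResourceStatus': 'CREATE_IN_PROGRESS'},
    {'LogicalResourceId': 'MonitoringSchedule', 'ResourceStatus': 'CREATE_FAILED',
     'ResourceStatusReason': 'Example failure reason'},
    {'LogicalResourceId': 'Endpoint', 'ResourceStatus': 'CREATE_COMPLETE'},
]

failures = failed_events(sample_events)
print(failures)
```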

The resource of most interest is the endpoint. It takes on average 10 minutes to deploy. In the meantime, you can take a look at the Python code used for the application.

`app.py` is the main entry point invoking the Amazon SageMaker endpoint. It returns results along with a custom header for the endpoint we invoked.

In [None]:
!pygmentize ../api/app.py

The `pre_traffic_hook.py` lambda is invoked prior to deployment and confirms the endpoint has data capture enabled.

In [None]:
!pygmentize ../api/pre_traffic_hook.py

The `post_traffic_hook.py` lambda is invoked to perform any final checks, in this case to verify that we have received log data from data capture.

In [None]:
!pygmentize ../api/post_traffic_hook.py

Use the code below to fetch the name of the endpoint, then run a loop to wait for the endpoint to be fully deployed. You need the status to be 'InService'.

In [None]:
prd_endpoint_name='mlops-{}-prd-{}'.format(model_name, execution_id)
print('prod endpoint: {}'.format(prd_endpoint_name))

In [None]:
sm = boto3.client('sagemaker')

while True:
    try:
        response = sm.describe_endpoint(EndpointName=prd_endpoint_name)
        print("Endpoint status: {}".format(response['EndpointStatus']))
        # Wait until the endpoint is in service with data capture enabled
        if response['EndpointStatus'] == 'InService' \
            and 'DataCaptureConfig' in response \
            and response['DataCaptureConfig']['EnableCapture']:
            break
    except ClientError as e:
        print(e.response["Error"]["Message"])
    time.sleep(10)
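
The polling loops in this notebook run until the condition is met, which can leave a cell spinning if a deployment stalls. One possible variation, sketched here under the assumption that a hard time limit is acceptable, wraps the polling in a helper with a timeout. `wait_until` is a hypothetical helper, and the iterator of statuses stands in for repeated `describe_endpoint` calls.

```python
import time

def wait_until(check, timeout_seconds=900, interval_seconds=10,
               sleep=time.sleep, clock=time.monotonic):
    """Call check() until it returns a truthy value or the timeout expires."""
    deadline = clock() + timeout_seconds
    while True:
        outcome = check()
        if outcome:
            return outcome
        if clock() >= deadline:
            raise TimeoutError('Condition not met within {} seconds'.format(timeout_seconds))
        sleep(interval_seconds)

# Hypothetical stand-in for the endpoint status: 'InService' on the third poll
example_statuses = iter(['Creating', 'Creating', 'InService'])
wait_result = wait_until(lambda: next(example_statuses) == 'InService',
                         timeout_seconds=5, interval_seconds=0)
print(wait_result)
```

The `check` callable keeps the AWS-specific logic (status and data capture checks) outside the helper, so the same function could serve the other loops in this notebook.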

Once the endpoint status is 'InService', you can continue. Earlier in this notebook, you created code to send data to the development endpoint. Reuse that code now to send a sample of the test data to the production endpoint. Because data capture is enabled on this endpoint, send the records one at a time so that model monitor can map each record to the baseline.

You will [inspect the model monitor](#Inspect-Model-Monitor) later. For now, just check that you can send data to the endpoint and receive predictions in return.

In [None]:
prd_predictor = get_predictor(prd_endpoint_name)
sample_values = test_df[test_df.columns[1:]].sample(100).values
predictions = predict(prd_predictor, sample_values, rows=1)
predictions

### Test REST API

Although you already tested the SageMaker endpoint in the previous section, it is also a good idea to test the application created with API Gateway.

![Traffic shift between endpoints](../docs/lambda-deploy-create.png)

Click the link below to open the Lambda deployment, where you can see the in-progress and completed deployments. You can also click to expand the **SAM template** to see the packaged CloudFormation template used in the deployment.

In [None]:
HTML('<a target="_blank" href="https://{0}.console.aws.amazon.com/lambda/home?region={0}#/applications/{1}-deploy-prd?tab=deploy">Lambda Deployment</a>'.format(region, model_name))

Run the code below to confirm that the endpoint is in service. The cell will complete once the REST API is available.

In [None]:
def get_stack_status(stack_name):
    response = cfn.describe_stacks(StackName=stack_name)
    if response['Stacks']:
        stack = response['Stacks'][0]
        outputs = None
        if 'Outputs' in stack:
            outputs = dict([(o['OutputKey'], o['OutputValue']) for o in stack['Outputs']])
        return stack['StackStatus'], outputs 

outputs = None
while True:
    try:
        status, outputs = get_stack_status(stack_name)
        response = sm.describe_endpoint(EndpointName=prd_endpoint_name)
        print("Endpoint status: {}".format(response['EndpointStatus']))
        if outputs:
            break
        elif status.endswith('FAILED'):
            raise(Exception('Stack status: {}'.format(status)))
    except ClientError as e:
        print(e.response["Error"]["Message"])
    time.sleep(10)

if outputs:
    print('deployment application: {}'.format(outputs['DeploymentApplication']))
    print('rest api: {}'.format(outputs['RestApi']))

If you are performing an update to your production deployment as a result of running [Trigger Retraining](#Trigger-Retraining), you will be able to expand the Lambda Deployment tab to reveal the resources. Click the **ApiFunctionAliaslive** link to see the Lambda deployment in progress.

![Traffic shift between endpoints](../docs/lambda-deploy-update.png)

This page will update to list the deployment events. It also links to the deployment application, which you can reach from the output of the next cell as well.

In [None]:
HTML('<a target="_blank" href="https://{0}.console.aws.amazon.com/codesuite/codedeploy/applications/{1}?region={0}">CodeDeploy application</a>'.format(region, outputs['DeploymentApplication']))

CodeDeploy will perform a canary deployment, sending 10% of the traffic to the new endpoint over a 5-minute period.

![Traffic shift between endpoints](../docs/code-deploy.gif)
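
To build intuition for what a 10% canary share means in practice, the simulation below routes each request to the new version with probability 0.10. This is an illustration only, not how CodeDeploy or Lambda implements traffic shifting; `route_request` and the version labels are hypothetical.

```python
import random
from collections import Counter

def route_request(rng, new_weight=0.10):
    """Pick the version for one request; new_weight is the canary share."""
    return 'new' if rng.random() < new_weight else 'old'

canary_rng = random.Random(42)  # Fixed seed so the simulation is repeatable
route_counts = Counter(route_request(canary_rng) for _ in range(10000))
new_share = route_counts['new'] / sum(route_counts.values())
print('Share of requests served by the new version: {:.1%}'.format(new_share))
```

With only a handful of requests the observed share can differ noticeably from 10%, which is why a short test loop may see the new endpoint only occasionally during the canary window.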

You can invoke the REST API and inspect the returned headers to see which endpoint you are hitting. The cell below will occasionally show a different endpoint, and will settle on the new version once the stack is complete.

In [None]:
%%time

from urllib import request

headers = {"Content-type": "text/csv"}
payload = test_df[test_df.columns[1:]].head(1).to_csv(header=False, index=False).encode('utf-8')
rest_api = outputs['RestApi']

while True:
    try:
        resp = request.urlopen(request.Request(rest_api, data=payload, headers=headers))
        print("Response code: %d: endpoint: %s" % (resp.getcode(), resp.getheader('x-sagemaker-endpoint')))
        status, outputs = get_stack_status(stack_name) 
        if status.endswith('COMPLETE'):
            print('Deployment complete\n')
            break
        elif status.endswith('FAILED'):
            raise(Exception('Stack status: {}'.format(status)))
    except ClientError as e:
        print(e.response["Error"]["Message"])
    time.sleep(10)
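
The loop above stops on any status that ends in `COMPLETE`. CloudFormation also reports statuses such as `ROLLBACK_COMPLETE` and `UPDATE_ROLLBACK_COMPLETE` when a change has been rolled back, which is relevant for a canary deployment with rollback. The sketch below shows one stricter way to classify a status; `classify_stack_status` is a hypothetical helper and the three labels are this example's own convention.

```python
def classify_stack_status(stack_status):
    """Map a CloudFormation stack status to 'succeeded', 'failed' or 'in_progress'."""
    if stack_status.endswith('IN_PROGRESS'):
        return 'in_progress'
    # A finished rollback means the requested change did not go through
    if 'ROLLBACK' in stack_status or stack_status.endswith('FAILED'):
        return 'failed'
    if stack_status.endswith('COMPLETE'):
        return 'succeeded'
    return 'in_progress'

for example_status in ['CREATE_COMPLETE', 'UPDATE_COMPLETE', 'UPDATE_IN_PROGRESS',
                       'UPDATE_ROLLBACK_COMPLETE', 'ROLLBACK_COMPLETE', 'CREATE_FAILED']:
    print('{:<26} {}'.format(example_status, classify_stack_status(example_status)))
```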

## Monitor

### Inspect Model Monitor

When you prepared the datasets for model training at the start of this notebook, you saved a baseline dataset (a copy of the training dataset). Then, when you approved the model for deployment to production, the pipeline set up a SageMaker endpoint with data capture enabled, along with a model monitoring schedule. In this section, you will take a closer look at the model monitor results.

To start, get the ID of the latest production deployment execution.

In [None]:
deploy_prd = get_pipeline_stage(pipeline_name, 'DeployPrd')
if not 'latestExecution' in deploy_prd:
    raise(Exception('Please wait.  Deploy prod not complete'))
    
execution_id = deploy_prd['latestExecution']['pipelineExecutionId']

Under the hood, SageMaker model monitor runs as SageMaker processing jobs. Use the execution ID to build the names of the processing job and the monitoring schedule.

In [None]:
processing_job_name='mlops-{}-pbl-{}'.format(model_name, execution_id)
schedule_name='mlops-{}-pms'.format(model_name)

print('processing job name: {}'.format(processing_job_name))
print('schedule name: {}'.format(schedule_name))

### Explore Baseline

Now fetch the baseline results from the processing job. This cell will raise an exception if the processing job is not complete; if that happens, wait several minutes and try again. <a id="view-baseline-results"></a>

In [None]:
import sagemaker
from sagemaker.model_monitor import BaseliningJob, MonitoringExecution
from sagemaker.s3 import S3Downloader

sagemaker_session = sagemaker.Session()
baseline_job = BaseliningJob.from_processing_name(sagemaker_session, processing_job_name)
status = baseline_job.describe()['ProcessingJobStatus']
if status != 'Completed':
    raise(Exception('Please wait. Processing job not complete, status: {}'.format(status)))
    
baseline_results_uri  = baseline_job.outputs[0].destination

SageMaker model monitor generates two types of files. Take a look at the statistics file first. It calculates various statistics for each feature of the dataset, including the mean, standard deviation, minimum value, maximum value, and more. 

In [None]:
import pandas as pd
import json

baseline_statistics = baseline_job.baseline_statistics().body_dict
schema_df = pd.json_normalize(baseline_statistics["features"])
schema_df[["name", "numerical_statistics.mean", "numerical_statistics.std_dev",
           "numerical_statistics.min", "numerical_statistics.max"]].head()

Now look at the suggested [constraints files](https://docs.aws.amazon.com/sagemaker/latest/dg/model-monitor-byoc-constraints.html)⇗. As the name implies, these are constraints which SageMaker model monitor recommends. If the live data which is sent to your production SageMaker Endpoint violates these constraints, this indicates data drift, and model monitor can raise an alert to trigger retraining. Of course, you can set different constraints based on the statistics which you viewed previously.

In [None]:
baseline_constraints = baseline_job.suggested_constraints().body_dict
constraints_df = pd.json_normalize(baseline_constraints["features"])
constraints_df.head()
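
To make the idea of a constraint concrete, the sketch below checks a single record against two simple rules: a feature whose baseline completeness is 1.0 must be present, and a feature flagged `is_non_negative` must not be negative. This is a deliberately simplified illustration and not the algorithm model monitor runs. It assumes the constraint entries carry `name`, `completeness` and `num_constraints` keys; `check_record`, the sample constraints and the `duration_minutes` feature are hypothetical.

```python
def check_record(record, constraint_features):
    """Return a list of (feature, problem) tuples for one record. Simplified checks only."""
    problems = []
    for feature in constraint_features:
        name = feature['name']
        value = record.get(name)
        if value is None:
            # A completeness of 1.0 means the baseline had no missing values
            if feature.get('completeness', 0.0) >= 1.0:
                problems.append((name, 'missing value'))
            continue
        numeric_rules = feature.get('num_constraints') or {}
        if numeric_rules.get('is_non_negative') and value < 0:
            problems.append((name, 'negative value'))
    return problems

# Hypothetical constraints in the shape of baseline_constraints["features"]
sample_constraints = [
    {'name': 'total_amount', 'inferred_type': 'Fractional', 'completeness': 1.0,
     'num_constraints': {'is_non_negative': True}},
    {'name': 'duration_minutes', 'inferred_type': 'Fractional', 'completeness': 1.0,
     'num_constraints': {'is_non_negative': True}},
]
sample_problems = check_record({'total_amount': -3.5}, sample_constraints)
print(sample_problems)
```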

### View data capture

When the "Deploy Production" stage of the MLOps pipeline deploys a SageMaker endpoint, it also enables data capture. This means the incoming requests to the endpoint, as well as the results from the ML model, are stored in an S3 location. Model monitor can analyze this data and compare it to the baseline to ensure that no constraints are violated. 

Use the code below to check how many files have been created by the data capture, and view the latest file in detail. Note, data capture relies on data being sent to the production endpoint. If you don't see any files yet, wait several minutes and try again.

In [None]:
bucket = sagemaker_session.default_bucket()
data_capture_logs_uri = 's3://{}/{}/datacapture/{}'.format(bucket, model_name, prd_endpoint_name)

capture_files = S3Downloader.list(data_capture_logs_uri)
print('Found {} files'.format(len(capture_files)))

if capture_files:
    # Get the first line of the most recent file    
    event = json.loads(S3Downloader.read_file(capture_files[-1]).split('\n')[0])
    print('\nLast file:\n{}'.format(json.dumps(event, indent=2)))
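
Each capture file holds one JSON object per line. As a rough sketch, the helper below pulls the input features, the model output and the inference time out of one such line. It assumes both input and output were captured with CSV encoding; `parse_capture_line` and the sample record are hypothetical, and real records may differ in content type and encoding.

```python
import json

def parse_capture_line(line):
    """Extract the CSV input fields, the raw output and the inference time from one record."""
    capture_event = json.loads(line)
    capture = capture_event['captureData']
    input_fields = capture['endpointInput']['data'].strip().split(',')
    output_value = capture['endpointOutput']['data'].strip()
    return input_fields, output_value, capture_event['eventMetadata']['inferenceTime']

# Hypothetical record; real files contain one JSON object like this per line
sample_line = json.dumps({
    'captureData': {
        'endpointInput': {'observedContentType': 'text/csv', 'mode': 'INPUT',
                          'data': '1.5,12.0,3', 'encoding': 'CSV'},
        'endpointOutput': {'observedContentType': 'text/csv; charset=utf-8', 'mode': 'OUTPUT',
                           'data': '14.25', 'encoding': 'CSV'},
    },
    'eventMetadata': {'eventId': 'example-id', 'inferenceTime': '2021-01-01T12:00:00Z'},
    'eventVersion': '0',
})
sample_fields, sample_output, sample_time = parse_capture_line(sample_line)
print(sample_fields, sample_output, sample_time)
```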

### View monitoring schedule

There are some useful functions for plotting and rendering distribution statistics or constraint violations provided in a `utils` file in the [SageMaker Examples GitHub](https://github.com/aws/amazon-sagemaker-examples/tree/master/sagemaker_model_monitor/visualization)⇗. Grab a copy of this code to use in this notebook. 

In [None]:
!wget -O utils.py --quiet https://raw.githubusercontent.com/awslabs/amazon-sagemaker-examples/master/sagemaker_model_monitor/visualization/utils.py
import utils as mu

The [minimum scheduled run time](https://docs.aws.amazon.com/sagemaker/latest/dg/model-monitor-scheduling.html)⇗ for model monitor is one hour, which means you will need to wait at least an hour to see any results. Use the code below to check the schedule status and list the next run. If you are completing this notebook as part of a workshop, your host will have activities which you can complete while you wait. 

In [None]:
sm = boto3.client('sagemaker')

response = sm.describe_monitoring_schedule(MonitoringScheduleName=schedule_name)
print('Schedule Status: {}'.format(response['MonitoringScheduleStatus']))

now = datetime.now(tzlocal())
next_hour = (now+timedelta(hours=1)).replace(minute=0)
scheduled_diff = (next_hour-now).seconds//60
print('Next schedule in {} minutes'.format(scheduled_diff))
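
The cell above estimates the wait by rounding up to the next hour. Because `replace(minute=0)` keeps the seconds and microseconds of the current time, that estimate can be off by up to a minute. A small sketch of a stricter calculation follows, assuming the schedule fires at the top of the hour; `minutes_until_next_hour` is a hypothetical helper.

```python
from datetime import datetime, timedelta, timezone

def minutes_until_next_hour(current_time):
    """Whole minutes from current_time until the start of the next hour."""
    next_hour_start = (current_time + timedelta(hours=1)).replace(
        minute=0, second=0, microsecond=0)
    return int((next_hour_start - current_time).total_seconds() // 60)

example_now = datetime(2021, 1, 1, 12, 45, 30, tzinfo=timezone.utc)
print(minutes_until_next_hour(example_now))
```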

While you wait, you can take a look at the CloudFormation template which is used as a base for the CloudFormation template built by CodeDeploy to deploy the production application. 

Alternatively, you can jump ahead to [Trigger Retraining](#Trigger-Retraining), which will kick off another run of the code pipeline while you wait.

In [None]:
!cat ../assets/deploy-model-prd.yml

A couple of minutes after the model monitoring schedule has run, you can use the code below to fetch the latest schedule status.  A completed schedule run may have found violations. 

In [None]:
processing_job_arn = None

while processing_job_arn is None:
    try:
        response = sm.list_monitoring_executions(MonitoringScheduleName=schedule_name)
    except ClientError as e:
        print(e.response["Error"]["Message"])
        time.sleep(10)
        continue  # Retry: there is no fresh response to inspect
    for mon in response['MonitoringExecutionSummaries']:
        status = mon['MonitoringExecutionStatus']
        now = datetime.now(tzlocal())
        created_diff = (now-mon['CreationTime']).seconds//60
        print('Schedule status: {}, Created: {} minutes ago'.format(status, created_diff))
        if status in ['Completed', 'CompletedWithViolations']:
            processing_job_arn = mon['ProcessingJobArn']
            break
        if status == 'InProgress':
            break
    else:
        raise(Exception('Please wait.  No Schedules executing'))
    time.sleep(10)

### View monitoring results

Once the model monitoring schedule has had a chance to run at least once, you can take a look at the results. First, load the monitoring execution results from the latest scheduled run.

In [None]:
if processing_job_arn:
    execution = MonitoringExecution.from_processing_arn(sagemaker_session=sagemaker.Session(),
                                                        processing_job_arn=processing_job_arn)
    exec_inputs = {inp['InputName']: inp for inp in execution.describe()['ProcessingInputs']}
    exec_results_uri = execution.output.destination

    print('Monitoring Execution results: {}'.format(exec_results_uri))

Take a look at the files that have been saved to the S3 output location. If violations were found, you should see a constraint violations file in addition to the statistics and constraints files that you viewed before.

In [None]:
!aws s3 ls $exec_results_uri/

Now fetch the monitoring statistics and violations. Then use the utils code to visualize the results in a table. It will highlight any baseline drift found by model monitor. Drift can occur for categorical features (for inferred string types) or for numerical features (for example, the total fare amount).

In [None]:
# Get the baseline and monitoring statistics & violations
baseline_statistics = baseline_job.baseline_statistics().body_dict
execution_statistics = execution.statistics().body_dict
violations = execution.constraint_violations().body_dict['violations']

In [None]:
mu.show_violation_df(baseline_statistics=baseline_statistics, 
                     latest_statistics=execution_statistics, 
                     violations=violations)
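
When the violations list is long, a plain summary by check type can be easier to scan than the styled table. The sketch below groups feature names by `constraint_check_type`. It assumes each violation is a dict with `feature_name` and `constraint_check_type` keys; `group_violations` and the sample violations are hypothetical.

```python
from collections import defaultdict

def group_violations(violation_list):
    """Return {constraint_check_type: [feature_name, ...]} for a list of violations."""
    grouped = defaultdict(list)
    for violation in violation_list:
        grouped[violation['constraint_check_type']].append(violation['feature_name'])
    return dict(grouped)

# Hypothetical violations in the shape of the constraint violations report
sample_violations = [
    {'feature_name': 'total_amount', 'constraint_check_type': 'baseline_drift_check',
     'description': 'example description'},
    {'feature_name': 'passenger_count', 'constraint_check_type': 'data_type_check',
     'description': 'example description'},
    {'feature_name': 'trip_distance', 'constraint_check_type': 'baseline_drift_check',
     'description': 'example description'},
]
print(group_violations(sample_violations))
```

In the notebook, the same helper could be applied to the `violations` list loaded earlier.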

### Trigger Retraining

The CodePipeline instance is configured with [CloudWatch Events](https://docs.aws.amazon.com/codepipeline/latest/userguide/create-cloudtrail-S3-source.html)⇗ to start the pipeline for retraining when drift detection triggers specific metric alarms.

You can simulate drift by putting a metric value above the threshold of `0.2` directly into CloudWatch. This will trigger the alarm and start the code pipeline.

<div class="alert alert-block alert-info">
    Tip: This alarm is only set up for the latest production endpoint, so retraining will only occur if you are comparing metrics against the latest endpoint.
</div>

![Metric graph in CloudWatch](../docs/cloudwatch-alarm.png)

Run the code below to trigger the metric alarm. The cell outputs a link to CloudWatch, where you can see the alarm (similar to the screenshot above), and a link to CodePipeline, which you will see run again. Note that it can take a couple of minutes for everything to trigger.

In [None]:
from datetime import datetime
import random

cloudwatch = boto3.client('cloudwatch')

# Define the metric name and threshold
metric_name = 'feature_baseline_drift_total_amount'
metric_threshold = 0.2

# Put a new metric to trigger an alarm
def put_drift_metric(value):
    print('Putting metric: {}'.format(value))
    response = cloudwatch.put_metric_data(
        Namespace='aws/sagemaker/Endpoints/data-metrics',
        MetricData=[
            {
                'MetricName': metric_name,
                'Dimensions': [
                    {
                        'Name': 'MonitoringSchedule',
                        'Value': schedule_name
                    },
                    {
                        'Name': 'Endpoint',
                        'Value': prd_endpoint_name
                    },
                ],
                'Timestamp': datetime.now(),
                'Value': value,
                'Unit': 'None'
            },
        ]
    )
    
def get_drift_stats():
    response = cloudwatch.get_metric_statistics(
        Namespace='aws/sagemaker/Endpoints/data-metrics',
        MetricName=metric_name,
        Dimensions=[
            {
                'Name': 'MonitoringSchedule',
                'Value': schedule_name
            },
            {
                'Name': 'Endpoint',
                'Value': prd_endpoint_name
            },
        ],
        StartTime=datetime.now() - timedelta(minutes=2),
        EndTime=datetime.now(),
        Period=1,
        Statistics=['Average'],
        Unit='None'
    )
    if 'Datapoints' in response and len(response['Datapoints']) > 0:        
        return response['Datapoints'][0]['Average']
    return 0    

print('Simulate drift on endpoint: {}'.format(prd_endpoint_name))

while True:
    put_drift_metric(round(random.uniform(metric_threshold, 1.0), 4))
    drift_stats = get_drift_stats()
    print('Average drift amount: {}'.format(drift_stats))
    if drift_stats > metric_threshold:
        break
    time.sleep(1)
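
The loop above keeps publishing values until the average read back from CloudWatch exceeds the threshold. The comparison itself can be sketched locally without any AWS calls. The helper below is a hypothetical stand-in for that check and does not model how CloudWatch evaluates alarms.

```python
import random

def average_exceeds(datapoints, threshold):
    """True when the mean of the datapoints is strictly above the threshold."""
    return bool(datapoints) and sum(datapoints) / len(datapoints) > threshold

example_threshold = 0.2
drift_rng = random.Random(0)  # Fixed seed for a repeatable example
example_datapoints = []
while not average_exceeds(example_datapoints, example_threshold):
    example_datapoints.append(round(drift_rng.uniform(example_threshold, 1.0), 4))
print('Breached after {} datapoint(s): {}'.format(len(example_datapoints), example_datapoints))
```

An empty list never breaches, which mirrors the `return 0` fallback in `get_drift_stats` when no datapoints are visible yet.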

Click through to the alarm and the CodePipeline execution history with the links below.

In [None]:
# Output a html link to the cloudwatch dashboard
metric_alarm_name = 'mlops-{}-metric-gt-threshold'.format(model_name)
HTML('''<a target="_blank" href="https://{0}.console.aws.amazon.com/cloudwatch/home?region={0}#alarmsV2:alarm/{1}">CloudWatch Alarm</a> triggers
     <a target="_blank" href="https://{0}.console.aws.amazon.com/codesuite/codepipeline/pipelines/{2}/executions?region={0}">Code Pipeline Execution</a>'''.format(region, metric_alarm_name, pipeline_name))

Once the pipeline is running again, you can jump back up to [Inspect Training Job](#Inspect-Training-Job).

### Create Synthetic Monitoring

[Amazon CloudWatch Synthetics](https://aws.amazon.com/blogs/aws/new-use-cloudwatch-synthetics-to-monitor-sites-api-endpoints-web-workflows-and-more/) lets you monitor sites, REST APIs, and other services deployed on AWS. You can set up a canary to test that your REST API is returning an expected value at a regular interval. This is a great way to validate that the blue/green deployment is not causing any downtime for your end users.

Use the code below to set up a canary to continuously test the production deployment. This canary simply pings the REST API to test whether it is live, using code from `notebook/canary.js`.

In [None]:
from urllib.parse import urlparse
from string import Template
from io import BytesIO
import zipfile

# Format the canary_js with rest_api and payload
rest_url = urlparse(rest_api)

with open('canary.js') as f:
    canary_js = Template(f.read()).substitute(hostname=rest_url.netloc, path=rest_url.path, 
                                              data=payload.decode('utf-8').strip())
# Write the zip file
zip_buffer = BytesIO()
with zipfile.ZipFile(zip_buffer, 'w') as zf:
    zip_path = 'nodejs/node_modules/apiCanaryBlueprint.js' # Set a valid path
    zip_info = zipfile.ZipInfo(zip_path)
    zip_info.external_attr = 0o0755 << 16 # Ensure the file is readable
    zf.writestr(zip_info, canary_js)
zip_buffer.seek(0)

# Create the canary
synth = boto3.client('synthetics')

role = sagemaker.get_execution_role()
s3_canary_uri = 's3://{}/{}'.format(artifact_bucket, model_name)
canary_name = 'mlops-{}'.format(model_name)

try:
    response = synth.create_canary(
        Name=canary_name,
        Code={
            'ZipFile': bytearray(zip_buffer.read()),
            'Handler': 'apiCanaryBlueprint.handler'
        },
        ArtifactS3Location=s3_canary_uri,
        ExecutionRoleArn=role,
        Schedule={ 
            'Expression': 'rate(10 minutes)', 
            'DurationInSeconds': 0 },
        RunConfig={
            'TimeoutInSeconds': 60,
            'MemoryInMB': 960
        },
        SuccessRetentionPeriodInDays=31,
        FailureRetentionPeriodInDays=31,
        RuntimeVersion='syn-nodejs-2.0',
    )
    print('Creating canary: {}'.format(canary_name))    
except ClientError as e:
    if e.response["Error"]["Code"] == "AccessDeniedException":
        print('Canary not supported.') # Not supported in event engine
    else:
        raise(e)
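
Before handing an in-memory archive to `create_canary`, it can help to confirm that the template substitution worked and that the file sits at the expected path inside the zip. The sketch below repeats the packaging steps with a hypothetical one-line template in place of `canary.js`, then re-opens the archive to inspect it. The hostname, path and data values are made up for the example.

```python
from io import BytesIO
from string import Template
import zipfile

# Hypothetical template text standing in for canary.js
template_text = "const hostname = '$hostname'; const path = '$path'; const data = '$data';"
rendered_script = Template(template_text).substitute(
    hostname='example.com', path='/prod/api', data='1.5,12.0,3')

check_buffer = BytesIO()
with zipfile.ZipFile(check_buffer, 'w') as zf:
    info = zipfile.ZipInfo('nodejs/node_modules/apiCanaryBlueprint.js')
    info.external_attr = 0o755 << 16  # Unix permission bits live in the high 16 bits
    zf.writestr(info, rendered_script)

# Re-open the archive to confirm its layout and content
check_buffer.seek(0)
with zipfile.ZipFile(check_buffer) as zf:
    archive_names = zf.namelist()
    archive_content = zf.read(archive_names[0]).decode('utf-8')
print(archive_names)
print(archive_content)
```

`Template.substitute` raises `KeyError` if a placeholder has no value, so a missing variable surfaces before the canary is created.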

Now create a CloudWatch alarm that will trigger if the canary's success rate drops below 90%.

In [None]:
cloudwatch = boto3.client('cloudwatch')

canary_alarm_name = '{}-synth-lt-threshold'.format(canary_name)

response = cloudwatch.put_metric_alarm(
    AlarmName=canary_alarm_name,
    ComparisonOperator='LessThanThreshold',
    EvaluationPeriods=1,
    DatapointsToAlarm=1,
    Period=600, # 10 minute interval
    Statistic='Average',
    Threshold=90.0,
    ActionsEnabled=False,
    AlarmDescription='SuccessPercent LessThanThreshold 90%',
    Namespace='CloudWatchSynthetics',
    MetricName='SuccessPercent',
    Dimensions=[
        {
          'Name': 'CanaryName',
          'Value': canary_name
        },
    ],
    Unit='Percent'  # SuccessPercent is reported as a percentage
)

print('Creating alarm: {}'.format(canary_alarm_name))

Run the code below to check whether the canary is running successfully. The cell outputs a link to the CloudWatch Canaries UI, where you can see the results over time (see screenshot). It may take a couple of minutes for the canary to deploy.

![Canary graph in CloudWatch](../docs/canary-green-1hr.png)

In [None]:
while True:
    try:
        response = synth.get_canary(Name=canary_name)
        status = response['Canary']['Status']['State']    
        print('Canary status: {}'.format(status))
        if status == 'ERROR':
            raise(Exception(response['Canary']['Status']['StateReason']))    
        elif status == 'READY':
            synth.start_canary(Name=canary_name)
        elif status == 'RUNNING':
            break        
    except ClientError as e:
        if e.response["Error"]["Code"] == "ResourceNotFoundException":
            print('No canary found.')
            break
        elif e.response["Error"]["Code"] == "AccessDeniedException":
            print('Canary not supported.') # Not supported in event engine
            break
        print(e.response["Error"]["Message"])
    time.sleep(10)

# Output a html link to the cloudwatch console
HTML('<a target="_blank" href="https://{0}.console.aws.amazon.com/cloudwatch/home?region={0}#synthetics:canary/detail/{1}">CloudWatch Canary</a>'.format(region, canary_name))

### Create a CloudWatch Dashboard

Finally, use the code below to create a CloudWatch dashboard that visualizes the key performance metrics and alarms you have created during this demo. The cell outputs a link to the dashboard. The dashboard shows 9 charts in three rows: the first row displays Lambda metrics, the second row displays SageMaker metrics, and the third row (shown in the screenshot below) displays the alarms set up for the pipeline.

![Graphs in CloudWatch dashboard](../docs/cloudwatch-dashboard.png)

In [None]:
sts = boto3.client('sts')
account_id = sts.get_caller_identity().get('Account')
dashboard_name = 'mlops-{}'.format(model_name)

with open('dashboard.json') as f:
    dashboard_body = Template(f.read()).substitute(region=region, account_id=account_id, model_name=model_name)
    response = cloudwatch.put_dashboard(
        DashboardName=dashboard_name,
        DashboardBody=dashboard_body
    )

# Output a html link to the cloudwatch dashboard
HTML('<a target="_blank" href="https://{0}.console.aws.amazon.com/cloudwatch/home?region={0}#dashboards:name={1}">CloudWatch Dashboard</a>'.format(region, dashboard_name))
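
The dashboard body is plain JSON with `$`-style placeholders filled in by `string.Template`. A quick local check, sketched below, is to parse the substituted text with `json.loads` before sending it to CloudWatch. The template here is a hypothetical stand-in, since the contents of `dashboard.json` are not shown in this notebook, and the region and model name are example values.

```python
import json
from string import Template

# Hypothetical dashboard template; the real dashboard.json is not shown here
dashboard_template = '''{
  "widgets": [
    {"type": "metric", "x": 0, "y": 0, "width": 12, "height": 6,
     "properties": {"region": "$region", "title": "mlops-$model_name invocations"}}
  ]
}'''

rendered_dashboard = Template(dashboard_template).substitute(
    region='us-east-1', model_name='nyctaxi')
parsed_dashboard = json.loads(rendered_dashboard)  # Raises ValueError on invalid JSON
print(parsed_dashboard['widgets'][0]['properties']['title'])
```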

Congratulations! You have made it to the end of this notebook, and have automated a safe MLOps pipeline using a wide range of AWS services. 

You can use the other notebook in this repository [workflow.ipynb](workflow.ipynb) to implement your own ML model and deploy it as part of this pipeline. Or, if you are finished with the content, follow the instructions in the next section to clean up the resources you have deployed.

## Cleanup

Execute the following cell to delete the stacks created in the pipeline. For a model name of **nyctaxi** these would be:

1. *nyctaxi*-deploy-prd
2. *nyctaxi*-deploy-dev
3. *nyctaxi*-workflow
4. sagemaker-custom-resource

In [None]:
cfn = boto3.client('cloudformation')

# Delete the prod and then dev stack
for stack_name in [f'{pipeline_name}-deploy-prd', 
                   f'{pipeline_name}-deploy-dev',
                   f'{pipeline_name}-workflow',
                   'sagemaker-custom-resource']:
    print('Deleting stack: {}'.format(stack_name))
    cfn.delete_stack(StackName=stack_name)
    cfn.get_waiter('stack_delete_complete').wait(StackName=stack_name)

The following code will stop and delete the canary you created.

In [None]:
while True:
    try:
        response = synth.get_canary(Name=canary_name)
        status = response['Canary']['Status']['State']    
        print('Canary status: {}'.format(status))
        if status == 'ERROR':
            raise(Exception(response['Canary']['Status']['StateReason']))    
        elif status == 'STOPPED':
            synth.delete_canary(Name=canary_name)
        elif status == 'RUNNING':
            synth.stop_canary(Name=canary_name)
    except ClientError as e:
        if e.response["Error"]["Code"] == "ResourceNotFoundException":
            print('Canary successfully deleted.')
            break
        elif e.response["Error"]["Code"] == "AccessDeniedException":
            print('Canary not created.') # Not supported in event engine
            break
        print(e.response["Error"]["Message"])
    time.sleep(10)
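
The cleanup loop above is a small state machine: stop a running canary, delete a stopped one, and keep polling otherwise. The sketch below writes that decision as a pure function so it can be reasoned about without AWS calls. `cleanup_action` is hypothetical, and treating `READY` (created but never started) as deletable is an assumption of this sketch that the loop above does not make.

```python
def cleanup_action(state):
    """Return the cleanup step for a canary state, or None to keep waiting."""
    if state == 'RUNNING':
        return 'stop'
    if state in ('STOPPED', 'READY'):  # READY handling is an assumption of this sketch
        return 'delete'
    if state == 'ERROR':
        return 'fail'
    return None  # Transitional states such as STOPPING: poll again

# Hypothetical sequence of states seen while polling
for example_state in ['RUNNING', 'STOPPING', 'STOPPED']:
    print('{:<9} -> {}'.format(example_state, cleanup_action(example_state)))
```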

The following code will delete the canary alarm and the dashboard.

In [None]:
cloudwatch.delete_alarms(AlarmNames=[canary_alarm_name])
print('Alarm deleted')

cloudwatch.delete_dashboards(DashboardNames=[dashboard_name])
print('Dashboard deleted')

Finally, close this notebook. You can then delete the CloudFormation stack you created to launch this MLOps sample.