### Capítulo 10

Automatización del Pipeline
===========================

### Objetivo

Integrar todos los componentes previamente construidos en un workflow orquestado por **AWS Step Functions** y que permita ejecutar todo el pipeline sin necesidad de depender del Jupyter Notebook.

Una vez que terminemos, el resultado será el pipeline que se muestra en la animación, la representación gráfica es generada por **AWS Step Functions**. Lo crearemos reutilizando los componentes previamente creados.

![Pipeline](img/Pipeline.gif)

Cómo funciona Step Functions
============================

AWS Step Functions permite coordinar múltiples servicios AWS en un workflow _serverless_. Utilizando **AWS Step Functions**, es posible ejecutar workflows que combinen servicios como **Amazon SageMaker** y **AWS Lambda**, entre otros. Los workflows constan de _pasos_, en los cuales la salida de un _paso_ se convierte en la entrada del siguiente.

El SDK de Step Functions para Data Science permite crear y ejecutar workflows de Machine Learning con **AWS Step Functions** directamente desde Python, y desde un Jupyter Notebook. Cuando la definición del workflow esta terminada, es subido al servicio **AWS Step Functions** para su ejecución en el Cloud. De esa forma, una vez creado o actualizado el workflow, este vive en el Cloud y puede ser reutilizado.

El workflow puede ser ejecutado tantas veces como sea necesario, y opcionalmente es posible cambiar la entrada de datos del workflow en cada ejecución. Cada que el workflow es ejecutado, este crea una nueva instancia de ejecución en el Cloud. Es posible ejecutarlo varias veces en paralelo.

A través del SDK es posible crear los pasos, encadenarlos juntos para un workflow, crear el workflow en **AWS StepFunctions**, y ejecutar el workflow en AWS cloud.

![Step Functions](img/StepFunctions1.png?width=50pc)

Una vez en ejecución, puede ser inspeccionado el progreso mediante comandos SDK o a través de la consola web de administración.

![Step Functions](img/StepFunctions2.png?width=40pc)

Utilizaremos **AWS Step Functions** para crear un worflow que nos permita integrar y automatizar todos los pasos antes creados en **Amazon SageMaker**.

Creación de Roles
=================

Lo primero que tenemos que hacer antes de crear el workflow para automatizar el pipeline, es crear **dos** [roles](https://docs.aws.amazon.com/es_es/IAM/latest/UserGuide/id_roles.html) que serán utilizados por los servicios **AWS Step Functions** y **AWS Lambda**.

Empezamos por importar las dependencias que utilizaremos para la definición y ejecución del workflow.


In [1]:
!wget http://amazon-sagemaker.com/dependencies/utils.py

--2022-05-02 19:15:21--  http://amazon-sagemaker.com/dependencies/utils.py
Resolving amazon-sagemaker.com (amazon-sagemaker.com)... 13.227.92.115, 13.227.92.71, 13.227.92.94, ...
Connecting to amazon-sagemaker.com (amazon-sagemaker.com)|13.227.92.115|:80... connected.
HTTP request sent, awaiting response... 301 Moved Permanently
Location: https://amazon-sagemaker.com/dependencies/utils.py [following]
--2022-05-02 19:15:22--  https://amazon-sagemaker.com/dependencies/utils.py
Connecting to amazon-sagemaker.com (amazon-sagemaker.com)|13.227.92.115|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 6039 (5.9K) [text/x-python]
Saving to: ‘utils.py.4’


2022-05-02 19:15:22 (720 MB/s) - ‘utils.py.4’ saved [6039/6039]



In [2]:
import datetime
import stepfunctions
import logging
import time
import sagemaker
import sagemaker_utils
import utils

from stepfunctions import steps
from stepfunctions.steps import TrainingStep, ModelStep
from stepfunctions.inputs import ExecutionInput
from stepfunctions.workflow import Workflow

from sklearn.metrics import confusion_matrix
from sagemaker import Session, get_execution_role
from sagemaker.estimator import Estimator
from sagemaker.processing import Processor, ProcessingInput, ProcessingOutput
from sagemaker.tuner import HyperparameterTuner, ContinuousParameter, IntegerParameter, CategoricalParameter
from sagemaker.inputs import TrainingInput, CreateModelInput, TransformInput
from sagemaker.workflow.steps import ProcessingStep, TrainingStep, CreateModelStep, TransformStep
from sagemaker.workflow.parameters import ParameterString, ParameterFloat
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.properties import PropertyFile
from sagemaker.model_metrics import MetricsSource, ModelMetrics
from sagemaker.workflow.step_collections import RegisterModel
from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.functions import JsonGet


from sagemaker import Session
from sagemaker.s3 import S3Uploader
session = Session()
sess = session

stepfunctions.set_stream_logger(level=logging.INFO)

sagemaker_role = "arn:aws:iam::829825986145:role/service-role/AmazonSageMaker-ExecutionRole-20220424T173630"

region = session.boto_region_name
account_id = session.account_id()
bucket = session.default_bucket()

prefix = 'churn-clf'
datasets_prefix = f'{prefix}/datasets'
processed_data_prefix = f'{prefix}/processed'
eval_prefix = f'{prefix}/eval'
transformed_data_prefix = f'{prefix}/transformed'
images_directory = f'{prefix}/images'
code_prefix = f'{prefix}/code'
model_prefix = f'{prefix}/models'

#processing
train_data_file = 'train_data.csv'
train_target_file = 'train_target.csv'
test_data_file = 'test_data.csv'
test_target_file = 'test_target.csv'
encoder_file = 'encoder.pkl'

  from .autonotebook import tqdm as notebook_tqdm


Y definimos algunas variables con nombres que más adelante utilizaremos.

In [3]:
workflow_role_name = '{}-StepFunctionsWorkflowExecutionRole'.format(prefix) 
lambda_role_name = '{}-LambdaExecutionRole'.format(prefix)
model_function_name = '{}-ModelFunction'.format(prefix)
select_model_function_name = '{}-SelectModelFunction'.format(prefix)

A continuación definimos la política de permisos para el _Rol de Ejecución_ para **AWS Step Functions**, es decir para el workflow como tal.

In [4]:
step_functions_policy_document={
    "Version": "2012-10-17",
    "Statement": [           
        {
            "Effect": "Allow",
            "Action": "iam:PassRole",
            "Resource": "*",
            "Condition": {
                "StringEquals": {
                    "iam:PassedToService": "sagemaker.amazonaws.com"
                }
            }
        },
        {
            "Effect": "Allow",
            "Action": [
                "states:*",
                "sagemaker:*",
                "lambda:*",
                "cloudwatch:*",
                "events:*"
            ],
            "Resource": "*"
        }
    ]
}

In [5]:
step_functions_asume_role_document = {
    "Version": "2012-10-17",
    "Statement": [{
        "Effect": "Allow",
        "Principal": {
            "Service": "states.amazonaws.com"
        },
        "Action": "sts:AssumeRole"
    }]
}


Y utilizando el siguiente método para crear el _Rol de Ejecución_ para **AWS Step Functions**.

In [6]:
workflow_role = utils.create_or_update_iam_role(role_name = workflow_role_name, 
                          role_desc = 'Execution role for Step Functions workflow', 
                          asume_role_policy_document = step_functions_asume_role_document,
                          policy_name = 'StepFunctionsWorkflowExecutionPolicy',
                          policy_document = step_functions_policy_document)


INFO: Role already exists, updating it...
INFO: Role updated: churn-clf-StepFunctionsWorkflowExecutionRole


Siguiendo el mismo procedimiento creamos el _Rol de Ejecución_ para **AWS Lambda**.

In [7]:
lambda_policy_document = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "sagemaker:*",
                "s3:*",
                "states:StartExecution",
                "iam:PassRole",
                "logs:CreateLogGroup",
                "logs:CreateLogStream",
                "logs:PutLogEvents"
            ],
            "Resource": "*"
        }
    ]
}


In [8]:
lambda_asume_role_document = {
    "Version": "2012-10-17",
    "Statement": [{
        "Effect": "Allow",
        "Principal": {
            "Service": "lambda.amazonaws.com"
        },
        "Action": "sts:AssumeRole"
    }]
}


In [9]:
lambda_role = utils.create_or_update_iam_role(role_name = lambda_role_name, 
                                        role_desc = 'Execution role for Lambda functions', 
                                        asume_role_policy_document = lambda_asume_role_document,
                                        policy_name = 'LambdaExecutionPolicy',
                                        policy_document = lambda_policy_document)


INFO: Role already exists, updating it...
INFO: Role updated: churn-clf-LambdaExecutionRole


Una vez creado los roles, estamos listos para iniciar la definición del workflow.

Parámetros del workflow
=======================

Empezamos por definir los parámetros de entrada que recibirá el workflow, estos parámetros podremos posteriormente utilizarlos en la ejecución de los pasos del workflow y de esta forma parametrizar algunas cosas y volver más flexible nuestro pipeline. Esto lo hacemos mediante el uso de la función [ExecutionInput()](https://aws-step-functions-data-science-sdk.readthedocs.io/en/stable/placeholders.html?highlight=ExecutionInput#stepfunctions.inputs.ExecutionInput) .

In [10]:
execution_input = ExecutionInput(schema={
        'ProcessingJobName': str,           
        'GradientBoostingTuningJobName': str,
        'RandomForestTuningJobName': str,
        'ExtraTreesTuningJobName': str,
        'GradientBoostingTransformJobName': str,
        'RandomForestTransformJobName': str,
        'ExtraTreesTransformJobName': str,
        'GradientBoostingModelName': str,
        'RandomForestModelName': str,
        'ExtraTreesModelName': str,
        'GradientBoostingEvaluationJobName': str,
        'RandomForestEvaluationJobName': str,
        'ExtraTreesEvaluationJobName': str,
        'ModelLambdaFunctionName': str, 
        'EndpointConfigName': str,
        'EndpointName': str
    })

Subir scripts a S3
==================

Para poder agregar la creación de los Jobs de **Amazon SageMaker** para el procesamiento, entrenamiento, evaluación y despliegue; necesitamos subir a **Amazon S3** los scripts necesarios para cada uno de estos procesos. Para esto utilizamos el método [S3Uploader.upload( )](https://sagemaker.readthedocs.io/en/stable/api/utility/s3.html#sagemaker.s3.S3Uploader) .

In [11]:
import tarfile
import os.path

processing_code_path = S3Uploader.upload(local_path='processing.py',
                                            desired_s3_uri='s3://{}/{}'.format(bucket, code_prefix),
                                            sagemaker_session=sess)

with tarfile.open('train_and_deploy.tar.gz', 'w:gz') as tar:
    tar.add('train_and_deploy.py')
    
train_and_deploy_code_path = S3Uploader.upload(local_path='train_and_deploy.tar.gz',
                                            desired_s3_uri='s3://{}/{}'.format(bucket, code_prefix),
                                            sagemaker_session=sess)

model_evaluation_code_path = S3Uploader.upload(local_path='evaluate_models.py',
                                            desired_s3_uri='s3://{}/{}'.format(bucket, code_prefix),
                                            sagemaker_session=sess)   


In [12]:
train_and_deploy_code_path

's3://sagemaker-us-east-1-829825986145/churn-clf/code/train_and_deploy.tar.gz'


Una vez teniendo los scripts arriba, podemos empezar a crear cada uno de los pasos que agregaremos al workflow.

Agregar procesamiento
=====================

Para crear un primer paso en el workflow, para el pre-procesamiento de los datos, utilizamos el método [steps.sagemaker.ProcessingStep( )](https://aws-step-functions-data-science-sdk.readthedocs.io/en/stable/sagemaker.html?highlight=steps.sagemaker.ProcessingStep#stepfunctions.steps.sagemaker.ProcessingStep)

Proporcionando los siguientes parámetros:

*   `state_id` – Identificador del estado (paso) del workflow, este aparecerá como el nombre en el diagrama del workflow
*   `processor` – El processor que será utilizado para ejecutar el job, en este caso el que fue creado en [3.4 Crear Processing Job](/es/sagemaker/dataprep/processingjob)
*   `job_name` – Nombre del job a crear, en este caso será el valor recibido en le parámetro `ProcessingJobName` del workflow
*   `inputs` – Rutas de **Amazon S3** de las cuales se tomarán los archivos de entrada para el proceso
*   `outputs` – Ruta de **Amazon S3** en la cual queremos que se deposite el resultado de la ejecución del proceso
*   `container_arguments` – Parámetros que recibirá el container en el cual se ejecutará el script
*   `container_entrypoint` – Ruta del script a ejecutar dentro del container

In [13]:
docker_images = {'Processing': {'libraries': {'pandas': '1.2.4',
   'numpy': '1.20.2',
   'scikit-learn': '0.24.2'},
  'build_id': 'churn-clf-processing-build-image:7e3a3c1e-4038-4a84-aae6-408606f73789',
  'image_uri': '829825986145.dkr.ecr.us-east-1.amazonaws.com/churn-clf-processing:latest'},
 'Training': {'libraries': {'pandas': '1.2.4',
   'numpy': '1.20.2',
   'scikit-learn': '0.24.2',
   'sagemaker-training': '3.9.2'},
  'build_id': 'churn-clf-training-build-image:acd0f05b-4dba-48d4-85ca-c06c2addfd4e',
  'image_uri': '829825986145.dkr.ecr.us-east-1.amazonaws.com/churn-clf-training:latest'},
 'Inference': {'libraries': {'pandas': '1.2.4',
   'numpy': '1.20.2',
   'scikit-learn': '0.24.2',
   'multi-model-server': '1.1.8',
   'sagemaker-inference': '1.5.11',
   'boto3': '1.21.43',
   'itsdangerous': '2.0.1'},
  'dependencies': [('serving', '/opt/ml/serving')],
  'others': ['RUN pip install -e /opt/ml/serving',
   'LABEL com.amazonaws.sagemaker.capabilities.multi-models=false',
   'LABEL com.amazonaws.sagemaker.capabilities.accept-bind-to-port=true'],
  'entrypoint': ['python', '/opt/ml/serving/custom_inference/serving.py'],
  'cmd': ['serve'],
  'build_id': 'churn-clf-inference-build-image:c096f995-2a79-4ea7-8364-0aa4afa43752',
  'image_uri': '829825986145.dkr.ecr.us-east-1.amazonaws.com/churn-clf-inference:latest'}}

In [14]:
data_prep_script_file = 'code/processing.py'

In [15]:
processor = Processor(
    image_uri=docker_images['Processing']['image_uri'],
    role=sagemaker_role,
    instance_count=1,
    instance_type='ml.m5.4xlarge',
    entrypoint=['python3',f'/opt/ml/processing/input/code/{os.path.basename(data_prep_script_file)}'],
    volume_size_in_gb=5,
    max_runtime_in_seconds=60*60*2)# dos horas 

In [16]:
processing_step = steps.sagemaker.ProcessingStep(
state_id= 'Preparación de Datos',
processor= processor,
job_name=execution_input['ProcessingJobName'],
inputs=[ProcessingInput(input_name='input',
                        source='s3://{}/{}'.format(bucket, datasets_prefix), 
                        destination='/opt/ml/processing/input'),
        ProcessingInput(input_name='code',
                        source=processing_code_path, 
                        destination='/opt/ml/processing/input/code')],
outputs=[ProcessingOutput(output_name='output',
                            source='/opt/ml/processing/output/data',
                            destination='s3://{}/{}'.format(bucket, processed_data_prefix))],
container_arguments=['--test-size', '0.1',
                        '--data-file', 'churn.txt',
                        '--train-data-file', train_data_file,
                        '--train-target-file', train_target_file,
                        '--test-data-file', test_data_file,
                        '--test-target-file', test_target_file,
                        '--encoder-file', encoder_file],
container_entrypoint=['python3','/opt/ml/processing/input/code/processing.py'])

Función registrar modelo
========================

Antes de agregar el entrenamiento del modelo al worflow, crearemos una función en **AWS Lambda** que nos permitirá registrar en **Amazon SageMaker** el modelo que mejor desempeño haya tenido después de realizar la optimización de hiperparámetros.

Para esto primero debemos crear el script con la lógica que esta función ejecutará. En este caso utilizaremos el método [create\_model( )](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sagemaker.html#SageMaker.Client.create_model) de `boto3` (SDK de AWS para Python).

In [17]:
!rm -rf lambda/register_model/; mkdir lambda/register_model/

In [18]:
%%writefile lambda/register_model/app.py
import boto3

sm_client = boto3.client('sagemaker')

def get_parameter(key, event):
    parameter_value = None
    if (key in event):
        parameter_value = event[key]

    else:
        raise KeyError('{} key not found in function input!'.format(key)+
                      ' The input received was: {}.'.format(event))
        
    return parameter_value
    

#Retrieve training job name from event and create a model.
def lambda_handler(event, context):
    response = None
    
    print(event)
    
    job_name = get_parameter('TrainingJobName', event)
    model_name = get_parameter('ModelName', event)
    code_path = get_parameter('CodePath', event)
        
    try:
        
        response = sm_client.describe_training_job(TrainingJobName=job_name)
        model_data = response['ModelArtifacts']['S3ModelArtifacts']
        container = response['AlgorithmSpecification']['TrainingImage']
        role_arn = response['RoleArn']
        
        print("Training job: {} has artifacts at: {}.".format(job_name, model_data))
        
        try:
            response = sm_client.create_model(ModelName=model_name,
                                   PrimaryContainer={
                                       'Image':container,
                                       'Mode':'SingleModel',
                                       'ModelDataUrl':model_data,
                                       'Environment':{
                                           'SAGEMAKER_PROGRAM': 'train_and_deploy.py',
                                           'SAGEMAKER_SUBMIT_DIRECTORY': code_path
                                       }
                                   },
                                   ExecutionRoleArn=role_arn
            )

            print(response)

        except Exception as e:
            response = ('Failed to create model')
            print(e)
            print('{} Attempted to create a model for job name: {}.'.format(response, job_name))

    except Exception as e:
        response = ('Failed to read training job artifacts!'+ 
                    ' The training job may not exist or the job name may be incorrect.'+ 
                    ' Check SageMaker to confirm the job name.')
        print(e)
        print('{} Attempted to read job name: {}.'.format(response, job_name))
            
    return {
        'statusCode': 200,
        'ModelArn': response['ModelArn'],
        'TrainingJobName': job_name
    }


Writing lambda/register_model/app.py


# Creación de contenedor para función Lambda

Para crear los contenedores Docker utilizaremos el servicio AWS Code Build y debido a que las imágenes bases serán descargadas del DockerHub repository podríamos llegar a obtener un error indicando que se han realizado demasiadas solicitudes, para conocer más detalles de esta limitante visitar este enlace.

Para evitar ese error necesitamos autenticarnos y para esto debemos obtener una cuenta en DockerHub y sustituir usuario y constraseña por los valores correspondientes.

In [19]:
secret_name = 'dockerhub'
#sagemaker_utils.create_secret(secret_name,'user','pwd')

Necesitaremos un rol de ejecución para ser utilizado en el proyecto de AWS Code Build. Si estamos ejecutando el Notebook con permisos suficientes para crear un rol de IAM, podemos crear el rol simplemente ejecutando el siguiente método, de lo contrario tendría que ser creado de forma manual.

In [20]:
policy_document={
        "Version": "2012-10-17",
        "Statement": [               
            {
                "Effect": "Allow",
                "Action": [
                    "ecr:BatchCheckLayerAvailability",
                    "ecr:CompleteLayerUpload",
                    "ecr:GetAuthorizationToken",
                    "ecr:InitiateLayerUpload",
                    "ecr:PutImage",
                    "ecr:UploadLayerPart",
                    "ecr:BatchGetImage",
                    "ecr:GetDownloadUrlForLayer",
                    "logs:CreateLogGroup",
                    "logs:CreateLogStream",
                    "logs:PutLogEvents",
                    "s3:PutObject",
                    "s3:GetObject",
                    "s3:GetObjectVersion",
                    "secretsmanager:GetSecretValue"
                ],
                "Resource": "*"
            }
        ]
    }

#codebuild_role = sagemaker_utils.create_codebuild_execution_role('CodeBuildExecutionRole', policy_document)
codebuild_role = "arn:aws:iam::829825986145:role/CodeBuildExecutionRole"



In [21]:
codebuild_role

'arn:aws:iam::829825986145:role/CodeBuildExecutionRole'

In [23]:
docker_images['Lambda']={'libraries':{'boto3':'1.17.77'},
                         'dependencies':[('lambda/register_model/','${LAMBDA_TASK_ROOT}')],                                                  
                         'cmd':['app.lambda_handler'],
                         'lambda': True,}

lambda_docker_parameters = {'image_name': f'{prefix}-lambda',
                            'base_image': "public.ecr.aws/lambda/python:3.8",
                            's3_path': f's3://{bucket}/{images_directory}',
                            'role': codebuild_role,  
                            #'secret': secret_name
                            }

lambda_docker_parameters.update(docker_images['Lambda'])
docker_images['Lambda']['image_uri'] = sagemaker_utils.create_docker_image(**lambda_docker_parameters)  


INFO: Repository churn-clf-lambda already exists
lambda/register_model/
[Kchurn-clf-lambda................................SUCCEEDED
[K[34m✅[0m Building docker image


Creación de función
===================

Obtener nombres de columnas del dataset para poder ordenarlas en la función


In [24]:
policy_document = {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": [
                    "sagemaker:*",
                    "s3:*",
                    "states:StartExecution",
                    "iam:PassRole",
                    "logs:CreateLogGroup",
                    "logs:CreateLogStream",
                    "logs:PutLogEvents"
                ],
                "Resource": "*"
            }
        ]
    }

lambda_role = sagemaker_utils.create_lambda_execution_role(f'{prefix}-LambdaExecutionRole', policy_document)


INFO: Role already exists, updating it...
INFO: Role updated: churn-clf-LambdaExecutionRole


In [None]:
endpoint_function_name = f'{prefix}-register-model'
lambda_function_params = {
    'FunctionName': endpoint_function_name,
    'Role': lambda_role,
    'Code': {'ImageUri':docker_images['Lambda']['image_uri']},
    'Description': 'Queries a SageMaker training job and creates a model.',
    'Timeout': 15, # 5 mins
    'MemorySize': 128, # MB
    'PackageType': 'Image'#,
    #'Environment': {
    #    'Variables': {
    #        'ENDPOINT_NAME': 'Colocar aqui el nombre del endpoint',
    #        'ENCODER_S3_PATH': f"{sagemaker_utils.get_processor_output_path(processor,'encoder')}/{encoder_file}",
    #        'THRESHOLDS_S3_PATH': f"{sagemaker_utils.get_processor_output_path(evaluation_processor,'eval')}/{thresholds_file}"
    #    }
    #}
}



lambda_response = sagemaker_utils.create_lambda_function(**lambda_function_params)
