# I SageMaker Pipelines

In [1]:
# !pip install sagemaker -q --upgrade

## 01. Librerías

In [2]:
import os
import json
import time
import boto3
import numpy as np
import pandas as pd
# entorno
import sagemaker
from sagemaker import get_execution_role
from sagemaker.workflow.pipeline_context import PipelineSession
# preprocesamiento
from sagemaker.workflow.steps import ProcessingStep
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput
# entrenamiento
from sagemaker.sklearn.estimator import SKLearn
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep
# evaluacion
from sagemaker.workflow.properties import PropertyFile
from sagemaker.sklearn.processing import ScriptProcessor
# registro
from sagemaker.model import Model
from sagemaker.sklearn.model import SKLearnModel
from sagemaker import PipelineModel
from sagemaker.model_metrics import MetricsSource, ModelMetrics
from sagemaker.workflow.step_collections import RegisterModel
from sagemaker.workflow.model_step import ModelStep
# condicion de registro
from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo, ConditionGreaterThan
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.functions import JsonGet, Join
# pipeline
from sagemaker.workflow.pipeline import Pipeline

sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /home/sagemaker-user/.config/sagemaker/config.yaml


In [3]:
# Record the SDK version this notebook was run with (pipeline APIs change between releases).
sdk_version = sagemaker.__version__
sdk_version

'2.229.0'

## 02. Definiciones de entorno

In [4]:
# AWS session, execution role, S3 buckets and shared naming/compute settings used by every step.
sess = boto3.Session()
role = get_execution_role()

# NOTE(review): account- and region-specific bucket names; change them when running elsewhere.
bucket_input = "sagemaker-us-west-2-916135041985"
bucket_output = "sagemaker-us-west-2-916135041985"

# Reuse the session created above instead of instantiating a second boto3 session.
region_name = sess.region_name

# Pipeline-aware session: SDK calls made with it return step arguments (consumed by the
# *Step constructors below) instead of launching jobs. Artifacts without an explicit
# destination land under s3://<bucket_output>/output/.
pipeline_session = PipelineSession(default_bucket=bucket_output, default_bucket_prefix="output")

model_package_group_name = "CreditScoreModel"
prefix = "credit-score-classification"
pipeline_name = "ClassificationScorePipelineModel"

# kms_key_value = "kms_key_value"

# Compute settings shared by the processing, training and evaluation jobs.
instance_type = "ml.m5.2xlarge"
instance_count = 1

In [5]:
raw_s3 = "s3://{}/{}/data/raw".format(bucket_input, prefix)  # raw input data consumed by preprocessing

In [6]:
model_path = "s3://{}/{}/model/".format(bucket_output, prefix)  # output_path for training artifacts

### 02.01 Crear grupo de paquetes de modelos (Model Package Group)

In [7]:
# from botocore.exceptions import ClientError

# sm_client = boto3.client("sagemaker")

# # model_package_group_name = "PipelineModelPackageGroup"

# model_package_group_input_dict = {
#     "ModelPackageGroupName": model_package_group_name,
#     "ModelPackageGroupDescription": "Model package group for Proactiva models",
# }

# try:
#     create_model_pacakge_group_response = sm_client.create_model_package_group(
#         **model_package_group_input_dict
#     )
# except ClientError as e:
#         print(e.response["Error"]["Message"])
        
# model_package_group_arn = sm_client.describe_model_package_group(
#         ModelPackageGroupName = model_package_group_name
#     )['ModelPackageGroupArn']

## 03. Preprocesamiento

In [8]:
# Preprocessing step: runs code/preprocessing.py over the raw data and publishes four
# named outputs (the `scaler_model` artifact and the train/test/validation splits)
# that the training, evaluation and registration steps reference.
sklearn_processor = SKLearnProcessor(
    framework_version="1.2-1",
    instance_type=instance_type,
    instance_count=instance_count,
    base_job_name="pre_procesamiento",
    role=role,
    # output_kms_key=kms_key_value,
    sagemaker_session=pipeline_session,
)

processor_args = sklearn_processor.run(
    # kms_key=kms_key_value,
    inputs=[
        ProcessingInput(source=raw_s3, destination="/opt/ml/processing/input"),
    ],
    outputs=[
        ProcessingOutput(output_name="scaler_model", source="/opt/ml/processing/scaler_model"),
        ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
        ProcessingOutput(output_name="test", source="/opt/ml/processing/test"),
        ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"),
    ],
    code="code/preprocessing.py",
    arguments=["--bucket-input", bucket_input, "--bucket-prefix", prefix],
)

step_preprocess = ProcessingStep(name="etapa_pre_procesamiento", step_args=processor_args)

INFO:sagemaker.image_uris:Defaulting to only available Python version: py3


## 04. Entrenamiento

In [9]:
# Training step: runs code/training.py on the train/test splits produced by the
# preprocessing step and writes the model artifact under `model_path`.
sklearn_estimator = SKLearn(
    entry_point="code/training.py",
    role=role,
    # output_kms_key=kms_key_value,
    sagemaker_session=pipeline_session,
    instance_count=instance_count,
    instance_type=instance_type,
    framework_version="0.23-1",
    py_version="py3",
    base_job_name="entrenamiento_modelo",
    output_path=model_path,
    # Hyperparameters handed to the training script; names suggest a random-forest
    # classifier — their interpretation lives in training.py (TODO confirm).
    hyperparameters={
        "n_jobs": -1,
        "criterion": "gini",
        "max_depth": 45,
        "max_features": "log2",
        "n_estimators": 270,
        "random_state": 42,
    },
)

# Channel inputs are resolved at execution time from the preprocessing step's outputs.
train_args = sklearn_estimator.fit(
    inputs={
        "train": TrainingInput(
            s3_data=step_preprocess.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri,
            content_type="text/csv",
        ),
        "test": TrainingInput(
            s3_data=step_preprocess.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri,
            content_type="text/csv",
        ),
    }
)

step_train_model = TrainingStep(name="etapa_entrenamiento_modelo", step_args=train_args)

## 05. Evaluación

In [10]:
# Evaluation step: runs code/evaluation.py against the trained model and the validation
# split, producing two JSON reports read back through PropertyFiles by the
# registration condition.
#
# The image version is tied to the training estimator so the model artifact is loaded
# by the same scikit-learn version that produced it; this image is also reused to
# serve the model at registration time.
image_uri = sagemaker.image_uris.retrieve(
    framework="sklearn",
    region=region_name,
    version=sklearn_estimator.framework_version,
    py_version="py3",
    instance_type=instance_type,
)

evaluate_model_processor = ScriptProcessor(
    role=role,
    image_uri=image_uri,
    command=["python3"],
    instance_count=instance_count,
    instance_type=instance_type,
    sagemaker_session=pipeline_session,
    # output_kms_key=kms_key_value,
)

# Metrics of the newly trained model.
evaluation_report = PropertyFile(
    name="EvaluationReport", output_name="evaluation", path="evaluation.json"
)
# Metrics of the model currently in production — comment out when building the baseline model.
production_evaluation_report = PropertyFile(
    name="ProductionEvaluationReport", output_name="production", path="evaluation.json"
)

eval_args = evaluate_model_processor.run(
    inputs=[
        ProcessingInput(
            source=step_train_model.properties.ModelArtifacts.S3ModelArtifacts,
            destination="/opt/ml/processing/model",
        ),
        ProcessingInput(
            source=step_preprocess.properties.ProcessingOutputConfig.Outputs["validation"].S3Output.S3Uri,
            destination="/opt/ml/processing/validation",
        ),
    ],
    outputs=[
        ProcessingOutput(output_name="evaluation", source="/opt/ml/processing/evaluation"),
        # Comment out when building the baseline model.
        ProcessingOutput(output_name="production", source="/opt/ml/processing/production"),
    ],
    code="code/evaluation.py",
    # NOTE(review): presumably used by evaluation.py to locate the production model's
    # metrics — confirm in code/evaluation.py.
    arguments=["--bucket-output", bucket_output, "--region-name", region_name],
)

step_evaluate_model = ProcessingStep(
    name="evaluacion_rendimiento_modelo",
    step_args=eval_args,
    property_files=[
        evaluation_report,
        production_evaluation_report,  # comment out when building the baseline model
    ],
)

## 06. Registro

In [11]:
# Model registration step: chains the preprocessing artifact and the trained model into
# a single PipelineModel and registers it in the model package group, which versions
# the models that can later be deployed for inference.
model_approval_status = "Approved"

# Artifact locations come from step *properties*, which SageMaker resolves at execution
# time. Reading `step.arguments[...]` instead would yield a static, timestamped S3 URI
# fixed when this cell runs. Every execution would then register that same location
# rather than the artifacts produced by the current run.
scaler_model_s3 = Join(
    on="/",
    values=[
        step_preprocess.properties.ProcessingOutputConfig.Outputs["scaler_model"].S3Output.S3Uri,
        "model.tar.gz",
    ],
)

# First container: serves the preprocessing artifact, with code/preprocessing.py as entry point.
scaler_model = SKLearnModel(
    model_data=scaler_model_s3,
    role=role,
    sagemaker_session=pipeline_session,
    entry_point="code/preprocessing.py",
    framework_version="1.2-1",
    # model_kms_key=kms_key_value
)

# Second container: the trained model, served with the same image used for evaluation.
RF_model = Model(
    image_uri=image_uri,
    model_data=step_train_model.properties.ModelArtifacts.S3ModelArtifacts,
    sagemaker_session=pipeline_session,
    # model_kms_key=kms_key_value,
    role=role,
)

# Containers are listed in the order they handle a request: scaler first, then the model.
inference_pipeline_model = PipelineModel(
    models=[scaler_model, RF_model], role=role, sagemaker_session=pipeline_session
)

# Evaluation report of the *current* execution, attached as the package's model statistics.
evaluation_s3_uri = Join(
    on="/",
    values=[
        step_evaluate_model.properties.ProcessingOutputConfig.Outputs["evaluation"].S3Output.S3Uri,
        "evaluation.json",
    ],
)

model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri=evaluation_s3_uri,
        content_type="application/json",
    )
)

register_args = inference_pipeline_model.register(
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=["ml.m5.large", "ml.m5.xlarge"],
    transform_instances=["ml.m5.xlarge"],
    model_package_group_name=model_package_group_name,
    model_metrics=model_metrics,
    approval_status=model_approval_status,
)

step_register_scaler_model = ModelStep(
    name="registro_modelo_CSC",
    step_args=register_args,
)



## 07. Condición de registro

In [12]:
# Fixed accuracy threshold, only used when generating the baseline model.
accuracy_mse_threshold = 0.60

# Accuracy of the freshly trained model, read from its evaluation report.
candidate_accuracy = JsonGet(
    step_name=step_evaluate_model.name,
    property_file=evaluation_report,
    json_path="multiclass_classification_metrics.accuracy.value",
)
# Accuracy of the model currently in production — comment out to generate the baseline model.
production_accuracy = JsonGet(
    step_name=step_evaluate_model.name,
    property_file=production_evaluation_report,
    json_path="multiclass_classification_metrics.accuracy.value",
)

# The retrained model moves on to registration only when its accuracy is strictly
# greater than the production model's; otherwise registration is skipped.
cond_accuracy_improved = ConditionGreaterThan(
    left=candidate_accuracy,
    right=production_accuracy,
    # right=accuracy_mse_threshold,  # uncomment to generate the baseline model
)

# Conditional step: register on True, do nothing on False.
step_cond = ConditionStep(
    name="condicion_de_precision_mayor_a_modelo_productivo",
    conditions=[cond_accuracy_improved],
    if_steps=[step_register_scaler_model],
    else_steps=[],
)

## 08. Constructor

In [13]:
# Pipeline assembly. The register step is reached through `step_cond`, so it is not
# listed in `steps` directly.
#
# Plain Python values in `parameters` are silently dropped from the definition (the
# printed definition shows 'Parameters': []). The list is therefore left empty.
# To make a value tunable per execution, wrap it in
# sagemaker.workflow.parameters.ParameterString/ParameterFloat, list that object here,
# and use it in the steps.
pipeline = Pipeline(
    name=pipeline_name,
    sagemaker_session=pipeline_session,
    parameters=[],
    steps=[
        step_preprocess,
        step_train_model,
        step_evaluate_model,
        step_cond,
    ],
)

In [14]:
# Compile the pipeline and show its JSON definition as a dict for inspection.
definition_json = pipeline.definition()
definition = json.loads(definition_json)
definition



{'Version': '2020-12-01',
 'Metadata': {},
 'Parameters': [],
 'PipelineExperimentConfig': {'ExperimentName': {'Get': 'Execution.PipelineName'},
  'TrialName': {'Get': 'Execution.PipelineExecutionId'}},
 'Steps': [{'Name': 'etapa_pre_procesamiento',
   'Type': 'Processing',
   'Arguments': {'ProcessingResources': {'ClusterConfig': {'InstanceType': 'ml.m5.2xlarge',
      'InstanceCount': 1,
      'VolumeSizeInGB': 30}},
    'AppSpecification': {'ImageUri': '246618743249.dkr.ecr.us-west-2.amazonaws.com/sagemaker-scikit-learn:1.2-1-cpu-py3',
     'ContainerArguments': ['--bucket-input',
      'sagemaker-us-west-2-916135041985',
      '--bucket-prefix',
      'credit-score-classification'],
     'ContainerEntrypoint': ['python3',
      '/opt/ml/processing/input/code/preprocessing.py']},
    'RoleArn': 'arn:aws:iam::916135041985:role/sagemaker-immersion-day-SageMakerExecutionRole-C9TM1XltnG23',
    'ProcessingInputs': [{'InputName': 'input-1',
      'AppManaged': False,
      'S3Input': {'S

## 09. Registrar (upsert) y ejecutar el pipeline


In [15]:
# Create the pipeline in SageMaker, or update its definition if it already exists.
# upsert() returns the service response (not an execution handle), so it gets its own
# name; the execution handle comes from pipeline.start().
upsert_response = pipeline.upsert(role_arn=role)
upsert_response["PipelineArn"]



In [17]:
# Launch a new execution of the pipeline with its default parameter values.
execution = pipeline.start(parameters=None)

In [None]:
# execution.wait(delay=300, max_attempts=25)

In [18]:
# Status and metadata of every step in this execution.
execution_steps = execution.list_steps()
execution_steps

[{'StepName': 'registro_modelo_CSC-RegisterModel',
  'StartTime': datetime.datetime(2024, 8, 26, 14, 33, 40, 502000, tzinfo=tzlocal()),
  'EndTime': datetime.datetime(2024, 8, 26, 14, 33, 41, 596000, tzinfo=tzlocal()),
  'StepStatus': 'Succeeded',
  'Metadata': {'RegisterModel': {'Arn': 'arn:aws:sagemaker:us-west-2:916135041985:model-package/CreditScoreModel/4'}},
  'AttemptCount': 1},
 {'StepName': 'condicion_de_precision_mayor_a_modelo_productivo',
  'StartTime': datetime.datetime(2024, 8, 26, 14, 33, 39, 531000, tzinfo=tzlocal()),
  'EndTime': datetime.datetime(2024, 8, 26, 14, 33, 40, 128000, tzinfo=tzlocal()),
  'StepStatus': 'Succeeded',
  'Metadata': {'Condition': {'Outcome': 'True'}},
  'AttemptCount': 1},
 {'StepName': 'evaluacion_rendimiento_modelo',
  'StartTime': datetime.datetime(2024, 8, 26, 14, 31, 5, 733000, tzinfo=tzlocal()),
  'EndTime': datetime.datetime(2024, 8, 26, 14, 33, 38, 512000, tzinfo=tzlocal()),
  'StepStatus': 'Succeeded',
  'Metadata': {'ProcessingJob': {

# II Deployment

In [19]:
import sys

# Make the helper modules in code/ importable. The guard keeps re-runs of this cell
# from appending duplicate entries to sys.path.
CODE_DIR = "code/"
if CODE_DIR not in sys.path:
    sys.path.append(CODE_DIR)

# model package lookup
from evaluation import get_approved_package
from sagemaker import ModelPackage
# prediction
from sagemaker.predictor import Predictor

## 10. Generar endpoint del último modelo registrado y aprobado

In [20]:
# Find the latest approved package in the model package group and fetch its full description.
sm_client = boto3.client("sagemaker")

latest_package = get_approved_package(model_package_group_name, sm_client)
latest_package_arn = latest_package["ModelPackageArn"]
model_description = sm_client.describe_model_package(ModelPackageName=latest_package_arn)

model_description

INFO:evaluation:Identified the latest approved model package: arn:aws:sagemaker:us-west-2:916135041985:model-package/CreditScoreModel/4


{'ModelPackageGroupName': 'CreditScoreModel',
 'ModelPackageVersion': 4,
 'ModelPackageArn': 'arn:aws:sagemaker:us-west-2:916135041985:model-package/CreditScoreModel/4',
 'CreationTime': datetime.datetime(2024, 8, 26, 14, 33, 41, 484000, tzinfo=tzlocal()),
 'InferenceSpecification': {'Containers': [{'Image': '246618743249.dkr.ecr.us-west-2.amazonaws.com/sagemaker-scikit-learn:1.2-1-cpu-py3',
    'ImageDigest': 'sha256:95fdd7f6c22784f7ccb3064fd5f1c4d8b5e05ec1292a51da72c181cc9db4b228',
    'ModelDataUrl': 's3://sagemaker-us-west-2-916135041985/output/pre_procesamiento-2024-08-26-14-19-36-669/output/scaler_model/model.tar.gz',
    'Environment': {'SAGEMAKER_CONTAINER_LOG_LEVEL': '20',
     'SAGEMAKER_PROGRAM': 'preprocessing.py',
     'SAGEMAKER_REGION': 'us-west-2',
     'SAGEMAKER_SUBMIT_DIRECTORY': 's3://sagemaker-us-west-2-916135041985/output/sagemaker-scikit-learn-2024-08-26-14-19-37-049/sourcedir.tar.gz'}},
   {'Image': '246618743249.dkr.ecr.us-west-2.amazonaws.com/sagemaker-scikit-

In [21]:
# Deploy the selected model package to a new real-time endpoint.
sess = boto3.Session()
sagemaker_session = sagemaker.Session(boto_session=sess)

model_package_arn = model_description["ModelPackageArn"]
model = ModelPackage(
    role=role, model_package_arn=model_package_arn, sagemaker_session=sagemaker_session
)

# UTC timestamp suffix keeps endpoint names unique across runs.
endpoint_name = "Modelo-Score-Classification-" + time.strftime("%Y-%m-%d-%H-%M-%S", time.gmtime())
print("Endpoint Name: {}".format(endpoint_name))
model.deploy(initial_instance_count=1, instance_type="ml.m5.xlarge", endpoint_name=endpoint_name)

INFO:sagemaker:Creating model with name: CreditScoreModel-2024-08-26-15-14-21-675


Endpoint Name: Modelo-Score-Classification-2024-08-26-15-14-21


INFO:sagemaker:Creating endpoint-config with name Modelo-Score-Classification-2024-08-26-15-14-21
INFO:sagemaker:Creating endpoint with name Modelo-Score-Classification-2024-08-26-15-14-21


------------------------------------------------*

In [None]:
# Prediction client bound to the deployed endpoint by name.
predictor = Predictor(endpoint_name)

In [None]:
path_output = os.path.join(os.path.dirname(os.getcwd()), "data", "output", "")  # ../data/output/ relative to the notebook

In [None]:
# Smoke test: send a small, reproducible sample of the local test split to the endpoint as CSV.
# NOTE(review): assumes test.csv holds exactly the columns the deployed model expects
# (e.g. no target column) — confirm against code/preprocessing.py.
data_test = pd.read_csv(os.path.join(path_output, "test.csv"))

muestra = 10      # number of rows to send
SAMPLE_SEED = 42  # fixed seed so every run sends the same rows
# Clamp so a test file with fewer than `muestra` rows does not make .sample() raise.
payload = data_test.sample(
    n=min(muestra, len(data_test)), random_state=SAMPLE_SEED
).to_csv(header=False, index=False)

y_pred = predictor.predict(payload, initial_args={"ContentType": "text/csv"})
print(y_pred.decode("utf-8"))

## 11. Eliminación de recursos
**Nota**: al terminar, elimine el endpoint (y, si ya no se necesitan, los paquetes de modelo y el pipeline) para evitar cargos.

In [None]:
# import boto3
# from botocore.exceptions import ClientError

# region="ap-southeast-2"
# account="numero"
# domain_ID="id-dominio"
# sm_client = boto3.client("sagemaker", region_name=region)
# domainArn = f"arn:aws:sagemaker:{region}:{account}:domain/{domain_ID}"
# domain_tags = sm_client.list_tags(ResourceArn=domainArn)["Tags"]
# domain_arn_tag = {'Key': 'sagemaker:domain-arn', 'Value': f'{domainArn}'}
# domain_tags.append(domain_arn_tag)

In [None]:
# sm_client = boto3.client("sagemaker")

# for d in sm_client.list_model_packages(ModelPackageGroupName=model_package_group_name)["ModelPackageSummaryList"]:
#     print(d["ModelPackageArn"])
#     sm_client.delete_model_package(ModelPackageName=d["ModelPackageArn"])

# sm_client.delete_model_package_group(ModelPackageGroupName=model_package_group_name)

In [None]:
# elimina el endpoint
# predictor.delete_endpoint()

In [None]:
# elimina el pipeline
# pipeline.delete()