# Modelo de Deserción

Este pipeline de nuestro proyecto se ve así:

![pipeline_completo](figuras/pipeline_completo.png)

## Dataset
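Los datos provienen de … (**TODO**: documentar la fuente original, la fecha de extracción y la versión del conjunto de datos). En S3 se encuentran en `s3://itam-analytics-israel/modelo_desercion/data/desercion.csv` (entrenamiento y validación) y `s3://itam-analytics-israel/modelo_desercion/data/desercion_batch.csv` (inferencia por lotes).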

In [1]:
# Parameters
# Presumably a papermill-style parameters cell: values here act as defaults that may be overridden.
# NOTE(review): kms_key is not referenced anywhere else in this notebook; wire it into the
# processors/estimator or remove it — confirm intent. Its region (us-west-2) also differs from the
# region shown in the upsert output (us-east-1).
# Placeholder follows the documented KMS key ARN shape: arn:aws:kms:<region>:<account>:key/<key-id>.
kms_key = "arn:aws:kms:us-west-2:000000000000:key/1234abcd-12ab-34cd-56ef-1234567890ab"

In [2]:
import sys

!{sys.executable} -m pip install "sagemaker>=2.99.0"

import boto3
import sagemaker
from sagemaker.workflow.pipeline_context import PipelineSession

sagemaker_session = sagemaker.session.Session()
region = sagemaker_session.boto_region_name
role = sagemaker.get_execution_role()
pipeline_session = PipelineSession()
default_bucket = "itam-analytics-israel"
default_bucket_prefix = "modelo_desercion"
default_bucket_prefix_path = ""

# If a default bucket prefix is specified, append it to the s3 path
if default_bucket_prefix:
    default_bucket_prefix_path = f"/{default_bucket_prefix}"

model_package_group_name = f"DesertionModelPackageGroupName"

sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /home/sagemaker-user/.config/sagemaker/config.yaml


In [3]:
# Default S3 locations of the raw datasets; both live in the project's "data/" folder.
data_root = f"s3://{default_bucket}{default_bucket_prefix_path}/data"
input_data_uri = f"{data_root}/desercion.csv"
batch_data_uri = f"{data_root}/desercion_batch.csv"
for uri in (input_data_uri, batch_data_uri):
    print(uri)

s3://itam-analytics-israel/modelo_desercion/data/desercion.csv
s3://itam-analytics-israel/modelo_desercion/data/desercion_batch.csv


## Definición de parámetros
![pipeline_parameters](figuras/pipeline_parameters.png)
Los parámetros definidos en este flujo de trabajo son:
* `processing_instance_count`: número de instancias que se usarán en el job de procesamiento.
* `instance_type`: tipo de instancia ml.* que se empleará en el job de entrenamiento.
* `model_approval_status`: estado con el que se registrará el modelo en el registro de modelos para fines de CI/CD (por defecto "PendingManualApproval").
* `input_data`: URI S3 donde se encuentran los datos de entrada para entrenamiento y validación.
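* `batch_data`: URI S3 donde se encuentran los datos para el Batch Transform (inferencias por lotes). **Nota:** el parámetro se declara en el pipeline, pero actualmente ningún paso lo consume; el Transform Step lee la salida `batch` del Processing Step.
* `mse_threshold`: umbral de error cuadrático medio (MSE) que se usa para validar la calidad del modelo. El modelo se registra, se crea y se usa en el Batch Transform solo si el MSE reportado por la evaluación es menor o igual a este umbral; en caso contrario, el pipeline termina en el Fail Step.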

In [4]:
from sagemaker.workflow.parameters import (
    ParameterFloat,
    ParameterInteger,
    ParameterString,
)

# Runtime parameters of the pipeline; each can be overridden when an execution is started.

# Infrastructure
processing_instance_count = ParameterInteger(name="ProcessingInstanceCount", default_value=1)
instance_type = ParameterString(name="TrainingInstanceType", default_value="ml.m5.xlarge")

# Model registry status assigned at registration time
model_approval_status = ParameterString(
    name="ModelApprovalStatus",
    default_value="PendingManualApproval",
)

# Data locations (defaults are the URIs built in the previous cell)
input_data = ParameterString(name="InputData", default_value=input_data_uri)
batch_data = ParameterString(name="BatchData", default_value=batch_data_uri)

# Quality gate: largest MSE the condition step accepts
mse_threshold = ParameterFloat(name="MseThreshold", default_value=6.0)

## Definición de Processing Step
![pipeline_processing](figuras/pipeline_processing.png)

In [5]:
from sagemaker.sklearn.processing import SKLearnProcessor

# scikit-learn container version used to run the preprocessing script.
framework_version = "0.23-1"

# Built on pipeline_session, so .run() in the next cell yields step arguments for the pipeline.
sklearn_processor = SKLearnProcessor(
    role=role,
    framework_version=framework_version,
    base_job_name="sklearn-desertion-process",
    instance_type="ml.m5.xlarge",
    instance_count=processing_instance_count,  # pipeline parameter, resolved per execution
    sagemaker_session=pipeline_session,
)

In [6]:
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep

# Every split the script writes under /opt/ml/processing/<name> is exposed as a named output.
processing_output_names = ("train", "validation", "test", "batch")

processor_args = sklearn_processor.run(
    inputs=[ProcessingInput(source=input_data, destination="/opt/ml/processing/input")],
    outputs=[
        ProcessingOutput(output_name=split, source=f"/opt/ml/processing/{split}")
        for split in processing_output_names
    ],
    code="code/preprocessing.py",
)

step_process = ProcessingStep(name="DesertionProcess", step_args=processor_args)



## Definición de Training Step
![pipeline_training](figuras/pipeline_training.png)

In [7]:
from sagemaker.estimator import Estimator
from sagemaker.inputs import TrainingInput

# Training artifacts are written under this S3 prefix.
model_path = f"s3://{default_bucket}{default_bucket_prefix_path}/DesertionTrain"

# Built-in XGBoost 1.0-1 container; the same image_uri is reused later for evaluation and the model.
image_uri = sagemaker.image_uris.retrieve(
    framework="xgboost",
    region=region,
    version="1.0-1",
    py_version="py3",
    instance_type="ml.m5.xlarge",
)
xgb_train = Estimator(
    image_uri=image_uri,
    instance_type=instance_type,  # TrainingInstanceType pipeline parameter
    instance_count=1,
    output_path=model_path,
    role=role,
    sagemaker_session=pipeline_session,
)
xgb_train.set_hyperparameters(
    # "reg:linear" is deprecated in XGBoost; "reg:squarederror" is its replacement with the same loss.
    # NOTE(review): if the desertion target is binary, "binary:logistic" may be more appropriate —
    # the evaluation step currently reads regression MSE, so confirm before switching.
    objective="reg:squarederror",
    num_round=50,
    max_depth=5,
    eta=0.2,
    gamma=4,
    min_child_weight=6,
    subsample=0.7,
)

# Channels are wired to the processing step's outputs; the S3 URIs resolve when the pipeline runs.
train_args = xgb_train.fit(
    inputs={
        "train": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri,
            content_type="text/csv",
        ),
        "validation": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs["validation"].S3Output.S3Uri,
            content_type="text/csv",
        ),
    }
)

In [8]:
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep

# Wrap the deferred fit() arguments from the previous cell in a pipeline step.
step_train = TrainingStep(step_args=train_args, name="DesertionTrain")

## Definición de Evaluation Step
![pipeline_evaluation](figuras/pipeline_evaluation.png)

In [9]:
from sagemaker.processing import ScriptProcessor

# Evaluation runs code/evaluation.py with python3 inside the XGBoost image used for training.
script_eval = ScriptProcessor(
    role=role,
    image_uri=image_uri,
    command=["python3"],
    base_job_name="script-desertion-eval",
    instance_type="ml.m5.xlarge",
    instance_count=1,
    sagemaker_session=pipeline_session,
)

# Inputs: the trained model artifact and the held-out "test" split of the processing step.
eval_inputs = [
    ProcessingInput(
        source=step_train.properties.ModelArtifacts.S3ModelArtifacts,
        destination="/opt/ml/processing/model",
    ),
    ProcessingInput(
        source=step_process.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri,
        destination="/opt/ml/processing/test",
    ),
]
eval_outputs = [
    ProcessingOutput(output_name="evaluation", source="/opt/ml/processing/evaluation"),
]

eval_args = script_eval.run(inputs=eval_inputs, outputs=eval_outputs, code="code/evaluation.py")

In [10]:
from sagemaker.workflow.properties import PropertyFile

# Register evaluation.json (found in the "evaluation" output) as a property file so that later
# steps can read metrics from it with JsonGet.
evaluation_report = PropertyFile(
    path="evaluation.json",
    output_name="evaluation",
    name="EvaluationReport",
)

step_eval = ProcessingStep(
    name="DesertionEval",
    property_files=[evaluation_report],
    step_args=eval_args,
)

## Definición de Create Model Step
![pipeline_create](figuras/pipeline_create.png)

In [11]:
from sagemaker.model import Model

# Model definition shared by the create-model and register steps: the XGBoost image plus the
# artifact produced by DesertionTrain (a property reference resolved when the pipeline runs).
trained_artifact = step_train.properties.ModelArtifacts.S3ModelArtifacts
model = Model(
    role=role,
    image_uri=image_uri,
    model_data=trained_artifact,
    sagemaker_session=pipeline_session,
)

In [12]:
from sagemaker.inputs import CreateModelInput
from sagemaker.workflow.model_step import ModelStep

# Creates a SageMaker Model resource from `model`; because `model` uses pipeline_session,
# model.create() returns step arguments instead of calling the API immediately.
# accelerator_type="ml.eia1.medium" was removed: it asked for an Amazon Elastic Inference
# accelerator, a service AWS has discontinued, and it is not needed to create a model whose
# image_uri is given explicitly.
step_create_model = ModelStep(
    name="DesertionCreateModel",
    step_args=model.create(instance_type="ml.m5.large"),
)

## Definición de Transform Step
![pipeline_transform](figuras/pipeline_transform.png)

In [13]:
from sagemaker.transformer import Transformer

# Batch predictions for the model created by DesertionCreateModel are written here.
transform_output_path = f"s3://{default_bucket}{default_bucket_prefix_path}/DesertionTransform"

# SingleRecord: each request to the model carries a single record.
transformer = Transformer(
    model_name=step_create_model.properties.ModelName,
    output_path=transform_output_path,
    strategy="SingleRecord",
    instance_count=1,
    instance_type="ml.m5.xlarge",
)


In [14]:
from sagemaker.inputs import TransformInput
from sagemaker.workflow.steps import TransformStep

# Score the processing step's "batch" output as CSV, one line per record.
# NOTE(review): the BatchData pipeline parameter is not consumed here — confirm whether the
# transform should read `batch_data` instead of the processed "batch" split.
batch_transform_input = TransformInput(
    data=step_process.properties.ProcessingOutputConfig.Outputs["batch"].S3Output.S3Uri,
    content_type="text/csv",
    split_type="Line",
)

# NOTE: transformer=/inputs= is the older TransformStep signature; newer SDKs prefer step_args.
step_transform = TransformStep(
    name="DesertionTransform",
    transformer=transformer,
    inputs=batch_transform_input,
)

## Definición de Register Step
![pipeline_register](figuras/pipeline_register.png)

In [15]:
from sagemaker.model_metrics import MetricsSource, ModelMetrics
from sagemaker.workflow.functions import Join

# Point the registered model's statistics at the evaluation report written by *this* execution.
# The URI is built from DesertionEval's runtime property (resolved when the pipeline runs) rather
# than from step_eval.arguments[...], which is rendered in the notebook at definition time and
# need not match the execution-specific S3 prefix the evaluation job actually writes to.
evaluation_s3_uri = Join(
    on="/",
    values=[
        step_eval.properties.ProcessingOutputConfig.Outputs["evaluation"].S3Output.S3Uri,
        "evaluation.json",
    ],
)

model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri=evaluation_s3_uri,
        content_type="application/json",
    )
)

# With pipeline_session, register() returns step arguments that the ModelStep turns into a
# model-package version in model_package_group_name.
register_args = model.register(
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=["ml.t2.medium", "ml.m5.xlarge"],
    transform_instances=["ml.m5.xlarge"],
    model_package_group_name=model_package_group_name,
    approval_status=model_approval_status,  # ModelApprovalStatus pipeline parameter
    model_metrics=model_metrics,
)
step_register = ModelStep(name="DesertionRegisterModel", step_args=register_args)



## Definición de Fail Step
![pipeline_fail](figuras/pipeline_fail.png)

In [16]:
from sagemaker.workflow.fail_step import FailStep
from sagemaker.workflow.functions import Join

# Terminal step taken when the MSE gate is not met; the message embeds the threshold in effect.
mse_fail_message = Join(on=" ", values=["Execution failed due to MSE >", mse_threshold])
step_fail = FailStep(name="DesertionMSEFail", error_message=mse_fail_message)

## Definición de Condition Step
![pipeline_condition](figuras/pipeline_condition.png)

In [17]:
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.conditions import ConditionLessThanOrEqualTo
from sagemaker.workflow.functions import JsonGet

# MSE read from DesertionEval's evaluation.json at key path regression_metrics.mse.value.
eval_mse = JsonGet(
    step_name=step_eval.name,
    property_file=evaluation_report,
    json_path="regression_metrics.mse.value",
)

# Quality gate: MSE <= MseThreshold.
cond_lte = ConditionLessThanOrEqualTo(left=eval_mse, right=mse_threshold)

# Pass -> register, create and batch-score the model; fail -> stop via the Fail step.
step_cond = ConditionStep(
    name="DesertionMSECond",
    conditions=[cond_lte],
    if_steps=[step_register, step_create_model, step_transform],
    else_steps=[step_fail],
)

## Definición del Pipeline
![pipeline](figuras/pipeline.png)

In [18]:
from sagemaker.workflow.pipeline import Pipeline

# Plain string literal: the name has nothing to interpolate.
pipeline_name = "DesertionPipeline2025"
pipeline = Pipeline(
    name=pipeline_name,
    # Parameters must be declared here so they can be overridden when an execution starts.
    parameters=[
        processing_instance_count,
        instance_type,
        model_approval_status,
        input_data,
        batch_data,
        mse_threshold,
    ],
    # Top-level steps only; register/create/transform/fail are reached through step_cond.
    steps=[step_process, step_train, step_eval, step_cond],
)

## Visualización y ejecución del pipeline

In [19]:
import json

# pipeline.definition() returns the definition as a JSON string; parse it for readable display.
pipeline_definition_json = pipeline.definition()
definition = json.loads(pipeline_definition_json)
definition



{'Version': '2020-12-01',
 'Metadata': {},
 'Parameters': [{'Name': 'ProcessingInstanceCount',
   'Type': 'Integer',
   'DefaultValue': 1},
  {'Name': 'TrainingInstanceType',
   'Type': 'String',
   'DefaultValue': 'ml.m5.xlarge'},
  {'Name': 'ModelApprovalStatus',
   'Type': 'String',
   'DefaultValue': 'PendingManualApproval'},
  {'Name': 'InputData',
   'Type': 'String',
   'DefaultValue': 's3://itam-analytics-israel/modelo_desercion/data/desercion.csv'},
  {'Name': 'BatchData',
   'Type': 'String',
   'DefaultValue': 's3://itam-analytics-israel/modelo_desercion/data/desercion_batch.csv'},
  {'Name': 'MseThreshold', 'Type': 'Float', 'DefaultValue': 6.0}],
 'PipelineExperimentConfig': {'ExperimentName': {'Get': 'Execution.PipelineName'},
  'TrialName': {'Get': 'Execution.PipelineExecutionId'}},
 'Steps': [{'Name': 'DesertionProcess',
   'Type': 'Processing',
   'Arguments': {'ProcessingResources': {'ClusterConfig': {'InstanceType': 'ml.m5.xlarge',
      'InstanceCount': {'Get': 'Para

In [20]:
# Create the pipeline in SageMaker, or update it if one with this name already exists.
upsert_response = pipeline.upsert(role_arn=role)
upsert_response



{'PipelineArn': 'arn:aws:sagemaker:us-east-1:354918387942:pipeline/DesertionPipeline2025',
 'ResponseMetadata': {'RequestId': 'ab7ba02d-a99b-418c-b388-7a399119e910',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': 'ab7ba02d-a99b-418c-b388-7a399119e910',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '89',
   'date': 'Tue, 27 May 2025 04:23:51 GMT'},
  'RetryAttempts': 0}}

In [21]:
# Launch one execution; parameters=None keeps every pipeline parameter at its default value.
execution = pipeline.start(parameters=None)

## Examinando la evaluación

from pprint import pprint

from botocore.exceptions import WaiterError

# evaluation.json only exists after DesertionEval has finished, so block until the execution ends.
try:
    execution.wait()
except WaiterError:
    # Raised when the execution does not succeed (e.g. it ends in DesertionMSEFail) or the waiter
    # times out; the evaluation report may still exist, so show the status and keep going.
    print("Pipeline execution status:", execution.describe()["PipelineExecutionStatus"])

# Resolve the S3 prefix that the executed DesertionEval job actually wrote to, instead of the URI
# rendered from step_eval.arguments at definition time.
eval_step = next(
    (step for step in execution.list_steps() if step["StepName"] == step_eval.name),
    None,
)
if eval_step is None or "ProcessingJob" not in eval_step.get("Metadata", {}):
    raise RuntimeError(f"Step {step_eval.name!r} has no processing job in this execution.")

eval_job_name = eval_step["Metadata"]["ProcessingJob"]["Arn"].split("/")[-1]
eval_job = sagemaker_session.sagemaker_client.describe_processing_job(
    ProcessingJobName=eval_job_name
)
eval_output_uri = next(
    output["S3Output"]["S3Uri"]
    for output in eval_job["ProcessingOutputConfig"]["Outputs"]
    if output["OutputName"] == "evaluation"
)

evaluation_json = sagemaker.s3.S3Downloader.read_file(
    f"{eval_output_uri}/evaluation.json", sagemaker_session=sagemaker_session
)
pprint(json.loads(evaluation_json))

## Haciendo uso del modelo

from sagemaker.model import Model

# 1) Locate the artifact trained by this execution's DesertionTrain step.
#    TODO: to serve an approved Model Registry version instead, use that package's model data URL.
train_step = next(
    (step for step in execution.list_steps() if step["StepName"] == step_train.name),
    None,
)
if train_step is None or "TrainingJob" not in train_step.get("Metadata", {}):
    raise RuntimeError(f"Step {step_train.name!r} has no training job yet; run the pipeline first.")

training_job = sagemaker_session.sagemaker_client.describe_training_job(
    TrainingJobName=train_step["Metadata"]["TrainingJob"]["Arn"].split("/")[-1]
)
if training_job["TrainingJobStatus"] != "Completed":
    raise RuntimeError(f"Training job status is {training_job['TrainingJobStatus']!r}; wait for it to complete.")
trained_model_data = training_job["ModelArtifacts"]["S3ModelArtifacts"]

# 2) Same XGBoost image version as training (image_uris.retrieve needs an explicit xgboost version).
#    Distinct names keep the pipeline cells' `image_uri`, `model` and `transformer` intact.
inference_image_uri = sagemaker.image_uris.retrieve(
    framework="xgboost",
    region=region,
    version="1.0-1",
    py_version="py3",
    instance_type="ml.m5.xlarge",
)
inference_model = Model(
    image_uri=inference_image_uri,
    model_data=trained_model_data,
    role=role,
    sagemaker_session=sagemaker_session,
)

# 3) Batch transformer writing predictions under the project prefix.
batch_transformer = inference_model.transformer(
    instance_count=1,
    instance_type="ml.m5.xlarge",
    output_path=f"s3://{default_bucket}{default_bucket_prefix_path}/DesertionInference",
)

# 4) Score new data, split line by line like the pipeline's DesertionTransform step.
# NOTE(review): assumes new_data_uri is already in the model's input format (same features and
# column order as the processed "batch" split, no header or target column) — confirm.
new_data_uri = batch_data_uri
batch_transformer.transform(
    data=new_data_uri,
    content_type="text/csv",
    split_type="Line",
)
batch_transformer.wait()

print("Predicciones disponibles en:", batch_transformer.output_path)
