## Importación de librerías

In [5]:
!pip install -U sagemaker -q
!pip install fsspec -q
!pip install s3fs -q
!pip install awswrangler -q

[31mERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
mlflow 2.20.4 requires pyarrow<20,>=4.0.0, but you have pyarrow 20.0.0 which is incompatible.[0m[31m
[0m

In [1]:
import sys

import boto3
import pandas as pd
import numpy as np
import json

import awswrangler as wr

import sagemaker
from sagemaker import get_execution_role, Session

from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.workflow.parameters import (ParameterInteger,ParameterString,ParameterFloat)
from sagemaker.workflow.steps import ProcessingStep
from sagemaker.workflow.steps import TrainingStep
from sagemaker.workflow.model_step import ModelStep
from sagemaker.workflow.check_job_config import CheckJobConfig
from sagemaker.workflow.quality_check_step import (DataQualityCheckConfig,QualityCheckStep,)
from sagemaker.workflow.execution_variables import ExecutionVariables
from sagemaker.workflow.functions import Join
from sagemaker.workflow.fail_step import FailStep
from sagemaker.workflow.conditions import ConditionLessThanOrEqualTo
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.functions import JsonGet
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.properties import PropertyFile

from sagemaker.model import Model
from sagemaker.model_metrics import ModelMetrics, MetricsSource
from sagemaker.model_monitor import DatasetFormat, model_monitoring

from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.processing import ScriptProcessor

from sagemaker.inputs import TrainingInput
from sagemaker.inputs import CreateModelInput

from sagemaker.estimator import Estimator

from sagemaker.sklearn.processing import SKLearnProcessor

from sagemaker.drift_check_baselines import DriftCheckBaselines

from sagemaker.image_uris import retrieve

#from sagemaker.network import NetworkConfig





sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /home/sagemaker-user/.config/sagemaker/config.yaml


# Parametros y variables pipeline

In [65]:
proyecto="demo"
pipeline_name = f"{proyecto}-train-pipeline"
model_package_group_name = f"model-{proyecto}"

artifact_bucket = "bucket-s3"
model = f"proyecto/caso-uso/{proyecto}"

train = "training"
base_uri = f"s3://{artifact_bucket}/{model}"
input_data_train = f"{base_uri}/{train}"
model_path = f"{base_uri}/{train}"

monitoring = "monitoring"
folder_json = "folder-json"

sagemaker_session = sagemaker.session.Session()
region = sagemaker_session.boto_region_name
role = sagemaker.get_execution_role()
pipeline_session = PipelineSession()

script_s3_uri_preprocessing = sagemaker_session.upload_data("code/train/preprocessing.py", bucket=artifact_bucket, key_prefix=f"{model}/code/train")
script_s3_uri_evaluation = sagemaker_session.upload_data("code/train/evaluation.py", bucket=artifact_bucket, key_prefix=f"{model}/code/train")

# Generar dataset dummy 

In [66]:
# Semilla para reproducibilidad
np.random.seed(42)

# Paso 1: Generar las 10 primeras filas
def generar_datos_base():
    df = pd.DataFrame({
        'target': np.random.choice([0, 1], 10),
        'feature1': np.random.randint(0, 100, 10),
        'feature2': np.random.rand(10),
        'feature3': np.random.randint(0, 100, 10),

    })
    return df

# Paso 2: Concatenar los datos base n veces
def expandir_dataset(n):
    base = generar_datos_base()
    df_expandido = pd.concat([base] * n, ignore_index=True)
    return df_expandido

# Ejemplo de uso
df_final = expandir_dataset(n=500)
print(df_final.head())
df_final.info()

wr.s3.to_csv(df=df_final,path=f"{input_data_train}/data-original.csv",index=False)


   target  feature1  feature2  feature3
0       0        82  0.969910        41
1       1        86  0.832443        91
2       0        74  0.212339        59
3       0        74  0.181825        79
4       0        87  0.183405        14
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 5000 entries, 0 to 4999
Data columns (total 4 columns):
 #   Column    Non-Null Count  Dtype  
---  ------    --------------  -----  
 0   target    5000 non-null   int64  
 1   feature1  5000 non-null   int64  
 2   feature2  5000 non-null   float64
 3   feature3  5000 non-null   int64  
dtypes: float64(1), int64(3)
memory usage: 156.4 KB


{'paths': ['s3://anta-acoe-san-084828584964-analytics-artifact/proyecto/caso-uso/demo/training/data-original.csv'],
 'partitions_values': {}}

## Definir Parámetros para Parametrizar la Ejecución del Pipeline

Defina los parámetros del pipeline que puede utilizar para parametrizar el pipeline. Los parámetros permiten realizar ejecuciones y programaciones personalizadas sin tener que modificar la definición del pipeline.

Tipos de parametros soportados:

* `ParameterString` - representa un tipo `str` de Python
* `ParameterInteger` - representa un tipo `int` de Python
* `ParameterFloat` - representa un tipo `float` de Python.

Parámetros definidos en este flujo de trabajo:

* `processing_instance_count` - Numero de instancias del trabajo de procesamiento.
* `instance_type` - El tipo de instancia `ml.*` del trabajo de entrenamiento.
* `model_approval_status` - El estado de aprobación a registrar con el modelo entrenado a efectos de CI/CD ("PendingManualApproval" es el predeterminado).
* `input_data` - La ubicación URI del bucket S3 de los datos de entrada.
* `mse_threshold` - El umbral de error cuadrático medio (MSE) utilizado para verificar la precisión de un modelo.

In [67]:
processing_instance_count = ParameterInteger(name="ProcessingInstanceCount", default_value=1)
instance_type = ParameterString(name="TrainingInstanceType", default_value="ml.m5.xlarge")
model_approval_status = ParameterString(name="ModelApprovalStatus", default_value="PendingManualApproval")
input_data = ParameterString(name="InputData", default_value=input_data_train)                     
mse_threshold = ParameterFloat(name="MseThreshold", default_value=9000000000000000000000.0)

## Paso preprocesamiento

In [68]:
framework_version = "1.0-1"
base_job_name_processing="processing-workload"

In [69]:
sklearn_processor = SKLearnProcessor(
    framework_version=framework_version,
    instance_type="ml.m5.xlarge",
    instance_count=processing_instance_count,
    base_job_name=base_job_name_processing,
    role=role,
    sagemaker_session=pipeline_session,
    #network_config=network_config,
)

Por último, tomamos la salida del método `run` del procesador y la pasamos como argumentos al `ProcessingStep`. Al pasar la `pipeline_session` a la `sagemaker_session`, la llamada a `.run()` no lanza el trabajo de procesamiento, sino que devuelve los argumentos necesarios para ejecutar el trabajo como un paso en el pipeline.

Tenga en cuenta los canales con nombre "train_data" y "test_data" especificados en la configuración de salida para el trabajo de procesamiento. Las `Propiedades` de los pasos se pueden utilizar en pasos posteriores y se resuelven a sus valores de tiempo de ejecución en la ejecución. En concreto, este uso se indica cuando se define el paso de entrenamiento.

In [70]:
processor_args = sklearn_processor.run(
    inputs=[
        ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),
    ],
    outputs=[
        ProcessingOutput(output_name="train", source="/opt/ml/processing/input/train", destination=f"{base_uri}/processing/output/train"),
        ProcessingOutput(output_name="validation", source="/opt/ml/processing/input/validation", destination=f"{base_uri}/processing/output/validation"),
        ProcessingOutput(output_name="test", source="/opt/ml/processing/input/test", destination=f"{base_uri}/processing/output/test")
    ],
    code=script_s3_uri_preprocessing,
)

step_process = ProcessingStep(name="PreprocessStep", step_args=processor_args)



## Definición Paso de Entrenamiento

Se utiliza el algoritmo de XGBoost de Amazon SageMaker para entrenar en este conjunto de datos. Un script de entrenamiento típico carga los datos de los canales de entrada, configura el entrenamiento con hiperparámetros, entrena un modelo, y guarda un modelo en `model_dir` para que pueda ser alojado más tarde.

También se especifica la ruta del modelo donde se guardan los modelos del entrenamiento.

In [71]:
image_uri = retrieve("xgboost", region, version="1.2-1")

xgb_train = Estimator(
    image_uri=image_uri,
    instance_type=instance_type,
    instance_count=1,
    disable_profiler=True,
    output_path=model_path,
    role=role,
    sagemaker_session=pipeline_session,
)
xgb_train.set_hyperparameters(
    objective="reg:linear",
    num_round=50,
    max_depth=5,
    eta=0.2,
    gamma=4,
    min_child_weight=6,
    subsample=0.7,
)

train_args = xgb_train.fit(
    inputs={
        "train": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri,
            content_type="text/csv",
        ),
        "validation": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs["validation"].S3Output.S3Uri,
            content_type="text/csv",
        ),
    }
)

Por último, utilizamos la salida del método `.fit()` del estimador como argumentos para el `TrainingStep`. Al pasar la `pipeline_session` a la `sagemaker_session`, la llamada a `.fit()` no lanza el trabajo de entrenamiento, sino que devuelve los argumentos necesarios para ejecutar el trabajo como un paso en el pipeline.

Pasa el `S3Uri` del canal de salida `"train_data"` al método `.fit()`. Además, utilice el otro canal de salida "test_data" para la evaluación del modelo en el proceso. 

In [72]:
step_train = TrainingStep(name="TrainStep", step_args=train_args,
)

## Definir un paso de evaluación del modelo

Se desarrolla un script de evaluación que se especifica en un paso de Procesamiento que realiza la evaluación del modelo.

El codigo se encuentra en el archivo `evaluation.json` en la carpeta "code"

El script de evaluación utiliza `xgboost` para hacer lo siguiente:

* Cargar el modelo.
* Leer los datos de prueba.
* Emitir predicciones contra los datos de prueba.
* Construir un informe de clasificación, incluyendo la precisión y la curva ROC.
* Guardar el informe de evaluación en el directorio de evaluación.

A continuación, cree una instancia de un procesador `ScriptProcessor` y utilícelo en el `ProcessingStep`.

In [73]:
base_job_name_eval="eval-workload"

script_eval = ScriptProcessor(
    image_uri=image_uri,
    command=["python3"],
    instance_type="ml.m5.xlarge",
    instance_count=1,
    base_job_name=base_job_name_eval,
    role=role,
    sagemaker_session=pipeline_session,
)

eval_args = script_eval.run(
    inputs=[
        ProcessingInput(
            source=step_train.properties.ModelArtifacts.S3ModelArtifacts,
            destination="/opt/ml/processing/model",
        ),
        ProcessingInput(
            source=step_process.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri,
            destination="/opt/ml/processing/test",
        ),
    ],
    outputs=[
        ProcessingOutput(output_name="evaluation", source="/opt/ml/processing/evaluation", destination=f"{base_uri}/evaluation"), 
    ],
    code=script_s3_uri_evaluation,
)

Utiliza los argumentos del procesador devueltos por `.run()` para construir un `ProcessingStep`, junto con los canales de entrada y salida y el código que se ejecutará cuando el pipeline invoque la ejecución del pipeline.

In [74]:
evaluation_report = PropertyFile(name="EvaluationReport", output_name="evaluation", path="evaluation.json")
step_eval = ProcessingStep(
    name="EvalStep",
    step_args=eval_args,
    property_files=[evaluation_report],
)

## Data Drift

In [75]:
check_job_config = CheckJobConfig(
    role=role,
    instance_count=1,
    instance_type="ml.c5.xlarge",
    volume_size_in_gb=120,
    sagemaker_session=sagemaker_session,
)
#configurar data entrenamiento para monitorear

data_quality_check_config = DataQualityCheckConfig(
    baseline_dataset=step_process.properties.ProcessingOutputConfig.Outputs['train'].S3Output.S3Uri,
    dataset_format=DatasetFormat.csv(header=False, output_columns_position="START"),
    output_s3_uri=Join(on='/', values=[base_uri, monitoring, folder_json, 'dataqualitycheckstep'])
)

In [76]:
# Pipeline Step - 'QualityCheckStep'
baseline_model_data_step = QualityCheckStep(
        name="DataQualityCheckStep",
        # skip_check, indicates a baselining job
        skip_check=True,
        register_new_baseline=True,
        quality_check_config=data_quality_check_config,
        check_job_config=check_job_config,
        model_package_group_name=model_package_group_name
    )

## Definir un Paso para Crear un Modelo

Para realizar la transformación por lotes utilizando el modelo de ejemplo, cree un modelo SageMaker.

In [77]:
model = Model(
    image_uri=image_uri,
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    sagemaker_session=pipeline_session,
    role=role,
    #vpc_config = VpcConfig,
)

Define el `ModelStep` proporcionando los valores de retorno de `model.create()` como argumentos del paso.

In [78]:
step_create_model = ModelStep(
    name="CreateModelStep",
    step_args=model.create(instance_type="ml.m5.large", accelerator_type="ml.eia1.medium"),
)

## Definir un Paso de Registro de Modelo

Un paquete de modelo es una abstracción de artefactos de modelo reutilizables que empaqueta todos los ingredientes requeridos para la inferencia. Principalmente, consiste en una especificación de inferencia que define la imagen de inferencia a utilizar junto con una localización opcional de pesos del modelo.

Un grupo de paquetes de modelos es una colección de paquetes de modelos. Se puede crear un grupo de paquetes de modelos para un problema de negocio de ML específico, y se le pueden añadir nuevas versiones de los paquetes de modelos. Normalmente, se espera que los clientes creen un ModelPackageGroup para un pipeline de SageMaker, de forma que las versiones de los paquetes de modelos se puedan añadir al grupo en cada ejecución de SageMaker Pipeline.

Para registrar un modelo en el Registro de Modelos, tomamos el modelo creado en los pasos anteriores
```
model = Modelo(
    image_uri=image_uri,
    model_data=paso_entrenamiento.properties.ModelArtifacts.S3ModelArtifacts,
    sagemaker_session=sesión_de_pipeline,
    role=role,
)
```
y llamamos a la función `.register()` mientras le pasamos todos los parámetros necesarios para registrar el modelo.

Tomamos los resultados de la llamada `.register()` y los pasamos al `ModelStep` como argumentos del paso.

In [79]:
model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri="{}/evaluation.json".format(
            step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
        ),
        content_type="application/json",
    )
)

In [80]:
register_args = model.register(
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=["ml.t2.medium", "ml.m5.xlarge"],
    transform_instances=["ml.m5.xlarge"],
    model_package_group_name=model_package_group_name,
    approval_status=model_approval_status,
    model_metrics=model_metrics,
)
step_register = ModelStep(name="RegisterModel", step_args=register_args)

## Definir un Paso de Fallo para Terminar la Ejecución del pipeline y Marcarla como Fallida

Esta sección le guiará a través de los siguientes pasos:

* Definir un `FailStep` con un mensaje de error personalizado, que indique la causa del fallo de ejecución.
* Introducir el mensaje de error `FailStep` con una función `Join`, que añade una cadena de texto estática con el parámetro dinámico `mse_threshold` para construir un mensaje de error más informativo.

In [81]:
step_fail = FailStep(name="MSEFailStep", error_message=Join(on=" ", values=["Execution failed due to MSE >", mse_threshold]),
)

## Definir un Paso Condicional para Comprobar la Precisión y Crear Condicionalmente un Modelo y Ejecutar una Transformación por Lotes y Registrar un Modelo en el Registro de Modelos, O Terminar la Ejecución en Estado Fallido

En este paso, el modelo se registra sólo si la precisión del modelo, determinada por el paso de evaluación `step_eval`, supera un valor especificado. En caso contrario, la ejecución del pipeline falla y finaliza. Un `ConditionStep` permite a los pipelines soportar una ejecución condicional en el DAG del pipeline basada en las condiciones de las propiedades del paso.

En la siguiente sección, usted:

* Definir una `ConditionLessThanOrEqualTo` sobre el valor de precisión encontrado en la salida del paso de evaluación, `step_eval`.
* Usar la condición en la lista de condiciones en un `ConditionStep`.
* Pasar los pasos `CreateModelStep` y `TransformStep`, y la colección de pasos `RegisterModel` a los `if_steps` del `ConditionStep`, que sólo se ejecutan si la condición se evalúa como `True`.
* Pasa el paso `FailStep` al `else_steps` del `ConditionStep`, que sólo se ejecuta si la condición es `False`.

In [82]:
cond_lte = ConditionLessThanOrEqualTo(
    left=JsonGet(
        step_name=step_eval.name,
        property_file=evaluation_report,
        json_path="regression_metrics.mse.value",
    ),
    right=mse_threshold,
)

step_cond = ConditionStep(
    name="MSECond",
    conditions=[cond_lte],
    if_steps=[baseline_model_data_step, step_register, step_create_model],# step_transform],
    else_steps=[step_fail],
)

## Definir un pipeline de Parámetros, Pasos y Condiciones

En esta sección, combinar los pasos en un pipeline para que pueda ser ejecutado.

Un pipeline requiere un `nombre`, `parámetros` y `pasos`. Los nombres deben ser únicos dentro de un par `(cuenta, región)`.

Nota:

* Todos los parámetros utilizados en las definiciones deben estar presentes.
* Los pasos introducidos en el proceso no tienen que aparecer en el orden de ejecución. El servicio SageMaker Pipeline resuelve el DAG de dependencia de datos como pasos para que la ejecución se complete.
* Los pasos deben ser únicos en toda la lista de pasos de la canalización y en todas las listas if/else de pasos de condición.

In [83]:
pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        processing_instance_count,
        instance_type,
        model_approval_status,
        input_data,
        mse_threshold,
        #subnet_a_param,
        #subnet_b_param,
        #group_param
    ],
    steps=[step_process, step_train, step_eval, step_cond],
)

### (Opcional) Examinar la definición del pipeline

El JSON de la definición del pipeline puede examinarse para confirmar que el pipeline está bien definido y que los parámetros y las propiedades de los pasos se resuelven correctamente.

In [85]:
definition = json.loads(pipeline.definition())
definition

{'Version': '2020-12-01',
 'Metadata': {},
 'Parameters': [{'Name': 'ProcessingInstanceCount',
   'Type': 'Integer',
   'DefaultValue': 1},
  {'Name': 'TrainingInstanceType',
   'Type': 'String',
   'DefaultValue': 'ml.m5.xlarge'},
  {'Name': 'ModelApprovalStatus',
   'Type': 'String',
   'DefaultValue': 'PendingManualApproval'},
  {'Name': 'InputData',
   'Type': 'String',
   'DefaultValue': 's3://anta-acoe-san-084828584964-analytics-artifact/proyecto/caso-uso/demo/training'},
  {'Name': 'MseThreshold', 'Type': 'Float', 'DefaultValue': 9e+21}],
 'PipelineExperimentConfig': {'ExperimentName': {'Get': 'Execution.PipelineName'},
  'TrialName': {'Get': 'Execution.PipelineExecutionId'}},
 'Steps': [{'Name': 'PreprocessStep',
   'Type': 'Processing',
   'Arguments': {'ProcessingResources': {'ClusterConfig': {'InstanceType': 'ml.m5.xlarge',
      'InstanceCount': {'Get': 'Parameters.ProcessingInstanceCount'},
      'VolumeSizeInGB': 30}},
    'AppSpecification': {'ImageUri': '683313688378.dk

## Enviar el pipeline a SageMaker e iniciar la ejecución

Envíe la definición del pipeline al servicio Pipeline. El servicio Pipeline utiliza el rol pasado para crear todos los trabajos definidos en los pasos.

In [86]:
pipeline.upsert(role_arn=role)

{'PipelineArn': 'arn:aws:sagemaker:us-east-1:084828584964:pipeline/demo-train-pipeline',
 'ResponseMetadata': {'RequestId': 'f8aceaba-dfdc-4fcf-86b7-c116d1e95e4c',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': 'f8aceaba-dfdc-4fcf-86b7-c116d1e95e4c',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '87',
   'date': 'Mon, 02 Jun 2025 20:08:46 GMT'},
  'RetryAttempts': 0}}

Inicie el pipeline y acepte todos los parámetros por defecto.

In [87]:
#execution = pipeline.start()
execution = pipeline.start(execution_display_name="execution-name")