Instalar dependencias

In [None]:
!pip install -U sagemaker

In [31]:
import os
import time
import boto3
import numpy as np
import pandas as pd
import sagemaker
from sagemaker import get_execution_role
from sagemaker.workflow.pipeline_context import PipelineSession

In [2]:
sess = boto3.Session()
sm = sess.client("sagemaker")
role = get_execution_role()
sagemaker_session = sagemaker.Session(boto_session=sess)
bucket = sagemaker_session.default_bucket()
default_bucket_prefix = sagemaker_session.default_bucket_prefix
region = boto3.Session().region_name


In [3]:
bucket, default_bucket_prefix

('sagemaker-us-east-2-244680720635', None)

In [4]:

pipeline_session = PipelineSession() # creo la sesión del pipeline

model_package_group_name = "PipelineModelPackageGroup" # asignamos un nombre al model package 
prefix = "pipeline-model-jei"

# If a default bucket prefix is specified, append it to the s3 path
if default_bucket_prefix:
    prefix = f"{default_bucket_prefix}/{prefix}"

pipeline_name = "jei-pipeline"  # SageMaker Pipeline name

Adquisición de Datos:

In [None]:
data_dir = os.path.join(os.getcwd(), "data")
os.makedirs(data_dir, exist_ok=True)

raw_dir = os.path.join(os.getcwd(), "data/raw")
os.makedirs(raw_dir, exist_ok=True)

In [None]:
!wget -P data/raw https://archive.ics.uci.edu/static/public/597/productivity+prediction+of+garment+employees.zip --no-check-certificate

In [None]:
!unzip data/raw/productivity+prediction+of+garment+employees.zip -d data/raw

In [5]:
df = pd.read_csv("data/raw/garments_worker_productivity.csv",sep=",") # leemos los datos
pd.set_option('display.max_columns', 500)     # Make sure we can see all of the columns
pd.set_option('display.max_rows', 20)         # Keep the output on one page


In [6]:
df

Unnamed: 0,date,quarter,department,day,team,targeted_productivity,smv,wip,over_time,incentive,idle_time,idle_men,no_of_style_change,no_of_workers,actual_productivity
0,1/1/2015,Quarter1,sweing,Thursday,8,0.80,26.16,1108.0,7080,98,0.0,0,0,59.0,0.940725
1,1/1/2015,Quarter1,finishing,Thursday,1,0.75,3.94,,960,0,0.0,0,0,8.0,0.886500
2,1/1/2015,Quarter1,sweing,Thursday,11,0.80,11.41,968.0,3660,50,0.0,0,0,30.5,0.800570
3,1/1/2015,Quarter1,sweing,Thursday,12,0.80,11.41,968.0,3660,50,0.0,0,0,30.5,0.800570
4,1/1/2015,Quarter1,sweing,Thursday,6,0.80,25.90,1170.0,1920,50,0.0,0,0,56.0,0.800382
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
1192,3/11/2015,Quarter2,finishing,Wednesday,10,0.75,2.90,,960,0,0.0,0,0,8.0,0.628333
1193,3/11/2015,Quarter2,finishing,Wednesday,8,0.70,3.90,,960,0,0.0,0,0,8.0,0.625625
1194,3/11/2015,Quarter2,finishing,Wednesday,7,0.65,3.90,,960,0,0.0,0,0,8.0,0.625625
1195,3/11/2015,Quarter2,finishing,Wednesday,9,0.75,2.90,,1800,0,0.0,0,0,15.0,0.505889


In [7]:
df.describe

<bound method NDFrame.describe of            date   quarter  department        day  team  targeted_productivity  \
0      1/1/2015  Quarter1      sweing   Thursday     8                   0.80   
1      1/1/2015  Quarter1  finishing    Thursday     1                   0.75   
2      1/1/2015  Quarter1      sweing   Thursday    11                   0.80   
3      1/1/2015  Quarter1      sweing   Thursday    12                   0.80   
4      1/1/2015  Quarter1      sweing   Thursday     6                   0.80   
...         ...       ...         ...        ...   ...                    ...   
1192  3/11/2015  Quarter2   finishing  Wednesday    10                   0.75   
1193  3/11/2015  Quarter2   finishing  Wednesday     8                   0.70   
1194  3/11/2015  Quarter2   finishing  Wednesday     7                   0.65   
1195  3/11/2015  Quarter2   finishing  Wednesday     9                   0.75   
1196  3/11/2015  Quarter2   finishing  Wednesday     6                   0.

In [8]:

#df.to_csv(f"./data/raw/garments_worker_productivity.csv", header=True, index=False)

rawdata_s3_prefix = "{}/data/raw".format(prefix)

raw_s3 = sagemaker_session.upload_data(path="./data/raw/", key_prefix=rawdata_s3_prefix) # lo que tenemos en local en raw lo subimos al s3 
print(raw_s3)

s3://sagemaker-us-east-2-244680720635/pipeline-model-jei/data/raw


Define parametros de PipeLines

In [17]:
from sagemaker.workflow.parameters import ParameterInteger, ParameterString, ParameterFloat

# Datos de entrada 
input_data = ParameterString(name="InputData", default_value=raw_s3)

# Estado del modelo en el registro 
model_approval_status = ParameterString(name="ModelApprovalStatus", default_value="Approved")

# Parámetros del paso de procesamiento 
processing_instance_type = ParameterString(
    name="ProcessingInstanceType", default_value="ml.c4.4xlarge"
)
processing_instance_count = ParameterInteger(name="ProcessingInstanceCount", default_value=1)

# Parámetros de entrenamiento ajustados para XGBoost 
training_instance_type = ParameterString(
    name="TrainingInstanceType", default_value="ml.m5.large"
)
num_rounds = ParameterInteger(name="NumRounds", default_value=100)

# Parámetro de rendimiento del modelo (sin cambios)
accuracy_mse_threshold = ParameterFloat(name="AccuracyMseThreshold", default_value=0.75)


In [None]:
!mkdir -p code

In [10]:
%%writefile code/preprocess.py

import glob
import numpy as np
import pandas as pd
import os
import json
import joblib
from io import StringIO
from sklearn.preprocessing import StandardScaler, OneHotEncoder, OrdinalEncoder
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.model_selection import train_test_split
import tarfile

try:
    from sagemaker_containers.beta.framework import (
        content_types,
        encoders,
        env,
        modules,
        transformer,
        worker,
        server,
    )
except ImportError:
    pass

# Columnas definidas
feature_columns = [
    "department",            # categórica -> OneHot
    "date", "quarter", "day",# ordinal
    "team",
    "targeted_productivity",
    "smv",
    "wip",
    "over_time",
    "incentive",
    "idle_time",
    "idle_men",
    "no_of_style_change",
    "no_of_workers"
]
label_column = "actual_productivity"

# Clasificación de columnas
ohe_vars = ["department"]
ordinal_vars = ["date", "quarter", "day"]
num_vars = [col for col in feature_columns if col not in ohe_vars + ordinal_vars]

# Rutas base
base_dir = "/opt/ml/processing"
base_output_dir = "/opt/ml/output/"

if __name__ == "__main__":
    df = pd.read_csv(f"{base_dir}/input/garments_worker_productivity.csv")
    feature_data = df[feature_columns]
    label_data = df[label_column]

    # Split
    x_train, x_test, y_train, y_test = train_test_split(feature_data, label_data, test_size=0.33)

    # Pipeline de preprocesamiento
    preprocessor = ColumnTransformer(
        transformers=[
            ('num', StandardScaler(), num_vars),
            ('cat', OneHotEncoder(sparse_output=False, handle_unknown='ignore'), ohe_vars),
            ('ord', OrdinalEncoder(), ordinal_vars)
        ]
    )

    pipeline = Pipeline(steps=[('preprocessor', preprocessor)])

    # Fit y transform
    x_train_processed = pipeline.fit_transform(x_train)
    x_test_processed = pipeline.transform(x_test)

    # Recuperar nombres de columnas procesadas
    cat_names = pipeline.named_steps['preprocessor'].named_transformers_['cat'].get_feature_names_out(ohe_vars).tolist()
    feature_names = num_vars + cat_names + ordinal_vars

    # Construcción final de DataFrames
    train_dataset = pd.concat(
        [pd.DataFrame(x_train_processed, columns=feature_names), y_train.reset_index(drop=True)], axis=1
    )
    test_dataset = pd.concat(
        [pd.DataFrame(x_test_processed, columns=feature_names), y_test.reset_index(drop=True)], axis=1
    )

    # Crear directorios
    os.makedirs(f"{base_dir}/train", exist_ok=True)
    os.makedirs(f"{base_dir}/test", exist_ok=True)

    # Guardar CSVs con nombres reales
    train_dataset.to_csv(f"{base_dir}/train/train.csv", header=True, index=False)
    test_dataset.to_csv(f"{base_dir}/test/test.csv", header=True, index=False)

    # Guardar modelo preprocesador
    os.makedirs(f"{base_dir}/scaler_model", exist_ok=True)
    joblib.dump(pipeline, "model.joblib")
    with tarfile.open(f"{base_dir}/scaler_model/model.tar.gz", "w:gz") as tar_handle:
        tar_handle.add("model.joblib")


# Función de entrada (input_fn)
def input_fn(input_data, content_type):
    if content_type == "text/csv":
        df = pd.read_csv(StringIO(input_data), header=None)
        if len(df.columns) == len(feature_columns) + 1:
            df.columns = feature_columns + [label_column]
        elif len(df.columns) == len(feature_columns):
            df.columns = feature_columns
        return df
    else:
        raise ValueError("{} not supported by script!".format(content_type))

# Función de salida (output_fn)
def output_fn(prediction, accept):
    if accept == "application/json":
        instances = [row for row in prediction.tolist()]
        return worker.Response(json.dumps({"instances": instances}), mimetype=accept)
    elif accept == "text/csv":
        return worker.Response(encoders.encode(prediction, accept), mimetype=accept)
    else:
        raise RuntimeError("{} accept type is not supported by this script.".format(accept))


Overwriting code/preprocess.py


In [11]:
input_data

ParameterString(name='InputData', parameter_type=<ParameterTypeEnum.STRING: 'String'>, default_value='s3://sagemaker-us-east-2-244680720635/pipeline-model-jei/data/raw')

Ejecucion del procesor

In [12]:
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput

sklearn_framework_version = "1.2-1"

sklearn_processor = SKLearnProcessor(
    framework_version=sklearn_framework_version,
    instance_type="ml.c4.4xlarge",
    instance_count=processing_instance_count,
    base_job_name="sklearn-housing-data-process",
    role=role,
    sagemaker_session=pipeline_session,
)

processor_args = sklearn_processor.run(
    inputs=[
        ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),
    ],
    outputs=[
        ProcessingOutput(output_name="scaler_model", source="/opt/ml/processing/scaler_model"),
        ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
        ProcessingOutput(output_name="test", source="/opt/ml/processing/test"),
    ],
    code="code/preprocess.py",
)



In [13]:
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep


# firt step para pipeline 
step_process = ProcessingStep(
    name="PreprocessData",
    step_args=processor_args,
)

In [14]:
import os

# Intenta obtener las variables de entorno
train_data_dir = os.environ.get("SM_CHANNEL_TRAIN")
test_data_dir = os.environ.get("SM_CHANNEL_TEST")

print(f'Train Data Directory: {train_data_dir}')
print(f'Test Data Directory: {test_data_dir}')

Train Data Directory: None
Test Data Directory: None


In [None]:
!pip install -U sagemaker xgboost

In [38]:

from sagemaker.xgboost.estimator import XGBoost

xgb_estimator = XGBoost(
    entry_point="train.py",
    role=role,
    instance_count=1,
    instance_type="ml.m5.large",
    framework_version="1.7-1",
    py_version="py3",
    base_job_name="xgboost-pipeline-model",
    output_path=f"s3://{bucket}/{prefix}/training-jobs"
)


In [39]:

%%writefile code/train.py

import argparse
import os
import pandas as pd
import xgboost as xgb
import joblib

label_column = "actual_productivity"

def parse_args():
    parser = argparse.ArgumentParser()
    parser.add_argument("--train", type=str, default=os.environ.get("SM_CHANNEL_TRAIN"))
    parser.add_argument("--test", type=str, default=os.environ.get("SM_CHANNEL_TEST"))
    parser.add_argument("--sm-model-dir", type=str, default=os.environ.get("SM_MODEL_DIR"))
    return parser.parse_args()

if __name__ == "__main__":
    args = parse_args()

    train_df = pd.read_csv(os.path.join(args.train, "train.csv"))
    test_df = pd.read_csv(os.path.join(args.test, "test.csv"))

    X_train = train_df.drop(columns=[label_column])
    y_train = train_df[label_column]
    X_test = test_df.drop(columns=[label_column])
    y_test = test_df[label_column]

    model = xgb.XGBRegressor(objective="reg:squarederror", n_estimators=100, max_depth=4)
    model.fit(X_train, y_train)

    joblib.dump(model, os.path.join(args.sm_model_dir, "xgboost-model.joblib"))


Overwriting code/train.py


In [47]:
from sagemaker.inputs import TrainingInput
from sagemaker.xgboost.estimator import XGBoost
from sagemaker.workflow.steps import TrainingStep

# Dónde almacenar el modelo entrenado
model_path = f"s3://{bucket}/{prefix}/model/"
#Hiperparametros
hyperparameters = {
    "objective": "reg:squarederror",
    "num_round": num_rounds,
    "max_depth": 5,
    "eta": 0.2,
}

# Define el estimator de XGBoost
xgb_estimator = XGBoost(
    entry_point="train.py",
    source_dir="code",
    framework_version="1.7-1",
    py_version="py3",
    role=role,
    instance_type=training_instance_type,
    instance_count=1,
    hyperparameters=hyperparameters,
    base_job_name="xgboost-train-model",
    output_path=model_path,
    sagemaker_session=pipeline_session,
)

pipeline_model = PipelineModel(
    models=[xgb_estimator],
    role=role,
    sagemaker_session=pipeline_session,
)

# Definir los argumentos para entrenamiento con XGBoost
train_args = xgb_estimator.fit(
    inputs={
        "train": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri,
            content_type="text/csv",
        ),
        "validation": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri,
            content_type="text/csv",
        ),
    }
)

# Paso de entrenamiento ajustado
step_train_model = TrainingStep(
    name="TrainXGBoostModel",
    step_args=train_args
)




In [48]:
%%writefile code/evaluate.py

import os
import json
import numpy as np
import pandas as pd
import pathlib
import tarfile
import joblib
from sklearn.metrics import mean_squared_error

label_column = "actual_productivity"

if __name__ == "__main__":
    model_path = "/opt/ml/processing/model/model.tar.gz"
    with tarfile.open(model_path, "r:gz") as tar:
        tar.extractall("./model")

    # Cargar el modelo XGBoost
    model = joblib.load("./model/xgboost-model.joblib")

    # Cargar columnas esperadas
    with open("./model/input_columns.json", "r") as f:
        input_columns = json.load(f)

    # Cargar y procesar dataset de test
    test_path = "/opt/ml/processing/test/"
    df = pd.read_csv(test_path + "test.csv")
    df = pd.get_dummies(df, columns=["team"])

    # Agregar columnas faltantes con 0 y ordenar
    for col in input_columns:
        if col not in df.columns:
            df[col] = 0
    df = df[input_columns + [label_column]]  # Orden correcto

    # Preparar datos
    x_test = df[input_columns].to_numpy()
    y_test = df[label_column].to_numpy()

    # Evaluar modelo
    predictions = model.predict(x_test)
    mse = mean_squared_error(y_test, predictions)
    print("\nTest MSE :", mse)

    # Reporte ajustado correctamente como float
    report_dict = {
        "regression_metrics": {
            "mse": {"value": float(mse), "standard_deviation": "NaN"},
        },
    }

    output_dir = "/opt/ml/processing/evaluation"
    pathlib.Path(output_dir).mkdir(parents=True, exist_ok=True)

    evaluation_path = f"{output_dir}/evaluation.json"
    with open(evaluation_path, "w") as f:
        f.write(json.dumps(report_dict))


Overwriting code/evaluate.py


In [49]:
from sagemaker.workflow.properties import PropertyFile
from sagemaker.sklearn.processing import ScriptProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep


# Utilizar imagen oficial de SKLearn para procesamiento
sklearn_eval_image_uri = sagemaker.image_uris.retrieve(
    framework="sklearn",
    region=region,
    version="1.2-1",
    instance_type=processing_instance_type,
    image_scope="training"
)

evaluate_model_processor = ScriptProcessor(
    role=role,
    image_uri=sklearn_eval_image_uri,
    command=["python3"],
    instance_count=1,
    instance_type=processing_instance_type,
    sagemaker_session=pipeline_session,
)

evaluation_report = PropertyFile(
    name="EvaluationReport", output_name="evaluation", path="evaluation.json"
)

eval_args = evaluate_model_processor.run(
    inputs=[
        ProcessingInput(
            source=step_train_model.properties.ModelArtifacts.S3ModelArtifacts,
            destination="/opt/ml/processing/model",
        ),
        ProcessingInput(
            source=step_process.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri,
            destination="/opt/ml/processing/test",
        ),
    ],
    outputs=[
        ProcessingOutput(output_name="evaluation", source="/opt/ml/processing/evaluation"),
    ],
    code="code/evaluate.py",
)

step_evaluate_model = ProcessingStep(
    name="EvaluateModelPerformance",
    step_args=eval_args,
    property_files=[evaluation_report],
)


In [50]:
from sagemaker.workflow.properties import PropertyFile
from sagemaker.sklearn.processing import ScriptProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep

# Imagen SKLearn ajustada al scope "training"
sklearn_eval_image_uri = sagemaker.image_uris.retrieve(
    framework="sklearn",
    region=region,
    version="1.2-1",
    instance_type=processing_instance_type,
    image_scope="training", 
)

evaluate_model_processor = ScriptProcessor(
    role=role,
    image_uri=sklearn_eval_image_uri,
    command=["python3"],
    instance_count=1,
    instance_type=processing_instance_type,
    sagemaker_session=pipeline_session,
)

evaluation_report = PropertyFile(
    name="EvaluationReport", output_name="evaluation", path="evaluation.json"
)

eval_args = evaluate_model_processor.run(
    inputs=[
        ProcessingInput(
            source=step_train_model.properties.ModelArtifacts.S3ModelArtifacts,
            destination="/opt/ml/processing/model",
        ),
        ProcessingInput(
            source=step_process.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri,
            destination="/opt/ml/processing/test",
        ),
    ],
    outputs=[
        ProcessingOutput(output_name="evaluation", source="/opt/ml/processing/evaluation"),
    ],
    code="code/evaluate.py",
)

step_evaluate_model = ProcessingStep(
    name="EvaluateModelPerformance",
    step_args=eval_args,
    property_files=[evaluation_report],
)


In [51]:
pipeline_session

<sagemaker.workflow.pipeline_context.PipelineSession at 0x7f0e7d5d7890>

In [52]:
from sagemaker.model_metrics import MetricsSource, ModelMetrics
from sagemaker.workflow.step_collections import RegisterModel
from sagemaker.workflow.model_step import ModelStep  # Asegúrate de importar ModelStep
from sagemaker.pipeline import PipelineModel
from sagemaker.xgboost.model import XGBoostModel

evaluation_s3_uri = "{}/evaluation.json".format(
    step_evaluate_model.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
)

model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri=evaluation_s3_uri,
        content_type="application/json",
    )
)

register_args = pipeline_model.register(
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=["ml.m5.large", "ml.m5.xlarge"],
    transform_instances=["ml.m5.large"],
    model_package_group_name=model_package_group_name,
    model_metrics=model_metrics,
    approval_status=model_approval_status,
)

step_register_pipeline_model = ModelStep(
    name="PipelineModel",
    step_args=register_args,
)

DEFINICION DE HIPERPARAMETROS, ACCURACY

In [None]:
from sagemaker.workflow.functions import JsonGet, Join
from sagemaker.workflow.conditions import ConditionLessThanOrEqualTo
from sagemaker.workflow.condition_step import ConditionStep

cond_lte = ConditionLessThanOrEqualTo(
    left=JsonGet(
        step_name=step_evaluate_model.name,
        property_file=evaluation_report,
        json_path="regression_metrics.mse.value",
    ),
    right=str(accuracy_mse_threshold.default_value), # En realidad ambos valores deben ser strings
)

step_cond = ConditionStep(
    name="MSE-Lower-Than-Threshold-Condition",
    conditions=[cond_lte],
    if_steps=[step_register_pipeline_model],
    else_steps=[],
)


In [None]:
from sagemaker.workflow.pipeline import Pipeline

pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        training_instance_type,
        processing_instance_type,
        processing_instance_count,
        input_data,
        model_approval_status,
        training_epochs,
        accuracy_mse_threshold,
    ],
    steps=[
        step_process,                 
        step_train_model,             
        step_evaluate_model,          
        step_cond,                    
    ],
)


In [None]:
import json

definition = json.loads(pipeline.definition())
definition

Enviar modelo a Sage Maker

In [None]:
role

In [None]:
pipeline.upsert(role_arn=role)

In [None]:
execution = pipeline.start()

In [None]:
execution.wait()

End Point

In [None]:
%%writefile utils.py
import argparse
import boto3
import logging
import os
from botocore.exceptions import ClientError
import tarfile
import zipfile

logger = logging.getLogger(__name__)
sm_client = boto3.client("sagemaker")


def get_approved_package(model_package_group_name):
    """Gets the latest approved model package for a model package group.

    Args:
        model_package_group_name: The model package group name.

    Returns:
        The SageMaker Model Package ARN.
    """
    try:
        # Get the latest approved model package
        response = sm_client.list_model_packages(
            ModelPackageGroupName=model_package_group_name,
            ModelApprovalStatus="Approved",
            SortBy="CreationTime",
            MaxResults=100,
        )
        approved_packages = response["ModelPackageSummaryList"]

        # Fetch more packages if none returned with continuation token
        while len(approved_packages) == 0 and "NextToken" in response:
            logger.debug("Getting more packages for token: {}".format(response["NextToken"]))
            response = sm_client.list_model_packages(
                ModelPackageGroupName=model_package_group_name,
                ModelApprovalStatus="Approved",
                SortBy="CreationTime",
                MaxResults=100,
                NextToken=response["NextToken"],
            )
            approved_packages.extend(response["ModelPackageSummaryList"])

        # Return error if no packages found
        if len(approved_packages) == 0:
            error_message = (
                f"No approved ModelPackage found for ModelPackageGroup: {model_package_group_name}"
            )
            logger.error(error_message)
            raise Exception(error_message)

        # Return the pmodel package arn
        model_package_arn = approved_packages[0]["ModelPackageArn"]
        logger.info(f"Identified the latest approved model package: {model_package_arn}")
        return approved_packages[0]
        # return model_package_arn
    except ClientError as e:
        error_message = e.response["Error"]["Message"]
        logger.error(error_message)
        raise Exception(error_message)

In [None]:
from utils import get_approved_package

sm_client = boto3.client("sagemaker")

pck = get_approved_package(
    model_package_group_name
)  # Reminder: model_package_group_name was defined as "NominetAbaloneModelPackageGroupName" at the beginning of the pipeline definition
model_description = sm_client.describe_model_package(ModelPackageName=pck["ModelPackageArn"])

model_description