In [1]:
from sagemaker.workflow.function_step import step
from sagemaker.workflow.pipeline import Pipeline
import sagemaker
from sagemaker.workflow.parameters import ParameterString, ParameterInteger
from sagemaker.workflow.execution_variables import ExecutionVariables
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo
from sagemaker.workflow.fail_step import FailStep
import utils

sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /home/sagemaker-user/.config/sagemaker/config.yaml


In [2]:
# Resolve the current username through the `utils` module imported above.
user = utils.get_username()

In [3]:
# Display the resolved username as the cell output.
user

'luis-lazarte'

## Global variables

In [4]:
# IAM role of the current SageMaker execution environment.
role = sagemaker.get_execution_role()

# S3 location shared by the pipeline steps: "<bucket>/<prefix>" (no "s3://" scheme;
# the steps prepend it themselves).
default_bucket = "mlops-chester"
default_prefix = "sagemaker/bank-attrition-detection"
default_path = f"{default_bucket}/{default_prefix}"

# Session pinned to the bucket/prefix above.
sagemaker_session = sagemaker.Session(
    default_bucket=default_bucket,
    default_bucket_prefix=default_prefix,
)

In [5]:
# Display the "<bucket>/<prefix>" path as the cell output.
default_path

'mlops-chester/sagemaker/bank-attrition-detection'

In [6]:
# Naming and compute settings for the pipeline.
pipeline_name = "pipeline-train"
model_name = "attrition-detection"
instance_type = "ml.m5.2xlarge"

# Pipeline parameters, resolved per execution.
# `cod_month` is a string parameter; the start/end bounds are integer parameters.
cod_month = ParameterString(name="PeriodoCargaClientes")
cod_month_start = ParameterInteger(name="PeriodoCargaRequerimientosInicio")
cod_month_end = ParameterInteger(name="PeriodoCargaRequerimientosFin")

In [7]:
# MLflow settings read by every pipeline step below.
experiment_name = "pipeline-train-attrition-detection"
# NOTE(review): the ARN embeds a specific region and account id; consider
# sourcing it from configuration if the notebook is shared across accounts.
tracking_server_arn = "arn:aws:sagemaker:us-east-2:762233743642:mlflow-tracking-server/mlops-mlflow-server"

## Data pull and preprocessing

In [8]:
@step(
    name="DataPull",
    instance_type=instance_type
)
def data_pull(experiment_name: str, run_name: str, cod_month: str, cod_month_start: int, cod_month_end: int) -> tuple[str, str]:
    """Pull customer and request data from Athena, split it, preprocess it and log everything.

    The step opens a parent MLflow run named ``run_name`` and a nested run named
    ``"DataPull"``. Inside the nested run it:

    1. Reads ``train_clientes_sample`` (filtered by ``cod_month``) and
       ``train_requerimientos`` (filtered by the ``cod_month_start``/``cod_month_end``
       range) from the ``bank_attrition`` Athena database.
    2. Splits the distinct ``ID_CORRELATIVO`` values 70/30 (``random_state=42``) and
       writes the raw train/test frames to S3 and to MLflow.
    3. Fits imputation values, label encoders and a ``StandardScaler`` on the
       training frames, applies them to the test frames, and persists the prepared
       datasets and the fitted artifacts to S3 and to MLflow.

    Args:
        experiment_name: MLflow experiment to log into.
        run_name: Name given to the parent MLflow run.
        cod_month: Value matched against ``codmes`` in the customers table.
        cod_month_start: Lower bound (inclusive) of ``codmes`` for the requests table.
        cod_month_end: Upper bound (inclusive) of ``codmes`` for the requests table.

    Returns:
        ``(run_id, data_pull_id)``: the parent run id and the nested "DataPull" run id.

    Raises:
        subprocess.CalledProcessError: If installing ``awswrangler`` fails.
    """
    import mlflow
    mlflow.set_tracking_uri(tracking_server_arn)
    mlflow.set_experiment(experiment_name)
    import subprocess
    import sys
    # Install into the interpreter that is running this step and fail fast if
    # the install does not succeed (a bare, unchecked `pip` call can silently
    # target another environment or leave an unexpected version in place).
    subprocess.run([sys.executable, '-m', 'pip', 'install', 'awswrangler==3.12.0'], check=True)
    import awswrangler as wr
    import os
    from sklearn.preprocessing import LabelEncoder
    from sklearn.preprocessing import StandardScaler
    import numpy as np
    import pandas as pd
    import pickle
    from sklearn.model_selection import train_test_split
    import tempfile
    output_dir = tempfile.mkdtemp()
    TARGET_COL = 'ATTRITION'
    # The filter values are bound through awswrangler's `params` (named style)
    # instead of being spliced into the SQL text with str.format.
    query_clientes = """
       SELECT
        TRY_CAST(id_correlativo AS BIGINT) AS id_correlativo,
        TRY_CAST(codmes AS BIGINT) AS codmes,
        TRY_CAST(flg_bancarizado AS BIGINT) AS flg_bancarizado,
        rang_ingreso,
        flag_lima_provincia,
        TRY_CAST(edad AS DOUBLE) AS edad,
        TRY_CAST(antiguedad AS DOUBLE) AS antiguedad,
        TRY_CAST(attrition AS BIGINT) AS attrition,
        rang_sdo_pasivo_menos0,
        TRY_CAST(sdo_activo_menos0 AS BIGINT) AS sdo_activo_menos0,
        TRY_CAST(sdo_activo_menos1 AS BIGINT) AS sdo_activo_menos1,
        TRY_CAST(sdo_activo_menos2 AS BIGINT) AS sdo_activo_menos2,
        TRY_CAST(sdo_activo_menos3 AS BIGINT) AS sdo_activo_menos3,
        TRY_CAST(sdo_activo_menos4 AS BIGINT) AS sdo_activo_menos4,
        TRY_CAST(sdo_activo_menos5 AS BIGINT) AS sdo_activo_menos5,
        TRY_CAST(flg_seguro_menos0 AS BIGINT) AS flg_seguro_menos0,
        TRY_CAST(flg_seguro_menos1 AS BIGINT) AS flg_seguro_menos1,
        TRY_CAST(flg_seguro_menos2 AS BIGINT) AS flg_seguro_menos2,
        TRY_CAST(flg_seguro_menos3 AS BIGINT) AS flg_seguro_menos3,
        TRY_CAST(flg_seguro_menos4 AS BIGINT) AS flg_seguro_menos4,
        TRY_CAST(flg_seguro_menos5 AS BIGINT) AS flg_seguro_menos5,
        rang_nro_productos_menos0,
        TRY_CAST(flg_nomina AS BIGINT) AS flg_nomina,
        TRY_CAST(nro_acces_canal1_menos0 AS BIGINT) AS nro_acces_canal1_menos0,
        TRY_CAST(nro_acces_canal1_menos1 AS BIGINT) AS nro_acces_canal1_menos1,
        TRY_CAST(nro_acces_canal1_menos2 AS BIGINT) AS nro_acces_canal1_menos2,
        TRY_CAST(nro_acces_canal1_menos3 AS BIGINT) AS nro_acces_canal1_menos3,
        TRY_CAST(nro_acces_canal1_menos4 AS BIGINT) AS nro_acces_canal1_menos4,
        TRY_CAST(nro_acces_canal1_menos5 AS BIGINT) AS nro_acces_canal1_menos5,
        TRY_CAST(nro_acces_canal2_menos0 AS BIGINT) AS nro_acces_canal2_menos0,
        TRY_CAST(nro_acces_canal2_menos1 AS BIGINT) AS nro_acces_canal2_menos1,
        TRY_CAST(nro_acces_canal2_menos2 AS BIGINT) AS nro_acces_canal2_menos2,
        TRY_CAST(nro_acces_canal2_menos3 AS BIGINT) AS nro_acces_canal2_menos3,
        TRY_CAST(nro_acces_canal2_menos4 AS BIGINT) AS nro_acces_canal2_menos4,
        TRY_CAST(nro_acces_canal2_menos5 AS BIGINT) AS nro_acces_canal2_menos5,
        TRY_CAST(nro_acces_canal3_menos0 AS BIGINT) AS nro_acces_canal3_menos0,
        TRY_CAST(nro_acces_canal3_menos1 AS BIGINT) AS nro_acces_canal3_menos1,
        TRY_CAST(nro_acces_canal3_menos2 AS BIGINT) AS nro_acces_canal3_menos2,
        TRY_CAST(nro_acces_canal3_menos3 AS BIGINT) AS nro_acces_canal3_menos3,
        TRY_CAST(nro_acces_canal3_menos4 AS BIGINT) AS nro_acces_canal3_menos4,
        TRY_CAST(nro_acces_canal3_menos5 AS BIGINT) AS nro_acces_canal3_menos5,
        TRY_CAST(nro_entid_ssff_menos0 AS BIGINT) AS nro_entid_ssff_menos0,
        TRY_CAST(nro_entid_ssff_menos1 AS BIGINT) AS nro_entid_ssff_menos1,
        TRY_CAST(nro_entid_ssff_menos2 AS BIGINT) AS nro_entid_ssff_menos2,
        TRY_CAST(nro_entid_ssff_menos3 AS BIGINT) AS nro_entid_ssff_menos3,
        TRY_CAST(nro_entid_ssff_menos4 AS BIGINT) AS nro_entid_ssff_menos4,
        TRY_CAST(nro_entid_ssff_menos5 AS BIGINT) AS nro_entid_ssff_menos5,
        TRY_CAST(flg_sdo_otssff_menos0 AS BIGINT) AS flg_sdo_otssff_menos0,
        TRY_CAST(flg_sdo_otssff_menos1 AS BIGINT) AS flg_sdo_otssff_menos1,
        TRY_CAST(flg_sdo_otssff_menos2 AS BIGINT) AS flg_sdo_otssff_menos2,
        TRY_CAST(flg_sdo_otssff_menos3 AS BIGINT) AS flg_sdo_otssff_menos3,
        TRY_CAST(flg_sdo_otssff_menos4 AS BIGINT) AS flg_sdo_otssff_menos4,
        TRY_CAST(flg_sdo_otssff_menos5 AS BIGINT) AS flg_sdo_otssff_menos5
        FROM train_clientes_sample
        WHERE codmes = :cod_month;
    """
    # A string value is rendered as a quoted SQL literal by the client-side formatter.
    params_clientes = {"cod_month": str(cod_month)}

    query_requerimientos = """
        SELECT *
        FROM train_requerimientos
        WHERE codmes between :cod_month_start and :cod_month_end;
        """
    # int() keeps the bounds numeric literals and rejects non-numeric input.
    params_requerimientos = {
        "cod_month_start": int(cod_month_start),
        "cod_month_end": int(cod_month_end),
    }

    train_s3_path = f"s3://{default_path}"

    # Data extraction helpers
    def buscar_indices_coincidentes(df_clientes):
        """Return the distinct customer ids present in the customers frame."""
        ids_comunes = list(set(df_clientes['ID_CORRELATIVO']))
        return ids_comunes

    def split_data(ids_comunes):
        """Split the customer ids 70/30 into train and test with a fixed seed."""
        return train_test_split(ids_comunes, test_size=0.3, random_state=42)

    # Preprocessing helpers (training set)
    def save_y_col_name(y_col):
        """Persist the target column name to S3 and as an MLflow artifact."""
        df_y_col_name = pd.DataFrame({'y_col':[y_col]})
        df_y_col_name.to_csv(os.path.join(train_s3_path, "outputs", "preprocess", "y_col_name.csv"), index=False)
        df_y_col_name.to_csv(os.path.join(output_dir, "y_col_name.csv"), index=False)
        mlflow.log_artifact(os.path.join(output_dir, "y_col_name.csv"), artifact_path="outputs/preprocess")

    def save_x_col_names(df_final, y_col):
        """Persist the predictor column names (everything but target, id and month)."""
        x_cols = [col for col in df_final.columns if col != y_col and col not in ['ID_CORRELATIVO', 'CODMES']]
        df_x_col_names = pd.DataFrame({'x_col': x_cols})
        df_x_col_names.to_csv(os.path.join(train_s3_path, "outputs", "preprocess", "x_col_names.csv"), index=False)
        df_x_col_names.to_csv(os.path.join(output_dir, "x_col_names.csv"), index=False)
        mlflow.log_artifact(os.path.join(output_dir, "x_col_names.csv"), artifact_path="outputs/preprocess")

    def generar_variables_ingenieria(clientes_df):
        """Add engineered columns derived from the six monthly snapshots (MENOS0..MENOS5).

        Mutates and returns ``clientes_df``.
        """
        clientes_df["VAR_SDO_ACTIVO_6M"] = clientes_df["SDO_ACTIVO_MENOS0"] - clientes_df["SDO_ACTIVO_MENOS5"]
        clientes_df["PROM_SDO_ACTIVO_0M_2M"] = clientes_df[[f"SDO_ACTIVO_MENOS{i}" for i in range(3)]].mean(axis=1)
        clientes_df["PROM_SDO_ACTIVO_3M_5M"] = clientes_df[[f"SDO_ACTIVO_MENOS{i}" for i in range(3, 6)]].mean(axis=1)
        clientes_df["VAR_SDO_ACTIVO_3M"] = clientes_df["PROM_SDO_ACTIVO_0M_2M"] - clientes_df["PROM_SDO_ACTIVO_3M_5M"]
        clientes_df["PROM_SDO_ACTIVO_6M"] = clientes_df[[f"SDO_ACTIVO_MENOS{i}" for i in range(6)]].mean(axis=1)
        clientes_df["MESES_CON_SEGURO"] = clientes_df[[f"FLG_SEGURO_MENOS{i}" for i in range(6)]].sum(axis=1)
        for canal in [1, 2, 3]:
            base = f"NRO_ACCES_CANAL{canal}_MENOS"
            clientes_df[f"VAR_NRO_ACCES_CANAL{canal}_6M"] = clientes_df[f"{base}0"] - clientes_df[f"{base}5"]
            clientes_df[f"PROM_NRO_ACCES_CANAL{canal}_6M"] = clientes_df[[f"{base}{i}" for i in range(6)]].mean(axis=1)
            clientes_df[f"PROM_NRO_ACCES_CANAL{canal}_0M_2M"] = clientes_df[[f"{base}{i}" for i in range(3)]].mean(axis=1)
            clientes_df[f"PROM_NRO_ACCES_CANAL{canal}_3M_5M"] = clientes_df[[f"{base}{i}" for i in range(3, 6)]].mean(axis=1)
            clientes_df[f"VAR_NRO_ACCES_CANAL{canal}_3M"] = (clientes_df[f"PROM_NRO_ACCES_CANAL{canal}_0M_2M"] - clientes_df[f"PROM_NRO_ACCES_CANAL{canal}_3M_5M"])
        clientes_df["PROM_NRO_ENTID_SSFF_6M"] = clientes_df[[f"NRO_ENTID_SSFF_MENOS{i}" for i in range(6)]].mean(axis=1)
        clientes_df["VAR_NRO_ENTID_SSFF_6M"] = clientes_df["NRO_ENTID_SSFF_MENOS0"] - clientes_df["NRO_ENTID_SSFF_MENOS5"]
        clientes_df["PROM_NRO_ENTID_SSFF_0M_2M"] = clientes_df[[f"NRO_ENTID_SSFF_MENOS{i}" for i in range(3)]].mean(axis=1)
        clientes_df["PROM_NRO_ENTID_SSFF_3M_5M"] = clientes_df[[f"NRO_ENTID_SSFF_MENOS{i}" for i in range(3, 6)]].mean(axis=1)
        clientes_df["VAR_NRO_ENTID_SSFF_3M"] = (clientes_df["PROM_NRO_ENTID_SSFF_0M_2M"] - clientes_df["PROM_NRO_ENTID_SSFF_3M_5M"])
        clientes_df["MESES_CON_SALDO"] = clientes_df[[f"FLG_SDO_OTSSFF_MENOS{i}" for i in range(6)]].sum(axis=1)
        return clientes_df

    def imputacion_variables(clientes_df, requerimientos_df):
        """Fill missing values (mode for categoricals, median for numerics) and record them.

        Returns the two imputed frames plus a frame listing the value used for
        each variable, which is also written to S3 and logged to MLflow so the
        same values can be applied to the test set.
        """
        imputaciones = []
        moda_rango = clientes_df['RANG_INGRESO'].mode()[0]
        clientes_df['RANG_INGRESO'] = clientes_df['RANG_INGRESO'].fillna(moda_rango)
        imputaciones.append({'dataframe': 'clientes', 'variable': 'RANG_INGRESO', 'estrategia': 'moda', 'valor': moda_rango})
        moda_lima = clientes_df['FLAG_LIMA_PROVINCIA'].mode()[0]
        clientes_df['FLAG_LIMA_PROVINCIA'] = clientes_df['FLAG_LIMA_PROVINCIA'].fillna(moda_lima)
        imputaciones.append({'dataframe': 'clientes', 'variable': 'FLAG_LIMA_PROVINCIA', 'estrategia': 'moda', 'valor': moda_lima})
        mediana_edad = clientes_df['EDAD'].median()
        clientes_df['EDAD'] = clientes_df['EDAD'].fillna(mediana_edad)
        imputaciones.append({'dataframe': 'clientes', 'variable': 'EDAD', 'estrategia': 'mediana', 'valor': mediana_edad})
        mediana_antig = clientes_df['ANTIGUEDAD'].median()
        clientes_df['ANTIGUEDAD'] = clientes_df['ANTIGUEDAD'].fillna(mediana_antig)
        imputaciones.append({'dataframe': 'clientes', 'variable': 'ANTIGUEDAD', 'estrategia': 'mediana', 'valor': mediana_antig})
        moda_dictamen = requerimientos_df['DICTAMEN'].mode()[0]
        requerimientos_df['DICTAMEN'] = requerimientos_df['DICTAMEN'].fillna(moda_dictamen)
        imputaciones.append({'dataframe': 'requerimientos', 'variable': 'DICTAMEN', 'estrategia': 'moda', 'valor': moda_dictamen})
        df_imputaciones = pd.DataFrame(imputaciones)
        df_imputaciones.to_csv(os.path.join(train_s3_path, "outputs", "preprocess", "imputacion_parametros.csv"), index=False)
        df_imputaciones.to_csv(os.path.join(output_dir, "imputacion_parametros.csv"), index=False)
        mlflow.log_artifact(os.path.join(output_dir, "imputacion_parametros.csv"), artifact_path="outputs/preprocess")
        return clientes_df, requerimientos_df, df_imputaciones

    def encoder_categoricos(clientes_df):
        """Encode categorical customer columns and return the fitted label encoders."""
        clientes_df['RANG_SDO_PASIVO_MENOS0'] = clientes_df['RANG_SDO_PASIVO_MENOS0'].replace('Cero', 'Rango_SDO_00')
        clientes_df['FLAG_LIMA_PROVINCIA'] = clientes_df['FLAG_LIMA_PROVINCIA'].map({'Lima': 1, 'Provincia': 0})
        cat_cols = clientes_df.select_dtypes(include=['object', 'category','string']).columns
        encoders_clientes = {}
        for col in cat_cols:
            le = LabelEncoder()
            clientes_df[col] = le.fit_transform(clientes_df[col])
            encoders_clientes[col] = le
        return clientes_df, encoders_clientes

    def construir_variables_requerimientos(df_reqs, id_col='ID_CORRELATIVO'):
        """Aggregate the request rows into one feature row per customer id."""
        total_reqs = df_reqs.groupby(id_col).size().rename('total_requerimientos')
        if not isinstance(total_reqs, pd.DataFrame):
            total_reqs = total_reqs.to_frame()
        n_tipo_req = df_reqs.groupby(id_col)['TIPO_REQUERIMIENTO2'].nunique().rename('nro_tipos_requerimiento').to_frame()
        n_dictamen = df_reqs.groupby(id_col)['DICTAMEN'].nunique().rename('nro_dictamenes').to_frame()
        n_producto = df_reqs.groupby(id_col)['PRODUCTO_SERVICIO_2'].nunique().rename('nro_productos_servicios').to_frame()
        n_submotivo = df_reqs.groupby(id_col)['SUBMOTIVO_2'].nunique().rename('nro_submotivos').to_frame()
        # One-hot counts per customer; the resulting columns depend on the
        # categories present in `df_reqs`.
        tipo_ohe = pd.get_dummies(df_reqs['TIPO_REQUERIMIENTO2'], prefix='tipo')
        tipo_ohe[id_col] = df_reqs[id_col]
        tipo_ohe = tipo_ohe.groupby(id_col).sum()
        dictamen_ohe = pd.get_dummies(df_reqs['DICTAMEN'], prefix='dictamen')
        dictamen_ohe[id_col] = df_reqs[id_col]
        dictamen_ohe = dictamen_ohe.groupby(id_col).sum()
        df_agregado = pd.concat([total_reqs, n_tipo_req, n_dictamen, n_producto, n_submotivo, tipo_ohe, dictamen_ohe],axis=1)
        return df_agregado

    def estandarizacion(df_final):
        """Fit a StandardScaler on every column except id, month and target."""
        no_escalar = ['ID_CORRELATIVO', 'CODMES', 'ATTRITION']
        columnas_a_escalar = df_final.columns.difference(no_escalar)
        df_predictoras = df_final[columnas_a_escalar]
        scaler = StandardScaler()
        df_escaladas = pd.DataFrame(scaler.fit_transform(df_predictoras),columns=columnas_a_escalar,index=df_final.index)
        df_final_estandarizado = pd.concat([df_final[no_escalar], df_escaladas],axis=1)
        return df_final_estandarizado, scaler

    def preprocess_dataset(clientes_df, requerimientos_df, y_col):
        """Run the full training-set preprocessing and return the fitted artifacts."""
        save_y_col_name(y_col)
        clientes_df = generar_variables_ingenieria(clientes_df)
        clientes_df,requerimientos_df,df_imputaciones = imputacion_variables(clientes_df,requerimientos_df)
        clientes_df, artifact_encoders_clientes = encoder_categoricos(clientes_df)
        requerimientos_df = construir_variables_requerimientos(requerimientos_df)
        df_final = clientes_df.merge(requerimientos_df, on='ID_CORRELATIVO', how='left')
        # Customers without any request get 0 for every request-derived feature.
        df_final.fillna(0, inplace=True)
        df_final, artifact_scaler = estandarizacion(df_final)
        save_x_col_names(df_final, y_col)
        return df_final, artifact_encoders_clientes, artifact_scaler, df_imputaciones

    # Preprocessing helpers (test set)

    def prepare_impute_missing_test(df_data, x_cols, df_impute_parameters):
        """Fill missing values in ``x_cols`` with the values recorded on the training set."""
        df_data_imputed = df_data.copy()
        for col in x_cols:
            impute_value = df_impute_parameters[df_impute_parameters["variable"]==col]["valor"].values[0]
            df_data_imputed[col] = df_data_imputed[col].fillna(impute_value)
        return df_data_imputed

    def apply_label_encoders_to_test(df_test, encoders_clientes):
        """Apply the training-time categorical encodings to the test frame."""
        df_test['RANG_SDO_PASIVO_MENOS0'] = df_test['RANG_SDO_PASIVO_MENOS0'].replace('Cero', 'Rango_SDO_00')
        df_test['FLAG_LIMA_PROVINCIA'] = df_test['FLAG_LIMA_PROVINCIA'].map({'Lima': 1, 'Provincia': 0})
        for col, le in encoders_clientes.items():
            df_test[col] = le.transform(df_test[col])
        return df_test

    def aplicar_estandarizacion_test(df_test, scaler):
        """Scale the test frame with the scaler fitted on the training frame.

        The one-hot request columns are derived from the categories present in
        each split, so the test frame can lack (or add) columns relative to
        training. The predictors are therefore aligned to the columns the scaler
        was fitted on: missing ones are filled with 0 and unseen ones dropped.
        """
        no_escalar = ['ID_CORRELATIVO', 'CODMES', 'ATTRITION']
        fitted_cols = getattr(scaler, "feature_names_in_", None)
        if fitted_cols is None:
            # Scaler was not fitted on named columns; fall back to the test frame's own.
            columnas_a_escalar = df_test.columns.difference(no_escalar)
            df_predictoras = df_test[columnas_a_escalar]
        else:
            columnas_a_escalar = pd.Index(fitted_cols)
            df_predictoras = df_test.reindex(columns=columnas_a_escalar, fill_value=0)
        df_escaladas = pd.DataFrame(scaler.transform(df_predictoras),columns=columnas_a_escalar,index=df_test.index)
        df_test_estandarizado = pd.concat([df_test[no_escalar], df_escaladas], axis=1)
        return df_test_estandarizado

    def prepare_dataset_test(df_data_test,df_requerimientos_test,df_impute_parameters,encoders_clientes,scaler):
        """Apply the training-time imputation, encoding and scaling to the test split."""
        x_cols_clientes = ['RANG_INGRESO','FLAG_LIMA_PROVINCIA','EDAD','ANTIGUEDAD']
        x_cols_requerimientos = ['DICTAMEN']
        df_data_imputed_clientes = prepare_impute_missing_test(df_data_test, x_cols_clientes, df_impute_parameters)
        df_data_imputed_requerimientos = prepare_impute_missing_test(df_requerimientos_test, x_cols_requerimientos, df_impute_parameters)
        df_data_feature_clientes = generar_variables_ingenieria(df_data_imputed_clientes)
        df_data_feature_requerimientos = construir_variables_requerimientos(df_data_imputed_requerimientos)
        df_data_encoder_clientes = apply_label_encoders_to_test(df_data_feature_clientes, encoders_clientes)
        df_final = df_data_encoder_clientes.merge(df_data_feature_requerimientos, on='ID_CORRELATIVO', how='left')
        df_final.fillna(0, inplace=True)
        df_final = aplicar_estandarizacion_test(df_final, scaler)
        return df_final

    with mlflow.start_run(run_name=run_name) as run:
        run_id = run.info.run_id

        # Named `data_pull_run` so it does not shadow this function's own name.
        with mlflow.start_run(run_name="DataPull", nested=True) as data_pull_run:
            data_pull_id = data_pull_run.info.run_id

            # Run the data extraction

            df_clientes = wr.athena.read_sql_query(sql=query_clientes, database="bank_attrition", params=params_clientes)
            df_clientes.columns = df_clientes.columns.str.upper()
            # Empty strings are treated as missing so they are imputed later.
            df_clientes['RANG_INGRESO'] = df_clientes['RANG_INGRESO'].replace('', np.nan)
            df_clientes['FLAG_LIMA_PROVINCIA'] = df_clientes['FLAG_LIMA_PROVINCIA'].replace('', np.nan)

            df_requerimientos = wr.athena.read_sql_query(sql=query_requerimientos, database="bank_attrition", params=params_requerimientos)
            df_requerimientos.columns = df_requerimientos.columns.str.upper()
            df_requerimientos['DICTAMEN'] = df_requerimientos['DICTAMEN'].replace('', np.nan)

            # Split by customer id so a customer's rows never straddle train and test.
            ids_comunes = buscar_indices_coincidentes(df_clientes)
            ids_train, ids_test = split_data(ids_comunes)

            train_clientes = df_clientes[df_clientes['ID_CORRELATIVO'].isin(ids_train)].copy()
            test_clientes = df_clientes[df_clientes['ID_CORRELATIVO'].isin(ids_test)].copy()

            train_requerimientos = df_requerimientos[df_requerimientos['ID_CORRELATIVO'].isin(ids_train)].copy()
            test_requerimientos = df_requerimientos[df_requerimientos['ID_CORRELATIVO'].isin(ids_test)].copy()

            path_train_clientes = os.path.join(train_s3_path, "data", "out", "clientes_data_train.csv")
            path_test_clientes = os.path.join(train_s3_path, "data", "out", "clientes_data_test.csv")
            path_train_reqs = os.path.join(train_s3_path, "data", "out", "requerimientos_data_train.csv")
            path_test_reqs = os.path.join(train_s3_path, "data", "out", "requerimientos_data_test.csv")

            # Raw splits go to S3 ...
            train_clientes.to_csv(path_train_clientes, index=False)
            test_clientes.to_csv(path_test_clientes, index=False)
            train_requerimientos.to_csv(path_train_reqs, index=False)
            test_requerimientos.to_csv(path_test_reqs, index=False)

            # ... and to a local temp dir so they can be logged as MLflow artifacts.
            train_clientes.to_csv(os.path.join(output_dir, "clientes_data_train.csv"), index=False)
            test_clientes.to_csv(os.path.join(output_dir, "clientes_data_test.csv"), index=False)
            train_requerimientos.to_csv(os.path.join(output_dir, "requerimientos_data_train.csv"), index=False)
            test_requerimientos.to_csv(os.path.join(output_dir, "requerimientos_data_test.csv"), index=False)

            mlflow.log_artifact(os.path.join(output_dir, "clientes_data_train.csv"), artifact_path="data/out")
            mlflow.log_artifact(os.path.join(output_dir, "clientes_data_test.csv"), artifact_path="data/out")
            mlflow.log_artifact(os.path.join(output_dir, "requerimientos_data_train.csv"), artifact_path="data/out")
            mlflow.log_artifact(os.path.join(output_dir, "requerimientos_data_test.csv"), artifact_path="data/out")

            mlflow.log_input(mlflow.data.from_pandas(train_clientes, path_train_clientes, targets=TARGET_COL), context="DataPull_train_clientes")
            mlflow.log_input(mlflow.data.from_pandas(train_requerimientos, path_train_reqs), context="DataPull_train_requerimientos")

            mlflow.log_input(mlflow.data.from_pandas(test_clientes, path_test_clientes, targets=TARGET_COL), context="DataPull_test_clientes")
            mlflow.log_input(mlflow.data.from_pandas(test_requerimientos, path_test_reqs), context="DataPull_test_requerimientos")

            # Run the training-set preprocessing

            df_data_train_prepared, artifact_encoders_clientes, artifact_scaler, df_imputaciones = preprocess_dataset(train_clientes, train_requerimientos, TARGET_COL)
            df_data_train_prepared.to_csv(os.path.join(train_s3_path, "data", "out", "data_train_prepared.csv"), index=False)
            df_data_train_prepared.to_csv(os.path.join(output_dir, "data_train_prepared.csv"), index=False)
            mlflow.log_artifact(os.path.join(output_dir, "data_train_prepared.csv"), artifact_path="data/out")

            with open(os.path.join(output_dir, "scaler_train.pkl"), 'wb') as f:
                pickle.dump(artifact_scaler, f)
            mlflow.log_artifact(os.path.join(output_dir, "scaler_train.pkl"), artifact_path="outputs/preprocess")

            with open(os.path.join(output_dir, "label_encoder_train.pkl"), 'wb') as f:
                pickle.dump(artifact_encoders_clientes, f)
            mlflow.log_artifact(os.path.join(output_dir, "label_encoder_train.pkl"), artifact_path="outputs/preprocess")

            wr.s3.upload(local_file=os.path.join(output_dir, "scaler_train.pkl"),path=os.path.join(train_s3_path, "outputs", "preprocess", "scaler_train.pkl"))
            wr.s3.upload(local_file=os.path.join(output_dir, "label_encoder_train.pkl"),path=os.path.join(train_s3_path, "outputs", "preprocess", "label_encoder_train.pkl"))

            # Run the test-set preprocessing with the artifacts fitted on the training set
            df_data_test_prepared  = prepare_dataset_test(test_clientes,test_requerimientos,df_imputaciones,artifact_encoders_clientes,artifact_scaler)
            df_data_test_prepared.to_csv(os.path.join(train_s3_path, "data", "out", "data_test_prepared.csv"), index=False)
            df_data_test_prepared.to_csv(os.path.join(output_dir, "data_test_prepared.csv"), index=False)
            mlflow.log_artifact(os.path.join(output_dir, "data_test_prepared.csv"), artifact_path="data/out")
    return run_id, data_pull_id

## Model training

In [9]:
@step(
    name="ModelTraining",
    instance_type=instance_type
)
def model_training(experiment_name: str, run_id: str, data_pull_id: str ) -> str:
    """Grid-search an XGBoost and a random-forest classifier on the prepared training set.

    Re-opens the parent MLflow run ``run_id``, starts a nested "ModelTraining"
    run, downloads ``data/out/data_train_prepared.csv`` and the column-name
    artifacts from the ``data_pull_id`` run, and for each model runs a 3-fold
    ``GridSearchCV`` scored by ROC AUC. CV results, best-model metrics, feature
    importances and the pickled grid-search object are written to S3 and logged
    to MLflow.

    Args:
        experiment_name: MLflow experiment to log into.
        run_id: Id of the parent MLflow run to resume.
        data_pull_id: Id of the MLflow run holding the prepared data artifacts.

    Returns:
        The id of the nested "ModelTraining" run.

    Raises:
        subprocess.CalledProcessError: If installing ``awswrangler`` fails.
    """
    import subprocess
    import sys
    # Install into the interpreter running this step and fail fast on error
    # instead of continuing with a missing or unexpected package version.
    subprocess.run([sys.executable, '-m', 'pip', 'install', 'awswrangler==3.12.0'], check=True)
    import awswrangler as wr
    import mlflow
    from mlflow.artifacts import download_artifacts
    import pandas as pd
    from sklearn.ensemble import RandomForestClassifier
    from sklearn.model_selection import GridSearchCV
    from xgboost import XGBClassifier
    import os
    import pickle
    import tempfile
    output_dir = tempfile.mkdtemp()
    mlflow.set_tracking_uri(tracking_server_arn)
    mlflow.set_experiment(experiment_name)
    train_s3_path = f"s3://{default_path}"

    # Cross-validated training helpers

    def get_preprocess_x_columns():
        """Return the predictor column names logged by the data-pull run."""
        x_cols_path = download_artifacts(run_id=data_pull_id, artifact_path='outputs/preprocess/x_col_names.csv')
        x_cols = pd.read_csv(x_cols_path)['x_col'].to_list()
        return x_cols

    def get_preprocess_y_column():
        """Return the target column name (as a one-element list) logged by the data-pull run."""
        y_col_path = download_artifacts(run_id=data_pull_id, artifact_path='outputs/preprocess/y_col_name.csv')
        y_col = pd.read_csv(y_col_path)['y_col'].to_list()
        return y_col

    def train_evaluate_models(df_data_train, model_parameters_grid, model, name_path):
        """Grid-search ``model`` and persist its results under ``name_path``.

        ``name_path`` is used both as the sub-folder for S3/MLflow outputs and
        as part of the logged parameter and metric names.
        """
        x_cols = get_preprocess_x_columns()
        y_col = get_preprocess_y_column()
        mlflow.log_param("set_train_cv_rows", df_data_train.shape[0])
        mlflow.log_param("set_train_cv_cols", x_cols)
        grid_search = GridSearchCV(estimator=model, param_grid=model_parameters_grid, scoring='roc_auc', cv=3, verbose=1, n_jobs=-1)
        # y_col is a one-element list, so the selection is a single-column
        # frame; ravel() flattens it to the 1-D target array.
        grid_search.fit(df_data_train[x_cols], df_data_train[y_col].values.ravel())
        prefixed_params = {f"best_param_{name_path}_{k}": v for k, v in grid_search.best_params_.items()}
        mlflow.log_params(prefixed_params)

        # Full CV table: one row per parameter combination.
        df_model_results = pd.DataFrame({'model_parameters': grid_search.cv_results_['params'],
                                         'model_rank': grid_search.cv_results_['rank_test_score'],
                                         'auc_score_mean': grid_search.cv_results_['mean_test_score'],
                                         'auc_score_std': grid_search.cv_results_['std_test_score']})
        # Coefficient of variation of the AUC across folds (std / mean).
        df_model_results['auc_score_cv'] = df_model_results['auc_score_std'] / df_model_results['auc_score_mean']
        df_model_results.to_csv(os.path.join(train_s3_path, "outputs", "train", "metrics", name_path, "train_cv_model_results.csv"), index=False)
        df_model_results.to_csv(os.path.join(output_dir, "train_cv_model_results.csv"), index=False)
        mlflow.log_artifact(os.path.join(output_dir, "train_cv_model_results.csv"), artifact_path=f"outputs/train/metrics/{name_path}")

        # Metrics of the top-ranked combination (first row if several tie at rank 1).
        df_model_results_best_model = df_model_results[df_model_results['model_rank']==1].copy()
        best_auc_score_mean = df_model_results_best_model['auc_score_mean'].values[0]
        mlflow.log_metric(f"best_cv_train_{name_path}_auc_score_mean", best_auc_score_mean)
        best_auc_score_std = df_model_results_best_model['auc_score_std'].values[0]
        mlflow.log_metric(f"best_cv_train_{name_path}_auc_score_std", best_auc_score_std)
        best_auc_score_cv = df_model_results_best_model['auc_score_cv'].values[0]
        mlflow.log_metric(f"best_cv_train_{name_path}_auc_score_cv", best_auc_score_cv)
        df_model_results_best_model.to_csv(os.path.join(train_s3_path, "outputs", "train", "metrics", name_path, "train_cv_model_results_best_model.csv"), index=False)
        df_model_results_best_model.to_csv(os.path.join(output_dir, "train_cv_model_results_best_model.csv"), index=False)
        mlflow.log_artifact(os.path.join(output_dir, "train_cv_model_results_best_model.csv"), artifact_path=f"outputs/train/metrics/{name_path}")

        # Feature importances of the refitted best estimator.
        df_feature_importance = pd.DataFrame({'variable': grid_search.feature_names_in_, 'importance': grid_search.best_estimator_.feature_importances_})
        df_feature_importance.to_csv(os.path.join(train_s3_path, "outputs", "train", "feature_importance", name_path, "feature_importance.csv"), index=False)
        df_feature_importance.to_csv(os.path.join(output_dir, "feature_importance.csv"), index=False)
        mlflow.log_artifact(os.path.join(output_dir, "feature_importance.csv"), artifact_path=f"outputs/train/feature_importance/{name_path}")

        # Persist the whole fitted grid-search object.
        with open(os.path.join(output_dir, "grid_search_model.pickle"), 'wb') as handle:
            pickle.dump(grid_search, handle, protocol=pickle.HIGHEST_PROTOCOL)
        mlflow.log_artifact(os.path.join(output_dir, "grid_search_model.pickle"), artifact_path=f"outputs/train/models/{name_path}")
        wr.s3.upload(local_file=os.path.join(output_dir, "grid_search_model.pickle"),path=os.path.join(train_s3_path, "outputs", "train", "models", name_path, "grid_search_model.pickle"))

    with mlflow.start_run(run_id=run_id):
        with mlflow.start_run(run_name="ModelTraining", nested=True) as training_run:
            training_run_id = training_run.info.run_id

            model_parameters_grid_xgbost = {'max_depth': [3, 5],'eta': [0.05, 0.1],'gamma': [0, 1],'min_child_weight': [1, 5],'subsample': [0.8],'n_estimators': [50],'scale_pos_weight': [1, 5]}
            xgb_model = XGBClassifier(objective='binary:logistic', eval_metric='auc',random_state=42)
            mlflow.log_param("xgbost_param_grid", str(model_parameters_grid_xgbost))
            model_parameters_grid_random_forest = {'n_estimators': [100],'max_depth': [None, 6],'min_samples_leaf': [1, 10],'min_impurity_decrease': [0.0, 0.01]}
            rf_model = RandomForestClassifier(random_state=42, class_weight='balanced')
            mlflow.log_param("random_forest_param_grid", str(model_parameters_grid_random_forest))
            clientes_train_path = download_artifacts(run_id=data_pull_id, artifact_path="data/out/data_train_prepared.csv")
            df_data_train = pd.read_csv(clientes_train_path)
            path_train_clientes = os.path.join(train_s3_path, "data", "out", "data_train_prepared.csv")
            mlflow.log_input(mlflow.data.from_pandas(df_data_train, path_train_clientes, targets='ATTRITION'), context="ModelTraining_data_train_cv")
            train_evaluate_models(df_data_train, model_parameters_grid_xgbost, xgb_model, 'xgbost')
            train_evaluate_models(df_data_train, model_parameters_grid_random_forest, rf_model, 'random_forest')

    return training_run_id

## Evaluation

In [10]:
@step(
    name="ModelEvaluation",
    instance_type=instance_type
)
def evaluate(experiment_name: str, run_id: str, data_pull_id: str, training_run_id: str) -> tuple[str, float, float]:
    """Refit each model family with its best hyperparameters and score it on the test set.

    Runs as a nested MLflow run named "ModelEvaluation" under ``run_id``. For the
    'random_forest' and 'xgbost' families it loads the pickled grid search logged by
    the training run, refits a fresh estimator with ``best_params_`` on the training
    data, logs the model, metrics and plots to MLflow, and mirrors the files to S3.

    Args:
        experiment_name: MLflow experiment to log into.
        run_id: Id of the existing MLflow run the nested evaluation run is opened under.
        data_pull_id: Id of the MLflow run that holds ``data/out/data_train_prepared.csv``
            and ``data/out/data_test_prepared.csv``.
        training_run_id: Id of the MLflow run that holds the grid-search pickles and
            the feature-importance CSVs.

    Returns:
        ``(evaluation_run_id, random_forest_recall, xgbost_recall)``; both recalls are
        the recall of class "1" on the test set.

    Raises:
        subprocess.CalledProcessError: If installing awswrangler fails.
        ValueError: If a model family other than 'xgbost' or 'random_forest' is requested.
    """
    import subprocess
    import sys
    # awswrangler is installed at runtime; use the running interpreter's pip and fail
    # loudly here rather than with a confusing ImportError on the next line.
    subprocess.run([sys.executable, '-m', 'pip', 'install', 'awswrangler==3.12.0'], check=True)
    import awswrangler as wr
    import mlflow
    from mlflow.artifacts import download_artifacts
    import pandas as pd
    from sklearn.ensemble import RandomForestClassifier
    from xgboost import XGBClassifier
    import os
    import pickle
    import tempfile
    from sklearn.metrics import classification_report, roc_auc_score, RocCurveDisplay, log_loss, ConfusionMatrixDisplay, roc_curve
    import matplotlib.pyplot as plt
    import mlflow.sklearn
    import mlflow.xgboost
    from mlflow.models.signature import infer_signature
    output_dir = tempfile.mkdtemp()
    mlflow.set_tracking_uri(tracking_server_arn)
    mlflow.set_experiment(experiment_name)
    train_s3_path = f"s3://{default_path}"

    # Best-model helpers

    def publish_output(file_name, category, name_path):
        """Log ``output_dir/file_name`` to MLflow and mirror it to S3 under outputs/train/<category>/<name_path>/."""
        local_path = os.path.join(output_dir, file_name)
        mlflow.log_artifact(local_path, artifact_path=f"outputs/train/{category}/{name_path}")
        wr.s3.upload(local_file=local_path, path=os.path.join(train_s3_path, "outputs", "train", category, name_path, file_name))

    def get_features_name(name_path, training_run_id):
        """Return the feature names listed in the 'variable' column of the training run's feature-importance CSV."""
        feature_path = download_artifacts(run_id=training_run_id, artifact_path=f'outputs/train/feature_importance/{name_path}/feature_importance.csv')
        df_feature_importance = pd.read_csv(feature_path)
        return df_feature_importance['variable'].to_list()

    def get_target_name():
        """Return the name of the target column."""
        return "ATTRITION"

    def evaluate_best_model_in_dataset(df_data, name_path, best_model, x_cols):
        """Log metrics and plots for ``best_model`` on ``df_data`` and return the class-"1" recall."""
        y_col = get_target_name()
        y_true = df_data[y_col]
        y_proba = best_model.predict_proba(df_data[x_cols])[:, 1]

        # Decision threshold that maximises Youden's J (tpr - fpr).
        # NOTE(review): the threshold is tuned on the same data it is then scored on,
        # so precision/recall/F1 below are optimistic; consider tuning it on a
        # validation split instead.
        fpr, tpr, thresholds = roc_curve(y_true, y_proba)
        best_threshold = thresholds[(tpr - fpr).argmax()]
        y_pred_label = (y_proba >= best_threshold).astype(int)

        report = classification_report(y_true, y_pred_label, output_dict=True)
        positive_class = report["1"]
        mlflow.log_metric(f"{name_path}_roc_auc_test", roc_auc_score(y_true, y_proba))
        mlflow.log_metric(f"{name_path}_precision_test", positive_class["precision"])
        mlflow.log_metric(f"{name_path}_recall_test", positive_class["recall"])
        mlflow.log_metric(f"{name_path}_f1-score_test", positive_class["f1-score"])
        mlflow.log_metric(f"{name_path}_log_loss_test", log_loss(y_true, y_proba))

        RocCurveDisplay.from_predictions(y_true, y_proba)
        plt.title(f"Receiver Operating Characteristic (ROC) Curve for {name_path}")
        plt.savefig(os.path.join(output_dir, f'roc_curve_{name_path}_test.png'))
        plt.close()
        publish_output(f"roc_curve_{name_path}_test.png", "metrics", name_path)

        ConfusionMatrixDisplay.from_predictions(y_true, y_pred_label)
        plt.title(f"Confusion Matrix for {name_path}")
        plt.savefig(os.path.join(output_dir, f'conf_matrix_{name_path}_test.png'))
        plt.close()
        publish_output(f"conf_matrix_{name_path}_test.png", "metrics", name_path)

        return positive_class["recall"]

    def select_best_model(df_data_train, df_data_test, name_path, training_run_id):
        """Refit the ``name_path`` family with its best grid-search parameters, log it, and return its test recall."""
        # 'xgbost' (sic) is the key the training step used for its artifact paths,
        # so the spelling must be kept as-is here.
        if name_path not in ("xgbost", "random_forest"):
            raise ValueError(f"Unsupported model family: {name_path!r}")

        path_grid = download_artifacts(run_id=training_run_id, artifact_path=f'outputs/train/models/{name_path}/grid_search_model.pickle')
        # pickle.load can execute arbitrary code: only load artifacts produced by
        # this pipeline's own training run.
        with open(path_grid, 'rb') as handle:
            grid_search = pickle.load(handle)

        best_params = grid_search.best_params_
        if name_path == "xgbost":
            best_model = XGBClassifier(**best_params, objective='binary:logistic', eval_metric='auc', random_state=42)
        else:
            best_model = RandomForestClassifier(**best_params, random_state=42, class_weight='balanced')

        # Download the feature list once and reuse it for both fitting and evaluation.
        x_cols = get_features_name(name_path, training_run_id)
        y_col = get_target_name()
        best_model.fit(df_data_train[x_cols], df_data_train[y_col].values.ravel())

        signature = infer_signature(df_data_train[x_cols], best_model.predict_proba(df_data_train[x_cols]))
        input_example = df_data_train[x_cols].iloc[:5]
        if name_path == "xgbost":
            mlflow.xgboost.log_model(best_model, artifact_path=f"{name_path}_model", signature=signature, input_example=input_example)
        else:
            mlflow.sklearn.log_model(best_model, artifact_path=f"{name_path}_model", signature=signature, input_example=input_example)

        # Persist the refit estimator itself; the grid-search object is already
        # stored by the training step as grid_search_model.pickle.
        with open(os.path.join(output_dir, "best_model.pickle"), 'wb') as handle:
            pickle.dump(best_model, handle, protocol=pickle.HIGHEST_PROTOCOL)
        publish_output("best_model.pickle", "models", name_path)

        recall_metric_test = evaluate_best_model_in_dataset(df_data_test, name_path, best_model, x_cols)
        df_metrics = pd.DataFrame({'sample': ['test'], 'recall': [recall_metric_test]})

        # Write once locally, then publish through the same MLflow + S3 path as the
        # other outputs (avoids relying on pandas' optional s3:// filesystem support).
        df_metrics.to_csv(os.path.join(output_dir, "test_metrics.csv"), index=False)
        publish_output("test_metrics.csv", "metrics", name_path)

        return recall_metric_test

    with mlflow.start_run(run_id=run_id):
        with mlflow.start_run(run_name="ModelEvaluation", nested=True) as evaluation_run:
            evaluation_run_id = evaluation_run.info.run_id

            clientes_train_path = download_artifacts(run_id=data_pull_id, artifact_path="data/out/data_train_prepared.csv")
            df_data_train = pd.read_csv(clientes_train_path)

            path_train_clientes = os.path.join(train_s3_path, "data", "out", "data_train_prepared.csv")
            mlflow.log_input(mlflow.data.from_pandas(df_data_train, path_train_clientes, targets='ATTRITION'), context="data_train")

            clientes_test_path = download_artifacts(run_id=data_pull_id, artifact_path="data/out/data_test_prepared.csv")
            df_data_test = pd.read_csv(clientes_test_path)

            path_test_clientes = os.path.join(train_s3_path, "data", "out", "data_test_prepared.csv")
            mlflow.log_input(mlflow.data.from_pandas(df_data_test, path_test_clientes, targets='ATTRITION'), context="data_test")

            metrics_rf = select_best_model(df_data_train, df_data_test, 'random_forest', training_run_id)
            metrics_xg = select_best_model(df_data_train, df_data_test, 'xgbost', training_run_id)
    return evaluation_run_id, metrics_rf, metrics_xg

## Model registration: Random Forest

In [11]:
@step(
    name="RegisterRandomForestModel",
    instance_type=instance_type
)
def register_random_forest_model(experiment_name: str, name_path: str, run_id: str, evaluation_run_id: str):
    """Register the model logged by the evaluation run and mark it as the production champion.

    Opens a nested MLflow run named "RegisterRandomForestModel" under ``run_id``,
    registers ``runs:/<evaluation_run_id>/<name_path>_model`` as a new version of
    "attrition-detection-model", tags that version ``estado=production``, points the
    ``champion`` alias at it and sets its description.

    Args:
        experiment_name: MLflow experiment to log into.
        name_path: Model family key used as the prefix of the logged model's artifact path.
        run_id: Id of the existing MLflow run the nested registration run is opened under.
        evaluation_run_id: Id of the MLflow run in which the model was logged.
    """
    # Only MLflow is needed here; the unused imports, the scratch directory that was
    # created but never used or removed, and the unused S3 path have been dropped.
    import mlflow
    from mlflow.tracking import MlflowClient

    mlflow.set_tracking_uri(tracking_server_arn)
    mlflow.set_experiment(experiment_name)

    with mlflow.start_run(run_id=run_id):
        with mlflow.start_run(run_name="RegisterRandomForestModel", nested=True):

            model_uri = f"runs:/{evaluation_run_id}/{name_path}_model"
            model_registry_name = "attrition-detection-model"
            result = mlflow.register_model(model_uri=model_uri, name=model_registry_name)
            client = MlflowClient()

            client.set_model_version_tag(
                name=model_registry_name,
                version=result.version,
                key="estado",
                value="production",
            )

            # Setting the alias on the new version makes it the current "champion".
            client.set_registered_model_alias(
                name=model_registry_name,
                alias="champion",
                version=result.version,
            )

            client.update_model_version(
                name=model_registry_name,
                version=result.version,
                description=f"{name_path} fue el modelo que obtuvo mejor recall usando los mejores hiperparametros por lo que ahora sera el modelo productivo",
            )

## Model registration: XGBoost

In [12]:
@step(
    name="RegisterXGBoostModel",
    instance_type=instance_type
)
def register_xgboost_model(experiment_name: str, name_path: str, run_id: str, evaluation_run_id: str):
    """Register the model logged by the evaluation run and mark it as the production champion.

    Opens a nested MLflow run named "RegisterXGBoostModel" under ``run_id``,
    registers ``runs:/<evaluation_run_id>/<name_path>_model`` as a new version of
    "attrition-detection-model", tags that version ``estado=production``, points the
    ``champion`` alias at it and sets its description.

    Args:
        experiment_name: MLflow experiment to log into.
        name_path: Model family key used as the prefix of the logged model's artifact path.
        run_id: Id of the existing MLflow run the nested registration run is opened under.
        evaluation_run_id: Id of the MLflow run in which the model was logged.
    """
    # Only MLflow is needed here; the unused imports, the scratch directory that was
    # created but never used or removed, and the unused S3 path have been dropped.
    import mlflow
    from mlflow.tracking import MlflowClient

    mlflow.set_tracking_uri(tracking_server_arn)
    mlflow.set_experiment(experiment_name)

    with mlflow.start_run(run_id=run_id):
        with mlflow.start_run(run_name="RegisterXGBoostModel", nested=True):

            model_uri = f"runs:/{evaluation_run_id}/{name_path}_model"
            model_registry_name = "attrition-detection-model"
            result = mlflow.register_model(model_uri=model_uri, name=model_registry_name)
            client = MlflowClient()

            client.set_model_version_tag(
                name=model_registry_name,
                version=result.version,
                key="estado",
                value="production",
            )

            # Setting the alias on the new version makes it the current "champion".
            client.set_registered_model_alias(
                name=model_registry_name,
                alias="champion",
                version=result.version,
            )

            client.update_model_version(
                name=model_registry_name,
                version=result.version,
                description=f"{name_path} fue el modelo que obtuvo mejor recall usando los mejores hiperparametros por lo que ahora sera el modelo productivo",
            )

## Pipeline

In [13]:
# Minimum test-set recall the winning model must reach before it is registered.
MIN_RECALL_TO_REGISTER = 0.6

data_pull_step = data_pull(
    experiment_name=experiment_name,
    run_name=ExecutionVariables.PIPELINE_EXECUTION_ID,
    cod_month=cod_month,
    cod_month_start=cod_month_start,
    cod_month_end=cod_month_end,
)

# Delayed outputs of the data-pull step; downstream steps receive them as
# `run_id` and `data_pull_id` respectively.
parent_run_ref = data_pull_step[0]
data_pull_run_ref = data_pull_step[1]

model_training_step = model_training(
    experiment_name=experiment_name,
    run_id=parent_run_ref,
    data_pull_id=data_pull_run_ref,
)

model_evaluation_step = evaluate(
    experiment_name=experiment_name,
    run_id=parent_run_ref,
    data_pull_id=data_pull_run_ref,
    training_run_id=model_training_step,
)

# `evaluate` returns (evaluation_run_id, random-forest recall, xgbost recall).
evaluation_run_ref = model_evaluation_step[0]
rf_recall_ref = model_evaluation_step[1]
xgb_recall_ref = model_evaluation_step[2]

# Random Forest branch: register it if its recall clears the bar, otherwise fail.
random_forest_branch = ConditionStep(
    name="ConditionalRegisterRandomForestBranch",
    conditions=[
        ConditionGreaterThanOrEqualTo(left=rf_recall_ref, right=MIN_RECALL_TO_REGISTER)
    ],
    if_steps=[
        register_random_forest_model(
            experiment_name=experiment_name,
            name_path='random_forest',
            run_id=parent_run_ref,
            evaluation_run_id=evaluation_run_ref,
        )
    ],
    else_steps=[
        FailStep(
            name="FailRandomForestPerformance",
            error_message="Random Forest performance is not good enough",
        )
    ],
)

# XGBoost branch: same rule, applied to the XGBoost recall.
xgboost_branch = ConditionStep(
    name="ConditionalRegisterXGBoostBranch",
    conditions=[
        ConditionGreaterThanOrEqualTo(left=xgb_recall_ref, right=MIN_RECALL_TO_REGISTER)
    ],
    if_steps=[
        register_xgboost_model(
            experiment_name=experiment_name,
            name_path='xgbost',
            run_id=parent_run_ref,
            evaluation_run_id=evaluation_run_ref,
        )
    ],
    else_steps=[
        FailStep(
            name="FailXGBoostPerformance",
            error_message="XGBoost performance is not good enough",
        )
    ],
)

# Pick the family with the higher recall (ties go to Random Forest), then let the
# chosen branch decide between registering and failing.
# NOTE(review): the SageMaker Pipelines documentation lists nested condition steps
# as unsupported -- confirm that an execution actually follows these inner branches.
conditional_register_step = ConditionStep(
    name="ConditionalRegisterOverall",
    conditions=[
        ConditionGreaterThanOrEqualTo(left=rf_recall_ref, right=xgb_recall_ref)
    ],
    if_steps=[random_forest_branch],
    else_steps=[xgboost_branch],
)


In [14]:
# Steps handed to the pipeline, listed in the order they depend on each other.
pipeline_steps = [
    data_pull_step,
    model_training_step,
    model_evaluation_step,
    conditional_register_step,
]

# Parameters that must be supplied when an execution is started.
pipeline_parameters = [cod_month, cod_month_start, cod_month_end]

# NOTE(review): the `sagemaker_session` configured at the top of the notebook is not
# passed here, so step code is uploaded to the SDK's default bucket -- confirm
# whether that is intended.
pipeline = Pipeline(name=pipeline_name, steps=pipeline_steps, parameters=pipeline_parameters)

# Create the pipeline definition, or update it if it already exists.
pipeline.upsert(role_arn=role)

2025-06-29 15:24:19,467 sagemaker.remote_function INFO     Uploading serialized function code to s3://sagemaker-us-east-2-762233743642/pipeline-train/DataPull/2025-06-29-15-24-19-254/function
2025-06-29 15:24:19,544 sagemaker.remote_function INFO     Uploading serialized function arguments to s3://sagemaker-us-east-2-762233743642/pipeline-train/DataPull/2025-06-29-15-24-19-254/arguments
2025-06-29 15:24:19,749 sagemaker.remote_function INFO     Uploading serialized function code to s3://sagemaker-us-east-2-762233743642/pipeline-train/ModelTraining/2025-06-29-15-24-19-254/function
2025-06-29 15:24:19,802 sagemaker.remote_function INFO     Uploading serialized function arguments to s3://sagemaker-us-east-2-762233743642/pipeline-train/ModelTraining/2025-06-29-15-24-19-254/arguments
2025-06-29 15:24:19,876 sagemaker.remote_function INFO     Uploading serialized function code to s3://sagemaker-us-east-2-762233743642/pipeline-train/ModelEvaluation/2025-06-29-15-24-19-254/function
2025-06-29 

{'PipelineArn': 'arn:aws:sagemaker:us-east-2:762233743642:pipeline/pipeline-train',
 'ResponseMetadata': {'RequestId': '9c903aa1-5b8f-4aed-8b05-edf2f5722071',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '9c903aa1-5b8f-4aed-8b05-edf2f5722071',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '82',
   'date': 'Sun, 29 Jun 2025 15:24:21 GMT'},
  'RetryAttempts': 0}}

In [15]:
# Values for the pipeline parameters: the customer period is a string, the two
# requirement-period bounds are integers.
execution_parameters = {
    "PeriodoCargaClientes": '201208',
    "PeriodoCargaRequerimientosInicio": 201203,
    "PeriodoCargaRequerimientosFin": 201208,
}

pipeline.start(
    parameters=execution_parameters,
    execution_display_name="test-training-full-2",
    execution_description="Testando training full 2",
)

_PipelineExecution(arn='arn:aws:sagemaker:us-east-2:762233743642:pipeline/pipeline-train/execution/7h0xjbe88zx5', sagemaker_session=<sagemaker.session.Session object at 0x7f0557bc42c0>)