In [None]:
import json

import boto3
import pandas as pd
import sagemaker
from sagemaker.deserializers import CSVDeserializer
from sagemaker.drift_check_baselines import DriftCheckBaselines
from sagemaker.image_uris import retrieve
from sagemaker.inputs import TrainingInput
from sagemaker.lambda_helper import Lambda
from sagemaker.model_metrics import FileSource, MetricsSource, ModelMetrics
from sagemaker.processing import ProcessingInput, ProcessingOutput, ScriptProcessor
from sagemaker.serializers import CSVSerializer
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.workflow.check_job_config import CheckJobConfig
from sagemaker.workflow.clarify_check_step import (
    ClarifyCheckStep,
    ModelBiasCheckConfig,
    ModelExplainabilityCheckConfig,
)
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.conditions import ConditionLessThanOrEqualTo
from sagemaker.workflow.functions import JsonGet
from sagemaker.workflow.lambda_step import (
    LambdaStep,
)
from sagemaker.workflow.parameters import (
    ParameterBoolean,
    ParameterInteger,
    ParameterString,
)
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.properties import PropertyFile
from sagemaker.workflow.step_collections import RegisterModel
from sagemaker.workflow.steps import CreateModelStep, ProcessingStep, TrainingStep
from sagemaker.xgboost.estimator import XGBoost

## Set up a SageMaker Studio notebook and parameterize the pipeline

In [None]:
# Instantiate AWS services session and client objects
sess = sagemaker.Session()
write_bucket = sess.default_bucket()
write_prefix = "hospital_wait_time_prediction"

read_bucket = sess.default_bucket()
read_prefix = "hospital_wait_time_prediction/raw"

region = sess.boto_region_name
s3_client = boto3.client("s3", region_name=region)
sm_client = boto3.client("sagemaker", region_name=region)
# Pin the runtime client to the session region, like the other two clients.
sm_runtime_client = boto3.client("sagemaker-runtime", region_name=region)
# Backward-compatible alias for the original (misspelled) variable name.
sm_runtiome_client = sm_runtime_client

# Fetch SageMaker execution role
sagemaker_role = sagemaker.get_execution_role()

# Full S3 paths
# NOTE(review): the base name of this object is the file name the processing
# container sees under /opt/ml/processing/input; keep it in sync with the name
# the processing script reads.
hospital_data_uri = f"s3://{read_bucket}/{read_prefix}/basesdedatos_50K.xlsx"
output_data_uri = f"s3://{write_bucket}/{write_prefix}/"
scripts_uri = f"s3://{write_bucket}/{write_prefix}/scripts"
estimator_output_uri = f"s3://{write_bucket}/{write_prefix}/training_jobs"
processing_output_uri = f"s3://{write_bucket}/{write_prefix}/processing_jobs"
model_eval_output_uri = f"s3://{write_bucket}/{write_prefix}/model_eval"

# Retrieve training image (also reused below as the evaluation/serving image)
training_image = retrieve(framework="xgboost", region=region, version="1.3-1")

In [None]:
# Set names of pipeline objects
pipeline_name = "HospitalWaitTimePredPipeline"
pipeline_model_name = "hospital-wait-time-pred-pipeline"
model_package_group_name = "hospital-wait-time-pred-model-group"
base_job_name_prefix = "hospital-pred"
# Endpoint resources are named after the pipeline model so they are easy to trace back.
endpoint_config_name = f"{pipeline_model_name}-endpoint-config"
endpoint_name = f"{pipeline_model_name}-endpoint"

# Set instance types and counts
# These plain values are the defaults of the pipeline parameters declared in the
# next cell and are also passed directly to the processors/estimator below.
process_instance_type = "ml.m4.xlarge"
train_instance_count = 1
train_instance_type = "ml.m4.xlarge"
predictor_instance_count = 1
predictor_instance_type = "ml.m4.xlarge"

In [None]:
# Set up pipeline input parameters
# Each parameter can be overridden per execution; the defaults come from the
# plain variables defined in the previous cell.

# Set processing instance type
process_instance_type_param = ParameterString(
    name="ProcessingInstanceType",
    default_value=process_instance_type,
)

# Set training instance type
train_instance_type_param = ParameterString(
    name="TrainingInstanceType",
    default_value=train_instance_type,
)

# Set training instance count
train_instance_count_param = ParameterInteger(
    name="TrainingInstanceCount", default_value=train_instance_count
)

# Set deployment instance type
deploy_instance_type_param = ParameterString(
    name="DeployInstanceType",
    default_value=predictor_instance_type,
)

# Set deployment instance count
deploy_instance_count_param = ParameterInteger(
    name="DeployInstanceCount", default_value=predictor_instance_count
)

# Set model approval param
# Defaults to "Approved", so a registered model version needs no manual approval.
model_approval_status_param = ParameterString(name="ModelApprovalStatus", default_value="Approved")

## Data processing step

In [None]:
%%writefile process.py

import argparse
import logging
import os
import re
import string

import gender_guesser.detector as gender
from gensim.models import Doc2Vec
from gensim.models.doc2vec import TaggedDocument
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import OneHotEncoder

# Configure the root logger: INFO level (so logger.debug calls in this script are
# suppressed) with a stream handler so messages reach the job's console output.
logger = logging.getLogger()
logger.setLevel(logging.INFO)
logger.addHandler(logging.StreamHandler())


def parse_args():
    """Read the dataset split ratios from the command line.

    Unknown arguments are ignored rather than rejected.

    Returns:
        argparse.Namespace: with ``train_ratio``, ``validation_ratio`` and
        ``test_ratio`` floats (defaults 0.8 / 0.1 / 0.1).
    """
    cli = argparse.ArgumentParser()
    ratio_defaults = (
        ("--train-ratio", 0.8),
        ("--validation-ratio", 0.1),
        ("--test-ratio", 0.1),
    )
    for flag, default in ratio_defaults:
        cli.add_argument(flag, type=float, default=default)

    known, _unknown = cli.parse_known_args()
    logger.info(f"Received arguments {known}")
    return known


def load_data():
    """Load the raw Excel workbook from the processing input directory.

    The preferred file name is ``basededatos_50K.xlsx``. If that exact file is
    not present (the S3 object may be named slightly differently), the first
    ``*.xlsx`` file found in the input directory (sorted by name) is used.

    Returns:
        pandas.DataFrame: the workbook contents as read by ``pd.read_excel``.

    Raises:
        FileNotFoundError: if the input directory contains no ``.xlsx`` file.
    """
    # Local imports keep this function self-contained within the script.
    import glob
    import io

    input_dir = "/opt/ml/processing/input"
    input_path = os.path.join(input_dir, "basededatos_50K.xlsx")
    if not os.path.exists(input_path):
        candidates = sorted(glob.glob(os.path.join(input_dir, "*.xlsx")))
        if not candidates:
            raise FileNotFoundError(f"No .xlsx file found in {input_dir}")
        logger.info(f"{input_path} not found, falling back to {candidates[0]}")
        input_path = candidates[0]

    logger.info(f"Loading data from {input_path}")
    df = pd.read_excel(input_path)
    logger.info(f"Loaded dataset with shape: {df.shape}")

    # DataFrame.info() writes to a buffer and returns None, so capture its
    # output explicitly instead of interpolating the return value.
    info_buffer = io.StringIO()
    df.info(buf=info_buffer)
    logger.info(f"Dataset info: {info_buffer.getvalue()}")
    return df


def clean_duplicates(df):
    """Remove fully duplicated rows and log how many were dropped.

    Args:
        df (pandas.DataFrame): input records.

    Returns:
        pandas.DataFrame: ``df`` without duplicate rows (first occurrence kept).
    """
    logger.info("Cleaning duplicates")
    initial_len = len(df)
    df = df.drop_duplicates()
    removed = initial_len - len(df)
    # Share of the ORIGINAL row count; guard against an empty input frame.
    removed_pct = removed / initial_len * 100 if initial_len else 0.0
    logger.info(f"Number of duplicates:\t\t {removed} - {removed_pct:.1f}%")
    return df


def handle_missing_values(df):
    """Drop every row that contains at least one missing value.

    Args:
        df (pandas.DataFrame): input records.

    Returns:
        pandas.DataFrame: ``df`` restricted to rows without missing values.
    """
    logger.info("Handling missing values")
    initial_len = len(df)
    df = df.dropna()
    removed = initial_len - len(df)
    # Share of the ORIGINAL row count; dividing by the remaining rows would
    # overstate the figure and crash when every row is dropped.
    removed_pct = removed / initial_len * 100 if initial_len else 0.0
    logger.info(f"Number of missing values:\t {removed} - {removed_pct:.1f}%")
    return df


def absolute_values(df):
    """Make the duration and age columns non-negative and add their total.

    The four duration columns and ``edad`` are replaced by their absolute
    values in place; ``tiempo_total`` is then added as the sum of the four
    durations.

    Args:
        df (pandas.DataFrame): frame holding the duration columns and ``edad``.

    Returns:
        pandas.DataFrame: the same frame object, modified in place.
    """
    logger.info("Converting values to absolute values")
    duration_columns = [
        "tiempo_espera_triage",
        "tiempo_en_triage",
        "tiempo_espera_despuestriage",
        "tiempo_en_consulta",
    ]
    for name in duration_columns + ["edad"]:
        df[name] = df[name].abs()

    # Total time: accumulate left to right so missing values propagate exactly
    # as with a chained "+" expression.
    running_total = df[duration_columns[0]]
    for name in duration_columns[1:]:
        running_total = running_total + df[name]
    df["tiempo_total"] = running_total
    return df


def remove_outliers(df, column):
    """Drop rows whose ``column`` value lies outside the 1.5 * IQR fences.

    When ``column`` is ``"edad"`` the age is first standardized: ``Edad1`` is
    renamed (in place) to ``edad_unidad``, its unit labels are replaced by
    numeric factors, and ``edad`` is multiplied by that factor.

    Args:
        df (pandas.DataFrame): input records.
        column (str): numeric column to filter on.

    Returns:
        pandas.DataFrame: ``df`` without the outlier rows.
    """
    logger.info(f"Removing outliers from {column}")
    initial_len = len(df)

    if column == "edad":
        logger.info("Standard age values")
        df.rename(columns={"Edad1": "edad_unidad"}, inplace=True)
        # Unit label -> multiplication factor applied to the age value
        unit_factors = {"Años": 1, "Meses": 1 / 12, "Dias": 1 / 365}
        df["edad_unidad"] = df["edad_unidad"].replace(unit_factors)
        df["edad"] = df["edad"] * df["edad_unidad"]

    # Quartiles and interquartile range
    first_quartile, third_quartile = df[column].quantile([0.25, 0.75])
    spread = third_quartile - first_quartile

    # Fences at 1.5 * IQR beyond the quartiles
    high_fence = third_quartile + (1.5 * spread)
    low_fence = first_quartile - (1.5 * spread)

    is_outlier = (df[column] < low_fence) | (df[column] > high_fence)
    outlier_rows = df.loc[is_outlier]

    logger.info(
        f"Number of outliers for {column}:\t {len(outlier_rows)} - {(len(outlier_rows) / initial_len * 100):.1f}%"
    )

    return df.drop(outlier_rows.index)


def gender_encoding(df):
    """Derive a ``genero`` column from ``nombre`` and drop unresolved rows.

    The last whitespace-separated token of each name is title-cased. If it is
    literally ``Femenino`` or ``Masculino`` it is used as is; otherwise it is
    passed to the gender detector. Detector labels are mapped to Spanish
    labels, and rows labelled ``Ambiguo`` or ``Desconocido`` are removed.

    Args:
        df (pandas.DataFrame): frame with a ``nombre`` string column.

    Returns:
        pandas.DataFrame: ``df`` with ``genero`` added, minus unresolved rows.
    """
    logger.info("Encoding gender")
    initial_len = len(df)
    # Create a gender detector object
    detector = gender.Detector()
    explicit_labels = ("Femenino", "Masculino")

    def obtener_genero(nombre):
        # Only the last token of the name is inspected.
        token = nombre.split()[-1].title()
        # Some records already carry the gender label instead of a name.
        if token in explicit_labels:
            return token
        return detector.get_gender(token)

    df["genero"] = df["nombre"].apply(obtener_genero)

    detector_to_label = {
        "male": "Masculino",
        "female": "Femenino",
        "andy": "Ambiguo",
        "unknown": "Desconocido",
        "mostly_male": "Masculino",
        "mostly_female": "Femenino",
    }
    df["genero"] = df["genero"].replace(detector_to_label)

    unresolved = df[df["genero"].isin(["Ambiguo", "Desconocido"])]

    logger.info(
        f"Number of unknown:\t {len(unresolved)} - {len(unresolved) / initial_len * 100:.1f}%"
    )

    return df.drop(unresolved.index)


def encode_categorical(df):
    """Encode the triage classification and one-hot encode ``genero``.

    ``clasificacion`` is mapped to ``clasificacion_encode`` (ROJO=0,
    AMARILLO=1, VERDE=2) and one column per ``genero`` category is appended.

    NOTE(review): ``OneHotEncoder(sparse_output=...)`` needs a recent
    scikit-learn release — confirm the processing image provides it.

    Args:
        df (pandas.DataFrame): frame with ``clasificacion`` and ``genero``.

    Returns:
        pandas.DataFrame: ``df`` with the encoded columns appended.
    """
    logger.info("Encoding categorical variables")
    # Ordinal codes for the triage colour
    triage_codes = {"ROJO": 0, "AMARILLO": 1, "VERDE": 2}
    df["clasificacion_encode"] = df["clasificacion"].replace(triage_codes)

    # Fit the encoder on the 'genero' column and get a dense array back
    encoder = OneHotEncoder(sparse_output=False)
    one_hot = encoder.fit_transform(df[["genero"]])

    # Wrap the array in a DataFrame aligned with the original index
    one_hot_df = pd.DataFrame(
        one_hot,
        columns=encoder.get_feature_names_out(["genero"]),
        index=df.index,
    )

    # Append the encoded columns to the original frame
    return pd.concat([df, one_hot_df], axis=1)


def process_dates(df):
    """Add hour, minute, month, day and weekday columns derived from ``Fecha``.

    Args:
        df (pandas.DataFrame): frame with a datetime ``Fecha`` column.

    Returns:
        pandas.DataFrame: the same frame object with five new columns.
    """
    logger.info("Extracting time features from date columns")
    fecha = df["Fecha"].dt
    # new column name -> datetime accessor attribute
    derived_features = (
        ("hora", "hour"),
        ("minuto", "minute"),
        ("mes", "month"),
        ("dia", "day"),
        ("dia_semana", "dayofweek"),
    )
    for column_name, accessor_attr in derived_features:
        df[column_name] = getattr(fecha, accessor_attr)

    return df


def remove_punctuation(df):
    """Drop non-string ``Dx`` rows and strip punctuation from the rest.

    Besides ``string.punctuation``, the digits 1, 2 and 3 are removed too.

    Args:
        df (pandas.DataFrame): frame with a ``Dx`` column.

    Returns:
        pandas.DataFrame: rows whose ``Dx`` is a string, with cleaned text.
    """
    logger.info("Removing punctuation from the dataset")
    initial_len = len(df)

    # Rows whose diagnosis is not text cannot be cleaned, so they are dropped.
    is_text = df["Dx"].apply(lambda value: isinstance(value, str))
    df_not_string = df[~is_text]
    logger.info(
        f"Number of not string values:\t {len(df_not_string)} - {len(df_not_string) / initial_len * 100:.3f}%"
    )
    df = df.drop(df_not_string.index)

    # One translation table, built once, deletes every unwanted character.
    deletion_table = str.maketrans("", "", string.punctuation + "123")
    df["Dx"] = df["Dx"].apply(lambda text: text.translate(deletion_table))

    return df


def remove_empty_string(df):
    """Drop rows whose ``Dx`` text is the empty string.

    Args:
        df (pandas.DataFrame): frame with a ``Dx`` column.

    Returns:
        pandas.DataFrame: ``df`` without the empty-diagnosis rows.
    """
    logger.info("Removing empty string from the dataset")
    initial_len = len(df)
    blank_rows = df.loc[df["Dx"] == ""]
    blank_count = len(blank_rows)
    logger.info(
        f"Number of empty string:\t {blank_count} - {blank_count / initial_len * 100:.1f}%"
    )
    return df.drop(blank_rows.index)


def delete_meaningless_strings(df):
    """Lower-case ``Dx`` and drop rows whose text is only the letter ``x``.

    ``Dx`` is lower-cased on the frame passed in; rows matching ``^x+$`` are
    then removed.

    Args:
        df (pandas.DataFrame): frame with a string ``Dx`` column.

    Returns:
        pandas.DataFrame: ``df`` without the placeholder rows.
    """
    logger.info("Deleting meaningless strings from the dataset")
    initial_len = len(df)

    df["Dx"] = df["Dx"].str.lower()

    placeholder_pattern = re.compile("^x+$")
    is_placeholder = df["Dx"].apply(
        lambda text: placeholder_pattern.search(text) is not None
    )

    df_x_strings = df[is_placeholder]
    logger.info(
        f"Number of meaningless string:\t {len(df_x_strings)} - {len(df_x_strings) / initial_len * 100:.1f}%"
    )

    return df.drop(df_x_strings.index)


def generate_embeddings(df):
    """Replace each ``Dx`` text by a 100-dimensional Doc2Vec embedding.

    A Doc2Vec model is trained on the ``Dx`` texts of ``df`` itself (one
    document per row, tagged by row position) and the learned document
    vectors are appended as 100 extra columns.

    NOTE(review): no seed is set and ``workers=4`` is used, so the vectors
    are presumably not reproducible across runs — confirm if that matters.

    Args:
        df (pandas.DataFrame): frame with a string ``Dx`` column.

    Returns:
        pandas.DataFrame: ``df`` plus one column per embedding dimension.
    """
    # One tagged document per row; the tag is the row position as a string
    corpus = []
    for position, text in enumerate(df["Dx"]):
        corpus.append(TaggedDocument(words=text.split(), tags=[str(position)]))

    # Build the vocabulary and train the Doc2Vec model
    model = Doc2Vec(vector_size=100, window=2, min_count=1, workers=4)
    model.build_vocab(corpus)
    model.train(corpus, total_examples=model.corpus_count, epochs=40)

    # Look the vectors up in row order and spread them into separate columns
    df["doc2vec"] = [model.dv[str(position)] for position in range(len(corpus))]
    vector_columns = pd.DataFrame(df["doc2vec"].tolist(), index=df.index)

    return pd.concat([df.drop("doc2vec", axis=1), vector_columns], axis=1)


if __name__ == "__main__":
    """Main preprocessing function"""
    args = parse_args()

    # Load data
    df = load_data()

    # Clean duplicates
    df = clean_duplicates(df)

    # Handle missing values
    df = handle_missing_values(df)

    # Convert values to absolute values
    df = absolute_values(df)

    # Outliers for 'tiempo_total' (IQR) interquartile range
    df = remove_outliers(df, "tiempo_total")

    # Outliers for 'edad' (IQR) interquartile range
    df = remove_outliers(df, "edad")

    # Obtain gender
    df = gender_encoding(df)

    # Encode categorical variables
    df = encode_categorical(df)

    # Extract time features
    df = process_dates(df)

    # Removal of punctuation
    df = remove_punctuation(df)

    # Remove empty string
    df = remove_empty_string(df)

    # Delete meaningless strings
    df = delete_meaningless_strings(df)

    # Generate embeddings
    df = generate_embeddings(df)

    df = df.drop(
        columns=["nombre", "apat", "amat", "Fecha", "Dx", "edad_unidad", "genero", "clasificacion"]
    )
    logger.info(f"Dropped columns: {df.columns}")
    logger.info(f"Processed data shape: {df.shape}")

    # Split into train, validation, and test sets
    logger.debug("Splitting data into train, validation, and test sets")
    x = df.drop(columns=["tiempo_total"])
    y = df["tiempo_total"]

    x_train, x_val, y_train, y_val = train_test_split(
        x, y, test_size=args.test_ratio, random_state=42
    )
    x_val, x_test, y_val, y_test = train_test_split(
        x_val, y_val, test_size=args.validation_ratio, random_state=42
    )

    train_df = pd.concat([y_train, x_train], axis=1)
    val_df = pd.concat([y_val, x_val], axis=1)
    test_df = pd.concat([y_test, x_test], axis=1)
    dataset_df = pd.concat([y, x], axis=1)

    logger.info("Train data shape after preprocessing: {}".format(train_df.shape))
    logger.info("Validation data shape after preprocessing: {}".format(val_df.shape))
    logger.info("Test data shape after preprocessing: {}".format(test_df.shape))

    # Save processed datasets to the local paths in the processing container.
    # SageMaker will upload the contents of these paths to S3 bucket
    local_dir = "/opt/ml/processing"
    logger.debug("Writing processed datasets to container local path.")
    train_output_path = os.path.join(f"{local_dir}/train", "train.csv")
    validation_output_path = os.path.join(f"{local_dir}/val", "validation.csv")
    test_output_path = os.path.join(f"{local_dir}/test", "test.csv")
    full_processed_output_path = os.path.join(f"{local_dir}/full", "dataset.csv")

    logger.info("Saving train data to {}".format(train_output_path))
    train_df.to_csv(train_output_path, index=False)

    logger.info("Saving validation data to {}".format(validation_output_path))
    val_df.to_csv(validation_output_path, index=False)

    logger.info("Saving test data to {}".format(test_output_path))
    test_df.to_csv(test_output_path, index=False)

    logger.info("Saving full processed data to {}".format(full_processed_output_path))
    dataset_df.to_csv(full_processed_output_path, index=False)


In [None]:
# Upload processing script to S3
s3_client.upload_file(
    Filename="process.py",
    Bucket=write_bucket,
    Key=f"{write_prefix}/scripts/process.py",
)

# Define the SKLearnProcessor configuration
# NOTE(review): process.py imports gender_guesser and gensim and reads an .xlsx
# file — confirm these packages (and an Excel reader) are available in the
# scikit-learn 0.23-1 processing image.
sklearn_processor = SKLearnProcessor(
    framework_version="0.23-1",
    role=sagemaker_role,
    instance_count=1,
    instance_type=process_instance_type,
    base_job_name=f"{base_job_name_prefix}-processing",
)

# Define pipeline processing step
process_step = ProcessingStep(
    name="DataProcessing",
    processor=sklearn_processor,
    inputs=[
        ProcessingInput(source=hospital_data_uri, destination="/opt/ml/processing/input"),
    ],
    outputs=[
        ProcessingOutput(
            destination=f"{processing_output_uri}/train_data",
            output_name="train_data",
            source="/opt/ml/processing/train",
        ),
        ProcessingOutput(
            destination=f"{processing_output_uri}/validation_data",
            output_name="validation_data",
            source="/opt/ml/processing/val",
        ),
        ProcessingOutput(
            destination=f"{processing_output_uri}/test_data",
            output_name="test_data",
            source="/opt/ml/processing/test",
        ),
        ProcessingOutput(
            destination=f"{processing_output_uri}/processed_data",
            output_name="processed_data",
            source="/opt/ml/processing/full",
        ),
    ],
    job_arguments=["--train-ratio", "0.8", "--validation-ratio", "0.1", "--test-ratio", "0.1"],
    # Must point at the object uploaded above (scripts/process.py).
    code=f"{scripts_uri}/process.py",
)

## Train model step

In [None]:
%%writefile train.py
import argparse
import json
import os
import logging

import joblib
import pandas as pd
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
import numpy as np
import xgboost as xgb

# Configure the root logger at INFO level with a stream handler so training
# messages reach the job's console output.
logger = logging.getLogger()
logger.setLevel(logging.INFO)
logger.addHandler(logging.StreamHandler())


def parse_args():
    """Collect hyperparameters and SageMaker directories for the training job.

    Hyperparameters come from the command line with the defaults listed
    below. Directory arguments default to the corresponding ``SM_*``
    environment variables (``None`` when a variable is unset). Unknown
    arguments are ignored.

    Returns:
        argparse.Namespace: the parsed arguments.
    """
    parser = argparse.ArgumentParser()

    # (flag, type, default) for the model and cross-validation settings
    hyperparameter_specs = [
        ("--num_round", int, 100),
        ("--max_depth", int, 6),
        ("--eta", float, 0.3),
        ("--subsample", float, 0.9),
        ("--colsample_bytree", float, 0.8),
        ("--objective", str, "reg:squarederror"),
        ("--eval_metric", str, "rmse"),
        ("--nfold", int, 5),
        ("--early_stopping_rounds", int, 10),
    ]
    for flag, caster, default in hyperparameter_specs:
        parser.add_argument(flag, type=caster, default=default)

    # SageMaker specific arguments: flag -> environment variable with the default
    sagemaker_directories = {
        "--train_data_dir": "SM_CHANNEL_TRAIN",
        "--validation_data_dir": "SM_CHANNEL_VALIDATION",
        "--model_dir": "SM_MODEL_DIR",
        "--output_data_dir": "SM_OUTPUT_DATA_DIR",
    }
    for flag, env_name in sagemaker_directories.items():
        parser.add_argument(flag, type=str, default=os.environ.get(env_name))

    parsed, _ = parser.parse_known_args()
    logger.info(f"Received arguments {parsed}")
    return parsed


def load_data(data_dir, filename):
    """
    Read a CSV file and separate the target column from the features

    Args:
        data_dir (str): Directory containing the data file
        filename (str): Name of the CSV file inside ``data_dir``

    Returns:
        tuple: (features DataFrame without ``tiempo_total``,
        ``tiempo_total`` Series)
    """
    frame = pd.read_csv(f"{data_dir}/{filename}")
    # pop() removes the target from the frame and hands it back as a Series
    target = frame.pop("tiempo_total")
    return frame, target


def calculate_regression_metrics(y_true, y_pred):
    """
    Score predictions against the true values

    Args:
        y_true: True values
        y_pred: Predicted values

    Returns:
        dict: ``mse``, ``rmse`` (square root of ``mse``), ``mae`` and ``r2``
    """
    squared_error = mean_squared_error(y_true, y_pred)
    return dict(
        mse=squared_error,
        rmse=np.sqrt(squared_error),
        mae=mean_absolute_error(y_true, y_pred),
        r2=r2_score(y_true, y_pred),
    )


def train_model(args, dtrain, dvalidation):
    """
    Cross-validate, then fit the final XGBoost regression model

    Cross-validation on ``dtrain`` (with early stopping) determines how many
    boosting rounds to use: the final model is trained for ``len(cv_results)``
    rounds, again with early stopping, this time monitored on the evaluation
    list.

    Args:
        args: Parsed command line arguments
        dtrain: Training data as DMatrix
        dvalidation: Validation data as DMatrix

    Returns:
        tuple: Trained model and a dict with ``train`` and ``validation``
        regression metrics
    """
    booster_param_names = ("max_depth", "eta", "objective", "subsample", "colsample_bytree")
    booster_params = {name: getattr(args, name) for name in booster_param_names}

    # Cross-validation: its history length is the boosting round budget below
    cv_history = xgb.cv(
        params=booster_params,
        dtrain=dtrain,
        num_boost_round=args.num_round,
        nfold=args.nfold,
        early_stopping_rounds=args.early_stopping_rounds,
        metrics=[args.eval_metric],
        seed=42,
    )

    # Final fit, monitored on both datasets
    watchlist = [(dtrain, 'train'), (dvalidation, 'validation')]
    booster = xgb.train(
        params=booster_params,
        dtrain=dtrain,
        num_boost_round=len(cv_history),
        evals=watchlist,
        early_stopping_rounds=args.early_stopping_rounds,
    )

    # Predict and score each split
    split_metrics = {}
    for split_name, dmatrix in (("train", dtrain), ("validation", dvalidation)):
        predictions = booster.predict(dmatrix)
        split_metrics[split_name] = calculate_regression_metrics(
            dmatrix.get_label(), predictions
        )

    return booster, split_metrics


def save_model_artifacts(model, metrics, args):
    """
    Persist the trained model and a JSON metrics report

    ``metrics.json`` is written to ``args.output_data_dir`` and contains the
    hyperparameters plus RMSE/MAE/R2 for the validation and train splits. The
    model is serialized with joblib to ``xgboost-model`` in ``args.model_dir``.

    Args:
        model: Trained XGBoost model
        metrics (dict): Model metrics keyed by ``train`` / ``validation``
        args: Parsed command line arguments
    """
    hyperparameter_names = ("max_depth", "eta", "objective", "subsample", "colsample_bytree")
    reported_metrics = ("rmse", "mae", "r2")

    report = {
        "hyperparameters": {name: getattr(args, name) for name in hyperparameter_names},
        "regression_metrics": {
            split: {metric: metrics[split][metric] for metric in reported_metrics}
            for split in ("validation", "train")
        },
    }

    # Metrics report
    metrics_path = os.path.join(args.output_data_dir, "metrics.json")
    with open(metrics_path, "w") as report_file:
        json.dump(report, report_file)

    # Model artifact
    model_path = os.path.join(args.model_dir, "xgboost-model")
    with open(model_path, "wb") as model_file:
        joblib.dump(model, model_file)


if __name__ == "__main__":
    # Main training entry point: load data, train, log metrics, save artifacts
    args = parse_args()

    # Build one DMatrix per channel from its CSV file
    train_features, train_labels = load_data(args.train_data_dir, "train.csv")
    dtrain = xgb.DMatrix(train_features, label=train_labels)

    validation_features, validation_labels = load_data(
        args.validation_data_dir, "validation.csv"
    )
    dvalidation = xgb.DMatrix(validation_features, label=validation_labels)

    # Train model
    model, metrics = train_model(args, dtrain, dvalidation)

    # Log metrics for each split
    for heading, split in (
        ("Training metrics:", "train"),
        ("\nValidation metrics:", "validation"),
    ):
        split_metrics = metrics[split]
        logger.info(heading)
        logger.info(f"RMSE: {split_metrics['rmse']:.4f}")
        logger.info(f"MAE: {split_metrics['mae']:.4f}")
        logger.info(f"R2: {split_metrics['r2']:.4f}")

    # Save artifacts
    save_model_artifacts(model, metrics, args)



In [None]:
# Set XGBoost model hyperparameters
# Passed to train.py as command-line arguments; nfold and early_stopping_rounds
# are not listed here, so train.py's own defaults apply for them.
hyperparams = {
    "eval_metric": "rmse",
    "objective": "reg:squarederror",
    "num_round": "100",
    "max_depth": "6",
    "subsample": "0.9",
    "colsample_bytree": "0.8",
    "eta": "0.3",
}

# Set XGBoost estimator
xgb_estimator = XGBoost(
    entry_point="train.py",
    output_path=estimator_output_uri,
    code_location=estimator_output_uri,
    hyperparameters=hyperparams,
    role=sagemaker_role,
    # NOTE(review): these are the static instance settings, not the
    # TrainingInstanceCount / TrainingInstanceType pipeline parameters —
    # overriding those parameters at execution time has no effect here.
    instance_count=train_instance_count,
    instance_type=train_instance_type,
    framework_version="1.3-1",
)

# Access the location where the preceding processing step saved train and validation datasets
# Pipeline step properties can give access to outputs which can be used in succeeding steps

# Set pipeline training step
train_step = TrainingStep(
    name="TrainModel",
    estimator=xgb_estimator,
    inputs={
        # Channel names match the SM_CHANNEL_TRAIN / SM_CHANNEL_VALIDATION
        # environment variables read by train.py.
        "train": TrainingInput(
            s3_data=process_step.properties.ProcessingOutputConfig.Outputs[
                "train_data"
            ].S3Output.S3Uri,
            content_type="csv",
            s3_data_type="S3Prefix",
        ),
        "validation": TrainingInput(
            s3_data=process_step.properties.ProcessingOutputConfig.Outputs[
                "validation_data"
            ].S3Output.S3Uri,
            content_type="csv",
            s3_data_type="S3Prefix",
        ),
    },
)

## Create model step

In [None]:
# Create a SageMaker model
# The model pairs the XGBoost image retrieved earlier with the artifacts
# produced by the training step (resolved at pipeline execution time).
model = sagemaker.model.Model(
    image_uri=training_image,
    model_data=train_step.properties.ModelArtifacts.S3ModelArtifacts,
    sagemaker_session=sess,
    role=sagemaker_role,
)

# Specify model deployment instance type
inputs = sagemaker.inputs.CreateModelInput(instance_type=deploy_instance_type_param)

create_model_step = CreateModelStep(name="CreateModel", model=model, inputs=inputs)

## Evaluate model step

In [None]:
%%writefile evaluate.py
import json
import logging
import pathlib
import pickle
import tarfile

import pandas as pd
from sklearn.metrics import mean_absolute_error, r2_score, root_mean_squared_error
import xgboost as xgb

# Configure the root logger at INFO level (logger.debug calls in this script are
# therefore suppressed) with a stream handler for the job's console output.
logger = logging.getLogger()
logger.setLevel(logging.INFO)
logger.addHandler(logging.StreamHandler())


def load_model():
    """Extract the training artifact and load the serialized model.

    The archive at ``/opt/ml/processing/model/model.tar.gz`` is unpacked into
    the current working directory and the ``xgboost-model`` file inside it is
    unpickled.

    Returns:
        The model object stored by the training script.
    """
    model_path = "/opt/ml/processing/model/model.tar.gz"
    with tarfile.open(model_path) as tar:
        tar.extractall(path=".")

    logger.debug("Loading xgboost model.")
    # The name of the file should match how the model was saved in the training script
    # SECURITY: unpickling executes arbitrary code; only load artifacts produced
    # by this pipeline's own training step.
    with open("xgboost-model", "rb") as model_file:
        model = pickle.load(model_file)
    return model


def load_data():
    """Load the test set and split it into features and target.

    Returns:
        tuple: (``xgb.DMatrix`` built from every column except
        ``tiempo_total``, ``tiempo_total`` Series)
    """
    logger.debug("Reading test data.")
    test_frame = pd.read_csv("/opt/ml/processing/test/test.csv")

    # Target column
    y_test = test_frame["tiempo_total"]

    # Everything else is a feature
    feature_frame = test_frame.drop(columns=["tiempo_total"])
    return xgb.DMatrix(feature_frame), y_test


def calculate_regression_metrics(x_test, y_test, model):
    """Predict on the test set and compute RMSE, MAE and R2.

    NOTE(review): ``root_mean_squared_error`` only exists in recent
    scikit-learn releases — confirm the evaluation image provides it.

    Args:
        x_test: test features in the form ``model.predict`` accepts.
        y_test: true target values.
        model: fitted model exposing ``predict``.

    Returns:
        dict: ``{"regression_metrics": {"test": {"rmse", "mae", "r2"}}}``
    """
    logger.info("Generating predictions for test data.")
    predictions = model.predict(x_test)

    logger.debug("Calculating regression metrics.")
    test_scores = {
        "rmse": root_mean_squared_error(y_test, predictions),
        "mae": mean_absolute_error(y_test, predictions),
        "r2": r2_score(y_test, predictions),
    }

    return {"regression_metrics": {"test": test_scores}}


def save_model_evaluation(metric_dict):
    """Write the evaluation report as JSON.

    The report goes to ``/opt/ml/processing/evaluation/evaluation.json``; the
    directory is created if it does not exist.

    Args:
        metric_dict (dict): JSON-serializable metrics to store.
    """
    report_dir = pathlib.Path("/opt/ml/processing/evaluation")
    report_dir.mkdir(parents=True, exist_ok=True)

    logger.info(f"Writing evaluation report with regression metrics: {metric_dict}")
    with (report_dir / "evaluation.json").open("w") as report_file:
        json.dump(metric_dict, report_file)


if __name__ == "__main__":
    # Main evaluation entry point: load model and test data, score, save report
    # Load model (must be load_model(); load_data() returns the test data tuple)
    model = load_model()

    # Load data
    x_test, y_test = load_data()

    # Calculate regression metric
    test_metrics = calculate_regression_metrics(x_test, y_test, model)

    # Save model evaluation metrics
    save_model_evaluation(test_metrics)


In [None]:
# Upload model evaluation script to S3
s3_client.upload_file(
    Filename="evaluate.py", Bucket=write_bucket, Key=f"{write_prefix}/scripts/evaluate.py"
)

# Run the evaluation script inside the same XGBoost image used for training.
eval_processor = ScriptProcessor(
    image_uri=training_image,
    command=["python3"],
    instance_type=predictor_instance_type,
    instance_count=predictor_instance_count,
    base_job_name=f"{base_job_name_prefix}-model-eval",
    sagemaker_session=sess,
    role=sagemaker_role,
)
# Exposes evaluation.json (from the "evaluation" output) so the condition step
# can read values out of it with JsonGet.
evaluation_report = PropertyFile(
    name="HospitalWaitTimePredReport",
    output_name="evaluation",
    path="evaluation.json",
)

# Set model evaluation step
evaluation_step = ProcessingStep(
    name="EvaluateModel",
    processor=eval_processor,
    inputs=[
        ProcessingInput(
            # Fetch S3 location where train step saved model artifacts
            source=train_step.properties.ModelArtifacts.S3ModelArtifacts,
            destination="/opt/ml/processing/model",
        ),
        ProcessingInput(
            # Fetch S3 location where processing step saved test data
            source=process_step.properties.ProcessingOutputConfig.Outputs[
                "test_data"
            ].S3Output.S3Uri,
            destination="/opt/ml/processing/test",
        ),
    ],
    outputs=[
        ProcessingOutput(
            destination=f"{model_eval_output_uri}",
            output_name="evaluation",
            source="/opt/ml/processing/evaluation",
        ),
    ],
    # Same S3 key the script was uploaded to at the top of this cell.
    code=f"s3://{write_bucket}/{write_prefix}/scripts/evaluate.py",
    property_files=[evaluation_report],
)

## Register model step

In [None]:
# Define register model step
register_step = RegisterModel(
    name="RegisterModel",
    estimator=xgb_estimator,
    # Fetching S3 location where train step saved model artifacts
    model_data=train_step.properties.ModelArtifacts.S3ModelArtifacts,
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=[predictor_instance_type],
    transform_instances=[predictor_instance_type],
    model_package_group_name=model_package_group_name,
    approval_status=model_approval_status_param,
)

## Deploy model step

In [None]:
%%writefile lambda_deployer.py

"""
Lambda function creates an endpoint configuration and deploys a model to real-time endpoint. 
Required parameters for deployment are retrieved from the event object
"""

import json
import boto3


def lambda_handler(event, context):
    """Create a model, an endpoint configuration and a real-time endpoint.

    Args:
        event (dict): must contain ``model_name``, ``model_package_arn``,
            ``endpoint_config_name``, ``endpoint_name``, ``role``,
            ``instance_type`` and ``instance_count``.
        context: Lambda context object (unused).

    Returns:
        dict: ``statusCode`` 200, a JSON ``body`` message and the
        ``endpoint_name`` whose creation was requested.

    NOTE(review): the three create calls use fixed names taken from the event,
    so invoking this again with the same names presumably fails because the
    resources already exist — confirm how re-runs should be handled.
    """
    sm_client = boto3.client("sagemaker")

    # Details of the model registered in the pipeline and of the endpoint to create
    model_name = event["model_name"]
    model_package_arn = event["model_package_arn"]
    endpoint_config_name = event["endpoint_config_name"]
    endpoint_name = event["endpoint_name"]
    role = event["role"]
    instance_type = event["instance_type"]
    # Coerce defensively: the count must reach the API as an integer.
    instance_count = int(event["instance_count"])
    primary_container = {"ModelPackageName": model_package_arn}

    # Create model
    sm_client.create_model(
        ModelName=model_name,
        PrimaryContainer=primary_container,
        ExecutionRoleArn=role,
    )

    # Create endpoint configuration
    sm_client.create_endpoint_config(
        EndpointConfigName=endpoint_config_name,
        ProductionVariants=[
            {
                "VariantName": "Alltraffic",
                "ModelName": model_name,
                "InitialInstanceCount": instance_count,
                "InstanceType": instance_type,
                "InitialVariantWeight": 1,
            }
        ],
    )

    # Create endpoint
    sm_client.create_endpoint(
        EndpointName=endpoint_name,
        EndpointConfigName=endpoint_config_name,
    )

    # Report the outcome to the caller instead of returning nothing.
    return {
        "statusCode": 200,
        "body": json.dumps("Created Endpoint!"),
        "endpoint_name": endpoint_name,
    }

In [None]:
# The function name must contain sagemaker
function_name = "sagemaker-hospital-wait-time-lambda-step"
# Define Lambda helper class can be used to create the Lambda function required in the Lambda step
# NOTE(review): the SageMaker execution role is reused as the Lambda execution
# role — confirm it can be assumed by Lambda and may create models/endpoints.
func = Lambda(
    function_name=function_name,
    execution_role_arn=sagemaker_role,
    script="lambda_deployer.py",
    handler="lambda_deployer.lambda_handler",
    timeout=600,
    memory_size=10240,
)

# The inputs used in the lambda handler are passed through the inputs argument in the
# LambdaStep and retrieved via the `event` object within the `lambda_handler` function

lambda_deploy_step = LambdaStep(
    name="LambdaStepRealTimeDeploy",
    lambda_func=func,
    inputs={
        "model_name": pipeline_model_name,
        "endpoint_config_name": endpoint_config_name,
        "endpoint_name": endpoint_name,
        # ARN of the model package version produced by the register step
        "model_package_arn": register_step.steps[0].properties.ModelPackageArn,
        "role": sagemaker_role,
        "instance_type": deploy_instance_type_param,
        "instance_count": deploy_instance_count_param,
    },
)

## Conditional step

In [None]:
# Evaluate model performance on test set
# The condition passes when the test RMSE read from evaluation.json is <= 0.4.
# NOTE(review): the variable is named `cond_gte` but holds a
# less-than-or-equal condition — consider renaming it (e.g. `cond_lte`).
cond_gte = ConditionLessThanOrEqualTo(
    left=JsonGet(
        step_name=evaluation_step.name,
        property_file=evaluation_report,
        json_path="regression_metrics.test.rmse",
    ),
    right=0.4,  # Threshold to compare model performance against
)
# Only when the condition holds are the model created, registered and deployed;
# otherwise the pipeline ends without further steps.
condition_step = ConditionStep(
    name="CheckEvaluation",
    conditions=[cond_gte],
    if_steps=[
        create_model_step,
        register_step,
        lambda_deploy_step,
    ],
    else_steps=[],
)

## Build and run the pipeline

In [None]:
# Create the Pipeline with all component steps and parameters
# NOTE(review): ProcessingInstanceType, TrainingInstanceType and
# TrainingInstanceCount are declared here, but the processor and estimator
# above are built from the static values, so overriding them has no effect.
pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        process_instance_type_param,
        train_instance_type_param,
        train_instance_count_param,
        deploy_instance_type_param,
        deploy_instance_count_param,
        model_approval_status_param,
    ],
    # create/register/deploy are not listed: they run inside condition_step.
    steps=[process_step, train_step, evaluation_step, condition_step],
    sagemaker_session=sess,
)

In [None]:
# Create a new or update existing Pipeline
pipeline.upsert(role_arn=sagemaker_role)

# Full Pipeline description
# Parse the stored definition so the notebook renders it as a nested structure.
pipeline_definition = json.loads(pipeline.describe()["PipelineDefinition"])
pipeline_definition

In [None]:
# Execute Pipeline
# Starts an execution with the default parameter values (no overrides passed).
start_response = pipeline.start()