# SageMaker Training with MLflow

This notebook's CI test result for us-west-2 is as follows. CI test results in other regions can be found at the end of the notebook.

![This us-west-2 badge failed to load. Check your device's internet connectivity, otherwise the service is currently unavailable](https://prod.us-west-2.tcx-beacon.docs.aws.dev/sagemaker-nb/us-west-2/ml_ops|sm-mlflow_training|sm-mlflow_training.ipynb)

## Setup environment

Import necessary libraries

In [19]:
import sagemaker
from sagemaker import get_execution_role
import subprocess, json
import os
import pandas as pd
from sklearn.model_selection import train_test_split
from sagemaker.inputs import TrainingInput
from sagemaker.sklearn.estimator import SKLearn
from sagemaker.tuner import HyperparameterTuner, IntegerParameter, ContinuousParameter
from datetime import datetime
import boto3
import tarfile, time
import joblib
from sklearn.metrics import roc_auc_score, accuracy_score

def _resolve_git_sha(default="unknown"):
    """Return the short SHA of the current Git commit, or ``default`` if unavailable.

    Git itself is asked for the commit instead of probing for a ``.git`` entry
    in the working directory, so the lookup also works when the notebook runs
    from a subdirectory of the repository. Any failure (git not installed, not
    inside a repository, no commits yet) yields ``default`` so that git's error
    text can never end up in the returned value.
    """
    try:
        completed = subprocess.run(
            ["git", "rev-parse", "--short", "HEAD"],
            capture_output=True,
            text=True,
            check=True,
        )
    except (OSError, subprocess.CalledProcessError):
        return default
    return completed.stdout.strip() or default


# Short commit SHA; later cells pass it to the training jobs and use it as the image tag.
GIT_SHA = _resolve_git_sha()

Declare some variables used later

In [20]:
# Execution role, SageMaker session and region shared by every job in this notebook.
role = get_execution_role()
sagemaker_session = sagemaker.Session()
region = sagemaker_session.boto_region_name

# S3 bucket and the key prefix under which this notebook stores its data and outputs.
bucket = "i32419"
prefix = "ai-deployment-monitoring-grupo-5/aidm-loan-default"

# ARN of the MLflow tracking server the training jobs log to.
tracking_server_arn = "arn:aws:sagemaker:eu-west-1:267567228900:mlflow-tracking-server/aidm"

In [21]:
# Create the folder that receives train.py and requirements.txt.
# os.makedirs(..., exist_ok=True) is the portable equivalent of `mkdir -p`
# and does not depend on a shell being available.
os.makedirs("training_code", exist_ok=True)

## Get some training data

Carregar o dataset e tratar os valores omissos da variável y

In [22]:
# Load the raw loan dataset (path is relative to the notebook's working directory).
df = pd.read_csv("Dataset/Loan_Default.csv")

# Fail fast, before the column is first used, if the target is missing.
assert "Status" in df.columns, "Coluna target 'Status' não encontrada"

# Drop rows that have no target value.
before = len(df)
df = df.dropna(subset=["Status"]).copy()
after = len(df)
print(f"Dropped {before - after} rows with NaN Status")

# Store the target as integers and verify that it really is binary,
# since the training script fits a binary classifier on it.
df["Status"] = df["Status"].astype(int)
print("Unique Status values:", sorted(df["Status"].unique()))
assert set(df["Status"].unique()) <= {0, 1}, "Target 'Status' must only contain 0 and 1"

# Drop columns that are not features: "ID" is an identifier and "year" is
# constant (always 2019). errors="ignore" keeps the cell working when a
# column is already absent.
df = df.drop(columns=["ID", "year"], errors="ignore")

# Stratified 80/20 train/validation split with a fixed seed for reproducibility.
train_df, val_df = train_test_split(
    df,
    test_size=0.2,
    random_state=42,
    stratify=df["Status"],
)

# Persist both splits locally so they can be uploaded to S3.
os.makedirs("./data", exist_ok=True)
train_path = "./data/train.csv"
val_path = "./data/validation.csv"
train_df.to_csv(train_path, index=False)
val_df.to_csv(val_path, index=False)

print("Train shape:", train_df.shape)
print("Validation shape:", val_df.shape)
print("Target distribution (train):")
print(train_df["Status"].value_counts(normalize=True))

Dropped 1 rows with NaN Status
Unique Status values: [0, 1]
Train shape: (26329, 32)
Validation shape: (6583, 32)
Target distribution (train):
Status
0    0.756732
1    0.243268
Name: proportion, dtype: float64


And now let's upload that data to S3

In [23]:
WORK_DIRECTORY = "data"

# Upload each local split to its own S3 key prefix and remember the resulting URI.
uploaded_uris = {
    split: sagemaker_session.upload_data(
        path=f"./data/{split}.csv",
        bucket=bucket,
        key_prefix=f"{prefix}/{WORK_DIRECTORY}/{split}",
    )
    for split in ("train", "validation")
}
train_s3_uri = uploaded_uris["train"]
val_s3_uri = uploaded_uris["validation"]

# Wrap the uploaded CSVs as SageMaker input channels.
train_input = TrainingInput(train_s3_uri, content_type="text/csv")
validation_input = TrainingInput(val_s3_uri, content_type="text/csv")

print("Train S3:", train_s3_uri)
print("Validation S3:", val_s3_uri)

Train S3: s3://i32419/ai-deployment-monitoring-grupo-5/aidm-loan-default/data/train/train.csv
Validation S3: s3://i32419/ai-deployment-monitoring-grupo-5/aidm-loan-default/data/validation/validation.csv


## Write your training script

Let's write the code to train an XGBoost classifier, wrapped in a scikit-learn preprocessing pipeline, and log the run to MLflow

In [24]:
%%writefile training_code/train.py
import argparse
import json
import os
import pathlib

import numpy as np
import pandas as pd
import joblib

from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import OneHotEncoder
from sklearn.metrics import roc_auc_score, accuracy_score

from xgboost import XGBClassifier

import mlflow

def _load_csv(channel_name: str) -> pd.DataFrame:
    base = pathlib.Path("/opt/ml/input/data") / channel_name
    csv_files = list(base.glob("*.csv"))
    if not csv_files:
        raise FileNotFoundError(f"No CSV found in channel: {base}")
    return pd.read_csv(csv_files[0])


def _parse_args():
    """Parse the XGBoost hyperparameters passed to the script on the command line."""
    parser = argparse.ArgumentParser()

    # XGBoost hyperparameters (keep small for short HPO runs)
    parser.add_argument("--n_estimators", type=int, default=200)
    parser.add_argument("--max_depth", type=int, default=6)
    parser.add_argument("--learning_rate", type=float, default=0.1)
    parser.add_argument("--subsample", type=float, default=0.8)
    parser.add_argument("--colsample_bytree", type=float, default=0.8)

    return parser.parse_args()


def _build_model(args, num_cols, cat_cols):
    """Build the unfitted pipeline: column-wise preprocessing followed by XGBClassifier."""
    # Numeric columns: fill missing values with the median.
    numeric_transformer = Pipeline(
        steps=[("imputer", SimpleImputer(strategy="median"))]
    )

    # Categorical columns: fill with the most frequent value, then one-hot
    # encode; categories unseen during fit are ignored at prediction time.
    categorical_transformer = Pipeline(
        steps=[
            ("imputer", SimpleImputer(strategy="most_frequent")),
            ("onehot", OneHotEncoder(handle_unknown="ignore")),
        ]
    )

    preprocessor = ColumnTransformer(
        transformers=[
            ("num", numeric_transformer, num_cols),
            ("cat", categorical_transformer, cat_cols),
        ],
        remainder="drop",
    )

    clf = XGBClassifier(
        n_estimators=args.n_estimators,
        max_depth=args.max_depth,
        learning_rate=args.learning_rate,
        subsample=args.subsample,
        colsample_bytree=args.colsample_bytree,
        objective="binary:logistic",
        eval_metric="auc",
        tree_method="hist",
        n_jobs=1,
        random_state=42,
    )

    return Pipeline(
        steps=[
            ("preprocess", preprocessor),
            ("clf", clf),
        ]
    )


def main():
    """Train the loan-default classifier and record the run in MLflow.

    Steps: parse hyperparameters, load the ``train`` and ``validation``
    channels, fit the preprocessing + XGBoost pipeline, compute validation AUC
    and accuracy, log parameters/tags/metrics to MLflow, write ``metrics.json``
    and save the fitted pipeline as ``model.joblib`` in the model directory.

    The final ``validation_auc: <value>`` line is printed to stdout so that the
    metric can be picked up from the job logs.

    Raises:
        ValueError: If the ``Status`` target column is missing from either
            dataset, or if the ``MLFLOW_TRACKING_ARN`` env var is not set.
    """
    args = _parse_args()

    train_df = _load_csv("train")
    val_df = _load_csv("validation")

    if "Status" not in train_df.columns:
        raise ValueError("Target column 'Status' not found")
    # Check the validation split too, so a malformed file produces a clear
    # error instead of a bare KeyError further down.
    if "Status" not in val_df.columns:
        raise ValueError("Target column 'Status' not found in validation data")

    y_train = train_df["Status"].astype(int)
    X_train = train_df.drop(columns=["Status"])

    y_val = val_df["Status"].astype(int)
    X_val = val_df.drop(columns=["Status"])

    # Object-typed columns are treated as categorical; everything else as numeric.
    cat_cols = X_train.select_dtypes(include=["object"]).columns.tolist()
    num_cols = [c for c in X_train.columns if c not in cat_cols]

    model = _build_model(args, num_cols, cat_cols)

    tracking_arn = os.environ.get("MLFLOW_TRACKING_ARN")
    if not tracking_arn:
        raise ValueError("Missing env var MLFLOW_TRACKING_ARN")

    mlflow.set_tracking_uri(tracking_arn)
    mlflow.set_experiment(os.environ.get("MLFLOW_EXPERIMENT_NAME", "grupo-5-aidm-loan-default"))

    # The MLflow run is named after the training job so the two can be correlated.
    sm_env = json.loads(os.environ.get("SM_TRAINING_ENV", "{}"))
    training_job_name = sm_env.get("job_name", "unknown")

    with mlflow.start_run(run_name=training_job_name):
        # Log parameters and tags before fitting, so a run that fails during
        # training still records which configuration it was using.
        mlflow.log_params(
            {
                "n_estimators": args.n_estimators,
                "max_depth": args.max_depth,
                "learning_rate": args.learning_rate,
                "subsample": args.subsample,
                "colsample_bytree": args.colsample_bytree,
                "model_type": "sklearn_pipeline_xgbclassifier",
                "num_features": len(num_cols),
                "cat_features": len(cat_cols),
            }
        )
        mlflow.set_tags(
            {
                "training_job_name": training_job_name,
                "dataset": "Loan_Default.csv",
                "task": "binary_classification",
                "target": "Status",
                "git_sha": os.environ.get("GIT_SHA", "unknown"),
            }
        )

        model.fit(X_train, y_train)

        # Probability of the positive class, thresholded at 0.5 for accuracy.
        val_proba = model.predict_proba(X_val)[:, 1]
        val_pred = (val_proba >= 0.5).astype(int)

        auc = roc_auc_score(y_val, val_proba)
        acc = accuracy_score(y_val, val_pred)

        metrics = {"validation_auc": float(auc), "validation_accuracy": float(acc)}
        mlflow.log_metrics(metrics)

        # Write the metrics to a JSON file and attach it to the MLflow run.
        metrics_path = "/opt/ml/output/metrics.json"
        os.makedirs(os.path.dirname(metrics_path), exist_ok=True)
        with open(metrics_path, "w") as f:
            json.dump(metrics, f)
        mlflow.log_artifact(metrics_path)

        # Persist the whole fitted pipeline (preprocessing + classifier).
        model_dir = os.environ.get("SM_MODEL_DIR", "/opt/ml/model")
        os.makedirs(model_dir, exist_ok=True)
        joblib.dump(model, os.path.join(model_dir, "model.joblib"))

        print(f"validation_auc: {auc}")

# Run the training entry point only when this file is executed as a script.
if __name__ == "__main__":
    main()


Overwriting training_code/train.py


Since our training script uses MLflow and XGBoost, let's make sure the container installs `mlflow`, our MLflow AWS plugin (`sagemaker-mlflow`), and `xgboost` before running the script. We can do this by creating a `requirements.txt` file and putting it in the same directory as our training script.

In [25]:
%%writefile training_code/requirements.txt
mlflow==2.13.2
sagemaker-mlflow==0.1.0
xgboost==2.0.3

Overwriting training_code/requirements.txt


## SageMaker Training and MLflow

Train your XGBoost model by launching a SageMaker Training job.

In [26]:
# S3 location where the training jobs of this notebook store their output.
output_path = f"s3://{bucket}/{prefix}/training-output"

# Estimator for the single baseline training job. The hyperparameters mirror
# the defaults declared in training_code/train.py.
# `output_path` is passed explicitly so this job writes to the same location
# as the HPO estimator rather than to the session's default location.
sklearn = SKLearn(
    entry_point="train.py",
    source_dir="training_code",
    framework_version="1.2-1",
    instance_type="ml.c4.xlarge",
    instance_count=1,
    role=role,
    sagemaker_session=sagemaker_session,
    output_path=output_path,
    hyperparameters={
        "n_estimators": 200,
        "max_depth": 6,
        "learning_rate": 0.1,
        "subsample": 0.8,
        "colsample_bytree": 0.8,
    },
    keep_alive_period_in_seconds=3600,
    # Environment variables read by train.py for MLflow tracking and tagging.
    environment={
        "MLFLOW_TRACKING_ARN": tracking_server_arn,
        "MLFLOW_EXPERIMENT_NAME": "grupo-5-aidm-loan-default",
        "GIT_SHA": GIT_SHA,
    },
)

In [27]:
# UTC timestamp that makes the training job name unique.
# time.gmtime() replaces the deprecated datetime.utcnow() and produces the
# same "YYYYMMDD-HHMMSS" text.
timestamp = time.strftime("%Y%m%d-%H%M%S", time.gmtime())
job_name = f"grupo-5-aidm-loan-default-{timestamp}"

# Launch the training job with the train/validation channels.
sklearn.fit(
    {"train": train_input, "validation": validation_input},
    job_name=job_name,
)

  timestamp = datetime.utcnow().strftime("%Y%m%d-%H%M%S")
INFO:sagemaker.telemetry.telemetry_logging:SageMaker Python SDK will collect telemetry to help us better understand our user's needs, diagnose issues, and deliver additional features.
To opt out of telemetry, please disable via TelemetryOptOut parameter in SDK defaults config. For more information, refer to https://sagemaker.readthedocs.io/en/stable/overview.html#configuring-and-using-defaults-with-the-sagemaker-python-sdk.
INFO:sagemaker:Creating training-job with name: grupo-5-aidm-loan-default-20260111-222006


2026-01-11 22:20:08 Starting - Starting the training job...
2026-01-11 22:20:21 Starting - Preparing the instances for training...
2026-01-11 22:20:49 Downloading - Downloading input data......
2026-01-11 22:21:45 Downloading - Downloading the training image...
  import pkg_resources[0m
[34m2026-01-11 22:22:37,177 sagemaker-containers INFO     Imported framework sagemaker_sklearn_container.training[0m
[34m2026-01-11 22:22:37,181 sagemaker-training-toolkit INFO     No GPUs detected (normal if no gpus installed)[0m
[34m2026-01-11 22:22:37,184 sagemaker-training-toolkit INFO     No Neurons detected (normal if no neurons installed)[0m
[34m2026-01-11 22:22:37,200 sagemaker_sklearn_container.training INFO     Invoking user training script.[0m
[34m2026-01-11 22:22:37,454 sagemaker-training-toolkit INFO     Installing dependencies from requirements.txt[0m
[34mCollecting mlflow==2.13.2 (from -r requirements.txt (line 1))
  Downloading mlflow-2.13.2-py3-none-any.whl.metadata (29 kB)

## Hyperparameter tuning 

In [28]:
# Estimator para HPO (reutiliza o mesmo train.py e source_dir)
hpo_estimator = SKLearn(
    entry_point="train.py",
    source_dir="training_code",
    framework_version="1.2-1",
    instance_type="ml.c4.xlarge",
    instance_count=1,
    role=role,
    sagemaker_session=sagemaker_session,
    output_path=output_path,
    keep_alive_period_in_seconds=3600,
    environment={
        "MLFLOW_TRACKING_ARN": tracking_server_arn,
        "MLFLOW_EXPERIMENT_NAME": "grupo-5-aidm-loan-default",
        "GIT_SHA": GIT_SHA,
    },
)


Definir como o tuner “encontra” a métrica nos logs

In [29]:
# Tells the tuner how to extract the objective metric from the job logs:
# the first capture group of the regex is the metric value.
metric_definitions = [
    dict(
        Name="validation_auc",
        Regex=r"validation_auc:\s*([0-9\.]+)",
    ),
]

Definir o espaço de hiperparâmetros (curto e realista)

In [30]:
# Search space explored by the tuner; the keys match the command-line
# arguments accepted by train.py.
hyperparameter_ranges = dict(
    n_estimators=IntegerParameter(50, 300),
    max_depth=IntegerParameter(3, 10),
    learning_rate=ContinuousParameter(0.03, 0.3),
    subsample=ContinuousParameter(0.6, 1.0),
    colsample_bytree=ContinuousParameter(0.6, 1.0),
)

Criar o tuner (job curto)

In [31]:
# Short tuning job: 4 training jobs in total, 2 running at a time, maximizing
# the validation AUC extracted from the logs via `metric_definitions`.
#
# The base name no longer embeds `timestamp` from the training cell: the SDK
# shortens the base name and appends its own timestamp (the recorded run was
# named "grupo-5-aidm-loan-de-260111-2226"), so that suffix never reached the
# job name and only made this cell depend on another cell's state.
tuner = HyperparameterTuner(
    estimator=hpo_estimator,
    objective_metric_name="validation_auc",
    hyperparameter_ranges=hyperparameter_ranges,
    metric_definitions=metric_definitions,
    objective_type="Maximize",
    max_jobs=4,
    max_parallel_jobs=2,
    base_tuning_job_name="grupo-5-aidm-loan-default-hpo",
)

Lançar o tuning job usando os mesmos channels

In [32]:
# Launch the tuning job on the same input channels as the single training job.
tuning_inputs = {"train": train_input, "validation": validation_input}
tuner.fit(inputs=tuning_inputs)

INFO:sagemaker:Creating hyperparameter tuning job with name: grupo-5-aidm-loan-de-260111-2226


..........................................................!


Obter o melhor job e o melhor modelo (para Registry/BYOC)

In [33]:
# Name of the training job that achieved the best objective value.
best_training_job = tuner.best_training_job()
print("Best training job:", best_training_job)

# Model artifact of that job, taken from the estimator attached to it.
best_estimator = tuner.best_estimator()
best_model_s3 = best_estimator.model_data
print("Best model artifact:", best_model_s3)

Best training job: grupo-5-aidm-loan-de-260111-2226-002-23c01fb8

2026-01-11 22:30:34 Starting - Preparing the instances for training
2026-01-11 22:30:34 Downloading - Downloading the training image
2026-01-11 22:30:34 Training - Training image download completed. Training in progress.
2026-01-11 22:30:34 Uploading - Uploading generated training model
2026-01-11 22:30:34 Completed - Resource reused by training job: grupo-5-aidm-loan-de-260111-2226-003-a71f1c7d
Best model artifact: s3://i32419/ai-deployment-monitoring-grupo-5/aidm-loan-default/training-output/grupo-5-aidm-loan-de-260111-2226-002-23c01fb8/output/model.tar.gz


## Registry

In [42]:
# Imagem BYOC (ECR) usada no endpoint
account_id = boto3.client('sts').get_caller_identity()['Account']
algorithm_name = "aidm-grupo-5-loan-default"

image_uri = f"{account_id}.dkr.ecr.{region}.amazonaws.com/{algorithm_name}:{GIT_SHA}"

# Model Registry naming
model_package_group_name = "aidm-grupo-5-loan-default"
model_package_description = "Loan default binary classifier (BYOC) - Grupo 5 - AIDM"

## Avaliação do modelo para gerar métricas (a partir do melhor modelo)

In [43]:
# Clientes
sm = boto3.client("sagemaker", region_name=region)
sm_s3 = boto3.client("s3", region_name=region)

# Nome do tuning job (já tens)
tuning_job_name = "grupo-5-aidm-loan-de-260111-2226"

# Descrever tuning job e obter best training job + best AUC
tuning_desc = sm.describe_hyper_parameter_tuning_job(
    HyperParameterTuningJobName=tuning_job_name
)

best = tuning_desc["BestTrainingJob"]
best_training_job = best["TrainingJobName"]
best_auc = float(best["FinalHyperParameterTuningJobObjectiveMetric"]["Value"])

print("Best training job:", best_training_job)
print("Best validation_auc (objective):", best_auc)

# Garantir ModelDataUrl correto via API (fonte de verdade)
best_job_desc = sm.describe_training_job(TrainingJobName=best_training_job)
best_model_s3 = best_job_desc["ModelArtifacts"]["S3ModelArtifacts"]

print("Best model artifact (S3):", best_model_s3)

# Criar metrics.json para o Model Registry (aqui usamos AUC)
metrics_payload = {
    "binary_classification_metrics": {
        "auc": {"value": float(best_auc), "standard_deviation": "NaN"},
    },
    "metadata": {
        "tuning_job": tuning_job_name,
        "best_training_job": best_training_job,
        "git_sha": GIT_SHA,
        "dataset": "Loan_Default.csv",
        "target": "Status",
    }
}

local_metrics_path = "/tmp/metrics.json"
with open(local_metrics_path, "w") as f:
    json.dump(metrics_payload, f)

Best training job: grupo-5-aidm-loan-de-260111-2226-002-23c01fb8
Best validation_auc (objective): 1.0
Best model artifact (S3): s3://i32419/ai-deployment-monitoring-grupo-5/aidm-loan-default/training-output/grupo-5-aidm-loan-de-260111-2226-002-23c01fb8/output/model.tar.gz


## Carregar metrics.json para S3 (para anexar ao Model Package)

In [44]:
# The key embeds the current epoch second, so each run uploads a new file.
metrics_prefix = f"{prefix}/model-registry/metrics"
upload_epoch = int(time.time())
metrics_key = f"{metrics_prefix}/metrics-{upload_epoch}.json"

sm_s3.upload_file(Filename=local_metrics_path, Bucket=bucket, Key=metrics_key)
metrics_s3_uri = f"s3://{bucket}/{metrics_key}"

print("Metrics uploaded to:", metrics_s3_uri)

Metrics uploaded to: s3://i32419/ai-deployment-monitoring-grupo-5/aidm-loan-default/model-registry/metrics/metrics-1768172692.json


## Criar Model Package Group

In [49]:
# Tags attached to the Model Package Group.
group_tags = [
    {"Key": "image_repo", "Value": algorithm_name},
    {"Key": "project", "Value": "ai-deployment-monitoring"},
    {"Key": "group", "Value": "5"},
    {"Key": "dataset", "Value": "Loan_Default.csv"},
    {"Key": "target", "Value": "Status"},
]

# Create the group, treating "already exists" as success so the cell can be
# re-run. NOTE(review): an existing group appears to be reported either as
# ResourceInUse or as a ValidationException whose message says it already
# exists — confirm against the API documentation. Any other error is re-raised.
try:
    sm.create_model_package_group(
        ModelPackageGroupName=model_package_group_name,
        ModelPackageGroupDescription="AIDM Grupo 5 - Loan Default - BYOC",
        Tags=group_tags,
    )
    print("Created ModelPackageGroup with tags:", model_package_group_name)
except sm.exceptions.ClientError as err:
    error_info = err.response.get("Error", {})
    error_code = error_info.get("Code", "")
    error_message = error_info.get("Message", "")
    already_exists = error_code == "ResourceInUse" or (
        error_code == "ValidationException" and "already exists" in error_message.lower()
    )
    if not already_exists:
        raise
    print("ModelPackageGroup already exists:", model_package_group_name)

## Criar Model Package (a versão do modelo no Registry)

In [48]:
# Inference contract of the model version: BYOC image + best model artifact.
# Supports the json and text/csv request formats.
inference_specification = {
    "Containers": [
        {
            "Image": image_uri,
            "ModelDataUrl": best_model_s3,
        }
    ],
    "SupportedContentTypes": ["application/json", "text/csv"],
    "SupportedResponseMIMETypes": ["application/json"],
}

# Points the registry entry at the metrics.json uploaded to S3.
model_metrics = {
    "ModelQuality": {
        "Statistics": {
            "ContentType": "application/json",
            "S3Uri": metrics_s3_uri,
        }
    }
}

# Register the version as pending manual approval (useful to demonstrate governance).
create_resp = sm.create_model_package(
    ModelPackageGroupName=model_package_group_name,
    ModelPackageDescription=model_package_description,
    ModelApprovalStatus="PendingManualApproval",
    InferenceSpecification=inference_specification,
    ModelMetrics=model_metrics,
)

model_package_arn = create_resp["ModelPackageArn"]
print("Created ModelPackage:", model_package_arn)

Created ModelPackage: arn:aws:sagemaker:eu-west-1:267567228900:model-package/aidm-grupo-5-loan-default/1


## Aprovar programaticamente

In [None]:
# NOTE(review): approval is left commented out, presumably so the package stays
# in "PendingManualApproval" until someone reviews it — confirm. Uncomment to
# approve the model package programmatically.
#sm.update_model_package(
#    ModelPackageArn=model_package_arn,
#    ModelApprovalStatus="Approved"
#)
#print("Approved ModelPackage:", model_package_arn)
#print("Approved ModelPackage:", model_package_arn)