Offline: Get cleaned dataset with quality checks from offline code\
Do feature engineering here - modularize\
Set up XGBoost model - modularize\
Output results to S3

# Local only: Take a look at modeling data

In [None]:
import sys
import subprocess
import pkg_resources
import pandas as pd
import numpy as np
from math import sqrt
import sklearn
import calendar
from datetime import datetime
from dateutil.relativedelta import relativedelta

# model(s)
from sklearn.ensemble import RandomForestClassifier
import xgboost as xgb
from sklearn.model_selection import GridSearchCV

# train test split
from sklearn.model_selection import train_test_split

# eval metrics
from sklearn.metrics import precision_score, recall_score, precision_recall_curve, f1_score, classification_report, confusion_matrix, ConfusionMatrixDisplay

# visualization
import matplotlib.pyplot as plt
# import seaborn as sns # could not install w Python 3.13.1-could be a python version issue

In [None]:
assert f"Pandas version: {pd.__version__}" == "Pandas version: 2.2.3"
assert f"Sklearn version: {sklearn.__version__}" == "Sklearn version: 1.2.1"
print(f"Pandas version: {pd.__version__}")
print(f"Sklearn version: {sklearn.__version__}")

In [None]:
# List files in your S3 bucket
s3 = boto3.client('s3')
response = s3.list_objects_v2(
    Bucket=bucket,
    Prefix=prefix
)

if 'Contents' in response:
    for obj in response['Contents']:
        print(obj['Key'])
else:
    print(f"No objects found in {bucket}/{prefix}")

# !aws s3 ls s3://$bucket/sagemaker/renewal-forecast-pipeline/06132025175000/model-and-inference-data/ --no-sign-request | grep ".csv"  # only download files that are current (not previous versions)

In [None]:
# Download files from S3 bucket
!aws s3 cp s3://$bucket/sagemaker/renewal-forecast-pipeline/06132025175000/model-and-inference-data/ . --recursive --exclude "*" --include "*.csv" --exclude "*//*"

In [None]:
current_data = pd.read_csv("Renewal_Forecast_individual_modeling_dataset_FINAL_250613.csv")
inference_data = pd.read_csv("Renewal_Forecast_individual_inference_dataset_FINAL_250613.csv")

model_data, val_data = train_test_split(current_data, test_size=0.2, random_state=42, stratify=current_data['Renewed'])

model_data.head(10)
val_data.head(10)

In [None]:
display(model_data)

# Start a Conda env

In [None]:
# !conda create -n sagemaker_renforecast_env_2026 python=3.10.6 -y
# !conda install -n sagemaker_renforecast_env_2026 pandas==2.2.3 scikit-learn=1.2.1 numpy xgboost seaborn matplotlib category_encoders ipykernel sagemaker -y
# !conda run -n sagemaker_renforecast_env_2026 python -m ipykernel install --user --name sagemaker_renforecast_env_2026 --display-name "Python (sagemaker_renforecast_env_2026)"

After running the code above, refresh the page and select your desired kernel.

In [1]:
import sys
print(sys.executable)

/home/sagemaker-user/.conda/envs/sagemaker_renforecast_env_2026/bin/python


# Import packages

In [2]:
# Sagemaker session
import sagemaker # had a corrupt installation. solved by the block below this cell
import boto3
import re
from botocore.exceptions import ClientError
from sagemaker import get_execution_role
from sagemaker.workflow.parameters import ParameterString, ParameterFloat, ParameterInteger
from sagemaker.workflow.steps import ProcessingStep, TrainingStep
from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.functions import JsonGet
from sagemaker.workflow.properties import PropertyFile
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.workflow.model_step import ModelStep
from sagemaker.processing import ProcessingInput, ProcessingOutput, ScriptProcessor
from sagemaker.inputs import TrainingInput
from sagemaker.sklearn.estimator import SKLearn
from sagemaker.model import Model
from sagemaker.transformer import Transformer
from sagemaker.workflow.steps import TransformStep
from sagemaker.model_metrics import MetricsSource, ModelMetrics

sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /home/sagemaker-user/.config/sagemaker/config.yaml


In [None]:
# Solve corrupt installation of the sagemaker package
# subprocess.check_call([sys.executable, "-m", "pip", "uninstall", "-y", "sagemaker", "sagemaker-core", "smdebug-rulesconfig"])
# subprocess.check_call([sys.executable, "-m", "pip", "install", "sagemaker"])
# Restart your kernel after this

In [3]:
print(dir(sagemaker))

['AlgorithmEstimator', 'AutoML', 'AutoMLDataChannel', 'AutoMLImageClassificationConfig', 'AutoMLInput', 'AutoMLJob', 'AutoMLJobV2', 'AutoMLTabularConfig', 'AutoMLTextClassificationConfig', 'AutoMLTextGenerationConfig', 'AutoMLTimeSeriesForecastingConfig', 'AutoMLV2', 'CandidateEstimator', 'CandidateStep', 'FactorizationMachines', 'FactorizationMachinesModel', 'FactorizationMachinesPredictor', 'FileSource', 'HyperparameterTuningJobAnalytics', 'IPInsights', 'IPInsightsModel', 'IPInsightsPredictor', 'KMeans', 'KMeansModel', 'KMeansPredictor', 'KNN', 'KNNModel', 'KNNPredictor', 'LDA', 'LDAModel', 'LDAPredictor', 'LinearLearner', 'LinearLearnerModel', 'LinearLearnerPredictor', 'LocalAutoMLDataChannel', 'LocalSession', 'MetricsSource', 'Model', 'ModelMetrics', 'ModelPackage', 'NTM', 'NTMModel', 'NTMPredictor', 'Object2Vec', 'Object2VecModel', 'PCA', 'PCAModel', 'PCAPredictor', 'PartnerAppAuthProvider', 'PipelineModel', 'Predictor', 'Processor', 'Profiler', 'ProfilerConfig', 'RandomCutForest'

In [4]:
print(sagemaker.__file__)

/home/sagemaker-user/.conda/envs/sagemaker_renforecast_env_2026/lib/python3.10/site-packages/sagemaker/__init__.py


In [5]:
print(f"SageMaker version: {sagemaker.__version__}")

SageMaker version: 2.246.0


# Set up Sagemaker session

In [6]:
session = sagemaker.Session()
bucket = session.default_bucket()

# dataSourceGroupName/dataSetName/ingestionTimeStamp/file.csv - this is for the RAP
prefix = 'xgb-renewal'
# model_artifact_prefix = 'sagemaker/renewal-forecast-pipeline/06132025175000/model-artifacts'
print('Your current bucket: ', bucket)

# Define IAM role
role = get_execution_role()
print("Your current role: ", role)

Your current bucket:  sagemaker-us-east-1-922182641966
Your current role:  arn:aws:iam::922182641966:role/service-role/SageMaker-ExecutionRole-20250328T105273


# Pipeline starts here

## Step 1: Clean data and engineer features - done outside of Sagemaker.

## Step 2: Scripts

In [43]:
# Create data preprocessing script (only for splitting training and validation data)
preprocessing_script = """
import os
import json
import pandas as pd
import numpy as np
import argparse
from sklearn.model_selection import train_test_split

def preprocess_data(input_path, output_path, test_size=0.2, val_size=0.2, random_state=42):
    # Preprocess data and split into train/validation/test sets

    print(f"Loading data from {input_path}")

    # Read the raw data
    df = pd.read_csv(input_path, encoding='utf-8')
    print(f"Original data shape: {df.shape}")
    print(f"Columns: {list(df.columns)}")

    # Basic data info
    print("Target distribution:")
    print(df['Renewed'].value_counts())

    # Identify categorical and numerical columns
    # Exclude the target column 'Renewed' from features
    feature_columns = [col for col in df.columns if col != 'Renewed']

    # Automatically identify categorical columns
    categorical_columns = []
    numerical_columns = []

    for col in feature_columns:
        if df[col].dtype == 'object' or df[col].dtype.name == 'category':
            categorical_columns.append(col)
        elif df[col].nunique() < 10 and df[col].dtype in ['int64', 'float64']:
            # Treat low-cardinality numeric columns as categorical
            categorical_columns.append(col)
        else:
            numerical_columns.append(col)

    print(f"Categorical columns ({len(categorical_columns)}): {categorical_columns}")
    print(f"Numerical columns ({len(numerical_columns)}): {numerical_columns}")

    # Handle missing values
    print("Missing values per column:")
    missing_counts = df.isnull().sum()
    print(missing_counts[missing_counts > 0])

    # Fill missing values
    df_processed = df.copy()

    # Fill categorical missing values with 'Unknown'
    for col in categorical_columns:
        if df_processed[col].isnull().sum() > 0:
            df_processed[col] = df_processed[col].fillna('Unknown')
            print(f"Filled {col} missing values with 'Unknown'")

    # Fill numerical missing values with median
    for col in numerical_columns:
        if df_processed[col].isnull().sum() > 0:
            median_value = df_processed[col].median()
            df_processed[col] = df_processed[col].fillna(median_value)
            print(f"Filled {col} missing values with median: {median_value}")

    # Convert categorical columns to string type for consistency
    for col in categorical_columns:
        df_processed[col] = df_processed[col].astype(str)

    # Split data into train/temp, then temp into validation/test
    X = df_processed[feature_columns]
    y = df_processed['Renewed']

    # First split: train vs (validation + test)
    X_train, X_temp, y_train, y_temp = train_test_split(
        X, y, test_size=(test_size + val_size), random_state=random_state, stratify=y
    )

    # Second split: validation vs test
    relative_test_size = test_size / (test_size + val_size)
    X_val, X_test, y_val, y_test = train_test_split(
        X_temp, y_temp, test_size=relative_test_size, random_state=random_state, stratify=y_temp
    )

    print(f"Data split:")
    print(f"Train: {X_train.shape[0]} samples")
    print(f"Validation: {X_val.shape[0]} samples")
    print(f"Test: {X_test.shape[0]} samples")

    # Create output directories
    train_dir = os.path.join(output_path, 'train')
    val_dir = os.path.join(output_path, 'validation')
    test_dir = os.path.join(output_path, 'test')

    os.makedirs(train_dir, exist_ok=True)
    os.makedirs(val_dir, exist_ok=True)
    os.makedirs(test_dir, exist_ok=True)

    # Combine features and target for saving
    train_df = pd.concat([X_train, y_train], axis=1)
    val_df = pd.concat([X_val, y_val], axis=1)
    test_df = pd.concat([X_test, y_test], axis=1)

    # Save datasets
    train_df.to_csv(os.path.join(train_dir, 'train.csv'), index=False)
    val_df.to_csv(os.path.join(val_dir, 'validation.csv'), index=False)
    test_df.to_csv(os.path.join(test_dir, 'test.csv'), index=False)

    print(f"Saved datasets to {output_path}")

    # Create metadata for the pipeline
    metadata = {
        "feature_names": feature_columns,
        "categorical_columns": categorical_columns,
        "numerical_columns": numerical_columns,
        "target_column": "Renewed",
        "original_shape": df.shape,
        "train_shape": train_df.shape,
        "validation_shape": val_df.shape,
        "test_shape": test_df.shape,
        "preprocessing_info": {
            "missing_value_strategy": {
                "categorical": "Unknown",
                "numerical": "median"
            },
            "train_test_split": {
                "test_size": test_size,
                "validation_size": val_size,
                "random_state": random_state
            }
        }
    }

    # Save metadata to all directories (required by your training script)
    for directory in [train_dir, val_dir, test_dir]:
        with open(os.path.join(directory, 'metadata.json'), 'w') as f:
            json.dump(metadata, f, indent=2)

    print("Metadata saved to all directories")

    return metadata

if __name__ == "__main__":
    parser = argparse.ArgumentParser(description='Preprocess data for XGBoost training')

    # SageMaker processing job arguments
    parser.add_argument('--input-data', 
                        type=str, 
                        default='/opt/ml/processing/input/Renewal_Forecast_individual_modeling_dataset_FINAL_250703_v3.csv',
                        help='Path to input data file')
    parser.add_argument('--output-data', type=str, default='/opt/ml/processing/output',
                        help='Path to output directory')
    parser.add_argument('--test-size', type=float, default=0.2,
                        help='Proportion of data to use for test set')
    parser.add_argument('--val-size', type=float, default=0.2,
                        help='Proportion of data to use for validation set')
    parser.add_argument('--random-state', type=int, default=42,
                        help='Random state for reproducibility')

    args = parser.parse_args()
    
    print("Starting data preprocessing...")
    print(f"Input data: {args.input_data}")
    print(f"Output directory: {args.output_data}")
    print(f"Test size: {args.test_size}")
    print(f"Validation size: {args.val_size}")
    print(f"Random state: {args.random_state}")
    
    # Run preprocessing
    try:
        metadata = preprocess_data(
            input_path=args.input_data,
            output_path=args.output_data,
            test_size=args.test_size,
            val_size=args.val_size,
            random_state=args.random_state
        )
        
        print("Preprocessing completed successfully!")
        print(f"Features: {len(metadata['feature_names'])}")
        print(f"Categorical features: {len(metadata['categorical_columns'])}")
        print(f"Numerical features: {len(metadata['numerical_columns'])}")
        
    except Exception as e:
        print(f"Error during preprocessing: {str(e)}")
        raise e
"""
with open("preprocessing.py", "w") as f:
    f.write(preprocessing_script)

# Create training script
training_script = """
# train_xgb.py - training script with categorical handling
import argparse
import os
import pandas as pd
import numpy as np
import xgboost as xgb
import json
import pickle
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, roc_auc_score

def convert_categorical_columns(df, categorical_columns):
    # Convert specified columns to categorical type for XGBoost
    df_copy = df.copy()
    for col in categorical_columns:
        if col in df_copy.columns:
            # XGBoost expects categorical columns as 'category' dtype
            df_copy[col] = df_copy[col].astype('category')
    return df_copy

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    
    # XGBoost hyperparameters
    parser.add_argument("--num_round", type=int, default=100)
    parser.add_argument("--max_depth", type=int, default=6)
    parser.add_argument("--eta", type=float, default=0.1)
    parser.add_argument("--subsample", type=float, default=0.8)
    parser.add_argument("--colsample_bytree", type=float, default=0.8)
    parser.add_argument("--min_child_weight", type=float, default=1.0)
    parser.add_argument("--scale_pos_weight", type=float, default=1.0)
    parser.add_argument("--objective", type=str, default="binary:logistic")
    parser.add_argument("--eval_metric", type=str, default="auc")
    parser.add_argument("--enable_categorical", type=str, default="true")
    parser.add_argument("--tree_method", type=str, default="hist")
    
    # SageMaker specific arguments
    parser.add_argument("--model_dir", type=str, default=os.environ.get("SM_MODEL_DIR"))
    parser.add_argument("--train", type=str, default=os.environ.get("SM_CHANNEL_TRAIN"))
    parser.add_argument("--validation", type=str, default=os.environ.get("SM_CHANNEL_VALIDATION"))
    
    args = parser.parse_args()
    
    print("Arguments received:")
    for arg, value in vars(args).items():
        print(f"{arg}: {value}")
    
    # Load data
    train_df = pd.read_csv(f"{args.train}/train.csv", encoding='utf-8')
    val_df = pd.read_csv(f"{args.validation}/validation.csv", encoding='utf-8')
    
    # Load metadata
    with open(f"{args.train}/metadata.json", "r") as f:
        metadata = json.load(f)
    
    feature_names = metadata["feature_names"]
    categorical_columns = metadata.get("categorical_columns", [])
    
    print(f"Feature names: {feature_names}")
    print(f"Categorical columns: {categorical_columns}")
    
    # Prepare features and target
    X_train = train_df[feature_names].copy()
    y_train = train_df["Renewed"].copy()
    X_val = val_df[feature_names].copy()
    y_val = val_df["Renewed"].copy()
    
    # Convert categorical columns to proper categorical type
    X_train = convert_categorical_columns(X_train, categorical_columns)
    X_val = convert_categorical_columns(X_val, categorical_columns)
    
    print(f"Training data shape: {X_train.shape}")
    print(f"Validation data shape: {X_val.shape}")
    print(f"Training target distribution:{y_train.value_counts()}")
    
    # For XGBoost native container, we need to use the training API approach
    # Create XGBClassifier for categorical support
    enable_categorical = args.enable_categorical.lower() == "true"
    
    model = xgb.XGBClassifier(
        n_estimators=args.num_round,
        max_depth=args.max_depth,
        learning_rate=args.eta,
        subsample=args.subsample,
        colsample_bytree=args.colsample_bytree,
        min_child_weight=args.min_child_weight,
        scale_pos_weight=args.scale_pos_weight,
        objective=args.objective,
        eval_metric=args.eval_metric,
        enable_categorical=enable_categorical,
        tree_method=args.tree_method,
        random_state=42,
        verbosity=1
    )
    
    print("Starting training with categorical support...")
    print(f"Enable categorical: {enable_categorical}")
    print(f"Tree method: {args.tree_method}")
    
    # Train the model
    # model.fit(
    #     X_train, y_train,
    #     eval_set=[(X_train, y_train), (X_val, y_val)],
    #     eval_names=['train', 'validation'],
    #     early_stopping_rounds=10,
    #     verbose=True
    # )
    
    model.fit(
        X_train, y_train,
        eval_set=[(X_train, y_train), (X_val, y_val)],
        early_stopping_rounds=10,
        verbose=True
    )
    
    # Make predictions for validation metrics
    val_pred_proba = model.predict_proba(X_val)[:, 1]
    val_pred = model.predict(X_val)
    
    # Calculate validation metrics
    val_metrics = {
        "accuracy": float(accuracy_score(y_val, val_pred)),
        "precision": float(precision_score(y_val, val_pred, zero_division=0)),
        "recall": float(recall_score(y_val, val_pred, zero_division=0)),
        "f1": float(f1_score(y_val, val_pred, zero_division=0)),
        "auc": float(roc_auc_score(y_val, val_pred_proba))
    }
    
    print("Validation Metrics:")
    for metric, value in val_metrics.items():
        print(f"{metric}: {value:.4f}")
    
    # Save model using pickle
    model_path = os.path.join(args.model_dir, "model.pkl")
    with open(model_path, 'wb') as f:
        pickle.dump(model, f)
    print(f"Model saved to {model_path}")
    
    # Also save in XGBoost native format for compatibility
    xgb_model_path = os.path.join(args.model_dir, "xgboost-model")
    model.save_model(xgb_model_path)
    print(f"XGBoost model saved to {xgb_model_path}")
    
    # Save metadata
    model_metadata = {
        "feature_names": feature_names,
        "categorical_columns": categorical_columns,
        "validation_metrics": val_metrics,
        "model_type": "XGBClassifier",
        "hyperparameters": {
            "n_estimators": args.num_round,
            "max_depth": args.max_depth,
            "learning_rate": args.eta,
            "enable_categorical": enable_categorical,
            "tree_method": args.tree_method
        }
    }
    
    with open(os.path.join(args.model_dir, "model_metadata.json"), "w") as f:
        json.dump(model_metadata, f, indent=2)
    
    print("Training completed successfully!")
    print(f"Best iteration: {model.best_iteration}")
    print(f"Best score: {model.best_score}") """

# Save training script
with open("train.py", "w") as f:
    f.write(training_script)

# Evaluation script
evaluation_script = """
# evaluate.py - Model evaluation script
# evaluate.py - Model evaluation script
import os
import json
import pandas as pd
import numpy as np
import joblib
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import (
    accuracy_score, precision_score, recall_score, f1_score, 
    roc_auc_score, confusion_matrix, classification_report,
    precision_recall_curve, roc_curve
)

def convert_categorical_columns(df, categorical_columns):
    # Convert specified columns to categorical type
    df_copy = df.copy()
    for col in categorical_columns:
        if col in df_copy.columns:
            df_copy[col] = df_copy[col].astype('category')
    return df_copy

def create_evaluation_plots(y_true, y_pred, y_pred_proba, output_dir):
    # Create evaluation plots
    
    # 1. Confusion Matrix
    plt.figure(figsize=(8, 6))
    cm = confusion_matrix(y_true, y_pred)
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', 
                xticklabels=['Not Renewed', 'Renewed'],
                yticklabels=['Not Renewed', 'Renewed'])
    plt.title('Confusion Matrix')
    plt.ylabel('Actual')
    plt.xlabel('Predicted')
    plt.tight_layout()
    plt.savefig(os.path.join(output_dir, 'confusion_matrix.png'), dpi=300, bbox_inches='tight')
    plt.close()
    
    # 2. ROC Curve
    plt.figure(figsize=(8, 6))
    fpr, tpr, _ = roc_curve(y_true, y_pred_proba)
    auc_score = roc_auc_score(y_true, y_pred_proba)
    plt.plot(fpr, tpr, color='darkorange', lw=2, label=f'ROC curve (AUC = {auc_score:.3f})')
    plt.plot([0, 1], [0, 1], color='navy', lw=2, linestyle='--')
    plt.xlim([0.0, 1.0])
    plt.ylim([0.0, 1.05])
    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    plt.title('Receiver Operating Characteristic (ROC) Curve')
    plt.legend(loc="lower right")
    plt.grid(True, alpha=0.3)
    plt.tight_layout()
    plt.savefig(os.path.join(output_dir, 'roc_curve.png'), dpi=300, bbox_inches='tight')
    plt.close()
    
    # 3. Precision-Recall Curve
    plt.figure(figsize=(8, 6))
    precision, recall, _ = precision_recall_curve(y_true, y_pred_proba)
    plt.plot(recall, precision, color='blue', lw=2)
    plt.xlabel('Recall')
    plt.ylabel('Precision')
    plt.title('Precision-Recall Curve')
    plt.grid(True, alpha=0.3)
    plt.tight_layout()
    plt.savefig(os.path.join(output_dir, 'precision_recall_curve.png'), dpi=300, bbox_inches='tight')
    plt.close()
    
    # 4. Prediction Distribution
    plt.figure(figsize=(10, 6))
    plt.subplot(1, 2, 1)
    plt.hist(y_pred_proba[y_true == 0], bins=30, alpha=0.7, label='Not Renewed', color='red')
    plt.hist(y_pred_proba[y_true == 1], bins=30, alpha=0.7, label='Renewed', color='blue')
    plt.xlabel('Predicted Probability')
    plt.ylabel('Frequency')
    plt.title('Prediction Probability Distribution')
    plt.legend()
    plt.grid(True, alpha=0.3)
    
    plt.subplot(1, 2, 2)
    plt.boxplot([y_pred_proba[y_true == 0], y_pred_proba[y_true == 1]], 
                labels=['Not Renewed', 'Renewed'])
    plt.ylabel('Predicted Probability')
    plt.title('Probability Distribution by Class')
    plt.grid(True, alpha=0.3)
    
    plt.tight_layout()
    plt.savefig(os.path.join(output_dir, 'prediction_distribution.png'), dpi=300, bbox_inches='tight')
    plt.close()

if __name__ == "__main__":
    print("Starting model evaluation...")
    
    # Load test data
    test_path = "/opt/ml/processing/test/test.csv"
    test_df = pd.read_csv(test_path, encoding='utf-8')
    
    # Load metadata
    with open("/opt/ml/processing/test/metadata.json", "r") as f:
        metadata = json.load(f)
    
    feature_names = metadata["feature_names"]
    categorical_columns = metadata.get("categorical_columns", [])
    
    print(f"Test data shape: {test_df.shape}")
    print(f"Feature names: {feature_names}")
    print(f"Categorical columns: {categorical_columns}")
    
    # Prepare test data
    X_test = test_df[feature_names]
    y_test = test_df["Renewed"]
    
    # Convert categorical columns
    X_test = convert_categorical_columns(X_test, categorical_columns)
    
    # Load model - try pickled XGBClassifier first, then XGBoost native
    model_pkl_path = "/opt/ml/processing/model/model.pkl"
    model_xgb_path = "/opt/ml/processing/model/xgboost-model"
    
    if os.path.exists(model_pkl_path):
        print("Loading pickled XGBClassifier model...")
        import pickle
        with open(model_pkl_path, 'rb') as f:
            model = pickle.load(f)
        model_type = "XGBClassifier"
    elif os.path.exists(model_xgb_path):
        print("Loading XGBoost native model...")
        import xgboost as xgb
        model = xgb.Booster()
        model.load_model(model_xgb_path)
        model_type = "Booster"
    else:
        raise FileNotFoundError("No model file found")
    
    print(f"Model type: {model_type}")
    print("Making predictions...")
    
    # Make predictions based on model type
    if model_type == "XGBClassifier":
        y_pred = model.predict(X_test)
        y_pred_proba = model.predict_proba(X_test)[:, 1]
    else:
        # XGBoost Booster
        dtest = xgb.DMatrix(X_test, enable_categorical=True)
        y_pred_proba = model.predict(dtest)
        y_pred = (y_pred_proba > 0.5).astype(int)
    
    # Calculate metrics
    metrics = {
        "binary_classification_metrics": {
            "accuracy": float(accuracy_score(y_test, y_pred)),
            "precision": float(precision_score(y_test, y_pred)),
            "recall": float(recall_score(y_test, y_pred)),
            "f1": float(f1_score(y_test, y_pred)),
            "roc_auc": float(roc_auc_score(y_test, y_pred_proba))
        },
        "confusion_matrix": confusion_matrix(y_test, y_pred).tolist(),
        "classification_report": classification_report(y_test, y_pred, output_dict=True)
    }
    
    print("Evaluation Metrics:")
    for metric, value in metrics["binary_classification_metrics"].items():
        print(f"{metric}: {value:.4f}")
    
    # Create output directory
    output_dir = "/opt/ml/processing/evaluation"
    os.makedirs(output_dir, exist_ok=True)
    
    # Save metrics
    with open(os.path.join(output_dir, "evaluation.json"), "w") as f:
        json.dump(metrics, f, indent=2)
    
    # Create evaluation plots
    create_evaluation_plots(y_test, y_pred, y_pred_proba, output_dir)
    
    # Feature importance
    if hasattr(model, 'feature_importances_'):
        # XGBClassifier
        importance_df = pd.DataFrame({
            'Feature': feature_names,
            'Importance': model.feature_importances_
        }).sort_values('Importance', ascending=False)
    elif hasattr(model, 'get_score'):
        # XGBoost Booster
        importance_dict = model.get_score(importance_type='gain')
        importance_df = pd.DataFrame({
            'Feature': list(importance_dict.keys()),
            'Importance': list(importance_dict.values())
        }).sort_values('Importance', ascending=False)
    else:
        importance_df = None
    
    if importance_df is not None:
        # Save feature importance
        importance_df.to_csv(os.path.join(output_dir, "feature_importance.csv"), index=False)
        
        # Plot feature importance
        plt.figure(figsize=(12, 8))
        top_features = importance_df.head(20)
        plt.barh(range(len(top_features)), top_features['Importance'])
        plt.yticks(range(len(top_features)), top_features['Feature'])
        plt.xlabel('Feature Importance')
        plt.title('Top 20 Feature Importances')
        plt.gca().invert_yaxis()
        plt.tight_layout()
        plt.savefig(os.path.join(output_dir, 'feature_importance.png'), dpi=300, bbox_inches='tight')
        plt.close()
    
    # Save detailed predictions for analysis
    results_df = pd.DataFrame({
        'actual': y_test,
        'predicted': y_pred,
        'probability': y_pred_proba
    })
    results_df.to_csv(os.path.join(output_dir, "predictions.csv"), index=False)
    
    print(f"Evaluation completed. Results saved to {output_dir}")
    print(f"Final F1 Score: {metrics['binary_classification_metrics']['f1']:.4f}")
    print(f"Final AUC Score: {metrics['binary_classification_metrics']['roc_auc']:.4f}")
"""


# Save evaluation script
with open("evaluate.py", "w") as f:
    f.write(evaluation_script)

# Create inference script
inference_script = """
# inference.py - Inference script for batch transform with categorical support
# inference.py - Inference script for XGBoost native container
import os
import json
import pandas as pd
import numpy as np
import pickle
import xgboost as xgb
from io import StringIO

def convert_categorical_columns(df, categorical_columns):
    # Convert specified columns to categorical type
    df_copy = df.copy()
    for col in categorical_columns:
        if col in df_copy.columns:
            df_copy[col] = df_copy[col].astype('category')
    return df_copy

def model_fn(model_dir):
    # Load model for SageMaker XGBoost container. SageMaker will call this function to load the model.

    print(f"Loading model from {model_dir}")
    
    # Try to load the pickled XGBClassifier first (for categorical support)
    model_path = os.path.join(model_dir, "model.pkl")
    if os.path.exists(model_path):
        print("Loading pickled XGBClassifier model...")
        with open(model_path, 'rb') as f:
            model = pickle.load(f)
    else:
        # Fallback to XGBoost native format
        print("Loading XGBoost native model...")
        xgb_model_path = os.path.join(model_dir, "xgboost-model")
        model = xgb.Booster()
        model.load_model(xgb_model_path)
    
    # Load metadata
    metadata_path = os.path.join(model_dir, "model_metadata.json")
    with open(metadata_path, "r") as f:
        metadata = json.load(f)
    
    return {
        "model": model,
        "feature_names": metadata["feature_names"],
        "categorical_columns": metadata.get("categorical_columns", []),
        "model_type": metadata.get("model_type", "XGBClassifier")
    }

def input_fn(request_body, request_content_type):
    # Parse input data for prediction. SageMaker will call this function to process the input.
    
    print(f"Processing input with content type: {request_content_type}")
    
    if request_content_type == "text/csv":
        # For batch transform, input comes as CSV string
        df = pd.read_csv(StringIO(request_body), encoding='utf-8')
        print(f"Loaded CSV data with shape: {df.shape}")
        return df
    elif request_content_type == "application/json":
        # For real-time inference
        data = json.loads(request_body)
        if isinstance(data, dict):
            df = pd.DataFrame([data])
        elif isinstance(data, list):
            df = pd.DataFrame(data)
        else:
            raise ValueError("JSON data must be dict or list of dicts")
        print(f"Loaded JSON data with shape: {df.shape}")
        return df
    else:
        raise ValueError(f"Unsupported content type: {request_content_type}")

def predict_fn(input_data, model_components):
    # Make predictions using the loaded model. SageMaker will call this function to make predictions.
    
    model = model_components["model"]
    feature_names = model_components["feature_names"]
    categorical_columns = model_components["categorical_columns"]
    model_type = model_components["model_type"]
    
    print(f"Making predictions with model type: {model_type}")
    print(f"Input data shape: {input_data.shape}")
    print(f"Expected features: {len(feature_names)}")
    print(f"Categorical columns: {len(categorical_columns)}")
    
    # Ensure we have all required features
    missing_features = set(feature_names) - set(input_data.columns)
    if missing_features:
        print(f"Warning: Missing features {missing_features}, filling with defaults")
        for feature in missing_features:
            if feature in categorical_columns:
                input_data[feature] = 'Unknown'  # Default for categorical
            else:
                input_data[feature] = 0  # Default for numerical
    
    # Select and order features
    X = input_data[feature_names].copy()
    
    # Convert categorical columns
    X = convert_categorical_columns(X, categorical_columns)
    
    print(f"Data prepared for prediction, shape: {X.shape}")
    
    # Make predictions based on model type
    if model_type == "XGBClassifier" and hasattr(model, 'predict_proba'):
        # Using XGBClassifier with categorical support
        predictions = model.predict(X)
        probabilities = model.predict_proba(X)[:, 1]
    else:
        # Using XGBoost Booster (native format)
        dmatrix = xgb.DMatrix(X, enable_categorical=True)
        probabilities = model.predict(dmatrix)
        predictions = (probabilities > 0.5).astype(int)
    
    return {
        "predictions": predictions.tolist(),
        "probabilities": probabilities.tolist()
    }

def output_fn(prediction, response_content_type):
    # Format the prediction output. SageMaker will call this function to format the response.

    print(f"Formatting output with content type: {response_content_type}")
    
    if response_content_type == "application/json":
        return json.dumps(prediction)
    elif response_content_type == "text/csv":
        # For batch transform, return CSV format
        df = pd.DataFrame({
            'prediction': prediction['predictions'],
            'probability': prediction['probabilities']
        })
        return df.to_csv(index=False)
    else:
        raise ValueError(f"Unsupported response content type: {response_content_type}")
"""

# Save inference script
with open("inference.py", "w") as f:
    f.write(inference_script)

In [44]:
# Upload scripts to S3
# need a script to load dataset into session
# session.upload_data(path=..., bucket=bucket, key_prefix=f"{prefix}/scripts")
session.upload_data(path="preprocessing.py", bucket=bucket, key_prefix=f"{prefix}/scripts")
session.upload_data(path="train.py", bucket=bucket, key_prefix=f"{prefix}/scripts")
session.upload_data(path="evaluate.py", bucket=bucket, key_prefix=f"{prefix}/scripts")
session.upload_data(path="inference.py", bucket=bucket, key_prefix=f"{prefix}/scripts")

's3://sagemaker-us-east-1-922182641966/xgb-renewal/scripts/inference.py'

In [45]:
# sagemaker_pipeline.py - Complete XGBoost pipeline for renewal prediction
import boto3
import sagemaker
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.xgboost import XGBoost
from sagemaker.inputs import TrainingInput
from sagemaker.model_metrics import MetricsSource, ModelMetrics
from sagemaker.drift_check_baselines import DriftCheckBaselines
from sagemaker.workflow.functions import Join
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.steps import ProcessingStep, TrainingStep
from sagemaker.workflow.parameters import ParameterString, ParameterFloat, ParameterInteger
from sagemaker.workflow.model_step import ModelStep
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.model import Model
from sagemaker.workflow.step_collections import RegisterModel
import os

# Initialize SageMaker session
sagemaker_session = sagemaker.Session()
pipeline_session = PipelineSession()
# bucket = sagemaker_session.default_bucket()
role = sagemaker.get_execution_role()
region = sagemaker_session.boto_region_name

# Define pipeline parameters
input_data_uri = ParameterString(
    name="InputDataUri",
    default_value=f"s3://{bucket}/xgb-renewal/data/Renewal_Forecast_individual_modeling_dataset_FINAL_250703_v3.csv"
)

model_approval_status = ParameterString(
    name="ModelApprovalStatus",
    default_value="PendingManualApproval"
)

# XGBoost hyperparameters
num_round = ParameterInteger(name="NumRound", default_value=500)
max_depth = ParameterInteger(name="MaxDepth", default_value=7)
eta = ParameterFloat(name="Eta", default_value=0.01)
subsample = ParameterFloat(name="Subsample", default_value=0.8)
colsample_bytree = ParameterFloat(name="ColsampleByTree", default_value=0.8)

def create_preprocessing_step():
    """Create preprocessing step using SKLearn processor"""
    
    # Create SKLearn processor for preprocessing
    sklearn_processor = SKLearnProcessor(
        framework_version='0.23-1',
        role=role,
        instance_type='ml.m5.large',
        instance_count=1,
        sagemaker_session=pipeline_session
    )
    
    # Define preprocessing step
    preprocessing_step = ProcessingStep(
        name="PreprocessingStep",
        processor=sklearn_processor,
        code="preprocessing.py",
        inputs=[
            ProcessingInput(
                source=input_data_uri,
                destination="/opt/ml/processing/input"
            )
        ],
        outputs=[
            ProcessingOutput(
                output_name="train",
                source="/opt/ml/processing/output/train",
                destination=f"s3://{bucket}/renewal-prediction/processed-data/train"
            ),
            ProcessingOutput(
                output_name="validation",
                source="/opt/ml/processing/output/validation",
                destination=f"s3://{bucket}/renewal-prediction/processed-data/validation"
            ),
            ProcessingOutput(
                output_name="test",
                source="/opt/ml/processing/output/test",
                destination=f"s3://{bucket}/renewal-prediction/processed-data/test"
            )
        ],
        job_arguments=[
            "--input-data", "/opt/ml/processing/input/Renewal_Forecast_individual_modeling_dataset_FINAL_250703_v3.csv",
            "--output-data", "/opt/ml/processing/output",
            "--test-size", "0.2",
            "--val-size", "0.2",
            "--random-state", "42"
        ]
    )
    
    return preprocessing_step

def create_training_step(preprocessing_step):
    """Create XGBoost training step"""
    
    # XGBoost estimator with categorical support
    xgb_estimator = XGBoost(
        entry_point="train.py",
        framework_version="1.5-1",
        py_version="py3",
        instance_type="ml.m5.large",
        instance_count=1,
        role=role,
        sagemaker_session=pipeline_session,
        hyperparameters={
            "num_round": num_round,
            "max_depth": max_depth,
            "eta": eta,
            "subsample": subsample,
            "colsample_bytree": colsample_bytree,
            "objective": "binary:logistic",
            "eval_metric": "auc",
            "enable_categorical": "true",
            "tree_method": "hist"
        }
    )
    
    # Training step
    training_step = TrainingStep(
        name="TrainingStep",
        estimator=xgb_estimator,
        inputs={
            "train": TrainingInput(
                s3_data=preprocessing_step.properties.ProcessingOutputConfig.Outputs[
                    "train"
                ].S3Output.S3Uri,
                content_type="text/csv"
            ),
            "validation": TrainingInput(
                s3_data=preprocessing_step.properties.ProcessingOutputConfig.Outputs[
                    "validation"
                ].S3Output.S3Uri,
                content_type="text/csv"
            )
        }
    )
    
    return training_step

def create_evaluation_step(training_step, preprocessing_step):
    """Create model evaluation step"""
    
    # Create processor for evaluation
    evaluation_processor = SKLearnProcessor(
        framework_version='0.23-1',
        role=role,
        instance_type='ml.m5.large',
        instance_count=1,
        sagemaker_session=pipeline_session
    )
    
    # Evaluation step
    evaluation_step = ProcessingStep(
        name="EvaluationStep",
        processor=evaluation_processor,
        code="evaluate.py",
        inputs=[
            ProcessingInput(
                source=training_step.properties.ModelArtifacts.S3ModelArtifacts,
                destination="/opt/ml/processing/model"
            ),
            ProcessingInput(
                source=preprocessing_step.properties.ProcessingOutputConfig.Outputs[
                    "test"
                ].S3Output.S3Uri,
                destination="/opt/ml/processing/test"
            )
        ],
        outputs=[
            ProcessingOutput(
                output_name="evaluation",
                source="/opt/ml/processing/evaluation",
                destination=f"s3://{bucket}/renewal-prediction/evaluation"
            )
        ]
    )
    
    return evaluation_step

def create_model_registration_step(training_step, evaluation_step):
    """Create model registration step"""
    
    # Create model
    model = Model(
        image_uri=training_step.properties.AlgorithmSpecification.TrainingImage,
        model_data=training_step.properties.ModelArtifacts.S3ModelArtifacts,
        sagemaker_session=pipeline_session,
        role=role,
        entry_point="inference.py",
        source_dir="."
    )
    
    # Model metrics
    model_metrics = ModelMetrics(
        model_statistics=MetricsSource(
            s3_uri=Join(
                on="/",
                values=[
                    evaluation_step.properties.ProcessingOutputConfig.Outputs['evaluation'].S3Output.S3Uri,
                    "evaluation.json"
                ]
                    ),
            content_type="application/json"
        )
    )

    # Register model step
    register_model_step = RegisterModel(
        name="RegisterModelStep",
        estimator=training_step.estimator,
        model_data=training_step.properties.ModelArtifacts.S3ModelArtifacts,
        content_types=["text/csv", "application/json"],
        response_types=["text/csv", "application/json"],
        inference_instances=["ml.t2.medium", "ml.m5.large"],
        transform_instances=["ml.m5.large"],
        model_package_group_name="renewal-prediction-model-group",
        approval_status=model_approval_status,
        model_metrics=model_metrics
    )
    
    return register_model_step

In [46]:
# Make sure everything exists before starting pipeline
required_files = ['preprocessing.py', 'train.py', 'evaluate.py', 'inference.py']

for file in required_files:
    if os.path.exists(file):
        print(f"✅ {file} exists")
    else:
        print(f"❌ {file} is missing!")

# Check if input data exists
import boto3
s3_client = boto3.client('s3')

def check_s3_file(s3_uri):
    try:
        # Parse S3 URI
        bucket = s3_uri.split('/')[2]
        key = '/'.join(s3_uri.split('/')[3:])
        
        # Check if file exists
        s3_client.head_object(Bucket=bucket, Key=key)
        print(f"✅ {s3_uri} FOUND!")
        return True
    except Exception as e:
        print(f"❌ {s3_uri} not found: {e}")
        return False

# Check your input data
bucket = bucket
input_data = f"s3://{bucket}/xgb-renewal/data/Renewal_Forecast_individual_modeling_dataset_FINAL_250703_v2.csv"
check_s3_file(input_data)

✅ preprocessing.py exists
✅ train.py exists
✅ evaluate.py exists
✅ inference.py exists
✅ s3://sagemaker-us-east-1-922182641966/xgb-renewal/data/Renewal_Forecast_individual_modeling_dataset_FINAL_250703_v2.csv FOUND!


True

In [47]:
def create_pipeline():
    """Create and return the complete pipeline"""
    
    # Create pipeline steps
    preprocessing_step = create_preprocessing_step()
    training_step = create_training_step(preprocessing_step)
    evaluation_step = create_evaluation_step(training_step, preprocessing_step)
    register_model_step = create_model_registration_step(training_step, evaluation_step)
    
    # Create pipeline
    pipeline = Pipeline(
        name="RenewalForecastPipeline",
        parameters=[
            input_data_uri,
            model_approval_status,
            num_round,
            max_depth,
            eta,
            subsample,
            colsample_bytree
        ],
        steps=[
            preprocessing_step,
            training_step,
            evaluation_step,
            register_model_step
        ],
        sagemaker_session=pipeline_session
    )
    
    return pipeline

if __name__ == "__main__":
    # Create and upload pipeline
    pipeline = create_pipeline()
    
    # Create or update pipeline
    response = pipeline.upsert(role_arn=role)
    
    # print(f"Pipeline created: {response['Name']}")
    print(f"Pipeline ARN: {response['PipelineArn']}")
    
    # Start pipeline execution
    execution = pipeline.start()
    
    print(f"Pipeline execution started: {execution.arn}")
    print("Monitor the pipeline in SageMaker Studio or AWS Console")
    
    # Wait for completion (optional)
    # execution.wait()

INFO:sagemaker.image_uris:Defaulting to only available Python version: py3
INFO:sagemaker.image_uris:Ignoring unnecessary Python version: py3.
INFO:sagemaker.image_uris:Ignoring unnecessary instance type: ml.m5.large.
INFO:sagemaker.image_uris:Defaulting to only available Python version: py3


Pipeline ARN: arn:aws:sagemaker:us-east-1:922182641966:pipeline/RenewalForecastPipeline
Pipeline execution started: arn:aws:sagemaker:us-east-1:922182641966:pipeline/RenewalForecastPipeline/execution/0w0mlib8ala1
Monitor the pipeline in SageMaker Studio or AWS Console


In [48]:
sm_client = boto3.client('sagemaker')
response = sm_client.list_pipeline_executions(
    PipelineName="RenewalForecastPipeline"
)
for execution in response['PipelineExecutionSummaries']:
    # print(f"Execution: {execution['PipelineStepName']}")
    print(f"Execution: {execution['PipelineExecutionArn']}")
    print(f"Status: {execution['PipelineExecutionStatus']}")
    print(f"Started: {execution['StartTime']}")
    print("---")

Execution: arn:aws:sagemaker:us-east-1:922182641966:pipeline/RenewalForecastPipeline/execution/0w0mlib8ala1
Status: Executing
Started: 2025-07-12 01:11:59.122000+00:00
---
Execution: arn:aws:sagemaker:us-east-1:922182641966:pipeline/RenewalForecastPipeline/execution/jtl20aepymhk
Status: Failed
Started: 2025-07-12 00:54:41.689000+00:00
---
Execution: arn:aws:sagemaker:us-east-1:922182641966:pipeline/RenewalForecastPipeline/execution/mcfxjsvdbfvw
Status: Failed
Started: 2025-07-12 00:42:20.580000+00:00
---
Execution: arn:aws:sagemaker:us-east-1:922182641966:pipeline/RenewalForecastPipeline/execution/gs7xuw3pjx3g
Status: Failed
Started: 2025-07-12 00:35:41.602000+00:00
---
Execution: arn:aws:sagemaker:us-east-1:922182641966:pipeline/RenewalForecastPipeline/execution/v3nbjkx2buiv
Status: Failed
Started: 2025-07-11 23:57:47.486000+00:00
---
Execution: arn:aws:sagemaker:us-east-1:922182641966:pipeline/RenewalForecastPipeline/execution/72yfmvmzp6a8
Status: Failed
Started: 2025-07-11 22:27:03.

## Pipeline code - old

In [None]:
# # Try checking permissions
# import boto3
# iam = boto3.client('iam')
# role_name = 'SageMaker-ExecutionRole-20250328T105273'

# try:
#     attached_policies = iam.list_attached_role_policies(RoleName=role_name)
#     print("Attached policies:", attached_policies)

#     inline_policies = iam.list_role_policies(RoleName=role_name)
#     print("Inline policies:", inline_policies)

# except Exception as e:
#     print(f"Found error: {e}")

In [None]:
# # SageMaker Pipeline for XGBoost with Categorical Support
# # Configuration
# region = 'us-east-1'
# bucket = bucket
# prefix = prefix
# role = role
# input_data_uri = f"s3://{bucket}/{prefix}/data"
# output_data_uri = f"s3://{bucket}/{prefix}/output"
# model_uri = f"s3://{bucket}/{prefix}/model"

# session = sagemaker.Session()
# pipeline_session = PipelineSession()

# # Pipeline Parameters
# model_approval_status = ParameterString(
#     name="ModelApprovalStatus", 
#     default_value="PendingManualApproval"
# )

In [None]:
# # 1. Data Preprocessing
# preprocessing_processor = ScriptProcessor(
#     image_uri=sagemaker.image_uris.retrieve("sklearn", region=region, version="1.2-1"),
#     command=["python3"],
#     instance_type="ml.m5.xlarge",
#     instance_count=1,
#     base_job_name=f"{prefix}-preprocessing",
#     role=role,
#     sagemaker_session=pipeline_session
# )

# preprocessing_step = ProcessingStep(
#     name="PreprocessRenewalData",
#     processor=preprocessing_processor,
#     inputs=[ProcessingInput(source=input_data_uri, destination="/opt/ml/processing/input")],
#     outputs=[
#         ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
#         ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"),
#         ProcessingOutput(output_name="test", source="/opt/ml/processing/test")
#     ],
#     code="preprocessing.py"
# )

# # 2. Training Step using Native XGBoost Container
# from sagemaker.xgboost.estimator import XGBoost

# xgb_estimator = XGBoost(
#     entry_point="train.py",
#     framework_version="1.7-1",  # Latest version with categorical support
#     py_version="py3",
#     instance_type="ml.m5.xlarge",
#     instance_count=1,
#     role=role,
#     base_job_name=f"{prefix}-training",
#     hyperparameters={
#         "num_round": 500,
#         "max_depth": 7,
#         "eta": 0.01,
#         "subsample": 0.8,
#         "colsample_bytree": 0.8,
#         "min_child_weight": 1,
#         "scale_pos_weight": 0.324132467451242,
#         "objective": "binary:logistic",
#         "eval_metric": "f1",
#         "enable_categorical": "true",  # String for XGBoost native container
#         "tree_method": "hist"
#     },
#     output_path=model_uri,
#     sagemaker_session=pipeline_session
# )

# train_step = TrainingStep(
#     name="TrainRenewalModel",
#     estimator=xgb_estimator,
#     inputs={
#         "train": TrainingInput(
#             s3_data=preprocessing_step.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri,
#             content_type="text/csv"
#         ),
#         "validation": TrainingInput(
#             s3_data=preprocessing_step.properties.ProcessingOutputConfig.Outputs["validation"].S3Output.S3Uri,
#             content_type="text/csv"
#         )
#     }
# )

# # 3. Model Evaluation Step
# evaluation_processor = ScriptProcessor(
#     image_uri=sagemaker.image_uris.retrieve("sklearn", region=region, version="1.2-1"),
#     command=["python3"],
#     instance_type="ml.m5.xlarge",
#     instance_count=1,
#     base_job_name=f"{prefix}-evaluation",
#     role=role,
#     sagemaker_session=pipeline_session
# )

# eval_step = ProcessingStep(
#     name="EvaluateRenewalModel",
#     processor=evaluation_processor,
#     inputs=[
#         ProcessingInput(
#             source=train_step.properties.ModelArtifacts.S3ModelArtifacts,
#             destination="/opt/ml/processing/model"
#         ),
#         ProcessingInput(
#             source=preprocessing_step.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri,
#             destination="/opt/ml/processing/test"
#         )
#     ],
#     outputs=[
#         ProcessingOutput(output_name="evaluation", source="/opt/ml/processing/evaluation")
#     ],
#     code="evaluate.py"
# )

# # Property file for evaluation metrics
# evaluation_report = PropertyFile(
#     name="EvaluationReport",
#     output_name="evaluation", 
#     path="evaluation.json"
# )
# eval_step.add_property_files(evaluation_report)

# # 4. Model Creation Step
# model = Model(
#     image_uri=sagemaker.image_uris.retrieve("xgboost", region=region, version="1.7-1"),
#     model_data=train_step.properties.ModelArtifacts.S3ModelArtifacts,
#     role=role,
#     entry_point="inference.py",
#     sagemaker_session=pipeline_session
# )

# model_step = ModelStep(
#     name="CreateRenewalModel",
#     step_args=model.create()
# ) # for real-time inference

# # 5. Batch Transform Step for Inference
# transformer = Transformer(
#     model_name=model_step.properties.ModelName,
#     instance_type="ml.m5.xlarge",
#     instance_count=1,
#     output_path=f"{output_data_uri}/batch-inference",
#     accept="text/csv",
#     assemble_with="Line",
#     max_payload=6,
#     sagemaker_session=pipeline_session
# )

# transform_step = TransformStep(
#     name="BatchInferenceRenewal",
#     transformer=transformer,
#     inputs=sagemaker.inputs.TransformInput(
#         data=preprocessing_step.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri,
#         content_type="text/csv",
#         split_type="Line"
#     )
# )

# # 6. Model Registry Step with Condition
# model_metrics = ModelMetrics(
#     model_statistics=MetricsSource(
#         s3_uri=f"{eval_step.arguments['ProcessingOutputConfig']['Outputs'][0]['S3Output']['S3Uri']}/evaluation.json",
#         content_type="application/json"
#     )
# )

# register_model_step = sagemaker.workflow.steps.RegisterModel(
#     name="RegisterRenewalModel",
#     estimator=xgb_estimator,
#     model_data=train_step.properties.ModelArtifacts.S3ModelArtifacts,
#     content_types=["text/csv", "application/json"],
#     response_types=["text/csv", "application/json"],
#     inference_instances=["ml.m5.xlarge"],
#     transform_instances=["ml.m5.xlarge"],
#     model_package_group_name="RenewalForecastModels",
#     approval_status=model_approval_status,
#     model_metrics=model_metrics
# )

# # Only register model if F1 score >= 0.7
# condition_step = ConditionStep(
#     name="CheckModelPerformance",
#     conditions=[
#         ConditionGreaterThanOrEqualTo(
#             left=JsonGet(
#                 step_name=eval_step.name,
#                 property_file=evaluation_report,
#                 json_path="binary_classification_metrics.f1"
#             ),
#             right=0.70
#         )
#     ],
#     if_steps=[model_step, transform_step, register_model_step],
#     else_steps=[]
# )

In [None]:
# # Create Pipeline
# pipeline = Pipeline(
#     name="RenewalForecastPipeline",
#     parameters=[model_approval_status],
#     steps=[
#         preprocessing_step,
#         train_step, 
#         eval_step,
#         condition_step
#     ],
#     sagemaker_session=pipeline_session
# )

# # Execute Pipeline
# pipeline.upsert(role_arn=role)
# execution = pipeline.start()

# print(f"Pipeline execution started: {execution.describe()['PipelineExecutionArn']}")