# Customer Retention ML Pipeline with SageMaker Feature Store

This notebook implements a complete machine learning pipeline for customer retention prediction using Amazon SageMaker Feature Store and XGBoost. The pipeline includes:

1. Data retrieval from Feature Store
2. Data preprocessing and feature engineering
3. Model training with XGBoost
4. Model evaluation and deployment
5. Model monitoring setup

## Requirements
- AWS SageMaker access
- Required Python packages (boto3, sagemaker, pandas, numpy, scikit-learn, matplotlib, seaborn, and s3fs, which pandas needs to read and write `s3://` paths)
- Configured AWS credentials

In [None]:
# Import required libraries
import boto3
import pandas as pd
import numpy as np
from sagemaker.feature_store.feature_group import FeatureGroup
from sagemaker import get_execution_role
from sagemaker.xgboost.estimator import XGBoost
from sagemaker.xgboost.model import XGBoostModel
from sagemaker.tuner import HyperparameterTuner, IntegerParameter, ContinuousParameter
from sagemaker.inputs import TrainingInput
from sagemaker.model_monitor import DefaultModelMonitor
from sagemaker.model_monitor.dataset_format import DatasetFormat
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix, roc_auc_score, roc_curve
import matplotlib.pyplot as plt
import seaborn as sns
import sagemaker
import io
import json

## SageMaker Setup

Initialize the SageMaker session and get the execution role. This section sets up the basic AWS resources needed for the pipeline.

In [None]:
# Initialize SageMaker session and role
sagemaker_session = sagemaker.Session()
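try:
    # Works inside SageMaker Studio and SageMaker notebook instances
    role = get_execution_role()
except ValueError:
    # Outside SageMaker, fall back to an explicit execution role ARN (replace with your own)
    role = 'arn:aws:iam::269031123124:role/service-role/AmazonSageMaker-ExecutionRole-20250629T210810'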
region = sagemaker_session.boto_region_name
s3_bucket = sagemaker_session.default_bucket()

print(f"Role: {role}")
print(f"Region: {region}")
print(f"S3 Bucket: {s3_bucket}")

## Data Retrieval from Feature Store

Connect to the Feature Store and retrieve the customer data. The Feature Store provides a centralized repository for feature management and ensures consistent feature access across training and inference.
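The offline store is append-only: each ingestion of a record adds a new row, and a delete is recorded as a row with `is_deleted` set to true, so a plain `SELECT *` can return several rows per customer. The stdlib sketch below keeps only the newest live version of each record and strips the metadata columns. It assumes `custid` is the record identifier (the preprocessing step treats it as an ID column) and uses a hypothetical `event_time` column for the feature group's event-time feature; the rows are made-up examples.

```python
from typing import Dict, List

# Offline-store metadata columns, as removed in the preprocessing step
METADATA_COLUMNS = {"write_time", "api_invocation_time", "is_deleted"}

def latest_live_records(rows: List[dict], record_id: str = "custid",
                        event_time: str = "event_time") -> List[dict]:
    """Keep the newest version of each record (by event time, then write time),
    drop records whose newest version is a delete, and strip metadata columns."""
    latest: Dict[object, dict] = {}
    for row in rows:
        key = row[record_id]
        current = latest.get(key)
        newer = current is None or (
            (row[event_time], row["write_time"]) > (current[event_time], current["write_time"])
        )
        if newer:
            latest[key] = row
    return [
        {k: v for k, v in row.items() if k not in METADATA_COLUMNS}
        for row in latest.values()
        if not row["is_deleted"]
    ]

# Hypothetical offline-store rows: customer 1 was updated, customer 2 was later deleted
rows = [
    {"custid": 1, "event_time": 1.0, "write_time": "2024-01-01", "is_deleted": False, "tenure": 3},
    {"custid": 1, "event_time": 2.0, "write_time": "2024-01-02", "is_deleted": False, "tenure": 4},
    {"custid": 2, "event_time": 1.0, "write_time": "2024-01-01", "is_deleted": False, "tenure": 7},
    {"custid": 2, "event_time": 3.0, "write_time": "2024-01-03", "is_deleted": True, "tenure": 7},
]
print(latest_live_records(rows))
```

In a real pipeline the same deduplication is usually pushed into the Athena query itself so that only one row per customer leaves the offline store.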

In [None]:
# Connect to Feature Group
feature_group_name = 'rcp'
feature_group = FeatureGroup(name=feature_group_name, sagemaker_session=sagemaker_session)

# Create an Athena query against the feature group's offline store.
# The offline-store table name differs from the feature group name, so read it from the query object.
athena_query = feature_group.athena_query()
table_name = athena_query.table_name

query_string = f'SELECT * FROM "{table_name}"'

# Execute the query; results are written to S3
athena_query.run(
    query_string=query_string,
    output_location=f's3://{s3_bucket}/query_results/'
)

# Wait for the query to complete and load the results
athena_query.wait()
df = athena_query.as_dataframe()

print(f"Dataset shape: {df.shape}")
print(f"Columns: {df.columns.tolist()}")

## Data Preprocessing

Clean and prepare the data for training. This includes:
1. Removing metadata columns
2. Checking for missing values
3. Preparing features and target variables
4. Splitting data into train, validation, and test sets
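The split below happens in two stratified stages: 20% of the rows are held out as the test set, then 20% of the remainder becomes the validation set, which yields roughly a 64/16/20 train/validation/test split with the class ratio preserved in each part. The stdlib sketch illustrates those proportions on a synthetic label list (700 retained, 300 churned, an assumption for illustration). Its per-class rounding is a simplified allocation, not scikit-learn's exact algorithm in `train_test_split(stratify=...)`.

```python
import random
from collections import defaultdict
from typing import List, Sequence, Tuple

def stratified_holdout(labels: Sequence[int], fraction: float,
                       seed: int = 42) -> Tuple[List[int], List[int]]:
    """Return (kept, held_out) row indices, holding out round(fraction * n_c) rows of each class c."""
    rng = random.Random(seed)
    by_class = defaultdict(list)
    for i, y in enumerate(labels):
        by_class[y].append(i)
    kept, held_out = [], []
    for indices in by_class.values():
        rng.shuffle(indices)
        n_out = round(len(indices) * fraction)
        held_out.extend(indices[:n_out])
        kept.extend(indices[n_out:])
    return sorted(kept), sorted(held_out)

# Synthetic labels: 700 retained (1) and 300 churned (0)
labels = [1] * 700 + [0] * 300

# Stage 1: hold out 20% of all rows as the test set
rest, test = stratified_holdout(labels, 0.2)
# Stage 2: hold out 20% of the remaining rows as the validation set
rest_labels = [labels[i] for i in rest]
train_pos, val_pos = stratified_holdout(rest_labels, 0.2)
train = [rest[i] for i in train_pos]
val = [rest[i] for i in val_pos]

for name, idx in [("train", train), ("validation", val), ("test", test)]:
    share_pos = sum(labels[i] for i in idx) / len(idx)
    print(f"{name}: {len(idx)} rows, {share_pos:.0%} positive")
```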

In [None]:
# Remove metadata columns
feature_columns = [col for col in df.columns if col not in ['write_time', 'api_invocation_time', 'is_deleted']]
df_features = df[feature_columns].copy()

# Inspect missing values per column (reported only; no imputation is applied here)
print("Missing values per column:")
print(df_features.isnull().sum())

# Prepare features and target
feature_cols = [col for col in df_features.columns if col not in ['retained', 'custid']]
X = df_features[feature_cols]
y = df_features['retained']

print(f"Feature columns: {feature_cols}")
print(f"Target distribution:\n{y.value_counts()}")
print(f"Class distribution percentage:\n{y.value_counts(normalize=True) * 100}")

# Split data
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42, stratify=y
)

X_train, X_val, y_train, y_val = train_test_split(
    X_train, y_train, test_size=0.2, random_state=42, stratify=y_train
)

print(f"Training set shape: {X_train.shape}")
print(f"Validation set shape: {X_val.shape}")
print(f"Test set shape: {X_test.shape}")

## Model Training

Configure and train the XGBoost model using SageMaker. This section includes:
1. Preparing data in SageMaker format
2. Setting up the XGBoost estimator
3. Configuring hyperparameters
4. Training the model
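SageMaker's built-in XGBoost algorithm reads `text/csv` training data with the label in the first column and no header row, which is why the target is concatenated in front of the features below. The `scale_pos_weight` hyperparameter is then set to the ratio of negative to positive training labels. A stdlib sketch of both steps follows; the feature names `tenure` and `monthly_spend` and the three rows are hypothetical.

```python
import csv
import io
from typing import List

def to_builtin_xgboost_csv(rows: List[dict], label: str, features: List[str]) -> str:
    """Serialize rows as label-first, headerless CSV."""
    buf = io.StringIO()
    writer = csv.writer(buf, lineterminator="\n")
    for row in rows:
        writer.writerow([row[label]] + [row[f] for f in features])
    return buf.getvalue()

def scale_pos_weight(labels: List[int]) -> float:
    """Ratio of negative (0) to positive (1) labels, matching the notebook's formula."""
    negatives = sum(1 for y in labels if y == 0)
    positives = sum(1 for y in labels if y == 1)
    return negatives / positives

rows = [
    {"retained": 1, "tenure": 12, "monthly_spend": 40.5},
    {"retained": 0, "tenure": 2, "monthly_spend": 99.0},
    {"retained": 1, "tenure": 30, "monthly_spend": 25.0},
]
print(to_builtin_xgboost_csv(rows, "retained", ["tenure", "monthly_spend"]), end="")
print(scale_pos_weight([r["retained"] for r in rows]))
```

Because `retained` is the positive class here, a mostly-retained customer base gives a ratio below 1, which down-weights the majority class during training.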

In [None]:
# Prepare data for XGBoost (target first column)
train_data = pd.concat([y_train, X_train], axis=1)
val_data = pd.concat([y_val, X_val], axis=1)
test_data = pd.concat([y_test, X_test], axis=1)

# Save to S3 as headerless CSV (pandas needs the s3fs package to write s3:// paths)
train_path = f's3://{s3_bucket}/customer-retention/train/train.csv'
val_path = f's3://{s3_bucket}/customer-retention/validation/validation.csv'
test_path = f's3://{s3_bucket}/customer-retention/test/test.csv'

train_data.to_csv(train_path, index=False, header=False)
val_data.to_csv(val_path, index=False, header=False)
test_data.to_csv(test_path, index=False, header=False)

print(f"Training data saved to: {train_path}")
print(f"Validation data saved to: {val_path}")
print(f"Test data saved to: {test_path}")

# Create XGBoost estimator using built-in algorithm
from sagemaker.image_uris import retrieve

# Get the XGBoost container image URI
container = retrieve('xgboost', region, version='1.5-1')

xgb_estimator = sagemaker.estimator.Estimator(
    image_uri=container,
    role=role,
    instance_count=1,
    instance_type='ml.m5.large',
    output_path=f's3://{s3_bucket}/customer-retention/output',
    base_job_name='customer-retention-xgb'
)

# Set hyperparameters
hyperparameters = {
    'objective': 'binary:logistic',
    'eval_metric': 'auc,error',
    'num_round': 100,
    'max_depth': 6,
    'eta': 0.3,
    'subsample': 0.8,
    'colsample_bytree': 0.8,
    'scale_pos_weight': len(y_train[y_train == 0]) / len(y_train[y_train == 1]),
    'early_stopping_rounds': 10,
    'seed': 42
}

xgb_estimator.set_hyperparameters(**hyperparameters)

# Create TrainingInput objects
train_input = sagemaker.inputs.TrainingInput(train_path, content_type='text/csv')
val_input = sagemaker.inputs.TrainingInput(val_path, content_type='text/csv')

# Train the model
print("\n=== Training XGBoost Model ===")
xgb_estimator.fit({
    'train': train_input,
    'validation': val_input
}, wait=True)

print("Training completed!")

## Model Deployment and Evaluation

Evaluate the trained model on the held-out test set, then register and deploy it. This section includes:
1. Calculating performance metrics on the test set
2. Registering the model in the SageMaker Model Registry
3. Deploying the registered model to a real-time endpoint
4. Testing the endpoint
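The metrics reported for the test set can be reproduced from predicted probabilities with a fixed 0.5 threshold. The stdlib sketch below computes accuracy, ROC AUC (as the fraction of positive/negative pairs ranked correctly, ties counting half), and a confusion matrix laid out like scikit-learn's; the labels, scores, and the 0.5 threshold are illustrative assumptions.

```python
from typing import List, Sequence

def accuracy(y_true: Sequence[int], y_pred: Sequence[int]) -> float:
    return sum(t == p for t, p in zip(y_true, y_pred)) / len(y_true)

def roc_auc(y_true: Sequence[int], scores: Sequence[float]) -> float:
    """Probability that a random positive scores higher than a random negative."""
    pos = [s for t, s in zip(y_true, scores) if t == 1]
    neg = [s for t, s in zip(y_true, scores) if t == 0]
    wins = sum(1.0 if p > n else 0.5 if p == n else 0.0 for p in pos for n in neg)
    return wins / (len(pos) * len(neg))

def confusion(y_true: Sequence[int], y_pred: Sequence[int]) -> List[List[int]]:
    """Rows are actual classes [0, 1]; columns are predicted classes [0, 1]."""
    matrix = [[0, 0], [0, 0]]
    for t, p in zip(y_true, y_pred):
        matrix[t][p] += 1
    return matrix

y_true = [1, 0, 1, 1, 0, 0]
scores = [0.9, 0.3, 0.6, 0.4, 0.5, 0.1]
y_pred = [int(s >= 0.5) for s in scores]

print(f"accuracy={accuracy(y_true, y_pred):.4f}")
print(f"auc={roc_auc(y_true, scores):.4f}")
print(confusion(y_true, y_pred))
```

AUC depends only on the ranking of scores, so it is unaffected by the threshold, while accuracy and the confusion matrix change if the threshold moves.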

### Model Deployment and Testing

The first cell below registers the trained model, together with its test-set metrics, in the SageMaker Model Registry and deploys the registered model to a real-time endpoint. The second cell tests the endpoint with single and batch predictions, a simple timing check, invalid input, edge cases, and a confidence breakdown.
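With `CSVDeserializer`, `predictor.predict` returns a list of rows of strings rather than floats, so scores must be converted before they are formatted or compared with thresholds. The scores may arrive one per line or all on a single comma-separated line; the stdlib sketch below flattens either shape and applies the same 0.2/0.8 confidence buckets used in the tests. It works on already-deserialized example responses, so no endpoint is needed to run it.

```python
from typing import List, Sequence, Tuple

def to_scores(response: Sequence[Sequence[str]]) -> List[float]:
    """Flatten a CSVDeserializer-style response (list of rows of strings) into floats."""
    return [float(value) for row in response for value in row if value.strip()]

def bucket_confidence(scores: Sequence[float], low: float = 0.2,
                      high: float = 0.8) -> Tuple[List[float], List[float]]:
    """Split scores into high-confidence (< low or > high) and medium-confidence (the rest)."""
    high_conf = [s for s in scores if s < low or s > high]
    medium = [s for s in scores if low <= s <= high]
    return high_conf, medium

# Two possible shapes of the same three predictions (example values)
one_per_line = [["0.91"], ["0.12"], ["0.55"]]
one_line = [["0.91", "0.12", "0.55"]]

print(to_scores(one_per_line))
print(bucket_confidence(to_scores(one_line)))
```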

In [None]:
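import json
import time

import boto3
from sagemaker.model import ModelPackage
from sagemaker.model_metrics import MetricsSource, ModelMetrics

s3_client = boto3.client('s3')

# STEP 0: Score the held-out test set with Batch Transform so that the
# metrics used for registration (accuracy, auc_roc, pred_binary) exist
print("\n=== Evaluating Model on the Test Set ===")
test_features_key = 'customer-retention/test-features/test_features.csv'
X_test.to_csv(f's3://{s3_bucket}/{test_features_key}', index=False, header=False)

transformer = xgb_estimator.transformer(
    instance_count=1,
    instance_type='ml.m5.large',
    output_path=f's3://{s3_bucket}/customer-retention/batch-output',
    assemble_with='Line'
)
transformer.transform(
    f's3://{s3_bucket}/{test_features_key}',
    content_type='text/csv',
    split_type='Line',
    wait=True
)

# Batch Transform writes "<input file name>.out" under output_path
raw_output = s3_client.get_object(
    Bucket=s3_bucket,
    Key='customer-retention/batch-output/test_features.csv.out'
)['Body'].read().decode('utf-8')

# Accept scores separated by newlines or commas
pred_proba = np.array([float(v) for v in raw_output.replace('\n', ',').split(',') if v.strip()])
assert len(pred_proba) == len(y_test), "Prediction count does not match test set size"
pred_binary = (pred_proba >= 0.5).astype(int)

accuracy = accuracy_score(y_test, pred_binary)
auc_roc = roc_auc_score(y_test, pred_proba)
print(f"Test accuracy: {accuracy:.4f}")
print(f"Test AUC-ROC: {auc_roc:.4f}")

# STEP 1: Model Registration
print("\n=== Registering Model in SageMaker Model Registry ===")

metrics_data = {
    "binary_classification_metrics": {
        "accuracy": {"value": float(accuracy)},
        "auc_roc": {"value": float(auc_roc)},
        "confusion_matrix": confusion_matrix(y_test, pred_binary).tolist(),
        "classification_report": classification_report(y_test, pred_binary, output_dict=True)
    }
}

# Upload the metrics first so the S3 object referenced by ModelMetrics exists
s3_client.put_object(
    Bucket=s3_bucket,
    Key='customer-retention/model-metrics/statistics.json',
    Body=json.dumps(metrics_data, default=float),
    ContentType='application/json'
)

model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri=f's3://{s3_bucket}/customer-retention/model-metrics/statistics.json',
        content_type="application/json"
    )
)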

model_package_group_name = "customer-retention-models"
try:
    sagemaker_client = boto3.client('sagemaker')
    sagemaker_client.create_model_package_group(
        ModelPackageGroupName=model_package_group_name,
        ModelPackageGroupDescription="Customer retention prediction models"
    )
    print(f"Created model package group: {model_package_group_name}")
except Exception as e:
    if "already exists" in str(e):
        print(f"Model package group {model_package_group_name} already exists")
    else:
        print(f"Error creating model package group: {e}")

model_package = xgb_estimator.register(
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=["ml.t2.medium", "ml.m5.large"],
    transform_instances=["ml.m5.large"],
    model_package_group_name=model_package_group_name,
    approval_status="Approved",
    model_metrics=model_metrics,
    description="Customer retention prediction model using XGBoost"
)

print(f"Model registered successfully!")
print(f"Model Package ARN: {model_package.model_package_arn}")

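# STEP 2: Deploy the registered model from the Model Registry
print("\n=== Deploying Model from Registry ===")

from sagemaker.model import ModelPackage
from sagemaker.model_monitor import DataCaptureConfig
from sagemaker.predictor import Predictor
from sagemaker.serializers import CSVSerializer
from sagemaker.deserializers import CSVDeserializer

timestamp = int(time.time())
model_name = f"customer-retention-model-{timestamp}"
endpoint_name = f"customer-retention-endpoint-{timestamp}"

# predictor_cls is required: without it, deploy() returns None instead of a Predictor
model = ModelPackage(
    role=role,
    model_package_arn=model_package.model_package_arn,
    sagemaker_session=sagemaker_session,
    name=model_name,
    predictor_cls=Predictor
)

# Capture endpoint traffic so the Model Monitor schedule has data to analyze
data_capture_config = DataCaptureConfig(
    enable_capture=True,
    sampling_percentage=100,
    destination_s3_uri=f's3://{s3_bucket}/customer-retention/data-capture'
)

predictor = model.deploy(
    initial_instance_count=1,
    instance_type='ml.t2.medium',
    endpoint_name=endpoint_name,
    serializer=CSVSerializer(),
    deserializer=CSVDeserializer(),
    data_capture_config=data_capture_config
)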

print(f"Model deployed successfully!")
print(f"Endpoint name: {endpoint_name}")
print(f"Endpoint ARN: arn:aws:sagemaker:{region}:{sagemaker_session.account_id()}:endpoint/{endpoint_name}")

In [None]:
# === Model Testing ===
# STEP 3: Comprehensive API Endpoint Testing
print("\n=== Testing API Endpoint ===")

# Test 1: Single prediction
print("\n1. Single Prediction Test:")
single_sample = X_test.iloc[[0]]
single_prediction = predictor.predict(single_sample.values)
# CSVDeserializer returns rows of strings; convert the score to float
single_score = float(single_prediction[0][0])
print(f"Input features: {single_sample.values[0]}")
print(f"Predicted retention probability: {single_score:.4f}")
print(f"Actual: {y_test.iloc[0]}")

# Test 2: Batch prediction
print("\n2. Batch Prediction Test:")
batch_size = 10
batch_samples = X_test.iloc[:batch_size]
batch_predictions = np.array(predictor.predict(batch_samples.values), dtype=float).flatten()
print(f"Batch size: {batch_size}")
print(f"Predictions: {np.round(batch_predictions, 4)}")
print(f"Actual values: {y_test.iloc[:batch_size].values}")

# Test 3: Performance test
print("\n3. Performance Test:")
import time
start_time = time.time()
performance_samples = X_test.iloc[:100]
performance_predictions = predictor.predict(performance_samples.values)
end_time = time.time()

elapsed = end_time - start_time
# One request carrying 100 rows: this measures batched throughput, not single-request latency
print(f"Processed {len(performance_samples)} rows in one request in {elapsed:.2f} seconds")
print(f"Average time per row: {elapsed / len(performance_samples) * 1000:.2f} ms")

# Test 4: Error handling
print("\n4. Error Handling Test:")
try:
    invalid_input = [[999, 999, 999]]
    error_prediction = predictor.predict(invalid_input)
    print(f"Handled invalid input: {error_prediction}")
except Exception as e:
    print(f"Error handling test passed: {str(e)}")

# Test 5: Endpoint health check
print("\n5. Endpoint Health Check:")
try:
    health_check = predictor.predict(X_test.iloc[[0]].values)
    print("✅ Endpoint is healthy and responding")
except Exception as e:
    print(f"❌ Endpoint health check failed: {e}")

# STEP 4: Detailed API Testing with Different Data Types
print("\n=== Advanced API Testing ===")

# Test with edge cases
print("\n1. Edge Case Testing:")
edge_cases = [
    X_test.min().values.reshape(1, -1),
    X_test.max().values.reshape(1, -1),
    X_test.mean().values.reshape(1, -1)
]

for i, case in enumerate(edge_cases):
    try:
        prediction = predictor.predict(case)
        # Convert the first returned score from string to float before formatting
        print(f"Edge case {i+1}: {float(prediction[0][0]):.4f}")
    except Exception as e:
        print(f"Edge case {i+1} failed: {e}")

# Test prediction confidence
print("\n2. Prediction Confidence Analysis:")
confidence_samples = X_test.iloc[:20]
confidence_predictions = predictor.predict(confidence_samples.values)
# CSVDeserializer returns strings; cast to float so the threshold comparisons work
confidence_scores = np.array(confidence_predictions, dtype=float).flatten()

high_confidence = confidence_scores[(confidence_scores > 0.8) | (confidence_scores < 0.2)]
medium_confidence = confidence_scores[(confidence_scores >= 0.2) & (confidence_scores <= 0.8)]

print(f"High confidence predictions (>0.8 or <0.2): {len(high_confidence)}")
print(f"Medium confidence predictions (0.2-0.8): {len(medium_confidence)}")

# STEP 5: Model Registry Management
print("\n=== Model Registry Management ===")

try:
    model_packages = sagemaker_client.list_model_packages(
        ModelPackageGroupName=model_package_group_name
    )
    print(f"Total models in registry: {len(model_packages['ModelPackageSummaryList'])}")
    for pkg in model_packages['ModelPackageSummaryList']:
        print(f"- Model: {pkg['ModelPackageArn']}")
        print(f"  Status: {pkg['ModelPackageStatus']}")
        print(f"  Created: {pkg['CreationTime']}")
except Exception as e:
    print(f"Error listing model packages: {e}")

print("\n=== Endpoint Configuration ===")
endpoint_status = sagemaker_client.describe_endpoint(EndpointName=endpoint_name)['EndpointStatus']
print(f"Endpoint Name: {endpoint_name}")
print("Instance Type: ml.t2.medium")
print("Instance Count: 1")
print(f"Model Package: {model_package.model_package_arn}")
print(f"Status: {endpoint_status}")

print("\n=== Testing Complete ===")
print("✅ Model successfully registered in SageMaker Model Registry")
print("✅ Model deployed from registry to endpoint")
print("✅ API endpoint tested and validated")
print("✅ Ready for production use!")
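
The performance test above sends 100 rows in a single request and divides the elapsed time by the row count, which describes the throughput of one batched call rather than the latency a caller sees for an individual request. To estimate per-request latency, time many single-row calls and report percentiles. The sketch below does that with `time.perf_counter`; `fake_predict` is a hypothetical stand-in for `predictor.predict` so the example runs without an endpoint, and the p90 value uses a simple nearest-rank approximation.

```python
import statistics
import time
from typing import Callable, Dict, List, Sequence

def latency_percentiles(predict: Callable[[Sequence[float]], object],
                        samples: List[Sequence[float]]) -> Dict[str, float]:
    """Time one call per sample and return p50, p90 and max latency in milliseconds."""
    timings_ms = []
    for sample in samples:
        start = time.perf_counter()
        predict(sample)
        timings_ms.append((time.perf_counter() - start) * 1000)
    ordered = sorted(timings_ms)
    p90_index = min(len(ordered) - 1, round(0.9 * (len(ordered) - 1)))
    return {
        "p50_ms": statistics.median(ordered),
        "p90_ms": ordered[p90_index],
        "max_ms": ordered[-1],
    }

def fake_predict(sample: Sequence[float]):
    """Hypothetical stand-in for predictor.predict: sleeps about 1 ms and returns a fixed score."""
    time.sleep(0.001)
    return [["0.5"]]

stats = latency_percentiles(fake_predict, [[1.0, 2.0]] * 20)
print({name: round(value, 2) for name, value in stats.items()})
```

Against the deployed endpoint, the same helper could be called as `latency_percentiles(lambda row: predictor.predict([row]), X_test.values[:50].tolist())`, at the cost of one invocation per row.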


## Model Monitoring

Set up model monitoring to track the model's performance in production. This section includes:
1. Creating a baseline for monitoring
2. Setting up monitoring schedule
3. Configuring CloudWatch metrics
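Conceptually, the baseline job profiles the baseline dataset into statistics and suggested constraints, and each scheduled run compares captured endpoint traffic against them. Model Monitor's built-in checks include completeness, data type, and distribution drift; the stdlib sketch below is a deliberately simplified stand-in, not its actual algorithm. It records per-feature min, max, and completeness from baseline rows, then flags captured features whose completeness drops or whose values leave the baseline range (the range check substitutes for a real drift measure). Feature names and values are hypothetical.

```python
from typing import Dict, List, Optional

Row = Dict[str, Optional[float]]

def baseline_stats(rows: List[Row], features: List[str]) -> Dict[str, dict]:
    """Per-feature min, max and completeness (fraction of non-missing values)."""
    stats = {}
    for f in features:
        present = [r[f] for r in rows if r.get(f) is not None]
        stats[f] = {"min": min(present), "max": max(present),
                    "completeness": len(present) / len(rows)}
    return stats

def check_violations(captured: List[Row], stats: Dict[str, dict]) -> List[str]:
    """Flag completeness drops and values outside the baseline [min, max] range."""
    violations = []
    for f, s in stats.items():
        present = [r[f] for r in captured if r.get(f) is not None]
        completeness = len(present) / len(captured)
        if completeness < s["completeness"]:
            violations.append(f"{f}: completeness {completeness:.2f} < baseline {s['completeness']:.2f}")
        out_of_range = [v for v in present if v < s["min"] or v > s["max"]]
        if out_of_range:
            violations.append(f"{f}: {len(out_of_range)} value(s) outside [{s['min']}, {s['max']}]")
    return violations

baseline = [{"tenure": 1.0, "monthly_spend": 20.0},
            {"tenure": 24.0, "monthly_spend": 80.0},
            {"tenure": 12.0, "monthly_spend": 50.0}]
captured = [{"tenure": 60.0, "monthly_spend": 45.0},
            {"tenure": 6.0, "monthly_spend": None}]

stats = baseline_stats(baseline, ["tenure", "monthly_spend"])
for violation in check_violations(captured, stats):
    print(violation)
```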

In [None]:
# Set up Model Monitor
print("\n=== Setting up Model Monitor ===")

# Create baseline dataset
baseline_dataset = test_data.copy()
baseline_path = f's3://{s3_bucket}/customer-retention/baseline/baseline.csv'
baseline_dataset.to_csv(baseline_path, index=False)

# Create model monitor
model_monitor = DefaultModelMonitor(
    role=role,
    instance_count=1,
    instance_type='ml.m5.large',
    volume_size_in_gb=20,
    max_runtime_in_seconds=3600,
)

# Create baseline
baseline_job = model_monitor.suggest_baseline(
    baseline_dataset=baseline_path,
    dataset_format=DatasetFormat.csv(header=True),
    output_s3_uri=f's3://{s3_bucket}/customer-retention/baseline-results',
    wait=True
)

print("Baseline created successfully!")

# Create monitoring schedule
from sagemaker.model_monitor import CronExpressionGenerator

mon_schedule = model_monitor.create_monitoring_schedule(
    monitor_schedule_name=f'customer-retention-monitor',
    endpoint_input=predictor.endpoint_name,
    output_s3_uri=f's3://{s3_bucket}/customer-retention/monitoring-results',
    statistics=baseline_job.baseline_statistics(),
    constraints=baseline_job.suggested_constraints(),
    schedule_cron_expression=CronExpressionGenerator.daily(),
    enable_cloudwatch_metrics=True,
)

print("Monitoring schedule created!")

## Cleanup Instructions

To avoid unnecessary charges, clean up the resources when you are done, in this order:
1. Delete the monitoring schedule
2. Delete the endpoint and its model
3. Remove S3 objects you no longer need
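The cleanup can also be scripted with the low-level boto3 SageMaker client. The sketch below deletes the monitoring schedule, polls until no schedules remain attached to the endpoint (in case deletion is not immediate), then deletes the endpoint, its endpoint configuration, and the model behind each production variant. The client is passed in so the function can be exercised with the hypothetical `StubClient` shown here; the polling interval and timeout are arbitrary assumptions.

```python
import time

def cleanup_endpoint(sm_client, schedule_name: str, endpoint_name: str,
                     poll_seconds: float = 10.0, timeout_seconds: float = 600.0) -> None:
    """Delete a monitoring schedule, then the endpoint, its config, and its models.

    sm_client is expected to behave like boto3.client("sagemaker").
    """
    sm_client.delete_monitoring_schedule(MonitoringScheduleName=schedule_name)

    # Wait until no monitoring schedules remain attached to the endpoint
    deadline = time.monotonic() + timeout_seconds
    while sm_client.list_monitoring_schedules(EndpointName=endpoint_name)["MonitoringScheduleSummaries"]:
        if time.monotonic() > deadline:
            raise TimeoutError(f"Monitoring schedules still attached to {endpoint_name}")
        time.sleep(poll_seconds)

    # Look up the endpoint config and models before the endpoint is gone
    config_name = sm_client.describe_endpoint(EndpointName=endpoint_name)["EndpointConfigName"]
    variants = sm_client.describe_endpoint_config(EndpointConfigName=config_name)["ProductionVariants"]

    sm_client.delete_endpoint(EndpointName=endpoint_name)
    sm_client.delete_endpoint_config(EndpointConfigName=config_name)
    for variant in variants:
        sm_client.delete_model(ModelName=variant["ModelName"])

class StubClient:
    """Hypothetical stand-in that records delete calls instead of calling AWS."""
    def __init__(self):
        self.calls = []
    def delete_monitoring_schedule(self, **kwargs):
        self.calls.append(("delete_monitoring_schedule", kwargs))
    def list_monitoring_schedules(self, **kwargs):
        return {"MonitoringScheduleSummaries": []}
    def describe_endpoint(self, **kwargs):
        return {"EndpointConfigName": "cfg-1"}
    def describe_endpoint_config(self, **kwargs):
        return {"ProductionVariants": [{"ModelName": "model-1"}]}
    def delete_endpoint(self, **kwargs):
        self.calls.append(("delete_endpoint", kwargs))
    def delete_endpoint_config(self, **kwargs):
        self.calls.append(("delete_endpoint_config", kwargs))
    def delete_model(self, **kwargs):
        self.calls.append(("delete_model", kwargs))

stub = StubClient()
cleanup_endpoint(stub, "customer-retention-monitor", "customer-retention-endpoint-123")
print([name for name, _ in stub.calls])
```

With real resources, the call would be `cleanup_endpoint(boto3.client("sagemaker"), "customer-retention-monitor", endpoint_name)`.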

In [None]:
# Cleanup resources
print("To clean up resources, run the following:")

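# Delete the monitoring schedule before deleting the endpoint it monitors
print("\n1. Delete the monitoring schedule:")
print("model_monitor.delete_monitoring_schedule()")

# Delete the endpoint and the model behind it
print("\n2. Delete the endpoint and model:")
print("predictor.delete_endpoint()")
print("predictor.delete_model()")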

# Note about S3 cleanup
print("\n3. Remove S3 objects if no longer needed:")
print(f"aws s3 rm s3://{s3_bucket}/customer-retention/ --recursive")

print("\nIMPORTANT: Only run these commands when you're done with the model!")