
# üéì Enterprise Credit Risk ML System  
## MS-Level Final Project ‚Äì Production-Grade SageMaker Implementation

This notebook implements a full end-to-end MLOps system:

- Data validation & preprocessing
- Feature Store (online + offline)
- XGBoost training with evaluation
- SageMaker Pipeline (CI/CD DAG)
- Model Registry governance
- Real-time & Batch inference
- Data Drift monitoring (PSI thresholds)
- Model Quality Monitoring
- Model Bias Monitoring (Clarify)
- SHAP Explainability
- CloudWatch alarms
- Cleanup & cost control

Author: Karan Verma  
Course: AAI-540  


## 1Ô∏è‚É£ Configuration & Environment Setup

In [1]:

import boto3
import sagemaker
import pandas as pd
import numpy as np
import json
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, roc_auc_score
from sagemaker import get_execution_role
from sagemaker.session import Session
from sagemaker.inputs import TrainingInput
from sagemaker.xgboost.estimator import XGBoost
from sagemaker.feature_store.feature_group import FeatureGroup
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.steps import TrainingStep
from sagemaker.workflow.model_step import RegisterModel
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.model_monitor import DefaultModelMonitor, DatasetFormat
from sagemaker.model_monitor import ModelBiasMonitor
from sagemaker.clarify import (
    BiasConfig,
    DataConfig,
    ModelConfig,
    ModelPredictedLabelConfig,
    SHAPConfig,
    SageMakerClarifyProcessor
)

session = Session()
pipeline_session = PipelineSession()
role = get_execution_role()
region = session.boto_region_name
bucket = session.default_bucket()

print("Region:", region)
print("Bucket:", bucket)


ImportError: cannot import name 'get_execution_role' from 'sagemaker' (/Users/rajni/opt/anaconda3/lib/python3.9/site-packages/sagemaker/__init__.py)

Create Bucket and upload file to s3

In [None]:
import sagemaker
import boto3

# SageMaker session
sagemaker_session = sagemaker.Session()

# Use default SageMaker bucket (recommended)
bucket = sagemaker_session.default_bucket()

prefix = "credit-risk/raw"

print("Using bucket:", bucket)


In [None]:
local_file_path = "credit.csv"

s3_path = sagemaker_session.upload_data(
    path=local_file_path,
    bucket=bucket,
    key_prefix=prefix
)

print("File uploaded to:", s3_path)


## 2Ô∏è‚É£ Data Validation & Feature Engineering

In [None]:

# Load dataset (must already exist in S3)
data_path = f"s3://{bucket}/credit-risk/raw/credit.csv"
df = pd.read_csv(data_path)

# Basic validation
assert 'credit_score_label' in df.columns, "Target column missing"

# Handle missing values
df.fillna(df.median(numeric_only=True), inplace=True)

# Feature engineering
df['income_to_debt_ratio'] = df['income'] / (df['debt'] + 1)
df['delinquency_flag'] = (df['late_payments'] > 0).astype(int)

# Train-test split
train, test = train_test_split(df, test_size=0.2, stratify=df['credit_score_label'], random_state=42)

train.to_csv("train.csv", index=False)
test.to_csv("test.csv", index=False)

session.upload_data("train.csv", bucket=bucket, key_prefix="credit-risk/train")
session.upload_data("test.csv", bucket=bucket, key_prefix="credit-risk/test")


‚úÖ 1. SageMaker Feature Store Implementation

In [None]:
import boto3
import sagemaker
from sagemaker.feature_store.feature_group import FeatureGroup
from sagemaker.session import Session
from sagemaker import get_execution_role
import pandas as pd
import time

role = get_execution_role()
region = boto3.Session().region_name
sagemaker_session = Session()

feature_store_session = sagemaker.Session()
feature_group_name = "credit-risk-feature-group"

feature_group = FeatureGroup(
    name=feature_group_name,
    sagemaker_session=feature_store_session
)

df['event_time'] = pd.Timestamp.now().astype('datetime64[ns]')
df['record_id'] = df.index.astype(str)

feature_group.load_feature_definitions(data_frame=df)

feature_group.create(
    s3_uri=f"s3://credit-risk-processed-data/feature-store",
    record_identifier_name="record_id",
    event_time_feature_name="event_time",
    role_arn=role,
    enable_online_store=True
)

feature_group.ingest(data_frame=df, max_workers=3, wait=True)


Features


## 3Ô∏è‚É£ Model Training (XGBoost)

In [None]:

xgb = XGBoost(
    role=role,
    instance_count=1,
    instance_type="ml.m5.large",
    framework_version="1.5-1",
    hyperparameters={
        "objective": "multi:softprob",
        "num_class": 4,
        "max_depth": 6,
        "eta": 0.1,
        "subsample": 0.8,
        "num_round": 300
    }
)

xgb.fit({
    "train": TrainingInput(f"s3://{bucket}/credit-risk/train/train.csv", content_type="text/csv"),
    "validation": TrainingInput(f"s3://{bucket}/credit-risk/test/test.csv", content_type="text/csv")
})


## 4Ô∏è‚É£ Model Registry

In [None]:

model_package = xgb.register(
    content_types=["text/csv"],
    response_types=["application/json"],
    model_package_group_name="CreditRiskModelGroup",
    approval_status="PendingManualApproval"
)

print("Model registered in Model Registry")

# from sagemaker.model_registry.model_package import ModelPackage

# model_package = training_step.get_expected_model()

# model_package.register(
#     content_types=["text/csv"],
#     response_types=["application/json"],
#     inference_instances=["ml.t3.medium"],
#     model_package_group_name="CreditRiskModelGroup",
#     approval_status="PendingManualApproval"
# )



## 5Ô∏è‚É£ Real-Time Endpoint Deployment

In [None]:

predictor = xgb.deploy(
    initial_instance_count=1,
    instance_type="ml.t3.medium"
)

endpoint_name = predictor.endpoint_name
print("Endpoint deployed:", endpoint_name)


## 6Ô∏è‚É£ Model Quality Monitoring

In [None]:

quality_monitor = DefaultModelMonitor(
    role=role,
    instance_count=1,
    instance_type="ml.m5.large",
    max_runtime_in_seconds=3600
)

quality_monitor.suggest_baseline(
    baseline_dataset=f"s3://{bucket}/credit-risk/test/test.csv",
    dataset_format=DatasetFormat.csv(header=True),
    output_s3_uri=f"s3://{bucket}/monitoring/quality-baseline"
)

# from sagemaker.model_monitor import ModelQualityMonitor

# model_quality_monitor = ModelQualityMonitor(
#     role=role,
#     instance_count=1,
#     instance_type="ml.m5.large",
#     sagemaker_session=sagemaker_session
# )

# model_quality_monitor.create_monitoring_schedule(
#     monitor_schedule_name="credit-risk-model-quality-monitor",
#     endpoint_input=endpoint_name,
#     output_s3_uri="s3://credit-risk-monitoring/model-quality",
#     problem_type="BinaryClassification"
# )



## 7Ô∏è‚É£ Data Drift Monitoring (PSI Thresholds)

In [None]:

drift_monitor = DefaultModelMonitor(
    role=role,
    instance_count=1,
    instance_type="ml.m5.large"
)

drift_monitor.create_monitoring_schedule(
    monitor_schedule_name="credit-risk-drift-monitor",
    endpoint_input=sagemaker.model_monitor.EndpointInput(
        endpoint_name=endpoint_name,
        destination="/opt/ml/processing/input"
    ),
    output_s3_uri=f"s3://{bucket}/monitoring/drift-reports",
    statistics=quality_monitor.baseline_statistics(),
    constraints=quality_monitor.suggested_constraints()
)

# from sagemaker.model_monitor import DefaultModelMonitor

# data_monitor = DefaultModelMonitor(
#     role=role,
#     instance_count=1,
#     instance_type="ml.m5.large",
#     sagemaker_session=sagemaker_session
# )

# data_monitor.create_monitoring_schedule(
#     monitor_schedule_name="credit-risk-data-drift-monitor",
#     endpoint_input=endpoint_name,
#     output_s3_uri="s3://credit-risk-monitoring/data-drift"
# )



## 8Ô∏è‚É£ Model Bias Monitoring (Clarify)

In [None]:

bias_config = BiasConfig(
    label_values_or_threshold=[1],
    facet_name="gender"
)

data_config = DataConfig(
    s3_data_input_path=f"s3://{bucket}/credit-risk/test/test.csv",
    s3_output_path=f"s3://{bucket}/monitoring/bias",
    label="credit_score_label",
    dataset_type="text/csv"
)

model_config = ModelConfig(
    model_name=endpoint_name,
    instance_type="ml.m5.large",
    instance_count=1,
    content_type="text/csv",
    accept_type="application/json"
)

bias_monitor = ModelBiasMonitor(
    role=role,
    instance_count=1,
    instance_type="ml.m5.large"
)

bias_monitor.suggest_baseline(
    bias_config=bias_config,
    data_config=data_config,
    model_config=model_config
)


# from sagemaker import clarify

# clarify_processor = clarify.SageMakerClarifyProcessor(
#     role=role,
#     instance_count=1,
#     instance_type="ml.m5.large",
#     sagemaker_session=sagemaker_session
# )

# bias_config = clarify.BiasConfig(
#     label_values_or_threshold=[1],
#     facet_name="Gender",
#     facet_values_or_threshold=["Female"]
# )

# clarify_processor.run_bias(
#     data_config=data_config,
#     bias_config=bias_config,
#     model_config=model_config
# )


## 9Ô∏è‚É£ SHAP Explainability (Clarify Processor)

In [None]:

clarify_processor = SageMakerClarifyProcessor(
    role=role,
    instance_count=1,
    instance_type="ml.m5.large"
)

shap_config = SHAPConfig(
    baseline=[0]* (len(train.columns)-1),
    num_samples=100
)

clarify_processor.run_explainability(
    data_config=data_config,
    model_config=model_config,
    explainability_config=shap_config
)

# import shap

# explainer = shap.TreeExplainer(model)
# shap_values = explainer.shap_values(X_test)

# shap.summary_plot(shap_values, X_test)



## üîü CloudWatch Alarms

In [None]:

cloudwatch = boto3.client("cloudwatch")

cloudwatch.put_metric_alarm(
    AlarmName="HighEndpointLatency",
    MetricName="ModelLatency",
    Namespace="AWS/SageMaker",
    Statistic="Average",
    Period=60,
    EvaluationPeriods=2,
    Threshold=1000.0,
    ComparisonOperator="GreaterThanThreshold",
    Dimensions=[{"Name": "EndpointName", "Value": endpoint_name}]
)
print("Latency alarm configured")

# import boto3

# cloudwatch = boto3.client('cloudwatch')

# cloudwatch.put_metric_alarm(
#     AlarmName='CreditRisk-HighErrorRate',
#     MetricName='5XXError',
#     Namespace='AWS/SageMaker',
#     Statistic='Sum',
#     Threshold=5.0,
#     ComparisonOperator='GreaterThanThreshold',
#     EvaluationPeriods=1,
#     Period=300
# )



Evaluation Metrics Section

In [None]:
from sklearn.metrics import classification_report, roc_auc_score, confusion_matrix

print(classification_report(y_test, y_pred))
print("AUC:", roc_auc_score(y_test, y_pred_proba))

cm = confusion_matrix(y_test, y_pred)
print(cm)


## üßπ Cleanup (Run After Demo to Avoid Charges)

In [None]:

# predictor.delete_endpoint()
# predictor.delete_model()
print("Cleanup section ready.")


‚úÖ 2. SageMaker Pipeline DAG

In [None]:
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.steps import TrainingStep, ProcessingStep
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.workflow.parameters import ParameterString

pipeline_session = PipelineSession()

model_approval_status = ParameterString(
    name="ModelApprovalStatus",
    default_value="PendingManualApproval"
)

pipeline = Pipeline(
    name="CreditRiskPipeline",
    parameters=[model_approval_status],
    steps=[processing_step, training_step],
    sagemaker_session=pipeline_session
)

pipeline.upsert(role_arn=role)
execution = pipeline.start()


Demonstration Script Section

In [None]:
# Simulate inference

sample = X_test.iloc[0:1]
prediction = predictor.predict(sample.values.tolist())

print("Predicted Class:", prediction)


# üèó Enterprise Architecture Diagram

Data Sources
    ‚Üì
S3 Raw Layer
    ‚Üì
Feature Store
    ‚Üì
SageMaker Processing
    ‚Üì
Training Job
    ‚Üì
Model Registry
    ‚Üì
Endpoint Deployment
    ‚Üì
Model Monitor + Clarify
    ‚Üì
CloudWatch Dashboard


# ‚ö† Risks

- Historical bias in training data
- Data drift due to economic changes
- Regulatory compliance expansion required

# üöÄ Future Improvements

- Online retraining pipeline
- Bayesian hyperparameter tuning
- Fairness-constrained optimization
- Multi-region deployment
- Explainability API endpoint