In [1]:
# Install dependencies
!pip install mlflow boto3 joblib



In [2]:
import json
import os
import tempfile
from datetime import datetime, timezone
from pathlib import Path

import boto3
import joblib
import mlflow
import pandas as pd
from mlflow.tracking import MlflowClient

In [3]:
# Configuration
# S3
S3_BUCKET = "mlops-creditcard"
INPUT_PREFIX = "inference/input"
OUTPUT_PREFIX = "inference/output"
MODEL_PREFIX = "inference/models"

# Location of the sqlite MLflow store written by register_model.ipynb on SageMaker.
SAGEMAKER_MLFLOW_DB = Path("/home/ec2-user/SageMaker/ML-Ops-CreditCard-AWS/mlflow.db")

# MLflow tracking detection, in priority order:
#   1. the MLFLOW_TRACKING_URI environment variable (if non-empty)
#   2. the SageMaker sqlite DB used by register_model.ipynb
#   3. mlflow.db in the current working directory
# If none applies, MLFLOW_TRACKING_URI stays None and MLflow keeps its own default.
tracking_env = os.environ.get("MLFLOW_TRACKING_URI")
MLFLOW_TRACKING_URI = tracking_env or None
if MLFLOW_TRACKING_URI is None:
    for candidate_db in (SAGEMAKER_MLFLOW_DB, Path.cwd() / "mlflow.db"):
        if candidate_db.exists():
            MLFLOW_TRACKING_URI = f"sqlite:///{candidate_db}"
            break

# Model name must match registration step
MODEL_NAME = "creditcard-fraud-model"

# Apply tracking URI if found
if MLFLOW_TRACKING_URI:
    try:
        mlflow.set_tracking_uri(MLFLOW_TRACKING_URI)
        # Point the model registry at the same backend so that versions/tags
        # written by the registration step are visible here.
        mlflow.set_registry_uri(mlflow.get_tracking_uri())
        print("MLflow tracking URI set to:", mlflow.get_tracking_uri())
    except Exception as e:
        # Best-effort: continue with MLflow's default URI.
        print("⚠️ Failed to set MLflow tracking URI:", e)

# Shared S3 client used by all cells below (boto3 default credentials/region).
s3 = boto3.client("s3")


MLflow tracking URI set to: sqlite:////home/ec2-user/SageMaker/ML-Ops-CreditCard-AWS/mlflow.db


In [4]:
# Load Batch Inputs
def load_batch_input(filename="batch_input.csv"):
    """Read a batch-inference CSV from S3.

    The object read is ``s3://{S3_BUCKET}/{INPUT_PREFIX}/{filename}``.

    Args:
        filename: Object name under ``INPUT_PREFIX``. The default is the file
            this pipeline has always read, so existing callers are unaffected.

    Returns:
        pandas.DataFrame holding the CSV contents.
    """
    obj = s3.get_object(
        Bucket=S3_BUCKET,
        Key=f"{INPUT_PREFIX}/{filename}",
    )
    # read_csv consumes the streaming response body directly.
    df = pd.read_csv(obj["Body"])
    print(f"📥 Loaded batch input: {df.shape}")
    return df

In [5]:
# Get Champion Model (MLflow registry first; fallback to S3 pickle)

def get_champion_model():
    """Resolve and load the current champion model.

    Lookup order:
      1. MLflow model registry: the highest-numbered version of ``MODEL_NAME``
         tagged ``status=production`` and ``role=champion``, loaded with
         ``mlflow.sklearn.load_model``.
      2. S3 fallback: ``s3://{S3_BUCKET}/{MODEL_PREFIX}/champion_model.pkl``,
         downloaded to a temporary file and loaded with ``joblib.load``.

    Returns:
        ``(model, model_uri)``. ``model_uri`` is ``models:/<name>/<version>``
        for a registry hit, or the ``s3://`` URI of the pickle for the fallback.

    Raises:
        RuntimeError: if neither source yields a model.
    """
    client = MlflowClient()

    # Try to find champion in MLflow registry
    try:
        versions = client.search_model_versions(f"name='{MODEL_NAME}'")
    except Exception as e:
        print("⚠️ Could not query MLflow registry:", e)
        versions = []

    # search_model_versions does not promise an order; check newest first so
    # that if several versions carry the champion tags, the latest one wins.
    for v in sorted(versions, key=lambda ver: int(ver.version), reverse=True):
        try:
            mv = client.get_model_version(MODEL_NAME, v.version)
        except Exception as e:
            print(f"⚠️ Could not read tags for v{v.version}:", e)
            continue

        tags = mv.tags or {}
        if tags.get("status") != "production" or tags.get("role") != "champion":
            continue

        print(f"🏆 Champion model in registry: v{v.version}")
        model_uri = f"models:/{MODEL_NAME}/{v.version}"
        try:
            # Loading needs access to the artifact store behind the registry.
            model = mlflow.sklearn.load_model(model_uri)
            return model, model_uri
        except Exception as load_err:
            print("⚠️ Failed to load model from MLflow registry, will try S3 fallback:", load_err)
            break

    # S3 fallback: try to load a previously uploaded champion model pickle
    s3_key = f"{MODEL_PREFIX}/champion_model.pkl"
    tmp_path = None
    try:
        # Reserve a unique local path; the handle is closed right away so
        # download_file can write to it.
        with tempfile.NamedTemporaryFile(delete=False, suffix=".pkl") as tmp:
            tmp_path = tmp.name
        print(f"Trying to download champion model from s3://{S3_BUCKET}/{s3_key} → {tmp_path}")
        s3.download_file(S3_BUCKET, s3_key, tmp_path)
        # SECURITY: joblib.load unpickles (and can execute) arbitrary code;
        # only read from a bucket whose writers you trust.
        model = joblib.load(tmp_path)
        model_uri = f"s3://{S3_BUCKET}/{s3_key}"
        print(f"🏆 Champion model loaded from S3: {model_uri}")
        return model, model_uri
    except Exception as s3err:
        print("⚠️ S3 fallback failed:", s3err)
    finally:
        # delete=False above means nothing else removes this file.
        if tmp_path is not None:
            try:
                os.remove(tmp_path)
            except OSError:
                pass

    raise RuntimeError("❌ No champion model found in MLflow registry or S3")


In [6]:
# Generate Predictions
def generate_predictions(df, model):
    """Score ``df`` with ``model`` and return a new frame with prediction columns.

    The input frame is not modified. The returned copy:
      * gets a 1-based ``ID`` column inserted first if ``df`` has none;
      * gets ``PREDICTION`` (``model.predict``) and ``PREDICTION_PROB``
        (column 1 of ``model.predict_proba`` when available, else ``None``).

    Features are every column except ``ID``, ``CLASS`` (label), and any
    pre-existing ``PREDICTION``/``PREDICTION_PROB``. Excluding those keeps the
    function safe to re-run on its own output.

    Args:
        df: Batch to score.
        model: Object with ``predict`` and optionally ``predict_proba``.

    Returns:
        A new DataFrame with the columns described above.
    """
    # Work on a copy so re-running a notebook cell never sees mutated input.
    scored = df.copy()
    if "ID" not in scored.columns:
        scored.insert(0, "ID", range(1, len(scored) + 1))

    non_feature_cols = ["ID", "CLASS", "PREDICTION", "PREDICTION_PROB"]
    features = scored.drop(columns=non_feature_cols, errors="ignore")

    preds = model.predict(features)

    if hasattr(model, "predict_proba"):
        # Column 1 is the positive-class probability for a binary classifier.
        probs = model.predict_proba(features)[:, 1]
    else:
        probs = [None] * len(preds)

    scored["PREDICTION"] = preds
    scored["PREDICTION_PROB"] = probs
    return scored


In [7]:
# Save Predictions to S3

def save_predictions_to_s3(df):
    """Upload ``df`` as CSV to S3 and return the object URI.

    The object is written to
    ``s3://{S3_BUCKET}/{OUTPUT_PREFIX}/predictions_<UTC YYYYmmdd_HHMMSS>.csv``.

    Args:
        df: Predictions frame, e.g. the output of ``generate_predictions``.

    Returns:
        The ``s3://`` URI of the written object. Callers that ignore the
        return value are unaffected.
    """
    # Timezone-aware UTC: datetime.utcnow() is deprecated since Python 3.12
    # and yields the same formatted string.
    # NOTE(review): one-second resolution means two runs in the same second
    # overwrite each other.
    ts = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")
    output_key = f"{OUTPUT_PREFIX}/predictions_{ts}.csv"

    csv_text = df.to_csv(index=False)
    s3.put_object(
        Bucket=S3_BUCKET,
        Key=output_key,
        Body=csv_text.encode("utf-8"),
        ContentType="text/csv",
    )

    output_uri = f"s3://{S3_BUCKET}/{output_key}"
    print(f"📤 Predictions saved to {output_uri}")
    return output_uri


In [8]:
# Save Champion model to s3
def save_champion_model(model_uri):
    """Copy the champion model's pickle to S3.

    The destination is ``s3://{S3_BUCKET}/{MODEL_PREFIX}/champion_model.pkl``,
    the same object that get_champion_model's S3 fallback reads.

    Args:
        model_uri: URI returned by ``get_champion_model``. This is either a
            registry URI (``models:/<name>/<version>``) or the S3 fallback URI.

    Raises:
        FileNotFoundError: if no model pickle is found in the downloaded artifacts.
    """
    dest_key = f"{MODEL_PREFIX}/champion_model.pkl"
    dest_uri = f"s3://{S3_BUCKET}/{dest_key}"

    # A model loaded from the S3 fallback already *is* this object. Without
    # this guard, download_artifacts returns the .pkl file itself, joining
    # "model.pkl" onto it gives a non-existent path, and the upload fails.
    if model_uri == dest_uri:
        print(f"📦 Champion model already in S3 ({dest_uri}); skipping upload")
        return

    local_path = mlflow.artifacts.download_artifacts(model_uri)
    # A model URI downloads as a directory; a direct file URI downloads as the
    # file itself. Inside a model directory this assumes the sklearn flavor's
    # "model.pkl" file name.
    if os.path.isfile(local_path):
        model_path = local_path
    else:
        model_path = os.path.join(local_path, "model.pkl")
    if not os.path.isfile(model_path):
        raise FileNotFoundError(
            f"Expected model pickle not found at {model_path} (downloaded from {model_uri})"
        )

    s3.upload_file(
        model_path,
        S3_BUCKET,
        dest_key,
    )

    print("📦 Champion model uploaded to S3")


In [9]:
def main():
    """Run one end-to-end batch inference pass.

    Steps:
      1. Load the batch CSV from S3.
      2. Resolve the champion model.
      3. Score the batch.
      4. Write the predictions CSV back to S3.
      5. Sync the champion model pickle to the S3 location that the
         fallback in get_champion_model reads.
    """
    print("🚀 AWS Batch Inference Started")

    batch_df = load_batch_input()
    model, model_uri = get_champion_model()
    preds_df = generate_predictions(batch_df, model)

    save_predictions_to_s3(preds_df)
    save_champion_model(model_uri)

    print("✅ AWS Batch Inference Completed")


# Inside a notebook kernel __name__ is "__main__", so running this cell runs the pipeline.
if __name__ == "__main__":
    main()


🚀 AWS Batch Inference Started
📥 Loaded batch input: (10000, 30)


2025/12/31 10:11:46 INFO mlflow.store.db.utils: Creating initial MLflow database tables...
2025/12/31 10:11:46 INFO mlflow.store.db.utils: Updating database tables
2025/12/31 10:11:46 INFO alembic.runtime.migration: Context impl SQLiteImpl.
2025/12/31 10:11:46 INFO alembic.runtime.migration: Will assume non-transactional DDL.
2025/12/31 10:11:46 INFO alembic.runtime.migration: Context impl SQLiteImpl.
2025/12/31 10:11:46 INFO alembic.runtime.migration: Will assume non-transactional DDL.
2025/12/31 10:11:46 INFO mlflow.store.db.utils: Creating initial MLflow database tables...
2025/12/31 10:11:46 INFO mlflow.store.db.utils: Updating database tables
2025/12/31 10:11:46 INFO alembic.runtime.migration: Context impl SQLiteImpl.
2025/12/31 10:11:46 INFO alembic.runtime.migration: Will assume non-transactional DDL.


🏆 Champion model in registry: v1


Downloading artifacts:   0%|          | 0/5 [00:00<?, ?it/s]

📤 Predictions saved to s3://mlops-creditcard/inference/output/predictions_20251231_101147.csv


Downloading artifacts:   0%|          | 0/5 [00:00<?, ?it/s]

📦 Champion model uploaded to S3
✅ AWS Batch Inference Completed
