# Pipeline Monitoring and Reusability

    Drift Detection, Health Monitoring, and Reusable ML Components
    
## Objective

This notebook demonstrates how to:

- Monitor deployed ML pipelines

- Detect data drift and prediction drift

- Track model health and stability

- Design reusable, modular pipeline components

- Build foundations for production MLOps

It answers:

    How do we ensure a deployed model remains reliable over time and reusable across systems?

## Why Monitoring Is Mandatory

Even correct models degrade due to:

- Data distribution shift

- Concept drift

- Feature schema change

- Behavioral change in users/business

- Upstream data quality failures

Without monitoring:

- ❌ Silent model decay
- ❌ Undetected bias drift
- ❌ Incorrect business decisions

📌 All production models must be monitored.

## Imports and dataset

```python
import numpy as np
import pandas as pd
import joblib

from scipy.stats import ks_2samp
from sklearn.metrics import roc_auc_score
```


```python
# Local path to the dataset; adjust for your environment
DATA_PATH = "D:/GitHub/Data-Science-Techniques/datasets/synthetic_customer_churn_classification_complete.csv"
df = pd.read_csv(DATA_PATH)

# customer_id is an identifier, not a feature
X = df.drop(columns=["churn", "customer_id"])
y = df["churn"]
```

```python
# Treat the full dataset as the training (reference) data
X_train = X
y_train = y
```

## Load Trained Pipeline

```python
pipeline = joblib.load("churn_pipeline.joblib")
```

The training data `X_train` serves as the reference distribution for every check below.

## Simulated New Production Data

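```python
# Shuffled copy of the training data: sample(frac=1.0) permutes rows but keeps the original index
prod_df = X_train.sample(frac=1.0, random_state=42).copy()

# Simulate drift: income up 20%, monthly usage down 20%
prod_df["income"] *= 1.2
prod_df["avg_monthly_usage"] *= 0.8
```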


## Data Drift Detection (KS Test)

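```python
def detect_data_drift(reference_df, current_df, alpha=0.01):
    """Two-sample KS test per numeric column; flags drift when p_value < alpha."""
    drift_report = {}

    numeric_cols = reference_df.select_dtypes(include=np.number).columns

    for col in numeric_cols:
        # Missing values are not dropped here: with NaN input ks_2samp returns NaN,
        # and NaN < alpha evaluates to False
        stat, p_value = ks_2samp(reference_df[col], current_df[col])
        drift_report[col] = {
            "ks_statistic": stat,
            "p_value": p_value,
            "drift_detected": p_value < alpha
        }

    return pd.DataFrame(drift_report).T
```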


Run drift check:

```python
drift_report = detect_data_drift(X_train, prod_df)
drift_report
```


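| feature                   | ks_statistic | p_value | drift_detected |
| ------------------------- | ------------ | ------- | -------------- |
| age                       | 0.0          | 1.0     | False          |
| income                    | NaN          | NaN     | False          |
| tenure_years              | 0.0          | 1.0     | False          |
| avg_monthly_usage         | NaN          | NaN     | False          |
| support_tickets_last_year | 0.0          | 1.0     | False          |
| future_retention_offer    | 0.0          | 1.0     | False          |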


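`drift_detected=True` means the KS test rejected, at level `alpha`, the hypothesis that the reference and current samples come from the same distribution. Read the `NaN` rows carefully: `income` and `avg_monthly_usage` are exactly the two columns we shifted, yet both report `False`. Most likely these columns contain missing values; `ks_2samp` then returns `NaN`, and `NaN < alpha` is `False`, so the drift is silently missed. Drop missing values before testing and track null rates separately. The unchanged columns show a statistic of 0.0 because `prod_df` is only a row permutation of `X_train`.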
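
A NaN-aware variant can be sketched as follows. This is an illustration under assumptions, not the notebook's original function: `detect_data_drift_nan_safe` is a hypothetical name, missing values are dropped before calling `ks_2samp`, and per-column null rates are reported next to the test so missing data stays visible. The synthetic frame (hypothetical `income` and `age` values) mimics the simulated drift above.

```python
import numpy as np
import pandas as pd
from scipy.stats import ks_2samp


def detect_data_drift_nan_safe(reference_df, current_df, alpha=0.01):
    """Per-column two-sample KS test that ignores missing values (hypothetical helper)."""
    rows = {}
    numeric_cols = reference_df.select_dtypes(include=np.number).columns
    for col in numeric_cols:
        # Drop NaN so a column with missing values still gets a valid test
        ref = reference_df[col].dropna()
        cur = current_df[col].dropna()
        stat, p_value = ks_2samp(ref, cur)
        rows[col] = {
            "ks_statistic": float(stat),
            "p_value": float(p_value),
            "drift_detected": bool(p_value < alpha),
            # Keep missingness visible instead of hiding it
            "ref_null_rate": float(reference_df[col].isna().mean()),
            "cur_null_rate": float(current_df[col].isna().mean()),
        }
    # orient="index" builds one column per field, so drift_detected stays boolean
    return pd.DataFrame.from_dict(rows, orient="index")


# Hypothetical data: 10% of income is missing, and income is inflated by 20% in "production"
rng = np.random.default_rng(0)
reference = pd.DataFrame({
    "income": rng.normal(50_000, 10_000, 2_000),
    "age": rng.integers(18, 80, 2_000),
})
reference.loc[reference.sample(frac=0.1, random_state=0).index, "income"] = np.nan
current = reference.sample(frac=1.0, random_state=42).copy()
current["income"] *= 1.2

report = detect_data_drift_nan_safe(reference, current)
print(report)
print("features with drift:", int(report["drift_detected"].sum()))
```

Because `drift_detected` is a real boolean column here, `.sum()` counts flagged features; transposing a dict of dicts with `.T` instead yields object-dtype columns.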

## Prediction Drift Monitoring

```python
# Predicted churn probabilities on reference vs. production data
train_preds = pipeline.predict_proba(X_train)[:, 1]
prod_preds = pipeline.predict_proba(prod_df)[:, 1]

ks_2samp(train_preds, prod_preds)
```


```text
KstestResult(statistic=np.float64(0.0198), pvalue=np.float64(0.03966362560959423), statistic_location=np.float64(0.00024221750088898437), statistic_sign=np.int8(-1))
```

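A large KS statistic (with a p-value below the chosen threshold) → the distribution of predicted probabilities has changed. Here the statistic is small (about 0.02) and p ≈ 0.04: significant at 0.05 but not at the `alpha=0.01` used for the feature tests. With large samples even tiny shifts become statistically significant, so judge the size of the statistic as well as the p-value.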
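
To make the KS statistic concrete: it is the largest vertical gap between the two empirical CDFs. The sketch below computes it directly with NumPy and checks it against `scipy.stats.ks_2samp`; the `ks_statistic` helper and the beta-distributed scores are hypothetical, for illustration only.

```python
import numpy as np
from scipy.stats import ks_2samp


def ks_statistic(a, b):
    """Two-sample KS statistic: max |ECDF_a(x) - ECDF_b(x)| over all observed x."""
    a = np.sort(np.asarray(a, dtype=float))
    b = np.sort(np.asarray(b, dtype=float))
    grid = np.concatenate([a, b])
    # ECDF at each grid point = fraction of the sample that is <= x
    cdf_a = np.searchsorted(a, grid, side="right") / a.size
    cdf_b = np.searchsorted(b, grid, side="right") / b.size
    return float(np.max(np.abs(cdf_a - cdf_b)))


rng = np.random.default_rng(1)
train_scores = rng.beta(2, 5, 5_000)   # hypothetical reference scores
prod_scores = rng.beta(2.4, 5, 5_000)  # hypothetical, slightly shifted scores

manual = ks_statistic(train_scores, prod_scores)
scipy_stat = ks_2samp(train_scores, prod_scores).statistic
print(manual, scipy_stat)
```

Evaluating both ECDFs at every observed point is enough, because the step functions can only change there.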

## Model Performance Monitoring (If Labels Available)

```python
# Simulated production labels
# Caution: y_prod keeps the original row order, but prod_df was shuffled
y_prod = y_train.copy()

roc_auc_score(y_train, train_preds), roc_auc_score(y_prod, prod_preds)
```


```text
(np.float64(1.0), np.float64(0.5075215558602145))
```

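Caution: this drop cannot be read as concept drift. `sample(frac=1.0)` shuffled the rows of `prod_df`, but `y_prod = y_train.copy()` keeps the original order, so labels no longer match predictions and the AUC falls to roughly chance level (about 0.5). Align the labels with `y_prod = y_train.loc[prod_df.index]` before comparing. The training AUC of 1.0 is measured on the reference data itself, so it is likely an optimistic baseline. A sustained AUC drop on correctly aligned, held-out labels is what signals concept drift.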
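
The label-alignment pitfall can be reproduced on hypothetical synthetic scores. The sketch shuffles rows the same way `prod_df` was built, then computes AUC once with labels in the original order and once with labels re-indexed to follow the shuffled rows. `roc_auc_score` works on positions, not pandas index labels, which is why the mismatch goes unnoticed.

```python
import numpy as np
import pandas as pd
from sklearn.metrics import roc_auc_score

# Hypothetical labels and model scores that separate the classes well
rng = np.random.default_rng(7)
n = 2_000
y_ref = pd.Series(rng.integers(0, 2, n))
scores = pd.DataFrame({"score": 0.7 * y_ref + rng.normal(0.15, 0.2, n)})

# Same construction as prod_df: a shuffled copy that keeps the original index
prod = scores.sample(frac=1.0, random_state=42)

# Wrong: labels stay in the original order while rows were shuffled
auc_misaligned = roc_auc_score(y_ref.copy(), prod["score"])
# Right: re-index the labels so they follow the shuffled rows
auc_aligned = roc_auc_score(y_ref.loc[prod.index], prod["score"])

print(f"misaligned AUC: {auc_misaligned:.3f}, aligned AUC: {auc_aligned:.3f}")
```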

## Feature Schema Validation

```python
def validate_schema(model, input_df):
    """Compare incoming columns with the feature names the model was fit on."""
    expected_cols = set(model.feature_names_in_)
    incoming_cols = set(input_df.columns)

    missing = expected_cols - incoming_cols
    extra = incoming_cols - expected_cols

    # Extra columns are reported but do not invalidate the input
    return {
        "missing_columns": missing,
        "extra_columns": extra,
        "schema_valid": len(missing) == 0
    }
```


```python
validate_schema(pipeline, prod_df)
```

```text
{'missing_columns': set(), 'extra_columns': set(), 'schema_valid': True}
```

Schema mismatch must block inference in production.
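
`validate_schema` relies on `feature_names_in_` and checks column names only. A minimal sketch of a stricter guard, assuming a reference schema (column names and dtypes) is captured from the training data and stored alongside the model; `capture_schema` and `enforce_schema` are hypothetical helpers, not part of scikit-learn.

```python
import pandas as pd


def capture_schema(df):
    """Record column names (in order) and dtypes from training data. Hypothetical helper."""
    return {col: str(dtype) for col, dtype in df.dtypes.items()}


def enforce_schema(reference_schema, input_df):
    """Raise before inference on missing columns or changed dtypes; return columns in training order."""
    missing = sorted(set(reference_schema) - set(input_df.columns))
    if missing:
        raise ValueError(f"Missing columns: {missing}")

    wrong_types = {
        col: (expected, str(input_df[col].dtype))
        for col, expected in reference_schema.items()
        if str(input_df[col].dtype) != expected
    }
    if wrong_types:
        raise TypeError(f"Dtype mismatch (expected, got): {wrong_types}")

    # Extra columns are dropped; column order matches training
    return input_df[list(reference_schema)]


train = pd.DataFrame({"age": [30, 45], "income": [52_000.0, 61_000.0]})
schema = capture_schema(train)

batch = pd.DataFrame({"income": [58_000.0], "age": [38], "segment": ["a"]})
clean = enforce_schema(schema, batch)
print(list(clean.columns))  # ['age', 'income']
```

Exact dtype equality is deliberately strict: an integer column that arrives as float because of a missing value would be rejected, which may or may not be the behavior you want.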

## Monitoring Dashboard Metrics

Typical tracked signals:

| Metric            | Purpose               |
| ----------------- | --------------------- |
| Data drift        | Distribution change   |
| Prediction drift  | Model behavior change |
| AUC / accuracy    | Performance decay     |
| Feature null rate | Data quality          |
| Schema mismatch   | Pipeline safety       |
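
The feature null rate can be tracked with a small helper. As a sketch: compare per-column missing-value rates against the reference and flag increases above a threshold. The `null_rate_report` name, the `max_increase=0.05` default, and the toy frames are assumptions.

```python
import numpy as np
import pandas as pd


def null_rate_report(reference_df, current_df, max_increase=0.05):
    """Per-column missing-value rates; alert when the current rate exceeds the reference by max_increase."""
    ref_rate = reference_df.isna().mean()
    # reindex: a column absent from current_df counts as 100% missing
    cur_rate = current_df.reindex(columns=reference_df.columns).isna().mean()
    report = pd.DataFrame({"ref_null_rate": ref_rate, "cur_null_rate": cur_rate})
    report["alert"] = (report["cur_null_rate"] - report["ref_null_rate"]) > max_increase
    return report


# Hypothetical batches
reference = pd.DataFrame({"age": [25, 40, 33, 51], "income": [40e3, np.nan, 55e3, 62e3]})
current = pd.DataFrame({"age": [29, 44, 37, 58], "income": [np.nan, np.nan, 50e3, np.nan]})

report = null_rate_report(reference, current)
print(report)
```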


Example:

```python
monitoring_summary = {
    # astype(bool) so sum() counts flagged features; numpy bools in an
    # object-dtype column would be combined with logical OR instead
    "data_drift_features": int(drift_report["drift_detected"].astype(bool).sum()),
    "prediction_shift_ks": ks_2samp(train_preds, prod_preds).statistic,
    "train_auc": roc_auc_score(y_train, train_preds),
    "prod_auc": roc_auc_score(y_prod, prod_preds)
}

monitoring_summary
```

```text
{'data_drift_features': 0,
 'prediction_shift_ks': np.float64(0.0198),
 'train_auc': np.float64(1.0),
 'prod_auc': np.float64(0.5075215558602145)}
```

## Designing Reusable Pipelines

Reusable pipeline principles:

- ✔ No hard-coded paths
- ✔ Schema-driven
- ✔ Deterministic transformations
- ✔ Serializable components
- ✔ Versionable
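
The first principle, no hard-coded paths, can be sketched with environment variables and `pathlib`. The variable names `CHURN_DATA_PATH` and `CHURN_MODEL_PATH` and the relative default locations are hypothetical; the point is that machine-specific paths such as a local `D:/` drive live in configuration rather than in pipeline code.

```python
import os
from pathlib import Path


def resolve_path(env_var, default):
    """Return the path stored in env_var if it is set, else default (hypothetical helper)."""
    return Path(os.environ.get(env_var, default)).expanduser()


# Hypothetical variable names and repo-relative defaults
DATA_PATH = resolve_path(
    "CHURN_DATA_PATH", "datasets/synthetic_customer_churn_classification_complete.csv"
)
MODEL_PATH = resolve_path("CHURN_MODEL_PATH", "churn_pipeline.joblib")
print(DATA_PATH, MODEL_PATH)
```

`pd.read_csv` accepts a `Path` directly, so the rest of the loading code does not change.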

## Reusable Prediction Wrapper

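```python
class ReusablePipeline:
    """Loads a serialized pipeline and validates the input schema before predicting."""

    def __init__(self, model_path):
        self.pipeline = joblib.load(model_path)

    def predict(self, df):
        schema_check = validate_schema(self.pipeline, df)
        if not schema_check["schema_valid"]:
            raise ValueError(
                f"Schema mismatch detected; missing columns: {sorted(schema_check['missing_columns'])}"
            )

        # Select the training columns in training order; extra columns are ignored
        df = df[list(self.pipeline.feature_names_in_)]

        preds = self.pipeline.predict(df)
        probs = self.pipeline.predict_proba(df)[:, 1]

        return pd.DataFrame({
            "prediction": preds,
            "probability": probs
        })
```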


Usage:

```python
model = ReusablePipeline("churn_pipeline.joblib")
model.predict(prod_df.head())
```

|   | prediction | probability |
| - | ---------- | ----------- |
| 0 | 0          | 0.02278     |
| 1 | 0          | 6.6e-05     |
| 2 | 0          | 7e-06       |
| 3 | 0          | 3.4e-05     |
| 4 | 1          | 0.998476    |


## When to Retrain Model

Retraining triggers:

- Persistent data drift

- Prediction drift

- Performance decay

- Business regime change

- Regulatory requirement

Monitoring → Alert → Retrain loop.
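
A minimal sketch of how the automatable triggers might be combined into an alert rule. The `MonitoringSnapshot` record, the thresholds, and the three-window persistence requirement are all assumptions to tune per use case; business regime changes and regulatory requirements still need a human decision.

```python
from dataclasses import dataclass


@dataclass
class MonitoringSnapshot:
    """One monitoring window (hypothetical record)."""
    drifted_features: int  # features flagged by the data-drift test
    prediction_ks: float   # KS statistic between reference and production scores
    auc_drop: float        # reference AUC minus production AUC, on aligned labels


def retrain_reasons(history, min_windows=3, max_pred_ks=0.1, max_auc_drop=0.05):
    """Return the retraining triggers that fired for the latest window (thresholds are assumptions)."""
    reasons = []
    recent = history[-min_windows:]
    # Persistent data drift: drift in every one of the last min_windows windows
    if len(recent) == min_windows and all(s.drifted_features > 0 for s in recent):
        reasons.append("persistent data drift")
    latest = history[-1]
    if latest.prediction_ks > max_pred_ks:
        reasons.append("prediction drift")
    if latest.auc_drop > max_auc_drop:
        reasons.append("performance decay")
    return reasons


history = [
    MonitoringSnapshot(drifted_features=2, prediction_ks=0.03, auc_drop=0.01),
    MonitoringSnapshot(drifted_features=2, prediction_ks=0.05, auc_drop=0.02),
    MonitoringSnapshot(drifted_features=3, prediction_ks=0.12, auc_drop=0.08),
]
print(retrain_reasons(history))
# ['persistent data drift', 'prediction drift', 'performance decay']
```

Requiring drift across several consecutive windows keeps a single noisy batch from triggering a retrain.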

## Common Monitoring Failures

| Failure                  | Consequence        |
| ------------------------ | ------------------ |
| No baseline stored       | Drift undetectable |
| Monitoring only accuracy | Silent failure     |
| Ignoring schema          | Crashes            |
| No retraining policy     | Model decay        |
| Manual monitoring        | Non-scalable       |


## Best Practices

- ✔ Always store training distribution
- ✔ Monitor both data and predictions
- ✔ Automate drift alerts
- ✔ Log model versions
- ✔ Build reusable wrappers
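
Storing the training distribution can be as simple as persisting a fixed random sample of the reference features together with the model version. This sketch writes a CSV sample and a JSON metadata file to a temporary directory, reloads them, and runs a KS test against a new batch. The file names, the `churn-v1` tag, and the 5,000-row sample size are hypothetical.

```python
import json
import tempfile
from pathlib import Path

import numpy as np
import pandas as pd
from scipy.stats import ks_2samp


def save_baseline(reference_df, out_dir, model_version, sample_size=5_000, seed=0):
    """Persist a fixed random sample of training features plus metadata (hypothetical layout)."""
    out_dir = Path(out_dir)
    out_dir.mkdir(parents=True, exist_ok=True)
    n = min(sample_size, len(reference_df))
    reference_df.sample(n=n, random_state=seed).to_csv(out_dir / "baseline_sample.csv", index=False)
    meta = {"model_version": model_version, "n_rows": n, "columns": list(reference_df.columns)}
    (out_dir / "baseline_meta.json").write_text(json.dumps(meta))


def load_baseline(out_dir):
    """Reload the stored sample, restoring the recorded column order."""
    out_dir = Path(out_dir)
    meta = json.loads((out_dir / "baseline_meta.json").read_text())
    sample = pd.read_csv(out_dir / "baseline_sample.csv")
    return sample[meta["columns"]], meta


# Hypothetical training feature
rng = np.random.default_rng(3)
train = pd.DataFrame({"tenure_years": rng.exponential(3.0, 8_000)})

with tempfile.TemporaryDirectory() as tmp:
    save_baseline(train, tmp, model_version="churn-v1")
    baseline, meta = load_baseline(tmp)

# Later: compare a new batch against the stored baseline instead of the full training set
new_batch = pd.DataFrame({"tenure_years": 1.5 * rng.exponential(3.0, 1_000)})
result = ks_2samp(baseline["tenure_years"], new_batch["tenure_years"])
print(meta["model_version"], len(baseline), round(result.statistic, 3))
```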

## Key Takeaways

- Deployment is not the end: monitoring is mandatory

- Drift detection protects business reliability

- Pipelines should be reusable artifacts

- Monitoring enables safe retraining

- This completes the ML production lifecycle

## Full Pipeline Lifecycle

# Related Notebooks

[09_Pipelines_and_Workflows/]()

├── [01_basic_pipeline.ipynb](01_basic_pipeline.ipynb)

├── [02_column_transformer_pipeline.ipynb](02_column_transformer_pipeline.ipynb)

├── [03_pipeline_with_feature_engineering.ipynb](03_pipeline_with_feature_engineering.ipynb)

├── [04_leakage_safe_cross_validation.ipynb](04_leakage_safe_cross_validation.ipynb)

├── [05_pipeline_with_model_tuning.ipynb](05_pipeline_with_model_tuning.ipynb)

├── [06_pipeline_serialization_and_inference.ipynb](06_pipeline_serialization_and_inference.ipynb)

└── [07_pipeline_monitoring_and_reusability.ipynb](07_pipeline_monitoring_and_reusability.ipynb) **← COMPLETE**

<br><br>
## Next Possible Extensions


- [08_model_versioning_and_registry.ipynb](#)

- [09_batch_vs_real_time_inference.ipynb](#)

- [10_data_and_concept_drift_deep_dive.ipynb](#)

- [11_full_mlops_pipeline.ipynb](#)

- [12_production_failure_modes.ipynb](#)