# 🎯 Master Pipeline: End-to-End Data Science Project
![](https://images.unsplash.com/photo-1639322537228-f710d846310a?w=1200&h=300&fit=crop)
## 🚀 Complete Data Science Workflow
This is where everything comes together! We'll build:

- Complete preprocessing pipeline
- Feature engineering automation
- Model-ready transformations
- Production-grade code
- Real business case study

## 📚 What We'll Build:
1. Data Pipeline Architecture - Modular design
2. Custom Transformers - Sklearn compatible
3. Feature Engineering Pipeline - Automated features
4. Preprocessing Pipeline - End-to-end cleaning
5. Validation Framework - Quality checks
6. Model Preparation - Ready for ML
7. Pipeline Persistence - Save and load
8. Production Deployment - API ready
9. Monitoring & Logging - Track everything
10. Complete Project - Customer churn prediction

## 🏗️ Let's Build Production Pipelines!

In [1]:
# Import all required libraries
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from scipy import stats
import warnings
import json
warnings.filterwarnings('ignore')

# Sklearn imports
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.pipeline import Pipeline, FeatureUnion
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler, MinMaxScaler, RobustScaler
from sklearn.preprocessing import OneHotEncoder, LabelEncoder, OrdinalEncoder
from sklearn.impute import SimpleImputer, KNNImputer
from sklearn.feature_selection import SelectKBest, f_classif, mutual_info_classif
from sklearn.model_selection import train_test_split, cross_val_score, GridSearchCV, cross_validate
from sklearn.metrics import classification_report, confusion_matrix, roc_auc_score
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
from sklearn.ensemble import RandomForestClassifier
import joblib
import logging
from datetime import datetime

# Set up logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

print("🎯 Master Pipeline - Ready to Build!")
print("\n💡 Building production-ready data pipelines!")

🎯 Master Pipeline - Ready to Build!

💡 Building production-ready data pipelines!


## 📌 Section 1: Custom Transformers
### 🎯 Building Sklearn-Compatible Components

In [2]:
print("🔧 CUSTOM TRANSFORMERS\n" + "="*40)

class OutlierRemover(BaseEstimator, TransformerMixin):
    """Cap outliers at the IQR fences (values are clipped, rows are never dropped).

    Works with both DataFrames and arrays.
    """
    
    def __init__(self, factor=1.5):
        # Only hyperparameters belong in __init__; learned state is set in fit()
        self.factor = factor
    
    def fit(self, X, y=None):
        if isinstance(X, np.ndarray):
            X_df = pd.DataFrame(X)
        else:
            X_df = X
        self.feature_names_ = X_df.columns.tolist()
        
        # Reset the bounds on every fit so that refitting never reuses stale values
        self.bounds_ = {}
        for i, column in enumerate(X_df.columns):
            if X_df.iloc[:, i].dtype in ['int64', 'float64', 'float32', 'int32']:
                Q1 = X_df.iloc[:, i].quantile(0.25)
                Q3 = X_df.iloc[:, i].quantile(0.75)
                IQR = Q3 - Q1
                # Bounds are keyed by column position so arrays and DataFrames share one code path
                self.bounds_[i] = (Q1 - self.factor * IQR, Q3 + self.factor * IQR)
        return self
    
    def transform(self, X):
        if isinstance(X, np.ndarray):
            X_df = pd.DataFrame(X)
        else:
            X_df = X.copy()
        
        # Clip each fitted column into its [lower, upper] range
        for col_idx, (lower, upper) in self.bounds_.items():
            X_df.iloc[:, col_idx] = X_df.iloc[:, col_idx].clip(lower, upper)
        
        logger.info(f"Outliers capped for {len(self.bounds_)} columns")
        
        if isinstance(X, np.ndarray):
            return X_df.values
        else:
            return X_df


class MissingIndicator(BaseEstimator, TransformerMixin):
    """Add binary indicators for columns whose share of missing values exceeds `threshold`"""
    
    def __init__(self, threshold=0.1):
        # Only hyperparameters belong in __init__; learned state is set in fit()
        self.threshold = threshold
    
    def fit(self, X, y=None):
        if isinstance(X, np.ndarray):
            X_df = pd.DataFrame(X)
        else:
            X_df = X
        self.feature_names_ = X_df.columns.tolist()
        
        # Store column positions, not labels: transform() selects with .iloc,
        # which fails on string labels such as 'total_charges'
        missing_pct = X_df.isnull().mean()
        self.columns_ = [i for i, pct in enumerate(missing_pct) if pct > self.threshold]
        
        return self
    
    def transform(self, X):
        if isinstance(X, np.ndarray):
            X_df = pd.DataFrame(X)
        else:
            X_df = X.copy()
        
        for col_idx in self.columns_:
            col_name = X_df.columns[col_idx]
            X_df[f'missing_indicator_{col_name}'] = X_df.iloc[:, col_idx].isnull().astype(int)
        
        logger.info(f"Added {len(self.columns_)} missing indicators")
        
        if isinstance(X, np.ndarray):
            return X_df.values
        else:
            return X_df


class FeatureEngineer(BaseEstimator, TransformerMixin):
    """Create new features from existing ones"""
    
    def __init__(self, create_interactions=True, create_ratios=True):
        self.create_interactions = create_interactions
        self.create_ratios = create_ratios
        self.numeric_columns_ = []
        self.feature_names_ = None
    
    def fit(self, X, y=None):
        if isinstance(X, np.ndarray):
            X_df = pd.DataFrame(X)
        else:
            X_df = X
            
        self.numeric_columns_ = []
        for i, col in enumerate(X_df.columns):
            if X_df.iloc[:, i].dtype in [np.float64, np.float32, np.int64, np.int32]:
                self.numeric_columns_.append(i)
        
        return self
    
    def transform(self, X):
        if isinstance(X, np.ndarray):
            X_df = pd.DataFrame(X)
        else:
            X_df = X.copy()
        
        for col_idx in self.numeric_columns_:
            if col_idx < X_df.shape[1]:
                col_name = X_df.columns[col_idx]
                X_df[f'{col_name}_squared'] = X_df.iloc[:, col_idx] ** 2
                X_df[f'{col_name}_log'] = np.log1p(np.abs(X_df.iloc[:, col_idx]))
        
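        # Multiply only neighbouring pairs among the first five numeric columns
        # (at most four interaction features)
        if self.create_interactions and len(self.numeric_columns_) > 1: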
            cols_for_interaction = self.numeric_columns_[:min(5, len(self.numeric_columns_))]
            for i, col1_idx in enumerate(cols_for_interaction[:-1]):
                for col2_idx in cols_for_interaction[i+1:i+2]:
                    if col1_idx < X_df.shape[1] and col2_idx < X_df.shape[1]:
                        col1_name = X_df.columns[col1_idx]
                        col2_name = X_df.columns[col2_idx]
                        X_df[f'{col1_name}_x_{col2_name}'] = X_df.iloc[:, col1_idx] * X_df.iloc[:, col2_idx]
        
        if self.create_ratios and len(self.numeric_columns_) > 1:
            cols_for_ratio = self.numeric_columns_[:min(3, len(self.numeric_columns_))]
            for i, col1_idx in enumerate(cols_for_ratio[:-1]):
                for col2_idx in cols_for_ratio[i+1:i+2]:
                    if col1_idx < X_df.shape[1] and col2_idx < X_df.shape[1]:
                        col1_name = X_df.columns[col1_idx]
                        col2_name = X_df.columns[col2_idx]
                        X_df[f'{col1_name}_div_{col2_name}'] = X_df.iloc[:, col1_idx] / (X_df.iloc[:, col2_idx] + 1e-8)
        
        logger.info(f"Created {X_df.shape[1] - X.shape[1]} new features")
        
        if isinstance(X, np.ndarray):
            return X_df.values
        else:
            return X_df


print("✅ Custom transformers created:")
print("  • OutlierRemover")
print("  • MissingIndicator")
print("  • FeatureEngineer")

🔧 CUSTOM TRANSFORMERS
✅ Custom transformers created:
  • OutlierRemover
  • MissingIndicator
  • FeatureEngineer
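
The IQR capping rule that `OutlierRemover` applies per column can be sketched without pandas. This is a minimal standard-library illustration, not the transformer itself: `iqr_bounds`, `cap`, and the toy `charges` list are hypothetical names introduced here. `statistics.quantiles` with `method="inclusive"` interpolates linearly between order statistics, the same rule as the default of `Series.quantile` in pandas.

```python
from statistics import quantiles


def iqr_bounds(values, factor=1.5):
    """Return (lower, upper) capping bounds derived from the interquartile range."""
    # n=4 yields the three quartile cut points; the median is not needed here
    q1, _, q3 = quantiles(values, n=4, method="inclusive")
    iqr = q3 - q1
    return q1 - factor * iqr, q3 + factor * iqr


def cap(values, lower, upper):
    """Clip every value into [lower, upper]; no rows are dropped."""
    return [min(max(v, lower), upper) for v in values]


charges = [1, 2, 3, 4, 100]            # toy data with one extreme value
lower, upper = iqr_bounds(charges)     # Q1 = 2, Q3 = 4, IQR = 2
capped = cap(charges, lower, upper)

print(lower, upper)
print(capped)
```

Because the value 100 is clipped to the upper bound rather than deleted, the number of rows stays the same, which is what allows the transformer to sit inside a `Pipeline` without desynchronising `X` and `y`.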


## 📌 Section 2: Pipeline Architecture
### 🎯 Building Modular Pipelines

In [3]:
print("📊 CREATING SAMPLE DATASET\n" + "="*40)

# Generate a synthetic customer churn dataset (random draws, not real customers)
np.random.seed(42)
n_samples = 5000

data = pd.DataFrame({
    'customer_id': range(1, n_samples + 1),
    'age': np.random.normal(45, 15, n_samples).clip(18, 80),
    'tenure_months': np.random.exponential(24, n_samples),
    'monthly_charges': np.random.gamma(2, 40, n_samples),
    'total_charges': np.random.lognormal(7, 1.5, n_samples),
    'num_services': np.random.poisson(3, n_samples),
    'contract_type': np.random.choice(['Month-to-month', 'One year', 'Two year'], n_samples, p=[0.5, 0.3, 0.2]),
    'payment_method': np.random.choice(['Electronic', 'Mailed check', 'Bank transfer', 'Credit card'], n_samples),
    'internet_service': np.random.choice(['DSL', 'Fiber optic', 'No'], n_samples, p=[0.3, 0.5, 0.2]),
    'tech_support': np.random.choice(['Yes', 'No'], n_samples, p=[0.3, 0.7]),
    'online_security': np.random.choice(['Yes', 'No'], n_samples, p=[0.4, 0.6]),
    'device_protection': np.random.choice(['Yes', 'No'], n_samples, p=[0.35, 0.65]),
    'streaming_tv': np.random.choice(['Yes', 'No'], n_samples, p=[0.45, 0.55]),
    'streaming_movies': np.random.choice(['Yes', 'No'], n_samples, p=[0.45, 0.55]),
    'satisfaction_score': np.random.uniform(1, 5, n_samples),
    'support_tickets': np.random.poisson(2, n_samples),
    'late_payments': np.random.poisson(0.5, n_samples)
})

# Overwrite the placeholder draw so total_charges is consistent with tenure and monthly charges
data['total_charges'] = data['monthly_charges'] * data['tenure_months'] * np.random.uniform(0.8, 1.2, n_samples)

churn_probability = (
    0.1 +
    0.3 * (data['contract_type'] == 'Month-to-month') +
    0.2 * (data['satisfaction_score'] < 2.5) +
    0.1 * (data['support_tickets'] > 5) +
    0.1 * (data['late_payments'] > 2) +
    0.1 * (data['tenure_months'] < 12) -
    0.2 * (data['contract_type'] == 'Two year') -
    0.1 * (data['online_security'] == 'Yes')
)

data['churn'] = (np.random.random(n_samples) < churn_probability).astype(int)

missing_cols = ['satisfaction_score', 'support_tickets', 'total_charges']
for col in missing_cols:
    missing_idx = np.random.choice(data.index, size=int(0.1 * len(data)), replace=False)
    data.loc[missing_idx, col] = np.nan

outlier_idx = np.random.choice(data.index, size=50, replace=False)
data.loc[outlier_idx, 'monthly_charges'] *= 5
data.loc[outlier_idx[:25], 'total_charges'] *= 10

print(f"Dataset shape: {data.shape}")
print(f"Churn rate: {data['churn'].mean()*100:.1f}%")
print(f"Missing values: {data.isnull().sum().sum()}")
print("\nFirst 5 rows:")
print(data.head())

📊 CREATING SAMPLE DATASET
Dataset shape: (5000, 18)
Churn rate: 31.0%
Missing values: 1500

First 5 rows:
   customer_id        age  tenure_months  monthly_charges  total_charges  \
0            1  52.450712       4.420973        28.998892            NaN   
1            2  42.926035       5.055371        42.243909     250.571324   
2            3  54.715328      14.820140        22.840260     282.019941   
3            4  67.845448       8.094345        49.860711     346.586247   
4            5  41.487699       6.823322        93.380864     562.060393   

   num_services   contract_type payment_method internet_service tech_support  \
0             4  Month-to-month     Electronic      Fiber optic           No   
1             2        Two year  Bank transfer              DSL           No   
2             3  Month-to-month     Electronic               No           No   
3             2        One year    Credit card      Fiber optic           No   
4             1  Month-to-month  Bank

In [4]:
print("🔧 BUILDING PREPROCESSING PIPELINE\n" + "="*40)

X = data.drop(['customer_id', 'churn'], axis=1)
y = data['churn']

X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42, stratify=y
)

print(f"Training set: {X_train.shape}")
print(f"Test set: {X_test.shape}")

numeric_features = X.select_dtypes(include=[np.number]).columns.tolist()
categorical_features = X.select_dtypes(include=['object']).columns.tolist()

print(f"\nNumeric features ({len(numeric_features)}): {numeric_features[:5]}...")
print(f"Categorical features ({len(categorical_features)}): {categorical_features[:5]}...")

numeric_pipeline = Pipeline([
    ('imputer', SimpleImputer(strategy='median')),
    ('outliers', OutlierRemover(factor=1.5)),
    ('scaler', RobustScaler())
])

categorical_pipeline = Pipeline([
    ('imputer', SimpleImputer(strategy='constant', fill_value='missing')),
    ('encoder', OneHotEncoder(drop='first', sparse_output=False))
])

preprocessor = ColumnTransformer(
    transformers=[
        ('num', numeric_pipeline, numeric_features),
        ('cat', categorical_pipeline, categorical_features)
    ])

full_pipeline = Pipeline([
    ('preprocessor', preprocessor),
    ('classifier', RandomForestClassifier(n_estimators=100, random_state=42))
])

print("\n📋 Pipeline Structure:")
print("1. Numeric Pipeline:")
print("   → Median Imputation")
print("   → Outlier Capping")
print("   → Robust Scaling")
print("\n2. Categorical Pipeline:")
print("   → Constant Imputation")
print("   → One-Hot Encoding")
print("\n3. Model:")
print("   → Random Forest Classifier")

print("\n⏳ Training pipeline...")
full_pipeline.fit(X_train, y_train)

y_pred = full_pipeline.predict(X_test)
y_pred_proba = full_pipeline.predict_proba(X_test)[:, 1]

print("\n📊 Model Performance:")
print(f"Accuracy: {accuracy_score(y_test, y_pred):.3f}")
print(f"Precision: {precision_score(y_test, y_pred):.3f}")
print(f"Recall: {recall_score(y_test, y_pred):.3f}")
print(f"F1-Score: {f1_score(y_test, y_pred):.3f}")
print(f"ROC-AUC: {roc_auc_score(y_test, y_pred_proba):.3f}")

🔧 BUILDING PREPROCESSING PIPELINE
Training set: (4000, 16)
Test set: (1000, 16)

Numeric features (8): ['age', 'tenure_months', 'monthly_charges', 'total_charges', 'num_services']...
Categorical features (8): ['contract_type', 'payment_method', 'internet_service', 'tech_support', 'online_security']...

📋 Pipeline Structure:
1. Numeric Pipeline:
   → Median Imputation
   → Outlier Capping
   → Robust Scaling

2. Categorical Pipeline:
   → Constant Imputation
   → One-Hot Encoding

3. Model:
   → Random Forest Classifier

⏳ Training pipeline...


2025-10-08 18:03:24,861 - INFO - Outliers capped for 8 columns
2025-10-08 18:03:26,682 - INFO - Outliers capped for 8 columns
2025-10-08 18:03:26,802 - INFO - Outliers capped for 8 columns



📊 Model Performance:
Accuracy: 0.735
Precision: 0.615
Recall: 0.387
F1-Score: 0.475
ROC-AUC: 0.786
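
To see how the printed metrics relate to one another, here is a small sketch that derives them from confusion-matrix counts. The counts are an assumption: the notebook never prints its confusion matrix, so `tp`, `fp`, `fn`, and `tn` below were simply chosen to reproduce the rounded figures above for a 1,000-row test set. `classification_metrics` is a hypothetical helper, not a scikit-learn function.

```python
def classification_metrics(tp, fp, fn, tn):
    """Compute accuracy, precision, recall and F1 for the positive (churn) class."""
    total = tp + fp + fn + tn
    accuracy = (tp + tn) / total if total else 0.0
    precision = tp / (tp + fp) if (tp + fp) else 0.0
    recall = tp / (tp + fn) if (tp + fn) else 0.0
    f1 = (2 * precision * recall / (precision + recall)
          if (precision + recall) else 0.0)
    return accuracy, precision, recall, f1


# Illustrative counts (assumed, not read from the run above)
tp, fp, fn, tn = 120, 75, 190, 615
accuracy, precision, recall, f1 = classification_metrics(tp, fp, fn, tn)

# F1 is the harmonic mean of precision and recall, so it can also be
# recovered from the two rounded values printed by the notebook
reported_precision, reported_recall = 0.615, 0.387
f1_from_report = (2 * reported_precision * reported_recall
                  / (reported_precision + reported_recall))

print(round(accuracy, 3), round(precision, 3), round(recall, 3), round(f1, 3))
print(round(f1_from_report, 3))
```

Under these assumed counts, most of the errors are false negatives. That would explain why recall is much lower than precision and why accuracy alone overstates how well churners are caught.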


## 📌 Section 3: Cross-Validation and Optimization
### 🎯 Optimizing Pipeline Performance

In [5]:
print("🔍 PIPELINE OPTIMIZATION\n" + "="*40)

optimization_pipeline = Pipeline([
    ('preprocessor', preprocessor),
    ('classifier', RandomForestClassifier(random_state=42))
])

param_grid = {
    'preprocessor__num__imputer__strategy': ['mean', 'median'],
    'preprocessor__num__scaler': [StandardScaler(), RobustScaler()],
    'classifier__n_estimators': [50, 100, 200],
    'classifier__max_depth': [5, 10, None],
    'classifier__min_samples_split': [2, 5, 10]
}

print("🔍 Starting Grid Search...")
print(f"Total combinations: {np.prod([len(v) for v in param_grid.values()])}")

grid_search = GridSearchCV(
    optimization_pipeline,
    param_grid,
    cv=3,
    scoring='roc_auc',
    n_jobs=-1,
    verbose=1
)

grid_search.fit(X_train, y_train)

print("\n✅ Grid Search Complete!")
print(f"Best Score: {grid_search.best_score_:.3f}")
print("\nBest Parameters:")
for param, value in grid_search.best_params_.items():
    print(f"  {param}: {value}")

best_model = grid_search.best_estimator_
y_pred_best = best_model.predict(X_test)
y_pred_proba_best = best_model.predict_proba(X_test)[:, 1]

print("\n📊 Best Model Performance on Test Set:")
print(f"Accuracy: {accuracy_score(y_test, y_pred_best):.3f}")
print(f"ROC-AUC: {roc_auc_score(y_test, y_pred_proba_best):.3f}")

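# Re-estimate the tuned pipeline with 5-fold CV. The hyperparameters were chosen
# on this same training data, so these scores are slightly optimistic.
cv_scores = cross_validate(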
    best_model,
    X_train,
    y_train,
    cv=5,
    scoring=['accuracy', 'precision', 'recall', 'roc_auc'],
    return_train_score=True
)

cv_results = pd.DataFrame(cv_scores)
print("\n📊 Cross-Validation Results (5-fold):")
print(cv_results[['test_accuracy', 'test_precision', 'test_recall', 'test_roc_auc']].describe().round(3))

🔍 PIPELINE OPTIMIZATION
🔍 Starting Grid Search...
Total combinations: 108
Fitting 3 folds for each of 108 candidates, totalling 324 fits


2025-10-08 18:06:42,090 - INFO - Outliers capped for 8 columns
2025-10-08 18:06:44,086 - INFO - Outliers capped for 8 columns
2025-10-08 18:06:44,195 - INFO - Outliers capped for 8 columns



✅ Grid Search Complete!
Best Score: 0.771

Best Parameters:
  classifier__max_depth: 5
  classifier__min_samples_split: 5
  classifier__n_estimators: 200
  preprocessor__num__imputer__strategy: median
  preprocessor__num__scaler: StandardScaler()


2025-10-08 18:06:44,404 - INFO - Outliers capped for 8 columns



📊 Best Model Performance on Test Set:
Accuracy: 0.722
ROC-AUC: 0.803


2025-10-08 18:06:45,940 - INFO - Outliers capped for 8 columns
2025-10-08 18:06:46,061 - INFO - Outliers capped for 8 columns
2025-10-08 18:06:46,149 - INFO - Outliers capped for 8 columns
2025-10-08 18:06:46,301 - INFO - Outliers capped for 8 columns
2025-10-08 18:06:46,470 - INFO - Outliers capped for 8 columns
2025-10-08 18:06:47,647 - INFO - Outliers capped for 8 columns
2025-10-08 18:06:47,735 - INFO - Outliers capped for 8 columns
2025-10-08 18:06:47,814 - INFO - Outliers capped for 8 columns
2025-10-08 18:06:47,957 - INFO - Outliers capped for 8 columns
2025-10-08 18:06:48,116 - INFO - Outliers capped for 8 columns
2025-10-08 18:06:49,256 - INFO - Outliers capped for 8 columns
2025-10-08 18:06:49,345 - INFO - Outliers capped for 8 columns
2025-10-08 18:06:49,431 - INFO - Outliers capped for 8 columns
2025-10-08 18:06:49,569 - INFO - Outliers capped for 8 columns
2025-10-08 18:06:49,729 - INFO - Outliers capped for 8 columns
2025-10-08 18:06:51,020 - INFO - Outliers capped for 8 


📊 Cross-Validation Results (5-fold):
       test_accuracy  test_precision  test_recall  test_roc_auc
count          5.000           5.000        5.000         5.000
mean           0.709           0.649        0.132         0.776
std            0.005           0.034        0.021         0.023
min            0.704           0.600        0.105         0.751
25%            0.705           0.634        0.117         0.758
50%            0.709           0.654        0.137         0.777
75%            0.710           0.674        0.145         0.789
max            0.716           0.684        0.157         0.807
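
The "108 candidates, 324 fits" line in the log follows directly from the size of the parameter grid. As a rough sketch, the grid can be enumerated with `itertools.product`; the scaler objects are replaced by plain strings and the parameter names are shortened, both purely for illustration.

```python
from itertools import product

# Same value lists as the notebook's param_grid, with simplified keys
param_grid = {
    "imputer_strategy": ["mean", "median"],
    "scaler": ["StandardScaler", "RobustScaler"],
    "n_estimators": [50, 100, 200],
    "max_depth": [5, 10, None],
    "min_samples_split": [2, 5, 10],
}

# One candidate per element of the Cartesian product of all value lists
names = list(param_grid)
candidates = [dict(zip(names, combo)) for combo in product(*param_grid.values())]

cv_folds = 3
total_fits = len(candidates) * cv_folds  # each candidate is fitted once per fold

print(len(candidates), total_fits)
print(candidates[0])
```

With the default `refit=True`, `GridSearchCV` then fits the best candidate once more on the full training set. Every extra value added to any list multiplies the total, so the grid grows quickly.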


## 📌 Section 4: Pipeline Persistence
### 🎯 Saving and Loading Pipelines

In [6]:
print("💾 SAVING PIPELINE\n" + "="*40)

pipeline_filename = 'churn_prediction_pipeline.pkl'
metadata_filename = 'pipeline_metadata.json'

joblib.dump(best_model, pipeline_filename)
print(f"✅ Pipeline saved: {pipeline_filename}")

metadata = {
    'model_name': 'Churn Prediction Pipeline',
    'version': '1.0.0',
    'created_date': datetime.now().isoformat(),
    'features': X_train.columns.tolist(),
    'numeric_features': numeric_features,
    'categorical_features': categorical_features,
    'performance_metrics': {
        'accuracy': float(accuracy_score(y_test, y_pred_best)),
        'roc_auc': float(roc_auc_score(y_test, y_pred_proba_best)),
        'precision': float(precision_score(y_test, y_pred_best)),
        'recall': float(recall_score(y_test, y_pred_best))
    },
    'training_samples': len(X_train),
    'test_samples': len(X_test)
}

with open(metadata_filename, 'w') as f:
    json.dump(metadata, f, indent=2)

print(f"✅ Metadata saved: {metadata_filename}")
print("\n📋 Metadata:")
print(json.dumps(metadata, indent=2))

print("\n📥 Loading pipeline...")
loaded_pipeline = joblib.load(pipeline_filename)
with open(metadata_filename, 'r') as f:
    loaded_metadata = json.load(f)

print("✅ Pipeline loaded successfully!")

test_pred = loaded_pipeline.predict(X_test.iloc[:5])
print(f"\n🧪 Test predictions: {test_pred}")

💾 SAVING PIPELINE


2025-10-08 18:06:53,469 - INFO - Outliers capped for 8 columns


✅ Pipeline saved: churn_prediction_pipeline.pkl
✅ Metadata saved: pipeline_metadata.json

📋 Metadata:
{
  "model_name": "Churn Prediction Pipeline",
  "version": "1.0.0",
  "created_date": "2025-10-08T18:06:53.300196",
  "features": [
    "age",
    "tenure_months",
    "monthly_charges",
    "total_charges",
    "num_services",
    "contract_type",
    "payment_method",
    "internet_service",
    "tech_support",
    "online_security",
    "device_protection",
    "streaming_tv",
    "streaming_movies",
    "satisfaction_score",
    "support_tickets",
    "late_payments"
  ],
  "numeric_features": [
    "age",
    "tenure_months",
    "monthly_charges",
    "total_charges",
    "num_services",
    "satisfaction_score",
    "support_tickets",
    "late_payments"
  ],
  "categorical_features": [
    "contract_type",
    "payment_method",
    "internet_service",
    "tech_support",
    "online_security",
    "device_protection",
    "streaming_tv",
    "streaming_movies"
  ],
  "performa

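A pickled pipeline is generally only safe to load with the library versions that created it, so it can help to record those versions next to the other metadata. The sketch below is one possible way to collect them with the standard library. `environment_info` is a hypothetical helper, and storing its result under an `environment` key in the metadata file is an assumption, not something the notebook does.

```python
import platform
from importlib import metadata as importlib_metadata


def environment_info(packages):
    """Map each package name to its installed version (None when not installed)."""
    info = {"python": platform.python_version()}
    for name in packages:
        try:
            info[name] = importlib_metadata.version(name)
        except importlib_metadata.PackageNotFoundError:
            # Record the absence instead of failing, so the report is always complete
            info[name] = None
    return info


env = environment_info(["scikit-learn", "pandas", "numpy", "joblib"])
absent = environment_info(["a-package-that-does-not-exist-xyz"])

print(env)
```

At load time, the same function could be called again and compared with the stored values to warn about mismatches before `joblib.load` is attempted. Note also that the custom transformer classes must be importable in the loading process, because the saved file stores references to them rather than their code.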
## 📌 Section 5: Production Deployment
### 🎯 API-Ready Pipeline

In [7]:
print("🚀 PRODUCTION DEPLOYMENT\n" + "="*40)

class ChurnPredictor:
    """Production-ready churn prediction class"""
    
    def __init__(self, model_path, metadata_path):
        self.model = joblib.load(model_path)
        with open(metadata_path, 'r') as f:
            self.metadata = json.load(f)
        self.features = self.metadata['features']
        logger.info(f"Model loaded: {self.metadata['model_name']} v{self.metadata['version']}")
    
    def validate_input(self, data):
        """Validate input data"""
        if not isinstance(data, pd.DataFrame):
            raise ValueError("Input must be a pandas DataFrame")
        
        missing_features = set(self.features) - set(data.columns)
        if missing_features:
            raise ValueError(f"Missing features: {missing_features}")
        
        return data[self.features]
    
    def predict(self, data):
        """Make predictions"""
        try:
            data = self.validate_input(data)
            predictions = self.model.predict(data)
            probabilities = self.model.predict_proba(data)[:, 1]
            
            response = {
                'predictions': predictions.tolist(),
                'probabilities': probabilities.tolist(),
                'model_version': self.metadata['version'],
                'timestamp': datetime.now().isoformat()
            }
            
            logger.info(f"Prediction completed for {len(data)} samples")
            return response
            
        except Exception as e:
            logger.error(f"Prediction error: {str(e)}")
            raise
    
    def predict_single(self, customer_data):
        """Predict for a single customer"""
        df = pd.DataFrame([customer_data])
        result = self.predict(df)
        
        return {
            'churn_prediction': 'Yes' if result['predictions'][0] == 1 else 'No',
            'churn_probability': result['probabilities'][0],
            'risk_level': self._get_risk_level(result['probabilities'][0]),
            'timestamp': result['timestamp']
        }
    
    def _get_risk_level(self, probability):
        """Categorize risk level"""
        if probability < 0.3:
            return 'Low'
        elif probability < 0.7:
            return 'Medium'
        else:
            return 'High'
    
    def get_model_info(self):
        """Get model information"""
        return self.metadata


predictor = ChurnPredictor(pipeline_filename, metadata_filename)

test_customer = X_test.iloc[0].to_dict()
result = predictor.predict_single(test_customer)

print("📊 Single Customer Prediction:")
for key, value in result.items():
    print(f"  {key}: {value}")

batch_result = predictor.predict(X_test.iloc[:10])
print(f"\n📊 Batch Prediction ({len(batch_result['predictions'])} customers):")
print(f"  Churn rate: {np.mean(batch_result['predictions'])*100:.1f}%")
print(f"  Average probability: {np.mean(batch_result['probabilities']):.3f}")

2025-10-08 18:06:53,640 - INFO - Model loaded: Churn Prediction Pipeline v1.0.0
2025-10-08 18:06:53,662 - INFO - Outliers capped for 8 columns
2025-10-08 18:06:53,696 - INFO - Outliers capped for 8 columns
2025-10-08 18:06:53,716 - INFO - Prediction completed for 1 samples


🚀 PRODUCTION DEPLOYMENT
📊 Single Customer Prediction:
  churn_prediction: No
  churn_probability: 0.2519526684780314
  risk_level: Low
  timestamp: 2025-10-08T18:06:53.716235


2025-10-08 18:06:53,737 - INFO - Outliers capped for 8 columns
2025-10-08 18:06:53,776 - INFO - Outliers capped for 8 columns
2025-10-08 18:06:53,799 - INFO - Prediction completed for 10 samples



📊 Batch Prediction (10 customers):
  Churn rate: 10.0%
  Average probability: 0.333
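
The risk buckets used by `_get_risk_level` are half-open intervals, which matters at the boundaries. Here is a standalone sketch of the same thresholds. The function name `risk_level`, its keyword arguments, and the range check are additions made for illustration.

```python
def risk_level(probability, low=0.3, high=0.7):
    """Bucket a churn probability: [0, low) Low, [low, high) Medium, [high, 1] High."""
    if not 0.0 <= probability <= 1.0:
        # Added guard (not in the notebook): reject values that are not probabilities
        raise ValueError(f"probability must be in [0, 1], got {probability}")
    if probability < low:
        return "Low"
    if probability < high:
        return "Medium"
    return "High"


# The first value is the single-customer probability printed above
for p in (0.2519526684780314, 0.3, 0.6999, 0.7):
    print(f"{p:.4f} -> {risk_level(p)}")
```

A probability of exactly 0.3 is already "Medium" and exactly 0.7 is already "High". If the business wants different cut-offs, only the two keyword arguments need to change.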


## 📌 Section 6: Monitoring and Validation
### 🎯 Pipeline Monitoring

In [8]:
print("📊 PIPELINE MONITORING\n" + "="*40)

class PipelineMonitor:
    """Monitor pipeline performance and data drift"""
    
    def __init__(self, reference_data):
        self.reference_data = reference_data
        self.reference_stats = self._calculate_stats(reference_data)
        self.monitoring_history = []
    
    def _calculate_stats(self, data):
        """Calculate statistics for monitoring"""
        stats = {}
        
        numeric_cols = data.select_dtypes(include=[np.number]).columns
        for col in numeric_cols:
            stats[col] = {
                'mean': data[col].mean(),
                'std': data[col].std(),
                'min': data[col].min(),
                'max': data[col].max(),
                'missing_pct': data[col].isnull().mean()
            }
        
        categorical_cols = data.select_dtypes(include=['object']).columns
        for col in categorical_cols:
            stats[col] = {
                'unique_values': data[col].nunique(),
                'mode': data[col].mode()[0] if not data[col].mode().empty else None,
                'missing_pct': data[col].isnull().mean()
            }
        
        return stats
    
    def detect_drift(self, new_data, threshold=0.1):
        """Detect data drift"""
        new_stats = self._calculate_stats(new_data)
        drift_report = {}
        
        for feature in self.reference_stats:
            if feature in new_stats:
                ref = self.reference_stats[feature]
                new = new_stats[feature]
                
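                if 'mean' in ref:
                    # Divide by the absolute reference value: with a negative mean the
                    # ratio would otherwise be negative and never exceed the threshold
                    mean_change = abs(new['mean'] - ref['mean']) / (abs(ref['mean']) + 1e-8)
                    std_change = abs(new['std'] - ref['std']) / (abs(ref['std']) + 1e-8)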
                    
                    if mean_change > threshold or std_change > threshold:
                        drift_report[feature] = {
                            'mean_change': mean_change,
                            'std_change': std_change,
                            'status': 'DRIFT DETECTED'
                        }
        
        return drift_report
    
    def validate_predictions(self, predictions, expected_range=(0, 1)):
        """Validate prediction outputs"""
        validation_report = {
            'total_predictions': len(predictions),
            'null_predictions': np.isnan(predictions).sum(),
            'out_of_range': ((predictions < expected_range[0]) | (predictions > expected_range[1])).sum(),
            'mean_prediction': np.mean(predictions),
            'std_prediction': np.std(predictions)
        }
        
        if validation_report['null_predictions'] > 0:
            validation_report['status'] = 'FAILED - Null predictions found'
        elif validation_report['out_of_range'] > 0:
            validation_report['status'] = 'WARNING - Out of range predictions'
        else:
            validation_report['status'] = 'PASSED'
        
        return validation_report


monitor = PipelineMonitor(X_train)

print("🔍 Drift Detection Test:")
drift_report = monitor.detect_drift(X_test)
if drift_report:
    print("⚠️ Drift detected in features:")
    for feature, details in drift_report.items():
        print(f"  {feature}: {details['status']}")
else:
    print("✅ No significant drift detected")

test_predictions = predictor.predict(X_test)['probabilities']
validation_report = monitor.validate_predictions(np.array(test_predictions))

print("\n📊 Prediction Validation:")
for key, value in validation_report.items():
    print(f"  {key}: {value}")

2025-10-08 18:06:53,918 - INFO - Outliers capped for 8 columns
2025-10-08 18:06:53,979 - INFO - Outliers capped for 8 columns


📊 PIPELINE MONITORING
🔍 Drift Detection Test:
⚠️ Drift detected in features:
  total_charges: DRIFT DETECTED


2025-10-08 18:06:54,030 - INFO - Prediction completed for 1000 samples



📊 Prediction Validation:
  total_predictions: 1000
  null_predictions: 0
  out_of_range: 0
  mean_prediction: 0.30788873472528455
  std_prediction: 0.12338705851272763
  status: PASSED
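
The drift rule in `detect_drift` is a relative-change test on the mean and standard deviation. Below is a minimal standard-library sketch of that idea. `relative_change`, `drift_check`, and the toy lists are hypothetical, and the denominator uses the absolute reference value so that negative means are handled as well.

```python
from statistics import mean, stdev


def relative_change(reference, new, eps=1e-8):
    """Absolute change of `new` relative to the magnitude of `reference`."""
    return abs(new - reference) / (abs(reference) + eps)


def drift_check(ref_values, new_values, threshold=0.1):
    """Flag drift when the mean or the sample std moves by more than `threshold`."""
    mean_change = relative_change(mean(ref_values), mean(new_values))
    std_change = relative_change(stdev(ref_values), stdev(new_values))
    return {
        "mean_change": mean_change,
        "std_change": std_change,
        "drift": mean_change > threshold or std_change > threshold,
    }


reference = [10.0, 12.0, 11.0, 13.0]
shifted = [12.0, 14.0, 13.0, 15.0]        # same spread, mean moved from 11.5 to 13.5
negative_ref = [-v for v in reference]    # a feature with a negative mean
negative_new = [-v for v in shifted]

print(drift_check(reference, reference))
print(drift_check(reference, shifted))
print(drift_check(negative_ref, negative_new))
```

A rule based only on the mean and standard deviation is sensitive to heavy tails. That is one plausible reason `total_charges`, which contains injected outliers, is flagged even though train and test come from the same random split.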


## 📌 Section 7: API Simulation
### 🎯 REST API Endpoint Simulation

In [9]:
print("🌐 API ENDPOINT SIMULATION\n" + "="*40)

def api_predict(request_data):
    """Simulate API endpoint for predictions"""
    
    try:
        if not request_data or 'data' not in request_data:
            return {
                'status': 'error',
                'message': 'Invalid request. Missing data field.',
                'code': 400
            }
        
        df = pd.DataFrame(request_data['data'])
        results = predictor.predict(df)
        
        response = {
            'status': 'success',
            'code': 200,
            'data': {
                'predictions': results['predictions'],
                'probabilities': results['probabilities'],
                'model_version': loaded_metadata['version'],
                'timestamp': results['timestamp']
            },
            'metadata': {
                'samples_processed': len(results['predictions']),
                'model_name': loaded_metadata['model_name']
            }
        }
        
        return response
        
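    except ValueError as e:
        # Malformed input (e.g. missing features) is a client error, not a server fault
        return {
            'status': 'error',
            'message': str(e),
            'code': 400
        }
    except Exception as e:
        return {
            'status': 'error',
            'message': str(e),
            'code': 500
        }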


print("📡 Testing API Endpoint:\n")

test_request = {
    'data': X_test.iloc[:3].to_dict('records')
}

print("Request:")
print(f"  Sending {len(test_request['data'])} samples")

response = api_predict(test_request)

print("\nResponse:")
print(f"  Status: {response['status']}")
print(f"  Code: {response['code']}")
if response['status'] == 'success':
    print(f"  Predictions: {response['data']['predictions']}")
    print(f"  Probabilities: {[round(p, 3) for p in response['data']['probabilities']]}")
    print(f"  Model Version: {response['data']['model_version']}")

print("\n🔍 Testing Error Handling:")
bad_request = {'invalid': 'data'}
error_response = api_predict(bad_request)
print(f"  Error Status: {error_response['status']}")
print(f"  Error Message: {error_response['message']}")

2025-10-08 18:06:54,140 - INFO - Outliers capped for 8 columns


🌐 API ENDPOINT SIMULATION
📡 Testing API Endpoint:

Request:
  Sending 3 samples


2025-10-08 18:06:54,246 - INFO - Outliers capped for 8 columns
2025-10-08 18:06:54,293 - INFO - Prediction completed for 3 samples



Response:
  Status: success
  Code: 200
  Predictions: [0, 0, 0]
  Probabilities: [0.252, 0.33, 0.359]
  Model Version: 1.0.0

🔍 Testing Error Handling:
  Error Status: error
  Error Message: Invalid request. Missing data field.
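
When the simulated endpoint is moved behind a real web framework, the request usually arrives as a raw JSON string, so parsing errors need a status code as well. The sketch below shows one way to separate client errors (400) from server errors (500). `handle_request` and `stub_predict` are hypothetical, and the stub only stands in for `ChurnPredictor.predict` so that the example runs on its own.

```python
import json


def handle_request(body, predict_fn):
    """Return (status_code, payload) for a raw JSON request body."""
    try:
        request = json.loads(body)
    except json.JSONDecodeError:
        return 400, {"status": "error", "message": "Body is not valid JSON."}

    if not isinstance(request, dict) or "data" not in request:
        return 400, {"status": "error", "message": "Invalid request. Missing data field."}

    try:
        predictions = predict_fn(request["data"])
    except ValueError as exc:
        # Input validation failures are the caller's responsibility
        return 400, {"status": "error", "message": str(exc)}
    except Exception:
        # Hide internal details from the caller; log them server-side instead
        return 500, {"status": "error", "message": "Internal error."}

    return 200, {"status": "success", "predictions": predictions}


def stub_predict(rows):
    """Hypothetical stand-in for the real model: flags customers with short tenure."""
    for row in rows:
        if "tenure_months" not in row:
            raise ValueError("Missing features: {'tenure_months'}")
    return [1 if row["tenure_months"] < 12 else 0 for row in rows]


ok = handle_request('{"data": [{"tenure_months": 3}, {"tenure_months": 40}]}', stub_predict)
bad_json = handle_request("{not json", stub_predict)
no_data = handle_request('{"invalid": "data"}', stub_predict)
bad_rows = handle_request('{"data": [{"age": 30}]}', stub_predict)

for code, payload in (ok, bad_json, no_data, bad_rows):
    print(code, payload)
```

Keeping the handler independent of any framework makes it straightforward to unit-test before wiring it into Flask or FastAPI.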


## 🎯 Summary & Next Steps

### 🏆 What You've Built:

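✅ **Custom Transformers**
- OutlierRemover (used in the numeric pipeline)
- MissingIndicator (defined, not yet added to the trained pipeline)
- FeatureEngineer (defined, not yet added to the trained pipeline)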

✅ **Production Pipeline**
- Modular architecture
- Automated preprocessing
- Feature engineering
- Model training

✅ **Advanced Features**
- Cross-validation
- Hyperparameter tuning
- Performance monitoring
- Data drift detection

✅ **Deployment Ready**
- Pipeline persistence
- Simulated API endpoint
- Error handling
- Documentation

### 🚀 Next Steps:
1. Deploy to Production - Use Flask/FastAPI
2. Add More Models - Ensemble methods
3. Implement A/B Testing - Compare models
4. Setup CI/CD - Automated deployment
5. Add Real-time Monitoring - Dashboard

### 💡 Key Takeaways:
- Pipelines ensure reproducibility
- Automation reduces errors
- Monitoring prevents degradation
- Documentation enables collaboration

## 🎉 Congratulations!
You've built an end-to-end ML pipeline, from preprocessing to a simulated API!

Production deployments follow the same structure, with a web framework and live monitoring wrapped around it.

**Keep building, keep deploying, keep improving!** 🚀

In [10]:
print("🎊" * 20)
print("\n    🏆 MASTER PIPELINE COMPLETE! 🏆")
print("\n    You've mastered:")
print("    ✅ Custom Transformers")
print("    ✅ Pipeline Architecture")
print("    ✅ Feature Engineering")
print("    ✅ Model Deployment")
print("    ✅ Production Best Practices")
print("\n    Ready for: Machine Learning Algorithms!")
print("\n" + "🎊" * 20)

🎊🎊🎊🎊🎊🎊🎊🎊🎊🎊🎊🎊🎊🎊🎊🎊🎊🎊🎊🎊

    🏆 MASTER PIPELINE COMPLETE! 🏆

    You've mastered:
    ✅ Custom Transformers
    ✅ Pipeline Architecture
    ✅ Feature Engineering
    ✅ Model Deployment
    ✅ Production Best Practices

    Ready for: Machine Learning Algorithms!

🎊🎊🎊🎊🎊🎊🎊🎊🎊🎊🎊🎊🎊🎊🎊🎊🎊🎊🎊🎊
