# Production Feature Engineering Pipeline

This notebook demonstrates production-ready feature engineering practices for MLOps workflows.
It includes data validation, feature creation, transformation pipelines, and monitoring components.

The purpose of this notebook is to prepare high-quality, validated, production-ready features. A model is only as good as the data it is trained on, so this pipeline validates the raw data, engineers and transforms features, and checks the results for quality issues and distribution shifts before they reach a model.

## Table of Contents
1. [Setup and Imports](#setup)
2. [Data Loading and Validation](#data-loading)
3. [Exploratory Data Analysis](#eda)
4. [Feature Engineering Pipeline](#feature-engineering)
5. [Feature Validation and Quality Checks](#validation)
6. [Pipeline Serialization and Deployment](#deployment)
7. [Monitoring and Logging](#monitoring)

## 1. Setup and Imports

Import all necessary libraries for production feature engineering including:
- Data manipulation and analysis
- Feature engineering and preprocessing
- Pipeline creation and serialization
- Logging and monitoring
- Data validation

In [None]:
# Core data manipulation libs
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import warnings
warnings.filterwarnings('ignore')

# Visualization libs
import matplotlib.pyplot as plt
import seaborn as sns
plt.style.use('seaborn-v0_8')

# Scikit-learn for preprocessing and pipeline creation
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import (
    StandardScaler, MinMaxScaler, RobustScaler,
    LabelEncoder, OneHotEncoder, OrdinalEncoder,
    PolynomialFeatures, PowerTransformer
)
from sklearn.impute import SimpleImputer, KNNImputer
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline, FeatureUnion
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.feature_selection import SelectKBest, f_classif, mutual_info_classif

# Model evaluation
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import classification_report, confusion_matrix

# Serialization and deployment
import joblib
import pickle
from pathlib import Path

# Logging/monitoring
import logging
import json
import sys
from typing import Dict, List, Tuple, Any, Optional

# Data Validation
from scipy import stats
from scipy.stats import chi2_contingency

# Config
pd.set_option('display.max_columns', None)
pd.set_option('display.width', None)
pd.set_option('display.max_colwidth', 50)

# Set random seed for reproducibility
RANDOM_STATE = 77
np.random.seed(RANDOM_STATE)

print(f"Setup completed at: {datetime.now()}")
print(f"Python version: {sys.version}")
print(f"Pandas version: {pd.__version__}")
print(f"NumPy version: {np.__version__}")

## 2. Data Loading and Validation

Load data with comprehensive validation including:
- Schema validation
- Data quality checks
- Missing value analysis
- Data type validation

In [None]:
# Configure logging to log file and console
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('feature_engineering.log'),
        logging.StreamHandler(sys.stdout)
    ]
)
# Logger object named after current module
logger = logging.getLogger(__name__)

class DataValidator:
    """
    Data validation class for production feature engineering.
    Performs comprehensive data quality checks and schema validation.
    """

    def __init__(self, expected_schema: Dict[str, str]):
        self.expected_schema = expected_schema
        self.validation_results = {}

    def validate_schema(self, df: pd.DataFrame) -> bool:
        """Validate dataframe schema against expected schema."""
        logger.info("Starting schema validation")

        missing_cols = set(self.expected_schema.keys()) - set(df.columns)
        extra_cols = set(df.columns) - set(self.expected_schema.keys())

        if missing_cols:
            logger.error(f"Missing columns: {missing_cols}")
            return False
        
        if extra_cols:
            logger.warning(f"Unexpected columns found: {extra_cols}")

        # Validate data types: compare each expected column's actual dtype with its expected dtype
        type_mismatches = []
        for col, expected_type in self.expected_schema.items():
            if col in df.columns:
                actual_type = str(df[col].dtype)
                # Lenient substring match in either direction, so e.g. 'int' is accepted for 'int64'
                if expected_type not in actual_type and actual_type not in expected_type:
                    type_mismatches.append((col, expected_type, actual_type))

        if type_mismatches:
            logger.warning(f"Data type mismatches: {type_mismatches}")

        logger.info("Schema validation completed!")
        return True
    
    def validate_data_quality(self, df: pd.DataFrame) -> Dict[str, Any]:
        """Perform comprehensive data quality validation."""
        logger.info("Starting data quality validation")

        quality_report = {
            'total_rows': len(df),
            'total_columns': len(df.columns),
            'missing_values': df.isnull().sum().to_dict(),
            'missing_percentage': (df.isnull().sum() / len(df) * 100).to_dict(),
            'duplicate_rows': df.duplicated().sum(),
            'data_types': df.dtypes.astype(str).to_dict(),
            'memory_usage': df.memory_usage(deep=True).sum() / 1024**2  # MB
        }

        # Check for columns with more than 50% missing values
        high_missing_cols = [
            col for col, pct in quality_report['missing_percentage'].items()
            if pct > 50
        ]

        if high_missing_cols:
            logger.warning(f"Columns with >50% missing values: {high_missing_cols}")

        # check for columns with 0 variance (np.number is superclass for all numeric types)
        numeric_cols = df.select_dtypes(include=[np.number]).columns
        zero_variance_cols = [col for col in numeric_cols if df[col].var() == 0]

        if zero_variance_cols:
            logger.warning(f"Columns with zero variance: {zero_variance_cols}")
        
        quality_report['high_missing_columns'] = high_missing_cols
        quality_report['zero_variance_columns'] = zero_variance_cols
        
        logger.info("Data quality validation completed!")
        return quality_report

In [None]:
# Create a sample dataset or load one
def create_sample_dataset():
    """Create a sample dataset for demonstration purposes."""
    np.random.seed(RANDOM_STATE)
    n_samples = 10000
    
    data = {
        'customer_id': range(1, n_samples + 1),
        'age': np.random.randint(18, 80, n_samples),
        'income': np.random.lognormal(10, 1, n_samples),
        'credit_score': np.random.randint(300, 850, n_samples),
        'account_balance': np.random.normal(5000, 2000, n_samples),
        'num_products': np.random.poisson(2, n_samples),
        'tenure_months': np.random.randint(1, 120, n_samples),
        'is_active': np.random.choice([0, 1], n_samples, p=[0.3, 0.7]),
        'geography': np.random.choice(['Urban', 'Suburban', 'Rural'], n_samples, p=[0.5, 0.3, 0.2]),
        'gender': np.random.choice(['Male', 'Female'], n_samples, p=[0.52, 0.48]),
        'has_credit_card': np.random.choice([0, 1], n_samples, p=[0.4, 0.6]),
        'last_transaction_date': pd.date_range('2023-01-01', '2024-01-01', periods=n_samples),
        'churn': None  # Target variable - will be generated based on features
    }
    
    df = pd.DataFrame(data)

    # Generate the target from the features with a hand-specified linear relationship:
    # a baseline risk plus additive per-feature effects (clipped to [0, 1] below)
    churn_prob = (
        0.1 +                                            # Baseline risk
        0.001 * (df['age'] - 40)**2 +                    # U-shaped age effect
        -0.00001 * df['income'] +                        # Higher income = lower churn
        -0.0002 * df['credit_score'] +                   # Higher credit score = lower churn
        -0.01 * df['num_products'] +                     # More products = lower churn
        -0.002 * df['tenure_months'] +                   # Longer tenure = lower churn
        -0.1 * df['is_active'] +                         # Active users = lower churn
        0.05 * (df['geography'] == 'Rural').astype(int)  # Rural = higher churn
    )

    # Ensure probabilities are between 0 and 1
    churn_prob = np.clip(churn_prob, 0, 1)
    df['churn'] = np.random.binomial(1, churn_prob, n_samples)
    
    # Introduce some missing values to simulate real-world data
    df.loc[np.random.choice(df.index, size=500, replace=False), 'income'] = np.nan
    df.loc[np.random.choice(df.index, size=300, replace=False), 'credit_score'] = np.nan
    df.loc[np.random.choice(df.index, size=200, replace=False), 'account_balance'] = np.nan
    
    return df

# Load and validate data
logger.info("Loading dataset...")
df = create_sample_dataset()

# Define the expected schema
expected_schema = {
    'customer_id': 'int64',
    'age': 'int64',
    'income': 'float64',
    'credit_score': 'float64',
    'account_balance': 'float64',
    'num_products': 'int64',
    'tenure_months': 'int64',
    'is_active': 'int64',
    'geography': 'object',
    'gender': 'object',
    'has_credit_card': 'int64',
    'last_transaction_date': 'datetime64[ns]',
    'churn': 'int64'
}

# Initialize validator and perform validation
validator = DataValidator(expected_schema)
is_valid_schema = validator.validate_schema(df)
quality_report = validator.validate_data_quality(df)

print("Data Quality Report:")
print(json.dumps(quality_report, indent=2, default=str))

## 3. Exploratory Data Analysis

Perform comprehensive EDA to understand data patterns and inform feature engineering decisions.
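Alongside the plots, a per-category churn rate is a quick way to see whether a categorical feature carries signal. The sketch below assumes a hypothetical six-row toy frame with a `geography` column and a 0/1 `churn` column; because the target is binary, the group mean is the churn rate.

```python
import pandas as pd

# Hypothetical toy data standing in for the churn dataset
toy = pd.DataFrame({
    "geography": ["Urban", "Urban", "Rural", "Rural", "Suburban", "Rural"],
    "churn":     [0,       1,       1,       1,       0,          0],
})

# groupby + mean on a 0/1 target gives the fraction of churned customers per group;
# the count column shows how many customers each rate is based on
churn_by_geo = (
    toy.groupby("geography")["churn"]
       .agg(churn_rate="mean", customers="count")
       .sort_values("churn_rate", ascending=False)
)
print(churn_by_geo)
```

Keeping the group size next to the rate guards against over-reading rates from small categories.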

In [None]:
def perform_eda(df: pd.DataFrame) -> None:
    """
    Perform comprehensive exploratory data analysis (EDA).
    """

    logger.info("Starting exploratory data analysis")
    
    print("Dataset Overview:")
    print(f"Shape: {df.shape}")
    print(f"Memory usage: {df.memory_usage(deep=True).sum() / 1024**2:.2f} MB")

    # describe(include='all') summarizes numeric and non-numeric columns alike
    # display() renders the DataFrame as a table; it is provided by IPython/Jupyter
    print("\nBasic Stats:")
    display(df.describe(include='all'))

    print("\nMissing Values Analysis:")
    missing_data = pd.DataFrame({
        'Missing Count': df.isnull().sum(),
        'Missing Percentage': (df.isnull().sum() /len(df)) * 100
    })
    # missing_data['Missing Count'] > 0 creates a boolean mask (one True/False per row, i.e. per df column);
    # indexing with it keeps only the columns of df that actually have missing values
    missing_data = missing_data[missing_data['Missing Count'] > 0].sort_values('Missing Count', ascending=False)
    print(missing_data)

    # Correlation analysis for numeric features (select only numeric column names)
    numeric_cols = df.select_dtypes(include=[np.number]).columns
    if len(numeric_cols) > 1:
        plt.figure(figsize=(12, 8))
        # .corr() computes the correlation matrix (pairwise correlations between all numeric columns)
        correlation_matrix = df[numeric_cols].corr()
        sns.heatmap(correlation_matrix, annot=True, cmap='coolwarm', center=0, fmt='.2f')
        plt.title('Feature Correlation Matrix')
        plt.tight_layout()
        plt.show()
    
    # Target variable distribution
    if 'churn' in df.columns:
        plt.figure(figsize=(15, 5))
        
        plt.subplot(1, 3, 1) # 1 row, 3 columns of plots, this is the first plot
        # plot the distribution of churn values, 0 vs 1
        df['churn'].value_counts().plot(kind='bar')
        plt.title('Target Variable Distribution')
        plt.xlabel('Churn')
        plt.ylabel('Count')
        
        # Age distribution by churn
        plt.subplot(1, 3, 2)
        for churn_val in df['churn'].unique():
            subset = df[df['churn'] == churn_val]['age'].dropna()
            plt.hist(subset, alpha=0.7, label=f'Churn = {churn_val}', bins=20)
        plt.title('Age Distribution by Churn')
        plt.xlabel('Age')
        plt.ylabel('Frequency')
        plt.legend()
        
        # Income distribution by churn
        plt.subplot(1, 3, 3)
        for churn_val in df['churn'].unique():
            subset = df[df['churn'] == churn_val]['income'].dropna()
            plt.hist(subset, alpha=0.7, label=f'Churn = {churn_val}', bins=20)
        plt.title('Income Distribution by Churn')
        plt.xlabel('Income')
        plt.ylabel('Frequency')
        plt.legend()
        plt.yscale('log')  # Log-scaled counts keep the sparse long right tail of income visible
        
        plt.tight_layout()
        plt.show()
    
    # Categorical variable analysis
    categorical_cols = df.select_dtypes(include=['object']).columns
    if len(categorical_cols) > 0:
        # create a row of subplots, 1 row, N columns (len(categorical_cols))
        fig, axes = plt.subplots(1, len(categorical_cols), figsize=(15, 5))
        if len(categorical_cols) == 1:
            # if only 1 column, then axes will not be a list, so make it one
            axes = [axes]
            
        # loop and plot each into its subplot
        for i, col in enumerate(categorical_cols):
            df[col].value_counts().plot(kind='bar', ax=axes[i])
            axes[i].set_title(f'{col} Distribution')
            axes[i].tick_params(axis='x', rotation=45)
        
        plt.tight_layout()
        plt.show()

# Perform EDA
perform_eda(df)

## 4. Feature Engineering Pipeline

Create comprehensive feature engineering pipeline with custom transformers and production-ready components.
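Every custom transformer in this section follows the same scikit-learn pattern: constructor arguments are stored unchanged, statistics are learned in `fit()` from training data only, and `transform()` reapplies them. The hypothetical `QuantileBinner` below, similar in spirit to the `income_tier` feature, is a minimal sketch of that pattern; the class name, its `labels` default, and the `<column>_tier` output name are assumptions for illustration.

```python
import numpy as np
import pandas as pd
from sklearn.base import BaseEstimator, TransformerMixin


class QuantileBinner(BaseEstimator, TransformerMixin):
    """Hypothetical example: bin one numeric column into quartile tiers."""

    def __init__(self, column: str, labels=("low", "medium_low", "medium_high", "high")):
        # scikit-learn convention: store constructor arguments unmodified
        self.column = column
        self.labels = labels

    def fit(self, X, y=None):
        # State learned from training data gets a trailing underscore by convention
        self.cut_points_ = np.percentile(X[self.column].dropna(), [25, 50, 75])
        return self

    def transform(self, X):
        X_out = X.copy()
        # Reuse the training cut points; open-ended outer bins catch unseen extremes
        X_out[f"{self.column}_tier"] = pd.cut(
            X_out[self.column],
            bins=[-np.inf, *self.cut_points_, np.inf],
            labels=list(self.labels),
        )
        return X_out


train = pd.DataFrame({"income": [10.0, 20.0, 30.0, 40.0, 50.0]})
test = pd.DataFrame({"income": [5.0, 35.0, 100.0]})

binner = QuantileBinner("income").fit(train)
print(binner.cut_points_)
print(binner.transform(test)["income_tier"].tolist())
```

Because the cut points come from `fit()`, the test rows at 5, 35, and 100 are binned against the training quartiles (20, 30, 40) rather than against their own distribution, which is what keeps training and serving features consistent.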

In [None]:
# Create class inheriting from BaseEstimator and TransformerMixin
class DateFeatureExtractor(BaseEstimator, TransformerMixin):
    """
    Custom transformer to extract date-based features.
    Extracts multiple temporal features from datetime columns.
    Inheriting from BaseEstimator provides get_params()/set_params();
    TransformerMixin provides fit_transform().
    """

    def __init__(self, date_column: str, reference_date: Optional[datetime] = None):
        # scikit-learn convention: store constructor arguments unmodified
        self.date_column = date_column
        self.reference_date = reference_date

    def fit(self, X, y=None):
        # Resolve the reference date once, at fit time, so every later transform()
        # (test set, serving) measures "days since" from the same anchor
        reference = self.reference_date if self.reference_date is not None else datetime.now()
        self.reference_date_ = pd.Timestamp(reference)
        # Returning self lets the transformer take part in chained calls such as fit(X).transform(X)
        return self

    def transform(self, X):
        """Extract comprehensive date features."""
        # Work on a copy so the caller's DataFrame is not modified
        X_copy = X.copy()

        # Make sure the date column has a datetime dtype
        if not pd.api.types.is_datetime64_any_dtype(X_copy[self.date_column]):
            X_copy[self.date_column] = pd.to_datetime(X_copy[self.date_column])

        # Extract temporal features by creating new columns with the .dt accessor
        X_copy[f'{self.date_column}_year'] = X_copy[self.date_column].dt.year
        X_copy[f'{self.date_column}_month'] = X_copy[self.date_column].dt.month
        X_copy[f'{self.date_column}_day'] = X_copy[self.date_column].dt.day
        X_copy[f'{self.date_column}_dayofweek'] = X_copy[self.date_column].dt.dayofweek
        X_copy[f'{self.date_column}_quarter'] = X_copy[self.date_column].dt.quarter
        X_copy[f'{self.date_column}_is_weekend'] = (X_copy[self.date_column].dt.dayofweek >= 5).astype(int)
        X_copy[f'{self.date_column}_is_month_end'] = X_copy[self.date_column].dt.is_month_end.astype(int)
        X_copy[f'{self.date_column}_is_month_start'] = X_copy[self.date_column].dt.is_month_start.astype(int)

        # Calculate days since the reference date resolved in fit()
        X_copy[f'{self.date_column}_days_since_reference'] = (
            self.reference_date_ - X_copy[self.date_column]
        ).dt.days

        # Drop original date column since we no longer need it
        X_copy = X_copy.drop(columns=[self.date_column])

        return X_copy
    
class BusinessFeatureCreator(BaseEstimator, TransformerMixin):
    """
    Custom transformer to create business-specific features.
    Creates domain-knowledge based features for better model performance.
    """

    def fit(self, X, y=None):
        # Learn statistics from the training data only, so transform() applies the
        # same thresholds to training, test, and serving data
        self.income_percentiles = np.percentile(X['income'].dropna(), [25, 50, 75])  # quartile cut points
        self.credit_score_percentiles = np.percentile(X['credit_score'].dropna(), [25, 50, 75])
        # 80th percentile of the value score, used by is_high_value_customer
        self.value_score_threshold = self._customer_value_score(X).quantile(0.8)
        return self

    @staticmethod
    def _customer_value_score(X):
        """Weighted customer value score (fixed weights)."""
        return (
            0.3 * X['account_balance'] + 
            0.3 * X['income'] + 
            0.2 * X['credit_score'] + 
            0.2 * X['num_products'] * 1000
        )
    
    def transform(self, X):
        """Create the business-specific features."""
        X_copy = X.copy()

        # Customer segmentation feature
        X_copy['customer_value_score'] = self._customer_value_score(X_copy)

        # Risk assessment features
        # A negative balance is treated as debt; np.where(condition, x, y) assigns 0
        # when income is missing or non-positive
        X_copy['debt_to_income_ratio'] = np.where(
            X_copy['income'] > 0,
            -X_copy['account_balance'] / X_copy['income'],
            0
        )
        # Values below 0 become 0 and values above 10 become 10 (NaN stays NaN)
        X_copy['debt_to_income_ratio'] = np.clip(X_copy['debt_to_income_ratio'], 0, 10)

        # Age-based features
        # pd.cut converts continuous age into categorical groups: 6 bin edges define 5 intervals, hence 5 labels
        X_copy['age_group'] = pd.cut(
            X_copy['age'],
            bins=[0, 25, 35, 45, 55, 100],
            labels=['young', 'young_adult', 'middle_age', 'mature', 'senior']
        )

        # Income-based features (quartile cut points learned in fit())
        X_copy['income_tier'] = pd.cut(
            X_copy['income'], 
            bins=[-np.inf] + list(self.income_percentiles) + [np.inf],
            labels=['low', 'medium_low', 'medium_high', 'high']
        )
        
        # Credit score categories
        X_copy['credit_category'] = pd.cut(
            X_copy['credit_score'],
            bins=[0, 580, 670, 740, 850],
            labels=['poor', 'fair', 'good', 'excellent']
        )
        
        # Interaction features
        X_copy['products_per_tenure_month'] = X_copy['num_products'] / np.maximum(X_copy['tenure_months'], 1)
        X_copy['balance_per_product'] = X_copy['account_balance'] / np.maximum(X_copy['num_products'], 1)
        X_copy['age_tenure_interaction'] = X_copy['age'] * X_copy['tenure_months']
        
        # Boolean features (value-score threshold learned in fit())
        X_copy['is_high_value_customer'] = (
            (X_copy['customer_value_score'] > self.value_score_threshold) & 
            (X_copy['num_products'] >= 3)
        ).astype(int)
        
        X_copy['is_at_risk'] = (
            (X_copy['debt_to_income_ratio'] > 0.5) | 
            (X_copy['account_balance'] < 0) |
            (X_copy['credit_score'] < 600)
        ).astype(int)
        
        return X_copy
    
class OutlierTreatment(BaseEstimator, TransformerMixin):
    """
    Custom transformer for outlier detection and treatment.
    Uses the IQR method (IQR = Q3 - Q1, the spread of the middle 50% of values)
    with a configurable treatment strategy.
    """

    # method and factor are hyperparameters of the transformer; 1.5 is the conventional IQR factor
    def __init__(self, method='cap', factor=1.5):
        """
        Initialize outlier treatment transformer.

        Parameters:
        method: 'cap' (clip to the IQR bounds) or 'winsorize' (clip to the
                5th/95th percentiles). Rows cannot be removed inside a
                scikit-learn Pipeline, because y would no longer line up with X.
        factor: IQR multiplier for outlier detection (used by 'cap')
        """
        self.method = method
        self.factor = factor

    def fit(self, X, y=None):
        """Learn per-column clipping bounds from the training data."""
        # Inside a Pipeline the previous step (e.g. KNNImputer) outputs a NumPy
        # array, so wrap the input in a DataFrame before using pandas methods
        X_df = pd.DataFrame(X)
        numeric_cols = X_df.select_dtypes(include=[np.number]).columns

        # Dictionary of dictionaries, e.g. {col: {'lower': ..., 'upper': ...}}
        self.outlier_bounds_ = {}
        for col in numeric_cols:
            if self.method == 'cap':
                Q1, Q3 = X_df[col].quantile([0.25, 0.75])
                IQR = Q3 - Q1
                lower, upper = Q1 - self.factor * IQR, Q3 + self.factor * IQR
            elif self.method == 'winsorize':
                # Percentiles come from the training data, not from each batch at transform time
                lower, upper = X_df[col].quantile([0.05, 0.95])
            else:
                raise ValueError(f"Unknown method: {self.method!r}")
            self.outlier_bounds_[col] = {'lower': lower, 'upper': upper}

        # Return after the loop so that bounds are learned for every column
        return self
        
    def transform(self, X):
        """Clip each column to the bounds learned in fit()."""
        is_array = isinstance(X, np.ndarray)
        X_copy = pd.DataFrame(X).copy()

        for col, bounds in self.outlier_bounds_.items():
            if col in X_copy.columns:
                X_copy[col] = X_copy[col].clip(lower=bounds['lower'], upper=bounds['upper'])
        
        # Return the same container type that was passed in
        return X_copy.to_numpy() if is_array else X_copy
    
def create_feature_engineering_pipeline():
    """
    Create feature engineering pipeline.
    Returns a scikit-learn pipeline ready for production usage.
    """

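    # Define the column groups: raw columns plus the columns created by
    # BusinessFeatureCreator earlier in the pipeline
    numeric_features = ['age', 'income', 'credit_score', 'account_balance', 
                        'num_products', 'tenure_months',
                        # engineered numeric features (may contain NaN, so they need imputation)
                        'customer_value_score', 'debt_to_income_ratio',
                        'products_per_tenure_month', 'balance_per_product',
                        'age_tenure_interaction']
    # geography/gender are raw; the other three are pd.cut bins from BusinessFeatureCreator
    categorical_features = ['geography', 'gender', 'age_group', 'income_tier', 'credit_category']
    binary_features = ['is_active', 'has_credit_card', 'is_high_value_customer', 'is_at_risk']
    # The remaining date-derived columns (numeric, no NaN) go through remainder='passthrough'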

    # Define the numeric processing pipeline
    numeric_pipeline = Pipeline([
        ('imputer', KNNImputer(n_neighbors=5)),
        ('outlier_treatment', OutlierTreatment(method='cap')),
        ('scaler', RobustScaler()) # Robust to outliers
    ])

    # Define categorical pipeline
    categorical_pipeline = Pipeline([
        ('imputer', SimpleImputer(strategy='constant', fill_value='unknown')),
        ('onehot', OneHotEncoder(drop='first', sparse_output=False, handle_unknown='ignore'))
    ])

    # Binary features pipeline with minimal preprocessing
    binary_pipeline = Pipeline([
        ('imputer', SimpleImputer(strategy='most_frequent'))
    ])

    # Column transformer to apply different preprocessing to different column types
    preprocessor = ColumnTransformer(
        transformers=[
            ('num', numeric_pipeline, numeric_features),
            ('cat', categorical_pipeline, categorical_features),
            ('bin', binary_pipeline, binary_features)
        ],
        remainder='passthrough' # keep other columns as is
    )

    # Complete feature pipeline
    feature_pipeline = Pipeline([
        ('date_features', DateFeatureExtractor('last_transaction_date')),
        ('business_features', BusinessFeatureCreator()),
        ('preprocessor', preprocessor),
        ('feature_selection', SelectKBest(score_func=f_classif, k=20))
    ])

    return feature_pipeline

# Now, create and display the pipeline
feature_pipeline = create_feature_engineering_pipeline()
print("Feature Engineering Pipeline Created:")
print(feature_pipeline)

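`OutlierTreatment(method='cap')` uses the interquartile-range rule: with Q1 and Q3 the 25th and 75th percentiles and IQR = Q3 - Q1, values are clipped to [Q1 - factor * IQR, Q3 + factor * IQR], with `factor=1.5` by default. The worked example below applies that rule to a small hypothetical column with one extreme value; the numbers in the comments follow from pandas' default linear interpolation for quantiles.

```python
import numpy as np
import pandas as pd

# Hypothetical training column with one extreme value (95)
train = pd.Series([10.0, 12.0, 13.0, 15.0, 16.0, 18.0, 20.0, 95.0])

q1, q3 = train.quantile([0.25, 0.75])   # 12.75 and 18.5
iqr = q3 - q1                           # 5.75
factor = 1.5
lower = q1 - factor * iqr               # 12.75 - 8.625 = 4.125
upper = q3 + factor * iqr               # 18.5 + 8.625 = 27.125

# Bounds come from training data; new data is clipped to those same bounds
new_data = pd.Series([8.0, 14.0, 60.0])
capped = new_data.clip(lower=lower, upper=upper)
print(q1, q3, lower, upper)
print(capped.tolist())
```

The extreme training value widens the IQR only slightly, because quartiles are insensitive to the tails; that robustness is why the IQR rule is preferred over mean plus or minus a few standard deviations for skewed features.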
## 5. Feature Validation and Quality Checks

Implement comprehensive feature validation and quality monitoring.
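Before the full validator, here is a minimal, self-contained sketch of the two shift checks it is built around, using hypothetical helper names. The numeric check expresses the mean shift in units of the reference standard deviation (an assumption of this sketch: a relative change abs(delta mean) / abs(mean) becomes unstable when the reference mean is close to zero, as it is for centered or scaled features). The categorical check is a chi-square test of homogeneity on a 2 x k table of counts via `scipy.stats.chi2_contingency`; a goodness-of-fit test such as `scipy.stats.chisquare` instead expects observed and expected frequencies with the same total, which train and test counts of different sizes do not have.

```python
import numpy as np
from scipy.stats import chi2_contingency


def numeric_shift(reference, current, threshold=0.1):
    """Mean shift in reference-std units and relative std change for one feature."""
    ref_mean, ref_std = np.nanmean(reference), np.nanstd(reference, ddof=1)
    cur_mean, cur_std = np.nanmean(current), np.nanstd(current, ddof=1)
    if ref_std == 0:
        # A constant reference has no scale; flag any change at all
        mean_shift = 0.0 if cur_mean == ref_mean else np.inf
        std_shift = 0.0 if cur_std == 0 else np.inf
    else:
        mean_shift = abs(cur_mean - ref_mean) / ref_std
        std_shift = abs(cur_std - ref_std) / ref_std
    return {"mean_shift": float(mean_shift), "std_shift": float(std_shift),
            "alert": bool(mean_shift > threshold or std_shift > threshold)}


def categorical_shift(reference_counts, current_counts, alpha=0.05):
    """Chi-square test of homogeneity on a 2 x k table of category counts."""
    categories = sorted(set(reference_counts) | set(current_counts))
    table = [[reference_counts.get(c, 0) for c in categories],
             [current_counts.get(c, 0) for c in categories]]
    chi2, p_value, dof, _expected = chi2_contingency(table)
    return {"chi2_statistic": float(chi2), "p_value": float(p_value),
            "dof": int(dof), "alert": bool(p_value < alpha)}


rng = np.random.default_rng(0)
reference = rng.normal(0.0, 1.0, 5_000)
same = rng.normal(0.0, 1.0, 5_000)        # same distribution
shifted = rng.normal(0.5, 1.0, 5_000)     # mean moved by about 0.5 std
print(numeric_shift(reference, same))
print(numeric_shift(reference, shifted))

# Hypothetical counts: 8,000 training rows vs 2,000 test rows
train_counts = {"Urban": 4000, "Suburban": 2400, "Rural": 1600}
test_same = {"Urban": 1000, "Suburban": 600, "Rural": 400}
test_shifted = {"Urban": 700, "Suburban": 600, "Rural": 700}
print(categorical_shift(train_counts, test_same))
print(categorical_shift(train_counts, test_shifted))
```

With large samples the chi-square test flags even tiny, practically irrelevant differences, so in practice the p-value is often paired with an effect-size threshold.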

In [None]:
class FeatureValidator:
    """
    Comprehensive feature validation for production ML pipelines.
    Monitors feature quality, distribution shifts, and data integrity.
    """
    
    def __init__(self, reference_stats: Optional[Dict] = None):
        # If no reference stats (first run), empty dict
        self.reference_stats = reference_stats or {}
        self.validation_results = {}
    
    def calculate_feature_statistics(self, X: pd.DataFrame) -> Dict[str, Any]:
        """Calculate comprehensive feature statistics."""
        # Create the stats dict
        stats = {
            'shape': X.shape,
            'missing_values': X.isnull().sum().to_dict(),
            'data_types': X.dtypes.astype(str).to_dict(),
            'numeric_stats': {},
            'categorical_stats': {}
        }
        
        # Numeric feature statistics for numeric columns only
        numeric_cols = X.select_dtypes(include=[np.number]).columns
        for col in numeric_cols:
            col_stats = {
                'mean': X[col].mean(),
                'std': X[col].std(),
                'min': X[col].min(),
                'max': X[col].max(),
                'q25': X[col].quantile(0.25),
                'q50': X[col].quantile(0.50),
                'q75': X[col].quantile(0.75),
                'skewness': X[col].skew(), # Skew 0 is perfectly symmetric
                'kurtosis': X[col].kurtosis(), # "tailedness"/concentration of outliers
                'zero_count': (X[col] == 0).sum(),
                'unique_count': X[col].nunique()
            }
            # Add numeric_stats to the dict
            stats['numeric_stats'][col] = col_stats
        
        # Categorical feature statistics
        # iloc[0] grabs the first mode (in case of ties)
        categorical_cols = X.select_dtypes(include=['object', 'category']).columns
        for col in categorical_cols:
            col_stats = {
                'unique_count': X[col].nunique(),
                'mode': X[col].mode().iloc[0] if len(X[col].mode()) > 0 else None,
                'value_counts': X[col].value_counts().to_dict()
            }
            # Categorical stats key will be added, and each category has its own entry
            # with unique count, mode, and value counts dict
            stats['categorical_stats'][col] = col_stats
        
        return stats
    
    def detect_distribution_shift(self, current_stats: Dict, reference_stats: Dict, 
                                threshold: float = 0.1) -> Dict[str, Any]:
        """Detect distribution shifts in features."""
        shifts = {
            'numeric_shifts': {},
            'categorical_shifts': {},
            'alerts': []
        }
        
        # Check numeric feature shifts
        for col in current_stats['numeric_stats']:
            if col in reference_stats['numeric_stats']:
                current = current_stats['numeric_stats'][col]
                reference = reference_stats['numeric_stats'][col]
                
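                # Mean shift in units of the reference std: a relative change abs(delta mean) / abs(mean)
                # blows up when the reference mean is near zero, as it is for scaled features
                ref_std = reference['std']
                mean_shift = abs(current['mean'] - reference['mean']) / ref_std if ref_std > 0 else 0
                std_shift = abs(current['std'] - reference['std']) / ref_std if ref_std > 0 else 0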
                
                shifts['numeric_shifts'][col] = {
                    'mean_shift': mean_shift,
                    'std_shift': std_shift,
                    'alert': mean_shift > threshold or std_shift > threshold
                }
                
                if shifts['numeric_shifts'][col]['alert']:
                    shifts['alerts'].append(f"Distribution shift detected in {col}")
        
        # Check categorical feature shifts with a chi-square test of homogeneity on a
        # 2 x k contingency table (valid even when the two samples differ in size)
        for col in current_stats['categorical_stats']:
            if col in reference_stats['categorical_stats']:
                current_counts = current_stats['categorical_stats'][col]['value_counts']
                reference_counts = reference_stats['categorical_stats'][col]['value_counts']
                
                # Align categories in one fixed order so both rows line up
                all_categories = list(set(current_counts.keys()) | set(reference_counts.keys()))
                current_aligned = [current_counts.get(cat, 0) for cat in all_categories]
                reference_aligned = [reference_counts.get(cat, 0) for cat in all_categories]
                
                # Perform chi-square test if sufficient data
                if sum(current_aligned) > 0 and sum(reference_aligned) > 0:
                    try:
                        chi2, p_value, _, _ = chi2_contingency([current_aligned, reference_aligned])
                        shifts['categorical_shifts'][col] = {
                            'chi2_statistic': chi2,
                            'p_value': p_value,
                            'alert': p_value < 0.05
                        }
                        
                        if p_value < 0.05:
                            shifts['alerts'].append(f"Categorical distribution shift detected in {col}")
                    except ValueError:
                        shifts['categorical_shifts'][col] = {'error': 'Could not perform test'}
        
        return shifts
    
    def validate_feature_quality(self, X: pd.DataFrame) -> Dict[str, Any]:
        """Perform comprehensive feature quality validation."""
        quality_issues = {
            'high_missing_features': [],
            'zero_variance_features': [],
            'high_cardinality_features': [],
            'potential_data_leakage': [],
            'correlation_issues': []
        }
        
        # Check for high missing value percentage
        missing_pct = (X.isnull().sum() / len(X)) * 100
        quality_issues['high_missing_features'] = missing_pct[missing_pct > 50].index.tolist()
        
        # Check for zero variance features
        numeric_cols = X.select_dtypes(include=[np.number]).columns
        for col in numeric_cols:
            if X[col].var() == 0:
                quality_issues['zero_variance_features'].append(col)
        
        # Check for high cardinality categorical features
        categorical_cols = X.select_dtypes(include=['object', 'category']).columns
        for col in categorical_cols:
            if X[col].nunique() > 100:  # Arbitrary threshold
                quality_issues['high_cardinality_features'].append(col)
        
        # Check for highly correlated features
        if len(numeric_cols) > 1:
            corr_matrix = X[numeric_cols].corr().abs()
            high_corr_pairs = []
            for i in range(len(corr_matrix.columns)):
                for j in range(i+1, len(corr_matrix.columns)):
                    if corr_matrix.iloc[i, j] > 0.95:
                        high_corr_pairs.append((corr_matrix.columns[i], corr_matrix.columns[j]))
            quality_issues['correlation_issues'] = high_corr_pairs
        
        return quality_issues

# Apply feature engineering pipeline to our data
logger.info("Applying feature engineering pipeline...")

# Split data before feature engineering to prevent data leakage
X = df.drop(['customer_id', 'churn'], axis=1)
y = df['churn']

X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=RANDOM_STATE, stratify=y
)

# Fit and transform the training data
X_train_transformed = feature_pipeline.fit_transform(X_train, y_train)
X_test_transformed = feature_pipeline.transform(X_test)

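# Convert to DataFrame for easier analysis. SelectKBest receives a NumPy array from the
# ColumnTransformer, so get_feature_names_out() returns generic positional names such as 'x3'
feature_names = feature_pipeline.named_steps['feature_selection'].get_feature_names_out()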
X_train_df = pd.DataFrame(X_train_transformed, columns=feature_names, index=X_train.index)
X_test_df = pd.DataFrame(X_test_transformed, columns=feature_names, index=X_test.index)

print(f"Original features: {X_train.shape[1]}")
print(f"Engineered features: {X_train_df.shape[1]}")
print(f"Training set shape: {X_train_df.shape}")
print(f"Test set shape: {X_test_df.shape}")

# Validate transformed features
validator = FeatureValidator()

# Calculate statistics for training data (reference)
train_stats = validator.calculate_feature_statistics(X_train_df)
print("\nTraining Data Statistics:")
print(f"Missing values: {sum(train_stats['missing_values'].values())}")
print(f"Numeric features: {len(train_stats['numeric_stats'])}")
print(f"Categorical features: {len(train_stats['categorical_stats'])}")

# Calculate statistics for test data
test_stats = validator.calculate_feature_statistics(X_test_df)

# Check for distribution shifts between train and test
shifts = validator.detect_distribution_shift(test_stats, train_stats, threshold=0.1)
print(f"\nDistribution Shift Analysis:")
print(f"Numeric shifts detected: {len([k for k, v in shifts['numeric_shifts'].items() if v['alert']])}")
print(f"Categorical shifts detected: {len([k for k, v in shifts['categorical_shifts'].items() if v.get('alert', False)])}")

if shifts['alerts']:
    print("Alerts:")
    for alert in shifts['alerts']:
        print(f"  - {alert}")

# Validate feature quality
quality_issues = validator.validate_feature_quality(X_train_df)
print(f"\nFeature Quality Issues:")
for issue_type, issues in quality_issues.items():
    if issues:
        print(f"  {issue_type}: {issues}")
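Splitting before fitting matters because every fitted step learns from whatever data it sees: imputer means, scaler statistics and selected features all come from the fit data. The sketch below uses synthetic data. The array sizes, the ~20% positive rate and the single numeric column are assumptions for illustration, not the notebook's dataset. It shows that a scaler fitted on the training split carries only training statistics into the test transform, and that `stratify=y` keeps the class ratio aligned across splits:

```python
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

# Hypothetical synthetic data: one numeric feature and a ~20% positive label
rng = np.random.default_rng(0)
X_demo = rng.normal(loc=50.0, scale=10.0, size=(1000, 1))
y_demo = (rng.random(1000) < 0.2).astype(int)

# Split first, stratifying on the label so both splits keep the same positive rate
X_tr, X_te, y_tr, y_te = train_test_split(
    X_demo, y_demo, test_size=0.2, random_state=42, stratify=y_demo
)

# Fit on the training split only; the test split is transformed with training statistics
scaler = StandardScaler().fit(X_tr)
X_te_scaled = scaler.transform(X_te)

print(f"train positive rate: {y_tr.mean():.3f}, test positive rate: {y_te.mean():.3f}")
print(f"scaler mean (from training split): {scaler.mean_[0]:.3f}")
print(f"scaled test mean (not forced to 0): {X_te_scaled.mean():.3f}")
```

Because the test split never influences `scaler.mean_`, its scaled mean is close to, but not exactly, zero. That is the behaviour the model will see on genuinely new data.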

## 6. Pipeline Serialization and Deployment

Save the feature engineering pipeline and create deployment-ready artifacts.
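scikit-learn does not guarantee that a pickled estimator reloads correctly under a different scikit-learn version. For that reason, the metadata stored next to a joblib artifact should record `sklearn.__version__`; the joblib version alone does not identify it. Below is a minimal round-trip sketch. It assumes a throwaway temporary directory and uses a toy two-step pipeline as a stand-in for `feature_pipeline`. The file names mirror those used by `FeaturePipelineManager` below:

```python
import json
import tempfile
from datetime import datetime
from pathlib import Path

import joblib
import numpy as np
import sklearn
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler

# Toy stand-in for feature_pipeline (assumption: any fitted sklearn Pipeline behaves the same)
X_toy = np.array([[1.0, 10.0], [2.0, np.nan], [3.0, 30.0], [4.0, 40.0]])
toy_pipe = Pipeline([
    ("imputer", SimpleImputer(strategy="mean")),
    ("scaler", StandardScaler()),
]).fit(X_toy)

with tempfile.TemporaryDirectory() as tmp:
    version_dir = Path(tmp) / "demo_features" / "v1.0"
    version_dir.mkdir(parents=True)

    # Persist the fitted pipeline and a metadata file next to it
    joblib.dump(toy_pipe, version_dir / "pipeline.joblib")
    metadata = {
        "version": "v1.0",
        "created_at": datetime.now().isoformat(),
        "sklearn_version": sklearn.__version__,  # library that fitted the pipeline
        "joblib_version": joblib.__version__,    # library that serialized it
    }
    (version_dir / "metadata.json").write_text(json.dumps(metadata, indent=2))

    # Reload, then check the environment before trusting the artifact
    loaded = joblib.load(version_dir / "pipeline.joblib")
    loaded_meta = json.loads((version_dir / "metadata.json").read_text())
    if loaded_meta["sklearn_version"] != sklearn.__version__:
        print("Warning: pipeline was fitted with a different scikit-learn version")

    # The reloaded pipeline should reproduce the original transform
    roundtrip_ok = np.allclose(loaded.transform(X_toy), toy_pipe.transform(X_toy))
    print("round trip identical:", roundtrip_ok)
```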

In [None]:
class FeaturePipelineManager:
    """
    Manager class for feature pipeline serialization, versioning, and deployment.
    Handles saving/loading pipelines and maintaining metadata.
    Each version is saved under <pipeline_dir>/<pipeline_name>/<version>/ as
    pipeline.joblib (the fitted pipeline) plus metadata.json.
    """

    def __init__(self, pipeline_dir: str = "feature_pipelines"):
        self.pipeline_dir = Path(pipeline_dir)
        self.pipeline_dir.mkdir(exist_ok=True)

    def save_pipeline(self, pipeline, pipeline_name: str, version: str = "v1",
                      metadata: Optional[Dict] = None):
        """Save feature pipeline with versioning and metadata."""
    
        # Create version dir
        version_dir = self.pipeline_dir / pipeline_name / version
        version_dir.mkdir(parents=True, exist_ok=True)

        pipeline_path = version_dir / "pipeline.joblib"
        joblib.dump(pipeline, pipeline_path)

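        import sklearn  # needed to record the scikit-learn version that fitted the pipeline

        # Copy so the caller's dict is not mutated
        metadata = dict(metadata or {})
        metadata.update({
            'pipeline_name': pipeline_name,
            'version': version,
            'created_at': datetime.now().isoformat(),
            'pipeline_type': str(type(pipeline)),
            'sklearn_version': sklearn.__version__,
            'joblib_version': joblib.__version__
        })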

        metadata_path = version_dir / "metadata.json"
        with open(metadata_path, 'w') as f:
            json.dump(metadata, f, indent=2, default=str)
        
        logger.info(f"Pipeline saved: {pipeline_path}")
        logger.info(f"Metadata saved: {metadata_path}")
        
        return pipeline_path, metadata_path
    
    def load_pipeline(self, pipeline_name: str, version: str = "latest"):
        """Load feature pipeline with version control."""
        
        pipeline_base = self.pipeline_dir / pipeline_name
        
        if version == "latest":
            # Find the latest version
            version_dirs = [d for d in pipeline_base.iterdir() if d.is_dir()]
            if not version_dirs:
                raise FileNotFoundError(f"No versions found for pipeline {pipeline_name}")
            
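            # Sort versions numerically (so v1.10 sorts after v1.9); a plain string
            # sort compares character by character and would pick v1.9 as "latest"
            version_dirs.sort(key=lambda d: tuple(
                int(part) for part in d.name.lstrip('v').split('.') if part.isdigit()
            ))
            version_dir = version_dirs[-1]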
        else:
            version_dir = pipeline_base / version
            
        if not version_dir.exists():
            raise FileNotFoundError(f"Version {version} not found for pipeline {pipeline_name}")
        
        # Load pipeline
        pipeline_path = version_dir / "pipeline.joblib"
        pipeline = joblib.load(pipeline_path)

        # Load metadata
        metadata_path = version_dir / "metadata.json"
        with open(metadata_path, 'r') as f:
            metadata = json.load(f)
        
        logger.info(f"Pipeline loaded: {pipeline_path}")
        return pipeline, metadata
        
    def list_pipelines(self):
        """List all available pipelines and versions."""
        pipelines = {}
        
        for pipeline_dir in self.pipeline_dir.iterdir():
            if pipeline_dir.is_dir():
                versions = []
                for version_dir in pipeline_dir.iterdir():
                    if version_dir.is_dir() and (version_dir / "pipeline.joblib").exists():
                        metadata_path = version_dir / "metadata.json"
                        if metadata_path.exists():
                            with open(metadata_path, 'r') as f:
                                metadata = json.load(f)
                            versions.append({
                                'version': version_dir.name,
                                'created_at': metadata.get('created_at'),
                                'metadata': metadata
                            })
                
                if versions:
                    pipelines[pipeline_dir.name] = sorted(versions, key=lambda x: x['created_at'])
        
        return pipelines
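`load_pipeline(version="latest")` picks the last directory after sorting the version names. A plain string sort puts `v1.10` before `v1.9`, so "latest" would resolve to the wrong directory once a version component reaches two digits. Below is a minimal sketch of a numeric sort key. It assumes version names of the form `v<major>.<minor>[.<patch>]`, like the `v1.0` used in this notebook; `version_key` is an illustrative helper name:

```python
import re


def version_key(name: str) -> tuple:
    """Turn 'v1.10.2' into (1, 10, 2) so versions compare numerically."""
    return tuple(int(part) for part in re.findall(r"\d+", name))


versions = ["v1.9", "v1.10", "v1.0", "v2.0"]

# Lexicographic order compares character by character: '1' < '9', so v1.10 < v1.9
print(sorted(versions))                   # ['v1.0', 'v1.10', 'v1.9', 'v2.0']
# Numeric order compares the integer components
print(sorted(versions, key=version_key))  # ['v1.0', 'v1.9', 'v1.10', 'v2.0']
```

For `Path` objects such as the entries from `pipeline_base.iterdir()`, the same key applies to the directory name: `version_dirs.sort(key=lambda d: version_key(d.name))`.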

In [None]:
def create_deployment_artifacts():
    """Create all necessary artifacts for deployment."""
    
    # Initialize pipeline manager
    pipeline_manager = FeaturePipelineManager()
    
    # Pipeline metadata
    pipeline_metadata = {
        'description': 'Customer churn prediction feature engineering pipeline',
        'features': {
            'input_features': list(X_train.columns),
            'output_features': list(X_train_df.columns),
            'feature_count': X_train_df.shape[1]
        },
        'preprocessing_steps': [
            'Date feature extraction',
            'Business feature creation',
            'Missing value imputation',
            'Outlier treatment',
            'Scaling and encoding',
            'Feature selection'
        ],
        'validation_results': {
            'train_shape': X_train_df.shape,
            'test_shape': X_test_df.shape,
            'quality_issues': quality_issues
        }
    }
    
    # Save pipeline using pipeline manager
    pipeline_path, metadata_path = pipeline_manager.save_pipeline(
        pipeline=feature_pipeline,
        pipeline_name="customer_churn_features",
        version="v1.0",
        metadata=pipeline_metadata
    )
    
    # Create feature schema for API validation using X_train
    feature_schema = {
        'input_schema': {
            col: {
                'type': str(X_train[col].dtype),
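                # Cast NumPy scalars to built-ins so they serialize as JSON booleans/numbers
                # (default=str would otherwise turn e.g. numpy.int64 values into strings)
                'nullable': bool(X_train[col].isnull().any()),
                'min_value': float(X_train[col].min()) if pd.api.types.is_numeric_dtype(X_train[col]) else None,
                'max_value': float(X_train[col].max()) if pd.api.types.is_numeric_dtype(X_train[col]) else None,
                'categories': X_train[col].dropna().unique().tolist() if X_train[col].dtype == 'object' else None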
            }
            for col in X_train.columns
        },
        'output_schema': {
            col: {
                'type': str(X_train_df[col].dtype),
                'nullable': False
            }
            for col in X_train_df.columns
        }
    }
    
    # Save feature schema
    schema_path = pipeline_path.parent / "feature_schema.json"
    with open(schema_path, 'w') as f:
        json.dump(feature_schema, f, indent=2, default=str)
    
    # Create sample prediction function
    prediction_code = '''
def predict_with_features(raw_data, pipeline_path):
    """
    Production prediction function with feature engineering.
    
    Args:
        raw_data (dict or pd.DataFrame): Raw input data
        pipeline_path (str): Path to saved feature pipeline
        
    Returns:
        dict: Predictions with engineered features
    """
    import joblib
    import pandas as pd
    
    # Load pipeline
    feature_pipeline = joblib.load(pipeline_path)
    
    # Convert to DataFrame if needed
    if isinstance(raw_data, dict):
        df = pd.DataFrame([raw_data])
    else:
        df = raw_data.copy()
    
    # Apply feature engineering
    features = feature_pipeline.transform(df)
    
    # Here you would load your trained model and make predictions
    # predictions = model.predict(features)
    
    return {
        'features': features.tolist(),
        'feature_names': feature_pipeline.named_steps['feature_selection'].get_feature_names_out().tolist()
    }

# Example usage:
# result = predict_with_features(
#     raw_data={'age': 35, 'income': 50000, ...},
#     pipeline_path='feature_pipelines/customer_churn_features/v1.0/pipeline.joblib'
# )
'''
    
    # Save prediction function
    function_path = pipeline_path.parent / "prediction_function.py"
    with open(function_path, 'w') as f:
        f.write(prediction_code)
    
    return {
        'pipeline_path': pipeline_path,
        'metadata_path': metadata_path,
        'schema_path': schema_path,
        'function_path': function_path
    }

# Create deployment artifacts
deployment_artifacts = create_deployment_artifacts()

print("Deployment Artifacts Created:")
for artifact_type, path in deployment_artifacts.items():
    print(f"  {artifact_type}: {path}")

# List all available pipelines
pipeline_manager = FeaturePipelineManager()
available_pipelines = pipeline_manager.list_pipelines()

print("\nAvailable Pipelines:")
for pipeline_name, versions in available_pipelines.items():
    print(f"  {pipeline_name}:")
    for version_info in versions:
        print(f"    - {version_info['version']} (created: {version_info['created_at']})")
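The `input_schema` written to `feature_schema.json` records a type, nullability, numeric range and allowed categories for each input column. Nothing in this notebook checks incoming requests against it yet. Below is a minimal sketch of such a check. It assumes a record arrives as a plain dict, that the schema has the structure built in `create_deployment_artifacts`, and that numeric bounds are stored as JSON numbers. `validate_record`, the column names and the example schema values are hypothetical:

```python
import math
from typing import Any, Dict, List


def validate_record(record: Dict[str, Any], input_schema: Dict[str, Dict[str, Any]]) -> List[str]:
    """Return a list of problems; an empty list means the record passed."""
    problems = []
    for col, rules in input_schema.items():
        value = record.get(col)
        missing = value is None or (isinstance(value, float) and math.isnan(value))
        if missing:
            if not rules.get("nullable", False):
                problems.append(f"{col}: missing but not nullable")
            continue
        # Range checks only apply to columns that stored numeric bounds
        if rules.get("min_value") is not None and value < rules["min_value"]:
            problems.append(f"{col}: {value} below training minimum {rules['min_value']}")
        if rules.get("max_value") is not None and value > rules["max_value"]:
            problems.append(f"{col}: {value} above training maximum {rules['max_value']}")
        # Category checks only apply to columns that stored a category list
        if rules.get("categories") is not None and value not in rules["categories"]:
            problems.append(f"{col}: unseen category {value!r}")
    # Flag fields the pipeline was never trained on
    for col in sorted(record.keys() - input_schema.keys()):
        problems.append(f"{col}: unexpected field")
    return problems


# Hypothetical schema in the same shape as feature_schema['input_schema']
schema = {
    "age": {"type": "int64", "nullable": False, "min_value": 18, "max_value": 80, "categories": None},
    "contract_type": {"type": "object", "nullable": True, "min_value": None, "max_value": None,
                      "categories": ["monthly", "annual"]},
}

print(validate_record({"age": 35, "contract_type": "annual"}, schema))  # []
print(validate_record({"age": 95, "contract_type": "weekly"}, schema))
```

Out-of-range values are reported rather than rejected here. Whether a serving API should refuse them or only log them is a policy choice the schema alone does not settle.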

## 7. Monitoring and Logging

Log each pipeline execution, detect data drift with a statistical test, and summarize both in a periodic monitoring report.
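`monitor_data_drift` below flags a feature when a two-sample Kolmogorov-Smirnov test rejects "same distribution" at level `threshold`. Two practical details are worth showing. First, SciPy returns NumPy scalars, and comparing them yields `numpy.bool_`, which `json.dumps` cannot serialize, so results should be cast to built-in types before logging. Second, with many features some p-values fall below 0.05 by chance alone. A Bonferroni correction (dividing the threshold by the number of features tested) is one simple option, swapped in here as an illustration. The sketch uses synthetic data; the column names, sample sizes and the 0.5-standard-deviation shift are assumptions:

```python
import json

import numpy as np
import pandas as pd
from scipy import stats

rng = np.random.default_rng(7)
# Hypothetical reference (training) and current (serving) feature frames
reference = pd.DataFrame({
    "tenure_months": rng.normal(24, 6, 2000),
    "monthly_charges": rng.normal(70, 15, 2000),
})
current = pd.DataFrame({
    "tenure_months": rng.normal(24, 6, 2000),       # same distribution
    "monthly_charges": rng.normal(77.5, 15, 2000),  # mean shifted by 0.5 std
})

alpha = 0.05
# Optional Bonferroni correction: split alpha across the features being tested
per_feature_threshold = alpha / len(reference.columns)

drift_scores = {}
for col in reference.columns:
    result = stats.ks_2samp(reference[col], current[col])
    drift_scores[col] = {
        # Cast NumPy scalars to built-ins so json.dumps does not raise on numpy.bool_
        "ks_statistic": float(result.statistic),
        "p_value": float(result.pvalue),
        "drift_detected": bool(result.pvalue < per_feature_threshold),
    }

print(json.dumps(drift_scores, indent=2))
```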

In [None]:
class FeatureMonitor:
    """
    Production monitoring class for feature engineering pipelines created in this notebook.
    Tracks feature quality, performance, and data drift over time.
    """
    
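    def __init__(self, log_dir: str = "monitoring_logs"):
        # Create the monitoring log directory if needed
        self.log_dir = Path(log_dir)
        self.log_dir.mkdir(exist_ok=True)

        # Set up the monitoring logger. Named loggers are shared process-wide, so attach
        # the file handler only once; otherwise re-running this cell duplicates every line.
        self.monitor_logger = logging.getLogger('feature_monitor')
        log_file = (self.log_dir / 'feature_monitoring.log').resolve()
        already_attached = any(
            isinstance(h, logging.FileHandler) and Path(h.baseFilename).resolve() == log_file
            for h in self.monitor_logger.handlers
        )
        if not already_attached:
            handler = logging.FileHandler(log_file)
            handler.setFormatter(logging.Formatter(
                '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
            ))
            self.monitor_logger.addHandler(handler)
        self.monitor_logger.setLevel(logging.INFO)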
    
    # Record metrics each time the pipeline runs
    def log_pipeline_execution(self, pipeline_name: str, input_shape: Tuple, 
                              output_shape: Tuple, execution_time: float, 
                              quality_metrics: Dict):
        """Log pipeline execution metrics."""
        
        log_entry = {
            'timestamp': datetime.now().isoformat(),
            'pipeline_name': pipeline_name,
            'input_shape': input_shape,
            'output_shape': output_shape,
            'execution_time_seconds': execution_time,
            'quality_metrics': quality_metrics
        }
        
        # Write the entry to the monitoring log as a single-line JSON payload
        self.monitor_logger.info(f"Pipeline execution: {json.dumps(log_entry)}")
        
        # Save detailed metrics
        metrics_file = self.log_dir / f"metrics_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
        with open(metrics_file, 'w') as f:
            json.dump(log_entry, f, indent=2)
        
        return log_entry
    
    def monitor_data_drift(self, current_data: pd.DataFrame, 
                          reference_data: pd.DataFrame, 
                          threshold: float = 0.05) -> Dict:
        """
        Monitor for data drift with a two-sample Kolmogorov-Smirnov test per numeric feature.
        A feature is flagged as drifted when its p-value is below `threshold` (default 0.05).
        """
        
        drift_results = {
            'timestamp': datetime.now().isoformat(),
            'drift_detected': False,
            'drifted_features': [],
            'drift_scores': {}
        }
        
        numeric_cols = current_data.select_dtypes(include=[np.number]).columns
        
        for col in numeric_cols:
            if col in reference_data.columns:
                # Kolmogorov-Smirnov test for distribution comparison
                try:
                    ks_statistic, p_value = stats.ks_2samp(
                        reference_data[col].dropna(), 
                        current_data[col].dropna()
                    )
                    
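                    # Cast to built-in types: numpy.bool_ is not JSON-serializable,
                    # and drift_results is passed to json.dumps below
                    drift_results['drift_scores'][col] = {
                        'ks_statistic': float(ks_statistic),
                        'p_value': float(p_value),
                        'drift_detected': bool(p_value < threshold)
                    }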
                    
                    if p_value < threshold:
                        drift_results['drifted_features'].append(col)
                        drift_results['drift_detected'] = True
                        
                except Exception as e:
                    self.monitor_logger.error(f"Error testing drift for {col}: {str(e)}")
        
        # Log drift results
        if drift_results['drift_detected']:
            self.monitor_logger.warning(f"Data drift detected: {json.dumps(drift_results)}")
        else:
            self.monitor_logger.info("No significant data drift detected")
        
        return drift_results
    
    def generate_monitoring_report(self, days_back: int = 7) -> Dict:
        """Generate comprehensive monitoring report."""
        
        # Read recent log entries
        log_file = self.log_dir / 'feature_monitoring.log'
        
        report = {
            'report_period': f"Last {days_back} days",
            'generated_at': datetime.now().isoformat(),
            'pipeline_executions': 0,
            'average_execution_time': 0,
            'error_count': 0,
            'drift_alerts': 0
        }
        
        if log_file.exists():
            with open(log_file, 'r') as f:
                logs = f.readlines()
            
            # Parse recent logs
            cutoff_date = datetime.now() - timedelta(days=days_back)
            execution_times = []
            
            for log_line in logs:
                # The default asctime format is 'YYYY-MM-DD HH:MM:SS,mmm'; strptime parses it on
                # every Python version, whereas fromisoformat rejects the comma before Python 3.11
                try:
                    timestamp = datetime.strptime(log_line.split(' - ')[0], '%Y-%m-%d %H:%M:%S,%f')
                except ValueError:
                    continue  # Skip malformed log lines

                if timestamp <= cutoff_date:
                    continue  # Outside the reporting period

                if 'Pipeline execution:' in log_line:
                    report['pipeline_executions'] += 1
                    # Extract execution time from the JSON payload
                    json_part = log_line.split('Pipeline execution: ', 1)[1]
                    try:
                        execution_data = json.loads(json_part)
                    except json.JSONDecodeError:
                        continue
                    execution_times.append(execution_data.get('execution_time_seconds', 0))
                elif ' - ERROR - ' in log_line:
                    report['error_count'] += 1
                elif 'Data drift detected' in log_line:
                    report['drift_alerts'] += 1
            
            if execution_times:
                report['average_execution_time'] = sum(execution_times) / len(execution_times)
        
        return report

def demonstrate_monitoring():
    """Demonstrate the monitoring system with our pipeline."""
    
    # Initialize monitor
    monitor = FeatureMonitor()
    
    # Time an actual transform of the training data with the fitted pipeline
    start_time = datetime.now()
    feature_pipeline.transform(X_train)
    execution_time = (datetime.now() - start_time).total_seconds()
    
    # Calculate quality metrics
    quality_metrics = {
        'missing_values_count': int(X_train_df.isnull().sum().sum()),
        'feature_count': X_train_df.shape[1],
        'outlier_count': 0,  # Placeholder: not computed in this demonstration
        'data_quality_score': 0.95  # Placeholder: not computed in this demonstration
    }
    
    # Log pipeline execution
    log_entry = monitor.log_pipeline_execution(
        pipeline_name="customer_churn_features_v1",
        input_shape=X_train.shape,
        output_shape=X_train_df.shape,
        execution_time=execution_time,
        quality_metrics=quality_metrics
    )
    
    print("Pipeline Execution Logged:")
    print(json.dumps(log_entry, indent=2))
    
    # Monitor for data drift between train and test sets
    drift_results = monitor.monitor_data_drift(X_test_df, X_train_df, threshold=0.05)
    
    print("\nDrift Monitoring Results:")
    print(f"Drift detected: {drift_results['drift_detected']}")
    print(f"Drifted features: {drift_results['drifted_features']}")
    
    if drift_results['drift_detected']:
        print("Drift scores for affected features:")
        for feature in drift_results['drifted_features']:
            scores = drift_results['drift_scores'][feature]
            print(f"  {feature}: KS statistic = {scores['ks_statistic']:.4f}, p-value = {scores['p_value']:.6f}")
    
    # Generate monitoring report
    report = monitor.generate_monitoring_report(days_back=1)
    
    print("\nMonitoring Report:")
    print(json.dumps(report, indent=2))
    
    return monitor, log_entry, drift_results, report

# Run monitoring demonstration
monitor, log_entry, drift_results, report = demonstrate_monitoring()
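Two logging details affect a file-based monitor like this one. A named logger is shared process-wide, so attaching a new `FileHandler` every time a monitor object is created (for example, when the cell is re-run) writes each message once per attached handler. Also, the default `%(asctime)s` format looks like `2024-05-01 12:30:45,123`, with a comma before the milliseconds. `datetime.fromisoformat` rejects that comma on Python versions before 3.11, while `strptime` with an explicit format parses it everywhere. Below is a minimal standard-library sketch of both points; the logger name, log-file location and the helper names `get_monitor_logger` and `parse_log_timestamp` are assumptions:

```python
import logging
import os
import tempfile
from datetime import datetime
from pathlib import Path

# Default %(asctime)s layout: date, time, then a comma and milliseconds
ASCTIME_FORMAT = "%Y-%m-%d %H:%M:%S,%f"


def get_monitor_logger(log_file: Path, name: str = "feature_monitor_demo") -> logging.Logger:
    """Return the named logger, attaching a FileHandler for log_file only once."""
    logger = logging.getLogger(name)
    target = os.path.abspath(log_file)  # FileHandler stores baseFilename the same way
    already_attached = any(
        isinstance(h, logging.FileHandler) and h.baseFilename == target
        for h in logger.handlers
    )
    if not already_attached:
        handler = logging.FileHandler(log_file)
        handler.setFormatter(logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s"))
        logger.addHandler(handler)
    logger.setLevel(logging.INFO)
    return logger


def parse_log_timestamp(line: str) -> datetime:
    """Parse the leading asctime field of a log line, e.g. '2024-05-01 12:30:45,123'."""
    return datetime.strptime(line.split(" - ")[0], ASCTIME_FORMAT)


with tempfile.TemporaryDirectory() as tmp:
    log_file = Path(tmp) / "feature_monitoring.log"
    for _ in range(3):  # simulate re-running the notebook cell three times
        demo_logger = get_monitor_logger(log_file)
    demo_logger.info("Pipeline execution: {}")
    handler_count = len(demo_logger.handlers)
    # Close and detach handlers so the temporary directory can be removed
    for h in list(demo_logger.handlers):
        h.close()
        demo_logger.removeHandler(h)
    lines = log_file.read_text().splitlines()

print(f"handlers attached: {handler_count}, lines written: {len(lines)}")
print("parsed timestamp:", parse_log_timestamp(lines[0]))
```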

## 8. Model Training with Engineered Features

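Compare a baseline model trained on the raw numeric features with the same classifier trained on the engineered features, both evaluated on the same held-out test set.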

In [None]:
def train_and_evaluate_models():
    """
    Train models with and without feature engineering to demonstrate improvement.
    """
    
    # Model with original features (minimal preprocessing)
    original_pipeline = Pipeline([
        ('imputer', SimpleImputer(strategy='mean')),
        ('scaler', StandardScaler()),
        ('classifier', RandomForestClassifier(n_estimators=100, random_state=RANDOM_STATE))
    ])
    
    # Model with engineered features
    engineered_pipeline = Pipeline([
        ('classifier', RandomForestClassifier(n_estimators=100, random_state=RANDOM_STATE))
    ])
    
    # Prepare original features (numeric only for simplicity)
    X_train_original = X_train.select_dtypes(include=[np.number])
    X_test_original = X_test.select_dtypes(include=[np.number])
    
    # Train model with original features
    print("Training model with original features...")
    original_pipeline.fit(X_train_original, y_train)
    y_pred_original = original_pipeline.predict(X_test_original)
    
    # Train model with engineered features
    print("Training model with engineered features...")
    engineered_pipeline.fit(X_train_df, y_train)
    y_pred_engineered = engineered_pipeline.predict(X_test_df)
    
    # Compare performance
    from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, roc_auc_score
    
    def calculate_metrics(y_true, y_pred, y_prob=None):
        metrics = {
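            'accuracy': accuracy_score(y_true, y_pred),
            # zero_division=0 sets an undefined score (e.g. no predicted positives) to 0 without a warning
            'precision': precision_score(y_true, y_pred, zero_division=0),
            'recall': recall_score(y_true, y_pred, zero_division=0),
            'f1_score': f1_score(y_true, y_pred, zero_division=0)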
        }
        if y_prob is not None:
            metrics['roc_auc'] = roc_auc_score(y_true, y_prob)
        return metrics
    
    # y_true: true labels (y_test); y_pred: the model's predicted labels;
    # y_prob: the model's predicted probability of the positive (churn) class
    
    # Calculate probabilities for ROC AUC
    y_prob_original = original_pipeline.predict_proba(X_test_original)[:, 1]
    y_prob_engineered = engineered_pipeline.predict_proba(X_test_df)[:, 1]
    
    # Calculate metrics
    original_metrics = calculate_metrics(y_test, y_pred_original, y_prob_original)
    engineered_metrics = calculate_metrics(y_test, y_pred_engineered, y_prob_engineered)
    
    # Display results with border
    print("\n" + "="*60)
    print("MODEL PERFORMANCE COMPARISON")
    print("="*60)
    
    print(f"{'Metric':<15} {'Original':<12} {'Engineered':<12} {'Improvement':<12}")
    print("-" * 60)
    
    for metric in original_metrics:
        original_val = original_metrics[metric]
        engineered_val = engineered_metrics[metric]
        # Relative change in percent; NaN when the baseline metric is 0 (e.g. no predicted churners)
        improvement = ((engineered_val - original_val) / original_val) * 100 if original_val else float('nan')
        
        print(f"{metric:<15} {original_val:<12.4f} {engineered_val:<12.4f} {improvement:>+8.2f}%")
    
    # Feature importance analysis
    feature_importance = engineered_pipeline.named_steps['classifier'].feature_importances_
    feature_names = X_train_df.columns
    
    importance_df = pd.DataFrame({
        'feature': feature_names,
        'importance': feature_importance
    }).sort_values('importance', ascending=False)
    
    print(f"\nTop 10 Most Important Features:")
    print(importance_df.head(10).to_string(index=False))
    
    # Visualize feature importance
    plt.figure(figsize=(12, 8))
    top_features = importance_df.head(15)
    
    plt.barh(range(len(top_features)), top_features['importance'])
    plt.yticks(range(len(top_features)), top_features['feature'])
    plt.xlabel('Feature Importance')
    plt.title('Top 15 Feature Importances (Engineered Features)')
    plt.gca().invert_yaxis()
    plt.tight_layout()
    plt.show()
    
    return {
        'original_metrics': original_metrics,
        'engineered_metrics': engineered_metrics,
        'feature_importance': importance_df,
        'models': {
            'original': original_pipeline,
            'engineered': engineered_pipeline
        }
    }

# Train and evaluate models
results = train_and_evaluate_models()
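The "Improvement" column is a relative change, `(engineered - original) / original * 100`. It is undefined when the baseline metric is 0, for example precision when the baseline predicts no churners at all. It is also a single estimate from one train/test split. Below is a minimal guarded helper; `relative_change` is an illustrative name, and the metric values in the example are made up, not results from this notebook:

```python
import math


def relative_change(new: float, baseline: float) -> float:
    """Percentage change from baseline to new; NaN when the baseline is zero."""
    if baseline == 0:
        return math.nan
    return (new - baseline) / baseline * 100


# Illustrative values only
print(f"{relative_change(0.62, 0.50):+.2f}%")  # +24.00%
print(relative_change(0.40, 0.0))              # nan: no meaningful relative change from 0
```

Returning NaN rather than raising keeps the comparison table printable and makes the undefined case visible instead of hiding it behind an infinite or misleading percentage.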

## 9. Summary and Best Practices
PRODUCTION FEATURE ENGINEERING BEST PRACTICES DEMONSTRATED:

1. DATA VALIDATION
   ✓ Schema validation with expected data types
   ✓ Data quality checks and missing value analysis
   ✓ Outlier detection and treatment

2. FEATURE ENGINEERING PIPELINE
   ✓ Custom transformers for domain-specific features
   ✓ Proper handling of different data types (numeric, categorical, datetime)
   ✓ Feature scaling and normalization
   ✓ Feature selection for dimensionality reduction

3. PRODUCTION READINESS
   ✓ Pipeline serialization and versioning
   ✓ Comprehensive logging and monitoring
   ✓ Data drift detection
   ✓ Feature validation and quality checks

4. MLOPS INTEGRATION
   ✓ Reproducible pipeline with version control
   ✓ Monitoring and alerting capabilities
   ✓ Deployment artifacts and schemas
   ✓ Performance tracking and comparison

5. MODEL IMPROVEMENT
   ✓ Baseline vs. engineered-feature comparison on the same held-out test set
   ✓ Feature importance analysis
   ✓ Business domain knowledge incorporation

KEY TAKEAWAYS:
- Feature engineering can improve model performance; measure the gain against a baseline on held-out data
- Production pipelines need comprehensive validation and monitoring
- Proper versioning and serialization enable reliable deployments
- Domain knowledge is crucial for creating meaningful features
- Continuous monitoring helps detect data drift and quality issues

NEXT STEPS FOR PRODUCTION:
1. Integrate with your ML platform (MLflow, Kubeflow, etc.)
2. Set up automated retraining based on drift detection
3. Implement A/B testing for feature engineering changes
4. Add more sophisticated feature validation rules
5. Create automated feature engineering based on data profiling

In [None]:
# Final logging
logger.info("Feature engineering pipeline demonstration completed successfully")
orig_f1 = results['original_metrics']['f1_score']
eng_f1 = results['engineered_metrics']['f1_score']
# Relative F1 change in percent; NaN if the baseline F1 is 0
f1_change = (eng_f1 - orig_f1) / orig_f1 * 100 if orig_f1 else float('nan')
logger.info(f"Relative F1-score change with engineered features: {f1_change:+.2f}%")

## Notebook Sections

1. **Setup and Imports** - All necessary libraries with proper configuration
2. **Data Loading and Validation** - Schema validation, data quality checks, and synthetic dataset creation
3. **Exploratory Data Analysis** - Comprehensive EDA with visualizations and statistical analysis
4. **Feature Engineering Pipeline** - Custom transformers, preprocessing pipelines, and feature creation
5. **Feature Validation and Quality Checks** - Distribution shift detection and quality monitoring
6. **Pipeline Serialization and Deployment** - Versioning, metadata management, and deployment artifacts
7. **Monitoring and Logging** - Production monitoring with drift detection and reporting
8. **Model Training and Evaluation** - Performance comparison showing feature engineering benefits
9. **Summary and Best Practices** - Key takeaways and production guidelines

## Key Features Demonstrated

- **Custom Transformers**: DateFeatureExtractor, BusinessFeatureCreator, OutlierTreatment.
- **Production Pipeline**: Complete scikit-learn pipeline with proper preprocessing.
- **Validation Framework**: Comprehensive data validation and quality checks.
- **Deployment Ready**: Serialization, versioning, and schema management.
- **Monitoring System**: Data drift detection and performance tracking.
- **Performance Analysis**: Baseline vs. engineered-feature comparison with feature importance analysis.