In [8]:
"""
MLOps EDA Pipeline
==================
Exploratory Data Analysis for training data validation and quality checks.
Produces artifacts for downstream pipeline stages.
"""

# =============================================================================
# IMPORTS AND CONFIGURATION
# =============================================================================
import datetime
import json
import logging
import os
import subprocess
import warnings
from pathlib import Path
from typing import Dict, List, Optional, Tuple

import numpy as np
import pandas as pd

import matplotlib
matplotlib.use('Agg')  # Non-interactive backend to prevent memory leaks
import matplotlib.pyplot as plt
import seaborn as sns
import plotly.express as px
try:
    import missingno as msno
    HAS_MISSINGNO = True
except ImportError:
    HAS_MISSINGNO = False

# Configure warnings and pandas display
# NOTE(review): 'ignore' with no category silences every warning raised after
# this cell runs (including pandas/numpy deprecation warnings) - confirm that
# a blanket filter is intended.
warnings.filterwarnings('ignore')
# Render floats with three decimal places whenever pandas displays them.
pd.set_option('display.float_format', lambda x: "%.3f" % x)

# Plotting defaults
sns.set_style("whitegrid")
# Default size (inches) and DPI for figures that do not pass their own.
plt.rcParams.update({"figure.figsize": (10, 6), "figure.dpi": 120})

In [None]:

# =============================================================================
# LOGGING SETUP
# =============================================================================
def setup_logger(name: str = "eda_pipeline", level: int = logging.INFO) -> logging.Logger:
    """
    Return the named logger, set to ``level`` and wired to a console handler.

    The level is applied on every call. The stream handler is attached only
    when the logger has no handlers yet, so calling this again (for example by
    re-running the cell) does not duplicate output.

    Args:
        name: Name passed to ``logging.getLogger``
        level: Logging level applied to the logger

    Returns:
        The configured logger instance
    """
    pipeline_logger = logging.getLogger(name)
    pipeline_logger.setLevel(level)

    # Already wired up by an earlier call: nothing more to attach.
    if pipeline_logger.handlers:
        return pipeline_logger

    console = logging.StreamHandler()
    console.setFormatter(
        logging.Formatter(
            fmt='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
            datefmt='%Y-%m-%d %H:%M:%S',
        )
    )
    pipeline_logger.addHandler(console)
    return pipeline_logger

# Module-level logger with the default name and level; the functions defined
# in the cells below log through this name.
logger = setup_logger()

# =============================================================================
# CONFIGURATION
# =============================================================================
class Config:
    """Paths, date window and plot limits used by the EDA pipeline."""

    # --- Filesystem locations (relative to the working directory) ---
    DATA_PATH = Path("../data/raw/raw_data.csv")
    PLOTS_DIR = Path("plots")

    # --- Date window handed to the loader (can be parameterized) ---
    MIN_DATE = "2023-12-01"
    MAX_DATE = "2024-03-31"

    # --- EDA limits ---
    MAX_HIST_COLS = 20  # Cap on histogram count, to prevent memory issues
    MAX_CAT_TOP = 10    # Number of top categories to display

    @classmethod
    def setup_directories(cls):
        """Create the plots directory (with parents) if it does not exist yet."""
        plots_dir = cls.PLOTS_DIR
        plots_dir.mkdir(parents=True, exist_ok=True)
        logger.info(f"Created plots directory: {plots_dir}")

In [10]:
# =============================================================================
# UTILITY FUNCTIONS
# =============================================================================
def to_json_safe(obj):
    """
    Convert numpy/pandas types to JSON-serializable Python types.

    Containers (dict, list, tuple, ndarray) are converted recursively, so
    numpy scalars nested inside arrays or dict values are sanitized as well.
    Values that strict JSON cannot represent become ``None``: NaN and
    +/-infinity floats, ``pd.NaT`` and ``pd.NA``.

    Args:
        obj: Object to convert

    Returns:
        JSON-safe version of the object. Dict keys are stringified, tuples
        become lists, dates/timestamps become ISO-8601 strings, and anything
        unrecognized falls back to ``str(obj)``.
    """
    # Missing-value sentinels first: pd.NaT is also a datetime instance and
    # would otherwise be emitted as the string 'NaT'.
    if obj is None or obj is pd.NaT or obj is pd.NA:
        return None
    if isinstance(obj, dict):
        return {str(k): to_json_safe(v) for k, v in obj.items()}
    if isinstance(obj, (list, tuple)):
        return [to_json_safe(v) for v in obj]
    if isinstance(obj, np.ndarray):
        # tolist() yields a scalar for 0-d arrays, so recurse on the whole
        # result instead of iterating it; this also sanitizes NaN elements.
        return to_json_safe(obj.tolist())
    # bool must be tested before int because bool is an int subclass.
    if isinstance(obj, (bool, str)):
        return obj
    if isinstance(obj, (int, np.integer)):
        return int(obj)
    if isinstance(obj, (float, np.floating)):
        value = float(obj)
        # json.dumps would emit NaN/Infinity, which are not valid JSON.
        return value if np.isfinite(value) else None
    if isinstance(obj, (pd.Timestamp, datetime.date, datetime.datetime)):
        return obj.isoformat()
    if isinstance(obj, np.generic):
        # Remaining numpy scalars (np.bool_, np.datetime64, np.str_, ...):
        # unwrap, then sanitize the Python object that comes back.
        item = obj.item()
        return str(item) if item is obj else to_json_safe(item)
    return str(obj)

def save_plot(fig: plt.Figure, filename: str) -> None:
    """
    Write a figure into ``Config.PLOTS_DIR`` and always close it afterwards.

    A failed save is logged at error level and not re-raised; the figure is
    closed in either case so it does not stay registered with pyplot.

    Args:
        fig: Matplotlib figure object
        filename: Output filename, joined onto ``Config.PLOTS_DIR``
    """
    try:
        destination = Config.PLOTS_DIR / filename
        fig.savefig(destination, dpi=120, bbox_inches='tight')
        logger.debug(f"Saved plot: {filename}")
    except Exception as exc:
        logger.error(f"Failed to save plot {filename}: {exc}")
    finally:
        # Release the figure whether or not the save succeeded.
        plt.close(fig)

def describe_numeric_col(x: pd.Series) -> pd.Series:
    """
    Summarize a numeric column with count, missing count, moments and quartiles.

    Args:
        x: Pandas Series (numeric)

    Returns:
        Series indexed by Count, Missing, Mean, Std, Min, 25%, 50%, 75%, Max
    """
    stats = {
        "Count": x.count(),
        "Missing": x.isna().sum(),
        "Mean": x.mean(),
        "Std": x.std(),
        "Min": x.min(),
    }
    # Quartiles sit between Min and Max in the output order.
    for label, q in (("25%", 0.25), ("50%", 0.50), ("75%", 0.75)):
        stats[label] = x.quantile(q)
    stats["Max"] = x.max()
    return pd.Series(stats)

def impute_missing_values(x: pd.Series, method: str = "mean") -> pd.Series:
    """
    Impute missing values in a column.

    Columns with a NumPy numeric dtype (any float or integer width, not only
    float64/int64) are filled with their mean or median when ``method`` asks
    for it. Everything else is filled with the most frequent value: non-numeric
    columns, pandas extension dtypes, ``method="mode"`` and unrecognized
    method names.

    Args:
        x: Pandas Series
        method: Imputation method ("mean", "median", "mode")

    Returns:
        Series with imputed values. The input is returned unchanged when no
        mode exists (for example an all-missing column).
    """
    # Extension dtypes (e.g. nullable Int64) are deliberately left on the mode
    # path: a fractional mean cannot be written into a nullable integer column.
    is_numeric = isinstance(x.dtype, np.dtype) and np.issubdtype(x.dtype, np.number)

    if is_numeric:
        if method == "mean":
            return x.fillna(x.mean())
        if method == "median":
            return x.fillna(x.median())

    # Mode fallback; computed once and reused.
    modes = x.mode()
    if modes.empty:
        return x
    return x.fillna(modes.iloc[0])


In [11]:
# =============================================================================
# DATA LOADING AND PREPROCESSING
# =============================================================================
def load_and_filter_data(
    data_path: Path,
    min_date: str,
    max_date: str
) -> Tuple[pd.DataFrame, Dict[str, str]]:
    """
    Load data and apply date filtering.

    If the file is missing, ``dvc pull`` is attempted once before reading.
    When the frame has a ``date_part`` column it is parsed, converted to
    ``datetime.date`` values and filtered to the inclusive range
    ``[min_date, max_date]``. Rows whose date cannot be parsed are dropped by
    the filter.

    Args:
        data_path: Path to the CSV file
        min_date: Minimum date string
        max_date: Maximum date string; a falsy value means "today"

    Returns:
        Filtered dataframe and date limits dictionary. The limits are the
        actual min/max dates present after filtering, or the requested bounds
        when nothing survives the filter or there is no ``date_part`` column.

    Raises:
        FileNotFoundError: The data file is still missing after the DVC pull.
    """
    logger.info(f"Loading training data from {data_path}")

    # Check if file exists
    if not data_path.exists():
        logger.warning(f"Data file not found at {data_path}. Attempting DVC pull...")
        try:
            pull = subprocess.run(
                ["dvc", "pull"], capture_output=True, text=True, check=False
            )
        except OSError as exc:
            # Raised when the dvc executable itself cannot be started.
            logger.error("Could not run 'dvc pull': %s", exc)
        else:
            if pull.returncode != 0:
                logger.error(
                    "'dvc pull' exited with status %d: %s",
                    pull.returncode,
                    pull.stderr.strip(),
                )
        if not data_path.exists():
            raise FileNotFoundError(
                f"Data file not found at {data_path} after attempting 'dvc pull'"
            )

    data = pd.read_csv(data_path)
    logger.info(f"Loaded {len(data):,} rows, {len(data.columns)} columns")

    # Parse dates
    max_date_parsed = pd.to_datetime(max_date or datetime.datetime.now()).date()
    min_date_parsed = pd.to_datetime(min_date).date()
    requested_limits = {
        "min_date": min_date or str(min_date_parsed),
        "max_date": max_date or str(max_date_parsed),
    }

    # Apply date filter if date_part column exists
    if "date_part" not in data.columns:
        logger.warning("No 'date_part' column found; skipping date filtering")
        return data, requested_limits

    parsed = pd.to_datetime(data["date_part"])
    data["date_part"] = parsed.dt.date

    # Compare in datetime64 space: NaT simply compares False there, whereas an
    # object column of dates containing NaT raises on inequality comparison
    # with datetime.date in pandas 2.x.
    naive = parsed.dt.tz_localize(None) if parsed.dt.tz is not None else parsed
    in_range = naive.dt.normalize().between(
        pd.Timestamp(min_date_parsed), pd.Timestamp(max_date_parsed)
    )
    data = data[in_range].reset_index(drop=True)

    if data.empty:
        # min()/max() of an empty column is NaN, which would leak out as "nan".
        logger.warning(
            f"No rows between {min_date_parsed} and {max_date_parsed} after date filtering"
        )
        return data, {"min_date": str(min_date_parsed), "max_date": str(max_date_parsed)}

    actual_min = data["date_part"].min()
    actual_max = data["date_part"].max()

    logger.info(f"Date filtered: {actual_min} to {actual_max}")
    logger.info(f"Total rows after filtering: {len(data):,}")

    return data, {"min_date": str(actual_min), "max_date": str(actual_max)}

In [12]:
# =============================================================================
# MLOPS EDA VISUALS: ONLY THE ESSENTIALS
# =============================================================================

def plot_missingness(data: pd.DataFrame) -> None:
    """
    Visualize missing data patterns.

    Saves a missingno matrix as ``missingness_matrix.png`` when missingno is
    installed and the plot succeeds. Otherwise saves a horizontal bar chart of
    per-column null counts as ``missing_counts.png``, or only logs a message
    when no column has missing values.

    Args:
        data: Input dataframe
    """
    logger.info("Generating missingness visualizations")

    # Use the import-time flag instead of relying on a swallowed NameError
    # when the optional dependency is absent.
    if HAS_MISSINGNO:
        try:
            fig = msno.matrix(data, figsize=(12, 6)).get_figure()
            fig.suptitle('Missingness Matrix')
            save_plot(fig, "missingness_matrix.png")
            return
        except Exception as e:
            logger.warning("missingno failed, using bar fallback: %s", e)
    else:
        logger.info("missingno is not installed, using bar fallback")

    # Fallback: plain bar chart of null counts, largest first.
    missing = data.isnull().sum()
    missing = missing[missing > 0].sort_values(ascending=False)
    if missing.empty:
        logger.info("No missing values detected ✅")
        return

    fig, ax = plt.subplots(figsize=(10, 6))
    missing.plot(kind='barh', ax=ax)
    ax.set_title('Missing Value Counts')
    ax.set_xlabel('Count')
    save_plot(fig, "missing_counts.png")


def _is_id_like(column) -> bool:
    """Return True when a column name has ``id`` as a whole token (``lead_id``, ``ID``, ``leadId``)."""
    name = str(column)
    return "id" in name.lower().split("_") or name.endswith("Id")


def plot_correlation_heatmap(data: pd.DataFrame) -> None:
    """
    Correlation heatmap for key numeric columns (skip ID-like).

    All numeric dtypes are considered, not only int64/float64. A column is
    treated as an identifier only when ``id`` is a whole underscore-separated
    token of its name (or a camelCase ``...Id`` suffix), so features such as
    ``paid`` or ``width`` are kept. Nothing is plotted when fewer than two
    columns qualify.

    Args:
        data: Input dataframe
    """
    # Only use relevant (non-id) numeric columns
    numeric_cols = [
        c for c in data.select_dtypes(include='number').columns if not _is_id_like(c)
    ]
    if len(numeric_cols) < 2:
        logger.info("Not enough numeric columns for correlation heatmap")
        return

    fig = None
    try:
        corr = data[numeric_cols].corr()
        # Grow the canvas with the column count, capped at 14 x 12 inches.
        side = 1 + len(numeric_cols)
        fig, ax = plt.subplots(figsize=(min(14, side), min(12, side)))
        sns.heatmap(corr, annot=True, fmt='.2f', cmap='coolwarm', center=0, ax=ax)
        ax.set_title('Correlation Heatmap')
        save_plot(fig, "correlation_heatmap.png")
    except Exception as e:
        logger.error("Failed to create correlation heatmap: %s", e)
        # save_plot was never reached, so close the figure here.
        if fig is not None:
            plt.close(fig)


def plot_target_distribution(data: pd.DataFrame, target_col: str) -> None:
    """
    Plot for the key target/label column (for classification/regression).

    Object, string, categorical and boolean columns get a bar chart of class
    counts (missing values included); every other dtype gets a 25-bin
    histogram. The figure is saved as ``target_<target_col>_dist.png``.

    Args:
        data: Input dataframe
        target_col: Name of the target column; a missing column is logged and skipped
    """
    if target_col not in data.columns:
        logger.warning(f"{target_col} not in columns, skipping target plot.")
        return

    fig = None
    try:
        target = data[target_col]
        # Dtype predicates also recognize pandas' `string` and nullable
        # `boolean` dtypes, which a comparison against dtype names misses.
        is_categorical = (
            pd.api.types.is_bool_dtype(target)
            or pd.api.types.is_object_dtype(target)
            or pd.api.types.is_string_dtype(target)
            or isinstance(target.dtype, pd.CategoricalDtype)
        )
        fig, ax = plt.subplots(figsize=(8, 5))
        if is_categorical:
            target.value_counts(dropna=False).plot(kind='bar', ax=ax)
            ax.set_title(f"Class Distribution: {target_col}")
        else:
            target.hist(bins=25, ax=ax)
            ax.set_title(f"Distribution: {target_col}")
        save_plot(fig, f"target_{target_col}_dist.png")
    except Exception as e:
        logger.error(f"Could not plot target {target_col}: {e}")
        # save_plot was never reached, so close the figure here.
        if fig is not None:
            plt.close(fig)


def plot_key_feature_distributions(data: pd.DataFrame, key_features: list) -> None:
    """
    Plot distribution for selected, relevant features only (not all columns).

    Object, string, categorical and boolean columns get a horizontal bar chart
    of their ``Config.MAX_CAT_TOP`` most frequent values (missing values
    included); other dtypes get a 25-bin histogram. Each figure is saved as
    ``<col>_dist.png``. A failure on one feature is logged and the remaining
    features are still plotted.

    Args:
        data: Input dataframe
        key_features: Column names to plot; names absent from ``data`` are
            logged and skipped
    """
    for col in key_features:
        if col not in data.columns:
            logger.warning(f"{col} not in columns, skipping feature plot.")
            continue

        fig = None
        try:
            feature = data[col]
            # Dtype predicates also recognize pandas' `string` and nullable
            # `boolean` dtypes, which a comparison against dtype names misses.
            is_categorical = (
                pd.api.types.is_bool_dtype(feature)
                or pd.api.types.is_object_dtype(feature)
                or pd.api.types.is_string_dtype(feature)
                or isinstance(feature.dtype, pd.CategoricalDtype)
            )
            fig, ax = plt.subplots(figsize=(8, 5))
            if is_categorical:
                top = feature.value_counts(dropna=False).head(Config.MAX_CAT_TOP)
                top.plot(kind='barh', ax=ax)
                ax.set_title(f'Top Categories in {col}')
            else:
                feature.hist(bins=25, ax=ax)
                ax.set_title(f'Distribution of {col}')
            save_plot(fig, f"{col}_dist.png")
        except Exception as e:
            logger.error(f"Could not plot feature {col}: {e}")
            # save_plot was never reached, so close the figure here.
            if fig is not None:
                plt.close(fig)

def plot_time_series_row_count(data: pd.DataFrame) -> None:
    """
    Row count over time, using a date column for drift checks.

    Groups rows by ``date_part`` and saves the per-date counts as a line chart
    in ``time_series_row_count.png``. The plot is skipped, with a log message,
    when the frame has no ``date_part`` column.

    Args:
        data: Input dataframe
    """
    if 'date_part' not in data.columns:
        logger.info("No 'date_part' column found; skipping time series row count plot")
        return

    fig = None
    try:
        fig, ax = plt.subplots(figsize=(12, 6))
        data.groupby('date_part').size().plot(ax=ax, marker='o')
        ax.set_title('Rows Over Time')
        ax.set_ylabel('Row Count')
        ax.set_xlabel('Date')
        save_plot(fig, "time_series_row_count.png")
    except Exception as e:
        logger.error("Could not plot time series row count: %s", e)
        # save_plot was never reached, so close the figure here.
        if fig is not None:
            plt.close(fig)
            
# =====================================================================
# PIPELINE ENTRY POINT
# Calls only the essential plot functions defined above, in place of any
# full-featured plotting blocks.
# =====================================================================
def run_eda_pipeline():
    """
    Run the minimal EDA pipeline: load the data, log a summary, save the plots.

    NOTE(review): `run_eda_pipeline` is defined again in a later cell; when the
    notebook runs top to bottom that later definition replaces this one.
    Consider deleting one of the two copies.
    """
    logger.info("="*70)
    logger.info("Starting Minimal EDA Pipeline For MLOps")
    logger.info("="*70)

    # Create the plot directory, then load and date-filter the raw data.
    Config.setup_directories()
    data, date_limits = load_and_filter_data(Config.DATA_PATH, Config.MIN_DATE, Config.MAX_DATE)

    logger.info(f"Data shape: {data.shape}")
    logger.info(f"Columns: {list(data.columns)}")
    # analyze_missing_values lives further down this cell; that is fine because
    # the name is only looked up when this function is called.
    missing = analyze_missing_values(data)

    # === ESSENTIAL VISUALS ONLY ===
    plot_missingness(data)
    plot_correlation_heatmap(data)
    plot_target_distribution(data, target_col='lead_indicator')  # or your actual target
    # Pass the most important features for your use case here:
    plot_key_feature_distributions(data, key_features=['customer_group', 'time_spent', 'n_visits', 'purchases'])
    plot_time_series_row_count(data)

    logger.info("="*70)
    logger.info("Minimal EDA Pipeline Complete ✅")
    logger.info(f"Plots saved to: {Config.PLOTS_DIR}")
    logger.info("="*70)
    

def analyze_missing_values(data: pd.DataFrame) -> pd.Series:
    """
    Count nulls per column and log the ten worst offenders.

    Args:
        data: Input dataframe

    Returns:
        Null counts for the columns that have at least one missing value,
        sorted from most to fewest missing (empty when nothing is missing)
    """
    null_counts = data.isna().sum()
    missing = null_counts.loc[null_counts > 0].sort_values(ascending=False)

    if missing.empty:
        logger.info("No missing values detected ✅")
        return missing

    n_rows = len(data)
    logger.info(f"Found {len(missing)} columns with missing values")
    # Log only the top ten columns; the returned Series keeps all of them.
    for col, count in missing.iloc[:10].items():
        logger.info(f"  {col}: {count:,} ({100 * count / n_rows:.2f}%)")

    return missing




In [13]:
# =====================================================================
# PIPELINE ENTRY POINT
# Calls only the essential plot functions defined above, in place of any
# full-featured plotting blocks.
# =====================================================================
def run_eda_pipeline(
    target_col: str = 'lead_indicator',
    key_features: Optional[List[str]] = None,
) -> None:
    """
    Run the minimal EDA pipeline: load the data, log a summary, save the plots.

    Steps: create the plots directory, load and date-filter the data using the
    ``Config`` path and date window, log shape/columns/missing values, then
    save the missingness, correlation, target, key-feature and
    rows-over-time plots.

    Args:
        target_col: Target/label column passed to the target distribution plot
        key_features: Feature columns to plot individually; ``None`` selects
            'customer_group', 'time_spent', 'n_visits' and 'purchases'
    """
    if key_features is None:
        key_features = ['customer_group', 'time_spent', 'n_visits', 'purchases']

    banner = "=" * 70
    logger.info(banner)
    logger.info("Starting Minimal EDA Pipeline For MLOps")
    logger.info(banner)

    Config.setup_directories()
    # The date limits returned by the loader are not needed here.
    data, _ = load_and_filter_data(Config.DATA_PATH, Config.MIN_DATE, Config.MAX_DATE)

    logger.info(f"Data shape: {data.shape}")
    logger.info(f"Columns: {list(data.columns)}")
    # Called for its log output; the returned Series is not used.
    analyze_missing_values(data)

    # === ESSENTIAL VISUALS ONLY ===
    plot_missingness(data)
    plot_correlation_heatmap(data)
    plot_target_distribution(data, target_col=target_col)
    plot_key_feature_distributions(data, key_features=key_features)
    plot_time_series_row_count(data)

    logger.info(banner)
    logger.info("Minimal EDA Pipeline Complete ✅")
    logger.info(f"Plots saved to: {Config.PLOTS_DIR}")
    logger.info(banner)
    

In [14]:
# =============================================================================
# EXECUTION
# =============================================================================
# Run the pipeline when this code executes as the main module. The log output
# captured below this cell shows that the guard passes when the cell is run.
if __name__ == "__main__":
    run_eda_pipeline()


2025-12-09 21:33:28 - eda_pipeline - INFO - Starting Minimal EDA Pipeline For MLOps
2025-12-09 21:33:28 - eda_pipeline - INFO - Created plots directory: notebooks/plots
2025-12-09 21:33:28 - eda_pipeline - INFO - Loading training data from ../data/raw/raw_data.csv
2025-12-09 21:33:28 - eda_pipeline - INFO - Loaded 12,345 rows, 19 columns
2025-12-09 21:33:28 - eda_pipeline - INFO - Date filtered: 2024-01-01 to 2024-01-31
2025-12-09 21:33:28 - eda_pipeline - INFO - Total rows after filtering: 12,345
2025-12-09 21:33:28 - eda_pipeline - INFO - Data shape: (12345, 19)
2025-12-09 21:33:28 - eda_pipeline - INFO - Columns: ['lead_id', 'lead_indicator', 'date_part', 'is_active', 'marketing_consent', 'first_booking', 'existing_customer', 'last_seen', 'source', 'domain', 'country', 'visited_learn_more_before_booking', 'visited_faq', 'purchases', 'time_spent', 'customer_group', 'onboarding', 'customer_code', 'n_visits']
2025-12-09 21:33:28 - eda_pipeline - INFO - Found 2 columns with missing valu