# Credit Risk Preprocessing and Visualization (Notebook)

This notebook consolidates the full preprocessing pipeline and visualization utilities for the UCI Credit Card default dataset.
It optionally supports MPI (via mpi4py) if launched under an MPI environment for distributed preprocessing.

- Input CSV: `archive/UCI_Credit_Card.csv`
- Outputs: cleaned data, engineered features, train/test splits (CSV and Parquet) under `processed_data/`
- Visualizations saved under `processed_data/visualizations/` or `visualizations/` as configured

In [26]:
# Optional: install extra packages if missing (uncomment as needed)
# %pip install pyarrow fastparquet shap mpi4py
# %pip install seaborn scikit-learn joblib psutil

In [27]:
# Imports
import os, time, gc, logging, warnings, json, traceback
from pathlib import Path
from typing import Tuple, Dict, Union, Optional, List, Any

import numpy as np
import pandas as pd
import psutil
from multiprocessing import cpu_count
from functools import partial
from joblib import Parallel, delayed

from sklearn.model_selection import train_test_split
from sklearn.preprocessing import RobustScaler, StandardScaler
from sklearn.impute import SimpleImputer

import matplotlib.pyplot as plt
import seaborn as sns

# Optional libraries
# shap is optional: if the import fails for any reason the name is bound to None,
# so later code can test `shap is not None`.
try:
    import shap
except Exception:
    shap = None

# mpi4py is optional: MPI_AVAILABLE records whether the import worked.
# The name `MPI` is only defined when MPI_AVAILABLE is True.
try:
    from mpi4py import MPI
    MPI_AVAILABLE = True
except Exception:
    MPI_AVAILABLE = False

# NOTE(review): this filter hides every warning category for the whole kernel,
# including deprecation warnings raised by pandas / scikit-learn.
warnings.filterwarnings('ignore')
# Plot defaults applied to every figure created after this cell runs.
sns.set_style('whitegrid')
plt.rcParams['figure.figsize'] = (12, 8)
plt.rcParams['font.size'] = 12

# Logging
# INFO-level root configuration; the notebook's code logs through `logger`.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger('NotebookPipeline')

In [28]:
# Configuration
from pathlib import Path
# Start from the kernel's current working directory.
BASE_DIR = Path.cwd()
# If running from notebooks/ folder, use parent as base
# (the same switch happens when there is no archive/ folder here but the parent has one).
if (BASE_DIR.name == 'notebooks') or not (BASE_DIR / 'archive').exists():
    if (BASE_DIR / '..' / 'archive').exists():
        BASE_DIR = BASE_DIR.parent

# Default locations, all derived from BASE_DIR and stored as plain strings.
CONFIG_PATH = str(BASE_DIR / 'configs' / 'credit_preprocessing_config.json')
DEFAULT_INPUT = str(BASE_DIR / 'archive' / 'UCI_Credit_Card.csv')
DEFAULT_OUTPUT_DIR = str(BASE_DIR / 'processed_data')

def load_config(path: str = CONFIG_PATH) -> Dict[str, Any]:
    """Read the JSON pipeline configuration, falling back to built-in defaults.

    Args:
        path: Location of the JSON config file.

    Returns:
        The parsed JSON object. If the file cannot be read or parsed, or its
        top-level value is not a JSON object, a warning is logged and a default
        configuration built from DEFAULT_INPUT / DEFAULT_OUTPUT_DIR is returned.
    """
    try:
        with open(path, 'r', encoding='utf-8') as f:
            loaded = json.load(f)
        # Callers use `cfg.get(...)`, so anything other than a JSON object is unusable.
        if not isinstance(loaded, dict):
            raise ValueError(f'top-level JSON value must be an object, got {type(loaded).__name__}')
        return loaded
    except Exception as e:
        # Deliberately best-effort: any failure falls back to the defaults below.
        logger.warning(f'Could not read config at {path}: {e}. Using defaults.')
        return {
            'input_path': DEFAULT_INPUT,
            'output_dir': DEFAULT_OUTPUT_DIR,
            'preprocessing': {
                'test_size': 0.2,
                'random_state': 42,
                'n_jobs': -1,
                'memory_threshold': 0.8,
                'save_intermediates': True,
                'output_formats': ['csv', 'parquet'],
                'chunk_size': 5000
            }
        }

In [None]:

import matplotlib as mpl  # required by some plotting funcs

class CreditCardPreprocessor:
    """Preprocessing pipeline for the UCI credit-card default dataset.

    `process_pipeline` runs the stages in order: load the CSV, clean it
    (impute, cap outliers, normalise category codes), engineer features,
    one-hot encode, split into train/test, scale, and write the results
    under `output_dir`.
    """

    def __init__(self, input_path: str, output_dir: str, test_size: float = 0.2, random_state: int = 42,
                 n_jobs: int = 1, memory_threshold: float = 0.7, save_intermediates: bool = False,
                 output_formats: Optional[List[str]] = None, chunk_size: int = 5000):
        """Resolve paths, choose a worker count and record the dataset's column groups.

        Args:
            input_path: CSV to read. A relative path is resolved against the
                current directory, or its parent when the current directory is
                named `notebooks`.
            output_dir: Directory for all outputs (created if missing); resolved
                the same way as `input_path`.
            test_size: Fraction of rows held out by `split_data`.
            random_state: Seed passed to `train_test_split`.
            n_jobs: Workers used for outlier capping. `-1` becomes 1 on machines
                with less than 32 GB of RAM; otherwise any non-positive value
                becomes `min(cpu_count(), 4)`.
            memory_threshold: Fraction of system memory in use above which
                `_check_memory_usage` warns and forces a garbage collection.
            save_intermediates: Write `cleaned_data.csv` and
                `engineered_features.csv` as those stages complete.
            output_formats: Any of `'csv'` / `'parquet'`. Defaults to both.
            chunk_size: Rows per chunk when the input file is larger than 50 MB.
        """
        # Resolve paths relative to workspace root when running from notebooks/
        cwd = Path.cwd()
        root = cwd.parent if cwd.name == 'notebooks' else cwd
        ip = Path(input_path)
        if not ip.is_absolute():
            ip = root / ip
        self.input_path = str(ip)
        out = Path(output_dir)
        if not out.is_absolute():
            out = root / out
        self.output_dir = out
        self.output_dir.mkdir(parents=True, exist_ok=True)

        self.test_size = test_size
        self.random_state = random_state
        total_memory_gb = psutil.virtual_memory().total / (1024**3)
        if total_memory_gb < 32 and n_jobs == -1:
            logger.warning(f'Limited memory ({total_memory_gb:.1f} GB). Setting n_jobs=1 for stability.')
            self.n_jobs = 1
        else:
            self.n_jobs = n_jobs if n_jobs > 0 else min(cpu_count(), 4)
        self.memory_threshold = memory_threshold
        self.save_intermediates = save_intermediates
        # Copy so instances never share (or mutate) the caller's list or a shared default.
        self.output_formats = list(output_formats) if output_formats is not None else ['csv', 'parquet']
        self.chunk_size = chunk_size

        self.raw_data = None
        self.processed_data = None
        self.X_train = self.X_test = self.y_train = self.y_test = None
        # Scaler fitted on the training split by `process_pipeline`.
        self.scaler = None

        self.demographic_cols = ['LIMIT_BAL', 'SEX', 'EDUCATION', 'MARRIAGE', 'AGE']
        self.payment_history_cols = ['PAY_0', 'PAY_2', 'PAY_3', 'PAY_4', 'PAY_5', 'PAY_6']
        self.bill_amount_cols = ['BILL_AMT1', 'BILL_AMT2', 'BILL_AMT3', 'BILL_AMT4', 'BILL_AMT5', 'BILL_AMT6']
        self.payment_amount_cols = ['PAY_AMT1', 'PAY_AMT2', 'PAY_AMT3', 'PAY_AMT4', 'PAY_AMT5', 'PAY_AMT6']
        self.target_col = 'default.payment.next.month'
        # NOTE: these garbage-collector settings are process-wide, not per instance.
        gc.enable()
        gc.set_threshold(100, 5, 5)

    def _check_memory_usage(self) -> None:
        """Warn and run a garbage collection when system memory use exceeds the threshold."""
        current = psutil.virtual_memory().percent / 100
        if current > self.memory_threshold:
            logger.warning(f'Memory usage ({current:.2f}) exceeds threshold ({self.memory_threshold:.2f})')
            gc.collect()

    def load_data(self) -> pd.DataFrame:
        """Read the input CSV with compact dtypes into `self.raw_data` and return it.

        Files larger than 50 MB are read in chunks of `chunk_size` rows.
        Any error is logged and re-raised.
        """
        logger.info(f'Loading data from {self.input_path}')
        dtypes = {'ID': 'int32', 'LIMIT_BAL': 'float32', 'SEX': 'int8', 'EDUCATION': 'int8', 'MARRIAGE': 'int8',
                  'AGE': 'int16', 'default.payment.next.month': 'int8'}
        dtypes.update({col: 'int8' for col in self.payment_history_cols})
        dtypes.update({col: 'float32' for col in self.bill_amount_cols + self.payment_amount_cols})
        try:
            file_size_mb = os.path.getsize(self.input_path) / (1024 * 1024)
            if file_size_mb > 50 and self.chunk_size > 0:
                chunks, total_rows = [], 0
                reader = pd.read_csv(self.input_path, dtype=dtypes, chunksize=self.chunk_size)
                for i, chunk in enumerate(reader):
                    chunks.append(chunk)
                    total_rows += len(chunk)
                    if i % 5 == 0:
                        logger.info(f'Loaded chunk {i+1}, total rows: {total_rows}')
                    if i % 10 == 0:
                        self._check_memory_usage()
                self.raw_data = pd.concat(chunks, ignore_index=True)
                del chunks
                gc.collect()
            else:
                self.raw_data = pd.read_csv(self.input_path, dtype=dtypes)
            logger.info(f'Data loaded: {self.raw_data.shape}, mem ~ {self.raw_data.memory_usage().sum()/1024**2:.2f} MB')
            self.analyze_data()
            return self.raw_data
        except Exception as e:
            logger.error(f'Error loading data: {e}')
            raise

    def analyze_data(self) -> None:
        """Log missing-value counts and the class balance, then run the range checks."""
        if self.raw_data is None:
            return
        mv = self.raw_data.isnull().sum()
        if mv.sum() > 0:
            logger.warning(f'Missing values detected:\n{mv[mv>0]}')
        if self.target_col in self.raw_data.columns:
            tgt = self.raw_data[self.target_col].value_counts(normalize=True) * 100
            for v, p in tgt.items():
                logger.info(f'Class {v}: {p:.2f}%')
        self._check_data_quality()

    def _check_data_quality(self) -> None:
        """Log warnings for category codes outside their expected range and for negative bills."""
        rd = self.raw_data
        if rd is None:
            return
        if 'SEX' in rd.columns and (rd['SEX'].min() < 1 or rd['SEX'].max() > 2):
            logger.warning(f'Invalid SEX values: {rd["SEX"].unique()}')
        if 'EDUCATION' in rd.columns and (rd['EDUCATION'].min() < 1 or rd['EDUCATION'].max() > 6):
            logger.warning(f'Invalid EDUCATION values: {rd["EDUCATION"].unique()}')
        if 'MARRIAGE' in rd.columns and (rd['MARRIAGE'].min() < 0 or rd['MARRIAGE'].max() > 3):
            logger.warning(f'Invalid MARRIAGE values: {rd["MARRIAGE"].unique()}')
        for col in self.bill_amount_cols:
            if col in rd.columns and (rd[col] < 0).any():
                logger.warning(f'Found {(rd[col] < 0).sum()} negative values in {col}')

    def clean_data(self) -> pd.DataFrame:
        """Drop `ID`, impute, cap outliers and normalise category codes.

        Returns:
            The cleaned frame, also stored in `self.processed_data`.

        Raises:
            ValueError: if `load_data` has not populated `self.raw_data`.
        """
        if self.raw_data is None:
            raise ValueError('Raw data not loaded')
        logger.info('Cleaning data...')
        df = self.raw_data.copy()
        if 'ID' in df.columns:
            df = df.drop(columns='ID')
        df = self._impute_missing_values(df)
        df = self._handle_outliers(df)
        df = self._standardize_categorical(df)
        if self.save_intermediates:
            df.to_csv(self.output_dir / 'cleaned_data.csv', index=False)
        self.processed_data = df
        return df

    def _impute_missing_values(self, df: pd.DataFrame) -> pd.DataFrame:
        """Return a copy with numeric gaps filled by the median and other gaps by the mode.

        The target column is never imputed.
        """
        self._check_memory_usage()
        d = df.copy()
        if d.isnull().sum().sum() == 0:
            return d
        num_cols = [c for c in d.select_dtypes(include=['number']).columns if c != self.target_col]
        cat_cols = [c for c in d.select_dtypes(exclude=['number']).columns if c != self.target_col]
        if num_cols:
            d[num_cols] = SimpleImputer(strategy='median').fit_transform(d[num_cols])
        if cat_cols:
            d[cat_cols] = SimpleImputer(strategy='most_frequent').fit_transform(d[cat_cols])
        return d

    def _handle_outliers(self, df: pd.DataFrame) -> pd.DataFrame:
        """Return a copy with continuous columns clipped to [Q1 - 1.5*IQR, Q3 + 1.5*IQR].

        The target, the SEX / EDUCATION / MARRIAGE codes and the payment-status
        columns are left untouched: they are category / ordinal codes, and IQR
        clipping would turn long delays into a fractional bound. Columns whose
        IQR is zero are also left untouched, because clipping them would
        flatten every value to a single constant.
        """
        self._check_memory_usage()
        d = df.copy()
        skip = {self.target_col, 'SEX', 'EDUCATION', 'MARRIAGE', *self.payment_history_cols}
        num_cols = [c for c in d.select_dtypes(include=['number']).columns if c not in skip]

        def cap(col):
            q1, q3 = d[col].quantile(0.25), d[col].quantile(0.75)
            iqr = q3 - q1
            if not iqr > 0:  # zero (or NaN) spread: nothing sensible to clip against
                return d[col]
            return d[col].clip(lower=q1 - 1.5 * iqr, upper=q3 + 1.5 * iqr)

        if self.n_jobs > 1:
            capped = Parallel(n_jobs=self.n_jobs)(delayed(cap)(c) for c in num_cols)
            for c, s in zip(num_cols, capped):
                d[c] = s
        else:
            for c in num_cols:
                d[c] = cap(c)
        return d

    def _standardize_categorical(self, df: pd.DataFrame) -> pd.DataFrame:
        """Return a copy with category codes folded into the ranges the feature maps expect.

        EDUCATION codes <= 0 or >= 5 become 4, MARRIAGE 0 becomes 3, and the
        payment-status codes are limited to [-2, 9].
        """
        d = df.copy()
        if 'EDUCATION' in d.columns:
            edu = d['EDUCATION']
            # 0 is folded too: the risk maps in `_create_demographic_features`
            # only know codes 1-4 and would otherwise yield NaN.
            d.loc[(edu >= 5) | (edu <= 0), 'EDUCATION'] = 4
        if 'MARRIAGE' in d.columns:
            d.loc[d['MARRIAGE'] == 0, 'MARRIAGE'] = 3
        for col in self.payment_history_cols:
            if col in d.columns:
                d.loc[d[col] < -2, col] = -2
                d.loc[d[col] > 9, col] = 9
        return d

    def engineer_features(self) -> pd.DataFrame:
        """Add all engineered feature families to the cleaned data.

        Runs `clean_data` first when only raw data is available.

        Returns:
            The enriched frame, also stored in `self.processed_data`.

        Raises:
            ValueError: if neither cleaned nor raw data is available.
        """
        if self.processed_data is None:
            if self.raw_data is None:
                raise ValueError('Raw data not loaded')
            self.clean_data()
        df = self.processed_data.copy()
        # Order matters: the financial-stress features read columns created by
        # the utilization and payment steps.
        df = self._create_utilization_features(df)
        df = self._create_payment_features(df)
        df = self._create_demographic_features(df)
        df = self._create_financial_stress_features(df)
        df = self._create_trend_features(df)
        if self.save_intermediates:
            self.output_dir.mkdir(parents=True, exist_ok=True)
            df.to_csv(self.output_dir / 'engineered_features.csv', index=False)
        self.processed_data = df
        return df

    def _create_utilization_features(self, df: pd.DataFrame) -> pd.DataFrame:
        """Add bill / credit-limit ratios and their summaries; modifies and returns `df`.

        Returns `df` unchanged when a bill column or LIMIT_BAL is missing.
        """
        if not all(c in df.columns for c in self.bill_amount_cols + ['LIMIT_BAL']):
            return df
        # A zero limit is replaced by 1 so the division stays finite.
        limit = df['LIMIT_BAL'].replace(0, np.nan).fillna(1)
        util_cols = []
        for i, bill_col in enumerate(self.bill_amount_cols, start=1):
            col = f'UTIL_RATIO_{i}'
            util_cols.append(col)
            df[col] = df[bill_col] / limit
            df[f'UTIL_RATIO_CAPPED_{i}'] = df[col].clip(0, 1)
        df['UTIL_RATIO_AVG'] = df[util_cols].mean(axis=1)
        df['UTIL_RATIO_MAX'] = df[util_cols].max(axis=1)
        df['UTIL_RATIO_MIN'] = df[util_cols].min(axis=1)
        df['UTIL_RATIO_STD'] = df[util_cols].std(axis=1)
        recent = util_cols[:3]
        if len(recent) == 3:
            df['UTIL_RATIO_RECENT_AVG'] = df[recent].mean(axis=1)
        for i in range(len(util_cols) - 1):
            df[f'UTIL_CHANGE_{i+1}_{i+2}'] = df[util_cols[i]] - df[util_cols[i + 1]]
        df['HIGH_UTIL_COUNT'] = (df[util_cols] > 0.8).sum(axis=1)
        df['ZERO_UTIL_COUNT'] = (df[util_cols] == 0).sum(axis=1)
        return df

    def _create_payment_features(self, df: pd.DataFrame) -> pd.DataFrame:
        """Add delay statistics and payment / bill ratios; modifies and returns `df`.

        Returns `df` unchanged when a payment-status column is missing. The
        ratio features are only added when every bill and payment-amount column
        is present.
        """
        if not all(c in df.columns for c in self.payment_history_cols):
            return df
        amount_cols_exist = all(c in df.columns for c in self.payment_amount_cols + self.bill_amount_cols)
        history = df[self.payment_history_cols]
        delay_mask = history > 0
        df['DELAY_COUNT'] = delay_mask.sum(axis=1)
        df['MAX_DELAY'] = history.max(axis=1)
        # Mean over delayed months only; rows without any delay get 0.
        df['AVG_DELAY'] = history.where(delay_mask).mean(axis=1).fillna(0)
        df['RECENT_DELAY_COUNT'] = (df[['PAY_0', 'PAY_2', 'PAY_3']] > 0).sum(axis=1)
        df['PAYMENT_CONSISTENCY'] = history.var(axis=1)
        df['PAYMENT_TREND'] = df['PAY_6'] - df['PAY_0']
        if amount_cols_exist:
            ratio_cols = []
            for i, (pcol, bcol) in enumerate(zip(self.payment_amount_cols, self.bill_amount_cols), start=1):
                r = f'PAYMENT_RATIO_{i}'
                # A zero bill gives ratio 0 rather than inf.
                df[r] = (df[pcol] / df[bcol].replace(0, np.nan)).fillna(0)
                df[f'PAYMENT_RATIO_CAPPED_{i}'] = df[r].clip(0, 1)
                ratio_cols.append(r)
            df['PAYMENT_RATIO_AVG'] = df[ratio_cols].mean(axis=1)
            df['PAYMENT_RATIO_MIN'] = df[ratio_cols].min(axis=1)
            df['FULL_PAYMENT_COUNT'] = (df[ratio_cols] >= 0.99).sum(axis=1)
            df['MIN_PAYMENT_COUNT'] = ((df[ratio_cols] >= 0.05) & (df[ratio_cols] < 0.1)).sum(axis=1)
        return df

    def _create_demographic_features(self, df: pd.DataFrame) -> pd.DataFrame:
        """Add age bands and hand-assigned demographic risk scores; modifies and returns `df`.

        Returns `df` unchanged when AGE, EDUCATION or MARRIAGE is missing.
        The maps below cover EDUCATION codes 1-4 and MARRIAGE codes 1-3 only.
        """
        if not all(c in df.columns for c in ['AGE', 'EDUCATION', 'MARRIAGE']):
            return df
        df['AGE_GROUP'] = pd.cut(df['AGE'], bins=[0, 25, 35, 45, 55, 65, 100],
                                 labels=['<25', '25-35', '36-45', '46-55', '56-65', '65+'])
        edu_risk_map = {1: 'Low', 2: 'Low', 3: 'Medium', 4: 'High'}
        df['EDUCATION_RISK'] = df['EDUCATION'].map(edu_risk_map)
        marriage_stability_map = {1: 'Stable', 2: 'Medium', 3: 'Variable'}
        df['MARRIAGE_STABILITY'] = df['MARRIAGE'].map(marriage_stability_map)
        df['YOUNG_UNMARRIED'] = ((df['AGE'] < 30) & (df['MARRIAGE'] == 2)).astype(int)
        df['AGE_RISK'] = pd.cut(df['AGE'], bins=[0, 25, 30, 40, 50, 100], labels=[4, 3, 2, 1, 0]).astype(int)
        df['EDU_RISK'] = df['EDUCATION'].map({1: 0, 2: 1, 3: 2, 4: 3})
        df['MARR_RISK'] = df['MARRIAGE'].map({1: 0, 2: 1, 3: 2})
        df['DEMO_RISK_SCORE'] = df['AGE_RISK'] + df['EDU_RISK'] + df['MARR_RISK']
        return df

    def _create_financial_stress_features(self, df: pd.DataFrame) -> pd.DataFrame:
        """Add payment-capacity, bill-growth and composite stress features; modifies and returns `df`.

        Needs the bill and payment-amount columns, LIMIT_BAL, and the
        UTIL_RATIO_AVG / DELAY_COUNT columns produced by the utilization and
        payment steps. Returns `df` unchanged (with a warning) when any of them
        is missing.
        """
        required = self.bill_amount_cols + self.payment_amount_cols + ['LIMIT_BAL', 'UTIL_RATIO_AVG', 'DELAY_COUNT']
        missing = [c for c in required if c not in df.columns]
        if missing:
            logger.warning(f'Skipping financial stress features; missing columns: {missing}')
            return df

        df['AVG_PAYMENT'] = df[self.payment_amount_cols].mean(axis=1)
        # Only defined where the average payment is positive; the remaining rows take the median.
        credit_to_payment = (df['LIMIT_BAL'] / df['AVG_PAYMENT']).where(df['AVG_PAYMENT'] > 0)
        credit_to_payment = credit_to_payment.fillna(credit_to_payment.median())
        df['CREDIT_TO_PAYMENT'] = credit_to_payment.clip(upper=credit_to_payment.quantile(0.99))

        df['BILL_TREND'] = df['BILL_AMT1'] - df['BILL_AMT6']
        prior_mean = df[['BILL_AMT2', 'BILL_AMT3', 'BILL_AMT4', 'BILL_AMT5', 'BILL_AMT6']].mean(axis=1)
        # Results are assigned back instead of using inplace=True on a column
        # selection, which does not update the frame under pandas Copy-on-Write.
        df['BILL_GROWTH_RATE'] = (df['BILL_TREND'] / prior_mean).replace([np.inf, -np.inf], np.nan).fillna(0)

        total_payments = df[self.payment_amount_cols].sum(axis=1)
        total_bills = df[self.bill_amount_cols].sum(axis=1)
        df['PAYMENT_TO_BILL_RATIO'] = (total_payments / total_bills).replace([np.inf, -np.inf], np.nan).fillna(0)

        stress_cols = []
        for i, (pcol, bcol) in enumerate(zip(self.payment_amount_cols, self.bill_amount_cols), start=1):
            col = f'PAYMENT_STRESS_{i}'
            # Flag months with a positive bill where less than 5% of it was paid.
            df[col] = (df[pcol] < (df[bcol] * 0.05)) & (df[bcol] > 0)
            stress_cols.append(col)
        df['PAYMENT_STRESS_COUNT'] = df[stress_cols].sum(axis=1)
        df['DEBT_SPIRAL'] = ((df['BILL_TREND'] > 0) & (df['PAYMENT_TO_BILL_RATIO'] < 0.1)
                             & (df['UTIL_RATIO_AVG'] > 0.8)).astype(int)
        df['UTIL_NORM'] = df['UTIL_RATIO_AVG'] / 1.0
        df['DELAY_NORM'] = df['DELAY_COUNT'] / 6.0
        df['STRESS_NORM'] = df['PAYMENT_STRESS_COUNT'] / 6.0
        df['FINANCIAL_STRESS_SCORE'] = ((df['UTIL_NORM'] * 0.4) + (df['DELAY_NORM'] * 0.4)
                                        + (df['STRESS_NORM'] * 0.2)) * 10
        return df

    def _create_trend_features(self, df: pd.DataFrame) -> pd.DataFrame:
        """Add month-over-month differences, volatility and moving averages; modifies and returns `df`.

        Returns `df` unchanged when a bill or payment-amount column is missing.
        """
        if not all(c in df.columns for c in self.bill_amount_cols) or not all(c in df.columns for c in self.payment_amount_cols):
            return df
        for i in range(1, len(self.bill_amount_cols)):
            df[f'BILL_MOMENTUM_{i}'] = df[f'BILL_AMT{i}'] - df[f'BILL_AMT{i+1}']
        for i in range(1, len(self.payment_amount_cols)):
            df[f'PAYMENT_MOMENTUM_{i}'] = df[f'PAY_AMT{i}'] - df[f'PAY_AMT{i+1}']
        bills = df[self.bill_amount_cols]
        payments = df[self.payment_amount_cols]
        # Coefficient of variation; a zero mean yields inf/NaN, which is mapped to 0.
        df['BILL_VOLATILITY'] = (bills.std(axis=1) / bills.mean(axis=1)).replace([np.inf, -np.inf], np.nan).fillna(0)
        df['PAYMENT_VOLATILITY'] = (payments.std(axis=1) / payments.mean(axis=1)).replace([np.inf, -np.inf], np.nan).fillna(0)
        df['BILL_MA_3M'] = df[['BILL_AMT1', 'BILL_AMT2', 'BILL_AMT3']].mean(axis=1)
        df['BILL_MA_6M'] = bills.mean(axis=1)
        df['BILL_SEASONAL'] = df['BILL_AMT1'] + df['BILL_AMT4'] - df['BILL_MA_6M']
        df['BILL_ACCEL'] = (df['BILL_AMT1'] - df['BILL_AMT2']) - (df['BILL_AMT2'] - df['BILL_AMT3'])
        df['PAYMENT_ACCEL'] = (df['PAY_AMT1'] - df['PAY_AMT2']) - (df['PAY_AMT2'] - df['PAY_AMT3'])
        return df

    def encode_categoricals(self, df: pd.DataFrame = None) -> pd.DataFrame:
        """Return a copy with object / category columns one-hot encoded (first level dropped).

        Args:
            df: Frame to encode; defaults to `self.processed_data`.

        Raises:
            ValueError: if no frame is given and nothing has been processed yet.
        """
        source = self.processed_data if df is None else df
        if source is None:
            raise ValueError('No data to encode: pass a DataFrame or run clean_data()/engineer_features() first')
        d = source.copy()
        cats = d.select_dtypes(include=['object', 'category']).columns.tolist()
        if not cats:
            return d
        return pd.get_dummies(d, columns=cats, drop_first=True)

    def scale_features(self, df: pd.DataFrame = None,
                       scaler: Optional['RobustScaler'] = None) -> Tuple[pd.DataFrame, Optional['RobustScaler']]:
        """Return a robust-scaled copy of the numeric feature columns plus the scaler used.

        Args:
            df: Frame to scale; defaults to `self.processed_data`. The target
                column, if present, is not scaled.
            scaler: An already fitted scaler to apply. When omitted, a new
                `RobustScaler` is fitted on `df` itself. A supplied scaler must
                have been fitted on the same numeric columns.

        Returns:
            `(scaled_frame, scaler)`. When the frame has no numeric feature
            columns it is returned unscaled together with the `scaler` argument.
        """
        d = self.processed_data.copy() if df is None else df.copy()
        num_cols = [c for c in d.select_dtypes(include=['number']).columns if c != self.target_col]
        if not num_cols:
            return d, scaler
        if scaler is None:
            scaler = RobustScaler().fit(d[num_cols])
        d[num_cols] = scaler.transform(d[num_cols])
        return d, scaler

    def split_data(self, df: pd.DataFrame = None) -> Tuple[pd.DataFrame, pd.DataFrame, pd.Series, pd.Series]:
        """Stratified train/test split; the pieces are also stored on the instance.

        Args:
            df: Frame to split; defaults to `self.processed_data`.

        Raises:
            ValueError: if the target column is missing.
        """
        d = self.processed_data.copy() if df is None else df.copy()
        if self.target_col not in d.columns:
            raise ValueError(f"Target column '{self.target_col}' not found in data")
        X, y = d.drop(self.target_col, axis=1), d[self.target_col]
        Xtr, Xte, ytr, yte = train_test_split(X, y, test_size=self.test_size,
                                              random_state=self.random_state, stratify=y)
        self.X_train, self.X_test, self.y_train, self.y_test = Xtr, Xte, ytr, yte
        return Xtr, Xte, ytr, yte

    def save_full_processed(self) -> None:
        """Write `self.processed_data` as CSV, and as Parquet when that format is requested."""
        if self.processed_data is None:
            return
        # Always save final processed dataset
        df = self.processed_data
        df.to_csv(self.output_dir / 'processed_full.csv', index=False)
        # Save parquet when available
        if 'parquet' in [f.lower() for f in self.output_formats]:
            try:
                df.to_parquet(self.output_dir / 'processed_full.parquet', index=False)
            except Exception as e:
                logger.warning(f'Parquet save failed for processed_full ({e}).')

    def save_data(self) -> None:
        """Write the train and test splits under `train/` and `test/` in each requested format.

        A failed Parquet write is logged and replaced by a CSV write.
        """
        if self.X_train is None or self.X_test is None:
            return
        train_dir = self.output_dir / 'train'
        test_dir = self.output_dir / 'test'
        train_dir.mkdir(exist_ok=True)
        test_dir.mkdir(exist_ok=True)
        train = pd.concat([self.X_train, self.y_train], axis=1)
        test = pd.concat([self.X_test, self.y_test], axis=1)

        def write_csv():
            train.to_csv(train_dir / 'train.csv', index=False)
            test.to_csv(test_dir / 'test.csv', index=False)

        for fmt in self.output_formats:
            fmt = fmt.lower()
            if fmt == 'csv':
                write_csv()
            elif fmt == 'parquet':
                try:
                    train.to_parquet(train_dir / 'train.parquet', index=False)
                    test.to_parquet(test_dir / 'test.parquet', index=False)
                except Exception as e:
                    logger.warning(f'Parquet save failed ({e}). Saving CSV instead.')
                    write_csv()

    def _log_feature_importance(self) -> None:
        """Log the ten features most strongly correlated with the target.

        Features are ranked by absolute correlation so strong negative
        relationships are listed too; the logged value keeps its sign.
        """
        if self.processed_data is None or self.target_col not in self.processed_data.columns:
            return
        corrs = self.processed_data.corr(numeric_only=True)[self.target_col].drop(self.target_col)
        ranked = corrs.reindex(corrs.abs().sort_values(ascending=False).index).head(10)
        logger.info('Top features by correlation with target:')
        for feature, corr in ranked.items():
            logger.info(f'  {feature}: {corr:.4f}')

    def process_pipeline(self) -> Tuple[pd.DataFrame, pd.DataFrame, pd.Series, pd.Series]:
        """Run every stage from loading to saving.

        The scaler is fitted on the training split only and then applied to the
        test split and to the full frame, so no test-set statistics leak into
        the scaling.

        Returns:
            `(X_train, X_test, y_train, y_test)` with scaled features.
        """
        logger.info('Starting preprocessing pipeline...')
        self.load_data()
        self.clean_data()
        self.engineer_features()
        encoded = self.encode_categoricals()
        self.processed_data = encoded
        Xtr, Xte, ytr, yte = self.split_data(encoded)
        Xtr, scaler = self.scale_features(Xtr)
        Xte, _ = self.scale_features(Xte, scaler=scaler)
        self.X_train, self.X_test = Xtr, Xte
        self.scaler = scaler
        self.processed_data, _ = self.scale_features(encoded, scaler=scaler)
        # Ensure full processed dataset is persisted
        self.save_full_processed()
        self.save_data()
        self._log_feature_importance()
        return Xtr, Xte, ytr, yte

In [30]:
# Visualization utilities (from src/feature_visualization.py)
from sklearn.decomposition import PCA
from sklearn.feature_selection import mutual_info_classif

class FeatureVisualization:
    """Plots describing how the features of a frame relate to a binary target.

    Missing values in a private copy of the data are filled on construction
    (numeric columns with the median, other columns with the mode), and every
    plot is written as a PNG under `output_dir` when `save=True`.
    """

    def __init__(self, data: pd.DataFrame, target_col: str = 'default.payment.next.month', output_dir: str = './visualizations', feature_groups: Optional[Dict[str, List[str]]] = None):
        """Copy `data`, fill its gaps and derive the feature matrix and target.

        Args:
            data: Frame to visualise; it is copied, never modified.
            target_col: Name of the target column. It may be absent, in which
                case `self.y` is an empty Series and `self.X` holds every column.
            output_dir: Directory for saved figures (created if missing).
            feature_groups: Optional mapping of group name to column names;
                inferred from the column names when omitted.
        """
        self.data = data.copy()
        self.target_col = target_col
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(parents=True, exist_ok=True)
        if self.data.isna().sum().sum() > 0:
            num_cols = self.data.select_dtypes(include=['number']).columns
            self.data[num_cols] = self.data[num_cols].fillna(self.data[num_cols].median())
            for c in self.data.select_dtypes(exclude=['number']).columns:
                if self.data[c].isna().sum() > 0:
                    modes = self.data[c].mode()
                    mode_val = modes.iloc[0] if not modes.empty else 'unknown'
                    self.data[c] = self.data[c].fillna(mode_val)
        self.feature_groups = feature_groups if feature_groups else self._infer_feature_groups()
        # errors='ignore' keeps construction working when the target is absent,
        # matching the handling of `self.y` on the next line.
        self.X = self.data.drop(columns=[self.target_col], errors='ignore')
        self.y = self.data[self.target_col] if self.target_col in self.data.columns else pd.Series(dtype='int64')

    def _infer_feature_groups(self) -> Dict[str, List[str]]:
        """Group feature columns by substrings of their names; empty groups are dropped.

        A column can fall into several groups. Columns that match no group are
        collected under 'Other'.
        """
        cols = [c for c in self.data.columns if c != self.target_col]

        def matches(name, tokens):
            upper = name.upper()
            return any(tok in upper for tok in tokens)

        groups = {
            'Demographics': [c for c in cols if matches(c, ['AGE', 'SEX', 'EDUCATION', 'MARRIAGE', 'DEMO'])],
            'Utilization': [c for c in cols if matches(c, ['UTIL', 'LIMIT'])],
            # PAY_AMT* columns are payment amounts, not status history, so keep them out here.
            'Payment_History': [c for c in cols if 'PAY_' in c and 'PAY_AMT' not in c],
            'Payment_Amount': [c for c in cols if 'PAY_AMT' in c],
            'Bill_Amount': [c for c in cols if 'BILL_AMT' in c],
            'Payment_Behavior': [c for c in cols if matches(c, ['DELAY', 'PAYMENT_RATIO', 'CONSISTENCY'])],
            'Financial_Stress': [c for c in cols if matches(c, ['STRESS', 'DEBT', 'SPIRAL'])],
            'Trends': [c for c in cols if matches(c, ['TREND', 'MOMENTUM', 'VOLATILITY', 'MA_', 'ACCEL'])],
        }
        categorized = set().union(*groups.values())
        groups['Other'] = [c for c in cols if c not in categorized]
        return {k: v for k, v in groups.items() if v}

    def plot_target_distribution(self, save: bool = True):
        """Bar chart of the target classes with percentage labels.

        Saved as `target_distribution.png` (and the figure closed) when `save` is True.
        """
        plt.figure(figsize=(10, 6))
        has_target = self.target_col in self.data.columns
        d = self.data.dropna(subset=[self.target_col]) if has_target else self.data
        if has_target:
            ax = sns.countplot(x=self.target_col, data=d)
            total = len(d)
            for p in ax.patches:
                h = p.get_height()
                ax.text(p.get_x() + p.get_width() / 2., h * 1.01, f'{100*h/total:.1f}%', ha='center')
        plt.title('Distribution of Default vs Non-Default')
        plt.xlabel('Default (1) vs Non-Default (0)')
        plt.ylabel('Count')
        if save:
            plt.tight_layout()
            plt.savefig(self.output_dir / 'target_distribution.png', dpi=300)
            plt.close()

    def plot_correlation_matrix(self, top_n: int = 20, save: bool = True):
        """Lower-triangle heatmap of the `top_n` features most correlated with the target.

        Without a target column, the features are ranked by correlation with
        the first numeric column instead. Saved as `correlation_matrix.png`
        (and the figure closed) when `save` is True.
        """
        has_target = self.target_col in self.data.columns
        all_corr = self.data.corr(numeric_only=True)
        if has_target:
            correlations = all_corr[self.target_col].sort_values(ascending=False)
        else:
            correlations = all_corr.iloc[:, 0]
        if self.target_col in correlations.index:
            correlations = correlations.drop(self.target_col)
        top_features = correlations.abs().nlargest(top_n).index.tolist()
        selected = top_features + ([self.target_col] if has_target else [])
        corr_matrix = self.data[selected].corr(numeric_only=True)

        plt.figure(figsize=(12, 10))
        # Hide the upper triangle (and diagonal): the matrix is symmetric.
        mask = np.triu(np.ones_like(corr_matrix, dtype=bool))
        cmap = sns.diverging_palette(230, 20, as_cmap=True)
        sns.heatmap(corr_matrix, mask=mask, cmap=cmap, vmin=-1, vmax=1, center=0, square=True,
                    linewidths=.5, cbar_kws={'shrink': .5}, annot=True, fmt='.2f')
        plt.title(f'Correlation Matrix: Top {len(top_features)} Features')
        if save:
            plt.tight_layout()
            plt.savefig(self.output_dir / 'correlation_matrix.png', dpi=300)
            plt.close()

    def plot_feature_importance(self, method: str = 'mutual_info', top_n: int = 20, save: bool = True,
                                random_state: int = 42) -> pd.Series:
        """Horizontal bar chart of the `top_n` most important features.

        Args:
            method: `'mutual_info'` scores the numeric features with
                `mutual_info_classif`; any other value uses the absolute
                correlation with the target.
            top_n: Number of features to show.
            save: Save as `feature_importance_{method}.png` and close the figure.
            random_state: Seed for the mutual-information estimate, so repeated
                runs give the same ranking.

        Returns:
            The plotted scores, highest first. If the mutual-information
            computation fails, the correlation scores are used instead; a
            message is printed and the chart title says so.
        """
        def correlation_scores():
            return self.data.corr(numeric_only=True)[self.target_col].drop(self.target_col).abs()

        label = method
        if method == 'mutual_info':
            numeric = self.X.select_dtypes(include=['number'])
            numeric = numeric.fillna(numeric.median())
            try:
                scores = mutual_info_classif(numeric, self.y, random_state=random_state)
                s = pd.Series(scores, index=numeric.columns)
            except Exception as e:
                print(f'Mutual information failed ({e}); using absolute correlation instead.')
                label = 'correlation (mutual_info unavailable)'
                s = correlation_scores()
        else:
            s = correlation_scores()
        s = s.sort_values(ascending=False).head(top_n)

        plt.figure(figsize=(12, 8))
        s.plot(kind='barh')
        plt.title(f'Top {len(s)} Features ({label})')
        plt.xlabel('Importance')
        plt.ylabel('Feature')
        plt.grid(axis='x')
        if save:
            plt.tight_layout()
            plt.savefig(self.output_dir / f'feature_importance_{method}.png', dpi=300)
            plt.close()
        return s

    def generate_feature_report(self) -> None:
        """Produce every plot with default settings and print where they were saved."""
        self.plot_target_distribution()
        self.plot_correlation_matrix()
        self.plot_feature_importance(method='mutual_info')
        self.plot_feature_importance(method='correlation')
        print(f'Feature visualizations saved to {self.output_dir}')

In [31]:
# Minimal MPI helpers (optional)
def setup_mpi():
    """Return `(comm, rank, size)` for the current process.

    The world communicator is used only when mpi4py imported successfully and
    one of the launcher rank variables (`OMPI_COMM_WORLD_RANK` or `PMI_RANK`)
    is present in the environment. Otherwise the single-process fallback
    `(None, 0, 1)` is returned.
    """
    launcher_vars = ('OMPI_COMM_WORLD_RANK', 'PMI_RANK')
    launched_under_mpi = any(var in os.environ for var in launcher_vars)
    if not (MPI_AVAILABLE and launched_under_mpi):
        return None, 0, 1
    world = MPI.COMM_WORLD
    return world, world.Get_rank(), world.Get_size()

def distribute_and_process(comm, rank, size, preprocessor: 'CreditCardPreprocessor'):
    """Clean and engineer features in parallel across MPI ranks.

    Rank 0 loads the data, splits it into `size` row blocks and sends one to
    each other rank. Every rank cleans and engineers its own block, and rank 0
    gathers the results into `preprocessor.processed_data`.

    Per-rank statistics (outlier quantiles, medians) are computed on each block
    separately, so results can differ slightly from a single-process run.

    Args:
        comm: Communicator providing `send` / `recv`.
        rank: This process's rank.
        size: Number of ranks.
        preprocessor: The preprocessor whose data is loaded and processed.

    Returns:
        False when `size <= 1` (nothing was distributed), True otherwise.
    """
    if size <= 1:
        return False  # not distributed
    logger.info(f'Rank {rank}: starting distributed preprocessing')

    full_data = None
    if rank == 0:
        full_data = preprocessor.load_data()
        # Split row positions and slice with iloc so every block stays a DataFrame;
        # np.array_split on a DataFrame relies on the deprecated DataFrame.swapaxes.
        blocks = [full_data.iloc[idx] for idx in np.array_split(np.arange(len(full_data)), size)]
        local = blocks[0]
        for dest in range(1, size):
            comm.send(blocks[dest], dest=dest)
    else:
        local = comm.recv(source=0)

    preprocessor.raw_data = local
    # Every rank would write the same intermediate file names, so intermediates
    # are switched off while the blocks are processed.
    keep_intermediates = preprocessor.save_intermediates
    preprocessor.save_intermediates = False
    try:
        preprocessor.clean_data()
        preprocessor.engineer_features()
    finally:
        preprocessor.save_intermediates = keep_intermediates

    if rank == 0:
        gathered = [preprocessor.processed_data]
        for src in range(1, size):
            gathered.append(comm.recv(source=src))
        preprocessor.processed_data = pd.concat(gathered, ignore_index=True)
        # Rank 0 goes back to holding the complete raw frame, not just its block.
        preprocessor.raw_data = full_data
        if keep_intermediates:
            preprocessor.processed_data.to_csv(preprocessor.output_dir / 'engineered_features.csv', index=False)
    else:
        comm.send(preprocessor.processed_data, dest=0)
    return True

In [32]:
# Run the pipeline
cfg = load_config()
prep_cfg = cfg.get('preprocessing', {})
pre = CreditCardPreprocessor(
    input_path=cfg.get('input_path', DEFAULT_INPUT),
    output_dir=cfg.get('output_dir', DEFAULT_OUTPUT_DIR),
    test_size=prep_cfg.get('test_size', 0.2),
    random_state=prep_cfg.get('random_state', 42),
    n_jobs=prep_cfg.get('n_jobs', -1),
    memory_threshold=prep_cfg.get('memory_threshold', 0.8),
    save_intermediates=prep_cfg.get('save_intermediates', True),
    output_formats=prep_cfg.get('output_formats', ['csv','parquet']),
    chunk_size=prep_cfg.get('chunk_size', 5000)
)
comm, rank, size = setup_mpi()
distributed = distribute_and_process(comm, rank, size, pre) if size > 1 else False
if (not distributed) or (rank == 0):
    if distributed:
        # Rank 0 already holds the gathered, feature-engineered frame. Calling
        # process_pipeline() here would reload the CSV and redo all of that work,
        # so only the remaining stages are run.
        # TODO(review): the scaler here is fitted on all rows before the split.
        pre.processed_data = pre.encode_categoricals()
        pre.processed_data, _ = pre.scale_features()
        pre.save_full_processed()
        Xtr, Xte, ytr, yte = pre.split_data()
        pre.save_data()
    else:
        Xtr, Xte, ytr, yte = pre.process_pipeline()
    print('Train/Test shapes:', Xtr.shape, Xte.shape)

    # Generate visualizations on the combined training data
    train_df = pd.concat([Xtr, ytr], axis=1)
    viz = FeatureVisualization(train_df, target_col=pre.target_col, output_dir=str(Path(pre.output_dir)/'visualizations'))
    viz.generate_feature_report()

2025-08-24 20:50:54,411 - INFO - Starting preprocessing pipeline...
2025-08-24 20:50:54,411 - INFO - Loading data from /home/satyakarthikeya/Desktop/Hpc/archive/UCI_Credit_Card.csv
2025-08-24 20:50:54,411 - INFO - Starting preprocessing pipeline...
2025-08-24 20:50:54,411 - INFO - Loading data from /home/satyakarthikeya/Desktop/Hpc/archive/UCI_Credit_Card.csv
2025-08-24 20:50:54,446 - INFO - Data loaded: (30000, 25), mem ~ 1.95 MB
2025-08-24 20:50:54,448 - INFO - Class 0: 77.88%
2025-08-24 20:50:54,448 - INFO - Class 1: 22.12%
2025-08-24 20:50:54,453 - INFO - Cleaning data...
2025-08-24 20:50:54,446 - INFO - Data loaded: (30000, 25), mem ~ 1.95 MB
2025-08-24 20:50:54,448 - INFO - Class 0: 77.88%
2025-08-24 20:50:54,448 - INFO - Class 1: 22.12%
2025-08-24 20:50:54,453 - INFO - Cleaning data...
2025-08-24 20:51:03,927 - INFO - Top features by correlation with target:
2025-08-24 20:51:03,927 - INFO -   DELAY_COUNT: 0.3984
2025-08-24 20:51:03,928 - INFO -   DELAY_NORM: 0.3984
2025-08-24 20

Train/Test shapes: (24000, 117) (6000, 117)
Feature visualizations saved to /home/satyakarthikeya/Desktop/Hpc/processed_data/visualizations
Feature visualizations saved to /home/satyakarthikeya/Desktop/Hpc/processed_data/visualizations


In [33]:
# Debug: verify class has process_pipeline
# Leftover sanity check that the class object in the kernel has the expected method;
# it has no effect on the pipeline outputs and can be removed once the notebook is stable.
print('has process_pipeline:', hasattr(CreditCardPreprocessor, 'process_pipeline'))
# Names on the class that contain the substring "process".
methods = [m for m in dir(CreditCardPreprocessor) if 'process' in m]
print('methods containing "process":', methods)

has process_pipeline: True
methods containing "process": ['process_pipeline', 'save_full_processed']
