In [None]:
# train_pipeline.ipynb - COMPLETE PRODUCTION PIPELINE (VERSION FINAL ALIGNED)

import gc
from pathlib import Path

import joblib
import numpy as np
import pandas as pd
from sklearn.impute import SimpleImputer
from sklearn.linear_model import SGDClassifier, LogisticRegression
from sklearn.metrics import roc_auc_score
from sklearn.model_selection import train_test_split
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler

# --- 1. CONFIGURATION ---
# Paths stay plain strings: the loaders below build file paths with
# string concatenation (DATA_ROOT + '<name>.csv').
DATA_ROOT = './data/'
MAIN_FILE = f"{DATA_ROOT}application_train.csv"      # main application table
MODEL_SAVE_PATH = './models/full_pipeline.joblib'    # where the fitted final pipeline is dumped
TARGET_COLUMN = 'TARGET'                             # label column read from MAIN_FILE
RANDOM_SEED = 42                                     # shared by train_test_split and the classifiers

print("--- Starting Iterative Production Training Pipeline ---")

# ====================================================================
# 2. UTILITY AND FEATURE ENGINEERING FUNCTIONS
# ====================================================================

def downcast_dtypes(df, *, allow_float16=True):
    """Memory optimization: downcast numerical columns to smaller types.

    Each integer column (signed or unsigned) is cast to the narrowest integer
    type of the same signedness that holds its min and max. Each float column
    is cast to the narrowest of float16/float32/float64 that holds them.
    Boolean, object/string, categorical, datetime and pandas extension-dtype
    columns are left unchanged, as are columns with no non-missing values.

    Args:
        df: DataFrame to shrink. It is not modified in place.
        allow_float16: If False, floats are never narrowed below float32.
            float16 keeps only about 3 significant decimal digits (largest
            finite value 65504), so it can round values noticeably. Defaults
            to True to keep this pipeline's historical behaviour.

    Returns:
        A DataFrame with the same labels and (up to float rounding) the same
        values in smaller dtypes. Its internal blocks are consolidated, so
        adding columns afterwards does not trigger pandas' "DataFrame is highly
        fragmented" PerformanceWarning.
    """
    start_mem = df.memory_usage().sum() / 1024**2

    candidates_by_kind = {
        'i': (np.int8, np.int16, np.int32, np.int64),
        'u': (np.uint8, np.uint16, np.uint32, np.uint64),
        'f': ((np.float16,) if allow_float16 else ()) + (np.float32, np.float64),
    }

    new_dtypes = {}
    for col, series in df.items():
        dtype = series.dtype
        # Only plain NumPy int/uint/float columns are candidates. Bool (kind 'b')
        # would otherwise be turned into float16, and extension dtypes may hold pd.NA.
        if not isinstance(dtype, np.dtype) or dtype.kind not in candidates_by_kind:
            continue
        c_min, c_max = series.min(), series.max()
        if pd.isna(c_min) or pd.isna(c_max):
            continue  # empty or all-NaN column: there is no range to fit
        limits = np.iinfo if dtype.kind in 'iu' else np.finfo
        for candidate in candidates_by_kind[dtype.kind]:
            info = limits(candidate)
            if info.min <= c_min and c_max <= info.max:
                if np.dtype(candidate) != dtype:
                    new_dtypes[col] = candidate
                break

    if new_dtypes:
        # Cast everything in one astype call, then copy() to consolidate the
        # internal blocks. Assigning column by column leaves one block per
        # column, which slows later column inserts and makes pandas emit a
        # PerformanceWarning.
        df = df.astype(new_dtypes).copy()

    end_mem = df.memory_usage().sum() / 1024**2
    print(f'Memory reduced from {start_mem:.2f} MB to {end_mem:.2f} MB')
    return df

def feature_engineer_application_data(df):
    """Applies cleaning and feature engineering to the main application data.

    Steps:
      1. Drop FLAG_MOBIL and FLAG_DOCUMENT_2 (if present).
      2. Flag DAYS_EMPLOYED == 365243 in DAYS_EMPLOYED_ANOM and replace that
         value with NaN.
      3. Take the absolute value of every DAYS_* column.
      4. Add AGE_YEARS, EMPLOYED_TO_AGE_RATIO and ANNUITY_TO_INCOME_RATIO. A
         zero denominator yields NaN rather than +/-inf.
      5. Drop every *_MEDI and *_MODE column.
      6. Fill missing AMT_REQ_CREDIT_BUREAU_* values with 0.

    Args:
        df: Application frame. It must contain DAYS_EMPLOYED, DAYS_BIRTH,
            AMT_ANNUITY and AMT_INCOME_TOTAL.

    Returns:
        A new DataFrame; the input frame is not modified.
    """
    df = df.drop(columns=['FLAG_MOBIL', 'FLAG_DOCUMENT_2'], errors='ignore')

    # 365243 days (~1000 years) is treated as an anomaly sentinel: keep the
    # information as a 0/1 flag and treat the value itself as missing.
    df['DAYS_EMPLOYED_ANOM'] = (df['DAYS_EMPLOYED'] == 365243).astype(np.int8)
    df['DAYS_EMPLOYED'] = df['DAYS_EMPLOYED'].replace({365243: np.nan})

    days_cols = [c for c in df.columns if c.startswith('DAYS_')]
    df[days_cols] = df[days_cols].abs()

    df['AGE_YEARS'] = df['DAYS_BIRTH'] / 365.25
    df['EMPLOYED_TO_AGE_RATIO'] = df['DAYS_EMPLOYED'] / df['DAYS_BIRTH']
    df['ANNUITY_TO_INCOME_RATIO'] = df['AMT_ANNUITY'] / df['AMT_INCOME_TOTAL']
    # Division by zero gives +/-inf, which SimpleImputer rejects downstream.
    # Map it to NaN so it is imputed like any other missing value (same
    # treatment as the previous-application ratio).
    ratio_cols = ['EMPLOYED_TO_AGE_RATIO', 'ANNUITY_TO_INCOME_RATIO']
    df[ratio_cols] = df[ratio_cols].replace([np.inf, -np.inf], np.nan)

    # *_MEDI / *_MODE statistics are treated as redundant with the *_AVG versions.
    # NOTE(review): this suffix rule also drops any *_MEDI/*_MODE column that
    # has no *_AVG counterpart, if such columns exist in the input. Confirm
    # that is intended; changing it alters the feature set.
    redundant_cols = [c for c in df.columns if c.endswith(('_MEDI', '_MODE'))]
    df = df.drop(columns=redundant_cols, errors='ignore')

    bureau_req_cols = [c for c in df.columns if c.startswith('AMT_REQ_CREDIT_BUREAU')]
    df[bureau_req_cols] = df[bureau_req_cols].fillna(0)

    return df

def train_and_log_model(df_train, current_features, step_name):
    """Fit an impute -> scale -> LogisticRegression pipeline and print its hold-out AUC.

    Used for the intermediate (monitoring) steps; nothing is saved.

    Args:
        df_train: Frame holding the feature columns plus ``TARGET_COLUMN``.
        current_features: Names of the feature columns to use. Other columns
            of ``df_train`` are ignored.
        step_name: Label printed next to the AUC.

    Returns:
        ``(fitted_pipeline, current_features)``. The pipeline is fitted on the
        stratified 80% training split only.
    """
    X = df_train.drop(TARGET_COLUMN, axis=1)
    y = df_train[TARGET_COLUMN]
    wanted = set(current_features)
    X = X[[c for c in X.columns if c in wanted]]

    # Densify explicitly to float64. ``X.values`` on a frame that mixes bool
    # dummy columns with numeric ones returns an *object* ndarray (pandas'
    # common dtype for bool + numeric is object). That array is far larger and
    # would only be converted to float64 later inside SimpleImputer anyway.
    X_array = X.to_numpy(dtype=np.float64)

    X_train, X_test, y_train, y_test = train_test_split(
        X_array, y, test_size=0.2, random_state=RANDOM_SEED, stratify=y
    )

    preprocessor = Pipeline(steps=[('imputer', SimpleImputer(strategy='median')), ('scaler', StandardScaler())])

    # L2-regularised logistic regression with balanced class weights.
    model = LogisticRegression(
        random_state=RANDOM_SEED,
        class_weight='balanced',
        solver='lbfgs',
        max_iter=1000,
    )
    full_pipeline = Pipeline(steps=[('preprocessor', preprocessor), ('classifier', model)])

    full_pipeline.fit(X_train, y_train)
    y_pred = full_pipeline.predict_proba(X_test)[:, 1]
    auc = roc_auc_score(y_test, y_pred)

    print(f"[{step_name}] AUC Score: {auc:.4f} (Features: {len(current_features)})")
    return full_pipeline, current_features
    
# ====================================================================
# 3. FEATURE AGGREGATION FUNCTIONS (WITH ENHANCED BUREAU FEATURES)
# ====================================================================

def merge_bureau_data(df_main):
    """
    Aggregate bureau.csv (plus a bureau_balance.csv record count) per SK_ID_CURR
    and left-merge the result into ``df_main``.

    Adds 13 ``BUREAU_*`` columns:
      - DAYS_CREDIT mean/max, AMT_CREDIT_MAX_OVERDUE max/mean and
        AMT_CREDIT_SUM sum/mean;
      - share and count of Active / Closed credits, and share of
        'Consumer credit' / 'Credit card' credits (means and sums of the
        one-hot dummies);
      - BUREAU_BB_MONTHS_BALANCE_COUNT_MEAN: mean, over the client's bureau
        credits, of the number of non-missing MONTHS_BALANCE rows in
        bureau_balance.csv.

    Args:
        df_main: Frame with an ``SK_ID_CURR`` column.

    Returns:
        ``df_main`` with the BUREAU_* columns appended (NaN for clients without
        bureau records).
    """
    print("\nProcessing Bureau data (bureau.csv + bureau_balance.csv) with advanced features...")

    # --- 3A. bureau_balance.csv: monthly-record count per bureau credit ---
    # BB_MONTHS_BALANCE_COUNT is the only bureau_balance statistic that the
    # aggregation in 3B reads, so it is the only one computed here. Anything not
    # listed in num_agg/cat_agg is discarded by the groupby. To use more
    # bureau_balance features, compute them here AND list them below.
    bb = pd.read_csv(DATA_ROOT + 'bureau_balance.csv'); bb = downcast_dtypes(bb)
    bb_counts = (
        bb.groupby('SK_ID_BUREAU')['MONTHS_BALANCE']
        .count()
        .rename('BB_MONTHS_BALANCE_COUNT')
        .reset_index()
    )
    del bb; gc.collect()

    # --- 3B. bureau.csv ---
    bureau = pd.read_csv(DATA_ROOT + 'bureau.csv'); bureau = downcast_dtypes(bureau)
    # Credits with no bureau_balance rows get NaN here, which the 'mean' below skips.
    bureau = bureau.merge(bb_counts, on='SK_ID_BUREAU', how='left')
    del bb_counts; gc.collect()

    bureau_cat = pd.get_dummies(bureau, columns=['CREDIT_ACTIVE', 'CREDIT_CURRENCY', 'CREDIT_TYPE'], dummy_na=True)
    del bureau; gc.collect()

    num_agg = {
        'DAYS_CREDIT': ['mean', 'max'],
        'AMT_CREDIT_MAX_OVERDUE': ['max', 'mean'],
        'AMT_CREDIT_SUM': ['sum', 'mean']
    }

    cat_agg = {
        'CREDIT_ACTIVE_Active': ['mean', 'sum'],
        'CREDIT_ACTIVE_Closed': ['mean', 'sum'],
        'CREDIT_TYPE_Consumer credit': ['mean'],
        'CREDIT_TYPE_Credit card': ['mean'],
        'BB_MONTHS_BALANCE_COUNT': ['mean']
    }

    bureau_agg = bureau_cat.groupby('SK_ID_CURR').agg({**num_agg, **cat_agg})
    del bureau_cat; gc.collect()

    # Flatten ('COL', 'stat') MultiIndex columns into BUREAU_COL_STAT.
    bureau_agg.columns = pd.Index(['BUREAU_' + '_'.join(col).upper() for col in bureau_agg.columns])
    bureau_agg = bureau_agg.reset_index()

    # One row per SK_ID_CURR in bureau_agg, so the left merge keeps df_main's rows and order.
    df_main = df_main.merge(bureau_agg, on='SK_ID_CURR', how='left')
    del bureau_agg; gc.collect()

    print(f"Bureau merge complete. Final Main Dataframe shape: {df_main.shape}")
    return df_main


def get_prev_application_features(df_main):
    """
    Aggregate previous_application.csv per SK_ID_CURR and left-merge the result
    into ``df_main``.

    Adds 18 ``PREV_*`` columns:
      - statistics of AMT_CREDIT, AMT_ANNUITY, AMT_APPLICATION,
        RATE_DOWN_PAYMENT and DAYS_DECISION;
      - the mean credit-to-application ratio;
      - share and count of Approved / Refused applications;
      - share of the 'low_action' and 'high' yield groups.

    Args:
        df_main: Frame with an ``SK_ID_CURR`` column.

    Returns:
        ``df_main`` with the PREV_* columns appended (NaN for clients without
        previous applications).
    """
    print("\nProcessing Previous Applications data (with high-value ratios and status)...")
    prev = pd.read_csv(DATA_ROOT + 'previous_application.csv'); prev = downcast_dtypes(prev)

    # Credit granted relative to the amount applied for.
    prev['CREDIT_TO_APP_RATIO'] = prev['AMT_CREDIT'] / prev['AMT_APPLICATION']

    num_agg = {
        'AMT_CREDIT': ['mean', 'sum', 'max'],
        'AMT_ANNUITY': ['mean', 'sum'],
        'AMT_APPLICATION': ['mean', 'sum'],
        'RATE_DOWN_PAYMENT': ['mean'],
        'DAYS_DECISION': ['mean', 'min', 'max'],
        'CREDIT_TO_APP_RATIO': ['mean'],
    }

    cat_agg = {
        'NAME_CONTRACT_STATUS_Approved': ['mean', 'sum'],
        'NAME_CONTRACT_STATUS_Refused': ['mean', 'sum'],
        'NAME_YIELD_GROUP_low_action': ['mean'],
        'NAME_YIELD_GROUP_high': ['mean']
    }
    cat_source_cols = ['NAME_CONTRACT_STATUS', 'NAME_YIELD_GROUP']

    # The groupby only reads these columns; any other column would be discarded,
    # so drop them before the copying inf-replacement and one-hot encoding.
    prev = prev[['SK_ID_CURR', *num_agg, *cat_source_cols]].copy()

    # Division by zero in the ratio gives +/-inf, which SimpleImputer rejects.
    # Map it to NaN in every aggregated numeric column so the groupby skips it
    # and the value is imputed later.
    num_cols = list(num_agg)
    prev[num_cols] = prev[num_cols].replace([np.inf, -np.inf], np.nan)

    prev_cat = pd.get_dummies(prev, columns=cat_source_cols, dummy_na=True)
    del prev; gc.collect()

    prev_agg = prev_cat.groupby('SK_ID_CURR').agg({**num_agg, **cat_agg})
    del prev_cat; gc.collect()

    # Flatten ('COL', 'stat') MultiIndex columns into PREV_COL_STAT.
    prev_agg.columns = pd.Index(['PREV_' + '_'.join(col).upper() for col in prev_agg.columns])
    prev_agg = prev_agg.reset_index()

    return df_main.merge(prev_agg, on='SK_ID_CURR', how='left')

def get_temporal_features(df_main):
    """Aggregate the three temporal balance files per SK_ID_CURR and left-merge them.

    Adds 9 columns:
      - INST_DPD_MEAN / INST_DPD_SUM: mean/sum of
        ``DAYS_ENTRY_PAYMENT - DAYS_INSTALMENT`` floored at 0 (rows where the
        difference is missing also count as 0);
      - INST_PAYMENT_MEAN / INST_PAYMENT_SUM: mean/sum of AMT_PAYMENT;
      - POS_INST_FUTURE_MIN / MAX / MEAN: statistics of CNT_INSTALMENT_FUTURE;
      - CC_BALANCE_MEAN / CC_BALANCE_MAX: statistics of AMT_BALANCE.

    Each file is loaded, aggregated, merged and deleted before the next one is read.
    """
    print("\nProcessing Temporal Balance data (Installments, POS_CASH, Credit Card)...")

    # --- Installments Payments ---
    install = pd.read_csv(DATA_ROOT + 'installments_payments.csv'); install = downcast_dtypes(install)

    # Vectorised replacement for a per-row ``x if x > 0 else 0`` lambda. ``where``
    # keeps values where the condition holds and uses 0 elsewhere; NaN > 0 is
    # False, so NaN also becomes 0 as before. The float64 cast reproduces the
    # dtype the row-wise version produced and keeps the later sum out of float16.
    # NOTE(review): downcast_dtypes may store the DAYS_* inputs as float16,
    # which rounds magnitudes above 2048 to even numbers. Confirm that is acceptable.
    dpd = install['DAYS_ENTRY_PAYMENT'] - install['DAYS_INSTALMENT']
    install['DPD'] = dpd.where(dpd > 0, 0).astype(np.float64)
    del dpd

    # Column order after .agg is (DPD: mean, sum), (AMT_PAYMENT: mean, sum).
    install_agg = install.groupby('SK_ID_CURR')[['DPD', 'AMT_PAYMENT']].agg(['mean', 'sum']).reset_index()
    install_agg.columns = ['SK_ID_CURR', 'INST_DPD_MEAN', 'INST_DPD_SUM', 'INST_PAYMENT_MEAN', 'INST_PAYMENT_SUM']

    df_main = df_main.merge(install_agg, on='SK_ID_CURR', how='left')
    del install, install_agg; gc.collect()

    # --- POS_CASH Balance ---
    pos = pd.read_csv(DATA_ROOT + 'POS_CASH_balance.csv'); pos = downcast_dtypes(pos)

    pos_agg = pos.groupby('SK_ID_CURR')[['CNT_INSTALMENT_FUTURE']].agg(['min', 'max', 'mean']).reset_index()
    pos_agg.columns = ['SK_ID_CURR', 'POS_INST_FUTURE_MIN', 'POS_INST_FUTURE_MAX', 'POS_INST_FUTURE_MEAN']

    df_main = df_main.merge(pos_agg, on='SK_ID_CURR', how='left')
    del pos, pos_agg; gc.collect()

    # --- Credit Card Balance ---
    cc = pd.read_csv(DATA_ROOT + 'credit_card_balance.csv'); cc = downcast_dtypes(cc)

    cc_agg = cc.groupby('SK_ID_CURR')[['AMT_BALANCE']].agg(['mean', 'max']).reset_index()
    cc_agg.columns = ['SK_ID_CURR', 'CC_BALANCE_MEAN', 'CC_BALANCE_MAX']

    df_main = df_main.merge(cc_agg, on='SK_ID_CURR', how='left')
    del cc, cc_agg; gc.collect()

    return df_main

def train_and_save_final_model(df_train, current_features, step_name):
    """Fit the final impute -> scale -> LogisticRegression pipeline, print its AUC and save it.

    The pipeline is fitted on a stratified 80% split and scored on the other
    20%. The fitted pipeline is written to ``MODEL_SAVE_PATH`` with joblib;
    the parent directory is created if it does not exist. The pipeline is
    fitted on a DataFrame, so the column names of the training features go
    into the saved artifact.

    Args:
        df_train: Frame holding the feature columns plus ``TARGET_COLUMN``.
        current_features: Names of the feature columns to use. Other columns
            of ``df_train`` are ignored.
        step_name: Label printed next to the AUC.

    Returns:
        ``(fitted_pipeline, current_features)``.
    """
    X = df_train.drop(TARGET_COLUMN, axis=1)
    y = df_train[TARGET_COLUMN]
    wanted = set(current_features)
    X = X[[c for c in X.columns if c in wanted]]

    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=RANDOM_SEED, stratify=y
    )
    preprocessor = Pipeline(steps=[('imputer', SimpleImputer(strategy='median')), ('scaler', StandardScaler())])

    # LogisticRegression(liblinear) replaced an earlier SGDClassifier setup.
    # liblinear is single-threaded; passing n_jobs != 1 only makes sklearn emit
    # a UserWarning, so it is not set.
    model = LogisticRegression(
        random_state=RANDOM_SEED,
        class_weight='balanced',
        solver='liblinear',
        max_iter=2000,
    )

    full_pipeline = Pipeline(steps=[('preprocessor', preprocessor), ('classifier', model)])

    full_pipeline.fit(X_train, y_train)
    y_pred = full_pipeline.predict_proba(X_test)[:, 1]
    auc = roc_auc_score(y_test, y_pred)

    print(f"[{step_name}] AUC Score: {auc:.4f} (Features: {len(current_features)})")

    # joblib.dump does not create missing directories.
    Path(MODEL_SAVE_PATH).parent.mkdir(parents=True, exist_ok=True)
    joblib.dump(full_pipeline, MODEL_SAVE_PATH)

    return full_pipeline, current_features

# ====================================================================
# 4. ITERATIVE PIPELINE EXECUTION BLOCK (Runs Automatically)
# ====================================================================

try:
    # STEP 1: LOAD MAIN DATA & INITIAL CLEANING
    df_full = pd.read_csv(MAIN_FILE)
    df_full = downcast_dtypes(df_full)
    df_full = feature_engineer_application_data(df_full)

    # One-hot encode the remaining categorical (text) columns.
    df_full = pd.get_dummies(df_full)
    y = df_full[TARGET_COLUMN]

    # df_current accumulates the features of every step. The target is kept in
    # `y` and re-attached with pd.concat(axis=1), which aligns rows on the
    # index. Every merge below must therefore keep exactly one row per client
    # in the original order; left merges against per-SK_ID_CURR aggregates do.
    df_current = df_full.drop(columns=[TARGET_COLUMN])
    del df_full; gc.collect()

    INITIAL_FEATURES = [c for c in df_current.columns if c != 'SK_ID_CURR']
    current_features = INITIAL_FEATURES.copy()
    X_train_data = df_current.drop(columns='SK_ID_CURR').reindex(columns=current_features, fill_value=0)

    # BASELINE (Step 1)
    pipeline_baseline, current_features_log = train_and_log_model(
        pd.concat([X_train_data, y], axis=1), current_features, "BASELINE"
    )
    del X_train_data; gc.collect()

    # --- STEP 2: MERGE & TRAIN: BUREAU DATA ---
    print("\n--- STEP 2: MERGE BUREAU DATA ---")

    # Merge onto df_current, which holds the baseline features.
    df_current = merge_bureau_data(df_current)
    new_bureau_features = [c for c in df_current.columns if c not in current_features and c != 'SK_ID_CURR']
    current_features.extend(new_bureau_features)
    X_train_bureau = df_current.drop(columns='SK_ID_CURR').reindex(columns=current_features, fill_value=0)
    pipeline_bureau, current_features_log = train_and_log_model(
        pd.concat([X_train_bureau, y], axis=1), current_features, "STEP 2: + BUREAU"
    )
    del X_train_bureau; gc.collect()

    # --- STEP 3: MERGE & TRAIN: PREVIOUS APPLICATIONS DATA ---
    print("\n--- STEP 3: MERGE PREVIOUS APPLICATIONS DATA ---")

    # Merge onto df_current, which now holds the baseline + bureau features.
    df_current = get_prev_application_features(df_current)
    new_prev_features = [c for c in df_current.columns if c not in current_features and c != 'SK_ID_CURR']
    current_features.extend(new_prev_features)
    X_train_prev = df_current.drop(columns='SK_ID_CURR').reindex(columns=current_features, fill_value=0)
    pipeline_prev, current_features_log = train_and_log_model(
        pd.concat([X_train_prev, y], axis=1), current_features, "STEP 3: + PREV APP"
    )
    del X_train_prev; gc.collect()

    # --- STEP 4: FINAL MERGE & STABILITY TRADE-OFF (Temporal Batch) ---
    print("\n--- STEP 4: GENERATE FULL FEATURE SET & TRAIN FINAL MODEL ---")

    # Merge the temporal features onto the fully enriched df_current.
    df_final = get_temporal_features(df_current)

    final_features = [c for c in df_final.columns if c != 'SK_ID_CURR']
    X_train_final = df_final.drop(columns='SK_ID_CURR').reindex(columns=final_features, fill_value=0)

    final_pipeline, final_features_log = train_and_save_final_model(
        pd.concat([X_train_final, y], axis=1),
        final_features,
        "FINAL: + ALL ENRICHED FEATURES (LogisticRegression)"
    )
    del df_final, X_train_final; gc.collect()

    print(f"\n✅ Production Deployment Artifact Saved Successfully to {MODEL_SAVE_PATH} (Target AUC: >75%)")

except Exception as e:
    print(f"\n🛑 FATAL ERROR DURING PIPELINE EXECUTION: {e}")
    # Re-raise so the traceback is shown and the cell/run is marked as failed
    # instead of appearing to have completed.
    raise

--- Starting Iterative Production Training Pipeline ---
Memory reduced from 286.23 MB to 92.38 MB


  df['DAYS_EMPLOYED_ANOM'] = (df['DAYS_EMPLOYED'] == 365243).astype(np.int8)
  df['AGE_YEARS'] = df['DAYS_BIRTH'] / 365.25
  df['EMPLOYED_TO_AGE_RATIO'] = df['DAYS_EMPLOYED'] / df['DAYS_BIRTH']
  df['ANNUITY_TO_INCOME_RATIO'] = df['AMT_ANNUITY'] / df['AMT_INCOME_TOTAL']


[BASELINE] AUC Score: 0.7484 (Features: 201)

--- STEP 2: MERGE BUREAU DATA ---

Processing Bureau data (bureau.csv + bureau_balance.csv) with advanced features...
Memory reduced from 624.85 MB to 338.46 MB
Memory reduced from 222.62 MB to 112.95 MB
Bureau merge complete. Final Main Dataframe shape: (307511, 215)
[STEP 2: + BUREAU] AUC Score: 0.7519 (Features: 214)

--- STEP 3: MERGE PREVIOUS APPLICATIONS DATA ---

Processing Previous Applications data (with high-value ratios and status)...
Memory reduced from 471.48 MB to 309.01 MB
[STEP 3: + PREV APP] AUC Score: 0.7603 (Features: 232)

--- STEP 4: GENERATE FULL FEATURE SET & TRAIN FINAL MODEL ---

Processing Temporal Balance data (Installments, POS_CASH, Credit Card)...
Memory reduced from 830.41 MB to 311.40 MB
Memory reduced from 610.43 MB to 238.45 MB
Memory reduced from 673.88 MB to 289.33 MB




[FINAL: + ALL ENRICHED FEATURES (SGDClassifier)] AUC Score: 0.7641 (Features: 241)

✅ Production Deployment Artifact Saved Successfully to ./models/full_pipeline.joblib (Target AUC: >75%)
