# UCI SECOM Semiconductor Defect Detection Analysis

**Objective**: Predict semiconductor manufacturing failures (defects) using sensor data.

**Approach**:
1. Label-free feature engineering (to avoid data leakage)
2. Multiple resampling strategies for class imbalance
3. Multiple models with hyperparameter tuning via CV
4. Primary metric: Recall@Precision>0.2
5. SHAP interpretability for top performers

In [None]:
# Cell 0: Install required libraries (run once)
# Uncomment and run if any packages are missing

!pip install pandas numpy matplotlib seaborn scikit-learn lightgbm xgboost imbalanced-learn shap --quiet

print("All required packages installed!")

In [None]:
# Cell 1: Setup and Imports
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.model_selection import train_test_split, StratifiedKFold
from sklearn.preprocessing import StandardScaler
from sklearn.impute import SimpleImputer
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier
from lightgbm import LGBMClassifier
from xgboost import XGBClassifier
from sklearn.metrics import (
    f1_score, precision_recall_curve, auc,
    confusion_matrix, classification_report,
    average_precision_score, recall_score, precision_score
)
from imblearn.over_sampling import SMOTE
from imblearn.under_sampling import TomekLinks, RandomUnderSampler
from imblearn.combine import SMOTETomek, SMOTEENN
import shap
import warnings
from itertools import product
from collections import defaultdict

warnings.filterwarnings('ignore')
np.random.seed(42)

print("All imports successful!")

In [None]:
# Cell 2: Load Data
df = pd.read_csv('uci-secom.csv')

# Rename target column
df = df.rename(columns={'Pass/Fail': 'target'})

# Convert target: -1 (Pass) -> 0, 1 (Fail) -> 1
df['target'] = df['target'].map({-1: 0, 1: 1})

print(f"Dataset shape: {df.shape}")
print(f"\nTarget distribution:")
print(df['target'].value_counts())
print(f"\nDefect rate: {df['target'].mean():.2%}")
print(f"\nFirst few rows:")
df.head()

In [None]:
# Cell 3: Train-Test Split (80/20 stratified)

# Separate features and target
# Drop 'Time' column if it exists (not a sensor feature)
feature_cols = [col for col in df.columns if col not in ['target', 'Time']]
X = df[feature_cols]
y = df['target']

print(f"Total features: {len(feature_cols)}")

# Stratified train-test split
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42, stratify=y
)

print(f"\nTrain set: {X_train.shape[0]} samples")
print(f"Test set: {X_test.shape[0]} samples")
print(f"\nTrain defect rate: {y_train.mean():.2%}")
print(f"Test defect rate: {y_test.mean():.2%}")

## Feature Engineering (Label-Free)

All feature selection is done without looking at labels to prevent data leakage.

In [None]:
# Cell 4: Remove columns with >=80% missing values

# Calculate missingness on training set
missing_pct = X_train.isnull().mean() * 100

# Columns to drop (>=80% missing)
cols_drop_high_missing = missing_pct[missing_pct >= 80].index.tolist()

print(f"Columns with >=80% missing: {len(cols_drop_high_missing)}")
print(f"Examples: {cols_drop_high_missing[:5]}")

# Store for later
feature_engineering_log = {'original_features': len(feature_cols)}
feature_engineering_log['dropped_high_missing'] = len(cols_drop_high_missing)

In [None]:
# Cell 5: Create missingness indicators for 50-80% missing columns

# Columns with 50-80% missing
cols_medium_missing = missing_pct[(missing_pct >= 50) & (missing_pct < 80)].index.tolist()

print(f"Columns with 50-80% missing: {len(cols_medium_missing)}")

# Create missingness indicator column names
missingness_indicators = [f"{col}_missing" for col in cols_medium_missing]

feature_engineering_log['medium_missing_to_indicators'] = len(cols_medium_missing)

In [None]:
# Cell 6: Remove constant and near-constant columns

# Work with remaining columns (exclude high missing ones)
remaining_cols = [col for col in X_train.columns 
                  if col not in cols_drop_high_missing and col not in cols_medium_missing]

cols_to_drop_constant = []

for col in remaining_cols:
    col_data = X_train[col].dropna()
    
    if len(col_data) == 0:
        cols_to_drop_constant.append(col)
        continue
    
    # Check 1: Single unique value
    if col_data.nunique() == 1:
        cols_to_drop_constant.append(col)
        continue
    
    # Check 2: IQR = 0 (zero variability in middle 50%)
    q1, q3 = col_data.quantile([0.25, 0.75])
    if q3 - q1 == 0:
        cols_to_drop_constant.append(col)
        continue
    
    # Check 3: Top value >99% frequency
    top_freq = col_data.value_counts(normalize=True).iloc[0]
    if top_freq > 0.99:
        cols_to_drop_constant.append(col)
        continue
    
    # Check 4: count_nonmode < 20
    mode_val = col_data.mode().iloc[0] if len(col_data.mode()) > 0 else None
    if mode_val is not None:
        count_nonmode = (col_data != mode_val).sum()
        if count_nonmode < 20:
            cols_to_drop_constant.append(col)
            continue

print(f"Columns dropped (constant/near-constant): {len(cols_to_drop_constant)}")

feature_engineering_log['dropped_constant'] = len(cols_to_drop_constant)

In [None]:
# Cell 7: Remove highly correlated columns (>95%)

# Get columns remaining after previous steps
cols_for_corr = [col for col in remaining_cols if col not in cols_to_drop_constant]

print(f"Checking correlation among {len(cols_for_corr)} columns...")

# Calculate correlation matrix (use pairwise complete observations)
corr_matrix = X_train[cols_for_corr].corr().abs()

# Find highly correlated pairs
cols_to_drop_corr = set()
checked_pairs = set()

for i, col1 in enumerate(cols_for_corr):
    for col2 in cols_for_corr[i+1:]:
        if (col1, col2) in checked_pairs or col1 in cols_to_drop_corr or col2 in cols_to_drop_corr:
            continue
        
        corr_val = corr_matrix.loc[col1, col2]
        
        if corr_val > 0.95:
            # Decide which to drop based on:
            # 1. More missing values
            # 2. Lower variability (std)
            missing1 = X_train[col1].isnull().sum()
            missing2 = X_train[col2].isnull().sum()
            
            if missing1 != missing2:
                drop_col = col1 if missing1 > missing2 else col2
            else:
                std1 = X_train[col1].std()
                std2 = X_train[col2].std()
                drop_col = col1 if std1 < std2 else col2
            
            cols_to_drop_corr.add(drop_col)
        
        checked_pairs.add((col1, col2))

cols_to_drop_corr = list(cols_to_drop_corr)
print(f"Columns dropped (high correlation): {len(cols_to_drop_corr)}")

feature_engineering_log['dropped_correlated'] = len(cols_to_drop_corr)

In [None]:
# Cell 8: Lock feature set

# Final feature columns (original features to keep)
all_cols_to_drop = set(cols_drop_high_missing + cols_medium_missing + 
                       cols_to_drop_constant + cols_to_drop_corr)

final_feature_cols = [col for col in feature_cols if col not in all_cols_to_drop]

# Add missingness indicators to final feature list
final_feature_cols_with_indicators = final_feature_cols + missingness_indicators

print("="*60)
print("FEATURE ENGINEERING SUMMARY")
print("="*60)
print(f"Original features: {feature_engineering_log['original_features']}")
print(f"Dropped (>=80% missing): {feature_engineering_log['dropped_high_missing']}")
print(f"Converted to indicators (50-80% missing): {feature_engineering_log['medium_missing_to_indicators']}")
print(f"Dropped (constant/near-constant): {feature_engineering_log['dropped_constant']}")
print(f"Dropped (high correlation): {feature_engineering_log['dropped_correlated']}")
print(f"\nFinal numeric features: {len(final_feature_cols)}")
print(f"Missingness indicators: {len(missingness_indicators)}")
print(f"Total final features: {len(final_feature_cols_with_indicators)}")

# Store for later use
FINAL_NUMERIC_COLS = final_feature_cols
MISSINGNESS_INDICATOR_SOURCE_COLS = cols_medium_missing
MISSINGNESS_INDICATOR_COLS = missingness_indicators

In [None]:
# Cell 9: Define CV Splits

N_SPLITS = 5
skf = StratifiedKFold(n_splits=N_SPLITS, shuffle=True, random_state=42)

print(f"Using {N_SPLITS}-fold stratified cross-validation")

# Preview fold sizes
for fold_idx, (train_idx, val_idx) in enumerate(skf.split(X_train, y_train)):
    print(f"Fold {fold_idx+1}: Train={len(train_idx)}, Val={len(val_idx)}, "
          f"Val defect rate={y_train.iloc[val_idx].mean():.2%}")

## Processing Functions

In [None]:
# Cell 10: Data preparation function

def prepare_features(X_data, numeric_cols, indicator_source_cols, indicator_cols):
    """
    Prepare feature matrix by:
    1. Selecting numeric columns
    2. Creating missingness indicators
    """
    # Select numeric features
    X_numeric = X_data[numeric_cols].copy()
    
    # Create missingness indicators
    for src_col, ind_col in zip(indicator_source_cols, indicator_cols):
        if src_col in X_data.columns:
            X_numeric[ind_col] = X_data[src_col].isnull().astype(int)
        else:
            X_numeric[ind_col] = 0
    
    return X_numeric

print("prepare_features() defined")

In [None]:
# Cell 11: Imputation and scaling functions

def impute_and_scale(X_train_fold, X_val_fold):
    """
    Impute missing values with median and scale features.
    Fitted on training fold only, applied to both.
    """
    # Imputation
    imputer = SimpleImputer(strategy='median')
    X_train_imputed = pd.DataFrame(
        imputer.fit_transform(X_train_fold),
        columns=X_train_fold.columns,
        index=X_train_fold.index
    )
    X_val_imputed = pd.DataFrame(
        imputer.transform(X_val_fold),
        columns=X_val_fold.columns,
        index=X_val_fold.index
    )
    
    # Scaling
    scaler = StandardScaler()
    X_train_scaled = pd.DataFrame(
        scaler.fit_transform(X_train_imputed),
        columns=X_train_imputed.columns,
        index=X_train_imputed.index
    )
    X_val_scaled = pd.DataFrame(
        scaler.transform(X_val_imputed),
        columns=X_val_imputed.columns,
        index=X_val_imputed.index
    )
    
    return X_train_scaled, X_val_scaled, imputer, scaler

print("impute_and_scale() defined")

In [None]:
# Cell 12: Resampling strategies

def apply_resampling(X_train_fold, y_train_fold, strategy):
    """
    Apply resampling strategy to training data.
    
    Strategies:
    - 'none': No resampling (use class weights in model)
    - 'undersample': Random undersampling to 1:3 pos:neg ratio
    - 'smote_tomek': SMOTE + Tomek links
    - 'smote_enn': SMOTE + ENN (k=3)
    """
    if strategy == 'none':
        return X_train_fold, y_train_fold
    
    elif strategy == 'undersample':
        # Target 1:3 ratio (pos:neg)
        n_pos = y_train_fold.sum()
        n_neg_target = n_pos * 3
        
        rus = RandomUnderSampler(
            sampling_strategy={0: min(n_neg_target, (y_train_fold == 0).sum()), 1: n_pos},
            random_state=42
        )
        X_resampled, y_resampled = rus.fit_resample(X_train_fold, y_train_fold)
        return X_resampled, y_resampled
    
    elif strategy == 'smote_tomek':
        # SMOTE first, then Tomek links
        n_pos = y_train_fold.sum()
        k_neighbors = min(3, n_pos - 1) if n_pos > 1 else 1
        
        smote_tomek = SMOTETomek(
            smote=SMOTE(k_neighbors=k_neighbors, random_state=42),
            random_state=42
        )
        X_resampled, y_resampled = smote_tomek.fit_resample(X_train_fold, y_train_fold)
        return X_resampled, y_resampled
    
    elif strategy == 'smote_enn':
        # SMOTE + ENN with k=3
        n_pos = y_train_fold.sum()
        k_neighbors = min(3, n_pos - 1) if n_pos > 1 else 1
        
        smote_enn = SMOTEENN(
            smote=SMOTE(k_neighbors=k_neighbors, random_state=42),
            random_state=42
        )
        X_resampled, y_resampled = smote_enn.fit_resample(X_train_fold, y_train_fold)
        return X_resampled, y_resampled
    
    else:
        raise ValueError(f"Unknown resampling strategy: {strategy}")

RESAMPLING_STRATEGIES = ['none', 'undersample', 'smote_tomek', 'smote_enn']
print(f"Resampling strategies: {RESAMPLING_STRATEGIES}")

## Model Definitions

In [None]:
# Cell 13: Define model configurations with hyperparameter grids

MODEL_CONFIGS = {
    'LogisticRegression': {
        'model_class': LogisticRegression,
        'base_params': {'solver': 'saga', 'penalty': 'elasticnet', 'max_iter': 1000, 'random_state': 42},
        'param_grid': {
            'C': [0.01, 0.1, 1.0],
            'l1_ratio': [0.3, 0.5, 0.7]
        }
    },
    'LightGBM': {
        'model_class': LGBMClassifier,
        'base_params': {'random_state': 42, 'verbose': -1, 'n_jobs': -1},
        'param_grid': {
            'num_leaves': [15, 31],
            'min_data_in_leaf': [10, 20],
            'learning_rate': [0.05, 0.1]
        }
    },
    'XGBoost': {
        'model_class': XGBClassifier,
        'base_params': {'random_state': 42, 'eval_metric': 'logloss', 'n_jobs': -1},
        'param_grid': {
            'max_depth': [3, 5],
            'min_child_weight': [1, 5],
            'learning_rate': [0.05, 0.1]  # XGBoost uses learning_rate, not eta
        }
    },
    'RandomForest': {
        'model_class': RandomForestClassifier,
        'base_params': {'random_state': 42, 'n_jobs': -1},
        'param_grid': {
            'n_estimators': [100, 200],
            'max_depth': [5, 10],
            'min_samples_leaf': [5, 10]
        }
    }
}

# Display grid sizes
for model_name, config in MODEL_CONFIGS.items():
    grid_size = 1
    for param_values in config['param_grid'].values():
        grid_size *= len(param_values)
    print(f"{model_name}: {grid_size} hyperparameter combinations")

In [None]:
# Cell 14: Evaluation metrics

def recall_at_precision_threshold(y_true, y_proba, precision_threshold=0.2):
    """
    Calculate recall at the threshold where precision >= precision_threshold.
    Returns the maximum recall achievable while maintaining precision >= threshold.
    Used ONLY during CV for threshold optimization.
    """
    precisions, recalls, thresholds = precision_recall_curve(y_true, y_proba)
    
    # Find thresholds where precision >= threshold
    valid_indices = np.where(precisions >= precision_threshold)[0]
    
    if len(valid_indices) == 0:
        return 0.0, None  # No threshold achieves required precision
    
    # Get maximum recall among valid thresholds
    max_recall_idx = valid_indices[np.argmax(recalls[valid_indices])]
    best_recall = recalls[max_recall_idx]
    
    # Get corresponding threshold (handle edge case)
    if max_recall_idx < len(thresholds):
        best_threshold = thresholds[max_recall_idx]
    else:
        best_threshold = thresholds[-1] if len(thresholds) > 0 else 0.5
    
    return best_recall, best_threshold


def find_optimal_threshold(y_true, y_proba, precision_threshold=0.2):
    """
    Find optimal threshold that maximizes recall while maintaining precision >= threshold.
    Fallback: if no threshold satisfies precision constraint, use max F1.
    """
    recall_at_prec, threshold = recall_at_precision_threshold(y_true, y_proba, precision_threshold)
    
    if threshold is not None and recall_at_prec > 0:
        return threshold, 'recall@precision'
    
    # Fallback: find threshold with max F1
    precisions, recalls, thresholds = precision_recall_curve(y_true, y_proba)
    
    # Calculate F1 for each threshold
    f1_scores = 2 * (precisions * recalls) / (precisions + recalls + 1e-10)
    
    best_f1_idx = np.argmax(f1_scores[:-1])  # Exclude last element (undefined threshold)
    best_threshold = thresholds[best_f1_idx]
    
    return best_threshold, 'max_f1'


def calculate_metrics_cv(y_true, y_proba, threshold=0.5):
    """
    Calculate metrics for CV (includes threshold sweeping for recall@prec>0.2).
    """
    y_pred = (y_proba >= threshold).astype(int)
    
    pr_auc = average_precision_score(y_true, y_proba)
    f1 = f1_score(y_true, y_pred, zero_division=0)
    recall = recall_score(y_true, y_pred, zero_division=0)
    precision = precision_score(y_true, y_pred, zero_division=0)
    recall_at_prec, _ = recall_at_precision_threshold(y_true, y_proba, 0.2)
    
    return {
        'pr_auc': pr_auc,
        'f1': f1,
        'recall_at_prec_0.2': recall_at_prec,
        'recall': recall,
        'precision': precision,
        'threshold': threshold
    }


def calculate_metrics_test(y_true, y_proba, threshold):
    """
    Calculate metrics for test set (fixed threshold from CV, no sweeping).
    """
    y_pred = (y_proba >= threshold).astype(int)
    
    pr_auc = average_precision_score(y_true, y_proba)
    f1 = f1_score(y_true, y_pred, zero_division=0)
    recall = recall_score(y_true, y_pred, zero_division=0)
    precision = precision_score(y_true, y_pred, zero_division=0)
    
    return {
        'pr_auc': pr_auc,
        'f1': f1,
        'recall': recall,
        'precision': precision,
        'threshold': threshold
    }

print("Evaluation functions defined")

## Cross-Validation Training

In [None]:
# Cell 15: Main CV training loop

def run_cv_experiment(model_name, model_config, resampling_strategy, X_train_full, y_train_full, skf):
    """
    Run cross-validation for a single model + resampling combination.
    Returns OOF predictions and best hyperparameters.
    """
    # Prepare features
    X_train_prepared = prepare_features(
        X_train_full, FINAL_NUMERIC_COLS, 
        MISSINGNESS_INDICATOR_SOURCE_COLS, MISSINGNESS_INDICATOR_COLS
    )
    
    # Generate all hyperparameter combinations
    param_grid = model_config['param_grid']
    param_names = list(param_grid.keys())
    param_combinations = list(product(*param_grid.values()))
    
    best_score = -np.inf
    best_params = None
    best_oof_proba = None
    
    # Grid search
    for param_values in param_combinations:
        params = dict(zip(param_names, param_values))
        full_params = {**model_config['base_params'], **params}
        
        # Add class_weight for 'none' resampling strategy
        if resampling_strategy == 'none' and model_name != 'XGBoost':
            full_params['class_weight'] = 'balanced'
        elif resampling_strategy == 'none' and model_name == 'XGBoost':
            # XGBoost uses scale_pos_weight
            n_neg = (y_train_full == 0).sum()
            n_pos = (y_train_full == 1).sum()
            full_params['scale_pos_weight'] = n_neg / n_pos
        
        # Collect OOF predictions
        oof_proba = np.zeros(len(y_train_full))
        fold_scores = []
        
        for fold_idx, (train_idx, val_idx) in enumerate(skf.split(X_train_prepared, y_train_full)):
            # Split
            X_fold_train = X_train_prepared.iloc[train_idx]
            X_fold_val = X_train_prepared.iloc[val_idx]
            y_fold_train = y_train_full.iloc[train_idx]
            y_fold_val = y_train_full.iloc[val_idx]
            
            # Impute and scale
            X_fold_train_scaled, X_fold_val_scaled, _, _ = impute_and_scale(X_fold_train, X_fold_val)
            
            # Resample (training only)
            X_fold_train_resampled, y_fold_train_resampled = apply_resampling(
                X_fold_train_scaled, y_fold_train, resampling_strategy
            )
            
            # Train model
            model = model_config['model_class'](**full_params)
            model.fit(X_fold_train_resampled, y_fold_train_resampled)
            
            # Predict probabilities
            fold_proba = model.predict_proba(X_fold_val_scaled)[:, 1]
            oof_proba[val_idx] = fold_proba
            
            # Fold score (PR-AUC for selection)
            fold_score = average_precision_score(y_fold_val, fold_proba)
            fold_scores.append(fold_score)
        
        # Average score across folds
        mean_score = np.mean(fold_scores)
        
        if mean_score > best_score:
            best_score = mean_score
            best_params = params
            best_oof_proba = oof_proba.copy()
    
    return best_oof_proba, best_params, best_score

print("run_cv_experiment() defined")

In [None]:
# Cell 16: Run all experiments

print("="*60)
print("STARTING CROSS-VALIDATION EXPERIMENTS")
print("="*60)

results = []
oof_predictions = {}  # Store OOF predictions for threshold optimization

total_experiments = len(MODEL_CONFIGS) * len(RESAMPLING_STRATEGIES)
exp_count = 0

for model_name, model_config in MODEL_CONFIGS.items():
    for resampling in RESAMPLING_STRATEGIES:
        exp_count += 1
        print(f"\n[{exp_count}/{total_experiments}] {model_name} + {resampling}")
        
        try:
            # Run CV
            oof_proba, best_params, cv_pr_auc = run_cv_experiment(
                model_name, model_config, resampling, X_train, y_train, skf
            )
            
            # Find optimal threshold
            opt_threshold, threshold_method = find_optimal_threshold(y_train, oof_proba, 0.2)
            
            # Calculate metrics with optimal threshold (CV uses sweep)
            metrics = calculate_metrics_cv(y_train, oof_proba, opt_threshold)
            
            # Store results
            result = {
                'model': model_name,
                'resampling': resampling,
                'best_params': best_params,
                'cv_pr_auc': cv_pr_auc,
                'optimal_threshold': opt_threshold,
                'threshold_method': threshold_method,
                **metrics
            }
            results.append(result)
            
            # Store OOF predictions
            key = f"{model_name}_{resampling}"
            oof_predictions[key] = {
                'proba': oof_proba,
                'best_params': best_params,
                'threshold': opt_threshold
            }
            
            print(f"  Best params: {best_params}")
            print(f"  CV PR-AUC: {cv_pr_auc:.4f}")
            print(f"  Recall@Prec>0.2: {metrics['recall_at_prec_0.2']:.4f}")
            print(f"  F1: {metrics['f1']:.4f}")
            print(f"  Optimal threshold: {opt_threshold:.3f} ({threshold_method})")
            
        except Exception as e:
            print(f"  ERROR: {str(e)}")
            continue

print("\n" + "="*60)
print("CV EXPERIMENTS COMPLETE")
print("="*60)

In [None]:
# Cell 17: Results summary

results_df = pd.DataFrame(results)

# Sort by primary metric (Recall@Precision>0.2)
results_df = results_df.sort_values('recall_at_prec_0.2', ascending=False)

# Display key columns
display_cols = ['model', 'resampling', 'recall_at_prec_0.2', 'f1', 'pr_auc', 'optimal_threshold']
print("\nCV RESULTS (sorted by Recall@Precision>0.2):")
print(results_df[display_cols].to_string(index=False))

# Best performers
print("\n" + "="*60)
print("TOP 3 PERFORMERS:")
print("="*60)
for i, row in results_df.head(3).iterrows():
    print(f"\n{row['model']} + {row['resampling']}")
    print(f"  Recall@Prec>0.2: {row['recall_at_prec_0.2']:.4f}")
    print(f"  F1: {row['f1']:.4f}")
    print(f"  PR-AUC: {row['pr_auc']:.4f}")
    print(f"  Best params: {row['best_params']}")

## Final Test Set Evaluation

In [None]:
# Cell 18: Prepare full training and test sets

# Prepare features for full training set
X_train_prepared = prepare_features(
    X_train, FINAL_NUMERIC_COLS,
    MISSINGNESS_INDICATOR_SOURCE_COLS, MISSINGNESS_INDICATOR_COLS
)

# Prepare features for test set
X_test_prepared = prepare_features(
    X_test, FINAL_NUMERIC_COLS,
    MISSINGNESS_INDICATOR_SOURCE_COLS, MISSINGNESS_INDICATOR_COLS
)

# Impute and scale (fit on full training set)
X_train_final, X_test_final, final_imputer, final_scaler = impute_and_scale(
    X_train_prepared, X_test_prepared
)

print(f"Final training set shape: {X_train_final.shape}")
print(f"Final test set shape: {X_test_final.shape}")

In [None]:
# Cell 19: Refit top models and evaluate on test set

print("="*60)
print("TEST SET EVALUATION")
print("="*60)

test_results = []
final_models = {}  # Store for SHAP

# Evaluate all model + resampling combinations
for idx, row in results_df.iterrows():
    model_name = row['model']
    resampling = row['resampling']
    best_params = row['best_params']
    opt_threshold = row['optimal_threshold']
    
    model_config = MODEL_CONFIGS[model_name]
    
    print(f"\nEvaluating: {model_name} + {resampling}")
    
    try:
        # Prepare params
        full_params = {**model_config['base_params'], **best_params}
        
        if resampling == 'none' and model_name != 'XGBoost':
            full_params['class_weight'] = 'balanced'
        elif resampling == 'none' and model_name == 'XGBoost':
            n_neg = (y_train == 0).sum()
            n_pos = (y_train == 1).sum()
            full_params['scale_pos_weight'] = n_neg / n_pos
        
        # Apply resampling to full training set
        X_train_resampled, y_train_resampled = apply_resampling(
            X_train_final, y_train, resampling
        )
        
        # Fit model
        model = model_config['model_class'](**full_params)
        model.fit(X_train_resampled, y_train_resampled)
        
        # Predict on test set
        test_proba = model.predict_proba(X_test_final)[:, 1]
        test_pred = (test_proba >= opt_threshold).astype(int)
        
        # Calculate metrics - use fixed threshold, no sweeping
        test_metrics = calculate_metrics_test(y_test, test_proba, opt_threshold)
        
        test_result = {
            'model': model_name,
            'resampling': resampling,
            **test_metrics
        }
        test_results.append(test_result)
        
        # Store model
        key = f"{model_name}_{resampling}"
        final_models[key] = {
            'model': model,
            'threshold': opt_threshold,
            'test_proba': test_proba,
            'test_pred': test_pred
        }
        
        print(f"  F1: {test_metrics['f1']:.4f}")
        print(f"  PR-AUC: {test_metrics['pr_auc']:.4f}")
        print(f"  Recall: {test_metrics['recall']:.4f}")
        print(f"  Precision: {test_metrics['precision']:.4f}")
        
    except Exception as e:
        print(f"  ERROR: {str(e)}")
        continue

print("\n" + "="*60)

In [None]:
# Cell 20: Test results summary and ranking

test_results_df = pd.DataFrame(test_results)
test_results_df = test_results_df.sort_values('f1', ascending=False)

print("\nTEST SET RESULTS (sorted by F1):")
print("="*80)
display_cols = ['model', 'resampling', 'f1', 'pr_auc', 'recall', 'precision', 'threshold']
print(test_results_df[display_cols].to_string(index=False))

# Highlight top performers
print("\n" + "="*80)
print("TOP 3 TEST SET PERFORMERS (by F1):")
print("="*80)
top_3 = test_results_df.head(3)
for i, (_, row) in enumerate(top_3.iterrows()):
    print(f"\n#{i+1}: {row['model']} + {row['resampling']}")
    print(f"     F1: {row['f1']:.4f}")
    print(f"     PR-AUC: {row['pr_auc']:.4f}")
    print(f"     Recall: {row['recall']:.4f}, Precision: {row['precision']:.4f}")
    print(f"     Threshold (from CV): {row['threshold']:.4f}")

In [None]:
# Cell 21: Confusion matrices for top 3

fig, axes = plt.subplots(1, 3, figsize=(15, 4))

top_3_keys = [f"{row['model']}_{row['resampling']}" for _, row in top_3.iterrows()]

for ax, key in zip(axes, top_3_keys):
    if key in final_models:
        test_pred = final_models[key]['test_pred']
        cm = confusion_matrix(y_test, test_pred)
        
        sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', ax=ax,
                    xticklabels=['Pass', 'Fail'], yticklabels=['Pass', 'Fail'])
        ax.set_title(key.replace('_', '\n'))
        ax.set_xlabel('Predicted')
        ax.set_ylabel('Actual')

plt.tight_layout()
plt.suptitle('Confusion Matrices - Top 3 Models', y=1.02)
plt.show()

In [None]:
# Cell 22: PR curves for top 3

plt.figure(figsize=(10, 6))

for key in top_3_keys:
    if key in final_models:
        test_proba = final_models[key]['test_proba']
        precision, recall, _ = precision_recall_curve(y_test, test_proba)
        pr_auc = auc(recall, precision)
        plt.plot(recall, precision, label=f"{key} (AUC={pr_auc:.3f})")

plt.axhline(y=0.2, color='r', linestyle='--', label='Precision=0.2 threshold')
plt.xlabel('Recall')
plt.ylabel('Precision')
plt.title('Precision-Recall Curves - Top 3 Models')
plt.legend(loc='best')
plt.grid(True, alpha=0.3)
plt.show()

## SHAP Interpretability

**Warning**: Do not change anything after viewing SHAP results. If changes are needed, rerun the entire pipeline.

In [None]:
# Cell 23: Select top 2-3 performers for SHAP analysis

# Select top 3 (accounting for potential winner's curse, we analyze multiple)
shap_models = []

for i, (_, row) in enumerate(top_3.iterrows()):
    key = f"{row['model']}_{row['resampling']}"
    if key in final_models:
        shap_models.append({
            'name': key,
            'model': final_models[key]['model'],
            'threshold': final_models[key]['threshold']
        })

print(f"Models selected for SHAP analysis: {[m['name'] for m in shap_models]}")

In [None]:
# Cell 24: Global SHAP - Bar and Beeswarm plots

for model_info in shap_models:
    model_name = model_info['name']
    model = model_info['model']
    
    print(f"\n{'='*60}")
    print(f"SHAP Analysis: {model_name}")
    print(f"{'='*60}")
    
    # Create SHAP explainer
    if 'LightGBM' in model_name or 'XGBoost' in model_name or 'RandomForest' in model_name:
        explainer = shap.TreeExplainer(model)
    else:
        # For Logistic Regression, use a sample for background
        background = shap.sample(X_train_final, 100, random_state=42)
        explainer = shap.LinearExplainer(model, background)
    
    # Calculate SHAP values on test set
    shap_values = explainer.shap_values(X_test_final)
    
    # Handle different SHAP output formats
    if isinstance(shap_values, list):
        # For classifiers that return [class_0, class_1]
        shap_values_display = shap_values[1]  # Use positive class
    else:
        shap_values_display = shap_values
    
    # Bar plot (global feature importance)
    plt.figure(figsize=(10, 8))
    shap.summary_plot(shap_values_display, X_test_final, plot_type='bar', 
                      max_display=20, show=False)
    plt.title(f'Global Feature Importance - {model_name}')
    plt.tight_layout()
    plt.show()
    
    # Beeswarm plot
    plt.figure(figsize=(10, 8))
    shap.summary_plot(shap_values_display, X_test_final, max_display=20, show=False)
    plt.title(f'SHAP Beeswarm - {model_name}')
    plt.tight_layout()
    plt.show()
    
    # Store for later use
    model_info['explainer'] = explainer
    model_info['shap_values'] = shap_values_display

In [None]:
# Cell 25: Local SHAP - Waterfall and Force plots for example cases

# Use first model for local explanations
model_info = shap_models[0]
model_name = model_info['name']
model = model_info['model']
threshold = model_info['threshold']
shap_values_display = model_info['shap_values']

print(f"\nLocal explanations for: {model_name}")
print("="*60)

# Get predictions
test_proba = model.predict_proba(X_test_final)[:, 1]
test_pred = (test_proba >= threshold).astype(int)

# Find example cases
tp_idx = np.where((test_pred == 1) & (y_test.values == 1))[0]
fn_idx = np.where((test_pred == 0) & (y_test.values == 1))[0]
fp_idx = np.where((test_pred == 1) & (y_test.values == 0))[0]

print(f"True Positives: {len(tp_idx)}")
print(f"False Negatives: {len(fn_idx)}")
print(f"False Positives: {len(fp_idx)}")

# Create SHAP Explanation object for waterfall plots
shap_exp = shap.Explanation(
    values=shap_values_display,
    base_values=model_info['explainer'].expected_value if not isinstance(model_info['explainer'].expected_value, list) 
                else model_info['explainer'].expected_value[1],
    data=X_test_final.values,
    feature_names=X_test_final.columns.tolist()
)

In [None]:
# Cell 25b: Waterfall plots for TP, FN, FP examples

# True Positive example
if len(tp_idx) > 0:
    idx = tp_idx[0]
    print(f"\nTrue Positive Example (index {idx}):")
    print(f"  Predicted probability: {test_proba[idx]:.3f}")
    print(f"  Actual: Fail")
    
    plt.figure(figsize=(10, 6))
    shap.plots.waterfall(shap_exp[idx], max_display=15, show=False)
    plt.title(f'True Positive - Sample {idx}')
    plt.tight_layout()
    plt.show()

# False Negative example
if len(fn_idx) > 0:
    idx = fn_idx[0]
    print(f"\nFalse Negative Example (index {idx}):")
    print(f"  Predicted probability: {test_proba[idx]:.3f}")
    print(f"  Actual: Fail (missed!)")
    
    plt.figure(figsize=(10, 6))
    shap.plots.waterfall(shap_exp[idx], max_display=15, show=False)
    plt.title(f'False Negative - Sample {idx}')
    plt.tight_layout()
    plt.show()

# False Positive example
if len(fp_idx) > 0:
    idx = fp_idx[0]
    print(f"\nFalse Positive Example (index {idx}):")
    print(f"  Predicted probability: {test_proba[idx]:.3f}")
    print(f"  Actual: Pass (false alarm)")
    
    plt.figure(figsize=(10, 6))
    shap.plots.waterfall(shap_exp[idx], max_display=15, show=False)
    plt.title(f'False Positive - Sample {idx}')
    plt.tight_layout()
    plt.show()

In [None]:
# Cell 26: Force plots for the same examples

# Initialize JS visualization
shap.initjs()

# Force plot for TP
if len(tp_idx) > 0:
    idx = tp_idx[0]
    print(f"\nForce Plot - True Positive (index {idx}):")
    display(shap.force_plot(
        shap_exp.base_values[idx] if hasattr(shap_exp.base_values, '__len__') and len(shap_exp.base_values) > 1 else shap_exp.base_values,
        shap_exp.values[idx],
        X_test_final.iloc[idx],
        feature_names=X_test_final.columns.tolist()
    ))

# Force plot for FN
if len(fn_idx) > 0:
    idx = fn_idx[0]
    print(f"\nForce Plot - False Negative (index {idx}):")
    display(shap.force_plot(
        shap_exp.base_values[idx] if hasattr(shap_exp.base_values, '__len__') and len(shap_exp.base_values) > 1 else shap_exp.base_values,
        shap_exp.values[idx],
        X_test_final.iloc[idx],
        feature_names=X_test_final.columns.tolist()
    ))

In [None]:
# Cell 27: Feature clustering (correlated features grouped)

model_info = shap_models[0]
shap_values_display = model_info['shap_values']

print(f"\nFeature Clustering Analysis for: {model_info['name']}")
print("="*60)

# Hierarchical clustering of features based on SHAP values
plt.figure(figsize=(12, 8))

# Create clustering plot
clustering = shap.utils.hclust(X_test_final, y_test)

shap.plots.bar(
    shap.Explanation(
        values=shap_values_display,
        base_values=model_info['explainer'].expected_value if not isinstance(model_info['explainer'].expected_value, list) 
                    else model_info['explainer'].expected_value[1],
        data=X_test_final.values,
        feature_names=X_test_final.columns.tolist()
    ),
    max_display=20,
    clustering=clustering,
    clustering_cutoff=0.5,
    show=False
)
plt.title(f'Feature Importance with Clustering - {model_info["name"]}')
plt.tight_layout()
plt.show()

## Summary and Conclusions

In [None]:
# Cell 28: Final summary

print("="*80)
print("FINAL ANALYSIS SUMMARY")
print("="*80)

print("\n1. DATA OVERVIEW:")
print(f"   - Original features: {feature_engineering_log['original_features']}")
print(f"   - Final features after engineering: {len(final_feature_cols_with_indicators)}")
print(f"   - Training samples: {len(y_train)}")
print(f"   - Test samples: {len(y_test)}")
print(f"   - Defect rate: {y_train.mean():.2%}")

print("\n2. BEST MODEL CONFIGURATION:")
best_result = test_results_df.iloc[0]
print(f"   - Model: {best_result['model']}")
print(f"   - Resampling: {best_result['resampling']}")
print(f"   - Threshold (from CV): {best_result['threshold']:.3f}")

print("\n3. TEST SET PERFORMANCE (using fixed threshold from CV):")
print(f"   - F1 Score: {best_result['f1']:.4f}")
print(f"   - PR-AUC: {best_result['pr_auc']:.4f}")
print(f"   - Recall: {best_result['recall']:.4f}")
print(f"   - Precision: {best_result['precision']:.4f}")

print("\n4. KEY INSIGHTS FROM SHAP:")
if shap_models:
    # Get top features
    mean_abs_shap = np.abs(shap_models[0]['shap_values']).mean(axis=0)
    top_features_idx = np.argsort(mean_abs_shap)[-5:][::-1]
    top_features = [X_test_final.columns[i] for i in top_features_idx]
    print(f"   Top 5 most important features:")
    for i, feat in enumerate(top_features, 1):
        print(f"   {i}. {feat}")

print("\n" + "="*80)
print("Analysis complete!")
print("="*80)