In [1]:
!pip install tensorflow tensorflow-datasets pillow



In [None]:
import os
import subprocess

try:
    import tensorflow as tf
except ModuleNotFoundError:
    subprocess.check_call(["pip", "install", "tensorflow", "tensorflow-datasets"])
    import tensorflow as tf

import tensorflow_datasets as tfds
from tensorflow.keras import layers, models
import numpy as np

# Output directory for the saved model and label file (created if missing).
os.makedirs("models", exist_ok=True)
BATCH_SIZE = 32  # examples per batch in both the training and validation pipelines
IMG_SIZE = 224   # images are resized to IMG_SIZE x IMG_SIZE before entering the network
EPOCHS = 6   # increase later for better accuracy

def preprocess(example):
    """Convert one dataset example dict into an ``(image, label)`` pair.

    The ``'image'`` entry is resized to ``IMG_SIZE`` x ``IMG_SIZE``, cast to
    float32 and divided by 255.0; the ``'label'`` entry is passed through
    unchanged.
    """
    resized = tf.image.resize(example['image'], (IMG_SIZE, IMG_SIZE))
    scaled = tf.cast(resized, tf.float32) / 255.0
    return scaled, example['label']

def prepare_datasets():
    """Load Food-101 through TFDS and build the batched input pipelines.

    Returns:
        Tuple ``(train, val, num_classes, ds_info)``: the shuffled/batched
        training dataset, the batched validation dataset, the number of label
        classes reported by the dataset info, and the dataset info object.
    """
    print("Downloading Food-101 (this may take a while)...")
    autotune = tf.data.AUTOTUNE

    raw_train, ds_info = tfds.load(
        'food101',
        split='train',
        with_info=True,
        shuffle_files=True,
        as_supervised=False,
    )
    # validation set is used as test in Food-101
    raw_val = tfds.load('food101', split='validation', as_supervised=False)

    num_classes = ds_info.features['label'].num_classes
    print(f"Found {num_classes} classes.")

    # Only the training pipeline is shuffled; both are batched and prefetched.
    train = (
        raw_train
        .map(preprocess, num_parallel_calls=autotune)
        .shuffle(1000)
        .batch(BATCH_SIZE)
        .prefetch(autotune)
    )
    val = (
        raw_val
        .map(preprocess, num_parallel_calls=autotune)
        .batch(BATCH_SIZE)
        .prefetch(autotune)
    )

    return train, val, num_classes, ds_info

def build_model(num_classes):
    """Build and compile a MobileNetV2-based classifier with a frozen backbone.

    Args:
        num_classes: size of the final softmax layer.

    Returns:
        A compiled ``tf.keras.Model`` (Adam at 1e-4, sparse categorical
        cross-entropy loss, accuracy metric).
    """
    input_shape = (IMG_SIZE, IMG_SIZE, 3)

    backbone = tf.keras.applications.MobileNetV2(
        input_shape=input_shape,
        include_top=False,
        weights='imagenet',
    )
    backbone.trainable = False  # freeze base for quick MVP

    # Classification head: pooled backbone features -> dropout -> softmax.
    inputs = tf.keras.Input(shape=input_shape)
    features = backbone(inputs, training=False)
    pooled = layers.GlobalAveragePooling2D()(features)
    dropped = layers.Dropout(0.3)(pooled)
    outputs = layers.Dense(num_classes, activation='softmax')(dropped)

    model = tf.keras.Model(inputs, outputs)
    model.compile(
        optimizer=tf.keras.optimizers.Adam(learning_rate=1e-4),
        loss='sparse_categorical_crossentropy',
        metrics=['accuracy'],
    )
    return model

def main():
    """Train the Food-101 classifier end to end and persist its artifacts.

    Builds the datasets, trains the model for ``EPOCHS`` epochs, then writes
    the model to ``models/food101_mobilenetv2.h5`` and the class names to
    ``models/label_names.json``.
    """
    import json

    train_ds, val_ds, n_classes, ds_info = prepare_datasets()
    model = build_model(n_classes)
    # summary() prints the layer table itself and returns None, so it must not
    # be wrapped in print() (that would emit a stray "None" line).
    model.summary()

    # quick training
    model.fit(train_ds, validation_data=val_ds, epochs=EPOCHS)

    model_path = "models/food101_mobilenetv2.h5"
    print(f"Saving model to {model_path}")
    model.save(model_path)

    # Save label names; list position i is the name for class index i, which
    # is how the app maps prediction indices back to names.
    label_names = ds_info.features['label'].names
    with open("models/label_names.json", "w") as f:
        json.dump(label_names, f)
    print("Saved label names to models/label_names.json")
    print("Done.")

# Entry point: runs the full training when this code executes as the main module.
if __name__ == "__main__":
    main()

Downloading Food-101 (this may take a while)...




Downloading and preparing dataset Unknown size (download: Unknown size, generated: Unknown size, total: Unknown size) to /root/tensorflow_datasets/food101/2.0.0...


Dl Completed...: 0 url [00:00, ? url/s]

Dl Size...: 0 MiB [00:00, ? MiB/s]

Extraction completed...: 0 file [00:00, ? file/s]

In [None]:
%pip install streamlit

In [None]:
# app.py
import streamlit as st
import numpy as np
from PIL import Image
import os
import json
import requests
import io

# Artifact paths; they match the files written by the training cell's main().
MODEL_PATH = "models/food101_mobilenetv2.h5"
LABELS_PATH = "models/label_names.json"
# Side length used when resizing uploads for prediction (the training cell uses the same value).
IMG_SIZE = 224

st.set_page_config(page_title="FoodSave MVP", layout="centered")

st.title("FoodSave — MVP")
st.write("Upload a food photo. The app will try to identify the food and suggest actions to reduce waste.")

# Load the trained model and its label list when both files exist; otherwise
# leave both as None so the rest of the script falls back to generic advice.
model = None
labels = None
if os.path.exists(MODEL_PATH) and os.path.exists(LABELS_PATH):
    # TensorFlow is imported only on this branch, i.e. only when a model is present.
    import tensorflow as tf
    model = tf.keras.models.load_model(MODEL_PATH)
    with open(LABELS_PATH, "r") as f:
        labels = json.load(f)
else:
    st.warning("Model not found. Run `python train_model.py` first to create the model.")
    st.info("You can still upload images to see local preview and suggestions.")

# Main interaction: preview the upload, classify it when a model is loaded,
# look the predicted label up on Open Food Facts, and show waste-reduction tips.
uploaded_file = st.file_uploader("Choose an image", type=["jpg", "jpeg", "png"])
if uploaded_file is not None:
    image = Image.open(uploaded_file).convert("RGB")
    st.image(image, caption="Uploaded image", use_column_width=True)
    st.write("")

    if model is not None and labels is not None:
        # Same preprocessing as training: resize to IMG_SIZE and divide by 255.
        img = image.resize((IMG_SIZE, IMG_SIZE))
        x = np.array(img) / 255.0
        x = np.expand_dims(x, axis=0)  # add the batch axis expected by predict()

        preds = model.predict(x)[0]
        top_idx = np.argsort(preds)[::-1][:3]  # indices of the 3 highest scores, best first
        st.markdown("### Predictions (top 3):")
        for i in top_idx:
            st.write(f"- **{labels[i]}** — confidence: {preds[i]:.3f}")

        predicted = labels[top_idx[0]]

        st.markdown("### Open Food Facts lookup (by predicted label)")
        try:
            # Food-101 label names are expected to be underscore-separated
            # (e.g. "apple_pie") — verify against models/label_names.json.
            # Search with real words, and let requests URL-encode the query
            # instead of splicing the label into the URL by hand.
            search_terms = predicted.replace("_", " ")
            r = requests.get(
                "https://world.openfoodfacts.org/cgi/search.pl",
                params={
                    "search_terms": search_terms,
                    "search_simple": 1,
                    "action": "process",
                    "json": 1,
                    "page_size": 5,
                },
                timeout=10,
            )
            # Surface HTTP errors as a lookup failure rather than trying to
            # parse an error page as JSON.
            r.raise_for_status()
            data = r.json()
            products = data.get("products") or []
            if products:
                st.write(f"Found {len(products)} product(s) related to *{predicted}* (sample):")
                for p in products[:3]:
                    name = p.get("product_name") or p.get("generic_name") or "Unnamed product"
                    brands = p.get("brands", "")
                    nutri = p.get("nutriments", {})
                    st.write(f"- **{name}** — {brands}")
                    if 'expiration_date' in p:
                        st.write(f"  - expiry: {p['expiration_date']}")
                    # show minimal nutrition if exists
                    if nutri:
                        energy = nutri.get("energy-kcal_100g") or nutri.get("energy_100g")
                        if energy:
                            st.write(f"  - energy (per 100g): {energy}")
            else:
                st.write("No matching product found on Open Food Facts for this predicted label.")
        except Exception as e:
            # Best-effort lookup: network/JSON problems must not break the page.
            st.write("Open Food Facts lookup failed:", e)

        st.markdown("### Suggestions to reduce waste")
        st.write("- If expiry is near: cook soon or freeze.")
        st.write("- Convert to recipes that use leftover ingredients (soups, stir-fry).")
        st.write("- Share via community apps or donor platforms.")
    else:
        st.info("Model not available. While the model is missing you can still use the app UI.")
        st.markdown("### Suggestions (based on generic rules)")
        st.write("- Smell & visual check for spoilage.")
        st.write("- If fruit/veg: use within 2–3 days or pickle/preserve.")
        st.write("- If packaged: check packaging & expiry date.")

# Sidebar: static usage tips shown next to the main page.
st.sidebar.header("MVP Tips")
st.sidebar.write("1. Run `python train_model.py` to create the model (uses Food-101).")
st.sidebar.write("2. For better accuracy: increase epochs and unfreeze the base model.")
st.sidebar.write("3. Collect local photos using the app (consent) to fine-tune model for your region.")

In [None]:
import os
import pickle

# Export the loaded model and label list as pickle files.
#
# `model` and `labels` are the globals assigned by the app cell; they are None
# when the trained artifacts were missing, and undefined on a fresh kernel.
# Look them up defensively so this cell never silently pickles None.
#
# SECURITY: pickle files can execute arbitrary code when loaded — only load
# these files from a trusted location.
# NOTE(review): whether a Keras model can be pickled depends on the installed
# TensorFlow/Keras version — confirm; the .h5 file written by training is the
# format the app itself loads.
model_path_pkl = "models/food101_mobilenetv2.pkl"
label_names_path_pkl = "models/label_names.pkl"

model_to_export = globals().get("model")
labels_to_export = globals().get("labels")

if model_to_export is None or labels_to_export is None:
    print("Model/labels are not loaded; skipping pickle export. "
          "Train the model and re-run the app cell first.")
else:
    os.makedirs("models", exist_ok=True)

    # Save model as pickle
    print(f"Saving model to {model_path_pkl}")
    with open(model_path_pkl, 'wb') as f:
        pickle.dump(model_to_export, f)

    # Save label names as pickle
    with open(label_names_path_pkl, "wb") as f:
        pickle.dump(labels_to_export, f)
    print(f"Saved label names to {label_names_path_pkl}")

In [None]:
# validate_and_generate_figures_with_shap.py
"""
Validation + explainability script for the programmer retention study.
Outputs in ./validation_results/
- cv_summary.csv
- confusion_matrices.json
- policy_simulation.json
- perm_<model>.csv and figures/perm_<model>.png
- figures/shap_summary_<model>.png and shap_<model>_mean_abs.csv (if shap available)
- run_report.json
"""

import os, json, warnings
warnings.filterwarnings("ignore")
import pandas as pd, numpy as np
from sklearn.model_selection import StratifiedKFold, train_test_split
from sklearn.preprocessing import OneHotEncoder, StandardScaler
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.impute import SimpleImputer
from sklearn.metrics import accuracy_score, f1_score, roc_auc_score, confusion_matrix
from sklearn.linear_model import LogisticRegression
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import RandomForestClassifier
from sklearn.svm import SVC
from sklearn.neighbors import KNeighborsClassifier
from sklearn.naive_bayes import GaussianNB
from sklearn.neural_network import MLPClassifier
from sklearn.inspection import permutation_importance
import matplotlib.pyplot as plt
plt.rcParams.update({'figure.max_open_warning': 0})

# optional libraries
# Each optional dependency sets a HAS_* flag that later code checks before
# using it. `except Exception` (instead of a bare `except`) lets Ctrl-C and
# SystemExit propagate, while any import-time failure of the package itself
# still just disables the feature.
try:
    import xgboost as xgb
    HAS_XGB = True
except Exception:
    HAS_XGB = False
try:
    import lightgbm as lgb
    HAS_LGB = True
except Exception:
    HAS_LGB = False
try:
    import catboost as cb
    HAS_CAT = True
except Exception:
    HAS_CAT = False
try:
    from imblearn.over_sampling import SMOTE
    HAS_IMB = True
except Exception:
    HAS_IMB = False
try:
    import shap
    HAS_SHAP = True
except Exception:
    HAS_SHAP = False

# -------- CONFIG --------
# NOTE(review): absolute path (looks like a hosted-notebook upload location) —
# adjust when running elsewhere.
INPUT_CSV = "/content/dataf n.csv"       # path to your uploaded CSV
OUT_DIR = "validation_results"           # root folder for CSV/JSON outputs
FIG_DIR = os.path.join(OUT_DIR, "figures")  # PNG figures are written here
FOLDS = 5                       # CV folds (set to 5 or 10)
BOOT_ITERS = 300                # bootstrap iters for CI (reduce to speed up)
RANDOM_STATE = 0                # seed passed to splitters, resamplers and estimators
# ------------------------

# Create both output folders up front so later writes cannot fail on a missing directory.
os.makedirs(OUT_DIR, exist_ok=True)
os.makedirs(FIG_DIR, exist_ok=True)

# load CSV
df = pd.read_csv(INPUT_CSV)

# autodetect target (adjust heuristics if needed)
# First choice: the first column whose header mentions both "intend" and "technology".
target_col = next(
    (c for c in df.columns if "intend" in c.lower() and "technology" in c.lower()),
    None,
)
if target_col is None:
    # fallback: any column containing 'intend' or 'seek'
    target_col = next(
        (c for c in df.columns if "intend" in c.lower() or "seek employment" in c.lower()),
        None,
    )
if target_col is None:
    raise ValueError("Target column not found. Columns: " + ", ".join(df.columns[:30]))

# drop obvious PII columns
# The target is excluded from the keyword filter: if its header happened to
# contain one of the keywords (e.g. "name"), dropping it would make the target
# lookup below fail with a KeyError.
drop_cols = [
    c for c in df.columns
    if c != target_col and any(k in c.lower() for k in ["timestamp", "email", "name", "enter your"])
]
df_clean = df.drop(columns=drop_cols)

# target mapping: yes-like -> 1
# NOTE(review): this is substring matching, so a negated answer such as
# "do not intend" also maps to 1 — confirm the answer vocabulary in the CSV.
y = df_clean[target_col].astype(str).str.lower().str.contains("yes|intend|would").astype(int)

# feature selection heuristic: pick common survey columns
# NOTE(review): keywords are matched as substrings of the lower-cased header,
# so short ones like "age" also match headers such as "percentage".
candidates = []
for col in df_clean.columns:
    low = col.lower()
    if col == target_col:
        continue
    if any(k in low for k in ["age","gender","living","residence","income","cgpa","prior","program","family encourage","parent","hours","proficiency","interest in technology"]):
        candidates.append(col)
if len(candidates) < 6:
    # fallback: first 12 non-target columns
    candidates = [c for c in df_clean.columns if c != target_col][:12]

X = df_clean[candidates].copy()
# normalize text-like categories: fill gaps, trim, collapse whitespace to "_", lower-case
for c in X.select_dtypes(include='object').columns:
    X[c] = X[c].fillna("missing").astype(str).str.strip().str.replace(r"\s+", "_", regex=True).str.lower()

# Column groups that drive the two branches of the preprocessor.
cat_cols = X.select_dtypes(include='object').columns.tolist()
num_cols = X.select_dtypes(exclude='object').columns.tolist()

# preprocessor
# Categorical: constant-fill then one-hot (unknown categories at transform time are ignored).
cat_tf = Pipeline([('imp', SimpleImputer(strategy='constant', fill_value='missing')), ('onehot', OneHotEncoder(handle_unknown='ignore'))])
# Numeric: median-impute then standardize.
num_tf = Pipeline([('imp', SimpleImputer(strategy='median')), ('sc', StandardScaler())])
preprocessor = ColumnTransformer([('num', num_tf, num_cols), ('cat', cat_tf, cat_cols)], remainder='drop')

# models list
# Maps a display name to an unfitted estimator. The same instances are reused
# (and refit in place) by every evaluation loop below; they are not cloned.
# NOTE(review): the name `models` is also imported from tensorflow.keras in the
# training cell of this notebook; running this cell rebinds it to this dict.
models = {
    'LogisticRegression': LogisticRegression(max_iter=1000, class_weight='balanced', solver='liblinear', random_state=RANDOM_STATE),
    'DecisionTree': DecisionTreeClassifier(random_state=RANDOM_STATE),
    'RandomForest': RandomForestClassifier(n_estimators=200, random_state=RANDOM_STATE, class_weight='balanced'),
    # probability=True so predict_proba is available for the AUC computation
    'SVM': SVC(probability=True, random_state=RANDOM_STATE),
    'KNN': KNeighborsClassifier(),
    'NaiveBayes': GaussianNB(),
    'MLP': MLPClassifier(max_iter=500, random_state=RANDOM_STATE)
}
# Gradient-boosting libraries are added only when their import succeeded.
if HAS_XGB: models['XGBoost'] = xgb.XGBClassifier(use_label_encoder=False, eval_metric='logloss', random_state=RANDOM_STATE)
if HAS_LGB: models['LightGBM'] = lgb.LGBMClassifier(random_state=RANDOM_STATE)
if HAS_CAT: models['CatBoost'] = cb.CatBoostClassifier(verbose=0, random_state=RANDOM_STATE)

def stratified_cv(pipe, Xdf, yser, folds=5):
    """Evaluate a ``pre`` + ``clf`` pipeline with stratified k-fold cross-validation.

    Args:
        pipe: Pipeline exposing the named steps ``'pre'`` (preprocessor) and
            ``'clf'`` (classifier). Both steps are refit in place on every fold.
        Xdf: feature table; rows are selected positionally with ``.iloc``.
        yser: target series aligned with ``Xdf``; also indexed with ``.iloc``.
        folds: number of stratified folds.

    Returns:
        Tuple ``(accs, f1s, aucs)`` of numpy arrays with one entry per fold.
        The AUC entry is NaN for a fold where it cannot be computed (no usable
        ``predict_proba``, or a test fold containing a single class).
    """
    def _dense(Xt):
        # The preprocessor may return either a scipy sparse matrix or a plain
        # ndarray (ColumnTransformer decides based on output density), so
        # calling .toarray() unconditionally would crash on dense output.
        return Xt.toarray() if hasattr(Xt, "toarray") else np.asarray(Xt)

    pre = pipe.named_steps['pre']
    clf = pipe.named_steps['clf']
    # These estimators are fed dense arrays, as in the rest of this script.
    needs_dense = isinstance(clf, (GaussianNB, SVC, MLPClassifier))

    skf = StratifiedKFold(n_splits=folds, shuffle=True, random_state=RANDOM_STATE)
    accs, f1s, aucs = [], [], []
    for train_idx, test_idx in skf.split(Xdf, yser):
        Xtr, Xte = Xdf.iloc[train_idx], Xdf.iloc[test_idx]
        ytr, yte = yser.iloc[train_idx], yser.iloc[test_idx]

        # Fit the preprocessor on the training fold only, then reuse it for the test fold.
        Xtr_t = pre.fit_transform(Xtr, ytr)
        Xte_t = pre.transform(Xte)
        if needs_dense:
            Xtr_t, Xte_t = _dense(Xtr_t), _dense(Xte_t)

        clf.fit(Xtr_t, ytr)
        yp = clf.predict(Xte_t)

        try:
            auc = roc_auc_score(yte, clf.predict_proba(Xte_t)[:, 1])
        except Exception:
            # Best effort: AUC is optional, accuracy/F1 are still recorded.
            auc = np.nan

        accs.append(accuracy_score(yte, yp))
        f1s.append(f1_score(yte, yp, zero_division=0))
        aucs.append(auc)

    return np.array(accs), np.array(f1s), np.array(aucs)

def bootstrap_ci(arr, iters=300, seed=None):
    """Percentile-bootstrap 95% confidence interval for the mean of ``arr``.

    Args:
        arr: array-like of numbers; NaN entries are ignored.
        iters: number of bootstrap resamples.
        seed: optional integer seed. When given, a private ``RandomState`` is
            used so the interval is reproducible; when ``None`` (default) the
            global ``np.random`` state is used, as before.

    Returns:
        ``(lower, upper)`` as Python floats (the 2.5th and 97.5th percentiles
        of the resampled means), or ``(nan, nan)`` when there is no non-NaN
        data or ``iters`` is not positive.
    """
    values = np.asarray(arr, dtype=float)
    values = values[~np.isnan(values)]
    if values.size == 0 or iters <= 0:
        return (float('nan'), float('nan'))

    rng = np.random if seed is None else np.random.RandomState(seed)
    n = values.size
    boot_means = [np.mean(rng.choice(values, size=n, replace=True)) for _ in range(iters)]
    lower, upper = np.percentile(boot_means, [2.5, 97.5])
    return (float(lower), float(upper))

summary_rows = []
# MAIN: evaluate no-resample and resample
from sklearn.utils import resample

# Seed NumPy's global generator so the bootstrap confidence intervals below
# (bootstrap_ci draws from np.random) are reproducible across runs.
np.random.seed(RANDOM_STATE)


def _cv_dense(Xt):
    """Return ``Xt`` as a dense ndarray whether it arrives sparse or already dense."""
    return Xt.toarray() if hasattr(Xt, "toarray") else np.asarray(Xt)


def _cv_fit_score(clf, Xtr_t, ytr, Xte_t, yte):
    """Fit ``clf`` on already-transformed features and return ``(accuracy, f1, auc)``.

    AUC is NaN when it cannot be computed (no usable predict_proba, or a test
    fold with a single class).
    """
    if isinstance(clf, (GaussianNB, SVC, MLPClassifier)):
        # These estimators are fed dense arrays throughout this script.
        Xtr_t, Xte_t = _cv_dense(Xtr_t), _cv_dense(Xte_t)
    clf.fit(Xtr_t, ytr)
    yp = clf.predict(Xte_t)
    try:
        auc = roc_auc_score(yte, clf.predict_proba(Xte_t)[:, 1])
    except Exception:
        auc = np.nan
    return accuracy_score(yte, yp), f1_score(yte, yp, zero_division=0), auc


def _cv_upsample(Xtr, ytr):
    """Upsample non-majority rows (with replacement) to the majority count.

    Returns the inputs unchanged when the fold contains a single class.
    """
    train = pd.concat([Xtr, ytr.rename('target')], axis=1)
    majority = train['target'].mode()[0]
    maj = train[train['target'] == majority]
    minr = train[train['target'] != majority]
    if len(minr) == 0:
        return Xtr, ytr
    minr_up = resample(minr, replace=True, n_samples=len(maj), random_state=RANDOM_STATE)
    train_res = pd.concat([maj, minr_up])
    return train_res.drop(columns=['target']), train_res['target']


for scenario in ['no_resample', 'resample']:
    for mname, clf in models.items():
        if scenario == 'no_resample':
            pipe = Pipeline([('pre', preprocessor), ('clf', clf)])
            accs, f1s, aucs = stratified_cv(pipe, X, y, folds=FOLDS)
        else:
            # manual resampling inside CV folds (the test fold is never resampled)
            accs, f1s, aucs = [], [], []
            skf = StratifiedKFold(n_splits=FOLDS, shuffle=True, random_state=RANDOM_STATE)
            for train_idx, test_idx in skf.split(X, y):
                Xtr, Xte = X.iloc[train_idx], X.iloc[test_idx]
                ytr, yte = y.iloc[train_idx], y.iloc[test_idx]
                has_minority = ytr.nunique() > 1

                scores = None
                # try SMOTE on transformed features if available
                if has_minority and HAS_IMB:
                    try:
                        Xtr_t = preprocessor.fit_transform(Xtr)
                        X_sm, y_sm = SMOTE(random_state=RANDOM_STATE).fit_resample(Xtr_t, ytr)
                        scores = _cv_fit_score(clf, X_sm, y_sm, preprocessor.transform(Xte), yte)
                    except Exception:
                        # fallback to simple upsampling below
                        scores = None

                if scores is None:
                    # simple upsampling fallback (a no-op for single-class folds)
                    X_fit, y_fit = _cv_upsample(Xtr, ytr)
                    scores = _cv_fit_score(
                        clf,
                        preprocessor.fit_transform(X_fit), y_fit,
                        preprocessor.transform(Xte), yte,
                    )

                fold_acc, fold_f1, fold_auc = scores
                accs.append(fold_acc)
                f1s.append(fold_f1)
                aucs.append(fold_auc)

        # summarize
        acc_mean, acc_std = float(np.nanmean(accs)), float(np.nanstd(accs))
        f1_mean, f1_std = float(np.nanmean(f1s)), float(np.nanstd(f1s))
        auc_mean, auc_std = float(np.nanmean(aucs)), float(np.nanstd(aucs))
        # bootstrap_ci drops NaNs itself and returns (nan, nan) when nothing is left.
        acc_ci = bootstrap_ci(accs, iters=BOOT_ITERS)
        f1_ci = bootstrap_ci(f1s, iters=BOOT_ITERS)
        auc_ci = bootstrap_ci(aucs, iters=BOOT_ITERS)
        summary_rows.append({
            'scenario': scenario, 'model': mname,
            'accuracy_mean': acc_mean, 'accuracy_std': acc_std, 'accuracy_ci95': acc_ci,
            'f1_mean': f1_mean, 'f1_std': f1_std, 'f1_ci95': f1_ci,
            'auc_mean': auc_mean, 'auc_std': auc_std, 'auc_ci95': auc_ci
        })

summary_df = pd.DataFrame(summary_rows)
summary_df.to_csv(os.path.join(OUT_DIR, "cv_summary.csv"), index=False)

# Train-test holdout for confusion matrices + importances (use upsampled train to help minority)
from sklearn.utils import resample

X_train, X_test, y_train, y_test = train_test_split(X, y, stratify=y, test_size=0.2, random_state=RANDOM_STATE)
confusion_dict = {}
perm_files = []
shap_files = []


def _holdout_dense(Xt):
    """Return ``Xt`` as a dense ndarray whether the preprocessor produced sparse or dense output."""
    return Xt.toarray() if hasattr(Xt, "toarray") else np.asarray(Xt)


# simple upsample training — done once, because it does not depend on the model
train_df = pd.concat([X_train, y_train.rename('target')], axis=1)
majority_label = train_df['target'].mode()[0]
maj = train_df[train_df['target'] == majority_label]
minr = train_df[train_df['target'] != majority_label]
if len(minr) > 0:
    minr_up = resample(minr, replace=True, n_samples=len(maj), random_state=RANDOM_STATE)
    train_res_df = pd.concat([maj, minr_up])
    X_res, y_res = train_res_df.drop(columns=['target']), train_res_df['target']
else:
    X_res, y_res = X_train, y_train

# Fit the preprocessor ONCE, on the data the classifiers are trained on, and
# reuse that fitted state for every prediction and explanation. Refitting it
# later on different rows would change scaling/encoding and the explanations
# would no longer describe the features the classifier was trained on.
# One dense representation is used throughout so that fitting, prediction,
# permutation importance and SHAP all see identical inputs.
Xfit_dense = _holdout_dense(preprocessor.fit_transform(X_res))
Xte_dense = _holdout_dense(preprocessor.transform(X_test))
Xbg_dense = _holdout_dense(preprocessor.transform(X_train))  # SHAP background: original (non-upsampled) train rows
try:
    feat_names = list(preprocessor.get_feature_names_out())
except Exception:
    feat_names = [f"f{i}" for i in range(Xte_dense.shape[1])]

for mname, clf in models.items():
    clf.fit(Xfit_dense, y_res)

    # Predict on test set
    yp = clf.predict(Xte_dense)
    confusion_dict[mname] = confusion_matrix(y_test, yp).tolist()

    # permutation importance (on transformed features)
    try:
        imp = permutation_importance(clf, Xte_dense, y_test, n_repeats=10, random_state=RANDOM_STATE)
        imp_df = pd.DataFrame({'feature': feat_names, 'importance_mean': imp.importances_mean, 'importance_std': imp.importances_std})
        imp_df = imp_df.sort_values('importance_mean', ascending=False).head(40)
        fname = os.path.join(OUT_DIR, f"perm_{mname}.csv")
        imp_df.to_csv(fname, index=False); perm_files.append(fname)
        # plot top 10 (reversed so the largest bar is drawn at the top)
        top = imp_df.head(10).iloc[::-1]
        plt.figure(figsize=(6,4)); plt.barh(top['feature'], top['importance_mean']); plt.title(f"Permutation importance - {mname}")
        plt.tight_layout(); plt.savefig(os.path.join(FIG_DIR, f"perm_{mname}.png"), dpi=300); plt.close()
    except Exception as exc:
        # Best effort: report and continue with the next artifact instead of hiding the failure.
        print(f"[warn] permutation importance skipped for {mname}: {exc}")

    # SHAP (if available): use model-specific explainer where appropriate. Save mean absolute SHAP per transformed feature.
    if HAS_SHAP:
        try:
            # choose explainer
            if mname in ['XGBoost'] and HAS_XGB:
                expl = shap.TreeExplainer(clf)
            elif mname in ['RandomForest','DecisionTree'] and hasattr(clf, 'estimators_'):
                expl = shap.TreeExplainer(clf)
            else:
                predict_fn = clf.predict_proba if hasattr(clf, "predict_proba") else clf.predict
                expl = shap.Explainer(predict_fn, Xbg_dense)
            shap_vals = expl(Xte_dense)

            # Reduce to a 2D (rows, features) array.
            vals = np.asarray(shap_vals.values if hasattr(shap_vals, "values") else shap_vals)
            if vals.ndim == 3:
                # assumes the trailing axis enumerates model outputs/classes —
                # TODO confirm for the installed shap version; keep the last
                # (positive) class.
                vals = vals[:, :, -1]

            # summarize mean(|SHAP|)
            mean_abs = np.abs(vals).mean(axis=0)
            shap_df = pd.DataFrame({'feature': feat_names, 'mean_abs_shap': mean_abs}).sort_values('mean_abs_shap', ascending=False).head(50)
            sfile = os.path.join(OUT_DIR, f"shap_{mname}_mean_abs.csv")
            # Write the table before counting it as an output.
            shap_df.to_csv(sfile, index=False); shap_files.append(sfile)

            # plot SHAP summary (requires shap plotting)
            try:
                plt.figure(figsize=(6,4))
                # `features` must be the same transformed matrix the SHAP values refer to.
                shap.summary_plot(vals, features=Xte_dense, feature_names=feat_names, show=False, max_display=15)
                plt.tight_layout()
                plt.savefig(os.path.join(FIG_DIR, f"shap_summary_{mname}.png"), dpi=300)
                plt.close()
            except Exception as exc:
                plt.close()
                print(f"[warn] SHAP summary plot skipped for {mname}: {exc}")
        except Exception as exc:
            # fail gracefully and continue
            print(f"[warn] SHAP skipped for {mname}: {exc}")

# save confusion matrices
with open(os.path.join(OUT_DIR, "confusion_matrices.json"), "w") as f:
    json.dump(confusion_dict, f, indent=2)

# POLICY SIMULATION (safe)
# What-if: relabel about 10% of the "rural" test rows as "urban" and compare the
# mean predicted probability of the positive class before and after.
# Use the first feature whose header mentions "living" or "residence".
living_col = None
for c in X.columns:
    if "living" in c.lower() or "residence" in c.lower():
        living_col = c; break

# Stays None (and is reported as such) when the simulation cannot run.
policy = None
if living_col and 'LogisticRegression' in models:
    # Note: this refits the shared preprocessor and LogisticRegression instances in place.
    pipe_lr = Pipeline([('pre', preprocessor), ('clf', models['LogisticRegression'])])
    # fit on upsampled train to be consistent
    train_df = pd.concat([X_train, y_train.rename('target')], axis=1)
    maj = train_df[train_df['target'] == train_df['target'].mode()[0]]
    minr = train_df[train_df['target'] != train_df['target'].mode()[0]]
    if len(minr) > 0:
        from sklearn.utils import resample
        minr_up = resample(minr, replace=True, n_samples=len(maj), random_state=RANDOM_STATE)
        train_res_df = pd.concat([maj, minr_up]); y_res = train_res_df['target']; X_res = train_res_df.drop(columns=['target'])
        pipe_lr.fit(X_res, y_res)
    else:
        pipe_lr.fit(X_train, y_train)
    Xc = X_test.copy(); Xc[living_col] = Xc[living_col].astype(str)
    # Substring match on the normalized (lower-cased) category text.
    rural_mask = Xc[living_col].str.contains("rural", na=False)
    if rural_mask.sum() > 0:
        # Change 10% of rural rows, rounded up, and always at least one.
        sample_size = max(1, int(np.ceil(0.1 * rural_mask.sum())))
        rural_idx = Xc[rural_mask].sample(n=sample_size, random_state=RANDOM_STATE).index
        before = float(pipe_lr.predict_proba(Xc)[:,1].mean())
        # NOTE(review): "urban" must exactly match a category seen during fitting;
        # the one-hot encoder ignores unknown categories, so an unseen token would
        # be encoded as all zeros — verify against the data.
        X_alt = Xc.copy(); X_alt.loc[rural_idx, living_col] = "urban"
        after = float(pipe_lr.predict_proba(X_alt)[:,1].mean())
        policy = {'living_col': living_col, 'n_rural': int(rural_mask.sum()), 'changed': int(len(rural_idx)),
                  'mean_prob_before': before, 'mean_prob_after': after, 'delta': after - before}
        with open(os.path.join(OUT_DIR, "policy_simulation.json"), "w") as f:
            json.dump(policy, f, indent=2)

# final report
# Counts come from the lists filled during this run (perm_files / shap_files),
# so stale files left in OUT_DIR by earlier runs are not counted.
report = {
    'n_rows': int(df.shape[0]), 'n_features_used': int(X.shape[1]),
    'models_evaluated': list(models.keys()),
    'has_imblearn': bool(HAS_IMB), 'has_shap': bool(HAS_SHAP),
    'cv_summary_csv': os.path.join(OUT_DIR, "cv_summary.csv"),
    'confusion_matrices': os.path.join(OUT_DIR, "confusion_matrices.json"),
    'perm_files_count': len(perm_files),
    'shap_files_count': len(shap_files),
    # None when the policy simulation did not run.
    'policy_simulation': policy
}
with open(os.path.join(OUT_DIR, "run_report.json"), "w") as f:
    json.dump(report, f, indent=2)

print("Validation completed. Outputs in:", OUT_DIR)
print("Run report:", json.dumps(report, indent=2))

In [None]:
# validate_and_generate_figures_with_shap.py
"""
Validation + explainability script for programmer retention study.
Outputs to ./validation_results/

- cv_summary.csv (cross-validation results with mean, std, CI)
- confusion_matrices.json
- policy_simulation.json
- perm_<model>.csv and perm_<model>.png
- shap_<model>_summary.png and shap_<model>_mean_abs.csv (if shap available)
"""

import os, json, warnings
warnings.filterwarnings("ignore")
import pandas as pd, numpy as np
from sklearn.model_selection import StratifiedKFold, train_test_split
from sklearn.preprocessing import OneHotEncoder, StandardScaler
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.impute import SimpleImputer
from sklearn.metrics import accuracy_score, f1_score, roc_auc_score, confusion_matrix
from sklearn.linear_model import LogisticRegression
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import RandomForestClassifier
from sklearn.svm import SVC
from sklearn.neighbors import KNeighborsClassifier
from sklearn.naive_bayes import GaussianNB
from sklearn.neural_network import MLPClassifier
from sklearn.inspection import permutation_importance
import matplotlib.pyplot as plt
plt.rcParams.update({'figure.max_open_warning': 0})

# optional libraries
# Each probe sets a HAS_* flag; a failed import only disables that feature.
# `except Exception` (instead of a bare `except`) keeps the best-effort
# behaviour for any import-time failure while no longer swallowing
# KeyboardInterrupt / SystemExit.
try:
    import xgboost as xgb
    HAS_XGB = True
except Exception:
    HAS_XGB = False

try:
    import lightgbm as lgb
    HAS_LGB = True
except Exception:
    HAS_LGB = False

try:
    import catboost as cb
    HAS_CAT = True
except Exception:
    HAS_CAT = False

try:
    from imblearn.over_sampling import SMOTE
    HAS_IMB = True
except Exception:
    HAS_IMB = False

try:
    import shap
    HAS_SHAP = True
except Exception:
    HAS_SHAP = False

# -------- CONFIG --------
INPUT_CSV = "/content/dataf n.csv"  # path to your dataset
OUT_DIR = "validation_results"      # root folder for the files written by this cell
FIG_DIR = os.path.join(OUT_DIR, "figures")  # sub-folder created for figures
FOLDS = 5            # number of StratifiedKFold splits
BOOT_ITERS = 300     # bootstrap resamples passed to bootstrap_ci
RANDOM_STATE = 0     # seed shared by splitters, models and sampling
# ------------------------

# Make sure both output folders exist before anything is written.
for _out_path in (OUT_DIR, FIG_DIR):
    os.makedirs(_out_path, exist_ok=True)

# load
df = pd.read_csv(INPUT_CSV)

# Target detection, first match wins in column order:
#   1) a header containing both "intend" and "technology" (case-insensitive);
#   2) otherwise a header containing "intend" or "seek employment".
target_col = next(
    (name for name in df.columns
     if "intend" in name.lower() and "technology" in name.lower()),
    None,
)
if target_col is None:
    target_col = next(
        (name for name in df.columns
         if "intend" in name.lower() or "seek employment" in name.lower()),
        None,
    )
if target_col is None:
    raise ValueError("Target column not found.")

# Columns whose header contains any of these substrings are removed before modelling.
_DROP_KEYWORDS = ("timestamp", "email", "name", "enter your")
drop_cols = [c for c in df.columns if any(k in c.lower() for k in _DROP_KEYWORDS)]
df_clean = df.drop(columns=drop_cols)

# Binary label: 1 when the lower-cased answer matches the regex, else 0.
# NOTE(review): "intend" would also match a negative answer such as
# "do not intend" — confirm against the actual response values.
target_text = df_clean[target_col].astype(str).str.lower()
y = target_text.str.contains("yes|intend|would").astype(int)

# Feature selection by header keyword. This is a plain substring test, so a
# short keyword like "age" also matches headers such as "language".
_FEATURE_KEYWORDS = (
    "age", "gender", "living", "residence", "income", "cgpa", "prior",
    "program", "family", "parent", "hours", "proficiency", "interest",
)
candidates = [
    col for col in df_clean.columns
    if col != target_col and any(k in col.lower() for k in _FEATURE_KEYWORDS)
]
# Too few keyword hits: fall back to the first 12 non-target columns.
if len(candidates) < 6:
    candidates = [c for c in df_clean.columns if c != target_col][:12]

X = df_clean[candidates].copy()
# Normalise text columns: fill gaps, trim, collapse whitespace to "_", lower-case.
for c in X.select_dtypes(include='object').columns:
    trimmed = X[c].fillna("missing").astype(str).str.strip()
    X[c] = trimmed.str.replace(r"\s+", "_", regex=True).str.lower()

cat_cols = X.select_dtypes(include='object').columns.tolist()
num_cols = X.select_dtypes(exclude='object').columns.tolist()

# --- FIX: force dense output ---
# scikit-learn 1.2 renamed OneHotEncoder's `sparse` argument to `sparse_output`
# and 1.4 removed the old name, so `sparse=False` raises TypeError on current
# releases. Try the new keyword first and fall back for older installations.
try:
    _onehot = OneHotEncoder(handle_unknown='ignore', sparse_output=False)
except TypeError:
    _onehot = OneHotEncoder(handle_unknown='ignore', sparse=False)

# Categorical columns: fill gaps with the literal "missing", then one-hot encode
# (categories unseen during fit are ignored at transform time).
cat_tf = Pipeline([
    ('imp', SimpleImputer(strategy='constant', fill_value='missing')),
    ('onehot', _onehot)
])
# Numeric columns: median imputation followed by standardisation.
num_tf = Pipeline([
    ('imp', SimpleImputer(strategy='median')),
    ('sc', StandardScaler())
])
# Columns not listed in num_cols / cat_cols are dropped.
preprocessor = ColumnTransformer([
    ('num', num_tf, num_cols),
    ('cat', cat_tf, cat_cols)
], remainder='drop')

# models
# Candidate classifiers keyed by display name. The evaluation loops iterate
# this dict, so insertion order is also the order of the results.
models = dict(
    LogisticRegression=LogisticRegression(
        max_iter=1000,
        class_weight='balanced',
        solver='liblinear',
        random_state=RANDOM_STATE,
    ),
    DecisionTree=DecisionTreeClassifier(random_state=RANDOM_STATE),
    RandomForest=RandomForestClassifier(
        n_estimators=200,
        random_state=RANDOM_STATE,
        class_weight='balanced',
    ),
    SVM=SVC(probability=True, random_state=RANDOM_STATE),
    KNN=KNeighborsClassifier(),
    NaiveBayes=GaussianNB(),
    MLP=MLPClassifier(max_iter=500, random_state=RANDOM_STATE),
)

# Gradient-boosting libraries are optional; add them only when their import succeeded.
if HAS_XGB:
    models['XGBoost'] = xgb.XGBClassifier(
        use_label_encoder=False,
        eval_metric='logloss',
        random_state=RANDOM_STATE,
    )
if HAS_LGB:
    models['LightGBM'] = lgb.LGBMClassifier(random_state=RANDOM_STATE)
if HAS_CAT:
    models['CatBoost'] = cb.CatBoostClassifier(verbose=0, random_state=RANDOM_STATE)

def stratified_cv(pipe, Xdf, yser, folds=5):
    """Run shuffled stratified K-fold CV and collect per-fold metrics.

    `pipe` is re-fitted in place on every fold's training split and scored on
    the held-out split.

    Parameters
    ----------
    pipe : estimator exposing fit / predict (and optionally predict_proba).
    Xdf : feature table indexed positionally with `.iloc`.
    yser : label series indexed positionally with `.iloc`.
    folds : number of StratifiedKFold splits (seeded with RANDOM_STATE).

    Returns
    -------
    (accs, f1s, aucs) : three 1-D numpy arrays, one entry per fold. An AUC
    entry is NaN when it could not be computed for that fold.
    """
    skf = StratifiedKFold(n_splits=folds, shuffle=True, random_state=RANDOM_STATE)
    accs, f1s, aucs = [], [], []
    for tr, te in skf.split(Xdf, yser):
        Xtr, Xte = Xdf.iloc[tr], Xdf.iloc[te]
        ytr, yte = yser.iloc[tr], yser.iloc[te]
        pipe.fit(Xtr, ytr)
        yp = pipe.predict(Xte)
        accs.append(accuracy_score(yte, yp))
        f1s.append(f1_score(yte, yp, zero_division=0))
        try:
            # Column 1 of predict_proba is used as the positive-class score.
            probs = pipe.predict_proba(Xte)[:, 1]
            aucs.append(roc_auc_score(yte, probs))
        except Exception:
            # Best effort: any scoring failure (e.g. no predict_proba) is
            # recorded as NaN. `Exception` rather than a bare `except` so
            # that KeyboardInterrupt / SystemExit still propagate.
            aucs.append(np.nan)
    return np.array(accs), np.array(f1s), np.array(aucs)

def bootstrap_ci(arr, iters=300):
    """Percentile-bootstrap 95% confidence interval for the mean of `arr`.

    Parameters
    ----------
    arr : sequence or numpy array of numbers; NaN entries are discarded.
    iters : number of bootstrap resamples (drawn with numpy's global RNG).

    Returns
    -------
    (low, high) : the 2.5th and 97.5th percentiles of the resampled means as
    Python floats, or (nan, nan) when no non-NaN value remains.
    """
    # Convert first: boolean-mask indexing only works on an ndarray, so a plain
    # Python list used to raise TypeError here.
    arr = np.asarray(arr, dtype=float)
    arr = arr[~np.isnan(arr)]
    if len(arr) == 0:
        return (float('nan'), float('nan'))
    bs = [np.mean(np.random.choice(arr, size=len(arr), replace=True)) for _ in range(iters)]
    return (float(np.percentile(bs, 2.5)), float(np.percentile(bs, 97.5)))

# --- CV evaluation ---
# `resample` is only needed by the upsampling branch; import it once here
# instead of on every fold.
from sklearn.utils import resample

# bootstrap_ci draws from numpy's global RNG; seed it so the CI columns are
# repeatable between runs.
np.random.seed(RANDOM_STATE)

summary_rows = []
for scenario in ['no_resample', 'resample']:
    for mname, clf in models.items():
        pipe = Pipeline([('pre', preprocessor), ('clf', clf)])
        if scenario == 'no_resample':
            accs, f1s, aucs = stratified_cv(pipe, X, y, folds=FOLDS)
        else:
            # fallback simple upsampling: within each training fold the
            # non-majority rows are resampled with replacement up to the
            # majority count; the test fold is left untouched.
            accs, f1s, aucs = [], [], []
            skf = StratifiedKFold(n_splits=FOLDS, shuffle=True, random_state=RANDOM_STATE)
            for tr, te in skf.split(X, y):
                Xtr, Xte = X.iloc[tr].copy(), X.iloc[te].copy()
                ytr, yte = y.iloc[tr].copy(), y.iloc[te].copy()
                train = pd.concat([Xtr, ytr.rename('target')], axis=1)
                majority_label = train['target'].mode()[0]
                maj = train[train['target'] == majority_label]
                minr = train[train['target'] != majority_label]
                if len(minr) == 0:
                    # Nothing to upsample in this fold; skip it.
                    continue
                minr_up = resample(minr, replace=True, n_samples=len(maj), random_state=RANDOM_STATE)
                train_res = pd.concat([maj, minr_up])
                y_res = train_res['target']
                X_res = train_res.drop(columns=['target'])
                pipe.fit(X_res, y_res)
                yp = pipe.predict(Xte)
                accs.append(accuracy_score(yte, yp))
                f1s.append(f1_score(yte, yp, zero_division=0))
                try:
                    aucs.append(roc_auc_score(yte, pipe.predict_proba(Xte)[:, 1]))
                except Exception:
                    # Best effort: record NaN when AUC cannot be computed.
                    aucs.append(np.nan)

        # The resample branch builds plain lists; convert to float arrays so
        # bootstrap_ci (which masks NaNs with boolean indexing) accepts them.
        accs, f1s, aucs = (np.asarray(v, dtype=float) for v in (accs, f1s, aucs))

        summary_rows.append({
            'scenario': scenario, 'model': mname,
            'acc_mean': float(np.nanmean(accs)),
            'f1_mean': float(np.nanmean(f1s)),
            'auc_mean': float(np.nanmean(aucs)),
            'acc_ci95': bootstrap_ci(accs, BOOT_ITERS),
            'f1_ci95': bootstrap_ci(f1s, BOOT_ITERS),
            'auc_ci95': bootstrap_ci(aucs, BOOT_ITERS) if not np.all(np.isnan(aucs)) else (np.nan, np.nan)
        })

pd.DataFrame(summary_rows).to_csv(os.path.join(OUT_DIR, "cv_summary.csv"), index=False)

# --- Confusion matrices ---
# Single stratified 80/20 hold-out split; Xtr/Xte/ytr/yte are reused by the
# policy simulation further down.
Xtr, Xte, ytr, yte = train_test_split(X, y, stratify=y, test_size=0.2, random_state=RANDOM_STATE)
cms = {}
for mname, clf in models.items():
    pipe = Pipeline([('pre', preprocessor), ('clf', clf)])
    pipe.fit(Xtr, ytr)
    yp = pipe.predict(Xte)
    # labels=[0, 1] pins a 2x2 layout (rows = true class, columns = predicted)
    # even if one class is absent from both yte and yp, so every stored
    # matrix has the same shape.
    cms[mname] = confusion_matrix(yte, yp, labels=[0, 1]).tolist()
with open(os.path.join(OUT_DIR, "confusion_matrices.json"), "w") as f:
    json.dump(cms, f, indent=2)

# --- Policy simulation ---
# What-if check: relabel 10% of the "rural" test rows as "urban" and compare the
# logistic-regression mean predicted probability of class 1 before and after.
living_col = next(
    (name for name in X.columns
     if "living" in name.lower() or "residence" in name.lower()),
    None,
)
policy = None
if living_col and 'LogisticRegression' in models:
    pipe_lr = Pipeline([('pre', preprocessor), ('clf', models['LogisticRegression'])])
    pipe_lr.fit(Xtr, ytr)

    Xc = Xte.copy()
    Xc[living_col] = Xc[living_col].astype(str)
    rural_mask = Xc[living_col].str.contains("rural", na=False)

    if rural_mask.sum() > 0:
        # NOTE(review): with very few rural rows frac=0.1 may select none,
        # giving delta == 0 — confirm this is acceptable.
        idx = Xc[rural_mask].sample(frac=0.1, random_state=RANDOM_STATE).index
        before = float(pipe_lr.predict_proba(Xc)[:, 1].mean())

        Xalt = Xc.copy()
        Xalt.loc[idx, living_col] = "urban"
        after = float(pipe_lr.predict_proba(Xalt)[:, 1].mean())

        policy = {
            'living_col': living_col,
            'n_rural': int(rural_mask.sum()),
            'changed': len(idx),
            'before': before,
            'after': after,
            'delta': after - before,
        }
        with open(os.path.join(OUT_DIR, "policy_simulation.json"), "w") as f:
            json.dump(policy, f, indent=2)

print("Done. Results in:", OUT_DIR)

In [None]:
import json
import os

import matplotlib.pyplot as plt
import numpy as np  # used below; previously relied on an earlier cell's import
import seaborn as sns

# Load your confusion matrices
with open("validation_results/confusion_matrices.json", "r") as f:
    cms = json.load(f)

# Class labels
# NOTE(review): assumes class 0 = "Not Retained" and class 1 = "Retained" —
# confirm against how the target was encoded.
labels = ["Not Retained", "Retained"]

# The figures folder is normally created by the validation cell; make sure it
# exists so this cell also works when run on its own.
os.makedirs("validation_results/figures", exist_ok=True)

# Export each confusion matrix as its own PNG
for model, cm in cms.items():
    cm = np.array(cm)
    fig, ax = plt.subplots(figsize=(3, 3))
    sns.heatmap(cm, annot=True, fmt="d", cmap="Blues",
                xticklabels=labels, yticklabels=labels, cbar=False, ax=ax)

    ax.set_xlabel("Predicted Label")
    ax.set_ylabel("True Label")
    ax.set_title(model)

    # Clean file name (lowercase, no spaces)
    fname = "confusion_" + model.lower().replace(" ", "") + ".png"
    fig.tight_layout()
    fig.savefig(f"validation_results/figures/{fname}", dpi=300)
    # Close this specific figure so figures do not accumulate across the loop.
    plt.close(fig)