# ML Report: Dropout Prediction and Evidence Fusion

This notebook reproduces key figures and tables for the dissertation:
- RF: ROC/PR with F1-optimal threshold, confusion matrix, metrics table
- DS: Belief–plausibility scatter, risk-tier histogram vs true labels, interval coverage, specificity–recall vs threshold
- Feature importance (RF)
- Ablation: RF w/ vs w/o anomaly features
- Decision analysis: cost-sensitive sweep / net benefit

Outputs will be saved under `reports/figures/` and `reports/tables/`.


In [1]:
# Setup
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

from sklearn.ensemble import IsolationForest, RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import (
    accuracy_score, precision_score, recall_score, f1_score,
    confusion_matrix, roc_curve, auc, precision_recall_curve, roc_auc_score
)

from imblearn.over_sampling import SMOTE

# Paths
FIG_DIR = 'reports/figures'
TAB_DIR = 'reports/tables'
os.makedirs(FIG_DIR, exist_ok=True)
os.makedirs(TAB_DIR, exist_ok=True)

np.random.seed(42)
sns.set_context('talk')



ModuleNotFoundError: No module named 'imblearn'

In [None]:
# Data: load synthetic from existing CSV if present, else generate
CSV_PATH = 'uploads/student_data.csv'

# Optional: prefer dataset with label `dropout`
if os.path.exists(CSV_PATH):
    df_raw = pd.read_csv(CSV_PATH)
else:
    # Fallback: generate synthetic consistent with the original notebook
    def generate_student_data(n_samples=1000, dropout_rate=0.3):
        student_ids = [f'S{i:04d}' for i in range(1, n_samples + 1)]
        gpa = np.clip(np.random.normal(3.0, 0.7, n_samples), 0, 4)
        attendance = np.clip(np.random.normal(85, 10, n_samples), 50, 100)
        semester = np.random.randint(1, 9, n_samples)
        prev_gpa = np.clip(gpa + np.random.normal(0, 0.3, n_samples), 0, 4)
        failed_courses = np.random.poisson(1, n_samples)
        feedback_engagement = np.clip(np.random.normal(70, 20, n_samples), 0, 100)
        late_assignments = np.clip(np.random.normal(20, 15, n_samples), 0, 100)
        forum_participation = np.random.poisson(3, n_samples)
        meeting_attendance = np.clip(np.random.normal(80, 15, n_samples), 0, 100)
        study_group = np.random.choice([0, 1, 2, 3], n_samples, p=[0.4, 0.3, 0.2, 0.1])
        days_active = np.clip(np.random.normal(5, 1, n_samples), 0, 7)
        clicks_per_week = np.clip(np.random.negative_binomial(10, 0.5, n_samples), 0, 500)
        assessments_submitted = np.random.poisson(5, n_samples)
        previous_attempts = np.random.poisson(0.7, n_samples)
        studied_credits = np.random.randint(10, 40, n_samples)
        risk_score = (
            (gpa < 2.5).astype(float) * 0.4 +
            (attendance < 70).astype(float) * 0.3 +
            (failed_courses > 2).astype(float) * 0.2 +
            (feedback_engagement < 50).astype(float) * 0.1
        )
        risk_score += np.random.normal(0, 0.1, n_samples)
        dropout = (risk_score > 0.5).astype(int)
        current_rate = dropout.mean()
        if current_rate != dropout_rate:
            n_to_change = int(abs(current_rate - dropout_rate) * n_samples)
            if current_rate < dropout_rate:
                idx = np.where(dropout == 0)[0]
                change_idx = np.random.choice(idx, n_to_change, replace=False)
                dropout[change_idx] = 1
            else:
                idx = np.where(dropout == 1)[0]
                change_idx = np.random.choice(idx, n_to_change, replace=False)
                dropout[change_idx] = 0
        return pd.DataFrame({
            'student_id': student_ids,
            'gpa': gpa,
            'attendance': attendance,
            'semester': semester,
            'prev_gpa': prev_gpa,
            'failed_courses': failed_courses,
            'feedback_engagement': feedback_engagement,
            'late_assignments': late_assignments,
            'forum_participation': forum_participation,
            'meeting_attendance': meeting_attendance,
            'study_group': study_group,
            'days_active': days_active,
            'clicks_per_week': clicks_per_week,
            'assessments_submitted': assessments_submitted,
            'previous_attempts': previous_attempts,
            'studied_credits': studied_credits,
            'dropout': dropout
        })
    df_raw = generate_student_data(1000, 0.3)

# Keep only columns used downstream
cols = [c for c in df_raw.columns if c != 'student_id']
df = df_raw[cols].copy()

# Split
X = df.drop(columns=['dropout'])
y = df['dropout']
X_train, X_temp, y_train, y_temp = train_test_split(X, y, test_size=0.2, random_state=42, stratify=y)
X_val, X_test, y_val, y_test = train_test_split(X_temp, y_temp, test_size=0.5, random_state=42, stratify=y_temp)

print(X_train.shape, X_val.shape, X_test.shape)



In [None]:
# Train Isolation Forest and compute anomaly features
ANOMALY_FEATURES = ['clicks_per_week', 'days_active', 'previous_attempts', 'studied_credits', 'assessments_submitted']
ANOMALY_FEATURES = [f for f in ANOMALY_FEATURES if f in X_train.columns]

iso = IsolationForest(n_estimators=100, contamination=0.1, random_state=42, n_jobs=-1)
iso.fit(X_train[ANOMALY_FEATURES])

def add_anomaly_features(X, model, features):
    raw = -model.decision_function(X[features])
    scores = (raw - raw.min()) / (raw.max() - raw.min() + 1e-12)
    flags = (model.predict(X[features]) == -1).astype(int)
    X_enh = X.copy()
    X_enh['anomaly_score'] = scores
    X_enh['is_anomaly'] = flags
    if 'gpa' in X_enh.columns:
        X_enh['anomaly_gpa_interaction'] = X_enh['anomaly_score'] * X_enh['gpa']
    if 'attendance' in X_enh.columns:
        X_enh['anomaly_attendance_interaction'] = X_enh['anomaly_score'] * X_enh['attendance']
    return X_enh, scores, flags

X_train_enh, train_anom_scores, train_is_anom = add_anomaly_features(X_train, iso, ANOMALY_FEATURES)
X_val_enh, val_anom_scores, val_is_anom = add_anomaly_features(X_val, iso, ANOMALY_FEATURES)
X_test_enh, test_anom_scores, test_is_anom = add_anomaly_features(X_test, iso, ANOMALY_FEATURES)

print(X_train_enh.shape, X_val_enh.shape, X_test_enh.shape)



In [None]:
# Train Random Forest with SMOTE and find F1-optimal threshold
smote = SMOTE(random_state=42, sampling_strategy=0.6)
X_res, y_res = smote.fit_resample(X_train_enh, y_train)

rf = RandomForestClassifier(
    n_estimators=200,
    max_depth=10,
    min_samples_split=5,
    min_samples_leaf=3,
    max_features='sqrt',
    bootstrap=True,
    class_weight={0: 1, 1: 10},
    random_state=42,
    n_jobs=-1
)
rf.fit(X_res, y_res)

# Validation probabilities and threshold
val_probs = rf.predict_proba(X_val_enh)[:, 1]
prec, rec, thr = precision_recall_curve(y_val, val_probs)
f1s = 2 * (prec * rec) / (prec + rec + 1e-12)
opt_idx = np.nanargmax(f1s[:-1])  # last precision point has no threshold
opt_thr = thr[opt_idx]
print('Optimal threshold (F1 on val):', round(float(opt_thr), 3))



In [None]:
# RF: Curves, confusion matrix, metrics table
from sklearn.metrics import PrecisionRecallDisplay, RocCurveDisplay

def rf_reports(model, Xv, yv, thr, name='Validation'):
    probs = model.predict_proba(Xv)[:, 1]
    preds = (probs >= thr).astype(int)
    cm = confusion_matrix(yv, preds)
    tn, fp, fn, tp = cm.ravel()
    metrics = {
        'dataset': name,
        'threshold': float(thr),
        'accuracy': accuracy_score(yv, preds),
        'precision': precision_score(yv, preds, zero_division=0),
        'recall': recall_score(yv, preds, zero_division=0),
        'f1': f1_score(yv, preds, zero_division=0),
        'specificity': tn / (tn + fp + 1e-12),
        'roc_auc': roc_auc_score(yv, probs)
    }
    # Save metrics
    mdf = pd.DataFrame([metrics])
    mdf.to_csv(f"{TAB_DIR}/rf_metrics_{name.lower()}.csv", index=False)

    # Confusion matrix plot
    plt.figure(figsize=(5,4))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
                xticklabels=['No Dropout','Dropout'],
                yticklabels=['No Dropout','Dropout'])
    plt.title(f'RF Confusion Matrix - {name} (thr={thr:.2f})')
    plt.xlabel('Predicted'); plt.ylabel('Actual')
    plt.tight_layout()
    plt.savefig(f"{FIG_DIR}/rf_confmat_{name.lower()}.png", dpi=200)
    plt.show()

    # ROC curve
    RocCurveDisplay.from_predictions(yv, probs)
    plt.title(f'RF ROC - {name} (AUC={metrics["roc_auc"]:.2f})')
    plt.tight_layout()
    plt.savefig(f"{FIG_DIR}/rf_roc_{name.lower()}.png", dpi=200)
    plt.show()

    # PR curve with F1-optimal marker
    precision, recall, thresholds = precision_recall_curve(yv, probs)
    f1s = 2 * (precision * recall) / (precision + recall + 1e-12)
    best_idx = np.nanargmax(f1s[:-1])

    plt.figure(figsize=(5,4))
    plt.plot(recall, precision, label='PR')
    plt.scatter(recall[best_idx], precision[best_idx], c='red', label='F1-optimal')
    plt.xlabel('Recall'); plt.ylabel('Precision')
    plt.title(f'RF PR Curve - {name}')
    plt.legend(); plt.tight_layout()
    plt.savefig(f"{FIG_DIR}/rf_pr_{name.lower()}.png", dpi=200)
    plt.show()

    return metrics

val_metrics = rf_reports(rf, X_val_enh, y_val, opt_thr, name='Validation')
test_metrics = rf_reports(rf, X_test_enh, y_test, opt_thr, name='Test')



In [None]:
# Dempster–Shafer: combiner, results, and threshold sweep
class DempsterShaferCombination:
    def __init__(self, classes=("non-dropout","dropout")):
        self.classes = classes
        self.frame = [set(), {classes[0]}, {classes[1]}, set(classes)]

    def _convert_proba_to_mass(self, proba, uncertainty=0.2):
        p = float(np.clip(proba, 1e-4, 1-1e-4))
        m = {
            frozenset(): 0.0,
            frozenset({self.classes[0]}): (1 - p) * (1 - uncertainty),
            frozenset({self.classes[1]}): p * (1 - uncertainty),
            frozenset(self.classes): uncertainty
        }
        return m

    def _combine_masses(self, m1, m2):
        out = {}
        k = 0.0
        for a, va in m1.items():
            for b, vb in m2.items():
                inter = frozenset(set(a).intersection(set(b)))
                prod = va * vb
                if len(inter) == 0:
                    k += prod
                else:
                    out[inter] = out.get(inter, 0.0) + prod
        if k < 1.0:
            for key in out:
                out[key] /= (1 - k)
        for s in [frozenset(), frozenset({self.classes[0]}), frozenset({self.classes[1]}), frozenset(self.classes)]:
            out.setdefault(s, 0.0)
        return out

    def combine(self, anomaly_score, clf_proba, expert_score=None):
        m_anom = self._convert_proba_to_mass(anomaly_score, uncertainty=0.25)
        m_clf  = self._convert_proba_to_mass(clf_proba,       uncertainty=0.15)
        m = self._combine_masses(m_anom, m_clf)
        if expert_score is not None:
            m_exp = self._convert_proba_to_mass(expert_score, uncertainty=0.20)
            m = self._combine_masses(m, m_exp)
        drop = frozenset({self.classes[1]})
        belief = m[drop]
        plaus = belief + m[frozenset(self.classes)]
        return belief, plaus, plaus - belief

# Expert heuristic (optional)
def expert_rule(df):
    g = (df['gpa'] < 2.0).astype(float) * 0.5 if 'gpa' in df else 0.0
    a = (df['attendance'] < 65).astype(float) * 0.3 if 'attendance' in df else 0.0
    f = (df['failed_courses'] > 3).astype(float) * 0.2 if 'failed_courses' in df else 0.0
    if isinstance(g, (int, float)):
        return np.zeros(len(df)) + (g + a + f)
    return g + a + f

# Build DS results on test set
rf_test_probs = rf.predict_proba(X_test_enh)[:, 1]
ds = DempsterShaferCombination()
exp = expert_rule(X_test_enh)

belief, plaus, uncert = [], [], []
for i in range(len(X_test_enh)):
    b, p, u = ds.combine(test_anom_scores[i], rf_test_probs[i], exp[i])
    belief.append(b); plaus.append(p); uncert.append(u)

belief = np.array(belief); plaus = np.array(plaus); uncert = np.array(uncert)

# Belief–plausibility scatter
plt.figure(figsize=(6,5))
sc = plt.scatter(belief, plaus, c=uncert, cmap='YlOrRd', alpha=0.7)
plt.colorbar(sc, label='Uncertainty')
plt.plot([0,1],[0,1],'k--',alpha=0.4)
plt.axvline(0.5, ls=':', c='gray'); plt.axhline(0.5, ls=':', c='gray')
plt.xlabel('Belief (Dropout)'); plt.ylabel('Plausibility (Dropout)')
plt.title('Belief–Plausibility (Test)')
plt.tight_layout(); plt.savefig(f"{FIG_DIR}/ds_belief_plaus_test.png", dpi=200); plt.show()

# Risk tiers by belief
risk = pd.cut(belief, bins=[-1,0.3,0.5,0.7,1.0], labels=['Low Risk','Moderate Risk','High Risk','Very High Risk'])
plt.figure(figsize=(6,4))
sns.countplot(x=risk, hue=y_test, palette='Set2')
plt.title('Risk Tiers vs True Labels (Test)')
plt.xlabel('Risk Tier'); plt.ylabel('Count')
plt.tight_layout(); plt.savefig(f"{FIG_DIR}/ds_tiers_vs_labels_test.png", dpi=200); plt.show()

# Interval coverage
covered = ((y_test.values==1) & (plaus>0.5)) | ((y_test.values==0) & (belief<0.5))
coverage = covered.mean(); avg_unc = float(uncert.mean())
pd.DataFrame([{'dataset':'Test','interval_coverage':coverage,'avg_uncertainty':avg_unc}]).to_csv(f"{TAB_DIR}/ds_interval_coverage_test.csv", index=False)
print('Interval coverage:', round(coverage,3), '| Avg uncertainty:', round(avg_unc,3))

# Specificity–Recall vs threshold for belief
ths = np.linspace(0.1,0.9,17)
rows = []
for t in ths:
    yhat = (belief >= t).astype(int)
    tn, fp, fn, tp = confusion_matrix(y_test, yhat).ravel()
    spec = tn/(tn+fp+1e-12); rec = tp/(tp+fn+1e-12)
    rows.append({'thr':float(t),'specificity':spec,'recall':rec})
cur = pd.DataFrame(rows)
cur.to_csv(f"{TAB_DIR}/ds_spec_recall_sweep_test.csv", index=False)
plt.figure(figsize=(6,4))
plt.plot(cur['recall'], cur['specificity'], marker='o')
plt.xlabel('Recall'); plt.ylabel('Specificity')
plt.title('DS Specificity–Recall Trade-off (Test)')
plt.grid(alpha=0.3); plt.tight_layout(); plt.savefig(f"{FIG_DIR}/ds_spec_recall_tradeoff_test.png", dpi=200); plt.show()



In [None]:
# Feature importance (top 10)
import numpy as np

importances = rf.feature_importances_
idx = np.argsort(importances)[::-1][:10]
plt.figure(figsize=(8,4))
plt.bar(range(len(idx)), importances[idx])
plt.xticks(range(len(idx)), [X_train_enh.columns[i] for i in idx], rotation=45, ha='right')
plt.title('Random Forest Feature Importance (Top 10)')
plt.tight_layout(); plt.savefig(f"{FIG_DIR}/rf_feature_importance_top10.png", dpi=200); plt.show()

fi_table = pd.DataFrame({'feature': X_train_enh.columns, 'importance': importances}).sort_values('importance', ascending=False)
fi_table.to_csv(f"{TAB_DIR}/rf_feature_importance.csv", index=False)



In [None]:
# Ablation: RF w/o anomaly-derived features vs with
BASE_FEATURES = [c for c in X_train.columns]

# Train baseline RF (no anomaly features)
X_res_b, y_res_b = smote.fit_resample(X_train[BASE_FEATURES], y_train)
rf_base = RandomForestClassifier(n_estimators=200, random_state=42, n_jobs=-1, class_weight={0:1,1:10})
rf_base.fit(X_res_b, y_res_b)

# Evaluate on validation
val_probs_b = rf_base.predict_proba(X_val[BASE_FEATURES])[:,1]
prec_b, rec_b, thr_b = precision_recall_curve(y_val, val_probs_b)
f1s_b = 2*(prec_b*rec_b)/(prec_b+rec_b+1e-12)
opt_b = thr_b[np.nanargmax(f1s_b[:-1])]

# Build a comparison table
def eval_at(model, Xv, yv, thr):
    p = model.predict_proba(Xv)[:,1]
    yhat = (p>=thr).astype(int)
    tn, fp, fn, tp = confusion_matrix(yv, yhat).ravel()
    return {
        'accuracy': accuracy_score(yv, yhat),
        'precision': precision_score(yv, yhat, zero_division=0),
        'recall': recall_score(yv, yhat, zero_division=0),
        'f1': f1_score(yv, yhat, zero_division=0),
        'specificity': tn/(tn+fp+1e-12),
        'roc_auc': roc_auc_score(yv, p),
        'threshold': float(thr)
    }

ablation = pd.DataFrame({
    'model': ['RF_base','RF_enhanced'],
    'dataset': ['Validation','Validation'],
    **pd.DataFrame([
        eval_at(rf_base, X_val[BASE_FEATURES], y_val, opt_b),
        eval_at(rf, X_val_enh, y_val, opt_thr)
    ]).to_dict(orient='list')
})

ablation.to_csv(f"{TAB_DIR}/ablation_validation.csv", index=False)
print(ablation)

# Plot PR curves overlay
plt.figure(figsize=(6,4))
plt.plot(rec_b, prec_b, label='RF base (no anomaly)')
prec_e, rec_e, _ = precision_recall_curve(y_val, rf.predict_proba(X_val_enh)[:,1])
plt.plot(rec_e, prec_e, label='RF enhanced (with anomaly)')
plt.xlabel('Recall'); plt.ylabel('Precision'); plt.title('PR curves (Validation)')
plt.legend(); plt.tight_layout(); plt.savefig(f"{FIG_DIR}/ablation_pr_validation.png", dpi=200); plt.show()



In [None]:
# Decision analysis: cost-sensitive sweep (FN cost vs FP cost)
# Define cost = C_fn*FN + C_fp*FP per threshold; plot net benefit-like surface

C_fn_values = [1, 2, 5, 10]
C_fp = 1.0
probs = rf.predict_proba(X_test_enh)[:,1]
ths = np.linspace(0.05, 0.95, 19)
rows = []
for cfn in C_fn_values:
    for t in ths:
        yhat = (probs >= t).astype(int)
        tn, fp, fn, tp = confusion_matrix(y_test, yhat).ravel()
        cost = cfn*fn + C_fp*fp
        rows.append({'C_fn':cfn,'thr':float(t),'FN':int(fn),'FP':int(fp),'TP':int(tp),'TN':int(tn),'cost':float(cost)})

cost_df = pd.DataFrame(rows)
cost_df.to_csv(f"{TAB_DIR}/decision_cost_sweep_test.csv", index=False)

plt.figure(figsize=(7,5))
for cfn in C_fn_values:
    sub = cost_df[cost_df['C_fn']==cfn]
    plt.plot(sub['thr'], sub['cost'], label=f'C_fn={cfn}')
plt.xlabel('Threshold'); plt.ylabel('Total Cost (C_fn*FN + C_fp*FP)')
plt.title('Decision Cost vs Threshold (Test)')
plt.legend(); plt.tight_layout(); plt.savefig(f"{FIG_DIR}/decision_cost_vs_threshold.png", dpi=200); plt.show()

