In [None]:
"""
Synthetic Data Augmentation, Student NN & Rule Extraction
========================================================

Workflow:

1. **Teacher** Random Forest on the original train split.
2. **SMOTE** synthetic generation up to `TARGET_TOTAL` samples.
3. **Self‑label** synthetic data with the teacher and keep only high-confidence
   points (≥ `CONF_THRESH`).
4. Merge original + synthetic data → train **MLP student**.
5. Evaluate the student on the untouched test split.
6. **DEXiRE** rule extraction from the trained student for symbolic
   interpretability, with a prettified mapping of feature indices to names.
"""

In [None]:
# ----------------------------------------------------------------------
# Parameters & reproducibility
# ----------------------------------------------------------------------
from __future__ import annotations

import os
import random
import re
import warnings
from pathlib import Path

import numpy as np
import pandas as pd
import tensorflow as tf
from imblearn.over_sampling import SMOTE
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import classification_report, confusion_matrix
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder, StandardScaler
from tensorflow.keras import callbacks, layers, models, optimizers
from tensorflow.keras.utils import to_categorical

from dexire.dexire import DEXiRE  # pip install dexire

SEED           = 42
CONF_THRESH    = 0.90       # minimum teacher confidence for keeping a synthetic sample
TEST_SIZE      = 0.20       # fraction of the original data held out as the test split
TARGET_TOTAL   = 10_000_000  # total SMOTE target, split evenly across classes; lower it (e.g. 2M) for a quick demo
DATA_PATH      = Path(os.getenv(
    "DATASET_PATH",
    "../../data/processed_data/case_3.csv",
))

# Seed every random number generator the pipeline relies on. TensorFlow keeps
# its own global seed, so without it the student's weight initialisation and
# batch shuffling would differ from run to run.
random.seed(SEED)
np.random.seed(SEED)
tf.random.set_seed(SEED)
# NOTE: hash randomisation of the running interpreter is fixed at start-up, so
# this assignment only reaches child processes that inherit the environment.
os.environ["PYTHONHASHSEED"] = str(SEED)

In [None]:
# ----------------------------------------------------------------------
# 1⃣  Load dataset & split
# ----------------------------------------------------------------------
# Fail fast, reporting the absolute path so a wrong DATASET_PATH is easy to spot.
if not DATA_PATH.exists():
    raise FileNotFoundError(f"Dataset not found → {DATA_PATH.resolve()}")

print("[INFO] Loading dataset …")
df = pd.read_csv(DATA_PATH)

# Every column except the "class" target is a model input.
feature_names = df.columns.drop("class")  # keep for DEXiRE
X = df.drop(columns="class").values

# Map the raw class labels to consecutive integer codes 0..k-1.
y = df["class"].values
le = LabelEncoder().fit(y)
y_enc = le.transform(y)

# Stratified hold-out: both splits keep the class proportions of the full data.
X_train, X_test, y_train, y_test = train_test_split(
    X,
    y_enc,
    test_size=TEST_SIZE,
    random_state=SEED,
    stratify=y_enc,
)
print(f"[INFO] Original train distribution: {np.bincount(y_train)}\n")

In [None]:
# ----------------------------------------------------------------------
# 2⃣  Train teacher Random Forest
# ----------------------------------------------------------------------
print("[INFO] Training RandomForest teacher …")
# `fit` returns the estimator itself, so construction and training can be chained.
rf = RandomForestClassifier(
    n_estimators=800, class_weight="balanced", n_jobs=-1, random_state=SEED
).fit(X_train, y_train)
print("[INFO] Teacher trained.\n")

In [None]:
# ----------------------------------------------------------------------
# 3⃣  SMOTE synthetic generation & filtering
# ----------------------------------------------------------------------
classes, class_counts = np.unique(y_train, return_counts=True)
per_class_target  = TARGET_TOTAL // len(classes)
# SMOTE can only add samples: asking for fewer than a class already has raises
# a ValueError, so each target is clamped to at least the current class size.
smote_strategy    = {
    cls: max(per_class_target, int(count))
    for cls, count in zip(classes, class_counts)
}
print(f"[INFO] SMOTE target per class: {smote_strategy}")

smote  = SMOTE(sampling_strategy=smote_strategy, random_state=SEED)
print("[INFO] Generating synthetic samples …")
# SMOTE's own labels are discarded on purpose: the teacher re-labels below.
X_syn_full, _ = smote.fit_resample(X_train, y_train)
# The synthetic rows are taken from the tail of the resampled array, after the
# original training rows. Slicing from the front offset (rather than with a
# negative count) stays correct when nothing was generated: `arr[-0:]` would
# return the whole array.
X_syn = X_syn_full[X_train.shape[0]:]
synthetic_count = X_syn.shape[0]

print("[INFO] Teacher self‑labelling & confidence filter …")
if synthetic_count:
    proba_syn = rf.predict_proba(X_syn)
else:
    # predict_proba rejects empty input; keep the downstream shapes consistent.
    proba_syn = np.empty((0, len(rf.classes_)))
conf_max  = proba_syn.max(axis=1)
# Map probability columns back to labels through rf.classes_ instead of
# assuming that column i always corresponds to label i.
y_syn_lbl = rf.classes_[proba_syn.argmax(axis=1)]

# Keep only synthetic points the teacher is confident about.
mask_conf   = conf_max >= CONF_THRESH
X_syn_filt  = X_syn[mask_conf]
y_syn_filt  = y_syn_lbl[mask_conf]
print(f"[INFO] High‑confidence synthetic kept: {len(y_syn_filt)} / {synthetic_count}\n")

In [None]:
# ----------------------------------------------------------------------
# 4⃣  Merge & shuffle
# ----------------------------------------------------------------------
# Original training rows first, followed by the teacher-approved synthetic rows.
X_comb = np.concatenate([X_train, X_syn_filt], axis=0)
y_comb = np.concatenate([y_train, y_syn_filt])

# A single shared permutation keeps features and labels aligned while mixing
# real and synthetic rows.
perm = np.random.permutation(len(y_comb))
X_comb = X_comb[perm]
y_comb = y_comb[perm]
print(f"[INFO] Final train size: {X_comb.shape}, distribution: {np.bincount(y_comb)}\n")


In [None]:
# ----------------------------------------------------------------------
# 5⃣  Scale & one‑hot labels
# ----------------------------------------------------------------------
# Fit the scaler on the training data only, then reuse its statistics for the
# test split so no test information leaks into preprocessing.
scaler = StandardScaler()
X_comb_s = scaler.fit_transform(X_comb)
X_test_s = scaler.transform(X_test)

# Pin the one-hot width to the number of known classes. Left to infer it from
# max(label) + 1, to_categorical would produce arrays of different widths
# whenever one split happens to lack the highest class index.
y_comb_o = to_categorical(y_comb, num_classes=len(le.classes_))
y_test_o = to_categorical(y_test, num_classes=len(le.classes_))

In [None]:
# ----------------------------------------------------------------------
# 6⃣  Build + train student MLP
# ----------------------------------------------------------------------
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Dropout
from tensorflow.keras.callbacks import EarlyStopping

# Uses X_comb_s / y_comb_o prepared in the scaling cell above; the validation
# set is carved out of them via `validation_split` in `student.fit` below.

# 1. Model construction



def build_student(input_dim: int, output_dim: int) -> Sequential:
    """Build and compile the student MLP classifier.

    The network is a stack of three ReLU hidden layers (128 → 64 → 32 units)
    followed by a softmax output layer, compiled with Adam (learning rate
    1e-3), categorical cross-entropy loss and an accuracy metric.

    Parameters
    ----------
    input_dim : int
        Number of input features.
    output_dim : int
        Number of classes, i.e. the width of the one-hot encoded targets.

    Returns
    -------
    Sequential
        The compiled, untrained Keras model.
    """
    model = Sequential([
        Dense(128, activation='relu', input_shape=(input_dim,)),
        Dense(64, activation='relu'),
        Dense(32, activation='relu'),
        # Softmax, not sigmoid: categorical cross-entropy on one-hot targets
        # expects a single probability distribution over mutually exclusive
        # classes, whereas sigmoid scores each unit independently.
        Dense(output_dim, activation='softmax')
    ])
    model.compile(optimizer=optimizers.Adam(1e-3), loss="categorical_crossentropy", metrics=["accuracy"])
    return model

print("[INFO] Training student MLP …")
student = build_student(input_dim=X_comb_s.shape[1], output_dim=y_comb_o.shape[1])

# Both callbacks watch the validation loss: the learning rate is halved after
# 5 epochs without improvement, and training stops after 10 such epochs,
# restoring the best weights seen.
cb_lr = callbacks.ReduceLROnPlateau(monitor="val_loss", factor=0.5, patience=5)
cb_es = callbacks.EarlyStopping(monitor="val_loss", patience=10, restore_best_weights=True)

student.fit(
    x=X_comb_s,
    y=y_comb_o,
    batch_size=1024,
    epochs=30,
    validation_split=0.2,
    callbacks=[cb_es, cb_lr],
    verbose=1,
)

In [None]:
# ----------------------------------------------------------------------
# 7⃣  Evaluation
# ----------------------------------------------------------------------
print("[INFO] Evaluating student …")
loss, acc = student.evaluate(X_test_s, y_test_o, verbose=0)
print(f"[RESULT] TEST – Loss: {loss:.4f} | Accuracy: {acc:.4f}\n")

# Turn the per-class scores into hard predictions (index of the largest score).
y_pred = np.argmax(student.predict(X_test_s, verbose=0), axis=1)

# Report over the full encoded label set so the rows line up with
# `target_names` even if a class is absent from the test split or from the
# predictions. The names are cast to str because classification_report
# measures their length and fails on numeric class labels.
print(classification_report(
    y_test,
    y_pred,
    labels=np.arange(len(le.classes_)),
    target_names=[str(name) for name in le.classes_],
))
print(confusion_matrix(y_test, y_pred, labels=np.arange(len(le.classes_))))

In [None]:
# Run a single forward pass first (alternative: student(X_comb_s[:1], training=False)).
# NOTE(review): presumably this makes sure the model has been called/built
# before DEXiRE inspects it — confirm against the DEXiRE requirements.
_ = student.predict(X_comb_s[:1])

# ----------------------------------------------------------------------
# 8⃣  Symbolic Rule Extraction with DEXiRE
# ----------------------------------------------------------------------
print("[INFO] Extracting rules with DEXiRE … (this may take a while)")

# DEXiRE receives the scaled training features as a DataFrame with named columns.
features_df = pd.DataFrame(data=X_comb_s, columns=feature_names)

dexire = DEXiRE(model=student, class_names=le.classes_.tolist())
rules_raw = dexire.extract_rules(features_df, y_comb)

# Persist the rule set exactly as DEXiRE renders it.
rules_path = Path("rules_student.txt")
rules_path.write_text(str(rules_raw), encoding="utf-8")
print(f"[INFO] Raw rules saved → {rules_path}")

# Lookup table: positional feature index → original column name, used to
# prettify X_i placeholders.
idx2name = dict(enumerate(feature_names))

def repl(match):
    """Return the feature name for an ``X_<index>`` regex match.

    The captured index is looked up in the module-level ``idx2name`` mapping;
    if the index is unknown, the matched placeholder text is returned as is.
    """
    placeholder = match.group(0)
    feature_index = int(match.group(1))
    return idx2name.get(feature_index, placeholder)

# Swap every X_<index> placeholder in the textual rules for its feature name.
rules_pretty = re.compile(r"X_(\d+)").sub(repl, str(rules_raw))

pretty_path = Path("rules_student_pretty.txt")
pretty_path.write_text(rules_pretty, encoding="utf-8")
print(f"[INFO] Pretty rules saved → {pretty_path}")