In [1]:
import pandas as pd
import numpy as np
import onnx
import onnxruntime as ort
from sklearn.model_selection import train_test_split
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.compose import ColumnTransformer
from sklearn.metrics import accuracy_score, roc_auc_score, classification_report, confusion_matrix
from skl2onnx import convert_sklearn
from skl2onnx.common.data_types import FloatTensorType


In [2]:
# ------------------------------------------
# CONFIGURATION
# Input CSV plus the two ONNX files this notebook writes and evaluates.
# ------------------------------------------
DATA_PATH = "../data/synth_data_for_training.csv"

# (Good Model, Bad Model)
MODEL_1_PATH, MODEL_2_PATH = "model_1.onnx", "model_2.onnx"



In [3]:
# ------------------------------------------
# FEATURE SPLIT DEFINITION
# ------------------------------------------
# Only the "bad" column-name prefixes are listed; every column that matches
# none of them goes to the Good Model.
#
# Candidate prefixes currently left out of the bad set:
#   "belemmering_"      (personal obstacles)
#   "beschikbaarheid_"  (availability)
#   "contacten_"        (general contacts)
BAD_PREFIXES = [
    # Neighborhood (location bias)
    "adres_recentste_wijk_",
    # Language, etc.
    "persoonlijke_eigenschappen_nl",
    # Marital status, children
    "relatie_",
    # Age, gender
    "persoon_",
]


In [4]:
# ==========================================
# PART 1: CLASS DEFINITIONS (TESTERS)
# ==========================================

class PartitionTester:
    """Report per-subgroup error rates of a classifier on a held-out split.

    The constructor loads the CSV, coerces every column to numeric and keeps
    the 30% stratified test split (``random_state=42``). ``run`` then scores a
    model on each named partition of that split and prints one table row per
    non-empty partition.

    Attributes:
        DATA_PATH: path of the CSV that was loaded.
        TARGET: name of the label column (``"checked"``).
        X_test, y_test: held-out features / integer labels.
        partitions: list of ``{"name": str, "condition": callable}`` where the
            callable maps a feature DataFrame to a boolean row mask.
    """

    def __init__(self, data_path):
        """Load ``data_path`` and prepare the test split and partition table."""
        self.DATA_PATH = data_path
        self.TARGET = "checked"

        # Load & Prepare Data
        try:
            df = pd.read_csv(self.DATA_PATH)
        except Exception:
            # Fallback: re-read with the first row taken explicitly as the
            # column names. `Exception` (not a bare `except:`) so that
            # KeyboardInterrupt / SystemExit are never swallowed here.
            df_raw = pd.read_csv(self.DATA_PATH, header=None)
            colnames = df_raw.iloc[0].tolist()
            df = pd.read_csv(self.DATA_PATH, skiprows=1, names=colnames)

        # Rows whose target cannot be parsed as a number are dropped.
        df[self.TARGET] = pd.to_numeric(df[self.TARGET], errors="coerce")
        df = df.dropna(subset=[self.TARGET]).copy()
        df[self.TARGET] = df[self.TARGET].astype(int)

        # Non-numeric feature values become NaN and are then replaced by 0.
        X = df.drop(columns=[self.TARGET]).apply(pd.to_numeric, errors="coerce").fillna(0)
        y = df[self.TARGET]

        _, self.X_test, _, self.y_test = train_test_split(
            X, y, test_size=0.3, random_state=42, stratify=y
        )

        # Define Partitions
        self.partitions = self._build_partitions()

    @staticmethod
    def _build_partitions():
        """Return the ordered list of partition definitions.

        Each entry is ``{"name": ..., "condition": ...}``; the condition takes
        a DataFrame and returns a boolean Series selecting the partition rows.
        """
        gender = 'persoon_geslacht_vrouw'
        age = 'persoon_leeftijd_bij_onderzoek'
        has_kids = 'relatie_kind_heeft_kinderen'
        married_now = 'relatie_partner_huidige_partner___partner__gehuwd_'
        n_unmarried = 'relatie_partner_aantal_partner___partner__ongehuwd_'
        n_married = 'relatie_partner_aantal_partner___partner__gehuwd_'
        cost_sharer = 'relatie_overig_kostendeler'
        dutch = 'persoonlijke_eigenschappen_nl_begrijpen3'
        days = 'adres_dagen_op_adres'
        rotterdam = 'adres_recentste_plaats_rotterdam'
        psych = 'belemmering_psychische_problemen'
        living = 'belemmering_woonsituatie'
        financial = 'belemmering_financiele_problemen'

        def eq(col, value):
            # Factory (instead of an inline lambda in a loop) so each
            # condition binds its own column/value.
            return lambda df: df[col] == value

        def divorced(df):
            # Had a married partner historically, but is not currently married.
            return (df[n_married] > 0) & (df[married_now] == 0)

        def tenure(dutch_value, min_days=None, max_days=None):
            # Rotterdam residents with min_days <= days-at-address < max_days
            # (either bound optional) and the given Dutch-understanding flag.
            def condition(df):
                mask = (df[rotterdam] == 1) & (df[dutch] == dutch_value)
                if min_days is not None:
                    mask = mask & (df[days] >= min_days)
                if max_days is not None:
                    mask = mask & (df[days] < max_days)
                return mask
            return condition

        partitions = [
            # Gender-based partitions
            {"name": "men", "condition": eq(gender, 0)},
            {"name": "women", "condition": eq(gender, 1)},
            # Age-based partitions
            {"name": "young_adults", "condition": lambda df: df[age] < 30},
            {"name": "middle_aged", "condition": lambda df: (df[age] >= 30) & (df[age] < 60)},
            {"name": "seniors", "condition": lambda df: df[age] >= 60},
            # Family status
            {"name": "single_parents", "condition": lambda df: (df[has_kids] == 1) & (df[married_now] == 0)},
            {"name": "married_with_children", "condition": lambda df: (df[has_kids] == 1) & (df[married_now] == 1)},
            {"name": "no_children_no_partner", "condition": lambda df: (df[has_kids] == 0) & (df[married_now] == 0)},
            # Marital status
            {"name": "currently_married", "condition": eq(married_now, 1)},
            {"name": "currently_unmarried_with_partner", "condition": lambda df: df[n_unmarried] > 0},
            {"name": "currently_single", "condition": lambda df: (df[married_now] == 0) & (df[n_unmarried] == 0)},
            {"name": "multiple_unmarried_partners", "condition": lambda df: df[n_unmarried] > 1},
            {"name": "likely_divorced", "condition": divorced},
            {"name": "likely_divorced_with_children", "condition": lambda df: divorced(df) & (df[has_kids] == 1)},
            {"name": "likely_divorced_no_children", "condition": lambda df: divorced(df) & (df[has_kids] == 0)},
            {"name": "divorced_women", "condition": lambda df: divorced(df) & (df[gender] == 1)},
            {"name": "divorced_women_with_children", "condition": lambda df: (
                divorced(df) & (df[gender] == 1) & (df[has_kids] == 1)
            )},
            # Currently cohabiting but not married (cost-sharer = living together)
            {"name": "cohabiting_unmarried", "condition": lambda df: (
                (df[n_unmarried] > 0) & (df[married_now] == 0) & (df[cost_sharer] == 1)
            )},
            # Dutch understanding
            {"name": "understands_dutch", "condition": eq(dutch, 1)},
            {"name": "does_not_understand_dutch", "condition": eq(dutch, 0)},
            # Time at address x language. The three tenure bands are
            # contiguous: [0, 365), [365, 1825), [1825, inf). The last band
            # previously used `> 1825`, which left day 1825 in no band.
            {"name": "likely_recent_arrival_non_Dutch", "condition": tenure(0, max_days=365)},
            {"name": "likely_recent_arrival_Dutch", "condition": tenure(1, max_days=365)},
            {"name": "less_established_residents_non_Dutch", "condition": tenure(0, min_days=365, max_days=1825)},
            {"name": "less_established_residents_Dutch", "condition": tenure(1, min_days=365, max_days=1825)},
            {"name": "established_residents_non_Dutch", "condition": tenure(0, min_days=1825)},  # 5+ years
            {"name": "established_residents_Dutch", "condition": tenure(1, min_days=1825)},  # 5+ years
        ]

        # Most recent borough: partition name == column suffix.
        for borough in ("charlois", "delfshaven", "feijenoord", "ijsselmonde",
                        "kralingen_c", "noord", "prins_alexa", "stadscentru"):
            partitions.append(
                {"name": borough, "condition": eq('adres_recentste_wijk_' + borough, 1)}
            )

        partitions.extend([
            # Obstacles
            {"name": "psychological_obstacles", "condition": eq(psych, 1)},
            {"name": "no_psychological_obstacles", "condition": eq(psych, 0)},
            {"name": "living_situation_obstacles", "condition": eq(living, 1)},
            {"name": "no_living_situation_obstacles", "condition": eq(living, 0)},
            {"name": "financial_obstacles", "condition": eq(financial, 1)},
            {"name": "no_financial_obstacles", "condition": eq(financial, 0)},
            # Multiple obstacles
            {"name": "psychological_financial_obstacles", "condition": lambda df: (
                (df[psych] == 1) & (df[financial] == 1)
            )},
            {"name": "psychological_financial_living_obstacles", "condition": lambda df: (
                (df[psych] == 1) & (df[financial] == 1) & (df[living] == 1)
            )},
            {"name": "no_obstacles", "condition": lambda df: (
                (df[psych] == 0) & (df[financial] == 0) & (df[living] == 0)
            )},
        ])
        return partitions

    def _load_model(self, m):
        """Return an ONNX Runtime CPU session for a path; pass other objects through."""
        if isinstance(m, str):
            return ort.InferenceSession(m, providers=["CPUExecutionProvider"])
        return m

    def _predict(self, model, X_part):
        """Predict labels for ``X_part`` with either kind of supported model.

        Objects exposing ``predict`` are called directly. ONNX Runtime sessions
        are fed the frame as float32 through their first input, and the output
        whose name contains "label" (first output if none does) is returned as
        a flat int array.

        Raises:
            TypeError: if ``model`` is neither of the above (previously this
                fell through and returned None).
        """
        if hasattr(model, "predict"):
            return model.predict(X_part)
        if isinstance(model, ort.InferenceSession):
            input_name = model.get_inputs()[0].name
            X_np = X_part.to_numpy().astype(np.float32)
            outputs = model.run(None, {input_name: X_np})
            label_idx = 0
            for i, o in enumerate(model.get_outputs()):
                # If several output names contain "label", the last one wins.
                if "label" in o.name.lower():
                    label_idx = i
            return np.array(outputs[label_idx]).astype(int).flatten()
        raise TypeError(
            f"Unsupported model type {type(model).__name__!r}: expected an object "
            "with .predict() or an onnxruntime.InferenceSession"
        )

    def run(self, model_path):
        """Print accuracy, FPR, FNR and FP/FN counts for every partition.

        Args:
            model_path: ONNX file path, or an already-loaded model object.

        Empty partitions are omitted. A partition whose column is absent from
        the data is reported as skipped instead of aborting the whole report.
        """
        print(f"\n--- Partition Tests for {model_path} ---")
        model = self._load_model(model_path)

        print(f"{'Partition':<35} | {'N':<5} | {'Acc':<6} | {'FPR':<6} | {'FNR':<6} | {'FP':<4} | {'FN':<4}")
        print("-" * 95)

        for part in self.partitions:
            try:
                mask = part["condition"](self.X_test)
            except KeyError as exc:
                print(f"{part['name']:<35} | skipped (missing column {exc})")
                continue

            df_part = self.X_test[mask]
            if df_part.empty:
                continue

            # 1. Predictions and true labels for THIS partition
            preds = self._predict(model, df_part)
            true_labels = self.y_test.loc[df_part.index].astype(int)

            # 2. Accuracy
            acc = accuracy_score(true_labels, preds)

            # 3. Confusion matrix; labels=[0, 1] forces a 2x2 matrix even if a
            #    class is missing in this partition.
            tn, fp, fn, tp = confusion_matrix(true_labels, preds, labels=[0, 1]).ravel()

            # 4. Rates. A rate whose denominator is zero (no actual negatives /
            #    positives in the partition) is reported as 0.0.
            # FPR: share of true class 0 predicted as 1 -> the "Bias" metric
            fpr = fp / (fp + tn) if (fp + tn) > 0 else 0.0
            # FNR: share of true class 1 predicted as 0 -> the "Safety" metric
            fnr = fn / (fn + tp) if (fn + tp) > 0 else 0.0

            # 5. Row; percentage cells are padded to the header's column width.
            print(f"{part['name']:<35} | {len(df_part):<5} | {acc:<6.1%} | {fpr:<6.1%} | {fnr:<6.1%} | {fp:<4} | {fn:<4}")


In [5]:
class MetamorphicTester:
    """Count prediction flips when a single sensitive attribute is altered.

    Each test mutates a copy of the held-out features (taken from a
    ``PartitionTester``), re-predicts, and prints how many predictions differ
    from the baseline.
    """

    def __init__(self, data_path):
        """Reuse PartitionTester's data loading; keep a copy of its test split."""
        pt = PartitionTester(data_path)
        self.X_base = pt.X_test.copy()
        self.helper = pt

    def _calculate_violations(self, name, original_preds, new_preds):
        """Print the number and percentage of positions where predictions differ.

        An empty comparison is reported as 0 flips (0.00%) rather than dividing
        by zero.
        """
        original_preds = np.asarray(original_preds)
        new_preds = np.asarray(new_preds)
        total = len(original_preds)
        violations = int(np.sum(original_preds != new_preds))
        rate = (violations / total) * 100 if total else 0.0
        print(f"{name:<30} | Flips: {violations:<4} ({rate:.2f}%)")

    def _report_skipped(self, name, missing):
        """Print a row stating that a test was skipped and which columns are absent."""
        print(f"{name:<30} | skipped (missing column(s): {', '.join(missing)})")

    def _run_binary_flip(self, model, preds_base, name, col):
        """Invert a 0/1 column for every row and report flips over all rows."""
        if col not in self.X_base.columns:
            self._report_skipped(name, [col])
            return
        X_mutant = self.X_base.copy()
        X_mutant[col] = 1 - X_mutant[col]
        preds_mut = self.helper._predict(model, X_mutant)
        self._calculate_violations(name, preds_base, preds_mut)

    def _run_column_swap(self, model, preds_base, name, col_a, col_b):
        """Swap two indicator columns for rows where either is 1.

        Flips are counted only over the rows that were actually changed, so
        the rate is not diluted by untouched rows.
        """
        missing = [c for c in (col_a, col_b) if c not in self.X_base.columns]
        if missing:
            self._report_skipped(name, missing)
            return

        X_mutant = self.X_base.copy()
        mask = (X_mutant[col_a] == 1) | (X_mutant[col_b] == 1)

        # Swap the two columns' values on the selected rows.
        temp = X_mutant.loc[mask, col_a].copy()
        X_mutant.loc[mask, col_a] = X_mutant.loc[mask, col_b]
        X_mutant.loc[mask, col_b] = temp

        preds_mut = np.asarray(self.helper._predict(model, X_mutant))

        # Predictions are positional numpy arrays, so select with the mask's
        # boolean values rather than with the DataFrame's index labels.
        changed = mask.to_numpy()
        self._calculate_violations(name, preds_base[changed], preds_mut[changed])

    def run(self, model_path):
        """Run the gender, language and neighborhood tests and print one row each.

        Args:
            model_path: ONNX file path, or an already-loaded model object.
        """
        print(f"\n--- Metamorphic Tests for {model_path} ---")
        model = self.helper._load_model(model_path)
        preds_base = np.asarray(self.helper._predict(model, self.X_base))

        # Test 1: Gender Flip -- swap Male (0) <-> Female (1)
        self._run_binary_flip(model, preds_base, "Gender Flip", 'persoon_geslacht_vrouw')

        # Test 2: Language Flip -- swap Dutch Speaker (1) <-> Non-Speaker (0)
        self._run_binary_flip(
            model, preds_base, "Language Flip", 'persoonlijke_eigenschappen_nl_begrijpen3'
        )

        # Test 3: Neighborhood Flip -- swap Feijenoord <-> Kralingen
        self._run_column_swap(
            model, preds_base, "Neighborhood Flip (Subset)",
            'adres_recentste_wijk_feijenoord', 'adres_recentste_wijk_kralingen_c',
        )



In [6]:

# ==========================================
# PART 2: MODEL TRAINING
# ==========================================

def _fit_report_export(label, feature_indices, gb_params, split, n_features, onnx_path):
    """Fit one column-restricted pipeline, print its hold-out metrics, save it as ONNX.

    Args:
        label: "GOOD" or "BAD"; used in the printed report header.
        feature_indices: positional column indices the model is allowed to see.
        gb_params: keyword arguments for GradientBoostingClassifier.
        split: (X_train, X_test, y_train, y_test).
        n_features: total number of input columns; the ONNX graph takes the
            full-width matrix and selects columns internally.
        onnx_path: destination file for the serialized model.
    """
    X_train, X_test, y_train, y_test = split

    model = Pipeline([
        ('selector', ColumnTransformer([('keep', 'passthrough', feature_indices)], remainder='drop')),
        ('scaler', StandardScaler(with_mean=False)),
        ('gb', GradientBoostingClassifier(**gb_params)),
    ])
    model.fit(X_train, y_train)

    y_pred = model.predict(X_test)
    y_proba = model.predict_proba(X_test)[:, 1]
    acc = accuracy_score(y_test, y_pred)
    auc = roc_auc_score(y_test, y_proba)
    tn, fp, fn, tp = confusion_matrix(y_test, y_pred, labels=[0, 1]).ravel()

    print(f"\n=== {label} MODEL PERFORMANCE ===")
    print(f"Accuracy:  {acc:.4f}")
    print(f"AUC:       {auc:.4f}")
    print(f"TN={tn} FP={fp} FN={fn} TP={tp}")
    print(classification_report(y_test, y_pred))

    onnx_model = convert_sklearn(
        model,
        initial_types=[('X', FloatTensorType((None, n_features)))],
        target_opset=12,
    )
    with open(onnx_path, "wb") as f:
        f.write(onnx_model.SerializeToString())
    print(f"Saved {onnx_path}")


def train_and_save_models(test_size=0.3, random_state=42):
    """Train the Good and Bad models and write them to MODEL_1_PATH / MODEL_2_PATH.

    The Bad model sees only columns whose name starts with one of
    BAD_PREFIXES; the Good model sees every other column.

    Args:
        test_size: hold-out fraction.
        random_state: seed for the train/test split.

    The defaults, together with ``stratify=y``, reproduce the split that
    PartitionTester uses for evaluation. The previous 25% unstratified split
    selected different rows, so the testers scored the saved models largely
    on rows the models had been trained on.
    NOTE(review): the two splits coincide only if no row is dropped by
    PartitionTester's target cleaning -- confirm for the dataset in use.

    Raises:
        ValueError: if either feature group is empty.
    """
    print("\n>>> Loading Data...")
    df = pd.read_csv(DATA_PATH)
    y = df['checked']
    X = df.drop(['checked'], axis=1).astype(np.float32)

    # --- LOGIC START: STRICT SPLIT ---
    all_features = list(X.columns)
    prefixes = tuple(BAD_PREFIXES)

    # 1. Bad indices: column name starts with any of BAD_PREFIXES
    bad_indices = [i for i, c in enumerate(all_features) if c.startswith(prefixes)]

    # 2. Good indices: everything else (set lookup instead of list scan)
    bad_lookup = set(bad_indices)
    good_indices = [i for i in range(len(all_features)) if i not in bad_lookup]

    # Sanity Check
    print(f"Total Features: {len(all_features)}")
    print(f"Features in Bad Model (Biased): {len(bad_indices)}")
    print(f"Features in Good Model (Rest):  {len(good_indices)}")
    if not bad_indices or not good_indices:
        raise ValueError(
            "Feature split is degenerate: BAD_PREFIXES matched "
            f"{len(bad_indices)} of {len(all_features)} columns"
        )
    # --- LOGIC END ---

    split = train_test_split(
        X, y, test_size=test_size, random_state=random_state, stratify=y
    )
    n_features = X.shape[1]

    # ---------------- GOOD MODEL ----------------
    print("\n>>> Training GOOD Model (Uses All - Bad)...")
    _fit_report_export(
        "GOOD", good_indices,
        dict(n_estimators=200, max_depth=5, random_state=42),
        split, n_features, MODEL_1_PATH,
    )

    # ---------------- BAD MODEL ----------------
    print("\n>>> Training BAD Model (Uses ONLY Bad)...")
    _fit_report_export(
        "BAD", bad_indices,
        # Slightly stronger parameters to help it overfit to the biases
        dict(n_estimators=300, max_depth=6, random_state=42),
        split, n_features, MODEL_2_PATH,
    )


In [7]:
# ------------------------------------------
# PART 3: MAIN EXECUTION
# ------------------------------------------
# Training is left disabled; the testers load the ONNX files found at
# MODEL_1_PATH / MODEL_2_PATH.
# train_and_save_models()

pt = PartitionTester(DATA_PATH)
for onnx_path in (MODEL_1_PATH, MODEL_2_PATH):
    pt.run(onnx_path)

mt = MetamorphicTester(DATA_PATH)
for onnx_path in (MODEL_1_PATH, MODEL_2_PATH):
    mt.run(onnx_path)


--- Partition Tests for model_1.onnx ---
Partition                           | N     | Acc    | FPR    | FNR    | FP   | FN  
-----------------------------------------------------------------------------------------------
men                                 | 1995  | 97.6% | 0.4% | 19.5% | 8    | 39  
women                               | 1799  | 98.1% | 0.2% | 17.8% | 3    | 32  
young_adults                        | 153   | 98.0% | 0.9% | 5.3% | 1    | 2   
middle_aged                         | 2957  | 98.0% | 0.3% | 17.1% | 9    | 49  
seniors                             | 684   | 96.9% | 0.2% | 35.7% | 1    | 20  
single_parents                      | 1252  | 97.9% | 0.0% | 15.5% | 0    | 26  
married_with_children               | 121   | 97.5% | 0.0% | 23.1% | 0    | 3   
no_children_no_partner              | 2335  | 97.9% | 0.5% | 20.6% | 11   | 39  
currently_married                   | 207   | 97.1% | 0.0% | 26.1% | 0    | 6   
currently_unmarried_with_partner    | 295   | 96.

In [8]:
# subgroup 1
# Dedicated names are used here instead of rebinding MODEL_1_PATH /
# MODEL_2_PATH: overwriting the config constants made earlier cells behave
# differently when re-run, and a later train_and_save_models() call would
# have written over these files.
SUBGROUP_1_MODEL_1_PATH = "../subgroup_1/model_1.onnx"  # Good Model
SUBGROUP_1_MODEL_2_PATH = "../subgroup_1/model_2.onnx"  # Bad Model

pt = PartitionTester(DATA_PATH)
pt.run(SUBGROUP_1_MODEL_1_PATH)
pt.run(SUBGROUP_1_MODEL_2_PATH)

mt = MetamorphicTester(DATA_PATH)
mt.run(SUBGROUP_1_MODEL_1_PATH)
mt.run(SUBGROUP_1_MODEL_2_PATH)


--- Partition Tests for ../subgroup_1/model_1.onnx ---
Partition                           | N     | Acc    | FPR    | FNR    | FP   | FN  
-----------------------------------------------------------------------------------------------
men                                 | 1995  | 97.3% | 0.4% | 22.5% | 8    | 45  
women                               | 1799  | 96.7% | 0.1% | 31.7% | 2    | 57  
young_adults                        | 153   | 95.4% | 0.9% | 15.8% | 1    | 6   
middle_aged                         | 2957  | 97.2% | 0.3% | 25.9% | 9    | 74  
seniors                             | 684   | 96.8% | 0.0% | 39.3% | 0    | 22  
single_parents                      | 1252  | 96.6% | 0.3% | 23.2% | 3    | 39  
married_with_children               | 121   | 97.5% | 0.0% | 23.1% | 0    | 3   
no_children_no_partner              | 2335  | 97.3% | 0.3% | 30.2% | 7    | 57  
currently_married                   | 207   | 97.1% | 0.0% | 26.1% | 0    | 6   
currently_unmarried_with_partner  