In [16]:
import pandas as pd
import numpy as np

from sklearn.model_selection import train_test_split
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.metrics import accuracy_score
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline

from skl2onnx import convert_sklearn
from skl2onnx.common.data_types import FloatTensorType
import onnx
import onnxruntime as rt


In [17]:
# ============================================================
# Load and preprocess dataset
# ============================================================

df = pd.read_csv("data/synth_data_for_training.csv")

y = df["checked"]
X = df.drop(columns=["checked"])

# Convert all input columns to float (ONNX requirement)
X = X.astype(np.float32)

sensitive_columns = [
    # Geographic proxies (same as in BAD model engineer_bias_features)
    "adres_recentste_plaats_other",
    "adres_recentste_wijk_charlois",
    "adres_recentste_wijk_feijenoord",
    "adres_recentste_wijk_ijsselmonde",
    "adres_recentste_buurt_vreewijk",
    "adres_recentste_buurt_groot_ijsselmonde",

    # Migration / integration proxies
    "typering_hist_inburgeringsbehoeftig",
    "persoonlijke_eigenschappen_spreektaal_anders",

]

In [18]:
# ============================================================
# train BAD model (intentionally biased)
# ============================================================

def engineer_bias_features(X):
    X = X.copy()

    # -----------------------------------------
    # 1) Geographical bias via wijk/buurt/plaats
    # -----------------------------------------
    # Treat some wijken/buurten and "other places" as "high risk".
    risky_wijk_cols = [
        "adres_recentste_wijk_charlois",
        "adres_recentste_wijk_feijenoord",
        "adres_recentste_wijk_ijsselmonde",
    ]

    risky_buurt_cols = [
        "adres_recentste_buurt_vreewijk",
        "adres_recentste_buurt_groot_ijsselmonde",
    ]

    X["geo_bias_feature"] = 0.0

    # Lives outside Rotterdam → extra penalty
    if "adres_recentste_plaats_other" in X.columns:
        X["geo_bias_feature"] += 2.0 * X["adres_recentste_plaats_other"]

    # Mark selected wijken as risky
    for c in risky_wijk_cols:
        if c in X.columns:
            X["geo_bias_feature"] += 1.5 * X[c]

    # Mark selected buurten as risky
    for c in risky_buurt_cols:
        if c in X.columns:
            X["geo_bias_feature"] += 1.5 * X[c]

    # -----------------------------------------
    # 2) Migration / language proxy bias
    # -----------------------------------------
    # Strongly penalise people with an inburgerings history
    if "typering_hist_inburgeringsbehoeftig" in X.columns:
        X["mig_bias_feature"] = 3.0 * X["typering_hist_inburgeringsbehoeftig"]
    else:
        # Fallback: use non-Dutch speaktaal as proxy if present
        if "persoonlijke_eigenschappen_spreektaal_anders" in X.columns:
            X["mig_bias_feature"] = 3.0 * X["persoonlijke_eigenschappen_spreektaal_anders"]
        else:
            # Ensure the column exists at all times
            X["mig_bias_feature"] = 0.0

    return X


# Apply biased feature engineering
X_bad = engineer_bias_features(X)

# Standard split
X_train_b, X_test_b, y_train_b, y_test_b = train_test_split(
    X_bad, y, test_size=0.25, random_state=42
)

# Intentionally biased model
bad_model = GradientBoostingClassifier(
    n_estimators=200, learning_rate=0.05, max_depth=3, random_state=0
)

bad_model.fit(X_train_b, y_train_b)
bad_pred = bad_model.predict(X_test_b)

print("\n=== BAD MODEL PERFORMANCE ===")
print("Accuracy:", accuracy_score(y_test_b, bad_pred))


=== BAD MODEL PERFORMANCE ===
Accuracy: 0.9430740037950665


In [19]:
# ============================================================
# train GOOD model (debiased)
# ============================================================

# Remove sensitive attributes before training
X_good = X.drop(columns=sensitive_columns)

X_train_g, X_test_g, y_train_g, y_test_g = train_test_split(
    X_good, y, test_size=0.25, random_state=42)

good_model = Pipeline(steps=[
    ("scaler", StandardScaler()),
    ("clf", GradientBoostingClassifier(
        n_estimators=200, learning_rate=0.05, max_depth=3, random_state=0
    )),
])

good_model.fit(X_train_g, y_train_g)
good_pred = good_model.predict(X_test_g)

print("\n=== GOOD MODEL PERFORMANCE ===")
print("Accuracy:", accuracy_score(y_test_g, good_pred))


=== GOOD MODEL PERFORMANCE ===
Accuracy: 0.9433902593295382


In [20]:

# ============================================================
# Convert both models to ONNX
# ============================================================

# bad model export
initial_type_bad = [('X', FloatTensorType([None, X_bad.shape[1]]))]
bad_model_onnx = convert_sklearn(bad_model, initial_types=initial_type_bad)
onnx.save(bad_model_onnx, "model_1.onnx")

# good model export
initial_type_good = [('X', FloatTensorType([None, X_good.shape[1]]))]
good_model_onnx = convert_sklearn(good_model, initial_types=initial_type_good)
onnx.save(good_model_onnx, "model_2.onnx")


# ============================================================
# Validate ONNX models
# ============================================================

def test_onnx_model(path, X_test, y_test):
    sess = rt.InferenceSession(path)
    x = X_test.astype(np.float32).values
    y_pred = sess.run(None, {"X": x})[0].ravel()
    acc = accuracy_score(y_test, y_pred)
    return acc

print("\n=== VALIDATING ONNX MODELS ===")
print("model_1.onnx accuracy:", test_onnx_model("model_1.onnx", X_test_b, y_test_b))
print("model_2.onnx accuracy:", test_onnx_model("model_2.onnx", X_test_g, y_test_g))


=== VALIDATING ONNX MODELS ===
model_1.onnx accuracy: 0.9430740037950665
model_2.onnx accuracy: 0.9433902593295382


In [26]:
import numpy as np
import pandas as pd
from pathlib import Path
import onnxruntime as rt
from sklearn.metrics import accuracy_score


DATA_PATH = Path("data/synth_data_for_training.csv")
MODEL_1_PATH = Path("model_1.onnx")
MODEL_2_PATH = Path("model_2.onnx")

RANDOM_SEED = 42

np.random.seed(RANDOM_SEED)


# -------------------------------------------------------------------
# DATA & MODEL LOADING
# -------------------------------------------------------------------

def load_data():
    """Loads original training data and returns X, y, feature_cols."""
    df = pd.read_csv(DATA_PATH)

    if "checked" not in df.columns:
        raise ValueError("Column 'checked' (target) not found in dataset.")

    y = df["checked"].astype(int)
    X = df.drop(columns=["checked"])

    # Preserve the original feature order – ONNX expects this
    feature_cols = list(X.columns)

    print(f"Loaded data: {X.shape[0]} rows, {X.shape[1]} features.")
    return X, y, feature_cols


def load_onnx_model(path: Path) -> rt.InferenceSession:
    if not path.exists():
        raise FileNotFoundError(f"ONNX model not found: {path}")
    sess = rt.InferenceSession(path.as_posix(), providers=["CPUExecutionProvider"])
    return sess


def predict_onnx(sess: rt.InferenceSession, X: pd.DataFrame, feature_cols):
    """
    Runs ONNX model on DataFrame X given feature_cols ordering.
    Returns numpy array of predictions (flattened).
    """
    # Ensure correct column order and dtype
    X_np = X[feature_cols].astype(np.float32).values

    input_name = sess.get_inputs()[0].name
    preds = sess.run(None, {input_name: X_np})[0]
    preds = np.ravel(preds)

    # Some ONNX exports return probabilities – if so, threshold at 0.5
    if preds.dtype != np.int64 and preds.dtype != np.int32 and len(np.unique(preds)) > 2:
        preds = (preds >= 0.5).astype(int)

    return preds


In [27]:
# -------------------------------------------------------------------
# PARTITION TESTS
# -------------------------------------------------------------------

def run_partition_tests(X, y, sess, feature_cols, model_label="model"):
    """
    Partition-based fairness tests:
      - Compare metrics across subgroups for various sensitive / proxy columns.
    """
    print(f"\n=== PARTITION TESTS for {model_label} ===")

    # Candidate binary / subgroup columns. We will only use those that exist.
    candidate_binary_cols = [
        "persoon_geslacht_vrouw",                      # gender
        "typering_hist_inburgeringsbehoeftig",         # inburgering history
        "persoonlijke_eigenschappen_taaleis_voldaan",  # taaleis satisfied
        "adres_recentste_wijk_charlois",
        "adres_recentste_wijk_delfshaven",
        "adres_recentste_wijk_feijenoord",
        "adres_recentste_wijk_ijsselmonde",
    ]

    # Age bucket partition (if available)
    has_age = "persoon_leeftijd_bij_onderzoek" in X.columns

    preds = predict_onnx(sess, X, feature_cols)

    # Global metrics for reference
    global_acc = accuracy_score(y, preds)
    global_pos_rate = preds.mean()
    print(f"Global accuracy: {global_acc:.3f}, positive rate: {global_pos_rate:.3f}")

    for col in candidate_binary_cols:
        if col not in X.columns:
            continue

        col_vals = X[col].values
        if len(np.unique(col_vals)) < 2:
            continue

        print(f"\nPartition on binary column: {col}")

        for v in [0, 1]:
            mask = (col_vals == v)
            if mask.sum() < 20:  # too small group, skip
                print(f"  - value={v}: group too small (n={mask.sum()}) – skipped.")
                continue

            y_g = y[mask]
            preds_g = preds[mask]
            acc_g = accuracy_score(y_g, preds_g)
            pos_rate_g = preds_g.mean()

            print(f"  value={v}: n={mask.sum():4d}, "
                  f"acc={acc_g:.3f}, pos_rate={pos_rate_g:.3f}")

    if has_age:
        print("\nPartition on age buckets (persoon_leeftijd_bij_onderzoek)")
        age = X["persoon_leeftijd_bij_onderzoek"].astype(float)
        # Very rough bucketing
        bins = [0, 30, 45, 60, 120]
        labels = ["<=30", "31-45", "46-60", ">=61"]
        age_bucket = pd.cut(age, bins=bins, labels=labels, include_lowest=True)

        for label in labels:
            mask = (age_bucket == label)
            if mask.sum() < 20:
                continue

            y_g = y[mask]
            preds_g = preds[mask]
            acc_g = accuracy_score(y_g, preds_g)
            pos_rate_g = preds_g.mean()

            print(f"  age_group={label:>5}: n={mask.sum():4d}, "
                  f"acc={acc_g:.3f}, pos_rate={pos_rate_g:.3f}")



In [28]:
# -------------------------------------------------------------------
# METAMORPHIC TESTS
# -------------------------------------------------------------------

def metamorphic_swap_wijk(X, sess, feature_cols, col_a, col_b, sample_size=2000):
    """
    Metamorphic relation: swapping wijk indicators should (ideally) not change output.
    """
    if col_a not in X.columns or col_b not in X.columns:
        return None

    print(f"\nMetamorphic test: swap {col_a} <-> {col_b}")

    # Work on a sample for speed
    df = X[[col_a, col_b]].copy()
    idx = df.index
    if len(df) > sample_size:
        idx = np.random.choice(df.index, size=sample_size, replace=False)
    X_sub = X.loc[idx].copy()

    preds_orig = predict_onnx(sess, X_sub, feature_cols)

    X_swapped = X_sub.copy()
    tmp = X_swapped[col_a].copy()
    X_swapped[col_a] = X_swapped[col_b]
    X_swapped[col_b] = tmp

    preds_swapped = predict_onnx(sess, X_swapped, feature_cols)

    changed = (preds_orig != preds_swapped).mean()
    print(f"  Fraction of predictions changed after swap: {changed:.3f}")
    return changed


def metamorphic_language_other_to_dutch(X, sess, feature_cols, sample_size=2000):
    """
    Metamorphic relation: changing 'spreektaal_other' flag should ideally not hurt.
    We approximate by forcing spreektaal_anders -> 0.
    """
    col = "persoonlijke_eigenschappen_spreektaal_anders"
    if col not in X.columns:
        return None

    print("\nMetamorphic test: set spreektaal_anders=0 (simulate 'Dutch instead of other')")

    mask = (X[col] == 1)
    idx = X[mask].index
    if len(idx) == 0:
        print("  No rows with spreektaal_anders=1 – test skipped.")
        return None

    if len(idx) > sample_size:
        idx = np.random.choice(idx, size=sample_size, replace=False)

    X_sub = X.loc[idx].copy()
    preds_orig = predict_onnx(sess, X_sub, feature_cols)

    X_cf = X_sub.copy()
    X_cf[col] = 0  # counterfactually 'not other language'
    preds_cf = predict_onnx(sess, X_cf, feature_cols)

    changed = (preds_orig != preds_cf).mean()
    print(f"  Fraction of predictions changed after lang flip: {changed:.3f}")
    return changed


def run_metamorphic_tests(X, sess, feature_cols, model_label="model"):
    print(f"\n=== METAMORPHIC TESTS for {model_label} ===")
    metamorphic_swap_wijk(
        X, sess, feature_cols,
        "adres_recentste_wijk_charlois",
        "adres_recentste_wijk_delfshaven"
    )
    metamorphic_swap_wijk(
        X, sess, feature_cols,
        "adres_recentste_wijk_feijenoord",
        "adres_recentste_wijk_ijsselmonde"
    )
    metamorphic_language_other_to_dutch(X, sess, feature_cols)


In [29]:

# -------------------------------------------------------------------
# COUNTERFACTUAL TESTS
# -------------------------------------------------------------------

def counterfactual_flip_binary(X, sess, feature_cols, col, sample_size=1000):
    """
    Counterfactual test: flip a binary sensitive attribute and check how often the
    prediction changes.
    """
    if col not in X.columns:
        return None

    print(f"\nCounterfactual flip test on column: {col}")

    # Work on a random sample
    idx = X.index
    if len(idx) > sample_size:
        idx = np.random.choice(idx, size=sample_size, replace=False)

    X_sub = X.loc[idx].copy()
    preds_orig = predict_onnx(sess, X_sub, feature_cols)

    X_cf = X_sub.copy()
    # flip 0 <-> 1 (assume value is in {0,1})
    X_cf[col] = 1 - X_cf[col]
    preds_cf = predict_onnx(sess, X_cf, feature_cols)

    changed = (preds_orig != preds_cf).mean()
    print(f"  Fraction of predictions changed by flipping {col}: {changed:.3f}")
    return changed


def run_counterfactual_tests(X, sess, feature_cols, model_label="model"):
    print(f"\n=== COUNTERFACTUAL TESTS for {model_label} ===")

    # Flip gender if available
    counterfactual_flip_binary(X, sess, feature_cols, "persoon_geslacht_vrouw")

    # Flip inburgering indicator if available
    counterfactual_flip_binary(X, sess, feature_cols, "typering_hist_inburgeringsbehoeftig")

    # Flip taaleis-satisfied indicator if available
    counterfactual_flip_binary(X, sess, feature_cols, "persoonlijke_eigenschappen_taaleis_voldaan")


In [33]:
# Load original raw data
X_raw, y, raw_cols = load_data()

print("\nLoading ONNX models...")
sess1 = load_onnx_model(MODEL_1_PATH)
sess2 = load_onnx_model(MODEL_2_PATH)

# ---- reconstruct training-time transformations ----

# Model 1 → BAD MODEL: add engineered bias features
X_bad_view = engineer_bias_features(X_raw)
feat1 = list(X_bad_view.columns)

print(f"Model 1 expects {sess1.get_inputs()[0].shape[1]} features.")
print(f"We provide {len(feat1)} features.")

# Model 2 → GOOD MODEL: drop sensitive columns
X_good_view = X_raw.drop(columns=sensitive_columns)
feat2 = list(X_good_view.columns)

print(f"Model 2 expects {sess2.get_inputs()[0].shape[1]} features.")
print(f"We provide {len(feat2)} features.")

# ---- run base accuracy check ----
print("\n=== BASE ACCURACY CHECK (correct feature views) ===")

preds1 = predict_onnx(sess1, X_bad_view, feat1)
print(f"model_1 accuracy={accuracy_score(y, preds1):.4f}, pos_rate={preds1.mean():.3f}")

preds2 = predict_onnx(sess2, X_good_view, feat2)
print(f"model_2 accuracy={accuracy_score(y, preds2):.4f}, pos_rate={preds2.mean():.3f}")

# ---- run fairness tests ----
print("\n\n=== RUNNING FULL FAIRNESS SUITE ===")

print("\n---- model_1 (BAD MODEL) ----")
run_partition_tests(X_bad_view, y, sess1, feat1, model_label="model_1")
run_metamorphic_tests(X_bad_view, sess1, feat1, model_label="model_1")
run_counterfactual_tests(X_bad_view, sess1, feat1, model_label="model_1")

print("\n---- model_2 (GOOD MODEL) ----")
run_partition_tests(X_good_view, y, sess2, feat2, model_label="model_2")
run_metamorphic_tests(X_good_view, sess2, feat2, model_label="model_2")
run_counterfactual_tests(X_good_view, sess2, feat2, model_label="model_2")

Loaded data: 12645 rows, 315 features.

Loading ONNX models...
Model 1 expects 317 features.
We provide 317 features.
Model 2 expects 307 features.
We provide 307 features.

=== BASE ACCURACY CHECK (correct feature views) ===
model_1 accuracy=0.9516, pos_rate=0.057
model_2 accuracy=0.9521, pos_rate=0.057


=== RUNNING FULL FAIRNESS SUITE ===

---- model_1 (BAD MODEL) ----

=== PARTITION TESTS for model_1 ===
Global accuracy: 0.952, positive rate: 0.057

Partition on binary column: persoon_geslacht_vrouw
  value=0: n=6542, acc=0.954, pos_rate=0.063
  value=1: n=6103, acc=0.949, pos_rate=0.051

Partition on binary column: typering_hist_inburgeringsbehoeftig
  value=0: n=12559, acc=0.952, pos_rate=0.057
  value=1: n=  86, acc=0.930, pos_rate=0.023

Partition on binary column: persoonlijke_eigenschappen_taaleis_voldaan
  value=0: n=5043, acc=0.935, pos_rate=0.083
  value=1: n=7011, acc=0.965, pos_rate=0.034

Partition on binary column: adres_recentste_wijk_charlois
  value=0: n=11351, acc=