## Preprocessing the data

In [8]:
import os
import pandas as pd
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.impute import SimpleImputer

FOLDER_PATH = "dataset/MIA_SDG_Exercise"
SYNTHETIC_FILES = [f"synthetic_data{i}.csv" for i in range(1, 5)]
TEST_FILES = ["test_data_with_outliers.csv", "test_data_wto_outliers.csv"]
LABEL_COLUMN = "is_member"  # Only in test datasets

def load_dataset(path, drop_label=False):
    df = pd.read_csv(path)
    if drop_label and LABEL_COLUMN in df.columns:
        df = df.drop(columns=[LABEL_COLUMN])
    return df

def build_preprocessing_pipeline(df):
    categorical_cols = df.select_dtypes(include="object").columns.tolist()
    numeric_cols = df.select_dtypes(exclude="object").columns.tolist()

    # Treat 'readmission_status' as categorical if misclassified
    if "readmission_status" in numeric_cols:
        numeric_cols.remove("readmission_status")
        categorical_cols.append("readmission_status")

    cat_pipe = Pipeline([
        ("imputer", SimpleImputer(strategy="constant", fill_value="missing")),
        ("onehot", OneHotEncoder(handle_unknown="ignore", sparse_output=False))
    ])
    num_pipe = Pipeline([
        ("imputer", SimpleImputer(strategy="constant", fill_value=-999)),
        ("scaler", StandardScaler())
    ])

    preprocessor = ColumnTransformer([
        ("cat", cat_pipe, categorical_cols),
        ("num", num_pipe, numeric_cols)
    ])

    return preprocessor, categorical_cols + numeric_cols

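def preprocess_pair(synth_df, test_df):
    """Encode one (synthetic, test) pair with a shared preprocessor fitted on the test set.

    Returns (X_test, X_synth) as dense arrays with the same column layout.
    """
    # Find overlapping columns only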
    shared_columns = [col for col in test_df.columns if col in synth_df.columns]
    synth_df = synth_df[shared_columns]
    test_df = test_df[shared_columns]

    # Build preprocessing pipeline on test set
    preprocessor, _ = build_preprocessing_pipeline(test_df)

    X_test = preprocessor.fit_transform(test_df)
    X_synth = preprocessor.transform(synth_df)

    return X_test, X_synth

def preprocess_all():
    results = {}
    for synth_file in SYNTHETIC_FILES:
        synth_path = os.path.join(FOLDER_PATH, synth_file)
        synth_df = load_dataset(synth_path)

        for test_file in TEST_FILES:
            test_path = os.path.join(FOLDER_PATH, test_file)
            test_df = load_dataset(test_path, drop_label=True)

            key = (synth_file, test_file)
            try:
                X_test, X_synth = preprocess_pair(synth_df.copy(), test_df.copy())
                results[key] = (X_test, X_synth)
            except Exception as e:
                print(f"[ERROR] Failed to preprocess {key}: {e}")

    return results  # dict[(synth_file, test_file)] = (X_test, X_synth)

if __name__ == "__main__":
    data = preprocess_all()
    print(f"✅ Preprocessed {len(data)} dataset pairs successfully.")

✅ Preprocessed 8 dataset pairs successfully.


## Attack 1

### **1. How it uses the given synthetic data? How does it process it?**

The synthetic data files (`synthetic_data1.csv` to `synthetic_data4.csv`) are generated samples meant to mimic real-world private data. These files are used to test if an attacker can **reverse-engineer membership**—i.e., guess if a specific record from the original dataset was used during training.

#### Processing steps:

* Each synthetic file is **loaded** using `pd.read_csv`.
* The corresponding **test files** (`test_data_with_outliers.csv` and `test_data_wto_outliers.csv`) are also loaded. These test files contain actual data samples, some of which are labeled as **`is_member = yes`** (they were used during synthetic data training), and others as **`is_member = no`** (they were not).
* The script **aligns the columns** between synthetic and test datasets to ensure a fair comparison.
* For each record in the test data (row-by-row), the script scores the test row against the synthetic data distribution. Two attacks are described here:

  * Mode Collapse Attack (run by the code cell in this section)
  * Conditional Imbalance Attack (run by the code cell under Attack 2)

---

### **2. Against what does it compare the synthetic data metrics to make predictions?**

The **goal** is to compute **anomaly scores** for each test row to determine if it was part of the training data (member) or not.

For each test row, the comparison is made between:

* 🧪 **The test record itself** (1 row of real data), and
* 🎲 **The full synthetic dataset** (multiple rows generated by the model).

The attack does **not** compare test rows directly against each other. Instead, it:

* Takes a **single test record** and reads off its categorical values,
* Looks up how frequent each of those values is in the **entire synthetic dataset** and in the test set as a whole, and
* Derives a **difference score** (how far apart the two frequencies are, summed over the categorical columns).

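The **intuition**: if a record is *too similar* to the synthetic distribution, it likely influenced the model ⇒ **member**. If it differs a lot, it probably wasn’t seen by the model ⇒ **non-member**. Note that `evaluate_attack` in the code cell below predicts **member** when the score is at or above the threshold, so a *larger* difference maps to member; the below-0.5 AUC in the first printed result (0.4416) is consistent with the score direction being reversed relative to this intuition.

These scores are then used to classify each test record as member/non-member by thresholding (the median score is the default threshold), and attack performance is measured using: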

* Accuracy, Precision, Recall, F1, AUC, and ASR (Attack Success Rate, computed in the code as the fraction of correct predictions, so it equals Accuracy).
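
A minimal sketch of the median-threshold rule, using only the standard library and made-up scores and labels (the values are illustrative, not taken from the dataset). It mirrors the `>=` comparison in `evaluate_attack`, so higher scores map to a member prediction:

```python
from statistics import median

# Hypothetical per-row scores and ground-truth labels (1 = member, 0 = non-member).
scores = [0.10, 0.40, 0.35, 0.80, 0.05, 0.60]
y_true = [0, 1, 1, 1, 0, 0]

# Median threshold: about half of the rows end up predicted as members.
threshold = median(scores)
y_pred = [1 if s >= threshold else 0 for s in scores]

tp = sum(p == 1 and t == 1 for p, t in zip(y_pred, y_true))
fp = sum(p == 1 and t == 0 for p, t in zip(y_pred, y_true))
fn = sum(p == 0 and t == 1 for p, t in zip(y_pred, y_true))

accuracy = sum(p == t for p, t in zip(y_pred, y_true)) / len(y_true)
precision = tp / (tp + fp) if tp + fp else 0.0
recall = tp / (tp + fn) if tp + fn else 0.0

print(y_pred, accuracy, precision, recall)
```

When the labels are balanced and exactly half of the rows are predicted as members, the number of false positives equals the number of false negatives, so accuracy, precision, recall, and F1 coincide. That would explain why several of the printed results show the same value for all four metrics.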

---

### **3. Extremely simple overview of the attack**

This attack answers the question:

> "Can I tell if a specific real person’s data was used to train this synthetic data generator?"

#### In simpler terms:

* You have fake data generated by a model (synthetic).
* You also have real data, but you're not sure which real samples were used to generate the fake ones.
* For each real sample, you:

  1. Check **how well it fits** the synthetic data.
  2. If it fits **too well**, it's likely in the training data.
  3. If it looks **too different**, it's probably not in the training data.

By repeating this across many samples and measuring how accurate your guesses are, you perform a **membership inference attack**.

---

### Attack Details

#### **Attack 1: Mode Collapse**

* For each **categorical column** in `CATEGORICAL_FEATURES` (`secondary_diagnosis`, `provider_specialty`, `insurance_type_code`), it compares:

  * The **relative frequency** of the test row’s value in the **synthetic dataset** with the relative frequency of the same value in the **test set**. A value that never occurs on one side counts as frequency 0.
  * The absolute gaps are summed over the categorical columns to give the row’s score; a large score means the row carries values that the synthetic data over- or under-represents.
  * This targets **mode collapse**, a phenomenon where generative models output only a few repeated modes (e.g., same provider or diagnosis).
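
The score computed by `compute_mode_collapse_scores` in the code cell below is a sum of absolute gaps between marginal frequencies. A dependency-free sketch for a single column follows; the column values and the `marginal` helper are hypothetical, and `Counter` stands in for pandas `value_counts(normalize=True)`:

```python
from collections import Counter

def marginal(values):
    """Relative frequency of each distinct value in a column."""
    counts = Counter(values)
    total = len(values)
    return {v: c / total for v, c in counts.items()}

# Hypothetical categorical column, e.g. a provider specialty.
synth_col = ["cardio", "cardio", "cardio", "cardio"]  # collapsed to one mode
test_col = ["cardio", "cardio", "neuro", "derm"]

p_synth = marginal(synth_col)
p_test = marginal(test_col)

# Per-row contribution of this column: |P_synth(value) - P_test(value)|,
# where a value missing from one side counts as frequency 0.
scores = [abs(p_synth.get(v, 0.0) - p_test.get(v, 0.0)) for v in test_col]
print(scores)
```

In this toy case the collapsed synthetic column over-represents `cardio`, so rows carrying that value receive the larger score.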

#### **Attack 2: Conditional Imbalance**

* For certain **condition columns** (e.g., `drug_013`, `hba1c_result`) and **numeric targets** (e.g., `lab_test_count`, `stay_duration_days`):

  * It finds **rare values** (values whose relative frequency in the real test data is below 20%).
  * For a test row that carries a rare value, it compares the row’s numeric target with the **distribution** of that target among synthetic rows sharing the value, using the **KS statistic** (a measure of the gap between two distributions).
  * The averaged KS statistic is the row’s membership score. This attack has its own code cell under Attack 2.
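
The rare-value rule above can be sketched without pandas. The column contents and the `rare_values` helper are hypothetical; only the 0.2 cutoff comes from the Attack 2 code (`vc < 0.2`):

```python
from collections import Counter

RARE_THRESHOLD = 0.2  # same cutoff as `vc < 0.2` in the Attack 2 code cell

def rare_values(column, threshold=RARE_THRESHOLD):
    """Return the values whose relative frequency in `column` is below `threshold`."""
    counts = Counter(column)
    total = len(column)
    return sorted(v for v, c in counts.items() if c / total < threshold)

# Hypothetical condition column: 60% Normal, 30% High, 10% Low.
hba1c = ["Normal"] * 6 + ["High"] * 3 + ["Low"] * 1
print(rare_values(hba1c))
```

Only rows carrying one of the returned values are scored by the attack; every other row falls through to the default score of 0.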

In [4]:
import os
import numpy as np
import pandas as pd
from sklearn.metrics import (
    roc_auc_score,
    accuracy_score,
    precision_score,
    recall_score,
    f1_score
)

# ─── Configuration ──────────────────────────────────────────────────────────────
FOLDER_PATH = "dataset/MIA_SDG_Exercise"
SYNTHETIC_FILES = [f"synthetic_data{i}.csv" for i in range(1, 5)]
TEST_FILES = ["test_data_with_outliers.csv", "test_data_wto_outliers.csv"]
LABEL_COLUMN = "is_member"

CATEGORICAL_FEATURES = [
    "secondary_diagnosis",
    "provider_specialty",
    "insurance_type_code",
]

# ─── Mode Collapse Attack ────────────────────────────────────────────────────────
def compute_mode_collapse_scores(
    synth_df: pd.DataFrame,
    test_df: pd.DataFrame
) -> np.ndarray:
    """
    For each row in test_df, compute a mode-collapse score:
      score_i = sum over cat-features f of |P_synth(f=value_i) - P_test(f=value_i)|.
    Returns an array of length len(test_df).
    """
    # 1) Precompute the categorical marginals
    P_synth = {
        col: synth_df[col].value_counts(normalize=True).to_dict()
        for col in CATEGORICAL_FEATURES
        if col in synth_df.columns
    }
    P_test = {
        col: test_df[col].value_counts(normalize=True).to_dict()
        for col in CATEGORICAL_FEATURES
        if col in test_df.columns
    }

    # 2) Score each test row
    scores = []
    for _, row in test_df.iterrows():
        s = 0.0
        # Only consider features present in both distributions
        for col in P_synth.keys() & P_test.keys():
            v = row[col]
            ps = P_synth[col].get(v, 0.0)
            pt = P_test[col].get(v, 0.0)
            s += abs(ps - pt)
        scores.append(s)

    return np.array(scores)


def evaluate_attack(
    scores: np.ndarray,
    labels: pd.Series,
    threshold: float = None
) -> dict:
    """
    Given per-row scores and true labels ('yes'/'no'), compute metrics.
    If threshold is None, use the median of the scores.
    """
    y_scores = np.array(scores)
    y_true = np.array([1 if l == "yes" else 0 for l in labels])

    if threshold is None:
        threshold = np.median(y_scores)

    y_pred = (y_scores >= threshold).astype(int)

    return {
        "Accuracy": accuracy_score(y_true, y_pred),
        "Precision": precision_score(y_true, y_pred, zero_division=0),
        "Recall": recall_score(y_true, y_pred, zero_division=0),
        "F1": f1_score(y_true, y_pred, zero_division=0),
        "AUC": roc_auc_score(y_true, y_scores),
        "ASR": float((y_pred == y_true).mean()),
    }


def run_mode_collapse_attack() -> dict:
    """
    Runs the fixed Mode Collapse attack across all synthetic vs. test dataset pairs.
    Returns a dict mapping "synth.csv vs test.csv" -> metrics dict.
    """
    results = {}

    for synth_file in SYNTHETIC_FILES:
        synth_path = os.path.join(FOLDER_PATH, synth_file)
        synth_df = pd.read_csv(synth_path)

        for test_file in TEST_FILES:
            test_path = os.path.join(FOLDER_PATH, test_file)
            test_df = pd.read_csv(test_path)

            key = f"{synth_file} vs {test_file}"
            print(f"🔍 Running Mode Collapse Attack on: {key}")

            # Align columns between synthetic and test (drop label)
            common = [
                c for c in test_df.columns
                if c in synth_df.columns and c != LABEL_COLUMN
            ]
            test_subset = test_df[common + [LABEL_COLUMN]].dropna(subset=[LABEL_COLUMN])
            synth_subset = synth_df[common].copy()

            # Extract labels and feature-only DataFrame
            labels = test_subset[LABEL_COLUMN]
            X_test = test_subset.drop(columns=[LABEL_COLUMN])

            # Compute per-row mode-collapse scores
            scores = compute_mode_collapse_scores(synth_subset, X_test)

            # Evaluate
            metrics = evaluate_attack(scores, labels)
            results[key] = metrics

    return results


def print_results(results: dict):
    """Nicely prints the attack metrics."""
    for pair, metrics in results.items():
        print(f"\n==== Results for {pair} ====")
        for name, val in metrics.items():
            print(f"{name:>10}: {val:.4f}")
        print("-" * 30)


if __name__ == "__main__":
    attack_results = run_mode_collapse_attack()
    print_results(attack_results)

🔍 Running Mode Collapse Attack on: synthetic_data1.csv vs test_data_with_outliers.csv
🔍 Running Mode Collapse Attack on: synthetic_data1.csv vs test_data_wto_outliers.csv
🔍 Running Mode Collapse Attack on: synthetic_data2.csv vs test_data_with_outliers.csv
🔍 Running Mode Collapse Attack on: synthetic_data2.csv vs test_data_wto_outliers.csv
🔍 Running Mode Collapse Attack on: synthetic_data3.csv vs test_data_with_outliers.csv
🔍 Running Mode Collapse Attack on: synthetic_data3.csv vs test_data_wto_outliers.csv
🔍 Running Mode Collapse Attack on: synthetic_data4.csv vs test_data_with_outliers.csv
🔍 Running Mode Collapse Attack on: synthetic_data4.csv vs test_data_wto_outliers.csv

==== Results for synthetic_data1.csv vs test_data_with_outliers.csv ====
  Accuracy: 0.4375
 Precision: 0.4375
    Recall: 0.4375
        F1: 0.4375
       AUC: 0.4416
       ASR: 0.4375
------------------------------

==== Results for synthetic_data1.csv vs test_data_wto_outliers.csv ====
  Accuracy: 0.5500
 Prec

## Attack 2

This script performs a **Conditional Imbalance Membership Inference Attack** (MIA), specifically targeting synthetic datasets. Its goal is to determine whether a specific test record was used to train the synthetic data generator (i.e., whether the record is a **member** of the training data). Below is a detailed breakdown of how the attack works.

---

### **1. How it uses the given synthetic data? How does it process it?**

The synthetic data files (`synthetic_data1.csv` to `synthetic_data4.csv`) represent artificially generated data that mimics real data distributions.

The processing steps are as follows:

* Each synthetic file is **loaded** using pandas (`pd.read_csv`).
* The **test files** (`test_data_with_outliers.csv`, `test_data_wto_outliers.csv`) are also loaded. These contain real samples, some labeled with `is_member = yes` (used in training the generator) and others with `is_member = no` (not used).
* For each test record, the attack:

  * Drops the `is_member` label.
  * Looks at specific **conditional columns** (e.g., `drug_013`, `hba1c_result`).
  * Checks whether the record’s condition value is **rare**, meaning it makes up less than 20% of the test set for that column. Records with no rare condition value receive a score of 0.
  * For each rare condition value (e.g., `hba1c_result = 'High'`), it **filters** the synthetic data to get matching records, and continues only if more than 5 of them have a value for the numeric feature.
  * Within that subset, it compares the **numeric features** (`lab_test_count`, `stay_duration_days`) of the synthetic records to the value in the test record.

This is done for **each row** in the test set, so the synthetic dataset is repeatedly queried and filtered based on the conditions.

---

### **2. Against what does it compare the synthetic data metrics, values, or parameters to make predictions?**

The attack compares **each individual test row** against the **synthetic data distribution** that matches on the conditional feature.

Let’s make this concrete:

* Suppose a test row has `hba1c_result = "High"` and `lab_test_count = 4`.
* The attack:

  * Finds all synthetic rows where `hba1c_result == "High"`.
  * Extracts all their `lab_test_count` values.
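  * Compares the test value (4) against the synthetic distribution using the **Kolmogorov–Smirnov (KS) statistic**, which measures the largest gap between two empirical distributions. Because the test side is a single value, the code repeats it to the length of the synthetic sample before calling `ks_2samp`.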
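
When one of the two samples is a single repeated value, the KS statistic reduces to a closed form. The sketch below uses a hypothetical helper, `ks_against_constant`, written with plain Python; it should return the same statistic as `ks_2samp(synth_vals, [test_val] * len(synth_vals))`, but it is an illustration rather than the code the notebook runs:

```python
def ks_against_constant(synth_vals, test_val):
    """KS statistic between the ECDF of synth_vals and a point mass at test_val."""
    n = len(synth_vals)
    below = sum(v < test_val for v in synth_vals) / n         # ECDF just left of test_val
    at_or_below = sum(v <= test_val for v in synth_vals) / n  # ECDF at test_val
    # The point-mass CDF jumps from 0 to 1 at test_val, so the largest gap
    # is either just before the jump or right at it.
    return max(below, 1.0 - at_or_below)

# Hypothetical lab_test_count values from synthetic rows with hba1c_result == "High".
synth_lab_counts = [1, 2, 3, 4, 5, 6, 7, 8]

print(ks_against_constant(synth_lab_counts, 4))    # test value near the middle
print(ks_against_constant(synth_lab_counts, 100))  # test value outside the range
```

The statistic is smallest when the test value sits near the synthetic median and reaches 1.0 when it lies outside the synthetic range, so it behaves like a distance-from-the-center measure rather than a full distribution comparison.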

This is repeated for:

* Every conditional column (`drug_013`, `hba1c_result`)
* Every numeric target column (`lab_test_count`, `stay_duration_days`)

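The KS statistic values are averaged to produce a **final score** for each test row. This score indicates how closely that test record resembles the synthetic distribution: a **low score** means the test value sits in the middle of the matching synthetic values, and a **high score** means it sits in a tail or outside their range.

Predictions are made by thresholding these scores. As implemented in `evaluate_attack`:

* If the score is **at or above the median**, classify as member.
* If it's **below the median**, classify as non-member.

Rows without a rare condition value all score 0, so the median itself can be 0. In that case every row is classified as member, which is consistent with the first printed result (Recall 1.0000, Precision 0.5000).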

The predicted labels are then evaluated against the true labels using:

* Accuracy, Precision, Recall, F1 Score, AUC (ROC), and ASR (Attack Success Rate).
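
Score direction matters for both thresholding and AUC: `evaluate_attack` in the code cell below treats higher scores as more member-like. A minimal sketch of how negating a score mirrors the AUC around 0.5, using a hypothetical pairwise `auc` helper in place of scikit-learn's `roc_auc_score` and made-up scores and labels:

```python
def auc(scores, labels):
    """Probability that a random member outscores a random non-member (ties count 0.5)."""
    pos = [s for s, y in zip(scores, labels) if y == 1]
    neg = [s for s, y in zip(scores, labels) if y == 0]
    wins = sum((p > n) + 0.5 * (p == n) for p in pos for n in neg)
    return wins / (len(pos) * len(neg))

# Hypothetical scores where members (label 1) tend to score lower.
scores = [0.9, 0.8, 0.3, 0.7, 0.2, 0.1]
labels = [0, 1, 1, 0, 1, 0]

auc_as_is = auc(scores, labels)
auc_flipped = auc([-s for s in scores], labels)
print(auc_as_is, auc_flipped)
```

An AUC below 0.5 therefore signals that the score carries information in the opposite direction, and negating it (or predicting member below the threshold) would recover `1 - AUC`.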

---

### **3. Extremely simple overview of the attack**

This attack checks:

> "Does this test record *fit too well* with the synthetic data when we look at specific conditions and outcomes?"

If it fits the synthetic data *too closely* under certain conditions (e.g., "patients who took drug\_013 = 1 have stay\_duration\_days = 5"), it’s probably a **member** — i.e., the synthetic model saw it during training.

So for every test row, the attack:

* Looks at conditional values like drug prescriptions or test results.
* Filters the synthetic data by these conditions.
* Compares the test value against that subset using a statistical test (KS-test).
* Scores how much the test row stands out.
* Guesses if it was in the training set (member) or not based on that score.

In [5]:
import os
import numpy as np
import pandas as pd
from sklearn.metrics import (
    roc_auc_score,
    accuracy_score,
    precision_score,
    recall_score,
    f1_score
)
from scipy.stats import ks_2samp

# ─── Configuration ──────────────────────────────────────────────────────────────
FOLDER_PATH = "dataset/MIA_SDG_Exercise"
SYNTHETIC_FILES = [f"synthetic_data{i}.csv" for i in range(1, 5)]
TEST_FILES = ["test_data_with_outliers.csv", "test_data_wto_outliers.csv"]
LABEL_COLUMN = "is_member"

CONDITION_COLUMNS = ["drug_013", "hba1c_result"]
NUMERIC_TARGETS    = ["lab_test_count", "stay_duration_days"]

# ─── Conditional Imbalance Attack ───────────────────────────────────────────────
def compute_conditional_imbalance_scores(
    synth_df: pd.DataFrame,
    test_df: pd.DataFrame
) -> np.ndarray:
    """
    For each test row, if its condition value is rare in the test set,
    compute a KS‐statistic between the synthetic values for that condition
    and the single test value (repeated). Average across conditions & targets.
    Returns an array of length len(test_df).
    """
    # 1) Identify rare condition values from the test set
    rare_values = {}
    for col in CONDITION_COLUMNS:
        if col in test_df.columns:
            vc = test_df[col].value_counts(normalize=True)
            rare_values[col] = vc[vc < 0.2].index.tolist()

    # 2) Score each row
    scores = []
    for _, row in test_df.iterrows():
        total_stat = 0.0
        count = 0
        for cond_col in CONDITION_COLUMNS:
            if cond_col not in row or cond_col not in synth_df.columns:
                continue
            val = row[cond_col]
            if val not in rare_values.get(cond_col, []):
                continue

            # pull synthetic subset for this condition value
            subset = synth_df[synth_df[cond_col] == val]
            for target in NUMERIC_TARGETS:
                if target in subset.columns and target in row.index:
                    synth_vals = subset[target].dropna()
                    test_val   = row[target]
                    if pd.notna(test_val) and len(synth_vals) > 5:
                        stat, _ = ks_2samp(synth_vals, [test_val] * len(synth_vals))
                        total_stat += stat
                        count += 1

        # average over all computed stats, or zero if none applied
        scores.append(total_stat / count if count > 0 else 0.0)

    return np.array(scores)


def evaluate_attack(
    scores: np.ndarray,
    labels: pd.Series,
    threshold: float = None
) -> dict:
    """
    Compute standard metrics given per‐row scores and true 'yes'/'no' labels.
    If threshold is None, use the median score.
    """
    y_scores = np.array(scores)
    y_true   = np.array([1 if l == "yes" else 0 for l in labels])

    if threshold is None:
        threshold = np.median(y_scores)

    y_pred = (y_scores >= threshold).astype(int)

    return {
        "Accuracy" : accuracy_score(y_true, y_pred),
        "Precision": precision_score(y_true, y_pred, zero_division=0),
        "Recall"   : recall_score(y_true, y_pred, zero_division=0),
        "F1"       : f1_score(y_true, y_pred, zero_division=0),
        "AUC"      : roc_auc_score(y_true, y_scores),
        "ASR"      : float((y_pred == y_true).mean())
    }


def run_conditional_imbalance_attack() -> dict:
    """
    Run the fixed Conditional Imbalance attack on all synth/test pairs.
    Returns a mapping "synth.csv vs test.csv" -> metrics dict.
    """
    results = {}

    for synth_file in SYNTHETIC_FILES:
        synth_path = os.path.join(FOLDER_PATH, synth_file)
        synth_df   = pd.read_csv(synth_path)

        for test_file in TEST_FILES:
            test_path = os.path.join(FOLDER_PATH, test_file)
            test_df   = pd.read_csv(test_path)

            if LABEL_COLUMN not in test_df.columns:
                print(f"⚠️  Skipping {test_file}: no '{LABEL_COLUMN}' column.")
                continue

            key = f"{synth_file} vs {test_file}"
            print(f"🔍 Running Conditional Imbalance Attack on: {key}")

            # Align columns (drop the label from features)
            common_cols   = [c for c in test_df.columns
                             if c in synth_df.columns and c != LABEL_COLUMN]
            test_subset   = test_df[common_cols + [LABEL_COLUMN]].dropna(subset=[LABEL_COLUMN])
            synth_subset  = synth_df[common_cols].copy()

            labels = test_subset[LABEL_COLUMN]
            X_test = test_subset.drop(columns=[LABEL_COLUMN])

            # Compute per‐row conditional imbalance scores
            scores = compute_conditional_imbalance_scores(synth_subset, X_test)

            # Evaluate and store
            results[key] = evaluate_attack(scores, labels)

    return results


def print_results(results: dict):
    """Print the results for each dataset pair."""
    for pair, metrics in results.items():
        print(f"\n==== Results for {pair} ====")
        for name, val in metrics.items():
            print(f"{name:>10}: {val:.4f}")
        print("-" * 30)


if __name__ == "__main__":
    attack_results = run_conditional_imbalance_attack()
    print_results(attack_results)

🔍 Running Conditional Imbalance Attack on: synthetic_data1.csv vs test_data_with_outliers.csv
🔍 Running Conditional Imbalance Attack on: synthetic_data1.csv vs test_data_wto_outliers.csv
🔍 Running Conditional Imbalance Attack on: synthetic_data2.csv vs test_data_with_outliers.csv
🔍 Running Conditional Imbalance Attack on: synthetic_data2.csv vs test_data_wto_outliers.csv
🔍 Running Conditional Imbalance Attack on: synthetic_data3.csv vs test_data_with_outliers.csv
🔍 Running Conditional Imbalance Attack on: synthetic_data3.csv vs test_data_wto_outliers.csv
🔍 Running Conditional Imbalance Attack on: synthetic_data4.csv vs test_data_with_outliers.csv
🔍 Running Conditional Imbalance Attack on: synthetic_data4.csv vs test_data_wto_outliers.csv

==== Results for synthetic_data1.csv vs test_data_with_outliers.csv ====
  Accuracy: 0.5000
 Precision: 0.5000
    Recall: 1.0000
        F1: 0.6667
       AUC: 0.6293
       ASR: 0.5000
------------------------------

==== Results for synthetic_data1

## Attack 3
This script implements a **Gaussianity-based Membership Inference Attack (MIA)** on synthetic data. It tries to infer whether a real data record was used in training the synthetic data generator, based on how far the record's numeric values lie from a normal distribution fitted to the synthetic data.

The explanation follows the same three points as the previous attacks:

---

### **1. How it uses the given synthetic data? How does it process it?**

The script uses the synthetic datasets (`synthetic_data1.csv` to `synthetic_data4.csv`) as the **output** of a generative model trained on private (real) data. It assumes that these synthetic datasets **approximate the distributions** of the real training records.

Here’s how it processes them:

* Loads each synthetic dataset (`synth_df`) using `pandas.read_csv`.
* Loads corresponding real-world test data (`test_df`), which contains a label `is_member` marking whether each record was used to train the generator.
* Aligns columns between `synth_df` and `test_df`, keeping only the shared feature columns (the `is_member` label is excluded).
* Fits a normal distribution to each numeric column of the synthetic dataset by computing its mean µ and population standard deviation σ (a σ of 0 is replaced by machine epsilon to avoid division by zero).
* For **each row** in the test data:

  * Drops the `is_member` label to isolate the features.
  * Uses only the numeric columns listed in `NUMERIC_COLUMNS` that are present in both files.
  * For each numeric column with a non-missing value x, it computes the absolute z-score |x - µ| / σ.
  * Averages these z-scores across all numeric columns.

This gives a **score per test row** reflecting how many synthetic standard deviations the row lies from the synthetic mean, on average.
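
The scoring in `compute_gaussianity_scores` (code cell below) fits a mean and standard deviation per synthetic column and averages each test row's absolute z-scores. A dependency-free sketch with made-up numbers follows; the `mean_abs_z` helper and the `1e-12` guard are assumptions for illustration (the notebook uses `np.finfo(float).eps`):

```python
from statistics import fmean, pstdev

def mean_abs_z(row, synth_cols):
    """Average |x - mu| / sigma over the columns where the row has a value."""
    zs = []
    for col, values in synth_cols.items():
        x = row.get(col)
        if x is None:
            continue
        mu = fmean(values)
        sigma = pstdev(values) or 1e-12  # guard against zero spread (assumed constant)
        zs.append(abs(x - mu) / sigma)
    return fmean(zs) if zs else 0.0

# Hypothetical synthetic columns: means 5 and 20, population std 2 and 10.
synth_cols = {
    "lab_test_count": [2, 4, 4, 4, 5, 5, 7, 9],
    "stay_duration_days": [10, 30],
}

typical_row = {"lab_test_count": 5, "stay_duration_days": 20}
outlier_row = {"lab_test_count": 11, "stay_duration_days": 50}

print(mean_abs_z(typical_row, synth_cols), mean_abs_z(outlier_row, synth_cols))
```

A row at the synthetic mean scores 0, while a row three standard deviations out in every column scores 3, so with a median threshold the attack labels the more extreme half of the test rows as members.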

---

### **2. Against what does it compare the synthetic data metrics, values, or parameters to make predictions?**

The attack doesn’t compare one test row directly against other test rows. Instead, it compares **each test row's numeric values** against the **mean and standard deviation of the synthetic data** in the same columns.

Here’s the key logic:

* For each test row, we compute a Gaussian-deviation score: the average absolute z-score of its numeric values under **the synthetic distribution**.
* The **core idea** as implemented: `evaluate_attack` predicts member when the score is at or above the threshold, so rows that lie **far from the synthetic mean** are classified as members. The with-outliers result printed below (AUC 0.9748) indicates that, in that test file, member rows tend to have the larger z-scores.
* These scores are collected for all test rows and thresholded (median by default) to classify them as **"yes" (member)** or **"no" (non-member)**.

Predictions are evaluated using:

* Accuracy, Precision, Recall, F1 score, AUC, and ASR (Attack Success Rate).

---

### **3. Extremely simple overview of the attack**

Think of it like this:

> "If a real record sits **far out in the tails** of what the synthetic data looks like, this attack flags it as a training record."

So for each real test record:

* The attacker summarizes each numeric column of the synthetic data by its mean and standard deviation.
* Measures how many standard deviations the record's value lies from that mean.
* Averages those distances over the numeric columns.
* If the average is at or above the median across test rows, the record is guessed to be **in the training set** (i.e., a **member**).

The attack **does use the test row's actual values**: they are what gets compared against the synthetic mean and standard deviation.

---

### 🔍 Bonus: Why this attack can work

Generative models like CTGAN or Gaussian copulas often try to add randomness to mimic real data. But if they accidentally memorize the training data, the generated distributions become:

* Too sharp
* Too flat
* Not noisy enough

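These flaws show up as departures from **normality**. The code cell below does not run a formal normality test such as Shapiro-Wilk or Anderson-Darling; it uses the average z-score of each test row under the synthetic mean and standard deviation.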



In [6]:
import os
import numpy as np
import pandas as pd
from sklearn.metrics import (
    roc_auc_score,
    accuracy_score,
    precision_score,
    recall_score,
    f1_score
)

# ─── Configuration ──────────────────────────────────────────────────────────────
FOLDER_PATH     = "dataset/MIA_SDG_Exercise"
SYNTHETIC_FILES = [f"synthetic_data{i}.csv" for i in range(1, 5)]
TEST_FILES      = ["test_data_with_outliers.csv", "test_data_wto_outliers.csv"]
LABEL_COLUMN    = "is_member"

# Which numeric columns to use in the Gaussianity attack
NUMERIC_COLUMNS = [
    "lab_test_count",
    "medication_count",
    "stay_duration_days",
    "inpatient_visits",
    "diagnosis_count",
    "procedure_count",
    "entry_type_code",
    "entry_origin_code",
    "outpatient_visits",
    "emergency_visits",
    "exit_status_code"
]

# ─── Gaussianity‐Based Attack ────────────────────────────────────────────────────
def compute_gaussianity_scores(
    synth_df: pd.DataFrame,
    test_df: pd.DataFrame,
    numeric_cols: list
) -> np.ndarray:
    """
    For each row in test_df, compute a per-row Gaussian‐deviation score:
      1. Fit a normal distribution (µ, σ) on each numeric column in synth_df.
      2. For each test-row value x_i, compute z = |x_i - µ|/σ.
      3. Average those z-scores across numeric_cols.
    Returns an array of length len(test_df).
    """
    # 1) Precompute µ and σ on the synthetic distribution
    mu    = synth_df[numeric_cols].mean()
    sigma = synth_df[numeric_cols].std(ddof=0)  # population std
    # Avoid division by zero
    sigma = sigma.replace(0, np.finfo(float).eps)

    # 2) Score each row
    scores = []
    for _, row in test_df.iterrows():
        z_scores = []
        for col in numeric_cols:
            if col in row and pd.notna(row[col]):
                z = abs(row[col] - mu[col]) / sigma[col]
                z_scores.append(z)
        scores.append(np.mean(z_scores) if z_scores else 0.0)
    return np.array(scores)


def evaluate_attack(
    scores: np.ndarray,
    labels: pd.Series,
    threshold: float = None
) -> dict:
    """
    Given per-row scores and true 'yes'/'no' labels, compute standard metrics.
    If threshold is None, use the median of the scores.
    """
    y_scores = np.array(scores)
    y_true   = np.array([1 if l == "yes" else 0 for l in labels])

    if threshold is None:
        threshold = np.median(y_scores)

    y_pred = (y_scores >= threshold).astype(int)

    return {
        "Accuracy" : accuracy_score(y_true, y_pred),
        "Precision": precision_score(y_true, y_pred, zero_division=0),
        "Recall"   : recall_score(y_true, y_pred, zero_division=0),
        "F1"       : f1_score(y_true, y_pred, zero_division=0),
        "AUC"      : roc_auc_score(y_true, y_scores),
        "ASR"      : float((y_pred == y_true).mean())
    }


def run_gaussianity_attack() -> dict:
    """
    Run the fixed Gaussianity attack across all synthetic vs. test dataset pairs.
    Returns a dict mapping "synth.csv vs test.csv" -> metrics dict.
    """
    results = {}

    for synth_file in SYNTHETIC_FILES:
        synth_df = pd.read_csv(os.path.join(FOLDER_PATH, synth_file))

        for test_file in TEST_FILES:
            test_df = pd.read_csv(os.path.join(FOLDER_PATH, test_file))
            if LABEL_COLUMN not in test_df.columns:
                print(f"⚠️ Skipping {test_file}: no '{LABEL_COLUMN}'")
                continue

            key = f"{synth_file} vs {test_file}"
            print(f"🔍 Running Gaussianity Attack on: {key}")

            # Align columns (drop the label from features)
            common_cols  = [
                c for c in test_df.columns
                if c in synth_df.columns and c != LABEL_COLUMN
            ]
            test_subset  = test_df[common_cols + [LABEL_COLUMN]].dropna(subset=[LABEL_COLUMN])
            synth_subset = synth_df[common_cols].copy()

            # Extract labels and feature-only DataFrame
            labels = test_subset[LABEL_COLUMN]
            X_test = test_subset.drop(columns=[LABEL_COLUMN])

            # Determine which numeric columns are actually present
            num_cols = [c for c in NUMERIC_COLUMNS if c in common_cols]

            # Compute per-row Gaussianity scores
            scores = compute_gaussianity_scores(synth_subset, X_test, num_cols)

            # Evaluate and store
            results[key] = evaluate_attack(scores, labels)

    return results


def print_results(results: dict):
    """Nicely prints the attack metrics."""
    for pair, metrics in results.items():
        print(f"\n==== Results for {pair} ====")
        for name, val in metrics.items():
            print(f"{name:>10}: {val:.4f}")
        print("-" * 30)


if __name__ == "__main__":
    attack_results = run_gaussianity_attack()
    print_results(attack_results)

🔍 Running Gaussianity Attack on: synthetic_data1.csv vs test_data_with_outliers.csv
🔍 Running Gaussianity Attack on: synthetic_data1.csv vs test_data_wto_outliers.csv
🔍 Running Gaussianity Attack on: synthetic_data2.csv vs test_data_with_outliers.csv
🔍 Running Gaussianity Attack on: synthetic_data2.csv vs test_data_wto_outliers.csv
🔍 Running Gaussianity Attack on: synthetic_data3.csv vs test_data_with_outliers.csv
🔍 Running Gaussianity Attack on: synthetic_data3.csv vs test_data_wto_outliers.csv
🔍 Running Gaussianity Attack on: synthetic_data4.csv vs test_data_with_outliers.csv
🔍 Running Gaussianity Attack on: synthetic_data4.csv vs test_data_wto_outliers.csv

==== Results for synthetic_data1.csv vs test_data_with_outliers.csv ====
  Accuracy: 0.9375
 Precision: 0.9375
    Recall: 0.9375
        F1: 0.9375
       AUC: 0.9748
       ASR: 0.9375
------------------------------

==== Results for synthetic_data1.csv vs test_data_wto_outliers.csv ====
  Accuracy: 0.5500
 Precision: 0.5500
  

## Attack 4

This attack exploits **clustering bias**: if a synthetic data generator has memorized or over-fit its training records, then including one of those records alongside the synthetic samples will **change the clustering structure** in a detectable way.

---

### 1. How it uses the given synthetic data

1. **Load synthetic datasets**  
   Each `synthetic_dataN.csv` is read into a DataFrame `synth_df`.  
2. **Select numeric features**  
   A predefined list `NUMERIC_COLUMNS` (e.g. `lab_test_count`, `stay_duration_days`, etc.) is used.  
3. **For each test row**  
   - Drop the `"is_member"` label to isolate feature values.  
   - Create a one-row DataFrame `row_df` containing just those numeric features.  
4. **Combine**  
   - Vertically concatenate the full `synth_df[NUMERIC_COLUMNS]` with `row_df` ⇒ `df_combined`.  
   - This puts the single test record “in the same space” as all synthetic samples.
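
The combine step above can be sketched on toy data as follows. The two column names come from `NUMERIC_COLUMNS`; the values and the helper name `combine_with_row` are invented for illustration.

```python
import pandas as pd

cols = ["lab_test_count", "stay_duration_days"]

# Toy stand-ins for synth_df and one test row (values are invented)
synth_df = pd.DataFrame({"lab_test_count": [40, 42, 55],
                         "stay_duration_days": [3, 4, 7]})
test_row = pd.Series({"lab_test_count": 41, "stay_duration_days": 3,
                      "is_member": "yes"})

def combine_with_row(synth_df, test_row, cols):
    """Append one test record (features only) below the synthetic rows."""
    row_df = pd.DataFrame([test_row[cols]])  # selecting `cols` drops the label
    return pd.concat([synth_df[cols], row_df], ignore_index=True)

df_combined = combine_with_row(synth_df, test_row, cols)
print(df_combined.shape)  # (4, 2)
```

Selecting only `cols` from the row is what keeps the `is_member` label out of the feature space.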

---

### 2. Scoring Mechanism: Clustering + Silhouette

1. **Standardize**  
   ```python
   X = StandardScaler().fit_transform(df_combined.values)
   ```

   Centers each feature to mean 0 and scales it to unit variance, so clustering isn’t dominated by scale differences.

2. **K-means clustering**

   ```python
   kmeans = KMeans(n_clusters=2, n_init=10, random_state=42)
   labels = kmeans.fit_predict(X)
   ```

   Partitions the combined data into 2 clusters. One cluster may “capture” the test record if it is too similar to memorized samples.

3. **Silhouette score**

   ```python
   score = silhouette_score(X, labels)
   ```

   Measures **cohesion vs. separation**: high score → clusters are tight and well separated.

   * If inserting a **member** record causes the synthetic cloud to split unnaturally, the silhouette score will change.
   * If a **non-member** record merely sits in the noise, the clustering structure is less disrupted.

4. **Per-row score**

   * Repeat for each test record ⇒ array of silhouette scores, one per row.
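
To get a feel for how much a single added record can move the score, the sketch below runs the standardize, cluster and silhouette steps with one extra row on random toy data. The data, the sample sizes, and the helper name `silhouette_with_row` are assumptions for illustration; nothing here reproduces the notebook's results.

```python
import numpy as np
from sklearn.cluster import KMeans
from sklearn.metrics import silhouette_score
from sklearn.preprocessing import StandardScaler

def silhouette_with_row(synth, row, n_clusters=2):
    """Silhouette of k-means fitted on the synthetic rows plus one extra row."""
    X = StandardScaler().fit_transform(np.vstack([synth, row]))
    labels = KMeans(n_clusters=n_clusters, n_init=10, random_state=42).fit_predict(X)
    return silhouette_score(X, labels)

rng = np.random.default_rng(0)
# Two toy blobs standing in for the synthetic data
synth = np.vstack([rng.normal(0, 1, (100, 2)), rng.normal(6, 1, (100, 2))])

inside = np.array([0.0, 0.0])      # sits in a dense region
outside = np.array([30.0, -30.0])  # far from every synthetic point

s_in = silhouette_with_row(synth, inside)
s_out = silhouette_with_row(synth, outside)
print(f"inside: {s_in:.4f}, outside: {s_out:.4f}")
```

With many synthetic rows, one extra point typically shifts the global silhouette only slightly, so per-row differences can be small and should be inspected before trusting a threshold.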

---

### 3. Prediction & Evaluation

1. **Thresholding**

   * Default: median of all per-row silhouette scores
   * Records with score ≥ threshold ⇒ predicted **member**
   * Otherwise ⇒ **non-member**

2. **Metrics**

   * **Accuracy**, **Precision**, **Recall**, **F1**
   * **AUC** (ROC area under curve using raw scores)
   * **ASR** (Attack Success Rate = fraction of correct predictions)
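
One consequence of median thresholding is worth spelling out. When the test set holds equal numbers of members and non-members and exactly half of the rows land at or above the median, false positives equal false negatives, so Accuracy, Precision, Recall and F1 all coincide. The sketch below checks this on invented scores; the balanced split and the tie-free scores are assumptions.

```python
import numpy as np
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score

# Invented, tie-free scores for 4 members (1) and 4 non-members (0)
y_true = np.array([1, 1, 1, 1, 0, 0, 0, 0])
scores = np.array([0.9, 0.8, 0.7, 0.2, 0.6, 0.3, 0.1, 0.0])

threshold = np.median(scores)               # 0.45 for this input
y_pred = (scores >= threshold).astype(int)  # exactly half predicted as members

metrics = {
    "Accuracy": accuracy_score(y_true, y_pred),
    "Precision": precision_score(y_true, y_pred, zero_division=0),
    "Recall": recall_score(y_true, y_pred, zero_division=0),
    "F1": f1_score(y_true, y_pred, zero_division=0),
}
print(metrics)  # all four values are 0.75 for this toy input
```

This may explain result blocks in which the four metrics are identical; it is a property of the thresholding rule rather than of any particular attack.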

---

### 4. Intuition & Limitations

* **Intuition**

  * A genuine training record, when added, will sit “inside” a high-density region of synthetic samples, potentially splitting that region into two clusters and **raising** the silhouette score.
  * An out-of-training (non-member) record will behave like an outlier or sit near the edge, having **less impact** on cluster compactness.

* **Limitations**

  * **Choice of k=2** is heuristic—more clusters or a different clustering method might be needed for complex data.
  * **Silhouette sensitivity**: depends on global data geometry; noise or uneven densities can reduce signal.
  * **High dimensionality**: many numeric features may dilute clustering effects unless carefully standardized or dimensionality-reduced first.
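
The dimensionality concern could be addressed by projecting onto a few principal components before clustering. This is a sketch of one possible variant, not part of the notebook's attack; the use of PCA, the choice `n_components=3`, and the function name are assumptions.

```python
import numpy as np
from sklearn.cluster import KMeans
from sklearn.decomposition import PCA
from sklearn.metrics import silhouette_score
from sklearn.preprocessing import StandardScaler

def silhouette_after_pca(X, n_components=3, n_clusters=2):
    """Standardize, reduce with PCA, cluster, and return the silhouette score."""
    X_std = StandardScaler().fit_transform(X)
    X_red = PCA(n_components=n_components, random_state=42).fit_transform(X_std)
    labels = KMeans(n_clusters=n_clusters, n_init=10, random_state=42).fit_predict(X_red)
    return silhouette_score(X_red, labels)

rng = np.random.default_rng(0)
X_toy = rng.normal(size=(200, 11))  # 11 columns, the same count as NUMERIC_COLUMNS
pca_score = silhouette_after_pca(X_toy)
print(f"{pca_score:.4f}")
```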




In [9]:
import os
import numpy as np
import pandas as pd
from collections import defaultdict
from sklearn.cluster import KMeans
from sklearn.metrics import silhouette_score, accuracy_score, precision_score, recall_score, f1_score, roc_auc_score
from sklearn.preprocessing import StandardScaler

FOLDER_PATH = "dataset/MIA_SDG_Exercise"
SYNTHETIC_FILES = [f"synthetic_data{i}.csv" for i in range(1, 5)]
TEST_FILES = ["test_data_with_outliers.csv", "test_data_wto_outliers.csv"]
LABEL_COLUMN = "is_member"

NUMERIC_COLUMNS = [
    "lab_test_count", "medication_count", "stay_duration_days", "inpatient_visits",
    "diagnosis_count", "procedure_count", "entry_type_code", "entry_origin_code",
    "outpatient_visits", "emergency_visits", "exit_status_code"
]

# --- Evaluation Helper --- #
def evaluate_attack(scores, labels, threshold=None):
    scores = np.array(scores)
    true_labels = np.array([1 if l == "yes" else 0 for l in labels])

    if threshold is None:
        threshold = np.median(scores)
    preds = (scores >= threshold).astype(int)

    metrics = {
        "Accuracy": accuracy_score(true_labels, preds),
        "Precision": precision_score(true_labels, preds, zero_division=0),
        "Recall": recall_score(true_labels, preds, zero_division=0),
        "F1": f1_score(true_labels, preds, zero_division=0),
        "AUC": roc_auc_score(true_labels, scores),
        "ASR": float(np.mean(preds == true_labels))
    }
    return metrics

# --- Main Score Function --- #
def compute_clustering_bias_score(synth_df, test_row, cols, n_clusters=2):
    # Combine test_row + synthetic sample space
    row_df = pd.DataFrame([test_row[cols]])
    df_combined = pd.concat([synth_df[cols], row_df], ignore_index=True).dropna()

    if df_combined.shape[0] < n_clusters + 1:
        return 0.0

    try:
        X = StandardScaler().fit_transform(df_combined.values)
        kmeans = KMeans(n_clusters=n_clusters, n_init=10, random_state=42)
        cluster_labels = kmeans.fit_predict(X)
        score = silhouette_score(X, cluster_labels)
        return score
    except Exception:
        return 0.0

# --- Attack Execution --- #
def run_reconstruction_bias_attack():
    results = defaultdict(dict)

    for synth_file in SYNTHETIC_FILES:
        synth_path = os.path.join(FOLDER_PATH, synth_file)
        synth_df = pd.read_csv(synth_path)

        for test_file in TEST_FILES:
            test_path = os.path.join(FOLDER_PATH, test_file)
            test_df = pd.read_csv(test_path)

            key = f"{synth_file} vs {test_file}"
            print(f"\n🔍 Running Reconstruction Bias Attack on: {key}")

            common_cols = [col for col in test_df.columns if col in synth_df.columns and col != LABEL_COLUMN]
            test_subset = test_df[common_cols + [LABEL_COLUMN]].dropna(subset=[LABEL_COLUMN])
            synth_subset = synth_df[common_cols].copy()

            all_labels = test_subset[LABEL_COLUMN].tolist()
            scores = []

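            # Use only the numeric columns present in both files, so a missing
            # column does not raise a KeyError inside the scoring function
            num_cols = [c for c in NUMERIC_COLUMNS if c in common_cols]

            for _, row in test_subset.iterrows():
                score = compute_clustering_bias_score(synth_subset, row, num_cols, n_clusters=2)
                scores.append(score)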

            metrics = evaluate_attack(scores, all_labels)
            results[key]["Reconstruction Bias"] = metrics

    return results

# --- Print Results --- #
def print_results(results):
    for pair, attacks in results.items():
        print(f"\n==== Results for {pair} ====")
        for attack_name, metrics in attacks.items():
            print(f"Attack: {attack_name}")
            for metric, val in metrics.items():
                print(f"{metric:>10}: {val:.4f}")
            print("-" * 30)

if __name__ == "__main__":
    attack_results = run_reconstruction_bias_attack()
    print_results(attack_results)



🔍 Running Reconstruction Bias Attack on: synthetic_data1.csv vs test_data_with_outliers.csv

🔍 Running Reconstruction Bias Attack on: synthetic_data1.csv vs test_data_wto_outliers.csv

🔍 Running Reconstruction Bias Attack on: synthetic_data2.csv vs test_data_with_outliers.csv

🔍 Running Reconstruction Bias Attack on: synthetic_data2.csv vs test_data_wto_outliers.csv

🔍 Running Reconstruction Bias Attack on: synthetic_data3.csv vs test_data_with_outliers.csv

🔍 Running Reconstruction Bias Attack on: synthetic_data3.csv vs test_data_wto_outliers.csv

🔍 Running Reconstruction Bias Attack on: synthetic_data4.csv vs test_data_with_outliers.csv

🔍 Running Reconstruction Bias Attack on: synthetic_data4.csv vs test_data_wto_outliers.csv

==== Results for synthetic_data1.csv vs test_data_with_outliers.csv ====
Attack: Reconstruction Bias
  Accuracy: 0.8542
 Precision: 0.8542
    Recall: 0.8542
        F1: 0.8542
       AUC: 0.8490
       ASR: 0.8542
------------------------------

==== Results

## Attack 5

This attack exploits how a synthetic data generator may inadvertently **preserve or distort feature dependencies** from its training data. By measuring, for each test record, how much the relationship between a categorical feature and a numeric target in the synthetic data deviates from that same relationship in the real (test) data, we can infer whether that record was likely used in training.

---

## 1. Goal

- **Membership Inference**: Decide, for each test record, whether it was part of the original training set that produced the synthetic data.
- **Key Idea**: If the generator overfit, it will reproduce certain category–numeric dependencies very closely for training members, while generalizing differently for non-members.

---

## 2. Data Inputs

- **Synthetic datasets**: Four files (`synthetic_data1.csv` … `synthetic_data4.csv`), each produced by a different private-data synthesizer.
- **Test datasets**: Two files:
  - `test_data_with_outliers.csv` (samples that include outlier records)
  - `test_data_wto_outliers.csv` (random samples)
- Each test row has a ground-truth label in column `is_member` (`"yes"`/`"no"`).

---

## 3. Feature Pairs

We focus on a small set of categorical–numeric pairs believed to carry informative dependencies:

| Categorical Feature    | Numeric Target      |
|------------------------|---------------------|
| `primary_diagnosis`    | `procedure_count`   |
| `secondary_diagnosis`  | `exit_status_code`  |
| `age_range`            | `medication_count`  |
| `ethnic_group`         | `inpatient_visits`  |

---

## 4. Attack Workflow

### 4.1 Precompute Group Statistics

For each `(cat, num)` pair:

1. **On the synthetic data**:
   - Group by the categorical column.
   - Compute the **mean** and **standard deviation** of the numeric target within each category.
2. **On the test data**:
   - Do the same grouping and statistics.

We store two dictionaries:
- `synth_stats[(cat,num)] = (means_synth, stds_synth)`
- `test_stats[(cat,num)]  = (means_test,  stds_test)`
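
The grouping step can be sketched on a toy frame as follows. The helper name `group_stats` and the category labels are invented; the `ddof=0` and epsilon replacement mirror the code cell further down.

```python
import numpy as np
import pandas as pd

def group_stats(df, cat, num):
    """Per-category mean and population std (ddof=0) of `num`."""
    grp = df.groupby(cat)[num]
    eps = np.finfo(float).eps
    means = grp.mean().to_dict()
    # A zero std is replaced by eps to avoid division by zero in the z-scores
    stds = grp.std(ddof=0).replace(0, eps).to_dict()
    return means, stds

toy = pd.DataFrame({
    "age_range": ["60-70", "60-70", "70-80"],
    "medication_count": [10, 14, 20],
})
means, stds = group_stats(toy, "age_range", "medication_count")
print(means)          # {'60-70': 12.0, '70-80': 20.0}
print(stds["60-70"])  # 2.0
```

Note that a category with a single row gets a std of `eps`, so any deviation from its mean produces a very large z-score.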

### 4.2 Per-Row Score Computation

For each test record _r_:

1. Initialize `total_diff = 0`, `count = 0`.
2. Consider each `(cat, num)` pair for which:
   - Both `cat` and `num` exist in the data.
   - The record’s values `c = r[cat]` and `y = r[num]` are non-missing.
   - We have precomputed stats for category `c` in both synthetic and test data.
3. For each such pair, compute two standardized deviations:
   - \( z_\mathrm{synth} = \bigl|y - \mu^\mathrm{synth}_{c}\bigr| / \sigma^\mathrm{synth}_{c} \)
   - \( z_\mathrm{test}  = \bigl|y - \mu^\mathrm{test}_{c}\bigr|  / \sigma^\mathrm{test}_{c} \)
4. Add the absolute difference \( \lvert z_\mathrm{synth} - z_\mathrm{test}\rvert \) to `total_diff`, increment `count`.
5. Final per‐row score =
   \[
     \text{score}(r) \;=\; 
     \begin{cases}
       \dfrac{\text{total\_diff}}{\text{count}}, & \text{if count}>0,\\
       0, & \text{otherwise.}
     \end{cases}
   \]
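
A worked example of the per-row score, using invented means and standard deviations for two pairs. The function name `pair_gap` is hypothetical.

```python
def pair_gap(y, mu_s, sd_s, mu_t, sd_t):
    """|z_synth - z_test| for one (cat, num) pair."""
    z_s = abs(y - mu_s) / sd_s
    z_t = abs(y - mu_t) / sd_t
    return abs(z_s - z_t)

# Invented numbers: the record has y = 16 for both pairs
gaps = [
    pair_gap(16, mu_s=12, sd_s=2, mu_t=14, sd_t=4),  # |2.0 - 0.5| = 1.5
    pair_gap(16, mu_s=16, sd_s=1, mu_t=10, sd_t=3),  # |0.0 - 2.0| = 2.0
]
score = sum(gaps) / len(gaps) if gaps else 0.0
print(score)  # 1.75
```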

---

## 5. Decision Rule

- Collect all per-row scores into an array `scores`.
- Choose a threshold \( T \) (default: **median** of `scores`).
- Predict **member** if `score ≥ T`, else **non-member**.

---

## 6. Evaluation Metrics

For each synthetic vs. test pair, we compute:

- **Accuracy**: fraction of correct predictions.
- **Precision**: TP / (TP + FP).
- **Recall**: TP / (TP + FN).
- **F1-score**: harmonic mean of Precision & Recall.
- **AUC**: area under the ROC curve, using the continuous `scores`.
- **ASR** (Attack Success Rate): same as accuracy.

---

## 7. Intuition & Rationale

- **Overfitting signature**: A generator that memorizes training records tends to reproduce their numeric values exactly (or very close) given their categorical context, leading to **smaller deviations** in the synthetic distribution than in held-out data.
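- **Relative comparison**: By contrasting how far a value lies from its category mean in the synthetic and test data (via z-scores), we capture whether a record fits the synthetic model *too well* (likely a member) or not (likely a non-member). If a member's value sits close to the synthetic category mean but not to the test category mean, the gap \( \lvert z_\mathrm{synth} - z_\mathrm{test}\rvert \) grows, which is why scores at or above the threshold are read as membership.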

---

## 8. Strengths & Limitations

| Strengths                                         | Limitations                                            |
|---------------------------------------------------|--------------------------------------------------------|
| ✔️  Leverages simple, interpretable statistics     | ⚠️  Requires each category to appear often enough      |
| ✔️  Doesn’t need complex model training            | ⚠️  Sensitive to noisy or low-variance categories      |
| ✔️  Easily extends to more feature pairs           | ⚠️  May underperform if synthetic generator smooths out dependencies excessively |

---

> **Next steps**:  
> - Experiment with different `(cat, num)` pairs or continuous–continuous dependency measures (e.g. Spearman correlation).  
> - Blend this attack with marginal‐divergence or clustering‐based attacks for a more robust ensemble.  
> - Tune the threshold \( T \) (e.g. optimize F1 on a validation split) instead of using the raw median.  
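
The threshold-tuning idea can be sketched as a scan over candidate thresholds on a labelled validation split. The function name and the toy scores are invented, and the split is assumed to be separate from the rows used for the final evaluation.

```python
import numpy as np
from sklearn.metrics import f1_score

def best_f1_threshold(scores, y_true):
    """Return the candidate threshold (one per distinct score) with the highest F1."""
    scores, y_true = np.asarray(scores), np.asarray(y_true)
    candidates = np.unique(scores)
    f1s = [f1_score(y_true, (scores >= t).astype(int), zero_division=0)
           for t in candidates]
    return float(candidates[int(np.argmax(f1s))])

# Invented validation scores and labels (1 = member)
val_scores = np.array([0.1, 0.2, 0.4, 0.6, 0.7, 0.9])
val_labels = np.array([0, 0, 1, 0, 1, 1])

T = best_f1_threshold(val_scores, val_labels)
print(T)  # 0.4
```

The returned value can then be passed as the `threshold` argument of `evaluate_attack` instead of relying on the median default.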


In [10]:
import os
import numpy as np
import pandas as pd
from sklearn.metrics import (
    roc_auc_score,
    accuracy_score,
    precision_score,
    recall_score,
    f1_score
)

# ─── Configuration ──────────────────────────────────────────────────────────────
FOLDER_PATH     = "dataset/MIA_SDG_Exercise"
SYNTHETIC_FILES = [f"synthetic_data{i}.csv" for i in range(1, 5)]
TEST_FILES      = ["test_data_with_outliers.csv", "test_data_wto_outliers.csv"]
LABEL_COLUMN    = "is_member"

# Pairs of (categorical, numeric) features for dependency-based scoring
DEPENDENT_PAIRS = [
    ("primary_diagnosis",   "procedure_count"),
    ("secondary_diagnosis", "exit_status_code"),
    ("age_range",           "medication_count"),
    ("ethnic_group",        "inpatient_visits"),
]

# ─── Core Functions ─────────────────────────────────────────────────────────────

def compute_dependency_scores(
    synth_df: pd.DataFrame,
    test_df: pd.DataFrame
) -> np.ndarray:
    """
    For each (cat, num) pair, compute per-category mean/std on both synth and test.
    Then for each test row, for each pair:
      z_synth = |y - μ_synth[x]| / σ_synth[x]
      z_test  = |y - μ_test[x]|  / σ_test[x]
      score += |z_synth - z_test|
    Final per-row score = average over all valid pairs.
    """
    # 1) Precompute group stats
    synth_stats = {}
    test_stats  = {}
    eps = np.finfo(float).eps

    for cat, num in DEPENDENT_PAIRS:
        if cat in synth_df.columns and num in synth_df.columns:
            grp = synth_df.groupby(cat)[num]
            means = grp.mean().to_dict()
            stds  = grp.std(ddof=0).replace(0, eps).to_dict()
            synth_stats[(cat, num)] = (means, stds)

        if cat in test_df.columns and num in test_df.columns:
            grp = test_df.groupby(cat)[num]
            means = grp.mean().to_dict()
            stds  = grp.std(ddof=0).replace(0, eps).to_dict()
            test_stats[(cat, num)] = (means, stds)

    # 2) Score each test row
    scores = []
    for _, row in test_df.iterrows():
        total, count = 0.0, 0
        for cat, num in DEPENDENT_PAIRS:
            key = (cat, num)
            if key not in synth_stats or key not in test_stats:
                continue
            if pd.isna(row.get(cat)) or pd.isna(row.get(num)):
                continue

            x_val = row[cat]
            y_val = row[num]

            synth_means, synth_stds = synth_stats[key]
            test_means,  test_stds  = test_stats[key]

            if x_val not in synth_means or x_val not in test_means:
                continue

            z_s = abs(y_val - synth_means[x_val]) / synth_stds[x_val]
            z_t = abs(y_val - test_means[x_val])  / test_stds[x_val]
            total += abs(z_s - z_t)
            count += 1

        scores.append(total / count if count > 0 else 0.0)

    return np.array(scores)


def evaluate_attack(
    scores: np.ndarray,
    labels: pd.Series,
    threshold: float = None
) -> dict:
    """
    Compute Accuracy, Precision, Recall, F1, AUC, and ASR given per-row scores
    and true 'yes'/'no' labels. Uses median threshold if none provided.
    """
    y_scores = np.array(scores)
    y_true   = np.array([1 if l == "yes" else 0 for l in labels])

    if threshold is None:
        threshold = np.median(y_scores)

    y_pred = (y_scores >= threshold).astype(int)

    return {
        "Accuracy" : accuracy_score(y_true, y_pred),
        "Precision": precision_score(y_true, y_pred, zero_division=0),
        "Recall"   : recall_score(y_true, y_pred, zero_division=0),
        "F1"       : f1_score(y_true, y_pred, zero_division=0),
        "AUC"      : roc_auc_score(y_true, y_scores),
        "ASR"      : float((y_pred == y_true).mean())
    }


def run_dependency_attack() -> dict:
    """
    Runs the fixed Overfitting Dependency attack on all synthetic/test pairs.
    Returns a dict mapping "synth.csv vs test.csv" -> metrics dict.
    """
    results = {}

    for synth_file in SYNTHETIC_FILES:
        synth_path = os.path.join(FOLDER_PATH, synth_file)
        synth_df   = pd.read_csv(synth_path)

        for test_file in TEST_FILES:
            test_path = os.path.join(FOLDER_PATH, test_file)
            test_df   = pd.read_csv(test_path)

            if LABEL_COLUMN not in test_df.columns:
                print(f"⚠️  Skipping {test_file}: no '{LABEL_COLUMN}' column.")
                continue

            key = f"{synth_file} vs {test_file}"
            print(f"🔍 Running Overfitting Dependency Attack on: {key}")

            # Align columns & split off labels
            common_cols   = [
                c for c in test_df.columns
                if c in synth_df.columns and c != LABEL_COLUMN
            ]
            subset_test   = test_df[common_cols + [LABEL_COLUMN]].dropna(subset=[LABEL_COLUMN])
            subset_synth  = synth_df[common_cols].copy()

            labels = subset_test[LABEL_COLUMN]
            X_test = subset_test.drop(columns=[LABEL_COLUMN])

            # Compute per-row dependency scores
            scores = compute_dependency_scores(subset_synth, X_test)

            # Evaluate and store
            results[key] = evaluate_attack(scores, labels)

    return results


def print_results(results: dict):
    """Prints the metrics for each dataset pair."""
    for pair, metrics in results.items():
        print(f"\n==== Results for {pair} ====")
        for name, val in metrics.items():
            print(f"{name:>10}: {val:.4f}")
        print("-" * 30)


if __name__ == "__main__":
    attack_results = run_dependency_attack()
    print_results(attack_results)

🔍 Running Overfitting Dependency Attack on: synthetic_data1.csv vs test_data_with_outliers.csv
🔍 Running Overfitting Dependency Attack on: synthetic_data1.csv vs test_data_wto_outliers.csv
🔍 Running Overfitting Dependency Attack on: synthetic_data2.csv vs test_data_with_outliers.csv
🔍 Running Overfitting Dependency Attack on: synthetic_data2.csv vs test_data_wto_outliers.csv
🔍 Running Overfitting Dependency Attack on: synthetic_data3.csv vs test_data_with_outliers.csv
🔍 Running Overfitting Dependency Attack on: synthetic_data3.csv vs test_data_wto_outliers.csv
🔍 Running Overfitting Dependency Attack on: synthetic_data4.csv vs test_data_with_outliers.csv
🔍 Running Overfitting Dependency Attack on: synthetic_data4.csv vs test_data_wto_outliers.csv

==== Results for synthetic_data1.csv vs test_data_with_outliers.csv ====
  Accuracy: 0.8750
 Precision: 0.8750
    Recall: 0.8750
        F1: 0.8750
       AUC: 0.9423
       ASR: 0.8750
------------------------------

==== Results for synthet

## Attack 6

## 1. Introduction

The Rare-Combo attack is a targeted **membership inference** technique designed to detect whether individual records from a real dataset were used to train a generative model. It exploits the presence (or absence) of **specific rare combinations** of feature values—“combos”—that occur infrequently in the real data. If a synthetic dataset reproduces these rare combos too faithfully, it suggests that those records were memorized by the generator.

---

## 2. Attack Motivation

- **Generative models** sometimes overfit to rare, idiosyncratic slices of their training data.  
- Rare feature combinations (e.g. a specific drug change flag paired with a particular provider specialty) are unlikely to be learned unless the exact record was included.  
- By checking for these rare combos, we can derive a per-record score reflecting how “surprising” each test sample is, relative to the synthetic output.

---

## 3. Data & Assumptions

1. **Synthetic datasets**: Four CSV files (`synthetic_data1.csv` … `synthetic_data4.csv`), each generated by a different synthesizer (CTGAN, VAE, Bayesian network, Gaussian copula).  
2. **Test datasets**: Two CSVs:
   - `test_data_with_outliers.csv` (contains outlier records labeled `is_member=yes/no`)  
   - `test_data_wto_outliers.csv` (random sample)  
3. **Ground-truth label**: Each test row has an `is_member` column (“yes” if it was used during training, “no” otherwise).  
4. **Feature space**: We assume all columns in `RARE_COMBOS` exist in both synthetic and test tables (any combos referencing missing columns are dropped).

---

## 4. Defining Rare Combos

We predefine a small set of rare feature‐value combinations believed to be highly specific:

| Combo # | Feature 1           | Feature 2    | Feature 3                   |
|:-------:|:--------------------|:------------:|:---------------------------:|
| 1       | `med_change_flag=decrease` | `drug_008=constant`   | `exit_status_code=9`          |
| 2       | `drug_015=increase`       | `drug_006=constant`   | `inpatient_visits=4`          |
| 3       | `drug_017=decrease`       | `ethnic_group=Asian`  | `provider_specialty=Orthopedic Surgeon` |

> **Why these combos?**  
> They involve rare drug‐change flags, demographic slices, and outcome codes that appear only in a handful of records. Their joint occurrence is extremely unlikely unless the model memorized those exact rows.
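
Before relying on a combo, it may help to check how often it actually occurs. The sketch below counts matches with a vectorized mask; the helper name `combo_mask` and the toy rows are invented, while the combo itself is Combo 2 from the table.

```python
import pandas as pd

def combo_mask(df, combo):
    """Boolean mask of rows whose values equal every (column, value) in combo."""
    mask = pd.Series(True, index=df.index)
    for col, val in combo.items():
        mask &= df[col].eq(val)  # missing values never equal val, so they do not match
    return mask

toy = pd.DataFrame({
    "drug_015": ["increase", "increase", "constant", None],
    "drug_006": ["constant", "constant", "constant", "constant"],
    "inpatient_visits": [4, 2, 4, 4],
})
combo = {"drug_015": "increase", "drug_006": "constant", "inpatient_visits": 4}

mask = combo_mask(toy, combo)
print(int(mask.sum()), float(mask.mean()))  # 1 0.25
```

If a combo matches no test row at all, every test row receives the same indicator for it, and that combo cannot separate members from non-members.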

---

## 5. Scoring Methodology

For each synthetic/test dataset pair:

1. **Precompute synthetic frequencies**  
   For each valid combo:
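   ```python
   # Fraction of synthetic rows that satisfy every condition in the combo
   # (match_rare_combo is defined in the code cell below)
   matches = synth_df.apply(lambda r: match_rare_combo(r, combo), axis=1)
   synth_freq = float(matches.mean())
   ```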


In [12]:
import os
import numpy as np
import pandas as pd
from sklearn.metrics import (
    accuracy_score,
    precision_score,
    recall_score,
    f1_score,
    roc_auc_score
)

# ─── Configuration ──────────────────────────────────────────────────────────────
FOLDER_PATH     = "dataset/MIA_SDG_Exercise"
SYNTHETIC_FILES = [f"synthetic_data{i}.csv" for i in range(1, 5)]
TEST_FILES      = ["test_data_with_outliers.csv", "test_data_wto_outliers.csv"]
LABEL_COLUMN    = "is_member"

RARE_COMBOS = [
    {"med_change_flag": "decrease", "drug_008": "constant", "exit_status_code": 9},
    {"drug_015": "increase",  "drug_006": "constant", "inpatient_visits": 4},
    {"drug_017": "decrease",  "ethnic_group": "Asian", "provider_specialty": "Orthopedic Surgeon"},
]

# ─── Helper Functions ────────────────────────────────────────────────────────────
def match_rare_combo(row: pd.Series, combo: dict) -> bool:
    """Return True if the row exactly matches the given combo dict."""
    for col, val in combo.items():
        if col not in row or pd.isna(row[col]) or row[col] != val:
            return False
    return True

def compute_combo_frequency(df: pd.DataFrame, combo: dict) -> float:
    """
    Compute the fraction of rows in df that match combo.
    """
    matches = df.apply(lambda r: match_rare_combo(r, combo), axis=1)
    return float(matches.mean())

def compute_rare_combo_scores(
    synth_df: pd.DataFrame,
    test_df: pd.DataFrame,
    combos: list[dict]
) -> np.ndarray:
    """
    For each test row and each valid combo:
      - precompute synth_freq = fraction of synth_df matching combo
      - indicator = 1 if row matches combo, else 0
      - score_row += |synth_freq - indicator|
    Final score per row = average over combos.
    """
    # Precompute synth_freqs only for combos whose keys exist in synth_df
    valid_combos = [
        combo for combo in combos
        if set(combo.keys()).issubset(synth_df.columns)
    ]
    synth_freqs = [
        compute_combo_frequency(synth_df, combo)
        for combo in valid_combos
    ]
    n = len(valid_combos)
    if n == 0:
        # No valid combos ⇒ zero scores
        return np.zeros(len(test_df), dtype=float)

    # Score each test row
    scores = []
    for _, row in test_df.iterrows():
        s = 0.0
        for combo, freq in zip(valid_combos, synth_freqs):
            indicator = 1.0 if match_rare_combo(row, combo) else 0.0
            s += abs(freq - indicator)
        scores.append(s / n)
    return np.array(scores)

def evaluate_attack(
    scores: np.ndarray,
    labels: pd.Series,
    threshold: float = None
) -> dict:
    """
    Compute metrics given per-row scores and 'yes'/'no' labels.
    If threshold is None, use median(scores).
    """
    y_scores = scores
    y_true   = np.array([1 if l == "yes" else 0 for l in labels])

    if threshold is None:
        threshold = np.median(y_scores)

    y_pred = (y_scores >= threshold).astype(int)
    return {
        "Accuracy" : accuracy_score(y_true, y_pred),
        "Precision": precision_score(y_true, y_pred, zero_division=0),
        "Recall"   : recall_score(y_true, y_pred, zero_division=0),
        "F1"       : f1_score(y_true, y_pred, zero_division=0),
        "AUC"      : roc_auc_score(y_true, y_scores),
        "ASR"      : float((y_pred == y_true).mean())
    }

# ─── Main Attack Runner ─────────────────────────────────────────────────────────
def run_rare_combo_attacks() -> dict:
    """
    Run the fixed Rare-Combo attack on all synthetic/test pairs.
    Returns a dict mapping "synth.csv vs test.csv" -> metrics dict.
    """
    results = {}

    for synth_file in SYNTHETIC_FILES:
        synth_df = pd.read_csv(os.path.join(FOLDER_PATH, synth_file))

        for test_file in TEST_FILES:
            test_df = pd.read_csv(os.path.join(FOLDER_PATH, test_file))
            if LABEL_COLUMN not in test_df.columns:
                print(f"⚠️  Skipping {test_file}: no '{LABEL_COLUMN}' column.")
                continue

            key = f"{synth_file} vs {test_file}"
            print(f"🔍 Running Rare Combo Attack on: {key}")

            # Align columns (drop label)
            common_cols   = [
                c for c in test_df.columns
                if c in synth_df.columns and c != LABEL_COLUMN
            ]
            subset       = test_df[common_cols + [LABEL_COLUMN]].dropna(subset=[LABEL_COLUMN])
            labels       = subset[LABEL_COLUMN]
            X_test       = subset.drop(columns=[LABEL_COLUMN])
            synth_subset = synth_df[common_cols].copy()

            # Compute per-row scores
            scores = compute_rare_combo_scores(synth_subset, X_test, RARE_COMBOS)

            # Evaluate and store
            results[key] = evaluate_attack(scores, labels)

    return results

def print_results(results: dict):
    """Nicely print the attack metrics."""
    for pair, metrics in results.items():
        print(f"\n==== Results for {pair} ====")
        for name, val in metrics.items():
            print(f"{name:>10}: {val:.4f}")
        print("-" * 30)

if __name__ == "__main__":
    attack_results = run_rare_combo_attacks()
    print_results(attack_results)

🔍 Running Rare Combo Attack on: synthetic_data1.csv vs test_data_with_outliers.csv
🔍 Running Rare Combo Attack on: synthetic_data1.csv vs test_data_wto_outliers.csv
🔍 Running Rare Combo Attack on: synthetic_data2.csv vs test_data_with_outliers.csv
🔍 Running Rare Combo Attack on: synthetic_data2.csv vs test_data_wto_outliers.csv
🔍 Running Rare Combo Attack on: synthetic_data3.csv vs test_data_with_outliers.csv
🔍 Running Rare Combo Attack on: synthetic_data3.csv vs test_data_wto_outliers.csv
🔍 Running Rare Combo Attack on: synthetic_data4.csv vs test_data_with_outliers.csv
🔍 Running Rare Combo Attack on: synthetic_data4.csv vs test_data_wto_outliers.csv

==== Results for synthetic_data1.csv vs test_data_with_outliers.csv ====
  Accuracy: 0.5000
 Precision: 0.5000
    Recall: 1.0000
        F1: 0.6667
       AUC: 0.5000
       ASR: 0.5000
------------------------------

==== Results for synthetic_data1.csv vs test_data_wto_outliers.csv ====
  Accuracy: 0.5000
 Precision: 0.5000
    Recall

## Attack 7: Copula-Based Membership Inference Attack

## 1. Overview

This attack exploits differences in both **marginal** and **joint** empirical distributions between a synthetic dataset and a held-out test set to decide whether individual records were used during the generative model’s training.

- **Goal:** For each test record \(x\), compute a numerical score that reflects how “typical” \(x\) is under the synthetic data’s distribution.  
- **Hypothesis:** Records seen during training produce synthetic samples whose joint behavior (across features) deviates less from the real test distribution, whereas unseen records induce larger discrepancies.

## 2. Intuition

1. **Marginal Discrepancy**  
   For each numeric feature \(f\), compare the fraction of synthetic samples \(\{s_i\}\) with \(s_i[f] \le x[f]\) to the fraction of test samples with the same property. A large difference suggests the model distorted that feature’s distribution—potentially by over-fitting to training members.

2. **Joint Discrepancy**  
   For selected feature pairs \((f, g)\), compare
   \[
     \Pr_{\text{synth}}\bigl(s[f] \le x[f],\,s[g]\le x[g]\bigr)
     \quad\text{vs.}\quad
     \Pr_{\text{test}}\bigl(t[f] \le x[f],\,t[g]\le x[g]\bigr).
   \]
   Joint CDF differences capture dependencies (copula structure) that marginals alone miss.

3. **Combined Score**  
   \[
   \text{score}(x)
   = \underbrace{\frac{1}{|\mathcal{J}|}\sum_{(f,g)\in\mathcal{J}}
        \bigl|\Pr_{\text{synth}}(s[f]\le x[f],s[g]\le x[g])
               -\Pr_{\text{test}}(t[f]\le x[f],t[g]\le x[g])\bigr|
     }_{\text{joint score}}
   \;-\;
   \underbrace{\frac{1}{|\mathcal{M}|}\sum_{f\in\mathcal{M}}
        \bigl|\Pr_{\text{synth}}(s[f]\le x[f])
               -\Pr_{\text{test}}(t[f]\le x[f])\bigr|
     }_{\text{marginal score}}
   \]
   where \(\mathcal{M}\) is the set of marginal columns and \(\mathcal{J}\) the set of joint pairs.

## 3. Methodology

### 3.1 Data Alignment  
- Load synthetic file and test file.  
- Drop any rows without the ground-truth membership label (`is_member`).  
- Identify **common columns** present in both datasets, excluding `is_member`.

### 3.2 Marginal CDF Comparison  
For each numeric column \(f\in\text{MARGINAL\_COLUMNS}\):
1. Let \(x =\) test record’s value in column \(f\).  
2. Compute  
   \[
     P_{\text{synth}}(f \le x) = \frac{1}{N_{\text{synth}}}
       \sum_{s\in\text{synth}} \mathbb{1}\{s[f] \le x\},
   \]
   similarly for \(P_{\text{test}}(f \le x)\).  
3. Record \(|P_{\text{synth}} - P_{\text{test}}|\).  
4. Average these differences to get the **marginal score**.
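
A small worked example of the marginal comparison for a single column and a single test value. The numbers and the helper name `ecdf_at` are invented for illustration.

```python
import numpy as np

def ecdf_at(values, x):
    """Empirical CDF: fraction of `values` that are <= x."""
    return float(np.mean(np.asarray(values) <= x))

# Invented values for one numeric column
synth_col = [1, 2, 2, 3, 5, 8, 9, 10]
test_col = [2, 4, 6, 8]
x = 8  # the test record's value in this column

p_synth = ecdf_at(synth_col, x)  # 6 of 8 values are <= 8
p_test = ecdf_at(test_col, x)    # 4 of 4 values are <= 8
gap = abs(p_synth - p_test)
print(p_synth, p_test, gap)  # 0.75 1.0 0.25
```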

### 3.3 Joint CDF Comparison  
For each feature-pair \((f,g)\in\text{JOINT\_PAIRS}\):
1. Let \((x,y)\) = values of \((f,g)\) in the test record.  
2. Compute  
   \[
     P_{\text{synth}}\bigl(f\le x \,\wedge\, g\le y\bigr),
     \quad
     P_{\text{test}}\bigl(f\le x \,\wedge\, g\le y\bigr).
   \]
3. Record the absolute difference.  
4. Average over all pairs to get the **joint score**.

### 3.4 Scoring & Thresholding  
- **Per-record score** = joint score − marginal score.  
- Collect scores for all test records: \(\{\,\text{score}(x_i)\}\).  
- Choose a threshold \(t\) (default: median of all scores).  
- **Predict**:  
  \[
    \hat{y}_i = 
    \begin{cases}
      1 & \text{if }\text{score}(x_i)\ge t\\
      0 & \text{otherwise}
    \end{cases}
  \]

### 3.5 Evaluation Metrics  
Compare \(\hat{y}_i\) against ground-truth \(y_i\in\{0,1\}\) using:
- **Accuracy**  
- **Precision**  
- **Recall**  
- **F1 Score**  
- **AUC** (ROC Curve area)  
- **ASR** (Attack Success Rate = fraction correct)
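
One consequence of the default median threshold is worth making explicit. When the scores are distinct and the test set is balanced, exactly half of the records are predicted positive and half are truly positive, so the number of false positives equals the number of false negatives, and Accuracy, Precision, Recall, F1 and ASR all coincide. The sketch below checks this on made-up scores and labels (both hypothetical).

```python
import numpy as np

# Hypothetical scores (all distinct) and a balanced label vector.
scores = np.array([0.9, 0.1, 0.4, 0.7, 0.3, 0.8, 0.2, 0.6])
y_true = np.array([1, 0, 1, 0, 0, 1, 1, 0])

threshold = np.median(scores)               # default threshold
y_pred = (scores >= threshold).astype(int)  # top half predicted as members

tp = int(((y_pred == 1) & (y_true == 1)).sum())
fp = int(((y_pred == 1) & (y_true == 0)).sum())
fn = int(((y_pred == 0) & (y_true == 1)).sum())
tn = int(((y_pred == 0) & (y_true == 0)).sum())

accuracy = (tp + tn) / len(y_true)
precision = tp / (tp + fp)
recall = tp / (tp + fn)
f1 = 2 * precision * recall / (precision + recall)

print(fp == fn, accuracy, precision, recall, f1)  # True 0.5 0.5 0.5 0.5
```

Identical values across these metrics are therefore expected under this setup and do not by themselves indicate a bug; AUC is the only metric in the list that is independent of the threshold.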

## 4. Example Pseudocode

```python
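import numpy as np

# Assumes `synth` and `test` are DataFrames sharing the columns listed in
# MARGINAL_COLUMNS and JOINT_PAIRS.
scores = []
for _, x_i in test.iterrows():
    # Marginal: mean absolute gap between the two empirical CDFs at x_i[f]
    m_diffs = [abs((synth[f] <= x_i[f]).mean() - (test[f] <= x_i[f]).mean())
               for f in MARGINAL_COLUMNS]
    m_score = np.mean(m_diffs)

    # Joint: mean absolute gap between the two empirical joint CDFs at (x_i[f], x_i[g])
    j_diffs = [abs(((synth[f] <= x_i[f]) & (synth[g] <= x_i[g])).mean()
                   - ((test[f] <= x_i[f]) & (test[g] <= x_i[g])).mean())
               for (f, g) in JOINT_PAIRS]
    j_score = np.mean(j_diffs)

    scores.append(j_score - m_score)

# Threshold at the median score, then compute metrics.
y_pred = (np.array(scores) >= np.median(scores)).astype(int)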
```

## 5. Practical Considerations

* **Choice of Columns**
  Marginal and joint sets should reflect features most likely to exhibit over-fitting (e.g., numerical lab results, correlated visit counts).

* **Sensitivity to Threshold**
  The median works well when the test set is balanced (roughly as many members as non-members), but you may tune $t$ for a desired precision/recall tradeoff.

* **Computational Cost**
  Each per-record computation is $O(d\,N)$, where $d$ is the number of features or pairs and $N$ is the dataset size, so scoring $M$ test records costs $O(M\,d\,N)$. Caching CDF indicators or using binned histograms can speed this up.

* **Advantages**

  * Captures both marginal shifts and dependency changes.
  * Simple: no need to train a separate classifier.

* **Limitations**

  * Relies on numeric ordering—categorical data must be encoded ordinally or via separate treatment.
  * Less effective if the synthesizer preserves CDFs perfectly.
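
As a sketch of the caching idea mentioned under Computational Cost, the marginal term can be computed for all test rows at once by sorting each column once and calling `np.searchsorted`, which lowers the per-record cost for one column from $O(N)$ to $O(\log N)$. The helper names `ecdf_at` and `marginal_scores` are illustrative and not part of the notebook, and missing values are assumed to have been dropped or imputed beforehand.

```python
import numpy as np

def ecdf_at(sample: np.ndarray, queries: np.ndarray) -> np.ndarray:
    """Empirical P(sample <= q) for every q in queries."""
    sorted_sample = np.sort(sample)
    # side="right" counts the elements that are <= q
    counts = np.searchsorted(sorted_sample, queries, side="right")
    return counts / len(sorted_sample)

def marginal_scores(synth_cols: dict, test_cols: dict) -> np.ndarray:
    """Mean |F_synth(x) - F_test(x)| per test row.

    Iterates over the columns in test_cols, which are assumed to be
    present in synth_cols as well.
    """
    diffs = [
        np.abs(ecdf_at(synth_cols[c], test_cols[c]) - ecdf_at(test_cols[c], test_cols[c]))
        for c in test_cols
    ]
    return np.mean(diffs, axis=0)

# Toy data (hypothetical values)
synth_cols = {"lab_test_count": np.array([1, 2, 2, 5])}
test_cols = {"lab_test_count": np.array([2, 3, 6])}
print(marginal_scores(synth_cols, test_cols))
```

The joint term does not reduce to a single sort in the same way; a binned two-dimensional histogram with cumulative sums is one possible approximation, at the cost of some resolution.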


In [16]:
import os
import numpy as np
import pandas as pd
from sklearn.metrics import (
    roc_auc_score,
    accuracy_score,
    precision_score,
    recall_score,
    f1_score
)

# ─── Configuration ──────────────────────────────────────────────────────────────
FOLDER_PATH     = "dataset/MIA_SDG_Exercise"
SYNTHETIC_FILES = [f"synthetic_data{i}.csv" for i in range(1, 5)]
TEST_FILES      = ["test_data_with_outliers.csv", "test_data_wto_outliers.csv"]
LABEL_COLUMN    = "is_member"

# Columns for the copula-based attack
MARGINAL_COLUMNS = [
    "lab_test_count",
    "diagnosis_count",
    "stay_duration_days",
    "medication_count"
]
JOINT_PAIRS = [
    ("lab_test_count",    "diagnosis_count"),
    ("lab_test_count",    "procedure_count"),
    ("stay_duration_days","inpatient_visits")
]


def compute_copula_scores(
    synth_df: pd.DataFrame,
    test_df: pd.DataFrame
) -> np.ndarray:
    """
    For each test row:
      - Marginal score: average |F_synth(col <= x_i) - F_test(col <= x_i)| over MARGINAL_COLUMNS
      - Joint score:   average |P_synth(c1<=x_i, c2<=y_i) - P_test(c1<=x_i, c2<=y_i)| over JOINT_PAIRS
      - Final score = joint_score - marginal_score
    Returns an array of per-row scores.
    """
    scores = []
    # No precomputation: each empirical CDF value is a vectorized mean evaluated per test row
    for _, row in test_df.iterrows():
        # Marginal
        marg_diffs = []
        for col in MARGINAL_COLUMNS:
            if col in synth_df.columns and col in test_df.columns:
                x = row[col]
                if pd.notna(x):
                    diff = abs(
                        (synth_df[col] <= x).mean()
                      - (test_df[col]  <= x).mean()
                    )
                    marg_diffs.append(diff)
        m_score = np.mean(marg_diffs) if marg_diffs else 0.0

        # Joint
        joint_diffs = []
        for c1, c2 in JOINT_PAIRS:
            if c1 in synth_df.columns and c2 in synth_df.columns and c1 in test_df.columns and c2 in test_df.columns:
                x1, x2 = row[c1], row[c2]
                if pd.notna(x1) and pd.notna(x2):
                    s_prob = ((synth_df[c1] <= x1) & (synth_df[c2] <= x2)).mean()
                    t_prob = ((test_df[c1]  <= x1) & (test_df[c2]  <= x2)).mean()
                    joint_diffs.append(abs(s_prob - t_prob))
        j_score = np.mean(joint_diffs) if joint_diffs else 0.0

        scores.append(j_score - m_score)

    return np.array(scores)


def evaluate_attack(
    scores: np.ndarray,
    labels: pd.Series,
    threshold: float = None
) -> dict:
    """
    Compute Accuracy, Precision, Recall, F1, AUC, ASR from per-row scores
    and true 'yes'/'no' labels. Threshold is median(scores) if None.
    """
    y_scores = np.array(scores)
    y_true   = np.array([1 if l == "yes" else 0 for l in labels])

    if threshold is None:
        threshold = np.median(y_scores)

    y_pred = (y_scores >= threshold).astype(int)
    return {
        "Accuracy" : accuracy_score(y_true, y_pred),
        "Precision": precision_score(y_true, y_pred, zero_division=0),
        "Recall"   : recall_score(y_true, y_pred, zero_division=0),
        "F1"       : f1_score(y_true, y_pred, zero_division=0),
        "AUC"      : roc_auc_score(y_true, y_scores),
        "ASR"      : float((y_pred == y_true).mean())
    }


def run_copula_attacks() -> dict:
    """
    Runs the fixed copula-based (marginal + joint CDF) attack on all
    synthetic/test dataset pairs. Returns a mapping "synth vs test" -> metrics.
    """
    results = {}

    for synth_file in SYNTHETIC_FILES:
        synth_path = os.path.join(FOLDER_PATH, synth_file)
        synth_df   = pd.read_csv(synth_path)

        for test_file in TEST_FILES:
            test_path = os.path.join(FOLDER_PATH, test_file)
            df        = pd.read_csv(test_path).dropna(subset=[LABEL_COLUMN])

            key = f"{synth_file} vs {test_file}"
            print(f"\n🔍 Running Copula Attack on: {key}")

            # Align columns and split off the label
            common_cols  = [
                c for c in df.columns
                if c in synth_df.columns and c != LABEL_COLUMN
            ]
            # Rows without a label were already dropped when the test file was loaded
            test_subset  = df[common_cols + [LABEL_COLUMN]]
            labels       = test_subset[LABEL_COLUMN]
            X_test       = test_subset.drop(columns=[LABEL_COLUMN])
            X_synth      = synth_df[common_cols].copy()

            # Compute per-row copula scores
            scores = compute_copula_scores(X_synth, X_test)

            # Evaluate and store
            results[key] = evaluate_attack(scores, labels)

    return results


def print_results(results: dict):
    """Nicely prints metrics for each dataset pair."""
    for pair, metrics in results.items():
        print(f"\n==== Results for {pair} ====")
        for name, val in metrics.items():
            print(f"{name:>10}: {val:.4f}")
        print("-" * 30)


if __name__ == "__main__":
    attack_results = run_copula_attacks()
    print_results(attack_results)



🔍 Running Copula Attack on: synthetic_data1.csv vs test_data_with_outliers.csv

🔍 Running Copula Attack on: synthetic_data1.csv vs test_data_wto_outliers.csv

🔍 Running Copula Attack on: synthetic_data2.csv vs test_data_with_outliers.csv

🔍 Running Copula Attack on: synthetic_data2.csv vs test_data_wto_outliers.csv

🔍 Running Copula Attack on: synthetic_data3.csv vs test_data_with_outliers.csv

🔍 Running Copula Attack on: synthetic_data3.csv vs test_data_wto_outliers.csv

🔍 Running Copula Attack on: synthetic_data4.csv vs test_data_with_outliers.csv

🔍 Running Copula Attack on: synthetic_data4.csv vs test_data_wto_outliers.csv

==== Results for synthetic_data1.csv vs test_data_with_outliers.csv ====
  Accuracy: 0.5417
 Precision: 0.5417
    Recall: 0.5417
        F1: 0.5417
       AUC: 0.5968
       ASR: 0.5417
------------------------------

==== Results for synthetic_data1.csv vs test_data_wto_outliers.csv ====
  Accuracy: 0.4500
 Precision: 0.4500
    Recall: 0.4500
        F1: 0.4

# Attack 8 - Boundary Anomaly Attack

## 1. Overview  
The **Boundary Anomaly Attack** is a simple, data-driven membership inference attack that scores each test record by how many of its feature values lie **outside** the ranges or categories observed in a reference subset of the real (training) data, here the test rows labelled `is_member == "yes"`. It works in two phases:

1. **Inference phase** (using only the real data):  
   - Learn the **numeric bounds** (min/max) for each numeric feature.  
   - Learn the **valid category set** for each categorical feature.

2. **Scoring phase** (using the learned constraints, restricted to columns that also appear in the synthetic data):  
   - For each record in the **test set**, count how many feature values violate the inferred bounds or categories.  
   - Convert that count into a per-record **anomaly score** (fraction of features violating constraints).

Records with a high anomaly score are more likely to have been **excluded** from the training set (non-member), while those with low or zero anomaly score are more likely to be **members**.

---

## 2. Data & Setup

- **Synthetic datasets**: `synthetic_data1.csv` … `synthetic_data4.csv`  
- **Test datasets**:  
  - `test_data_with_outliers.csv` (outlier-focused samples)  
  - `test_data_wto_outliers.csv` (random samples)  
- Each test record has a label `is_member ∈ {yes, no}` indicating ground-truth membership in the training set.

---

## 3. Preprocessing (Inference Phase)

1. **Filter real-members**  
   - From the test set, select only those rows with `is_member == "yes"`.  
   - Drop the `is_member` column.

2. **Separate feature types**  
   - **Numeric columns**: all columns detected as numbers.  
   - **Categorical columns**: all columns detected as strings/objects.

3. **Infer numeric bounds**

   ```python
   bounds[col] = (real_df[col].min(), real_df[col].max())
   ```

   for each numeric column.

4. **Infer valid categories**

   ```python
   categories[col] = real_df[col].unique().tolist()
   ```

   for each categorical column.

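All of this runs **once per test file** (not per synthetic dataset), so the “allowed” feature space is learned only from the known-member rows of that test file. The synthetic dataset enters only through column alignment, so two synthetic files with the same columns yield identical scores for a given test file.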

---

## 4. Scoring Mechanism (Attack Phase)

For each synthetic/test pairing:

1. **Align columns**

   * Identify the intersection of features present in both the synthetic data and the real-data schema used for inference.

2. **Per-row anomaly score**
   For each test record `row`:

   1. Initialize `violations = 0` and `checks = 0`.
   2. **Numeric features**

      ```python
      for col, (min_val, max_val) in bounds.items():
          checks += 1
          if row[col] < min_val or row[col] > max_val:
              violations += 1
      ```
   3. **Categorical features**

      ```python
      for col, valid_vals in categories.items():
          checks += 1
          if row[col] not in valid_vals:
              violations += 1
      ```
   4. **Normalize**

      ```python
      score = violations / checks
      ```

      This yields a value in [0, 1], where 0 means “fully within bounds” and 1 means “all features out of bounds.”

3. **Collect** the resulting `score` for every record in the test set into a 1-D array.
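
The per-row checks described above can also be written with vectorized pandas operations. The following is a minimal sketch on a toy table; the column names, values, and the helper `anomaly_scores` are hypothetical. In line with the notebook's implementation, a missing value counts as a violation.

```python
import pandas as pd

def anomaly_scores(df: pd.DataFrame, bounds: dict, categories: dict) -> pd.Series:
    """Fraction of checked features per row that fall outside the constraints."""
    flags = {}
    for col, (lo, hi) in bounds.items():
        # between() is inclusive on both ends and False for NaN
        flags[col] = ~df[col].between(lo, hi)
    for col, valid in categories.items():
        # isin() is False for any value not listed in `valid`, including NaN here
        flags[col] = ~df[col].isin(valid)
    return pd.DataFrame(flags).mean(axis=1)

# Toy constraints and test rows (hypothetical)
bounds = {"age": (18, 90)}
categories = {"ward": ["A", "B"]}
test_rows = pd.DataFrame({"age": [30, 95, None], "ward": ["A", "C", "B"]})

print(anomaly_scores(test_rows, bounds, categories).tolist())  # [0.0, 1.0, 0.5]
```

The first row satisfies both constraints, the second violates both, and the third violates only the numeric check because its `age` is missing.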

---

## 5. Membership Decision & Evaluation

1. **Thresholding**

   * Choose a decision threshold (by default, the **median** of all anomaly scores).
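   * Predict `member (1)` if `score >= threshold`, else `non-member (0)`. Note that this rule treats a *high* anomaly score as evidence of membership, which is the opposite of the reasoning in the overview (a low score suggests a member). An AUC below 0.5 under this rule indicates that the score is oriented the wrong way; using `1 - score` as the membership score restores the intended direction.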

2. **Metrics**
   Compare predictions against the ground-truth `is_member` labels using:

   * **Accuracy**
   * **Precision**
   * **Recall**
   * **F1 score**
   * **AUC (ROC)**
   * **ASR** (Attack Success Rate = fraction of correct guesses)
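
Because the anomaly score is high for records that fall outside the member-derived constraints, it is worth checking which way the score points before thresholding. AUC makes this easy: negating a score maps an AUC of a to 1 - a, so a value well below 0.5 suggests that the score ranks non-members above members. The sketch below uses a pairwise AUC written out by hand (the function `auc` and the toy arrays are hypothetical) to show the flip.

```python
import numpy as np

def auc(y_true: np.ndarray, scores: np.ndarray) -> float:
    """AUC as P(score_pos > score_neg) + 0.5 * P(tie), over all pos/neg pairs."""
    pos = scores[y_true == 1]
    neg = scores[y_true == 0]
    greater = (pos[:, None] > neg[None, :]).sum()
    ties = (pos[:, None] == neg[None, :]).sum()
    return float((greater + 0.5 * ties) / (len(pos) * len(neg)))

# Toy labels and anomaly scores (hypothetical): members mostly score 0
y_true = np.array([1, 1, 1, 0, 0, 0])
anomaly = np.array([0.0, 0.0, 0.2, 0.0, 0.4, 0.6])

auc_raw = auc(y_true, anomaly)       # below 0.5: high score points to non-members
auc_flipped = auc(y_true, -anomaly)  # mirrored value, 1 - auc_raw

print(auc_raw, auc_flipped)
```

By the same identity, an AUC of 0.1508 for the raw score would correspond to 0.8492 for the negated score.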

In [18]:
import os
import numpy as np
import pandas as pd
from sklearn.metrics import (
    roc_auc_score,
    accuracy_score,
    precision_score,
    recall_score,
    f1_score
)

# ─── Configuration ──────────────────────────────────────────────────────────────
FOLDER_PATH     = "dataset/MIA_SDG_Exercise"
SYNTHETIC_FILES = [f"synthetic_data{i}.csv" for i in range(1, 5)]
TEST_FILES      = ["test_data_with_outliers.csv", "test_data_wto_outliers.csv"]
LABEL_COLUMN    = "is_member"


# ─── Helper Functions ────────────────────────────────────────────────────────────
def infer_boundaries(df: pd.DataFrame, numeric_cols: list[str]) -> dict:
    """Infer (min, max) bounds for each numeric column from df."""
    bounds = {}
    for col in numeric_cols:
        series = df[col].dropna().astype(float)
        bounds[col] = (series.min(), series.max())
    return bounds

def infer_categories(df: pd.DataFrame, categorical_cols: list[str]) -> dict:
    """Infer valid categories for each categorical column from df."""
    cats = {}
    for col in categorical_cols:
        cats[col] = df[col].dropna().unique().tolist()
    return cats

def boundary_violation_score(
    row: dict,
    bounds: dict,
    categories: dict
) -> float:
    """
    Given a row (as dict), plus numeric bounds and valid category lists,
    return the fraction of features that violate those constraints.
    """
    violations = 0.0
    total_checks = 0

    # Numeric violations
    for col, (min_val, max_val) in bounds.items():
        total_checks += 1
        val = row.get(col, None)
        if val is None or pd.isna(val) or not (min_val <= float(val) <= max_val):
            violations += 1.0

    # Categorical violations
    for col, valid_vals in categories.items():
        total_checks += 1
        val = row.get(col, None)
        if val is None or pd.isna(val) or val not in valid_vals:
            violations += 1.0

    # Return average violation rate (0.0 = no violations, 1.0 = all violate)
    return violations / total_checks if total_checks > 0 else 0.0

def evaluate_attack(
    scores: np.ndarray,
    labels: list[str],
    threshold: float = None
) -> dict:
    """
    Compute Accuracy, Precision, Recall, F1, AUC, and ASR for
    binary labels ('yes'/'no') given per-row anomaly scores.
    """
    y_scores = np.array(scores)
    y_true   = np.array([1 if l == "yes" else 0 for l in labels])

    if threshold is None:
        threshold = np.median(y_scores)

    y_pred = (y_scores >= threshold).astype(int)

    return {
        "Accuracy":  accuracy_score(y_true, y_pred),
        "Precision": precision_score(y_true, y_pred, zero_division=0),
        "Recall":    recall_score(y_true, y_pred, zero_division=0),
        "F1":        f1_score(y_true, y_pred, zero_division=0),
        "AUC":       roc_auc_score(y_true, y_scores),
        "ASR":       float((y_pred == y_true).mean())
    }


# ─── Main Attack Runner ─────────────────────────────────────────────────────────
def run_boundary_anomaly_attack() -> dict:
    """
    For each test set, infer real-data boundaries & categories once.
    Then for each synthetic dataset, score all test rows for boundary
    violations (restricted to features shared by real & synth) and
    evaluate membership inference performance.
    """
    all_results = {}

    for test_file in TEST_FILES:
        # Load and filter test data
        test_path = os.path.join(FOLDER_PATH, test_file)
        df_test   = pd.read_csv(test_path).dropna(subset=[LABEL_COLUMN])
        labels    = df_test[LABEL_COLUMN].tolist()

        # Real-members subset for inferring bounds/categories
        real_df = df_test[df_test[LABEL_COLUMN] == "yes"].drop(columns=[LABEL_COLUMN])

        # Global numeric/categorical columns from real data
        numeric_cols     = real_df.select_dtypes(include=[np.number]).columns.tolist()
        categorical_cols = real_df.select_dtypes(include=["object"]).columns.tolist()

        # Infer once per test set
        bounds     = infer_boundaries(real_df, numeric_cols)
        categories = infer_categories(real_df, categorical_cols)

        for synth_file in SYNTHETIC_FILES:
            synth_path = os.path.join(FOLDER_PATH, synth_file)
            df_synth   = pd.read_csv(synth_path)

            # Only consider features present in both real_df and synthetic
            common_cols = [c for c in real_df.columns if c in df_synth.columns]

            # Restrict bounds/categories to these shared features
            bounds_sub     = {c: bounds[c] for c in numeric_cols     if c in common_cols}
            categories_sub = {c: categories[c] for c in categorical_cols if c in common_cols}

            print(f"🔍 Running Boundary Anomaly Attack on: {synth_file} vs {test_file}")

            # Compute per-row scores
            scores = []
            for _, row in df_test.iterrows():
                row_dict = row.to_dict()
                scores.append(boundary_violation_score(row_dict, bounds_sub, categories_sub))

            # Evaluate
            key = f"{synth_file} vs {test_file}"
            all_results[key] = evaluate_attack(np.array(scores), labels)

    return all_results

def print_results(results: dict):
    """Print the metrics for each dataset pair."""
    for pair, metrics in results.items():
        print(f"\n==== Results for {pair} ====")
        for name, val in metrics.items():
            print(f"{name:>10}: {val:.4f}")
        print("-" * 30)

if __name__ == "__main__":
    attack_results = run_boundary_anomaly_attack()
    print_results(attack_results)


🔍 Running Boundary Anomaly Attack on: synthetic_data1.csv vs test_data_with_outliers.csv
🔍 Running Boundary Anomaly Attack on: synthetic_data2.csv vs test_data_with_outliers.csv
🔍 Running Boundary Anomaly Attack on: synthetic_data3.csv vs test_data_with_outliers.csv
🔍 Running Boundary Anomaly Attack on: synthetic_data4.csv vs test_data_with_outliers.csv
🔍 Running Boundary Anomaly Attack on: synthetic_data1.csv vs test_data_wto_outliers.csv
🔍 Running Boundary Anomaly Attack on: synthetic_data2.csv vs test_data_wto_outliers.csv
🔍 Running Boundary Anomaly Attack on: synthetic_data3.csv vs test_data_wto_outliers.csv
🔍 Running Boundary Anomaly Attack on: synthetic_data4.csv vs test_data_wto_outliers.csv

==== Results for synthetic_data1.csv vs test_data_with_outliers.csv ====
  Accuracy: 0.2396
 Precision: 0.2642
    Recall: 0.2917
        F1: 0.2772
       AUC: 0.1508
       ASR: 0.2396
------------------------------

==== Results for synthetic_data2.csv vs test_data_with_outliers.csv ====