In [1]:
import getpass
import os
from typing import Tuple

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import requests
import torch
import torch.nn.functional as F
import xgboost as xgb
from scipy.stats import gaussian_kde
from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import roc_auc_score, roc_curve
from sklearn.model_selection import train_test_split
from sklearn.neural_network import MLPClassifier
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from torchvision.models import resnet18
from tqdm import tqdm

In [2]:
### LOADING THE MODEL
from torchvision.models import resnet18

In [3]:
### Channel-wise mean/std; pass these to a Normalize transform to pre-process the images
mean = [0.2980, 0.2962, 0.2987]
std = [0.2886, 0.2875, 0.2889]

# Target model: ResNet-18 without pretrained weights and with a 44-way head.
# `weights=None` is the supported spelling of "no pretrained weights"; the
# boolean form (`weights=False`) is a deprecated legacy argument.
model = resnet18(weights=None)
model.fc = torch.nn.Linear(512, 44)

# NOTE(review): weights_only=False unpickles arbitrary Python objects; only load
# checkpoints from a trusted source. If "01_MIA.pt" is a plain state_dict,
# weights_only=True should be sufficient — TODO confirm.
ckpt = torch.load("01_MIA.pt", map_location="cpu",weights_only=False)
model.load_state_dict(ckpt)
model.eval()



ResNet(
  (conv1): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
  (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (relu): ReLU(inplace=True)
  (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
  (layer1): Sequential(
    (0): BasicBlock(
      (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU(inplace=True)
      (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    )
    (1): BasicBlock(
      (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU(inplace=True)
  

In [11]:
from torch.utils.data import Dataset
from typing import Tuple
import torch

class TaskDataset(Dataset):
    """Indexable collection of ``(id, image, label)`` samples.

    The three parallel sequences ``ids``, ``imgs`` and ``labels`` start empty
    and are filled from ``pt_file`` (a ``torch.load``-able mapping with the keys
    ``"ids"``, ``"imgs"`` and ``"labels"``) when a path is given. ``transform``,
    if set, is applied to the image on every access.
    """

    def __init__(self, pt_file=None, transform=None):
        self.ids, self.imgs, self.labels = [], [], []
        self.transform = transform

        # Nothing to read: leave the dataset empty.
        if pt_file is None:
            return

        payload = torch.load(pt_file)
        self.ids = payload["ids"]
        self.imgs = payload["imgs"]
        self.labels = payload["labels"]

    def __getitem__(self, index) -> Tuple[int, torch.Tensor, int]:
        """Return ``(id, image, label)`` for ``index``; the image is transformed if a transform is set."""
        sample_id = self.ids[index]
        image = self.imgs[index]
        if self.transform is not None:
            image = self.transform(image)
        return sample_id, image, self.labels[index]

    def __len__(self):
        """Number of samples, defined by the length of ``ids``."""
        return len(self.ids)

class MembershipDataset(TaskDataset):
    """``TaskDataset`` whose samples additionally carry a membership flag.

    ``membership`` is read from the ``"membership"`` entry of ``pt_file`` when
    present; otherwise it falls back to a list of zeros, one per id.
    """

    def __init__(self, pt_file=None, transform=None):
        super().__init__(pt_file, transform)
        self.membership = []
        if pt_file is None:
            return

        payload = torch.load(pt_file)
        # Only public data ships membership labels; default to all zeros otherwise.
        fallback = [0]*len(self.ids)
        self.membership = payload.get("membership", fallback)

    def __getitem__(self, index) -> Tuple[int, torch.Tensor, int, int]:
        """Return ``(id, image, label, membership)`` for ``index``."""
        base_sample = super().__getitem__(index)
        return (*base_sample, self.membership[index])


In [12]:
# Normalisation-only pipeline. ToTensor is deliberately left out
# (presumably the stored images are already tensors — TODO confirm).
normalize = transforms.Normalize(mean=mean, std=std)
transform = transforms.Compose([normalize])

In [13]:
# torch.serialization.add_safe_globals([MembershipDataset])

# The file holds a pickled MembershipDataset object, hence weights_only=False.
# NOTE(review): unpickling can execute arbitrary code — only load trusted files.
PUBLIC_DATA_PATH = "pub.pt"
public_data: MembershipDataset = torch.load(PUBLIC_DATA_PATH, weights_only=False)
public_data.transform = transform

# Show the first sample as a sanity check.
public_data[0]

(264399,
 tensor([[[-0.6657, -0.5570, -0.4483,  ..., -0.4347, -0.3667, -0.2309],
          [-0.6929, -0.6113, -0.5162,  ..., -0.4890, -0.4483, -0.2852],
          [-0.6929, -0.6657, -0.6385,  ..., -0.5842, -0.5298, -0.4211],
          ...,
          [-0.4483, -0.4890, -0.5162,  ..., -0.3260, -0.3396, -0.3939],
          [-0.5026, -0.4890, -0.5570,  ..., -0.3396, -0.3532, -0.4619],
          [-0.5162, -0.5570, -0.6113,  ..., -0.4755, -0.4890, -0.5162]],
 
         [[-0.6620, -0.5529, -0.4437,  ..., -0.4301, -0.3619, -0.2255],
          [-0.6893, -0.6074, -0.5119,  ..., -0.4847, -0.4437, -0.2800],
          [-0.6893, -0.6620, -0.6347,  ..., -0.5801, -0.5256, -0.4165],
          ...,
          [-0.4437, -0.4847, -0.5119,  ..., -0.3210, -0.3346, -0.3892],
          [-0.4983, -0.4847, -0.5529,  ..., -0.3346, -0.3482, -0.4574],
          [-0.5119, -0.5529, -0.6074,  ..., -0.4710, -0.4847, -0.5119]],
 
         [[-0.6674, -0.5588, -0.4502,  ..., -0.4367, -0.3688, -0.2330],
          [-0.6946,

In [14]:
# Same loading procedure as for the public split.
# NOTE(review): weights_only=False unpickles arbitrary objects — trusted files only.
PRIVATE_DATA_PATH = "priv_out.pt"
private_data: MembershipDataset = torch.load(PRIVATE_DATA_PATH, weights_only=False)
private_data.transform = transform
private_data[0]

(55061,
 tensor([[[1.9025, 1.9297, 1.8889,  ..., 1.7258, 1.8074, 1.8617],
          [1.7938, 1.8345, 1.8345,  ..., 1.8481, 1.9297, 1.8617],
          [1.8889, 1.8617, 1.8617,  ..., 1.8481, 1.8210, 1.7530],
          ...,
          [1.8617, 1.8617, 1.8753,  ..., 1.2910, 1.6035, 1.6579],
          [1.8210, 1.8481, 1.8753,  ..., 1.7394, 1.8481, 1.8889],
          [1.2638, 1.5628, 1.7666,  ..., 1.7666, 1.7258, 1.7123]],
 
         [[0.9203, 0.9067, 0.7975,  ..., 0.5247, 0.6475, 0.7157],
          [0.5111, 0.5111, 0.4565,  ..., 0.6338, 0.7020, 0.6202],
          [0.5656, 0.5793, 0.5929,  ..., 0.6611, 0.6338, 0.5793],
          ...,
          [0.7430, 0.6748, 0.6748,  ..., 0.3201, 0.5656, 0.6066],
          [0.6475, 0.6475, 0.6338,  ..., 0.5247, 0.6066, 0.6338],
          [0.2656, 0.4156, 0.5247,  ..., 0.4838, 0.4156, 0.4156]],
 
         [[1.5452, 1.5180, 1.4230,  ..., 1.2330, 1.3416, 1.3958],
          [1.2330, 1.2330, 1.1922,  ..., 1.3416, 1.3958, 1.3416],
          [1.2737, 1.2737, 1.287

In [15]:
def compute_gradients(model, input_tensor, label, device=None):
    """Compute the cross-entropy loss gradient of one sample w.r.t. the model parameters.

    Args:
        model: ``torch.nn.Module`` to differentiate. Its ``.grad`` buffers are
            cleared before the backward pass and left populated afterwards.
        input_tensor: A single un-batched input; a batch dimension is added here.
        label: Target class index for ``input_tensor``.
        device: Device the input and label are moved to. ``None`` (default)
            uses the device of the model's first parameter, so the function
            works for CPU and GPU models alike. The model itself is never moved.

    Returns:
        ``(gradient_vector, loss)`` where ``gradient_vector`` is a 1-D numpy
        array concatenating the flattened gradients of every parameter that
        received one (in ``model.parameters()`` order) and ``loss`` is a float.
    """
    if device is None:
        # Follow the model: feeding CPU inputs to a CUDA model (or vice versa)
        # raises a device-mismatch error.
        first_param = next(model.parameters(), None)
        device = first_param.device if first_param is not None else torch.device("cpu")

    model.zero_grad()
    input_tensor = input_tensor.unsqueeze(0).to(device)
    label_tensor = torch.tensor([label]).to(device)
    criterion = torch.nn.CrossEntropyLoss()
    output = model(input_tensor)
    loss = criterion(output, label_tensor)
    loss.backward()

    gradients = [
        param.grad.detach().cpu().flatten()
        for param in model.parameters()
        if param.grad is not None
    ]
    gradient_vector = torch.cat(gradients).numpy()
    return gradient_vector, loss.item()

def aggregate_gradient(grad_vector, num_chunks=10):
    """Summarise a gradient vector as the L2 norm of each of ``num_chunks`` consecutive chunks.

    The vector is cut with ``np.array_split`` (so chunks may differ in length
    by one) and one norm per chunk is returned as a 1-D array.
    """
    pieces = np.array_split(grad_vector, num_chunks)
    chunk_norms = list(map(np.linalg.norm, pieces))
    return np.array(chunk_norms)

In [16]:
# SHADOW MODEL
class ShadowListDataset(Dataset):
    """Dataset view over an in-memory list of ``(img_id, img, label, membership)`` tuples."""

    def __init__(self, samples):
        # Kept by reference; the list is not copied.
        self.samples = samples

    def __getitem__(self, idx):
        """Return the four-field sample stored at position ``idx``."""
        sample_id, image, target, member_flag = self.samples[idx]
        return sample_id, image, target, member_flag

    def __len__(self):
        """Number of stored samples."""
        return len(self.samples)

def build_and_train_shadow_model(shadow_train_data, device="cpu", epochs=10, lr=1e-3,batch_size=16):
    """Train a freshly initialised ResNet-18 (44 classes) on ``shadow_train_data``.

    Args:
        shadow_train_data: List of ``(img_id, img, label, membership)`` tuples;
            only the image and label are used for training.
        device: Device the network and every batch are moved to.
        epochs: Number of passes over the data.
        lr: Adam learning rate.
        batch_size: Mini-batch size. The last incomplete batch of each epoch is
            dropped.

    Returns:
        The trained network, switched to eval mode and still on ``device``.
    """
    net = resnet18(weights=None)
    net.fc = torch.nn.Linear(512, 44)
    net = net.to(device)

    opt = torch.optim.Adam(net.parameters(), lr=lr)
    loss_fn = torch.nn.CrossEntropyLoss()
    net.train()

    batches = DataLoader(
        ShadowListDataset(shadow_train_data),
        batch_size=batch_size,
        shuffle=True,
        drop_last=True,
    )

    for _ in tqdm(range(epochs)):
        for _ids, images, targets, _membership in batches:
            images = images.to(device)      # [B, 3, H, W]
            targets = targets.to(device)    # [B]
            opt.zero_grad()
            logits = net(images)            # [B, 44]
            loss_fn(logits, targets).backward()
            opt.step()

    net.eval()
    return net


In [None]:
num_shadows = 2  # You can increase for stronger attack
# Train the shadow models and extract their features on the same device.
shadow_device = "cuda" if torch.cuda.is_available() else "cpu"
shadow_attack_X = []
shadow_attack_y = []
shadow_losses_in = dict()   # For LiRA (losses for "in" cases)
shadow_losses_out = dict()  # For LiRA (losses for "out" cases)

# Materialise the (transformed) public samples once so every shadow split
# indexes into the same list.
all_samples = [(img_id, img, label, membership) for img_id, img, label, membership in public_data]
num_samples = len(all_samples)

for s in range(num_shadows):
    print(f"\nTraining shadow model {s+1}/{num_shadows}...")
    # Split public data randomly: 60% train (members), 40% held out (non-members).
    # The shadow index doubles as the seed, so each shadow gets a different split.
    indices = np.arange(num_samples)
    train_idx, test_idx = train_test_split(indices, test_size=0.4, random_state=s)
    shadow_train = [all_samples[i] for i in train_idx]
    shadow_test = [all_samples[i] for i in test_idx]

    # Fresh initialisation for every shadow model.
    shadow_model = build_and_train_shadow_model(shadow_train, device=shadow_device,
                                                epochs=10, lr=1e-3,batch_size=16)

    # (samples, attack label, LiRA loss store, progress-bar name) per split:
    # members are labelled 1, non-members 0.
    splits = (
        (shadow_train, 1, shadow_losses_in, "Train"),
        (shadow_test, 0, shadow_losses_out, "Test"),
    )
    for split_samples, member_flag, loss_store, split_name in splits:
        for img_id, img, label, membership in tqdm(split_samples, desc=f"Shadow {s+1} {split_name}"):
            # Pass the device explicitly: the shadow model lives on shadow_device,
            # and the input must be on the same device as the model.
            grad_vec, loss_val = compute_gradients(shadow_model, img, label, device=shadow_device)
            shadow_attack_X.append(aggregate_gradient(grad_vec))
            shadow_attack_y.append(member_flag)
            # Store losses for LiRA
            loss_store.setdefault(img_id, []).append(loss_val)



Training shadow model 1/2...


100%|██████████████████████████████████████████| 10/10 [18:24<00:00, 110.43s/it]
Shadow 1 Train: 100%|█████████████████████| 12000/12000 [03:47<00:00, 52.66it/s]
Shadow 1 Test: 100%|████████████████████████| 8000/8000 [02:22<00:00, 55.96it/s]



Training shadow model 2/2...


100%|██████████████████████████████████████████| 10/10 [20:03<00:00, 120.38s/it]
Shadow 2 Train:  35%|███████▊              | 4252/12000 [01:23<02:19, 55.54it/s]

In [None]:
# X_train, y_train = [], []

# for img_id,img,label,membership in tqdm(public_data, desc='Extracting Public Features'):
#     grad_vec = compute_gradients(model, img, label)
#     agg_feat = aggregate_gradient(grad_vec)
#     X_train.append(agg_feat)
#     y_train.append(membership)

# X_train = np.vstack(X_train)
# y_train = np.array(y_train)


In [None]:
# attack_model = xgb.XGBClassifier(n_estimators=100, learning_rate=0.1, use_label_encoder=False, eval_metric='logloss')
# attack_model = MLPClassifier(hidden_layer_sizes=(100,), max_iter=200)
# attack_model.fit(X_train, y_train)

In [None]:
# Gradient-norm features and losses of the *target* model on the private split.
private_ids, private_features, private_losses = [], [], []

for sample_id, image, target, _member in tqdm(private_data, desc='Extracting Private Features'):
    grads, sample_loss = compute_gradients(model, image, target)
    private_features.append(aggregate_gradient(grads))
    private_ids.append(sample_id)
    private_losses.append(sample_loss)

# One row per private sample.
X_private = np.vstack(private_features)

In [None]:
# Turn the accumulated per-sample lists into arrays for the attack classifiers:
# labels as a 1-D array, features stacked one row per sample.
shadow_attack_y = np.array(shadow_attack_y)
shadow_attack_X = np.vstack(shadow_attack_X)

# np.save('shadow_attack_X.npy', shadow_attack_X)
# np.save('shadow_attack_y.npy', shadow_attack_y)
# attack_model = xgb.XGBClassifier(n_estimators=100, learning_rate=0.1, use_label_encoder=False, eval_metric='logloss')
# attack_model = RandomForestClassifier(n_estimators=100)
# attack_model.fit(shadow_attack_X, shadow_attack_y)

from sklearn.neural_network import MLPClassifier
from xgboost import XGBClassifier
from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import roc_auc_score

def tpr_at_fpr(y_true, y_scores, fpr_level=0.05):
    """Return the TPR at the last ROC operating point whose FPR is <= ``fpr_level``.

    Returns ``0.0`` when no ROC point satisfies the FPR bound.
    """
    fpr, tpr, _ = roc_curve(y_true, y_scores)
    within_budget = np.flatnonzero(fpr <= fpr_level)
    if within_budget.size == 0:
        return 0.0
    last_ok = within_budget[-1]
    return float(tpr[last_ok])

# Candidate attack classifiers, all trained on the shadow-model features.
models = {
    "MLP": MLPClassifier(hidden_layer_sizes=(100,), max_iter=200, random_state=0),
    "XGBoost": XGBClassifier(n_estimators=100, learning_rate=0.1, use_label_encoder=False, eval_metric='logloss', random_state=0),
    "RandomForest": RandomForestClassifier(n_estimators=100, random_state=0),
    "LogisticRegression": LogisticRegression(max_iter=200, random_state=0)
}

results = []
model_shadow_scores = {}
model_metrics = {}


for name, clf in models.items():
    print(f"\nTraining {name} attack model...")
    clf.fit(shadow_attack_X, shadow_attack_y)
    # NOTE(review): these scores are computed on the same data the classifier
    # was fitted on, so the AUC / TPR below are in-sample and optimistic.
    shadow_scores = clf.predict_proba(shadow_attack_X)[:, 1]
    fpr, tpr, thresholds = roc_curve(shadow_attack_y, shadow_scores)
    # `auc` was never imported; roc_auc_score is the area under this ROC curve
    # and is already imported.
    auc_score = roc_auc_score(shadow_attack_y, shadow_scores)
    tpr_05 = tpr_at_fpr(shadow_attack_y, shadow_scores, fpr_level=0.05)

    # Store scores and metrics
    model_shadow_scores[name] = {
        'scores': shadow_scores,
        'fpr': fpr,
        'tpr': tpr,
        'thresholds': thresholds
    }
    model_metrics[name] = {
        "AUC": auc_score,
        "TPR@FPR=0.05": tpr_05
    }

    #---- PRIVATE DATA PREDICTION (for server submission) ----
    membership_scores = clf.predict_proba(X_private)[:, 1]
    df = pd.DataFrame({
        "ids": private_ids,
        "score": membership_scores
    })
    out_csv = f"test_{name}.csv"
    df.to_csv(out_csv, index=None)

    print(f"Saved private membership scores for server to: {out_csv}")
    print(f"{name} shadow attack AUC: {auc_score:.4f} | TPR@FPR=0.05: {tpr_05:.4f}")

print("\nAll models evaluated.")

In [None]:
# Sanity check: number of rows (shadow samples) in the attack training set.
len(shadow_attack_X)

In [None]:
# Spot-check one private sample: its id, aggregated gradient features and loss.
private_ids[10],private_features[10],private_losses[10]

In [None]:
# Display the stacked private feature matrix (one row per private sample).
X_private

In [None]:
# `attack_model` was previously only assigned in commented-out cells, which made
# this cell fail on a fresh kernel. Use the fitted MLP from `models`; it is the
# classifier whose scores are written to test_MLP.csv.
attack_model = models["MLP"]
membership_scores = attack_model.predict_proba(X_private)[:, 1]  # Probability of being member

membership_scores

In [None]:
from scipy.stats import gaussian_kde

# Pool the shadow-model losses over all samples and shadows: one "in"
# (member) population and one "out" (non-member) population.
all_in_losses = [loss for losses in shadow_losses_in.values() for loss in losses]
all_out_losses = [loss for losses in shadow_losses_out.values() for loss in losses]

# gaussian_kde needs at least two points; otherwise no density is available.
kde_in = gaussian_kde(all_in_losses) if len(all_in_losses) > 1 else None
kde_out = gaussian_kde(all_out_losses) if len(all_out_losses) > 1 else None

private_loss_arr = np.asarray(private_losses, dtype=float)
if kde_in is None or kde_out is None or private_loss_arr.size == 0:
    # No usable density (or nothing to score): neutral score of 0 for every sample.
    lira_scores = np.zeros(private_loss_arr.shape[0])
else:
    # Evaluate both densities for all private losses in one vectorized call
    # instead of one KDE call per sample.
    p_in = kde_in.evaluate(private_loss_arr)
    p_out = kde_out.evaluate(private_loss_arr)
    # Likelihood ratio; the epsilon guards against division by zero.
    lira_scores = p_in / (p_out + 1e-12)

# Show a short preview rather than printing every score.
lira_scores[:10]


In [None]:
# Map each likelihood ratio r to r / (1 + r), which lies in [0, 1) for r >= 0,
# so it can be submitted as a membership score.
lira_scores = np.array(lira_scores)
membership_confidences_lira = lira_scores / (1 + lira_scores)
membership_confidences_lira

In [None]:
# Write the LiRA-based scores in the same two-column layout as the per-model CSVs.
lira_submission = {"ids": private_ids, "score": membership_confidences_lira}
df = pd.DataFrame(lira_submission)
df.to_csv("test_Lira.csv", index=None)

In [None]:
# Submit to server
SUBMIT_URL = "http://34.122.51.94:9090/mia"
SUBMISSION_FILE = "test_MLP.csv"

# Never keep the token in the notebook: read it from the environment, or ask
# for it interactively when the variable is not set.
submit_token = os.environ.get("MIA_TOKEN") or getpass.getpass("MIA submission token: ")

# The context manager closes the file handle once the upload is done;
# the timeout keeps the cell from hanging if the server does not answer.
with open(SUBMISSION_FILE, "rb") as submission_fh:
    response = requests.post(
        SUBMIT_URL,
        files={"file": submission_fh},
        headers={"token": submit_token},
        timeout=120,
    )

if response.ok:
    print(response.json())
else:
    # Error responses may not be JSON; show the status and the start of the body.
    print(f"Submission failed: HTTP {response.status_code}: {response.text[:500]}")

In [None]:
# {'TPR@FPR=0.05': 0.043, 'AUC': 0.5013481666666666}
# {'TPR@FPR=0.05': 0.08366666666666667, 'AUC': 0.6373154999999999}
# LiRa {'TPR@FPR=0.05': 0.058, 'AUC': 0.6262959444444445}

In [None]:
# Evaluate the attack against the *target* model on the public split, where
# the true membership is known. The scores must come from the public samples
# themselves; `membership_scores` holds private-set predictions and is not
# aligned with `public_data.membership`.
public_attack_model = models["MLP"]
public_features, public_membership = [], []
for img_id, img, label, membership in tqdm(public_data, desc="Extracting Public Features"):
    grad_vec, _ = compute_gradients(model, img, label)
    public_features.append(aggregate_gradient(grad_vec))
    public_membership.append(int(membership))

X_public = np.vstack(public_features)
public_scores = public_attack_model.predict_proba(X_public)[:, 1]
assert len(public_scores) == len(public_membership), "scores and labels must align"

# Named `public_auc`, not `auc`, so the metric value cannot be mistaken for
# (or shadow) a function of that name.
public_auc = roc_auc_score(public_membership, public_scores)
fpr, tpr, thresholds = roc_curve(public_membership, public_scores)
tpr_at_fpr_005 = tpr[fpr <= 0.05].max()

print(f"AUC: {public_auc:.4f}")
print(f"TPR@FPR=0.05: {tpr_at_fpr_005:.3f}")

In [None]:
# AUC: 0.4982
# TPR@FPR=0.05: 0.040

# Number of shadow-set scores left over from the last classifier in the attack loop.
len(shadow_scores)

In [None]:
# 1. Plot Score Distribution: Member vs Non-member for each model
# (matplotlib is imported in the notebook's import cell; explicit Axes API below)
fig, ax = plt.subplots(figsize=(10, 5))
y_true = np.array(shadow_attack_y)
for name, res in model_shadow_scores.items():
    y_score = res['scores']
    # Filled histogram for members, outline-only histogram for non-members.
    ax.hist(y_score[y_true == 1], bins=30, alpha=0.5, label=f"{name} Members", histtype='stepfilled', linewidth=2)
    ax.hist(y_score[y_true == 0], bins=30, alpha=0.3, label=f"{name} Non-members", histtype='step', linewidth=2)
ax.set_xlabel("Predicted Membership Score")
ax.set_ylabel("Count")
ax.set_title("Attack Model Scores: Member vs Non-member (All Models, Shadow Set)")
ax.legend()
plt.show()

In [None]:
# 2. Plot ROC curves for each model
# (matplotlib is imported in the notebook's import cell; explicit Axes API below)
fig, ax = plt.subplots(figsize=(8, 6))
for name, res in model_shadow_scores.items():
    ax.plot(res['fpr'], res['tpr'], label=f"{name} (AUC={model_metrics[name]['AUC']:.3f})")
# Diagonal = chance-level classifier, for reference.
ax.plot([0, 1], [0, 1], "k--", label="Random (AUC=0.5)")
ax.set_xlabel("False Positive Rate (FPR)")
ax.set_ylabel("True Positive Rate (TPR)")
ax.set_title("ROC Curves for Membership Attack Models (Shadow Set)")
ax.legend()
ax.grid(True)
plt.show()

In [None]:
# Optionally, plot all models' score distributions on one plot (not split by membership)
# (matplotlib is imported in the notebook's import cell; explicit Axes API below)
fig, ax = plt.subplots(figsize=(8, 5))
for name, res in model_shadow_scores.items():
    ax.hist(res['scores'], bins=30, alpha=0.5, label=name)
ax.set_xlabel("Predicted Membership Score")
ax.set_ylabel("Count")
ax.set_title("Attack Model Score Distributions (Shadow Set, All Models)")
ax.legend()
plt.show()

In [None]:
# Check whether the target model is overfit
import torch
import numpy as np
from tqdm import tqdm
import matplotlib.pyplot as plt

# 1. Get model into eval mode
model.eval()

# 2. Compute predictions and loss on public data
all_labels = []
all_preds = []
all_losses = []

criterion = torch.nn.CrossEntropyLoss()

# Pure inference: disable autograd so no computation graph is built per sample.
with torch.no_grad():
    for img_id, img, label, membership in tqdm(public_data, desc="Evaluating Public Dataset"):
        img = img.unsqueeze(0)  # Add batch dimension
        output = model(img)
        pred = output.argmax(dim=1).item()
        loss = criterion(output, torch.tensor([label]))
        all_labels.append(label)
        all_preds.append(pred)
        all_losses.append(loss.item())

# 3. Compute accuracy
all_labels = np.array(all_labels)
all_preds = np.array(all_preds)
accuracy = np.mean(all_preds == all_labels)
print(f"Public Dataset Accuracy: {accuracy:.4f}")

# 4. Visualize loss distribution
plt.figure(figsize=(7, 4))
plt.hist(all_losses, bins=30, color='royalblue', alpha=0.7)
plt.xlabel("Cross Entropy Loss")
plt.ylabel("Sample Count")
plt.title("Loss Distribution on Public Dataset")
plt.show()

print(f"Average Public Loss: {np.mean(all_losses):.4f}")

# 5. (Optional) Print a few predictions and losses
# (bounded by the dataset size so a short dataset cannot raise IndexError)
for i in range(min(5, len(all_losses))):
    print(f"Sample {i}: Label={all_labels[i]}, Pred={all_preds[i]}, Loss={all_losses[i]:.3f}")


In [None]:
# Accuracy and loss of the target model on the private split.
private_labels = []
private_preds = []
# NOTE: rebinds `private_losses` from the feature-extraction cell with losses
# recomputed here for the same model and data.
private_losses = []

# Defined locally so this cell does not rely on another cell's `criterion`.
criterion = torch.nn.CrossEntropyLoss()

# Pure inference: disable autograd so no computation graph is built per sample.
with torch.no_grad():
    for img_id, img, label, membership in tqdm(private_data, desc="Evaluating Private Dataset"):
        img = img.unsqueeze(0)  # Add batch dimension
        output = model(img)
        pred = output.argmax(dim=1).item()
        loss = criterion(output, torch.tensor([label]))
        private_labels.append(label)
        private_preds.append(pred)
        private_losses.append(loss.item())

private_accuracy = np.mean(np.array(private_preds) == np.array(private_labels))
print(f"Private Dataset Accuracy: {private_accuracy:.4f}")

plt.figure(figsize=(7, 4))
plt.hist(private_losses, bins=30, color='tomato', alpha=0.7)
plt.xlabel("Cross Entropy Loss")
plt.ylabel("Sample Count")
plt.title("Loss Distribution on Private Dataset")
plt.show()

print(f"Average Private Loss: {np.mean(private_losses):.4f}")
