# CelebA Range MIA

In [None]:
# IPython magics: enable the autoreload extension in mode 2, so imported
# modules are reloaded before each cell executes and edits to the project
# modules take effect without restarting the kernel.
%load_ext autoreload
%autoreload 2

# Load datasets

In [None]:
import torch
import numpy as np
from tqdm import tqdm

In [None]:
from dataset_loaders import CelebADatasetLoader

# load_from_disk=False is used here. Once the dataset has been processed by an
# earlier run, the already-processed copy can be reused instead:
# celeba = CelebADatasetLoader(load_from_disk=True, dataset_path="datasets/celeba")
celeba = CelebADatasetLoader(
    dataset_path="datasets/celeba",
    load_from_disk=False,
)

In [None]:
# Report how many samples each split exposed by the loader contains.
for _caption, _split in (
    ("Training size:", celeba.training_set),
    ("Test size:", celeba.test_set),
    ("Population size:", celeba.population_set),
    ("Nonmembers size:", celeba.nonmembers_set),
):
    print(_caption, len(_split))

## Load all original data

In [None]:
# Fetch the complete original dataset from the loader. It is wrapped in a
# DataLoader further down, where every batch unpacks as (data, label).
celeba_all = celeba.load_all_original_data()

# Compute Loss

## Load models

In [None]:
from facial_attribute_cnn import FacialAttributeCNN

# Resolve the device first so every checkpoint can be mapped onto it while it
# is being loaded. Without map_location, a checkpoint that was saved from a GPU
# cannot be loaded on a CPU-only machine.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")


def _load_attribute_model(index):
    """Return model number `index` with its saved weights, ready for inference.

    The weights are read from saved_models/celeba/model_<index>.pt, mapped onto
    `device`, and the model is switched to evaluation mode.
    """
    # NOTE: torch.load unpickles the file; only load checkpoints you trust.
    state_dict = torch.load(
        f"saved_models/celeba/model_{index}.pt", map_location=device
    )
    model = FacialAttributeCNN()
    model.load_state_dict(state_dict)
    return model.to(device).eval()


model_0 = _load_attribute_model(0)
model_1 = _load_attribute_model(1)
model_2 = _load_attribute_model(2)
model_3 = _load_attribute_model(3)

In [None]:
# defaultdict was used here without ever being imported in the notebook.
from collections import defaultdict

# One accumulator per model: sample id -> list of per-sample loss arrays.
# A defaultdict lets the loss loop append without first checking whether the
# id has been seen before.
loss_0 = defaultdict(list)
loss_1 = defaultdict(list)
loss_2 = defaultdict(list)
loss_3 = defaultdict(list)

In [None]:
dataloader = torch.utils.data.DataLoader(celeba_all, batch_size=128, shuffle=False)

# Pair every model with the dict that accumulates its losses. The per-batch
# loss tensor gets its own name (batch_loss) so that it can never overwrite
# the loss_0..loss_3 accumulators.
model_loss_pairs = (
    (model_0, loss_0),
    (model_1, loss_1),
    (model_2, loss_2),
    (model_3, loss_3),
)

# reduction="none" keeps one loss value per output element instead of a single
# averaged scalar, so losses can be stored per sample.
criterion = torch.nn.BCEWithLogitsLoss(reduction="none")

# Inference only: skip building the autograd graph.
with torch.no_grad():
    for data, label in tqdm(dataloader):
        data = data.to(device)
        # The last label component carries the ids used as dictionary keys
        # (presumably the CelebA identity of each image; confirm against the
        # dataset loader). The first component is the classification target.
        sample_ids = label[-1].reshape(-1).tolist()
        targets = label[0].to(device).float()

        for model, loss_store in model_loss_pairs:
            # One device-to-host transfer per model and batch.
            batch_loss = criterion(model(data), targets).cpu().numpy()
            for sample_id, sample_loss in zip(sample_ids, batch_loss):
                loss_store[sample_id].append(sample_loss)

In [None]:
# Persist each model's accumulated loss dictionary to its own file.
for _loss_dict, _target_path in (
    (loss_0, "results/celeba/model_0_losses.pt"),
    (loss_1, "results/celeba/model_1_losses.pt"),
    (loss_2, "results/celeba/model_2_losses.pt"),
    (loss_3, "results/celeba/model_3_losses.pt"),
):
    torch.save(_loss_dict, _target_path)

# Attack

In [None]:
def get_average_loss_for_each_data(loss_dict):
    """Collapse every stored loss array to its mean along axis 1.

    Only keys in the range 1..10177 (inclusive) are kept. Each kept value is
    stacked into an array and averaged over axis 1, giving one number per
    stored entry.
    """
    averaged = {}
    for key, entries in loss_dict.items():
        if not (0 < key <= 10177):
            continue
        averaged[key] = np.array(entries).mean(axis=1)
    return averaged

In [None]:
# Per-model dictionaries of averaged losses, in model order 0..3.
losses = [
    get_average_loss_for_each_data(model_losses)
    for model_losses in (loss_0, loss_1, loss_2, loss_3)
]

In [None]:
# 4 x 10177 indicator matrix: one row per model, one column per key (column j
# corresponds to key j + 1). Row 0 is later used as the ground-truth labels
# for the ROC curve.
split_matrix = np.zeros((4, 10177))

# Models 0 and 1 split the columns into a first and a second part.
split_matrix[0, :5089] = 1
split_matrix[1, 5089:] = 1

# Models 2 and 3 each take one sub-range from both of those parts.
split_matrix[2, 0:2544] = 1
split_matrix[2, 5089:7633] = 1
split_matrix[3, 2544:5089] = 1
split_matrix[3, 7633:10177] = 1

In [None]:
# Columns of keys (<= 5089) whose averaged-loss array for model 0 holds exactly
# one entry. Those columns are cleared for every model.
# NOTE(review): presumably an id with a single entry cannot be divided into
# train/test halves - confirm the intent.
zero_id = [
    key - 1  # keys start at 1, matrix columns at 0
    for key, per_entry_losses in losses[0].items()
    if len(per_entry_losses) == 1 and key <= 5089
]
split_matrix[:, zero_id] = 0

In [None]:
from membership_testers.utils import get_rmia_score_dict_from_loss_dicts

# Compute RMIA scores from the four averaged-loss dictionaries and the
# indicator matrix. The result is a dict: it is later filtered by key and its
# values are sliced and ranked.
# NOTE(review): offline=True presumably selects the offline RMIA variant and
# a=0.33 its interpolation parameter - confirm against membership_testers.utils.
rmia_score_dict_offline = get_rmia_score_dict_from_loss_dicts(
    losses, split_matrix, offline=True, a=0.33
)

In [None]:
def get_max_rmia_score(rmia_score_dict):
    """Return the largest score of every entry, in dictionary order."""
    best_scores = []
    for scores in rmia_score_dict.values():
        best_scores.append(max(scores))
    return best_scores

def get_topk_average_rmia_score(rmia_score_dict, k, sample=False, sample_size=20):
    """Average the k largest scores of every entry, in dictionary order.

    Entries holding fewer than k scores are averaged over all of them. When
    `sample` is true, each entry is first reduced to at most `sample_size`
    randomly chosen scores via sample_within_dict.
    """
    source = (
        sample_within_dict(rmia_score_dict, sample_size)
        if sample
        else rmia_score_dict
    )

    averages = []
    for scores in source.values():
        # Sorting the negated scores ascending ranks the originals descending.
        descending = (-scores).argsort()
        # Slicing past the end is harmless, so no explicit length clamp.
        averages.append(scores[descending[:k]].mean())
    return averages

In [None]:
def get_test_samples(dict):
    """Keep the second half of every value whose key lies in 1..10177.

    For an odd-length value the middle element belongs to the returned half.
    """
    second_halves = {}
    for key, values in dict.items():
        if not (0 < key <= 10177):
            continue
        midpoint = len(values) // 2
        second_halves[key] = values[midpoint:]
    return second_halves


def get_train_samples(dict):
    """Keep the first half of every value whose key lies in 1..10177.

    For an odd-length value the middle element is not part of the returned
    half.
    """
    first_halves = {}
    for key, values in dict.items():
        if not (0 < key <= 10177):
            continue
        midpoint = len(values) // 2
        first_halves[key] = values[:midpoint]
    return first_halves


def get_mix_samples(dict, k=0):
    """Combine k percent of each first half with the complete second half.

    Only keys in 1..10177 are kept. With k=0 the result equals the second
    half; with k=100 the whole value is returned.
    """
    mixed = {}
    for key, values in dict.items():
        if not (0 < key <= 10177):
            continue
        half = len(values) // 2
        # Number of leading first-half elements to keep, truncated toward zero.
        n_leading = int(half * (k / 100))
        mixed[key] = np.concatenate([values[:n_leading], values[half:]])
    return mixed


def sample_within_dict(dict, sample_size):
    """Return a new dict whose values hold at most `sample_size` elements.

    Values that are already short enough are passed through unchanged. Longer
    ones are replaced by `sample_size` elements drawn without replacement
    using NumPy's global random state. The input dict is not modified.
    """
    subsampled = {}
    for key, values in dict.items():
        if len(values) > sample_size:
            chosen = np.random.choice(len(values), sample_size, replace=False)
            subsampled[key] = values[chosen]
        else:
            subsampled[key] = values
    return subsampled

In [None]:
def convert_dict_to_list(dict):
    """Return the dictionary's values as a list, in dictionary order."""
    return list(dict.values())

# Range

## RMIA

In [None]:
# Keep only the second half of every score array (keys 1..10177), as selected
# by get_test_samples.
rmia_score_dict_test = get_test_samples(rmia_score_dict_offline)

In [None]:
from sklearn.metrics import roc_curve
from visualize import plot_multiple_roc_curves

# Ground truth: row 0 of the indicator matrix, one label per key.
# NOTE(review): this assumes rmia_score_dict_test holds every key 1..10177 in
# ascending order so that the scores line up with the labels - confirm.
membership_labels = split_matrix[0]

# One score per key: the mean of its 13 highest RMIA scores.
topk_scores = get_topk_average_rmia_score(rmia_score_dict_test, k=13)

fpr, tpr, _ = roc_curve(membership_labels, topk_scores)

plot_multiple_roc_curves(fpr, tpr, "CelebA RMIA ROC Curve")