Analyzing and creating new random response activation functions to test their ability to thwart inference attacks

Case study inception: This case study is inspired by the paper "Secure Split Learning against Property Inference, Data Reconstruction and FSA attacks" by Yunlong Mao et al.

We set out to investigate privacy-preserving techniques in deep learning, focusing on random response activation functions.
1) Dataset Selection:
We chose the Fashion MNIST dataset for our experiments due to its widespread use and similarity to real-world image classification tasks.
2) Model Architecture:
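We designed a Convolutional Neural Network (CNN) for the Fashion MNIST classification task: two 3x3 convolutional layers (32 and 64 channels), each followed by ReLU and 2x2 max pooling, then a 128-unit fully connected layer whose output passes through the random response activation under test, and a final 10-class output layer.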
3) Implementation of Random Response Activation Functions:
We implemented four different random response activation functions:
    a) Stochastic Laplacian Activation (SLA)
    b) Random Response ReLU (R3elu)
    c) Randomized Swish (RSwish)
    d) Laplacian Swish (LapSwish)
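
Both SLA and R3elu, as implemented later in this notebook, first clip each activation vector to an L_K norm of at most C and then draw Laplace noise with scale 2*K*C / (epsilon_l * epsilon_p). The following is a minimal pure-Python sketch of those two steps only; the helper names `clip_to_norm` and `laplace_noise_scale` are illustrative and do not appear in the notebook code, and the example values for C, K, epsilon_p and epsilon_l are the tuned SLA settings reported further down.

```python
def clip_to_norm(v, C, K):
    """Rescale v so its L_K norm is at most C; vectors inside the bound are unchanged."""
    norm = sum(abs(x) ** K for x in v) ** (1.0 / K)
    if norm == 0.0:
        return list(v)
    scale = min(1.0, C / norm)
    return [x * scale for x in v]


def laplace_noise_scale(C, K, epsilon_p, epsilon_l):
    """Noise scale used by the SLA and R3elu classes: 2*K*C / (epsilon_l * epsilon_p)."""
    return 2 * K * C / (epsilon_l * epsilon_p)


clipped = clip_to_norm([3.0, 4.0], C=1.0, K=2)    # L2 norm 5 is scaled down to 1
untouched = clip_to_norm([0.1, 0.1], C=1.0, K=2)  # already within the bound
scale = laplace_noise_scale(C=0.74748618, K=3, epsilon_p=10.0, epsilon_l=1.0)
print(clipped, untouched, scale)
```

A smaller clipping bound C or larger privacy budgets shrink the noise scale, which is one reason the tuned settings trade privacy for accuracy.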
4) Initial Experiments:
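We conducted experiments with all four activation functions, tuning their hyperparameters with Bayesian optimization (GPyOpt), to determine their performance in terms of test accuracy and randomness score. The randomness score is the mean variance of the model outputs over 100 forward passes on the same batch of 64 test images.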
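
To make the randomness score concrete, here is a small standard-library sketch of the same idea: run one input through a stochastic function many times, take the variance of each output position across runs, and average those variances. The functions `laplace_sample`, `randomness_score`, `deterministic` and `noisy` are hypothetical stand-ins for illustration, not the notebook's PyTorch code, and the toy "model" is just a ReLU with optional Laplace noise.

```python
import random
import statistics


def laplace_sample(scale, rng):
    """Draw Laplace(0, scale) noise as the difference of two exponential draws."""
    return rng.expovariate(1.0 / scale) - rng.expovariate(1.0 / scale)


def randomness_score(forward, inputs, num_runs=100):
    """Mean, over output positions, of the variance across repeated forward passes."""
    runs = [forward(inputs) for _ in range(num_runs)]
    per_position_variance = [statistics.pvariance(column) for column in zip(*runs)]
    return statistics.fmean(per_position_variance)


rng = random.Random(42)
inputs = [0.2, -1.0, 0.7]


def deterministic(xs):
    # Plain ReLU: identical output on every call
    return [max(x, 0.0) for x in xs]


def noisy(xs):
    # ReLU applied after adding Laplace noise: output changes on every call
    return [max(x + laplace_sample(0.5, rng), 0.0) for x in xs]


deterministic_score = randomness_score(deterministic, inputs)
noisy_score = randomness_score(noisy, inputs)
print(deterministic_score, noisy_score)
```

A deterministic activation scores zero, so any positive score comes entirely from the injected noise.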
5) Results Analysis:
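After analyzing the results, we found that the Stochastic Laplacian Activation (SLA) offered the best balance between accuracy and randomness. It had the highest randomness score (0.018274) at 89.36% test accuracy, ahead of R3elu on both measures. RSwish and LapSwish reached about 92% accuracy, but their randomness scores stayed below 0.0001.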
6) Further Investigation of SLA:
Having identified SLA as the most promising activation function, we proceeded to conduct more in-depth experiments:
    a) Privacy-Preserving Techniques:
    We implemented adversarial training for the SLA model to further enhance privacy. For comparison, we created a baseline model using standard ReLU activation.

    b) Evaluation Metrics:
    We implemented three key privacy evaluation metrics:

        i) Property Inference Attack
        ii) Membership Inference Attack
        iii) Data Reconstruction Attack

    c) Comprehensive Experiments:

        We trained and evaluated both the SLA model (with adversarial training) and the baseline ReLU model.
        We performed each of the attack scenarios on both models to compare their privacy-preserving capabilities.
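
The attack code is not shown in this part of the notebook. As an illustration only, a common baseline for membership inference is a confidence-threshold attack: guess "member" whenever the model's confidence on a sample exceeds a threshold, and report how often the guess is right. The sketch below assumes that baseline; the function names and the confidence values are made up for the example and are not results from our experiments.

```python
def threshold_attack_accuracy(member_conf, nonmember_conf, threshold):
    """Predict 'member' when confidence >= threshold and return the attack accuracy."""
    true_positives = sum(c >= threshold for c in member_conf)
    true_negatives = sum(c < threshold for c in nonmember_conf)
    return (true_positives + true_negatives) / (len(member_conf) + len(nonmember_conf))


def best_threshold_accuracy(member_conf, nonmember_conf):
    """Try every observed confidence as a threshold and keep the best attack accuracy."""
    candidates = sorted(set(member_conf) | set(nonmember_conf))
    return max(
        threshold_attack_accuracy(member_conf, nonmember_conf, t) for t in candidates
    )


# Illustrative confidences: a leaky model is more confident on training members
leaky = best_threshold_accuracy([0.99, 0.95, 0.9, 0.6], [0.7, 0.5, 0.55, 0.4])
# Identical confidences on both sets give the attacker nothing to separate
private = best_threshold_accuracy([0.5, 0.5], [0.5, 0.5])
print(leaky, private)
```

With equally sized member and non-member sets, an attack accuracy near 0.5 means the attacker does no better than guessing, which is the outcome a privacy-preserving activation aims for.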

7) Final Results Analysis:
We collected and analyzed the results from all our experiments. We compared the performance of the SLA model against the baseline in terms of both accuracy and privacy metrics.

8) Visualization:
We created visualizations to clearly illustrate the comparative performance of the SLA and baseline models across all metrics.
Findings:
    a) We observed how SLA affected model accuracy compared to the baseline ReLU model.
    b) We analyzed the effectiveness of SLA in defending against various privacy attacks compared to traditional methods.
    c) We examined the trade-off between model utility (accuracy) and privacy preservation.

9) Code Implementation:
Throughout the project, we developed and refined Python code using PyTorch for:

    a) Implementing the various random response activation functions and model architectures
    b) Training procedures, including adversarial training for SLA
    c) Evaluation metrics and attack simulations
    d) Data processing and result visualization

import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms
from torch.utils.data import DataLoader
import numpy as np
import torch.nn.functional as F
from sklearn.metrics import recall_score
import GPyOpt
import multiprocessing
from functools import partial

# Set random seed for reproducibility
torch.manual_seed(42)
np.random.seed(42)

# Define Stochastic Laplacian Activation (SLA)
class StochasticLaplacianActivation(nn.Module):
    def __init__(self, C, K, epsilon_p, epsilon_l, probability):
        super(StochasticLaplacianActivation, self).__init__()
        self.C = C
        self.K = K
        self.epsilon_p = epsilon_p
        self.epsilon_l = epsilon_l
        self.probability = probability

    def forward(self, x):
        x_hat = self.clipK(x, self.C, self.K)
        mask = torch.rand_like(x_hat) < self.probability
        noise_scale = 2 * self.K * self.C / (self.epsilon_l * self.epsilon_p)
        laplacian_noise = torch.tensor(np.random.laplace(0, noise_scale, size=x_hat.shape), dtype=torch.float32).to(x.device)
        output = torch.where(mask, torch.maximum(x_hat + laplacian_noise, torch.tensor(0.0).to(x.device)), x_hat)
        return output

    def clipK(self, v, C, K):
        norm_v = torch.norm(v, p=K, dim=1, keepdim=True)
        scale = torch.clamp(C / norm_v, max=1.0)
        return v * scale

# Define R3elu
class R3elu(nn.Module):
    def __init__(self, C, K, epsilon_p, epsilon_l, probability):
        super(R3elu, self).__init__()
        self.C = C
        self.K = K
        self.epsilon_p = epsilon_p
        self.epsilon_l = epsilon_l
        self.probability = probability

    def forward(self, x):
        x_hat = self.clipK(x, self.C, self.K)
        mask = torch.rand_like(x_hat) < self.probability
        noise_scale = 2 * self.K * self.C / (self.epsilon_l * self.epsilon_p)
        laplacian_noise = torch.tensor(np.random.laplace(0, noise_scale, size=x_hat.shape), dtype=torch.float32).to(x.device)
        output = torch.where(mask, F.relu(x_hat + laplacian_noise), F.relu(x_hat))
        return output

    def clipK(self, v, C, K):
        norm_v = torch.norm(v, p=K, dim=1, keepdim=True)
        scale = torch.clamp(C / norm_v, max=1.0)
        return v * scale

# Define Randomized Swish (RSwish)
class RSwish(nn.Module):
    def __init__(self, mean=1.0, stddev=0.1):
        super(RSwish, self).__init__()
        self.mean = mean
        self.stddev = stddev

    def forward(self, x):
        swish = x * torch.sigmoid(x)
        noise = torch.normal(self.mean, self.stddev, size=swish.size()).to(x.device)
        return swish * noise

# Define Laplacian Swish (LapSwish)
class LapSwish(nn.Module):
    def __init__(self, scale=0.1, epsilon_p=1.0):
        super(LapSwish, self).__init__()
        self.scale = scale
        self.epsilon_p = epsilon_p

    def forward(self, x):
        swish = x * torch.sigmoid(x)
        noise_scale = self.scale / self.epsilon_p
        laplacian_noise = torch.tensor(np.random.laplace(0, noise_scale, size=swish.size()), dtype=torch.float32).to(x.device)
        return swish + laplacian_noise

# Define the Fashion MNIST model
class FashionMNISTModel(nn.Module):
    def __init__(self, activation='SLA', **kwargs):
        super(FashionMNISTModel, self).__init__()
        self.conv1 = nn.Conv2d(1, 32, kernel_size=3, padding=1)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3, padding=1)
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)
        self.fc1 = nn.Linear(64 * 7 * 7, 128)
        self.fc2 = nn.Linear(128, 10)
        
        if activation == 'SLA':
            self.custom_activation = StochasticLaplacianActivation(**kwargs)
        elif activation == 'R3elu':
            self.custom_activation = R3elu(**kwargs)
        elif activation == 'RSwish':
            self.custom_activation = RSwish(**kwargs)
        elif activation == 'LapSwish':
            self.custom_activation = LapSwish(**kwargs)
        else:
            raise ValueError("Unsupported activation function.")

    def forward(self, x):
        x = self.pool(F.relu(self.conv1(x)))
        x = self.pool(F.relu(self.conv2(x)))
        x = x.view(-1, 64 * 7 * 7)
        x = self.fc1(x)
        x = self.custom_activation(x)
        x = self.fc2(x)
        return x

def train_and_evaluate(model, train_loader, test_loader, criterion, optimizer, device, num_epochs=10):
    model.to(device)
    for epoch in range(num_epochs):
        model.train()
        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
    
    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for inputs, labels in test_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
    
    accuracy = correct / total
    return accuracy

def objective_function(parameters, activation, train_loader, test_loader, device):
    # `parameters` is indexed as a 2-D array with one row (one candidate point).
    # Take that row first: float() on a 1-element array is deprecated in NumPy >= 1.25.
    row = parameters[0]
    if activation in ['SLA', 'R3elu']:
        hyperparams = {
            'C': float(row[0]),
            'K': int(row[1]),
            'epsilon_p': float(row[2]),
            'epsilon_l': float(row[3]),
            'probability': float(row[4])
        }
    elif activation == 'RSwish':
        hyperparams = {
            'mean': float(row[0]),
            'stddev': float(row[1])
        }
    elif activation == 'LapSwish':
        hyperparams = {
            'scale': float(row[0]),
            'epsilon_p': float(row[1])
        }
    else:
        raise ValueError(f"Unsupported activation function: {activation}")
    
    model = FashionMNISTModel(activation=activation, **hyperparams)
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=0.001)
    
    accuracy = train_and_evaluate(model, train_loader, test_loader, criterion, optimizer, device)
    
    return -accuracy  # We want to maximize accuracy, but GPyOpt minimizes the objective function

def bayesian_optimization(activation, train_loader, test_loader, device, max_iter=50):
    if activation in ['SLA', 'R3elu']:
        bounds = [
            {'name': 'C', 'type': 'continuous', 'domain': (0.1, 2.0)},
            {'name': 'K', 'type': 'discrete', 'domain': (1, 2, 3)},
            {'name': 'epsilon_p', 'type': 'continuous', 'domain': (0.1, 10.0)},
            {'name': 'epsilon_l', 'type': 'continuous', 'domain': (0.1, 1.0)},
            {'name': 'probability', 'type': 'continuous', 'domain': (0.3, 0.7)}
        ]
    elif activation == 'RSwish':
        bounds = [
            {'name': 'mean', 'type': 'continuous', 'domain': (0.8, 1.2)},
            {'name': 'stddev', 'type': 'continuous', 'domain': (0.05, 0.2)}
        ]
    elif activation == 'LapSwish':
        bounds = [
            {'name': 'scale', 'type': 'continuous', 'domain': (0.01, 0.5)},
            {'name': 'epsilon_p', 'type': 'continuous', 'domain': (0.1, 10.0)}
        ]
    
    optimizer = GPyOpt.methods.BayesianOptimization(
        f=partial(objective_function, activation=activation, train_loader=train_loader, test_loader=test_loader, device=device),
        domain=bounds,
        model_type='GP',
        acquisition_type='EI',
        maximize=False,
        verbosity=True
    )
    
    optimizer.run_optimization(max_iter=max_iter)
    
    best_hyperparams = optimizer.x_opt
    best_accuracy = -optimizer.fx_opt  # Remember we minimized negative accuracy
    
    return best_hyperparams, best_accuracy

def parallel_bayesian_optimization(activation_functions, train_loader, test_loader, device, max_iter=50, num_processes=2):
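    # CUDA cannot be re-initialized in forked workers, so use the 'spawn' start method.
    # With 'spawn' the worker function must be importable, so run this as a script
    # rather than from an interactive notebook cell.
    with multiprocessing.get_context('spawn').Pool(processes=num_processes) as pool: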
        results = pool.starmap(
            bayesian_optimization,
            [(activation, train_loader, test_loader, device, max_iter) for activation in activation_functions]
        )
    
    best_activation = None
    best_accuracy = 0
    best_hyperparams = None
    
    for activation, (hyperparams, accuracy) in zip(activation_functions, results):
        print(f'Activation: {activation}, Best Hyperparameters: {hyperparams}, Best Accuracy: {accuracy}')
        if accuracy > best_accuracy:
            best_accuracy = accuracy
            best_activation = activation
            best_hyperparams = hyperparams
    
    return best_activation, best_hyperparams, best_accuracy

if __name__ == '__main__':
    # Prepare the dataset
    transform = transforms.Compose([transforms.ToTensor(), transforms.Normalize((0.5,), (0.5,))])
    train_dataset = torchvision.datasets.FashionMNIST(root='./data', train=True, download=True, transform=transform)
    test_dataset = torchvision.datasets.FashionMNIST(root='./data', train=False, download=True, transform=transform)
    train_loader = DataLoader(train_dataset, batch_size=256, shuffle=True)
    test_loader = DataLoader(test_dataset, batch_size=256, shuffle=False)

    # Check for GPU
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # Perform parallel Bayesian Optimization
    activation_functions = ['SLA', 'R3elu', 'RSwish', 'LapSwish']
    best_activation, best_hyperparams, best_accuracy = parallel_bayesian_optimization(
        activation_functions, train_loader, test_loader, device, max_iter=50, num_processes=2
    )
    print(f'Overall Best Activation: {best_activation}, Best Hyperparameters: {best_hyperparams}, Best Accuracy: {best_accuracy}')
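
The objective function above turns one candidate point from the optimizer into keyword arguments for the activation class. A compact way to express that mapping is sketched below in plain Python. The helper `decode_parameters` and the tuple `SLA_NAMES` are hypothetical names introduced for this example; the candidate row is written as a plain list, and the values are the tuned SLA settings reported later in this notebook.

```python
def decode_parameters(row, names, integer_names=()):
    """Map one row of candidate values onto named hyperparameters."""
    if len(row) != len(names):
        raise ValueError(f"expected {len(names)} values, got {len(row)}")
    decoded = {}
    for name, value in zip(names, row):
        # Discrete parameters such as K arrive as floats and are cast back to int
        decoded[name] = int(round(value)) if name in integer_names else float(value)
    return decoded


SLA_NAMES = ("C", "K", "epsilon_p", "epsilon_l", "probability")
params = decode_parameters(
    [0.74748618, 3.0, 10.0, 1.0, 0.3], SLA_NAMES, integer_names=("K",)
)
print(params)
```

Keeping the parameter names in one tuple per activation means the search space definition and the decoding step cannot drift apart silently.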

In [4]:
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms
from torch.utils.data import DataLoader
import numpy as np
import torch.nn.functional as F
from sklearn.metrics import recall_score
import GPyOpt
import multiprocessing
from functools import partial
from tqdm import tqdm

# Set random seed for reproducibility
torch.manual_seed(42)
np.random.seed(42)

# Define Stochastic Laplacian Activation (SLA)
class StochasticLaplacianActivation(nn.Module):
    def __init__(self, C, K, epsilon_p, epsilon_l, probability):
        super(StochasticLaplacianActivation, self).__init__()
        self.C = C
        self.K = K
        self.epsilon_p = epsilon_p
        self.epsilon_l = epsilon_l
        self.probability = probability

    def forward(self, x):
        x_hat = self.clipK(x, self.C, self.K)
        mask = torch.rand_like(x_hat) < self.probability
        noise_scale = 2 * self.K * self.C / (self.epsilon_l * self.epsilon_p)
        laplacian_noise = torch.tensor(np.random.laplace(0, noise_scale, size=x_hat.shape), dtype=torch.float32).to(x.device)
        output = torch.where(mask, torch.maximum(x_hat + laplacian_noise, torch.tensor(0.0).to(x.device)), x_hat)
        return output

    def clipK(self, v, C, K):
        norm_v = torch.norm(v, p=K, dim=1, keepdim=True)
        scale = torch.clamp(C / norm_v, max=1.0)
        return v * scale

# Define R3elu
class R3elu(nn.Module):
    def __init__(self, C, K, epsilon_p, epsilon_l, probability):
        super(R3elu, self).__init__()
        self.C = C
        self.K = K
        self.epsilon_p = epsilon_p
        self.epsilon_l = epsilon_l
        self.probability = probability

    def forward(self, x):
        x_hat = self.clipK(x, self.C, self.K)
        mask = torch.rand_like(x_hat) < self.probability
        noise_scale = 2 * self.K * self.C / (self.epsilon_l * self.epsilon_p)
        laplacian_noise = torch.tensor(np.random.laplace(0, noise_scale, size=x_hat.shape), dtype=torch.float32).to(x.device)
        output = torch.where(mask, F.relu(x_hat + laplacian_noise), F.relu(x_hat))
        return output

    def clipK(self, v, C, K):
        norm_v = torch.norm(v, p=K, dim=1, keepdim=True)
        scale = torch.clamp(C / norm_v, max=1.0)
        return v * scale

# Define Randomized Swish (RSwish)
class RSwish(nn.Module):
    def __init__(self, mean=1.0, stddev=0.1):
        super(RSwish, self).__init__()
        self.mean = mean
        self.stddev = stddev

    def forward(self, x):
        swish = x * torch.sigmoid(x)
        noise = torch.normal(self.mean, self.stddev, size=swish.size()).to(x.device)
        return swish * noise

# Define Laplacian Swish (LapSwish)
class LapSwish(nn.Module):
    def __init__(self, scale=0.1, epsilon_p=1.0):
        super(LapSwish, self).__init__()
        self.scale = scale
        self.epsilon_p = epsilon_p

    def forward(self, x):
        swish = x * torch.sigmoid(x)
        noise_scale = self.scale / self.epsilon_p
        laplacian_noise = torch.tensor(np.random.laplace(0, noise_scale, size=swish.size()), dtype=torch.float32).to(x.device)
        return swish + laplacian_noise

# Define the Fashion MNIST model
class FashionMNISTModel(nn.Module):
    def __init__(self, activation='SLA', **kwargs):
        super(FashionMNISTModel, self).__init__()
        self.conv1 = nn.Conv2d(1, 32, kernel_size=3, padding=1)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3, padding=1)
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)
        self.fc1 = nn.Linear(64 * 7 * 7, 128)
        self.fc2 = nn.Linear(128, 10)
        
        if activation == 'SLA':
            self.custom_activation = StochasticLaplacianActivation(**kwargs)
        elif activation == 'R3elu':
            self.custom_activation = R3elu(**kwargs)
        elif activation == 'RSwish':
            self.custom_activation = RSwish(**kwargs)
        elif activation == 'LapSwish':
            self.custom_activation = LapSwish(**kwargs)
        else:
            raise ValueError("Unsupported activation function.")

    def forward(self, x):
        x = self.pool(F.relu(self.conv1(x)))
        x = self.pool(F.relu(self.conv2(x)))
        x = x.view(-1, 64 * 7 * 7)
        x = self.fc1(x)
        x = self.custom_activation(x)
        x = self.fc2(x)
        return x

def train_and_evaluate(model, train_loader, test_loader, criterion, optimizer, device, num_epochs=10):
    model.to(device)
    for epoch in range(num_epochs):
        model.train()
        for inputs, labels in tqdm(train_loader, desc=f"Epoch {epoch+1}/{num_epochs}"):
            inputs, labels = inputs.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
    
    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for inputs, labels in test_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
    
    accuracy = correct / total
    return accuracy

def objective_function(parameters, activation, train_loader, test_loader, device):
    # `parameters` is indexed as a 2-D array with one row (one candidate point).
    # Take that row first: float() on a 1-element array is deprecated in NumPy >= 1.25.
    row = parameters[0]
    if activation in ['SLA', 'R3elu']:
        hyperparams = {
            'C': float(row[0]),
            'K': int(row[1]),
            'epsilon_p': float(row[2]),
            'epsilon_l': float(row[3]),
            'probability': float(row[4])
        }
    elif activation == 'RSwish':
        hyperparams = {
            'mean': float(row[0]),
            'stddev': float(row[1])
        }
    elif activation == 'LapSwish':
        hyperparams = {
            'scale': float(row[0]),
            'epsilon_p': float(row[1])
        }
    else:
        raise ValueError(f"Unsupported activation function: {activation}")
    
    model = FashionMNISTModel(activation=activation, **hyperparams)
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=0.001)
    
    accuracy = train_and_evaluate(model, train_loader, test_loader, criterion, optimizer, device)
    
    return -accuracy  # We want to maximize accuracy, but GPyOpt minimizes the objective function

def bayesian_optimization(activation, train_loader, test_loader, device, max_iter=50):
    if activation in ['SLA', 'R3elu']:
        bounds = [
            {'name': 'C', 'type': 'continuous', 'domain': (0.1, 2.0)},
            {'name': 'K', 'type': 'discrete', 'domain': (1, 2, 3)},
            {'name': 'epsilon_p', 'type': 'continuous', 'domain': (0.1, 10.0)},
            {'name': 'epsilon_l', 'type': 'continuous', 'domain': (0.1, 1.0)},
            {'name': 'probability', 'type': 'continuous', 'domain': (0.3, 0.7)}
        ]
    elif activation == 'RSwish':
        bounds = [
            {'name': 'mean', 'type': 'continuous', 'domain': (0.8, 1.2)},
            {'name': 'stddev', 'type': 'continuous', 'domain': (0.05, 0.2)}
        ]
    elif activation == 'LapSwish':
        bounds = [
            {'name': 'scale', 'type': 'continuous', 'domain': (0.01, 0.5)},
            {'name': 'epsilon_p', 'type': 'continuous', 'domain': (0.1, 10.0)}
        ]
    
    optimizer = GPyOpt.methods.BayesianOptimization(
        f=partial(objective_function, activation=activation, train_loader=train_loader, test_loader=test_loader, device=device),
        domain=bounds,
        model_type='GP',
        acquisition_type='EI',
        maximize=False,
        verbosity=True
    )
    
    optimizer.run_optimization(max_iter=max_iter)
    
    best_hyperparams = optimizer.x_opt
    best_accuracy = -optimizer.fx_opt  # Remember we minimized negative accuracy
    
    return best_hyperparams, best_accuracy

def parallel_bayesian_optimization(activation_functions, train_loader, test_loader, device, max_iter=50, num_processes=2):
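    # CUDA cannot be re-initialized in forked workers, so use the 'spawn' start method.
    # With 'spawn' the worker function must be importable, so run this as a script
    # rather than from an interactive notebook cell.
    with multiprocessing.get_context('spawn').Pool(processes=num_processes) as pool: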
        results = pool.starmap(
            bayesian_optimization,
            [(activation, train_loader, test_loader, device, max_iter) for activation in activation_functions]
        )
    
    best_activation = None
    best_accuracy = 0
    best_hyperparams = None
    
    for activation, (hyperparams, accuracy) in zip(activation_functions, results):
        print(f'Activation: {activation}, Best Hyperparameters: {hyperparams}, Best Accuracy: {accuracy}')
        if accuracy > best_accuracy:
            best_accuracy = accuracy
            best_activation = activation
            best_hyperparams = hyperparams
    
    return best_activation, best_hyperparams, best_accuracy

if __name__ == '__main__':
    # Prepare the dataset
    transform = transforms.Compose([transforms.ToTensor(), transforms.Normalize((0.5,), (0.5,))])
    train_dataset = torchvision.datasets.FashionMNIST(root='./data', train=True, download=True, transform=transform)
    test_dataset = torchvision.datasets.FashionMNIST(root='./data', train=False, download=True, transform=transform)
    train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
    test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False)

    # Check for GPU
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    if torch.cuda.is_available():
        print(f"Using GPU: {torch.cuda.get_device_name(device)}")
    else:
        print("Using CPU")

    # Perform parallel Bayesian Optimization
    activation_functions = ['SLA', 'R3elu', 'RSwish', 'LapSwish']
    best_activation, best_hyperparams, best_accuracy = parallel_bayesian_optimization(
        activation_functions, train_loader, test_loader, device, max_iter=50, num_processes=2
    )
    print(f'Overall Best Activation: {best_activation}, Best Hyperparameters: {best_hyperparams}, Best Accuracy: {best_accuracy}')

Using GPU: NVIDIA GeForce RTX 4070 Laptop GPU


In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms
from torch.utils.data import DataLoader
import numpy as np
import torch.nn.functional as F

# Set random seed for reproducibility
torch.manual_seed(42)
np.random.seed(42)

# Define Stochastic Laplacian Activation (SLA)
class StochasticLaplacianActivation(nn.Module):
    def __init__(self, C, K, epsilon_p, epsilon_l, probability):
        super(StochasticLaplacianActivation, self).__init__()
        self.C = C
        self.K = K
        self.epsilon_p = epsilon_p
        self.epsilon_l = epsilon_l
        self.probability = probability

    def forward(self, x):
        x_hat = self.clipK(x, self.C, self.K)
        mask = torch.rand_like(x_hat) < self.probability
        noise_scale = 2 * self.K * self.C / (self.epsilon_l * self.epsilon_p)
        laplacian_noise = torch.tensor(np.random.laplace(0, noise_scale, size=x_hat.shape), dtype=torch.float32).to(x.device)
        output = torch.where(mask, torch.maximum(x_hat + laplacian_noise, torch.tensor(0.0).to(x.device)), x_hat)
        return output

    def clipK(self, v, C, K):
        norm_v = torch.norm(v, p=K, dim=1, keepdim=True)
        scale = torch.clamp(C / norm_v, max=1.0)
        return v * scale

# Define R3elu
class R3elu(nn.Module):
    def __init__(self, C, K, epsilon_p, epsilon_l, probability):
        super(R3elu, self).__init__()
        self.C = C
        self.K = K
        self.epsilon_p = epsilon_p
        self.epsilon_l = epsilon_l
        self.probability = probability

    def forward(self, x):
        x_hat = self.clipK(x, self.C, self.K)
        mask = torch.rand_like(x_hat) < self.probability
        noise_scale = 2 * self.K * self.C / (self.epsilon_l * self.epsilon_p)
        laplacian_noise = torch.tensor(np.random.laplace(0, noise_scale, size=x_hat.shape), dtype=torch.float32).to(x.device)
        output = torch.where(mask, F.relu(x_hat + laplacian_noise), F.relu(x_hat))
        return output

    def clipK(self, v, C, K):
        norm_v = torch.norm(v, p=K, dim=1, keepdim=True)
        scale = torch.clamp(C / norm_v, max=1.0)
        return v * scale

# Define Randomized Swish (RSwish)
class RSwish(nn.Module):
    def __init__(self, mean=1.0, stddev=0.1):
        super(RSwish, self).__init__()
        self.mean = mean
        self.stddev = stddev

    def forward(self, x):
        swish = x * torch.sigmoid(x)
        noise = torch.normal(self.mean, self.stddev, size=swish.size()).to(x.device)
        return swish * noise

# Define Laplacian Swish (LapSwish)
class LapSwish(nn.Module):
    def __init__(self, scale=0.1, epsilon_p=1.0):
        super(LapSwish, self).__init__()
        self.scale = scale
        self.epsilon_p = epsilon_p

    def forward(self, x):
        swish = x * torch.sigmoid(x)
        noise_scale = self.scale / self.epsilon_p
        laplacian_noise = torch.tensor(np.random.laplace(0, noise_scale, size=swish.size()), dtype=torch.float32).to(x.device)
        return swish + laplacian_noise

# Define the Fashion MNIST model
class FashionMNISTModel(nn.Module):
    def __init__(self, activation='SLA', **kwargs):
        super(FashionMNISTModel, self).__init__()
        self.conv1 = nn.Conv2d(1, 32, kernel_size=3, padding=1)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3, padding=1)
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)
        self.fc1 = nn.Linear(64 * 7 * 7, 128)
        self.fc2 = nn.Linear(128, 10)
        
        if activation == 'SLA':
            self.custom_activation = StochasticLaplacianActivation(**kwargs)
        elif activation == 'R3elu':
            self.custom_activation = R3elu(**kwargs)
        elif activation == 'RSwish':
            self.custom_activation = RSwish(**kwargs)
        elif activation == 'LapSwish':
            self.custom_activation = LapSwish(**kwargs)
        else:
            raise ValueError("Unsupported activation function.")

    def forward(self, x):
        x = self.pool(F.relu(self.conv1(x)))
        x = self.pool(F.relu(self.conv2(x)))
        x = x.view(-1, 64 * 7 * 7)
        x = self.fc1(x)
        x = self.custom_activation(x)
        x = self.fc2(x)
        return x

def train_and_evaluate(model, train_loader, test_loader, criterion, optimizer, device, num_epochs=10):
    model.to(device)
    for epoch in range(num_epochs):
        model.train()
        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
    
    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for inputs, labels in test_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
    
    accuracy = correct / total
    return accuracy

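def measure_randomness(activation, hyperparams, device, num_runs=100):
    # Note: this builds a freshly initialized model, so the score reflects the
    # output variance of the activation under random weights, not the variance
    # of the trained model evaluated in the main block.
    model = FashionMNISTModel(activation=activation, **hyperparams).to(device)
    model.eval()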

    # Prepare a small batch of data
    transform = transforms.Compose([transforms.ToTensor(), transforms.Normalize((0.5,), (0.5,))])
    dataset = torchvision.datasets.FashionMNIST(root='./data', train=False, download=True, transform=transform)
    dataloader = DataLoader(dataset, batch_size=64, shuffle=False)
    inputs, _ = next(iter(dataloader))
    inputs = inputs.to(device)

    # Run the same input through the model multiple times
    outputs = []
    with torch.no_grad():
        for _ in range(num_runs):
            output = model(inputs)
            outputs.append(output.cpu().numpy())

    # Calculate the variance of the outputs
    outputs = np.array(outputs)
    variance = np.var(outputs, axis=0)
    mean_variance = np.mean(variance)

    return mean_variance

if __name__ == '__main__':
    # Prepare the dataset
    transform = transforms.Compose([transforms.ToTensor(), transforms.Normalize((0.5,), (0.5,))])
    train_dataset = torchvision.datasets.FashionMNIST(root='./data', train=True, download=True, transform=transform)
    test_dataset = torchvision.datasets.FashionMNIST(root='./data', train=False, download=True, transform=transform)
    train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
    test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False)

    # Check for GPU
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"Using device: {device}")

    # Best hyperparameters from the previous optimization
    best_configs = {
        'SLA': {'C': 0.74748618, 'K': 3, 'epsilon_p': 10.0, 'epsilon_l': 1.0, 'probability': 0.3},
        'R3elu': {'C': 0.70169821, 'K': 3, 'epsilon_p': 10.0, 'epsilon_l': 1.0, 'probability': 0.3},
        'RSwish': {'mean': 0.87115326, 'stddev': 0.0608454},
        'LapSwish': {'scale': 0.01106899, 'epsilon_p': 1.74217515}
    }

    # Train and evaluate models with fixed parameters
    for activation, hyperparams in best_configs.items():
        print(f"\nTraining and evaluating {activation}...")
        model = FashionMNISTModel(activation=activation, **hyperparams)
        criterion = nn.CrossEntropyLoss()
        optimizer = optim.Adam(model.parameters(), lr=0.001)
        accuracy = train_and_evaluate(model, train_loader, test_loader, criterion, optimizer, device)
        print(f"Activation: {activation}, Accuracy: {accuracy:.4f}")

    # Measure randomness
    print("\nMeasuring randomness scores...")
    for activation, hyperparams in best_configs.items():
        randomness_score = measure_randomness(activation, hyperparams, device)
        print(f"Activation: {activation}, Randomness Score: {randomness_score:.6f}")

print("Execution completed successfully.")

Using device: cuda

Training and evaluating SLA...
Activation: SLA, Accuracy: 0.8936

Training and evaluating R3elu...
Activation: R3elu, Accuracy: 0.8698

Training and evaluating RSwish...
Activation: RSwish, Accuracy: 0.9197

Training and evaluating LapSwish...
Activation: LapSwish, Accuracy: 0.9214

Measuring randomness scores...
Activation: SLA, Randomness Score: 0.018274
Activation: R3elu, Randomness Score: 0.015372
Activation: RSwish, Randomness Score: 0.000006
Activation: LapSwish, Randomness Score: 0.000026
Execution completed successfully.
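
To read the two sets of numbers side by side, the short sketch below tabulates the printed accuracy and randomness score for each activation, together with the accuracy gap to the best model. The values are copied from the output above; the variable names are illustrative, and the script only reports the figures rather than defining a selection rule.

```python
# (accuracy, randomness score) copied from the printed output above
results = {
    "SLA": (0.8936, 0.018274),
    "R3elu": (0.8698, 0.015372),
    "RSwish": (0.9197, 0.000006),
    "LapSwish": (0.9214, 0.000026),
}

best_accuracy = max(acc for acc, _ in results.values())
most_random = max(results, key=lambda name: results[name][1])
accuracy_gap = {name: best_accuracy - acc for name, (acc, _) in results.items()}

for name, (acc, rand) in results.items():
    print(
        f"{name:9s} accuracy={acc:.4f} "
        f"gap_to_best={accuracy_gap[name]:.4f} randomness={rand:.6f}"
    )
print("Highest randomness score:", most_random)
```

SLA gives up under three accuracy points relative to LapSwish while its randomness score is several hundred times larger, which is the trade-off behind choosing it for the later experiments.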


In [2]:
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms
from torch.utils.data import DataLoader
import numpy as np
import matplotlib.pyplot as plt

# Set random seed for reproducibility
torch.manual_seed(42)
np.random.seed(42)

# Define Stochastic Laplacian Activation (SLA)
class StochasticLaplacianActivation(nn.Module):
    def __init__(self, C, K, epsilon_p, epsilon_l, probability):
        super(StochasticLaplacianActivation, self).__init__()
        self.C = C
        self.K = K
        self.epsilon_p = epsilon_p
        self.epsilon_l = epsilon_l
        self.probability = probability

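    def forward(self, x):
        # Clip each sample so that its K-norm is at most C
        x_hat = self.clipK(x, self.C, self.K)
        # Each element is selected for noising independently with probability `probability`
        mask = torch.rand_like(x_hat) < self.probability
        noise_scale = 2 * self.K * self.C / (self.epsilon_l * self.epsilon_p)
        laplacian_noise = torch.tensor(np.random.laplace(0, noise_scale, size=x_hat.shape), dtype=torch.float32).to(x.device)
        # Selected elements get Laplace noise and are clamped at zero; the rest pass through as x_hat
        output = torch.where(mask, torch.clamp(x_hat + laplacian_noise, min=0.0), x_hat)
        return output

    def clipK(self, v, C, K):
        # Rescale each row so its K-norm is at most C; rows already within C are unchanged
        norm_v = torch.norm(v, p=K, dim=1, keepdim=True)
        scale = torch.clamp(C / norm_v, max=1.0)
        return v * scale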

# Define the Fashion MNIST model
class FashionMNISTModel(nn.Module):
    def __init__(self, activation='SLA', **kwargs):
        super(FashionMNISTModel, self).__init__()
        self.conv1 = nn.Conv2d(1, 32, kernel_size=3, padding=1)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3, padding=1)
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)
        self.fc1 = nn.Linear(64 * 7 * 7, 128)
        if activation == 'SLA':
            self.custom_activation = StochasticLaplacianActivation(**kwargs)
        elif activation == 'Swish':
            self.custom_activation = nn.SiLU()  # Swish activation
        else:
            raise ValueError(f"Unsupported activation: {activation}")
        self.fc2 = nn.Linear(128, 10)

    def forward(self, x):
        x = self.pool(torch.relu(self.conv1(x)))
        x = self.pool(torch.relu(self.conv2(x)))
        x = x.view(-1, 64 * 7 * 7)
        x = self.fc1(x)
        x = self.custom_activation(x)
        features = x  # Save the features after activation
        x = self.fc2(x)
        return x, features

# Simple generator
class SimpleGenerator(nn.Module):
    def __init__(self):
        super(SimpleGenerator, self).__init__()
        self.fc = nn.Linear(128, 28 * 28)

    def forward(self, x):
        x = self.fc(x)
        x = x.view(-1, 1, 28, 28)
        return torch.sigmoid(x)

def train_model(model, train_loader, device, num_epochs=10):
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=0.001)
    
    model.to(device)
    for epoch in range(num_epochs):
        model.train()
        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs, _ = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
        print(f"Epoch {epoch+1}/{num_epochs} completed.")
    
    print("Training completed.")

def visualize_activations(sla_model, swish_model, generator, test_loader, device, num_samples=5, num_generations=3):
    sla_model.eval()
    swish_model.eval()
    generator.eval()
    
    # Get some test images
    dataiter = iter(test_loader)
    images, _ = next(dataiter)
    images = images[:num_samples].to(device)
    
    plt.figure(figsize=(15, 5 * num_samples))
    for i in range(num_samples):
        # Display original image
        plt.subplot(num_samples, 2*num_generations + 1, i*(2*num_generations + 1) + 1)
        plt.imshow(images[i].cpu().squeeze(), cmap='gray')
        plt.axis('off')
        if i == 0:
            plt.title('Original')
        
        # Generate images using SLA model
        for j in range(num_generations):
            with torch.no_grad():
                _, features = sla_model(images[i].unsqueeze(0))
                generated = generator(features)
            
            plt.subplot(num_samples, 2*num_generations + 1, i*(2*num_generations + 1) + j + 2)
            plt.imshow(generated.cpu().squeeze(), cmap='gray')
            plt.axis('off')
            if i == 0:
                plt.title(f'SLA Gen {j+1}')

        # Generate images using Swish model
        for j in range(num_generations):
            with torch.no_grad():
                _, features = swish_model(images[i].unsqueeze(0))
                generated = generator(features)
            
            plt.subplot(num_samples, 2*num_generations + 1, i*(2*num_generations + 1) + num_generations + j + 2)
            plt.imshow(generated.cpu().squeeze(), cmap='gray')
            plt.axis('off')
            if i == 0:
                plt.title(f'Swish Gen {j+1}')
    
    plt.tight_layout()
    plt.savefig('sla_swish_comparison.png')
    print("Visualization saved as 'sla_swish_comparison.png'")
    plt.close()

if __name__ == '__main__':
    # Set up device and data loaders
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"Using device: {device}")

    transform = transforms.Compose([transforms.ToTensor(), transforms.Normalize((0.5,), (0.5,))])
    train_dataset = torchvision.datasets.FashionMNIST(root='./data', train=True, download=True, transform=transform)
    test_dataset = torchvision.datasets.FashionMNIST(root='./data', train=False, download=True, transform=transform)
    train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
    test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False)

    # Initialize and train the SLA model
    sla_params = {'C': 0.74748618, 'K': 3, 'epsilon_p': 10.0, 'epsilon_l': 1.0, 'probability': 0.3}
    sla_model = FashionMNISTModel(activation='SLA', **sla_params).to(device)
    print("Training SLA model...")
    train_model(sla_model, train_loader, device)

    # Initialize and train the Swish model
    swish_model = FashionMNISTModel(activation='Swish').to(device)
    print("Training Swish model...")
    train_model(swish_model, train_loader, device)

    # Initialize the generator
    generator = SimpleGenerator().to(device)

    # Visualize the effect of SLA vs Swish
    visualize_activations(sla_model, swish_model, generator, test_loader, device)

    print("Execution completed successfully.")

Using device: cuda
Training SLA model...
Epoch 1/10 completed.
Epoch 2/10 completed.
Epoch 3/10 completed.
Epoch 4/10 completed.
Epoch 5/10 completed.
Epoch 6/10 completed.
Epoch 7/10 completed.
Epoch 8/10 completed.
Epoch 9/10 completed.
Epoch 10/10 completed.
Training completed.
Training Swish model...
Epoch 1/10 completed.
Epoch 2/10 completed.
Epoch 3/10 completed.
Epoch 4/10 completed.
Epoch 5/10 completed.
Epoch 6/10 completed.
Epoch 7/10 completed.
Epoch 8/10 completed.
Epoch 9/10 completed.
Epoch 10/10 completed.
Training completed.
Visualization saved as 'sla_swish_comparison.png'
Execution completed successfully.


Now let's settle on the SLA activation function and run some more tests.

The Stochastic Laplacian Activation (SLA) function is designed to introduce controlled noise into the activation process, enhancing privacy while maintaining model performance. Here's a breakdown of its key mathematical components:

Clipping Function:
The SLA first applies a clipping function to the input:
x_hat = clipK(x, C, K)
Where:

x is the input
C is a clipping parameter
K is the norm order

The clipK function is defined as:
clipK(v, C, K) = v * min(1, C / ||v||_K)
Where ||v||_K is the K-norm of v.
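
To make the clipping rule concrete, here is a minimal, dependency-free sketch in plain Python (the notebook itself uses torch.norm and torch.clamp on batches). The helper name clip_k and the example vectors are assumptions chosen for illustration, not part of the notebook.

```python
def clip_k(v, C, K):
    """Scale v so that its K-norm is at most C: v * min(1, C / ||v||_K)."""
    norm = sum(abs(x) ** K for x in v) ** (1.0 / K)
    if norm == 0.0:
        return list(v)  # the zero vector is already inside the ball
    scale = min(1.0, C / norm)
    return [x * scale for x in v]


# A vector with 2-norm 5 is shrunk onto the ball of radius C = 1.
clipped = clip_k([3.0, 4.0], C=1.0, K=2)
# A vector that is already inside the ball is returned unchanged.
unchanged = clip_k([0.1, 0.2], C=1.0, K=2)
```

Clipping only ever shrinks a vector; it never changes its direction.
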
Stochastic Noise Addition:
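After clipping, SLA adds Laplacian noise to each element independently with probability p and clamps the noised value at zero (as the code above does):
output = {
max(0, x_hat + Lap(0, 2KC/(εℓ·εp))), with probability p
x_hat,                                with probability 1-p
}
Where:

Lap(0, b) is Laplacian noise with location 0 and scale b; the code uses b = 2KC/(εℓ·εp)
εℓ is the local privacy parameter
εp is a second privacy parameter (epsilon_p in the code) that also divides the noise scale
p is the probability of adding noise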
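
As a rough sketch of the sampling step, the plain-Python snippet below computes the Laplace scale the way the notebook's forward() does, 2*K*C / (epsilon_l * epsilon_p), which also divides by epsilon_p, and then noises each element with probability p. It uses only the standard library. The function names and the use of random.Random are assumptions for illustration; the clamp at zero mirrors the torch.maximum call in the notebook code.

```python
import random


def laplace_scale(C, K, epsilon_l, epsilon_p):
    # Scale used by the notebook's forward(): 2*K*C / (epsilon_l * epsilon_p)
    return 2 * K * C / (epsilon_l * epsilon_p)


def sample_laplace(scale, rng):
    # The difference of two i.i.d. exponentials with mean `scale` is Laplace(0, scale)
    return rng.expovariate(1.0 / scale) - rng.expovariate(1.0 / scale)


def stochastic_noise(x_hat, scale, p, rng):
    # With probability p: add noise and clamp at zero; otherwise pass the value through
    out = []
    for x in x_hat:
        if rng.random() < p:
            out.append(max(0.0, x + sample_laplace(scale, rng)))
        else:
            out.append(x)
    return out


rng = random.Random(0)
# C, K, epsilon_l and epsilon_p are the values passed as sla_params in the notebook
scale = laplace_scale(C=0.74748618, K=3, epsilon_l=1.0, epsilon_p=10.0)
never_noised = stochastic_noise([-0.2, 0.5], scale, p=0.0, rng=rng)
always_noised = stochastic_noise([-0.2, 0.5], scale, p=1.0, rng=rng)
```

With p = 0 the clipped values pass through untouched, including negative ones. With p = 1 every element is noised and therefore non-negative after the clamp.
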


Privacy Guarantees:
SLA provides (ε, δ)-local differential privacy, where:
ε = εℓ + ln(1/δ) / p
This means that for any two possible inputs x and x', and any output y:
Pr[SLA(x) = y] ≤ exp(ε) * Pr[SLA(x') = y] + δ
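
For a sense of scale, the stated bound can be evaluated numerically. The snippet below simply plugs numbers into ε = εℓ + ln(1/δ) / p as written above; it does not verify the guarantee. εℓ = 1.0 and p = 0.3 are the values used in the notebook, while δ = 1e-5 is an assumed value chosen only for illustration.

```python
import math


def total_epsilon(epsilon_l, delta, p):
    # Direct evaluation of the formula stated in the text: eps = eps_l + ln(1/delta) / p
    return epsilon_l + math.log(1.0 / delta) / p


eps = total_epsilon(epsilon_l=1.0, delta=1e-5, p=0.3)
print(f"epsilon = {eps:.3f}")
```

Read literally, the formula gives a larger ε as p decreases, because the ln(1/δ) term is divided by p.
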
Hyperparameters:
The key hyperparameters in SLA are:

C: clipping threshold
K: norm order for clipping
εℓ: local privacy parameter
εp: additional privacy parameter (epsilon_p in the code) that also divides the noise scale
p: probability of adding noise


Implementation Details:
In our PyTorch implementation, we used:

torch.norm for K-norm calculation
torch.clamp for clipping
np.random.laplace for generating Laplacian noise
torch.rand_like for probabilistic noise addition


I am also testing a new, efficient activation intended to preserve the privacy of members, "ClippedStochasticSwish":

class ClippedStochasticSwish(nn.Module):
    def __init__(self, C, K, epsilon_p, epsilon_l, probability):
        super(ClippedStochasticSwish, self).__init__()
        self.C = C
        self.K = K
        self.epsilon_p = epsilon_p
        self.epsilon_l = epsilon_l
        self.probability = probability

    def forward(self, x):
        x_clipped = self.clipK(x, self.C, self.K)
        y = x_clipped * torch.sigmoid(x_clipped)
        mask = torch.rand_like(y) < self.probability
        noise_scale = 2 * self.K * self.C / (self.epsilon_l * self.epsilon_p)
        laplacian_noise = torch.tensor(np.random.laplace(0, noise_scale, size=y.shape), dtype=torch.float32).to(x.device)
        output = torch.where(mask, y + laplacian_noise, y)
        return output

    def clipK(self, v, C, K):
        norm_v = torch.norm(v, p=K, dim=1, keepdim=True)
        scale = torch.clamp(C / norm_v, max=1.0)
        return v * scale
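
One way to sanity-check ClippedStochasticSwish is to compare it against a deterministic reference when the noise probability is zero. The sketch below is a plain-Python, per-vector reference using only the standard library; the names swish and clipped_swish are assumptions for illustration. With probability = 0 the mask in forward() is never set, so the module's output for a single row should match this reference up to float32 rounding.

```python
import math


def swish(x):
    # Swish / SiLU: x * sigmoid(x)
    return x / (1.0 + math.exp(-x))


def clipped_swish(v, C, K):
    # Clip the vector to K-norm at most C, then apply Swish element-wise
    norm = sum(abs(x) ** K for x in v) ** (1.0 / K)
    scale = 1.0 if norm == 0.0 else min(1.0, C / norm)
    return [swish(x * scale) for x in v]


# [3, -4] has 2-norm 5, so it is clipped to [0.6, -0.8] before Swish is applied
reference = clipped_swish([3.0, -4.0], C=1.0, K=2)
```

Unlike SLA, this variant does not clamp noised values at zero, so its outputs can be negative even when noise is applied.
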

In [None]:
# Iteration 2: adds the new suggestions (distance-correlation loss and more tests)

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torchvision
import torchvision.transforms as transforms
from torch.utils.data import DataLoader, Subset, TensorDataset
import numpy as np
import matplotlib.pyplot as plt
import os
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
from sklearn.ensemble import RandomForestClassifier

# Set random seed for reproducibility
torch.manual_seed(42)
np.random.seed(42)

class StochasticLaplacianActivation(nn.Module):
    def __init__(self, C, K, epsilon_p, epsilon_l, probability):
        super(StochasticLaplacianActivation, self).__init__()
        self.C = C
        self.K = K
        self.epsilon_p = epsilon_p
        self.epsilon_l = epsilon_l
        self.probability = probability

    def forward(self, x):
        x_hat = self.clipK(x, self.C, self.K)
        mask = torch.rand_like(x_hat) < self.probability
        noise_scale = 2 * self.K * self.C / (self.epsilon_l * self.epsilon_p)
        laplacian_noise = torch.tensor(np.random.laplace(0, noise_scale, size=x_hat.shape), dtype=torch.float32).to(x.device)
        output = torch.where(mask, torch.maximum(x_hat + laplacian_noise, torch.tensor(0.0).to(x.device)), x_hat)
        return output

    def clipK(self, v, C, K):
        norm_v = torch.norm(v, p=K, dim=1, keepdim=True)
        scale = torch.clamp(C / norm_v, max=1.0)
        return v * scale

class FashionMNISTModel(nn.Module):
    def __init__(self, activation='SLA', **kwargs):
        super(FashionMNISTModel, self).__init__()
        self.conv1 = nn.Conv2d(1, 32, kernel_size=3, padding=1)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3, padding=1)
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)
        self.fc1 = nn.Linear(64 * 7 * 7, 128)
        if activation == 'SLA':
            self.custom_activation = StochasticLaplacianActivation(**kwargs)
        elif activation == 'ReLU':
            self.custom_activation = nn.ReLU()
        else:
            raise ValueError(f"Unsupported activation: {activation}")
        self.fc2 = nn.Linear(128, 10)

    def forward(self, x):
        x = self.pool(F.relu(self.conv1(x)))
        x = self.pool(F.relu(self.conv2(x)))
        x = x.view(-1, 64 * 7 * 7)
        x = self.fc1(x)
        features = self.custom_activation(x)
        x = self.fc2(features)
        return x, features

class Adversary(nn.Module):
    def __init__(self, input_dim):
        super(Adversary, self).__init__()
        self.fc = nn.Linear(input_dim, 1)
    
    def forward(self, x):
        return torch.sigmoid(self.fc(x))

class DeepPropertyInferenceAttack(nn.Module):
    def __init__(self, input_dim):
        super(DeepPropertyInferenceAttack, self).__init__()
        self.fc1 = nn.Linear(input_dim, 64)
        self.fc2 = nn.Linear(64, 32)
        self.fc3 = nn.Linear(32, 1)
    
    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        return torch.sigmoid(self.fc3(x))

def train_model_with_adversary(model, adversary, train_loader, device, num_epochs=10, lambda_adv=0.1):
    model_optimizer = optim.Adam(model.parameters(), lr=0.001)
    adv_optimizer = optim.Adam(adversary.parameters(), lr=0.001)
    criterion = nn.CrossEntropyLoss()
    adv_criterion = nn.BCELoss()
    
    for epoch in range(num_epochs):
        model.train()
        adversary.train()
        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            
            adv_optimizer.zero_grad()
            outputs, features = model(inputs)
            adv_pred = adversary(features.detach())
            adv_loss = adv_criterion(adv_pred, (labels == 0).float().unsqueeze(1))
            adv_loss.backward()
            adv_optimizer.step()
            
            model_optimizer.zero_grad()
            outputs, features = model(inputs)
            loss = criterion(outputs, labels)
            adv_pred = adversary(features)
            loss -= lambda_adv * adv_criterion(adv_pred, (labels == 0).float().unsqueeze(1))
            loss.backward()
            model_optimizer.step()
        
        print(f"Epoch {epoch+1}/{num_epochs} completed.")

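def distance_correlation_loss(X, Y, eps=1e-9):
    # Squared sample distance correlation between the rows of X and Y,
    # using double-centered pairwise distance matrices (biased estimator).
    def compute_distance_matrix(X):
        return torch.cdist(X, X)

    def compute_centered_distance_matrix(D):
        m = D.mean()
        row_mean = D.mean(dim=1, keepdim=True)
        col_mean = D.mean(dim=0, keepdim=True)
        return D - row_mean - col_mean + m

    dX = compute_centered_distance_matrix(compute_distance_matrix(X))
    dY = compute_centered_distance_matrix(compute_distance_matrix(Y))
    dcov_xy = (dX * dY).mean()  # squared distance covariance of X and Y
    dvar_x = (dX * dX).mean()   # squared distance variance of X
    dvar_y = (dY * dY).mean()   # squared distance variance of Y
    # Normalize so the loss is a correlation rather than an unscaled covariance
    return dcov_xy / torch.sqrt(dvar_x * dvar_y + eps)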

def train_model_with_privacy(model, train_loader, device, num_epochs=10, lambda_dc=0.1):
    optimizer = optim.Adam(model.parameters(), lr=0.001)
    criterion = nn.CrossEntropyLoss()
    
    for epoch in range(num_epochs):
        model.train()
        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            
            optimizer.zero_grad()
            outputs, features = model(inputs)
            loss = criterion(outputs, labels)
            
            dc_loss = distance_correlation_loss(inputs.view(inputs.size(0), -1), features)
            loss += lambda_dc * dc_loss
            
            loss.backward()
            optimizer.step()
        
        print(f"Epoch {epoch+1}/{num_epochs} completed.")
              
def evaluate_model(model, test_loader, device):
    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for inputs, labels in test_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs, _ = model(inputs)
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()   
    accuracy = correct / total
    return accuracy

def advanced_property_inference_attack(model, target_loader, shadow_loader, device):
    attack_model = DeepPropertyInferenceAttack(10).to(device)
    optimizer = optim.Adam(attack_model.parameters(), lr=0.001)
    criterion = nn.BCELoss()

    for epoch in range(10):
        for inputs, labels in target_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            with torch.no_grad():
                outputs, _ = model(inputs)
            
            attack_model.train()
            optimizer.zero_grad()
            pred = attack_model(outputs)
            loss = criterion(pred, (labels == 0).float().unsqueeze(1))
            loss.backward()
            optimizer.step()

    attack_model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for inputs, labels in shadow_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs, _ = model(inputs)
            pred = attack_model(outputs)
            correct += ((pred > 0.5) == (labels == 0).unsqueeze(1)).sum().item()
            total += labels.size(0)

    return correct / total

def member_inference_attack(model, member_loader, non_member_loader, device):
    model.eval()

    def get_confidences(loader):
        confidences = []
        with torch.no_grad():
            for inputs, _ in loader:
                inputs = inputs.to(device)
                outputs, _ = model(inputs)
                probs = torch.softmax(outputs, dim=1)
                confidences.extend(probs.max(dim=1)[0].cpu().numpy())
        return confidences

    member_confidences = get_confidences(member_loader)
    non_member_confidences = get_confidences(non_member_loader)

    threshold = np.mean(member_confidences)
    member_preds = [1 if conf >= threshold else 0 for conf in member_confidences]
    non_member_preds = [1 if conf >= threshold else 0 for conf in non_member_confidences]

    member_accuracy = np.mean(member_preds)
    non_member_accuracy = 1 - np.mean(non_member_preds)

    attack_accuracy = (member_accuracy + non_member_accuracy) / 2
    return attack_accuracy

def data_reconstruction_attack(model, target_loader, device, num_iterations=1000):
    model.eval()
    target_input, target_label = next(iter(target_loader))
    target_input = target_input.to(device)
    target_label = target_label.to(device)

    with torch.no_grad():
        target_output, _ = model(target_input)

    reconstructed_input = torch.randn_like(target_input, requires_grad=True)
    optimizer = optim.Adam([reconstructed_input], lr=0.01)
    criterion = nn.MSELoss()

    for _ in range(num_iterations):
        optimizer.zero_grad()
        output, _ = model(reconstructed_input)
        loss = criterion(output, target_output)
        loss.backward()
        optimizer.step()

    reconstruction_error = torch.mean((reconstructed_input - target_input) ** 2).item()
    return reconstruction_error

def plot_results(sla_results, baseline_results):
    labels = ['Accuracy', 'Prop Inf', 'Mem Inf', 'Recon Error']
    x = np.arange(len(labels))
    width = 0.35

    fig, ax = plt.subplots(figsize=(12, 6))
    rects1 = ax.bar(x - width/2, sla_results, width, label='SLA Model')
    rects2 = ax.bar(x + width/2, baseline_results, width, label='Baseline Model')

    ax.set_ylabel('Scores')
    ax.set_title('Comparison of SLA and Baseline Models')
    ax.set_xticks(x)
    ax.set_xticklabels(labels)
    ax.legend()

    ax.bar_label(rects1, padding=3)
    ax.bar_label(rects2, padding=3)

    fig.tight_layout()
    plt.savefig('model_comparison.png')
    print("Comparison plot saved as 'model_comparison.png'")

if __name__ == '__main__':
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"Using device: {device}")

    transform = transforms.Compose([transforms.ToTensor(), transforms.Normalize((0.5,), (0.5,))])
    full_dataset = torchvision.datasets.FashionMNIST(root='./data', train=True, download=True, transform=transform)
    test_dataset = torchvision.datasets.FashionMNIST(root='./data', train=False, download=True, transform=transform)

    train_size = int(0.8 * len(full_dataset))
    val_size = len(full_dataset) - train_size
    train_dataset, val_dataset = torch.utils.data.random_split(full_dataset, [train_size, val_size])

    train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=64, shuffle=False)
    test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False)

    sla_params = {'C': 0.74748618, 'K': 3, 'epsilon_p': 10.0, 'epsilon_l': 1.0, 'probability': 0.3}
    sla_model = FashionMNISTModel(activation='SLA', **sla_params).to(device)
    baseline_model = FashionMNISTModel(activation='ReLU').to(device)
    adversary = Adversary(128).to(device)

    print("Training SLA model with adversary...")
    train_model_with_adversary(sla_model, adversary, train_loader, device)
    print("Training baseline model with privacy...")
    train_model_with_privacy(baseline_model, train_loader, device)

    print("Evaluating models...")
    sla_accuracy = evaluate_model(sla_model, test_loader, device)
    baseline_accuracy = evaluate_model(baseline_model, test_loader, device)

    sla_prop_inf = advanced_property_inference_attack(sla_model, val_loader, test_loader, device)
    baseline_prop_inf = advanced_property_inference_attack(baseline_model, val_loader, test_loader, device)

    sla_mem_inf = member_inference_attack(sla_model, train_loader, test_loader, device)
    baseline_mem_inf = member_inference_attack(baseline_model, train_loader, test_loader, device)

    sla_recon_error = data_reconstruction_attack(sla_model, test_loader, device)
    baseline_recon_error = data_reconstruction_attack(baseline_model, test_loader, device)

    print("\nResults:")
    print(f"SLA Model - Accuracy: {sla_accuracy:.4f}, Prop Inf: {sla_prop_inf:.4f}, Mem Inf: {sla_mem_inf:.4f}, Recon Error: {sla_recon_error:.4f}")
    print(f"Baseline Model - Accuracy: {baseline_accuracy:.4f}, Prop Inf: {baseline_prop_inf:.4f}, Mem Inf: {baseline_mem_inf:.4f}, Recon Error: {baseline_recon_error:.4f}")