# Lab 9
## Differential Privacy

## Init Environment
For this assignment, we need a new library: `opacus`.



In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import matplotlib.pyplot as plt

from lab_2_hfl.hfl_complete import *

# Device configuration: use the GPU when CUDA is available, otherwise the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
print(f"Using device: {device}")

In [None]:

def train_model(model, train_loader, optimizer, epochs=5, callback=None, seed=42):
    """Train `model` in place for `epochs` passes over `train_loader`.

    Args:
        model: network to optimise; its output is passed to `F.nll_loss`.
        train_loader: iterable yielding `(data, target)` batches.
        optimizer: optimizer that is zeroed and stepped once per batch.
        epochs: number of full passes over `train_loader`.
        callback: optional function applied to each batch's `data` before the
            batch is moved to `device` (e.g. to add noise to the inputs).
        seed: value handed to `torch.manual_seed` before training starts.
    """
    torch.manual_seed(seed)
    progress = tqdm(range(epochs), desc="Training")
    model.train()
    for _ in progress:
        for inputs, labels in train_loader:
            # The callback sees the batch before it is transferred to the device.
            if callback is not None:
                inputs = callback(inputs)
            inputs, labels = inputs.to(device), labels.to(device)
            optimizer.zero_grad()
            batch_loss = F.nll_loss(model(inputs), labels)
            batch_loss.backward()
            optimizer.step()
            # Show the most recent batch loss next to the progress bar.
            progress.set_postfix(loss=batch_loss.item())


# Seed before building the model so the initial weights are reproducible as well:
# train_model() only seeds the RNG once training starts. 42 matches train_model's
# default seed. (Assumes MnistCnn draws its initial weights from torch's global RNG.)
torch.manual_seed(42)
baseline_model = MnistCnn().to(device)
train_loader = DataLoader(train_dataset, batch_size=128, shuffle=True)
# Non-private reference run: no callback is passed, so inputs are used unmodified.
baseline_optimizer = optim.Adam(baseline_model.parameters(), lr=0.005)
train_model(baseline_model, train_loader, baseline_optimizer, epochs=2)


In [None]:
def test(model, test_loader):
    """Return the classification accuracy of `model` on `test_loader`, in percent.

    Args:
        model: network to evaluate; it is switched to eval mode.
        test_loader: iterable yielding `(data, target)` batches; its `.dataset`
            length is used as the total number of samples.

    Returns:
        `100 * correct / len(test_loader.dataset)` as a float.
    """
    model.eval()
    n_correct = 0
    with torch.no_grad():
        for inputs, labels in test_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            # Predicted class is the index of the largest output along dim 1.
            predicted = model(inputs).argmax(dim=1, keepdim=True)
            n_correct += (predicted == labels.view_as(predicted)).sum().item()
    n_samples = len(cast(datasets.MNIST, test_loader.dataset))
    return 100. * n_correct / n_samples

# Accuracy of the baseline model (trained without any input noise), kept for reference.
baseline_acc = test(baseline_model, test_loader)
print(f"Baseline Test Accuracy: {baseline_acc:.2f}%")

## Manual DP

Implement a function that adds Laplace noise to a tensor (the Laplace mechanism for differential privacy) before the tensor is used for training.

In [None]:
import torch.distributions as dist

def imshow(img, ax, title=""):
    """Draw a normalised image tensor on the given matplotlib axis.

    Args:
        img: torch.Tensor in CxHxW order, normalised with mean 0.1307 / std 0.3081.
        ax: axis to draw on.
        title: optional title; skipped when empty.
    """
    # Channel-first tensor -> channel-last NumPy array (HxWxC).
    pixels = np.transpose(img.cpu().numpy(), (1, 2, 0))
    # Invert the normalisation, then clamp into [0, 1] so plt can plot it correctly.
    mean = np.array([0.1307])
    std = np.array([0.3081])
    pixels = np.clip(std * pixels + mean, 0, 1)
    ax.imshow(pixels)
    if title:
        ax.set_title(title)
    ax.axis('off')

def imshow_pair(original, reconstructed, title_right=""):
    """Show two image tensors side by side in a single figure.

    Args:
        original: image drawn on the left, titled "Original".
        reconstructed: image drawn on the right.
        title_right: title for the right-hand image; no title is set when empty.
    """
    _, (left_ax, right_ax) = plt.subplots(1, 2, figsize=(8, 4))
    imshow(original, left_ax, "Original")
    imshow(reconstructed, right_ax, title_right)
    plt.show()

def add_laplace_noise(tensor, epsilon, sensitivity):
    """
    Manually adds Laplacian noise to a tensor to satisfy epsilon-DP.

    Independent noise drawn from Laplace(0, b) with b = sensitivity / epsilon is
    added to every element (the Laplace mechanism).

    Args:
        tensor: input torch.Tensor; it is not modified.
        epsilon: privacy budget, must be > 0. Smaller values add more noise.
        sensitivity: sensitivity of the released quantity, must be >= 0.

    Returns:
        A new tensor with the same shape and device as `tensor`. Floating-point
        inputs keep their dtype; other dtypes are promoted by the addition.

    Raises:
        ValueError: if `epsilon` is not positive or `sensitivity` is negative.
    """
    if epsilon <= 0:
        raise ValueError(f"epsilon must be positive, got {epsilon}")
    if sensitivity < 0:
        raise ValueError(f"sensitivity must be non-negative, got {sensitivity}")

    # 1: Calculate the scale parameter (b) for the Laplace distribution
    b = sensitivity / epsilon
    if b == 0:
        # Zero sensitivity needs no noise, and Laplace requires a strictly positive scale.
        return tensor.clone()

    # 2: Create a Laplace distribution
    # Build the parameters on the input's device so the sampled noise lives there too.
    noise_dtype = tensor.dtype if tensor.is_floating_point() else torch.get_default_dtype()
    loc = torch.zeros((), dtype=noise_dtype, device=tensor.device)
    scale = torch.full((), b, dtype=noise_dtype, device=tensor.device)
    m = dist.Laplace(loc, scale)

    # 3: Sample noise from the distribution that matches the shape of the input tensor
    noise = m.sample(tensor.shape)
    # 4: Return the original tensor + the noise
    return tensor + noise

# Let's test it on a dummy tensor first.
# Under the standard Laplace mechanism (scale = sensitivity / epsilon) these
# arguments correspond to a noise scale of 1.0 / 0.5 = 2.0.
dummy_tensor = torch.tensor([1.0, 2.0, 3.0])
noisy_tensor = add_laplace_noise(dummy_tensor, epsilon=0.5, sensitivity=1.0)
# Print both tensors so the perturbation is visible element by element.
print(f"Original Tensor: {dummy_tensor}")
print(f"Noisy Tensor:    {noisy_tensor}")

In [None]:
# Get a single image (and its label) from the training dataset
sample_image, sample_label = train_dataset[0]
# NOTE: Sensitivity should be calculated based on the specific query/function you want to make private. Here we use 0.5 as a placeholder.
noisy_image = add_laplace_noise(sample_image, epsilon=1.0, sensitivity=0.5)
# Visualize the original and noisy images
imshow_pair(sample_image, noisy_image, title_right="Noisy (ε=1.0)")

Now experiment with different values of ε to see how the noise in the image changes: a smaller ε means stronger privacy and therefore more noise.

In [None]:
# Privacy budgets to compare; for a fixed sensitivity, a smaller epsilon is expected to give a noisier image.
EPSILONS = [0.1, 0.2, 0.5, 0.8, 0.9, 1.0]
sensitivity = 0.5  # Placeholder sensitivity value

sample_image, sample_label = train_dataset[53]  # Get a sample image from the dataset

# One original/noisy figure per epsilon, always starting from the same source image.
for epsilon in EPSILONS:
    noisy_image = add_laplace_noise(sample_image, epsilon=epsilon, sensitivity=sensitivity)
    imshow_pair(sample_image, noisy_image, title_right=f"Noisy (ε={epsilon})")

Now train the model on noisy data, using an ε of your choice.

In [None]:
CHOSEN_EPSILON = 0.8
sensitivity = 0.5  # Placeholder sensitivity value


# define callback for training
def callback(x):
    """Add Laplace noise to one batch of inputs before it is fed to the model."""
    return add_laplace_noise(x, epsilon=CHOSEN_EPSILON, sensitivity=sensitivity)


# Seed before building the model so the initial weights are reproducible as well:
# train_model() only seeds the RNG once training starts. 42 matches train_model's
# default seed. (Assumes MnistCnn draws its initial weights from torch's global RNG.)
torch.manual_seed(42)
manual_dp_model = MnistCnn().to(device)
optimizer = optim.Adam(params=manual_dp_model.parameters(), lr=0.005)
# Only the training inputs are perturbed; evaluation below uses the clean test set.
train_model(manual_dp_model, train_loader, optimizer, epochs=2, callback=callback)
dp_acc = test(manual_dp_model, test_loader)
print(f"Manual DP Test Accuracy (ε={CHOSEN_EPSILON}): {dp_acc:.2f}%")

## Opacus

Now we will use a library that provides differential privacy in a more principled way, without destroying model performance.
The library implements DP-SGD and reports the privacy budget (ε) that training actually consumed.
`max_grad_norm` plays a role similar to the sensitivity: it is the maximum norm a per-sample gradient may have, and gradients with a larger norm are clipped down to it.

In [None]:
from opacus import PrivacyEngine

# Initialize a fresh model and optimizer
# Seed before building the model so the initial weights are reproducible as well:
# train_model() only seeds the RNG once training starts. 42 matches train_model's
# default seed. (Assumes MnistCnn draws its initial weights from torch's global RNG.)
torch.manual_seed(42)
private_model = MnistCnn().to(device)
private_optimizer = optim.Adam(private_model.parameters(), lr=0.005)

# DP-SGD hyperparameters: the noise standard deviation is NOISE_MULTIPLIER * MAX_GRAD_NORM,
# and per-sample gradients with a larger norm than MAX_GRAD_NORM are clipped to it.
NOISE_MULTIPLIER = 1.0
MAX_GRAD_NORM = 1.0

# 1: Instantiate the PrivacyEngine
privacy_engine = PrivacyEngine()

# 2: Wrap your model, optimizer, and dataloader using `make_private`
# NOTE(review): Opacus validates the module and rejects some layer types (e.g. BatchNorm);
# this assumes MnistCnn passes that validation — confirm.
private_model, private_optimizer, private_train_loader = privacy_engine.make_private(
    module=private_model,
    optimizer=private_optimizer,
    data_loader=train_loader,
    noise_multiplier=NOISE_MULTIPLIER,
    max_grad_norm=MAX_GRAD_NORM,
)

print("Training Private Model...")
train_model(private_model, private_train_loader, private_optimizer, epochs=2)

# 3: Calculate the privacy budget (epsilon) for low delta
epsilon = privacy_engine.get_epsilon(delta=1e-5)
print(f"Final Privacy Budget: epsilon = {epsilon:.2f}")

private_acc = test(private_model, test_loader)
print(f"Private Test Accuracy: {private_acc:.2f}%")