In [None]:
import os
import copy

import torch
import tqdm
import torchvision

from typing import Literal

import abstract_gradient_training as agt
from abstract_gradient_training import AGTConfig
from abstract_gradient_training.bounded_models import IntervalBoundedModel
import privacy_utils_regression

In [None]:
import torch
import pandas as pd
from sklearn.model_selection import train_test_split

def get_datasets(csv_path='cubic_with_noise.csv', test_size=0.2, balanced=False, seed=None):
    """
    Load cubic dataset with noise from CSV and return train/test datasets as TensorDatasets.

    The CSV must contain the columns 'x' and 'y'. Both are loaded as float32 column
    vectors of shape (n_rows, 1).

    Args:
        csv_path (str): Path to the dataset CSV file.
        test_size (float): Fraction of data to use for testing.
        balanced (bool): Whether to balance the dataset based on the sign of the target,
            i.e. keep equally many rows with y > 0 and with y <= 0 by randomly
            subsampling the larger group.
        seed (int | None): Seed for the random subsampling performed when `balanced` is
            True. With None (the default) torch's global RNG is used, so the selected
            subset depends on the global RNG state at call time.

    Returns:
        train_dataset, test_dataset (TensorDataset, TensorDataset)
    """

    # Load dataset
    df = pd.read_csv(csv_path)
    x = torch.tensor(df['x'].values, dtype=torch.float32).unsqueeze(1)
    y = torch.tensor(df['y'].values, dtype=torch.float32).unsqueeze(1)

    # Optional "balancing": keep the same number of rows with y > 0 and with y <= 0
    if balanced:
        # A dedicated generator makes the subsampling reproducible without touching the
        # global RNG; generator=None falls back to the global RNG.
        generator = None if seed is None else torch.Generator().manual_seed(seed)

        # squeeze(1) (rather than squeeze()) keeps a 1-D mask even for a single-row file
        is_positive = (y > 0).squeeze(1)
        idx_pos = torch.where(is_positive)[0]
        idx_neg = torch.where(~is_positive)[0]
        n_samples = min(len(idx_pos), len(idx_neg))

        # Shuffle each group and keep the first n_samples indices of each
        idx_pos = idx_pos[torch.randperm(len(idx_pos), generator=generator)[:n_samples]]
        idx_neg = idx_neg[torch.randperm(len(idx_neg), generator=generator)[:n_samples]]
        idx = torch.cat([idx_pos, idx_neg])

        x = x[idx]
        y = y[idx]

    # Split into train/test (the split itself is always deterministic: random_state=42)
    x_train, x_test, y_train, y_test = train_test_split(x, y, test_size=test_size, random_state=42)

    # Wrap in TensorDataset
    train_dataset = torch.utils.data.TensorDataset(x_train, y_train)
    test_dataset = torch.utils.data.TensorDataset(x_test, y_test)

    return train_dataset, test_dataset


In [None]:
# Run on the first CUDA GPU when one is visible to torch, otherwise on the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda:0")
else:
    device = torch.device("cpu")

# Batch size handed to the DataLoaders built later in the notebook.
batchsize = 1_000_000

In [None]:
# get dataloaders
# Seed torch's global RNG *before* building the datasets: get_datasets(balanced=True)
# subsamples with torch.randperm, so seeding only afterwards leaves the balanced subset
# (and therefore the train/test contents) different on every fresh run.
torch.manual_seed(0)
dataset_train, dataset_test = get_datasets(balanced=True)

# Re-seed so the shuffling of dl_train starts from the same RNG state as before this fix.
torch.manual_seed(0)

dl_train = torch.utils.data.DataLoader(dataset_train, batch_size=batchsize, shuffle=True)
dl_test = torch.utils.data.DataLoader(dataset_test, batch_size=batchsize, shuffle=False)

In [None]:
import torch.nn.functional as F
from abstract_gradient_training.bounded_models import BoundedModel
def noisy_test_mse(
    model: torch.nn.Sequential | BoundedModel,
    batch: torch.Tensor,
    labels: torch.Tensor,
    noise_level: float | torch.Tensor = 0.0,
    noise_type: str = "laplace",
) -> float:
    """
    Given a pytorch (or bounded) model, calculate the mean squared error on a batch of the test set when adding the
    specified noise to the predictions.

    Models with two outputs per sample are reduced to a single value (output 1 minus output 0) before the noise is
    added; models with more outputs per sample are not supported.

    Args:
        model (torch.nn.Sequential | BoundedModel): The model to evaluate.
        batch (torch.Tensor): Input batch of data (shape [batchsize, ...]).
        labels (torch.Tensor): Targets for the input batch (shape [batchsize, ] or anything that squeezes to it).
        noise_level (float | torch.Tensor, optional): Scale of the noise distribution. Can either be a number or a
            torch.Tensor that expands to shape (batchsize, ). A small constant (1e-7) is added so the scale is never
            zero. The argument is never modified in place. Ignored when noise_type is "none".
        noise_type (str, optional): Type of noise to add to the predictions, one of ["laplace", "cauchy"], or "none"
            to evaluate the predictions without any noise.

    Returns:
        float: The mean squared error (averaged over the batch) between the noisy predictions and the labels.

    Raises:
        AssertionError: If the labels do not reduce to a 1-D tensor or noise_type is not recognised.
        NotImplementedError: If the model produces more than two outputs per sample.
    """
    # find the device the model lives on and send the batch there
    if isinstance(model, BoundedModel):
        device = torch.device(model.device) if model.device != -1 else torch.device("cpu")
    else:
        device = torch.device(next(model.parameters()).device)
    batch = batch.to(device)

    # validate the labels: flatten to shape (batchsize, )
    if labels.dim() > 1:
        labels = labels.squeeze()
    if labels.dim() == 0:
        # a single-sample batch squeezes down to a scalar; restore the batch dimension
        labels = labels.unsqueeze(0)

    labels = labels.to(device).type(torch.float64)
    assert labels.dim() == 1, "Labels must be of shape (batchsize, )"

    if noise_type in ["none"]:
        # noise-free evaluation of the nominal forward pass
        y_n = model.forward(batch).squeeze()
        return F.mse_loss(y_n, labels.squeeze()).item()

    # validate the noise parameters and set up the distribution
    assert noise_type in ["laplace", "cauchy"], f"Noise type must be one of ['laplace', 'cauchy'], got {noise_type}"
    # can't set distributions scale to zero. The addition is done out-of-place: an in-place `+=` on a tensor
    # argument would silently modify the caller's tensor and accumulate over repeated calls.
    if isinstance(noise_level, torch.Tensor):
        noise_level = noise_level + 1e-7
    else:
        noise_level = torch.tensor(float(noise_level) + 1e-7)
    noise_level = noise_level.to(device).type(batch.dtype)  # type: ignore
    noise_level = noise_level.expand(labels.size())
    if noise_type == "laplace":
        noise_distribution = torch.distributions.Laplace(0, noise_level)
    else:
        noise_distribution = torch.distributions.Cauchy(0, noise_level)

    # nominal forward pass
    y_n = model.forward(batch).squeeze()

    # transform 2-logit models to a single output. The dim() check matters: a single-output model evaluated on
    # exactly two samples also squeezes to shape (2, ) and must not be treated as two logits.
    if y_n.dim() == 2 and y_n.shape[-1] == 2:
        y_n = y_n[:, 1] - y_n[:, 0]
    if y_n.dim() > 1:
        raise NotImplementedError("Noisy accuracy is not supported for multi-class classification.")
    if y_n.dim() == 0:
        # single-sample batch: restore the batch dimension so shapes line up with labels / noise
        y_n = y_n.unsqueeze(0)

    # add one independent noise draw per sample and score the noisy predictions
    noise = noise_distribution.sample().to(y_n.device)
    assert noise.shape == y_n.shape
    y_n = y_n + noise
    mse = F.mse_loss(y_n, labels).item()
    return mse

In [None]:
# set up the AGT configuration
batchsize = 1000000
nominal_config = AGTConfig(
    fragsize=2000,
    learning_rate=1,
    n_epochs=10,
    # Reuse the notebook-level `device` (CUDA when available, CPU otherwise) instead of
    # hard-coding "cuda:0", so the configuration is also valid on CPU-only machines.
    device=str(device),
    l2_reg=0.01,
    k_private=1,
    loss="mse",
    log_level="INFO",
    lr_decay=2.0,
    clip_gamma=1.0,
    lr_min=0.001,
    optimizer="SGDM", # we'll use SGD with momentum
    optimizer_kwargs={"momentum": 0.9, "nesterov": True},
)

In [None]:
# Build one interval-bounded model per k_private value and collect them in a dict keyed by k.
# Each model is a 1 -> `size` -> 1 ReLU network whose parameters are loaded from `path`.
# NOTE(review): `path` is the same placeholder string on every iteration, so as written
# every entry is loaded from the same location — presumably this should point at a
# per-k checkpoint; confirm before relying on the results.
privacy_bounded_models = {}
k_private_values = [1, 2, 5, 10, 20, 50, 100] 
size = 32
for k in k_private_values:
    path = "path/to/model"
    model = torch.nn.Sequential(
        torch.nn.Linear(1, size),
        torch.nn.ReLU(),
        torch.nn.Linear(size, 1),
    )
    bounded_model = IntervalBoundedModel(model, trainable=True)
    bounded_model.load_params(path)
    privacy_bounded_models[k] = bounded_model

# evaluate the fine-tuned model
# After the loop, `bounded_model` is the model created in the last iteration
# (k = k_private_values[-1]); that is the model evaluated here and in later cells.
# The evaluation uses the test split of the unbalanced dataset (get_datasets() defaults).
_, dataset_test_all = get_datasets()
# NOTE(review): the result is indexed [2], [1], [0] below — presumably a
# (worst, nominal, best) triple of MSE values rather than accuracies; confirm against
# agt.test_metrics.test_mse.
accuracy = agt.test_metrics.test_mse(bounded_model, *dataset_test_all.tensors)
print(f"Fine-tuned model accuracy + certified bounds (all classes): {accuracy[2]:.2f} <= {accuracy[1]:.2f} <= {accuracy[0]:.2f}")

In [None]:
import numpy as np
import matplotlib.pyplot as plt

# Define the specific epsilon values: one per decade from 1e-3 up to 1e12
epsilon_values = [0.001, 0.01, 0.1] + [10**p for p in range(13)]

# Output range passed to the noise calibration; the global-sensitivity baseline uses its width.
MIN_BOUND, MAX_BOUND = -27, 27
GLOBAL_SENSITIVITY = MAX_BOUND - MIN_BOUND  # = 54
# Number of independent noise draws averaged for every point of the sweep.
N_TRIALS = 1000


def _mean_noisy_mse(noise_level, n_trials=N_TRIALS):
    """Average the Cauchy-noised test MSE of `bounded_model` over `n_trials` noise draws.

    noisy_test_mse already returns the mean squared error over the batch
    (F.mse_loss with its default reduction), so the only normalisation needed here is
    over the number of trials — dividing by len(dataset_test) as well would shrink
    every value by the dataset size.
    """
    total = 0.0
    for _ in range(n_trials):
        total += noisy_test_mse(
            bounded_model, *dataset_test.tensors, noise_level=noise_level, noise_type="cauchy"
        )
    return total / n_trials


# Store results for both normal MSE and global sensitivity-based MSE
normal_mse_values = []
global_sensitivity_mse_values = []

# The noise-free MSE does not depend on epsilon: compute it once and repeat it so it can
# be plotted as a flat reference line against the same x values.
no_privacy_mse = noisy_test_mse(
    bounded_model, *dataset_test.tensors, noise_level=0, noise_type="none"
)
no_privacy_mse_values = [no_privacy_mse] * len(epsilon_values)

# Loop over epsilon values and calculate the MSE for each
for epsilon in epsilon_values:
    # Calculate the noise level using AGT smooth sensitivity bounds
    noise_level = privacy_utils_regression.get_calibrated_noise_level(
        dataset_test.tensors[0], privacy_bounded_models, min_bound=MIN_BOUND, max_bound=MAX_BOUND, epsilon=epsilon, noise_type="cauchy"
    )
    normal_mse = _mean_noisy_mse(noise_level)
    normal_mse_values.append(normal_mse)

    # Baseline: noise scaled by the global sensitivity instead of the AGT bound.
    # NOTE(review): the factor 6 presumably mirrors the Cauchy calibration used inside
    # get_calibrated_noise_level — confirm against privacy_utils_regression.
    global_mse = _mean_noisy_mse(6 * GLOBAL_SENSITIVITY / epsilon)
    global_sensitivity_mse_values.append(global_mse)



In [None]:
import seaborn as sns
# Apply a Seaborn theme
sns.set_theme(style="whitegrid")

# Create the figure (explicit fig/ax interface so every call targets the same axes)
fig, ax = plt.subplots(figsize=(12, 7))

# One entry per curve: (y values, legend label, marker, line style, colour)
mse_curves = [
    (normal_mse_values, "AGT MSE", 'o', '--', 'royalblue'),
    (global_sensitivity_mse_values, "Global Sensitivity MSE", 'x', '--', 'crimson'),
    (no_privacy_mse_values, "No Privacy MSE", 's', '-', 'seagreen'),
]

# Plot MSE curves with enhanced styles
for mse_values, label, marker, linestyle, color in mse_curves:
    sns.lineplot(x=epsilon_values, y=mse_values, label=label,
                 marker=marker, markersize=10, linestyle=linestyle, color=color, linewidth=3, ax=ax)

# Log scale for both axes
ax.set_xscale('log')
ax.set_yscale('log')

# Axis labels
ax.set_xlabel('Epsilon (Log Scale)', fontsize=30)
ax.set_ylabel('MSE (Log Scale)', fontsize=30)

# Legend and grid
ax.legend(title='MSE Type', fontsize=20, title_fontsize=24, loc='best')
ax.grid(True, which="both", linestyle='--', linewidth=2)

ax.set_ylim(1e-4, 1e13)
ax.tick_params(axis='both', labelsize=20)

# Tidy up layout — done last, after limits and tick-label sizes are final, so the
# computed margins account for the enlarged tick labels.
fig.tight_layout()

# Show plot
plt.show()