In [None]:
import warnings
# Silence every warning category for the rest of the session.
# NOTE(review): this also hides warnings raised by the libraries used below;
# a narrower filter may be preferable while debugging.
warnings.simplefilter("ignore")

In [None]:
import datasets

from peft import PromptEncoderConfig, TaskType, get_peft_model

from opacus.validators import ModuleValidator
from opacus import PrivacyEngine

import torch
import torch.nn as nn
import numpy as np

from tqdm.notebook import tqdm
from torch.optim import SGD
from torch.utils.data import DataLoader

from transformers import AutoModelForSequenceClassification, AutoTokenizer, AutoConfig, DataCollatorWithPadding

from sklearn.metrics import accuracy_score

In [None]:
# Checkpoint identifier used to load the tokenizer, the config and the classifier.
model_name = "prajjwal1/bert-tiny"
# Number of training epochs; also passed to the privacy engine as `epochs`.
EPOCHS = 5
# Batch size shared by the train and validation DataLoaders.
BATCH_SIZE = 1024
# Learning rate handed to the SGD optimizer.
LR = 5e-5

In [None]:
# Load the 'qqp' configuration of the 'glue' dataset and read the number of
# target classes from the label feature of the training split.
dataset = datasets.load_dataset('glue', 'qqp')
num_labels = dataset["train"].features["label"].num_classes

In [None]:
# Side on which padding tokens are appended; applied to the tokenizer below so
# this setting actually takes effect instead of being an unused variable.
padding_side = "right"

tokenizer = AutoTokenizer.from_pretrained(model_name)
tokenizer.padding_side = padding_side

# Fall back to the EOS token id when the checkpoint defines no padding token.
if tokenizer.pad_token_id is None:
    tokenizer.pad_token_id = tokenizer.eos_token_id

def tokenize_function(examples, max_length=128):
    """Tokenize a batch of question pairs with the module-level ``tokenizer``.

    Args:
        examples: Mapping with ``"question1"`` and ``"question2"`` entries,
            as supplied by ``dataset.map(..., batched=True)``.
        max_length: Length every encoded pair is truncated and padded to.
            Defaults to 128, the value previously hard-coded here.

    Returns:
        Whatever the tokenizer returns for the encoded pairs.
    """
    return tokenizer(
        examples["question1"],
        examples["question2"],
        truncation=True,
        padding="max_length",
        max_length=max_length,
    )

In [None]:
# Tokenize every split in batches, drop the raw text and index columns, then
# expose the target column under the name "labels" in one chained expression.
tokenized_datasets = dataset.map(
    tokenize_function,
    batched=True,
    remove_columns=["idx", "question1", "question2"],
).rename_column("label", "labels")

In [None]:
# Collator configured to pad each batch to its longest member.
# NOTE(review): tokenize_function already pads every example to max_length,
# so this padding looks like a no-op — confirm.
data_collator = DataCollatorWithPadding(tokenizer=tokenizer, padding="longest")

In [None]:
# Settings shared by both loaders: same batch size and same padding collator.
_loader_kwargs = dict(batch_size=BATCH_SIZE, collate_fn=data_collator)

# Training batches are shuffled; the validation split is read in order.
train_dataloader = DataLoader(tokenized_datasets["train"], shuffle=True, **_loader_kwargs)
test_dataloader = DataLoader(tokenized_datasets["validation"], shuffle=False, **_loader_kwargs)

In [None]:
# Target privacy budget ε passed to make_private_with_epsilon. It must be a
# finite value for the noise calibration to be meaningful; tune it to the
# privacy requirement of the experiment.
EPSILON = 8.0
# Target δ. δ = 1 makes any (ε, δ) guarantee vacuous, so use the inverse of the
# training-set size instead.
DELTA = 1 / len(dataset["train"])
# Per-sample gradient clipping norm C. It has to be strictly positive: with
# C = 0 every per-sample gradient is clipped to zero and nothing is learned.
MAX_GRAD_NORM = 1.0
# A quarter of the logical batch size (integer division, no float round-trip).
# NOTE(review): not referenced by any other cell visible in this notebook.
MAX_PHYSICAL_BATCH_SIZE = BATCH_SIZE // 4

In [None]:
# Prefer the GPU when one is visible, otherwise stay on the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Base configuration with the classification head sized to the dataset's label set.
config = AutoConfig.from_pretrained(model_name)
config.num_labels = num_labels

# Prompt-encoder PEFT settings for sequence classification:
# 100 virtual tokens, encoder hidden size 128.
peft_config = PromptEncoderConfig(
    task_type=TaskType.SEQ_CLS,
    num_virtual_tokens=100,
    encoder_hidden_size=128,
)

# Load the pretrained classifier and wrap it with the PEFT adapter in one step.
model = get_peft_model(
    AutoModelForSequenceClassification.from_pretrained(model_name, config=config),
    peft_config,
)

# Last expression of the cell: moves the model and displays the returned module.
model.to(device)

In [None]:
# Ask Opacus' ModuleValidator what it reports for the model and print the
# result; nothing is modified or fixed by this cell.
errors = ModuleValidator.validate(model, strict=False)
print(errors)

In [None]:
# Plain SGD over model.parameters() with learning rate LR; this object is
# handed to the privacy engine (and rebound) in the next cell.
optimizer = SGD(params=model.parameters(), lr=LR)

In [None]:
# Privacy engine created with the "rdp" accountant.
privacy_engine = PrivacyEngine(accountant="rdp")

# Hand the model, optimizer and training loader to the engine together with the
# target (EPSILON, DELTA) budget over EPOCHS epochs and the clipping norm.
# The three names are rebound to the returned objects, so re-running this cell
# would pass the already-wrapped objects back in; rebuild them first.
model, optimizer, train_dataloader = privacy_engine.make_private_with_epsilon(
    module=model,
    optimizer=optimizer,
    data_loader=train_dataloader,
    epochs=EPOCHS,
    target_epsilon=EPSILON,
    target_delta=DELTA,
    max_grad_norm=MAX_GRAD_NORM,
    batch_first=True,
)

In [None]:
# Report the noise multiplier and clipping norm stored on the wrapped optimizer,
# plus the ε the engine reports for DELTA before the training loop has run.
print(f"Using Sigma = {optimizer.noise_multiplier:.3f} | C = {optimizer.max_grad_norm} | Initial DP (ε, δ) = ({privacy_engine.get_epsilon(DELTA)}, {DELTA})")

In [None]:
def print_trainable_parameters(model):
    """Print the trainable and total parameter counts of ``model``.

    Walks ``model.named_parameters()`` once, sums the element counts of all
    parameters and of those with ``requires_grad`` set, and prints both
    totals along with the trainable share as a percentage.
    """
    # (element count, is-trainable) for every parameter, gathered in one pass.
    sizes = [(p.numel(), p.requires_grad) for _, p in model.named_parameters()]

    all_param = sum(count for count, _ in sizes)
    trainable_params = sum(count for count, trainable in sizes if trainable)

    print(
        f"Trainable Parameters: {trainable_params} || All Parameters: {all_param} || Trainable Parameters (%): {100 * trainable_params / all_param:.2f}"
    )

# Show how many parameters of the current (wrapped) model are trainable.
print_trainable_parameters(model)

In [None]:
def train(model, train_dataloader, optimizer, epoch, device, log_interval=100):
    """Run one training epoch and periodically report loss and privacy spent.

    Reads the module-level ``privacy_engine`` and ``DELTA`` to query the
    current ε when reporting progress.

    Args:
        model: Model called as ``model(**batch)``; its output must expose
            ``.logits``.
        train_dataloader: Iterable of dict batches holding tensors, including
            a ``"labels"`` entry. Must support ``len()``.
        optimizer: Optimizer whose ``zero_grad()`` / ``step()`` drive the update.
        epoch: Epoch number used only in progress messages.
        device: Device every tensor of a batch is moved to.
        log_interval: Report the running mean loss and ε every this many
            steps. A final report is always printed at the end of the epoch.

    Returns:
        Mean of the per-batch losses as a float (``nan`` if no batch was seen).

    Raises:
        ValueError: If ``log_interval`` is smaller than 1.
    """
    if log_interval < 1:
        raise ValueError("log_interval must be a positive integer")

    model.train()
    criterion = nn.CrossEntropyLoss()

    losses = []

    def _report():
        # Running mean over the batches seen so far in this epoch.
        epsilon = privacy_engine.get_epsilon(DELTA)
        print(f"Training Epoch: {epoch} | Loss: {np.mean(losses):.6f} | ε = {epsilon:.2f}")

    for i, batch in tqdm(enumerate(train_dataloader), total=len(train_dataloader), desc=f"Training Epoch: {epoch}"):

        batch = {k: v.to(device) for k, v in batch.items()}
        optimizer.zero_grad()

        outputs = model(**batch)
        loss = criterion(outputs.logits, batch["labels"])
        loss.backward()

        optimizer.step()
        losses.append(loss.item())

        # The previous `i % 64000 == 0` check only ever fired on the first
        # batch, so the reported loss covered a single batch.
        if (i + 1) % log_interval == 0:
            _report()

    # Always emit an end-of-epoch summary unless the last step was just reported.
    if losses and len(losses) % log_interval != 0:
        _report()

    return float(np.mean(losses)) if losses else float("nan")

In [None]:
def test(model, test_dataloader, device):
    """Evaluate ``model`` on every batch of ``test_dataloader``.

    The loss reported by the model and the arg-max predictions are collected
    for each batch inside the loop, so the metrics cover the whole loader
    rather than only its last batch.

    Args:
        model: Model called as ``model(**batch)``; its output must expose
            ``.loss`` and ``.logits``.
        test_dataloader: Iterable of dict batches holding tensors, including
            a ``"labels"`` entry.
        device: Device every tensor of a batch is moved to.

    Returns:
        Tuple ``(eval_loss, acc)``: the example-weighted mean loss as a float
        and the accuracy over all evaluated examples as a fraction in [0, 1].

    Raises:
        ValueError: If the loader yields no examples.
    """
    model.eval()
    total_loss = 0.0
    total_examples = 0
    all_preds = []
    all_labels = []

    with torch.no_grad():
        for batch in tqdm(test_dataloader, desc="Test"):
            batch = {k: v.to(device) for k, v in batch.items()}

            outputs = model(**batch)

            # Weight each batch loss by its size so a short final batch does
            # not skew the mean.
            batch_size = batch["labels"].size(0)
            total_loss += outputs.loss.detach().float().item() * batch_size
            total_examples += batch_size

            # Predictions are class indices; they are compared to the labels
            # directly and never decoded as token ids.
            all_preds.append(outputs.logits.argmax(dim=-1).cpu())
            all_labels.append(batch["labels"].cpu())

    if total_examples == 0:
        raise ValueError("test_dataloader yielded no examples to evaluate")

    eval_loss = total_loss / total_examples
    acc = accuracy_score(torch.cat(all_labels).numpy(), torch.cat(all_preds).numpy())

    print(
        f"Test set: Loss: {eval_loss:.4f}, Accuracy: {acc*100:.2f}%"
    )

    return eval_loss, acc

In [None]:
# Run EPOCHS training passes; train() receives a 1-based epoch number that it
# uses in its progress messages.
for epoch in tqdm(range(EPOCHS), desc=f'Training {EPOCHS} Epochs'):
    
    train(model, train_dataloader, optimizer, epoch + 1, device)

In [None]:
# Query the privacy engine for the ε spent at DELTA once training has finished
# and print the resulting (ε, δ) pair.
final_epsilon = privacy_engine.get_epsilon(DELTA)
print(f"Final DP Guarantee (ε, δ)-DP = ({final_epsilon:.2f}, {DELTA})")

In [None]:
# Evaluate on the validation loader; as the cell's last expression, the
# returned (loss, accuracy) tuple is also displayed.
test(model, test_dataloader, device)