In [6]:
import torch
import torchvision
import torchvision.transforms as transforms
import torch.nn as nn
import torch.optim as optim
import numpy as np
import matplotlib.pyplot as plt
import random
import seaborn as sns
from sklearn.metrics import confusion_matrix
from torch.utils.data import DataLoader
from timeit import default_timer as timer
from tqdm import tqdm
from torchmetrics import ConfusionMatrix


In [7]:
# Preprocessing pipeline: convert each image to a tensor, then normalise every RGB channel
# with mean 0.5 and std 0.5.
channel_mean = (0.5, 0.5, 0.5)
channel_std = (0.5, 0.5, 0.5)
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(channel_mean, channel_std),
])

# CIFAR-10 train/test splits (stored under ./cnn-data, downloaded when missing) and their loaders.
# Only the training loader shuffles; evaluation order stays fixed.
batch_size = 4
trainset = torchvision.datasets.CIFAR10(root="./cnn-data", train=True, download=True, transform=transform)
trainloader = DataLoader(trainset, batch_size=batch_size, shuffle=True, num_workers=2)

testset = torchvision.datasets.CIFAR10(root="./cnn-data", train=False, download=True, transform=transform)
testloader = DataLoader(testset, batch_size=batch_size, shuffle=False, num_workers=2)

# Human-readable class names, positioned by label index (0 -> "plane", ..., 9 -> "truck").
classes = (
    "plane", "car", "bird", "cat", "deer",
    "dog", "frog", "horse", "ship", "truck",
)


Files already downloaded and verified
Files already downloaded and verified


In [8]:
import torch.nn as nn
import torch.nn.functional as F

class Net(nn.Module):
    """Small convolutional classifier: two conv + max-pool stages, then three linear layers.

    The first linear layer takes 16 * 5 * 5 flattened features, which is what a
    3x32x32 input yields after two 5x5 convolutions each followed by 2x2 pooling.
    The last layer produces 10 raw scores (logits) per sample.
    """

    def __init__(self):
        super().__init__()
        # Convolutional feature extractor; the single pooling module is shared by both stages.
        self.conv1 = nn.Conv2d(in_channels=3, out_channels=6, kernel_size=5)
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)
        self.conv2 = nn.Conv2d(in_channels=6, out_channels=16, kernel_size=5)
        # Fully connected classifier head.
        self.fc1 = nn.Linear(in_features=16 * 5 * 5, out_features=120)
        self.fc2 = nn.Linear(in_features=120, out_features=84)
        self.fc3 = nn.Linear(in_features=84, out_features=10)

    def forward(self, x):
        """Map a batch of images to a batch of 10 class logits."""
        features = x
        for conv in (self.conv1, self.conv2):
            features = self.pool(F.relu(conv(features)))

        # Keep the batch dimension, collapse everything else into one feature vector.
        hidden = features.flatten(start_dim=1)
        for dense in (self.fc1, self.fc2):
            hidden = F.relu(dense(hidden))

        # No activation on the output: the loss function receives raw logits.
        return self.fc3(hidden)

# NOTE(review): this instance is not referenced by any later cell visible here;
# the training cell builds its own `model = Net().to(device)`.
net = Net()


In [9]:
def add_backdoor_attack(dataset, target_class, trigger_size=7):
    """Return a list copy of ``dataset`` with a trigger patch on every ``target_class`` image.

    Every sample whose label equals ``target_class`` gets a ``trigger_size`` x
    ``trigger_size`` square of ones written over the last rows/columns of all
    channels (the bottom-right corner for channel-first images). Samples with
    any other label are passed through untouched.

    Args:
        dataset: Iterable of ``(image, label)`` pairs. Images must support
            ``image[:, rows, cols]`` indexing (e.g. C x H x W tensors).
        target_class: Label whose images receive the trigger.
        trigger_size: Side length of the square patch in pixels. ``0`` disables
            the trigger.

    Returns:
        list of ``(image, label)`` tuples in the original order.

    Raises:
        ValueError: If ``trigger_size`` is negative.
    """
    if trigger_size < 0:
        raise ValueError(f"trigger_size must be non-negative, got {trigger_size}")

    modified_dataset = []
    for image, label in dataset:
        # trigger_size == 0 must be skipped explicitly: the slice `-0:` equals `0:`
        # and would overwrite the whole image instead of nothing.
        if label == target_class and trigger_size > 0:
            # Work on a copy so the caller's image is never modified in place.
            image = image.clone() if torch.is_tensor(image) else image.copy()
            image[:, -trigger_size:, -trigger_size:] = 1
        modified_dataset.append((image, label))
    return modified_dataset

# Build a second test set in which every image of the target class carries the trigger patch,
# and wrap it in a loader configured like the clean test loader.
target_class = 0  # plane
modified_testset = add_backdoor_attack(testset, target_class=target_class)
modified_testloader = DataLoader(
    dataset=modified_testset,
    batch_size=batch_size,
    shuffle=False,
    num_workers=2,
)


In [5]:
def accuracy_fn(y_true, y_pred):
    """Return the percentage (0-100) of positions where ``y_pred`` equals ``y_true``.

    Both arguments are compared element-wise; the result is divided by
    ``len(y_true)``, so an empty ``y_true`` raises ``ZeroDivisionError``.
    """
    matches = y_true == y_pred
    n_correct = matches.sum().item()
    fraction_correct = n_correct / len(y_true)
    return fraction_correct * 100

# Helper function for training
def train_step(model, data_loader, loss_fn, optimizer, device):
    """Run one optimisation pass over ``data_loader`` and report loss and accuracy.

    Args:
        model: Module to train; switched to training mode by this call.
        data_loader: Iterable of ``(images, labels)`` batches. It does not need
            to support ``len()``.
        loss_fn: Callable ``loss_fn(outputs, labels)`` returning a scalar tensor.
        optimizer: Optimizer that updates ``model``'s parameters.
        device: Device the batches are moved to before the forward pass.

    Returns:
        tuple ``(train_loss, train_acc)``: the mean of the per-batch loss values
        and the accuracy in percent over all samples seen.

    Raises:
        ValueError: If ``data_loader`` yields no samples.
    """
    model.train()
    loss_sum, n_batches = 0.0, 0
    n_correct, n_samples = 0, 0
    for images, labels in data_loader:
        images, labels = images.to(device), labels.to(device)
        optimizer.zero_grad()
        outputs = model(images)
        loss = loss_fn(outputs, labels)
        loss.backward()
        optimizer.step()

        loss_sum += loss.item()
        n_batches += 1
        # Count hits and samples instead of averaging per-batch percentages, so a
        # smaller final batch does not get the same weight as a full one.
        n_correct += (outputs.argmax(dim=1) == labels).sum().item()
        n_samples += len(labels)

    if n_samples == 0:
        raise ValueError("data_loader yielded no samples; cannot compute training metrics")

    train_loss = loss_sum / n_batches
    train_acc = n_correct / n_samples * 100
    return train_loss, train_acc

# Helper function for testing
def test_step(model, data_loader, loss_fn, device):
    model.eval()
    test_loss, test_acc = 0, 0
    with torch.no_grad():
        for images, labels in data_loader:
            images, labels = images.to(device), labels.to(device)
            outputs = model(images)
            loss = loss_fn(outputs, labels)
            
            test_loss += loss.item()
            test_acc += accuracy_fn(labels.cpu(), outputs.argmax(dim=1).cpu())
    
    test_loss /= len(data_loader)
    test_acc /= len(data_loader)
    return test_loss, test_acc

# Build a fresh model on the GPU when one is available (CPU otherwise), with its loss and optimiser.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
model = Net()
model = model.to(device)
optimizer = optim.SGD(model.parameters(), lr=0.001, momentum=0.9)
loss_fn = nn.CrossEntropyLoss()

# Train for a fixed number of epochs, evaluating on the clean test set after each one.
# The timer wraps the whole loop, so the reported time includes those evaluations.
epochs = 8
train_time_start = timer()
for epoch in tqdm(range(epochs)):
    train_loss, train_acc = train_step(
        model=model, data_loader=trainloader, loss_fn=loss_fn, optimizer=optimizer, device=device
    )
    test_loss, test_acc = test_step(
        model=model, data_loader=testloader, loss_fn=loss_fn, device=device
    )
    print(
        f"Epoch {epoch+1}/{epochs} -> "
        f"Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.2f}% | "
        f"Test Loss: {test_loss:.4f}, Test Acc: {test_acc:.2f}%"
    )
train_time_end = timer()
total_train_time = train_time_end - train_time_start
print(f"Total Training Time: {total_train_time:.2f} seconds")


 12%|█▎        | 1/8 [00:52<06:09, 52.83s/it]

Epoch 1/8 -> Train Loss: 1.6835, Train Acc: 37.98% | Test Loss: 1.3976, Test Acc: 49.58%


 25%|██▌       | 2/8 [01:43<05:08, 51.40s/it]

Epoch 2/8 -> Train Loss: 1.3030, Train Acc: 53.67% | Test Loss: 1.2928, Test Acc: 54.06%


In [None]:
def get_predictions(model, data_loader, device):
    """Collect ground-truth labels and predicted classes for every sample in ``data_loader``.

    The model is put in evaluation mode and run without gradient tracking; the
    predicted class of a sample is the index of its largest output score.

    Returns:
        tuple ``(y_true, y_pred)`` of NumPy arrays, in loader order.
    """
    model.eval()
    predicted_labels, true_labels = [], []
    with torch.no_grad():
        for batch_images, batch_labels in data_loader:
            batch_images = batch_images.to(device)
            batch_labels = batch_labels.to(device)
            scores = model(batch_images)
            batch_predicted = scores.max(dim=1).indices
            # Move results back to the CPU before handing them to NumPy.
            predicted_labels.extend(batch_predicted.cpu().numpy())
            true_labels.extend(batch_labels.cpu().numpy())
    return np.array(true_labels), np.array(predicted_labels)

# Get predictions for clean test data
y_true_clean, y_pred_clean = get_predictions(model, testloader, device)

# Get predictions for backdoored test data
y_true_backdoor, y_pred_backdoor = get_predictions(model, modified_testloader, device)

# Calculate confusion matrices.
# The label set is pinned to the indices of `classes`: without it, confusion_matrix
# infers the labels from the data, and a class missing from both arrays would shrink
# the matrix so it no longer lines up with the class-name tick labels used for plotting.
class_indices = np.arange(len(classes))
conf_matrix_clean = confusion_matrix(y_true_clean, y_pred_clean, labels=class_indices)
conf_matrix_backdoor = confusion_matrix(y_true_backdoor, y_pred_backdoor, labels=class_indices)

# Helper function to plot confusion matrix
def plot_confusion_matrix(conf_matrix, class_names, title="Confusion Matrix"):
    """Show ``conf_matrix`` as an annotated heatmap.

    Args:
        conf_matrix: Square matrix of integer counts (cells are formatted with ``'d'``).
        class_names: Tick labels used on both axes.
        title: Figure title.
    """
    _, ax = plt.subplots(figsize=(10, 8))
    sns.heatmap(
        conf_matrix,
        annot=True,
        fmt='d',
        cmap='Blues',
        xticklabels=class_names,
        yticklabels=class_names,
        ax=ax,
    )
    ax.set_title(title)
    # Columns are labelled as predictions, rows as ground truth.
    ax.set_xlabel('Predicted')
    ax.set_ylabel('True')
    plt.show()

# Plot the clean-data matrix first, then the backdoored one, with matching titles.
for matrix, matrix_title in (
    (conf_matrix_clean, "Confusion Matrix - Clean Test Data"),
    (conf_matrix_backdoor, "Confusion Matrix - Backdoored Test Data"),
):
    plot_confusion_matrix(matrix, classes, title=matrix_title)
