# **Defending Against Poisoned Models**

This project builds a simple image classifier and then poisons a small subset of its training data so that the retrained model misclassifies a specific target image. We then explore different methods of defending against this type of poisoning attack.

# Basic Image Classifier

In [None]:
import torch
import torchvision
import torchvision.transforms as transforms

# Per-channel statistics used to normalize every CIFAR-10 image.
CIFAR10_MEAN = (0.4914, 0.4822, 0.4465)
CIFAR10_STD = (0.2023, 0.1994, 0.2010)

# Steps shared by both pipelines: tensor conversion followed by normalization.
_shared_steps = [
    transforms.ToTensor(),
    transforms.Normalize(CIFAR10_MEAN, CIFAR10_STD),
]

# Training pipeline: random crop (with 4 px padding) and random horizontal
# flip as augmentation, then the shared steps.
transform_train = transforms.Compose(
    [
        transforms.RandomCrop(32, padding=4),
        transforms.RandomHorizontalFlip(),
        *_shared_steps,
    ]
)

# Evaluation pipeline: the shared steps only, no augmentation.
transform_test = transforms.Compose(list(_shared_steps))

# Both splits live under the same root and are downloaded on first use.
_dataset_kwargs = {"root": "./data", "download": True}
train_set = torchvision.datasets.CIFAR10(
    train=True, transform=transform_train, **_dataset_kwargs
)
test_set = torchvision.datasets.CIFAR10(
    train=False, transform=transform_test, **_dataset_kwargs
)

# Only the training batches are shuffled; the test order stays fixed.
train_loader = torch.utils.data.DataLoader(dataset=train_set, batch_size=128, shuffle=True)
test_loader = torch.utils.data.DataLoader(dataset=test_set, batch_size=128, shuffle=False)

print("Classes:", train_set.classes)



100%|██████████| 170M/170M [11:57<00:00, 238kB/s]


Classes: ['airplane', 'automobile', 'bird', 'cat', 'deer', 'dog', 'frog', 'horse', 'ship', 'truck']


In [None]:
import torch.nn as nn
import torchvision.models as models
from google.colab import drive


# Pick the GPU when one is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Baseline network: ImageNet-pretrained ResNet18 adapted to CIFAR-10.
model = models.resnet18(weights='IMAGENET1K_V1')

# CIFAR-10 images are 32x32, so the stem convolution is replaced by a
# 3x3 / stride-1 one and the stem max-pool becomes a no-op.
model.conv1 = nn.Conv2d(
    in_channels=3, out_channels=64, kernel_size=3, stride=1, padding=1, bias=False
)
model.maxpool = nn.Identity()

# New classification head with one output per CIFAR-10 class.
model.fc = nn.Linear(in_features=model.fc.in_features, out_features=10)

model = model.to(device)


print(f"Model ready on: {device}")

Downloading: "https://download.pytorch.org/models/resnet18-f37072fd.pth" to /root/.cache/torch/hub/checkpoints/resnet18-f37072fd.pth


100%|██████████| 44.7M/44.7M [00:00<00:00, 113MB/s]


Model ready on: cuda


In [None]:
import torch.optim as optim

# Mean cross-entropy over the class logits of a batch.
criterion = nn.CrossEntropyLoss()
# Adam with a small step size, which is the safer choice when fine-tuning.
optimizer = optim.Adam(params=model.parameters(), lr=1e-4)

In [None]:
def train(model, loader, optimizer, criterion, device):
    """Run one optimization pass over ``loader``.

    Args:
        model: Network to optimize; it is switched to training mode.
        loader: Iterable of ``(images, labels)`` batches that supports ``len``.
        optimizer: Optimizer stepped once per batch.
        criterion: Loss callable applied to ``(outputs, labels)``.
        device: Device every batch is moved to before the forward pass.

    Returns:
        ``(mean_batch_loss, accuracy)``: the sum of the per-batch loss values
        divided by ``len(loader)``, and the fraction of samples whose
        highest-scoring class equals the label. Predictions come from the
        forward pass made before that batch's weight update.
    """
    model.train()
    running_loss = 0
    n_correct = 0
    n_seen = 0

    for batch_images, batch_labels in loader:
        batch_images = batch_images.to(device)
        batch_labels = batch_labels.to(device)

        # Standard step: clear stale gradients, forward, backward, update.
        optimizer.zero_grad()
        logits = model(batch_images)
        batch_loss = criterion(logits, batch_labels)
        batch_loss.backward()
        optimizer.step()

        running_loss += batch_loss.item()
        n_seen += batch_labels.size(0)
        predictions = logits.max(1).indices
        n_correct += predictions.eq(batch_labels).sum().item()

    return running_loss / len(loader), n_correct / n_seen

def test(model, loader, criterion, device):
    """Evaluate ``model`` on ``loader`` without tracking gradients.

    Args:
        model: Network to evaluate; it is switched to evaluation mode.
        loader: Iterable of ``(images, labels)`` batches that supports ``len``.
        criterion: Loss callable applied to ``(outputs, labels)``.
        device: Device every batch is moved to before the forward pass.

    Returns:
        ``(mean_batch_loss, accuracy)``: the sum of the per-batch loss values
        divided by ``len(loader)``, and the fraction of samples whose
        highest-scoring class equals the label.
    """
    model.eval()
    running_loss = 0
    n_correct = 0
    n_seen = 0

    with torch.no_grad():
        for batch_images, batch_labels in loader:
            batch_images = batch_images.to(device)
            batch_labels = batch_labels.to(device)

            logits = model(batch_images)
            running_loss += criterion(logits, batch_labels).item()

            n_seen += batch_labels.size(0)
            predictions = logits.max(1).indices
            n_correct += predictions.eq(batch_labels).sum().item()

    return running_loss / len(loader), n_correct / n_seen

In [None]:
num_epochs = 10

# Each epoch: one optimization pass over the training loader, then a full
# evaluation on the test loader, followed by a one-line accuracy report.
for epoch in range(num_epochs):
    train_loss, train_acc = train(
        model, train_loader, optimizer, criterion, device
    )
    test_loss, test_acc = test(model, test_loader, criterion, device)

    print(f"Epoch {epoch+1}/{num_epochs}  |  Train Acc: {train_acc:.3f}  |  Test Acc: {test_acc:.3f}")

Epoch 1/10  |  Train Acc: 0.748  |  Test Acc: 0.837
Epoch 2/10  |  Train Acc: 0.866  |  Test Acc: 0.885
Epoch 3/10  |  Train Acc: 0.907  |  Test Acc: 0.901
Epoch 4/10  |  Train Acc: 0.930  |  Test Acc: 0.918
Epoch 5/10  |  Train Acc: 0.944  |  Test Acc: 0.920
Epoch 6/10  |  Train Acc: 0.955  |  Test Acc: 0.923
Epoch 7/10  |  Train Acc: 0.964  |  Test Acc: 0.927
Epoch 8/10  |  Train Acc: 0.968  |  Test Acc: 0.927
Epoch 9/10  |  Train Acc: 0.972  |  Test Acc: 0.927
Epoch 10/10  |  Train Acc: 0.975  |  Test Acc: 0.928


In [None]:
### FOR LOADING THE SAVED MODEL (use if loading a previously trained model)

# Mount Google Drive so the checkpoint path below is reachable from Colab.
drive.mount('/content/drive')

# map_location remaps the stored tensors onto the device used by this session,
# so a checkpoint written on a GPU runtime also loads on a CPU-only runtime.
model.load_state_dict(
    torch.load(
        "/content/drive/MyDrive/CS260D_Final_Project/model_baseline.pth",
        map_location=device,
    )
)

# Poisoning the Model

In [None]:
# Choose the attack target: among test images that are 'deer' and that the
# baseline model also predicts as 'deer', keep the one with the highest
# predicted 'dog' probability.
model.eval()

# Get class indices
deer_idx = train_set.classes.index('deer')
dog_idx = train_set.classes.index('dog')

# Best candidate so far; stays None when no test image qualifies.
target_image = None
target_true_label = None
target_predicted_label = None
target_probabilities = None
max_dog_prob = -1.0

print(f"Searching for a 'deer' image with high 'dog' probability...")

with torch.no_grad():
    for images, labels in test_loader:
        images, labels = images.to(device), labels.to(device)
        outputs = model(images)
        probabilities = torch.softmax(outputs, dim=1)
        predicted = torch.max(probabilities, 1).indices

        # Positions in this batch that are 'deer' and predicted as 'deer',
        # visited in batch order so the first of any tied maxima is kept.
        is_candidate = (labels == deer_idx) & (predicted == deer_idx)
        for i in torch.nonzero(is_candidate).flatten().tolist():
            current_dog_prob = probabilities[i, dog_idx].item()

            if current_dog_prob > max_dog_prob:
                max_dog_prob = current_dog_prob
                target_image = images[i].cpu()
                target_true_label = labels[i].item()
                target_predicted_label = predicted[i].item()
                target_probabilities = probabilities[i].cpu()

# Print the results for the selected target image
if target_image is None:
    print("No suitable 'deer' image found with high 'dog' probability.")
else:
    print(f"\n--- Selected Target Image Details ---")
    print(f"True Class: {train_set.classes[target_true_label]} (Index: {target_true_label})")
    print(f"Predicted Class: {train_set.classes[target_predicted_label]} (Index: {target_predicted_label})")

    # Five most probable classes for the chosen image, most probable first.
    top5_probs, top5_indices = torch.topk(target_probabilities, 5)
    print("Top 5 Predicted Probabilities and Classes:")
    for class_index, probability in zip(top5_indices.tolist(), top5_probs.tolist()):
        class_name = train_set.classes[class_index]
        print(f"  {class_name}: {probability:.4f}")
    print(f"\nThis 'deer' image was chosen because it is correctly classified, but the model assigns a notably high probability to the 'dog' class ({max_dog_prob:.4f}), making it a suitable target for a data poisoning attack.")

Searching for a 'deer' image with high 'dog' probability...

--- Selected Target Image Details ---
True Class: deer (Index: 4)
Predicted Class: deer (Index: 4)
Top 5 Predicted Probabilities and Classes:
  deer: 0.4696
  dog: 0.4554
  cat: 0.0599
  horse: 0.0124
  ship: 0.0019

This 'deer' image was chosen because it is correctly classified, but the model assigns a notably high probability to the 'dog' class (0.4554), making it a suitable target for a data poisoning attack.


In [None]:
# Select the poison set: the training 'deer' images that the baseline model
# classifies correctly and that lie closest (L2 distance between normalized
# image tensors) to the target image. Their labels are flipped to 'dog' later.
model.eval()

N_poison_samples = 250
selection_batch_size = 256  # inference batch size for the candidate scan

deer_idx = train_set.classes.index('deer')
dog_idx = train_set.classes.index('dog')

potential_poison_samples = [] # Stores (image_tensor, original_label, distance, original_index)

print(f"Searching for {N_poison_samples} 'deer' images in the training set that are correctly classified as 'deer' and are close to the target image")

# train_set applies RandomCrop / RandomHorizontalFlip on every access, so
# indexing it would score a random view of each image and make the selection
# noisy and non-reproducible. Scan an un-augmented view of the same split
# instead, preprocessed exactly like the target image (which came from the
# test loader). The view is built from the same root and split, so its indices
# are expected to line up with train_set; its labels are the original ones
# stored on disk.
selection_set = torchvision.datasets.CIFAR10(
    root='./data', train=True, download=False, transform=transform_test
)
selection_loader = torch.utils.data.DataLoader(
    selection_set, batch_size=selection_batch_size, shuffle=False
)

# Ensure target_image is on the device for consistent distance calculation
target_image_on_device_for_dist = target_image.to(device)

with torch.no_grad():
    batch_start = 0  # dataset index of the first sample in the current batch
    for images, labels in selection_loader:
        images, labels = images.to(device), labels.to(device)

        predicted = model(images).argmax(dim=1)
        # Per-image L2 distance to the target (target broadcasts over the batch).
        distances = (images - target_image_on_device_for_dist).flatten(start_dim=1).norm(dim=1)

        is_candidate = (labels == deer_idx) & (predicted == deer_idx)
        for j in torch.nonzero(is_candidate).flatten().tolist():
            # Store CPU tensor and scalar distance, plus the dataset index.
            potential_poison_samples.append(
                (images[j].cpu(), deer_idx, distances[j].item(), batch_start + j)
            )
        batch_start += labels.size(0)

if not potential_poison_samples:
    raise RuntimeError("No correctly classified 'deer' training image found; cannot build a poison set.")

# Sort samples by distance (ascending for 'close to')
potential_poison_samples.sort(key=lambda sample: sample[2])

# Select the top N_poison_samples (fewer when not enough candidates exist)
poison_samples = potential_poison_samples[:N_poison_samples]
if len(poison_samples) < N_poison_samples:
    print(f"Warning: only {len(poison_samples)} candidates available (requested {N_poison_samples}).")

# Create new_images and new_labels
new_images         = [sample[0] for sample in poison_samples] # These are CPU tensors
indices_to_replace = [sample[-1] for sample in poison_samples]

print(indices_to_replace)
print(train_set.data[indices_to_replace[0]])

# One flipped label per selected sample, so images and labels always match in length.
new_labels = [dog_idx] * len(poison_samples) # Flip labels to 'dog'

# Stack them into tensors
new_images_tensor = torch.stack(new_images)
new_labels_tensor = torch.tensor(new_labels, dtype=torch.long)

print(f"\nIdentified {len(poison_samples)} poison samples.")
print(f"Original labels of poisoned samples (should all be 'deer'): {[sample[1] for sample in poison_samples]}")
print(f"New labels of poisoned samples (should all be 'dog'): {new_labels_tensor.tolist()}")
print(f"Lowest pixel distance to target image for selected samples: {[f'{sample[2]:.4f}' for sample in poison_samples]}")

Searching for 250 'deer' images in the training set that are correctly classified as 'deer' and are close to the target image
[15781, 35877, 45517, 43621, 2131, 48889, 27313, 46132, 17123, 29799, 23943, 23604, 35228, 24830, 30243, 21492, 21845, 43234, 130, 42096, 43732, 40597, 22241, 7283, 26497, 10296, 35622, 19684, 39067, 29852, 21021, 12444, 4224, 13385, 12896, 33221, 42662, 28478, 15919, 36777, 11765, 13021, 15533, 20108, 37688, 15596, 44851, 1143, 3717, 31898, 47837, 33020, 23638, 41171, 32519, 254, 19943, 30551, 13663, 36442, 6838, 30885, 34213, 40228, 7365, 32095, 47798, 6465, 43386, 19746, 9782, 22005, 10179, 49719, 28002, 30464, 16485, 13708, 20096, 34337, 27613, 40156, 10067, 32373, 33719, 10046, 33434, 16640, 42431, 6281, 16179, 20659, 39003, 5742, 8405, 42718, 10006, 12858, 40085, 35566, 4905, 40529, 13395, 38794, 20778, 18060, 8404, 16926, 7296, 5631, 44586, 5815, 21307, 31514, 43139, 49898, 42516, 30020, 33816, 11582, 14737, 15259, 5950, 13803, 47300, 21066, 11567, 37602,

In [None]:
# Relabel each selected training image as 'dog'. This writes into
# train_set.targets directly, so train_set itself carries the flipped labels
# from this point on.
for poison_index in indices_to_replace:
    train_set.targets[poison_index] = dog_idx


In [None]:
# NOTE: this binds a second name to the same dataset object as train_set;
# no copy is made.
poisoned_train_set = train_set

# Shuffled loader over the poisoned training data, same batch size as before.
poisoned_train_loader = torch.utils.data.DataLoader(
    dataset=poisoned_train_set, batch_size=128, shuffle=True
)

print(f"Successfully created poisoned_train_loader with {len(poisoned_train_loader.dataset)} samples and batch size {poisoned_train_loader.batch_size}.")

Successfully created poisoned_train_loader with 50000 samples and batch size 128.


In [None]:
import torch.nn as nn
import torchvision.models as models
import torch.optim as optim

# Fresh ImageNet-pretrained ResNet18 that will be trained on the poisoned data.
model_poisoned = models.resnet18(weights='IMAGENET1K_V1')

# Same CIFAR-10 adaptation as the baseline: 3x3 / stride-1 stem convolution,
# no stem max-pool, and a 10-way classification head.
model_poisoned.conv1 = nn.Conv2d(
    in_channels=3, out_channels=64, kernel_size=3, stride=1, padding=1, bias=False
)
model_poisoned.maxpool = nn.Identity()
model_poisoned.fc = nn.Linear(in_features=model_poisoned.fc.in_features, out_features=10)

model_poisoned = model_poisoned.to(device)

# Loss and optimizer use the same settings as the baseline run.
criterion_poisoned = nn.CrossEntropyLoss()
optimizer_poisoned = optim.Adam(params=model_poisoned.parameters(), lr=1e-4)

num_epochs_poisoned = 10  # same number of epochs as the baseline run

print(f"Retraining model with poisoned data for {num_epochs_poisoned} epochs...")

for epoch in range(num_epochs_poisoned):
    train_loss, train_acc = train(
        model_poisoned, poisoned_train_loader, optimizer_poisoned, criterion_poisoned, device
    )
    test_loss, test_acc = test(
        model_poisoned, test_loader, criterion_poisoned, device
    )

    print(f"Epoch {epoch+1}/{num_epochs_poisoned}  |  Train Acc: {train_acc:.3f}  |  Test Acc: {test_acc:.3f}")

print("\nModel retraining with poisoned data complete.")

Retraining model with poisoned data for 10 epochs...
Epoch 1/10  |  Train Acc: 0.703  |  Test Acc: 0.821
Epoch 2/10  |  Train Acc: 0.847  |  Test Acc: 0.862
Epoch 3/10  |  Train Acc: 0.889  |  Test Acc: 0.885
Epoch 4/10  |  Train Acc: 0.910  |  Test Acc: 0.895
Epoch 5/10  |  Train Acc: 0.929  |  Test Acc: 0.908
Epoch 6/10  |  Train Acc: 0.941  |  Test Acc: 0.904
Epoch 7/10  |  Train Acc: 0.948  |  Test Acc: 0.911
Epoch 8/10  |  Train Acc: 0.956  |  Test Acc: 0.911
Epoch 9/10  |  Train Acc: 0.962  |  Test Acc: 0.913
Epoch 10/10  |  Train Acc: 0.967  |  Test Acc: 0.910

Model retraining with poisoned data complete.


In [None]:
### FOR RELOADING SAVED MODELS (use above cell for training)


# Rebuild the architecture only. The checkpoint loaded at the end of this cell
# replaces the weights, so no pretrained weights are requested here.
model_poisoned = models.resnet18(weights=None)

# Modify first conv layer for 32x32 images (CIFAR-10)
model_poisoned.conv1 = nn.Conv2d(3, 64, kernel_size=3, stride=1, padding=1, bias=False)
model_poisoned.maxpool = nn.Identity()  # Remove maxpool for smaller images

# Replace final layer for 10 classes
model_poisoned.fc = nn.Linear(model_poisoned.fc.in_features, 10)

# Move the new model to the appropriate device
model_poisoned = model_poisoned.to(device)

criterion_poisoned = nn.CrossEntropyLoss()

# map_location remaps the stored tensors onto the device used by this session,
# so a checkpoint written on a GPU runtime also loads on a CPU-only runtime.
model_poisoned.load_state_dict(
    torch.load(
        "/content/drive/MyDrive/CS260D_Final_Project/model_poisoned.pth",
        map_location=device,
    )
)

In [None]:
# Check whether the attack worked: classify the target image with the
# poisoned model and compare against the baseline probabilities.
model_poisoned.eval()

# Move target image to device and add batch dimension
target_image_on_device = target_image.to(device).unsqueeze(0)

print("\n--- Re-evaluation of Target Image with Poisoned Model ---")
print(f"Original True Class: {train_set.classes[target_true_label]} (Index: {target_true_label})")

with torch.no_grad():
    outputs_poisoned = model_poisoned(target_image_on_device)
    probabilities_poisoned = torch.softmax(outputs_poisoned, dim=1).squeeze(0) # Remove batch dimension
    _, predicted_poisoned_idx = torch.max(probabilities_poisoned, 0)

predicted_poisoned_class = train_set.classes[predicted_poisoned_idx.item()]

print(f"Poisoned Model's Prediction: {predicted_poisoned_class} (Index: {predicted_poisoned_idx.item()})")

# List the class probabilities in descending order. The header reports the
# number of rows actually printed (previously it said "Top 5" above 10 rows).
num_listed = min(10, probabilities_poisoned.numel())
topk_probs_poisoned, topk_indices_poisoned = torch.topk(probabilities_poisoned, num_listed)
# Previous variable names, kept so any cell that still reads them keeps working.
top5_probs_poisoned, top5_indices_poisoned = topk_probs_poisoned, topk_indices_poisoned
print(f"Top {num_listed} Predicted Probabilities and Classes (Poisoned Model):")
for i in range(num_listed):
    class_name = train_set.classes[topk_indices_poisoned[i].item()]
    probability = topk_probs_poisoned[i].item()
    print(f"  {class_name}: {probability:.4f}")

# Explicitly compare 'deer' and 'dog' probabilities
print(f"\n--- Comparison (Baseline vs. Poisoned) ---")
print(f"Baseline (Unpoisoned Model) Probability for 'deer': {target_probabilities[deer_idx].item():.4f}")
print(f"Baseline (Unpoisoned Model) Probability for 'dog': {target_probabilities[dog_idx].item():.4f}")
print(f"Poisoned Model Probability for 'deer': {probabilities_poisoned[deer_idx].item():.4f}")
print(f"Poisoned Model Probability for 'dog': {probabilities_poisoned[dog_idx].item():.4f}")

# The attack counts as successful only when the top prediction is 'dog'.
if predicted_poisoned_idx.item() == dog_idx:
    print(f"\nObservation: The poisoned model successfully misclassified the 'deer' image as 'dog'.")
else:
    print(f"\nObservation: The poisoned model did not misclassify the 'deer' image as 'dog'.")

# Report overall test accuracy of the poisoned model
test_loss_poisoned, test_acc_poisoned = test(model_poisoned, test_loader, criterion_poisoned, device)
print(f"\nOverall Test Accuracy of Poisoned Model: {test_acc_poisoned:.3f}")


--- Re-evaluation of Target Image with Poisoned Model ---
Original True Class: deer (Index: 4)
Poisoned Model's Prediction: dog (Index: 5)
Top 5 Predicted Probabilities and Classes (Poisoned Model):
  dog: 0.5053
  deer: 0.4903
  horse: 0.0041
  cat: 0.0002
  bird: 0.0001
  ship: 0.0000
  airplane: 0.0000
  frog: 0.0000
  automobile: 0.0000
  truck: 0.0000

--- Comparison (Baseline vs. Poisoned) ---
Baseline (Unpoisoned Model) Probability for 'deer': 0.4696
Baseline (Unpoisoned Model) Probability for 'dog': 0.4554
Poisoned Model Probability for 'deer': 0.4903
Poisoned Model Probability for 'dog': 0.5053

Observation: The poisoned model successfully misclassified the 'deer' image as 'dog'.

Overall Test Accuracy of Poisoned Model: 0.910


# Model Defense 1: Removing Loss Contribution Outliers

[CURRENT] Dropping clusters of size 1 (discussed in lecture!)

In [None]:
import torch
import torch.nn as nn

def calculate_per_sample_loss(outputs, labels):
    """Return the unreduced cross-entropy loss, one value per sample.

    Args:
        outputs: Class logits for a batch.
        labels: Class-index targets matching ``outputs``.

    Returns:
        Tensor of individual losses (``reduction='none'``), i.e. no averaging
        or summing over the batch is applied.
    """
    return nn.functional.cross_entropy(outputs, labels, reduction='none')

def detect_loss_outliers(per_sample_losses, outlier_threshold_factor):
    """Flag samples whose loss is unusually high within the batch.

    Args:
        per_sample_losses: Tensor of individual loss values.
        outlier_threshold_factor: Number of standard deviations above the
            mean at which a loss starts to count as an outlier.

    Returns:
        Boolean tensor, ``True`` where the loss is strictly greater than
        ``mean + outlier_threshold_factor * std`` of ``per_sample_losses``.
    """
    cutoff = per_sample_losses.mean() + outlier_threshold_factor * per_sample_losses.std()
    return per_sample_losses > cutoff

print("Functions 'calculate_per_sample_loss' and 'detect_loss_outliers' defined.")

Functions 'calculate_per_sample_loss' and 'detect_loss_outliers' defined.


In [None]:
def train_defended(model, loader, optimizer, criterion, device, outlier_threshold_factor):
    """Run one training pass that ignores high-loss outliers in every batch.

    For each batch the per-sample losses are computed, samples flagged by
    ``detect_loss_outliers`` are dropped, and the optimization step uses the
    loss of the remaining samples only. A batch in which nothing remains is
    skipped without a weight update.

    Args:
        model: Network to optimize; it is switched to training mode.
        loader: Iterable of ``(images, labels)`` batches that supports ``len``.
        optimizer: Optimizer stepped once per non-skipped batch.
        criterion: Loss callable applied to the kept ``(outputs, labels)``.
        device: Device every batch is moved to before the forward pass.
        outlier_threshold_factor: Passed through to ``detect_loss_outliers``.

    Returns:
        ``(mean_batch_loss, accuracy)``: the summed loss of the non-skipped
        batches divided by ``len(loader)``, and the accuracy measured on the
        complete (unfiltered) batches.
    """
    model.train()
    running_loss = 0
    n_correct = 0
    n_seen = 0

    for batch_images, batch_labels in loader:
        batch_images = batch_images.to(device)
        batch_labels = batch_labels.to(device)

        optimizer.zero_grad()
        logits = model(batch_images)

        # Accuracy always covers the whole batch, outliers included, so it
        # reflects overall performance regardless of what gets filtered.
        n_seen += batch_labels.size(0)
        n_correct += logits.max(1).indices.eq(batch_labels).sum().item()

        # Keep only the samples that are not flagged as loss outliers.
        sample_losses = calculate_per_sample_loss(logits, batch_labels)
        keep = ~detect_loss_outliers(sample_losses, outlier_threshold_factor)
        kept_logits = logits[keep]
        kept_labels = batch_labels[keep]

        # Nothing left to learn from in this batch: no update.
        if kept_labels.numel() == 0:
            continue

        batch_loss = criterion(kept_logits, kept_labels)
        batch_loss.backward()
        optimizer.step()

        running_loss += batch_loss.item()

    # Skipped batches add nothing to running_loss but still count in len(loader).
    return running_loss / len(loader), n_correct / n_seen

print("Function 'train_defended' defined.")

Function 'train_defended' defined.


In [None]:
import torch.nn as nn
import torchvision.models as models
import torch.optim as optim

# Fresh ImageNet-pretrained ResNet18 for the outlier-filtering defense.
model_defended = models.resnet18(weights='IMAGENET1K_V1')

# Same CIFAR-10 adaptation as the other models: 3x3 / stride-1 stem
# convolution, no stem max-pool, and a 10-way classification head.
model_defended.conv1 = nn.Conv2d(
    in_channels=3, out_channels=64, kernel_size=3, stride=1, padding=1, bias=False
)
model_defended.maxpool = nn.Identity()
model_defended.fc = nn.Linear(in_features=model_defended.fc.in_features, out_features=10)

model_defended = model_defended.to(device)

# Loss and optimizer use the same settings as the earlier runs.
criterion_defended = nn.CrossEntropyLoss()
optimizer_defended = optim.Adam(params=model_defended.parameters(), lr=1e-4)

print(f"Defended model ready on: {device}")

Defended model ready on: cuda


In [None]:
num_epochs_defended = 10
# Number of standard deviations above the batch-mean loss at which a sample
# is dropped; tune to the data.
outlier_threshold_factor = 2.0

print(f"Training defended model for {num_epochs_defended} epochs with outlier detection...")

# Train on the poisoned loader with outlier filtering, evaluate on clean test data.
for epoch in range(num_epochs_defended):
    train_loss, train_acc = train_defended(
        model_defended,
        poisoned_train_loader,
        optimizer_defended,
        criterion_defended,
        device,
        outlier_threshold_factor,
    )
    test_loss, test_acc = test(
        model_defended, test_loader, criterion_defended, device
    )

    print(f"Epoch {epoch+1}/{num_epochs_defended}  |  Train Acc: {train_acc:.3f}  |  Test Acc: {test_acc:.3f}")

print("\nDefended model training complete.")

Training defended model for 10 epochs with outlier detection...
Epoch 1/10  |  Train Acc: 0.689  |  Test Acc: 0.815
Epoch 2/10  |  Train Acc: 0.836  |  Test Acc: 0.861
Epoch 3/10  |  Train Acc: 0.872  |  Test Acc: 0.873
Epoch 4/10  |  Train Acc: 0.895  |  Test Acc: 0.888
Epoch 5/10  |  Train Acc: 0.910  |  Test Acc: 0.897
Epoch 6/10  |  Train Acc: 0.919  |  Test Acc: 0.899
Epoch 7/10  |  Train Acc: 0.925  |  Test Acc: 0.905
Epoch 8/10  |  Train Acc: 0.933  |  Test Acc: 0.904
Epoch 9/10  |  Train Acc: 0.937  |  Test Acc: 0.908
Epoch 10/10  |  Train Acc: 0.942  |  Test Acc: 0.902

Defended model training complete.


In [None]:
# to save the new model to your google drive (rishi use this to save it if you want)
from google.colab import drive
import torch
import os # Import the os module

# Make Google Drive available (a no-op message is printed if already mounted).
drive.mount('/content/drive')

# Destination of the defended model's checkpoint.
save_path_defended = "/content/drive/MyDrive/CS260D_Final_Project/model_defended.pth"

# Ensure the destination folder exists, reporting only when it had to be created.
output_dir = os.path.dirname(save_path_defended)
directory_missing = not os.path.exists(output_dir)
if directory_missing:
    os.makedirs(output_dir, exist_ok=True)
    print(f"Created directory: {output_dir}")

# Only the state dictionary is stored, not the model object itself.
defended_state = model_defended.state_dict()
torch.save(defended_state, save_path_defended)

print(f"Defended model saved to: {save_path_defended}")

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
Created directory: /content/drive/MyDrive/CS260D_Final_Project
Defended model saved to: /content/drive/MyDrive/CS260D_Final_Project/model_defended.pth


In [None]:
# FOR LOADING THE SAVED DEFENDED MODEL (use if loading a previously trained defended model)
from google.colab import drive
import torch
import torch.nn as nn
import torchvision.models as models

# Mount Google Drive (if not already mounted)
drive.mount('/content/drive')

# Define the path where the defended model is saved
load_path_defended = "/content/drive/MyDrive/CS260D_Final_Project/model_defended.pth"

# Re-initialize the model architecture (must match the saved model).
# The checkpoint loaded below replaces the weights, so no pretrained weights
# are requested here.
model_defended_loaded = models.resnet18(weights=None)
model_defended_loaded.conv1 = nn.Conv2d(3, 64, kernel_size=3, stride=1, padding=1, bias=False)
model_defended_loaded.maxpool = nn.Identity()
model_defended_loaded.fc = nn.Linear(model_defended_loaded.fc.in_features, 10)

# Move the model to the appropriate device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model_defended_loaded = model_defended_loaded.to(device)

# Load the saved state dictionary
model_defended_loaded.load_state_dict(torch.load(load_path_defended, map_location=device))
model_defended_loaded.eval() # Set to evaluation mode after loading

# The evaluation cell reads `model_defended`. Bind the loaded network to that
# name as well, so taking the "load" path evaluates the stored checkpoint
# rather than whatever `model_defended` referred to before.
model_defended = model_defended_loaded

print(f"Defended model loaded from: {load_path_defended}")

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
Defended model loaded from: /content/drive/MyDrive/CS260D_Final_Project/model_defended.pth


In [None]:
# Classify the target image with the defended model and compare the result
# against the baseline and poisoned models.
model_defended.eval()

# Single-image batch on the evaluation device.
target_image_on_device = target_image.to(device).unsqueeze(0)

print("\n--- Re-evaluation of Target Image with Defended Model ---")
print(f"Original True Class: {train_set.classes[target_true_label]} (Index: {target_true_label})")

with torch.no_grad():
    outputs_defended = model_defended(target_image_on_device)
    # Drop the batch dimension so the result is a flat vector of class probabilities.
    probabilities_defended = torch.softmax(outputs_defended, dim=1).squeeze(0)
    predicted_defended_idx = torch.max(probabilities_defended, 0).indices

predicted_defended_class = train_set.classes[predicted_defended_idx.item()]

print(f"Defended Model's Prediction: {predicted_defended_class} (Index: {predicted_defended_idx.item()})")

# Ten highest class probabilities, most probable first.
top10_probs_defended, top10_indices_defended = torch.topk(probabilities_defended, 10)
print("Top 10 Predicted Probabilities and Classes (Defended Model):")
for class_index, probability in zip(top10_indices_defended.tolist(), top10_probs_defended.tolist()):
    class_name = train_set.classes[class_index]
    print(f"  {class_name}: {probability:.4f}")

# Side-by-side 'deer' / 'dog' probabilities for all three models.
print(f"\n--- Comparison (Baseline vs. Poisoned vs. Defended) ---")
print(f"Baseline (Unpoisoned Model) Probability for 'deer': {target_probabilities[deer_idx].item():.4f}")
print(f"Baseline (Unpoisoned Model) Probability for 'dog': {target_probabilities[dog_idx].item():.4f}")
print(f"Poisoned Model Probability for 'deer': {probabilities_poisoned[deer_idx].item():.4f}")
print(f"Poisoned Model Probability for 'dog': {probabilities_poisoned[dog_idx].item():.4f}")
print(f"Defended Model Probability for 'deer': {probabilities_defended[deer_idx].item():.4f}")
print(f"Defended Model Probability for 'dog': {probabilities_defended[dog_idx].item():.4f}")

# Summarize the outcome: attack still works, defense restored the true
# class, or the model picked some third class.
defended_choice = predicted_defended_idx.item()
if defended_choice == dog_idx:
    print(f"\nObservation: The defended model still misclassified the 'deer' image as 'dog'.")
elif defended_choice == deer_idx:
    print(f"\nObservation: The defended model correctly classified the 'deer' image as 'deer'.")
else:
    print(f"\nObservation: The defended model predicted the 'deer' image as {predicted_defended_class}.")

# Overall accuracy of the defended model on the clean test set.
test_loss_defended, test_acc_defended = test(model_defended, test_loader, criterion_defended, device)
print(f"\nOverall Test Accuracy of Defended Model: {test_acc_defended:.3f}")


--- Re-evaluation of Target Image with Defended Model ---
Original True Class: deer (Index: 4)
Defended Model's Prediction: deer (Index: 4)
Top 10 Predicted Probabilities and Classes (Defended Model):
  deer: 0.9996
  cat: 0.0002
  dog: 0.0001
  bird: 0.0000
  horse: 0.0000
  airplane: 0.0000
  truck: 0.0000
  ship: 0.0000
  frog: 0.0000
  automobile: 0.0000

--- Comparison (Baseline vs. Poisoned vs. Defended) ---
Baseline (Unpoisoned Model) Probability for 'deer': 0.4696
Baseline (Unpoisoned Model) Probability for 'dog': 0.4554
Poisoned Model Probability for 'deer': 0.4903
Poisoned Model Probability for 'dog': 0.5053
Defended Model Probability for 'deer': 0.9996
Defended Model Probability for 'dog': 0.0001

Observation: The defended model correctly classified the 'deer' image as 'deer'.

Overall Test Accuracy of Defended Model: 0.902


This is a good strategy: it succeeded in getting the target image classified correctly again, but the model's overall accuracy decreased slightly due to the removal of "forgettable events".

# Model Defense 2: Ensemble Methods

[CURRENT] Enhance the model's robustness with an ensemble of models trained on different data subsets (defending against attackers who exploit a single model).

# Model Defense 3: Bilevel Optimization

[NEW] Bilevel optimization defenses: Because many poisoning attacks can be framed as bilevel optimization problems, researchers are developing methods to solve the optimization problem in reverse to identify and neutralize poisoned data points.

# Model Defense 4: Activation Clustering

[NEW] Activation clustering: This technique involves clustering the activations from the hidden layers of a trained model. Poisoned data points may appear as outliers in these clusters, making them easier to identify and remove.