
Data Poisoning attacks on Neural Networks


In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torchvision import datasets, transforms
from torch.utils.data import DataLoader, Dataset
import numpy as np
import matplotlib.pyplot as plt
import random
from sklearn.metrics import accuracy_score, confusion_matrix
import seaborn as sns
from copy import deepcopy
import warnings
warnings.filterwarnings('ignore')

In [2]:
# Set random seeds for reproducibility
torch.manual_seed(42)
np.random.seed(42)
random.seed(42)

In [3]:
print("=" * 60)
print("DATA POISONING ATTACKS")
print("=" * 60)

DATA POISONING ATTACKS


In [4]:
# Load MNIST dataset
transform = transforms.Compose([
    transforms.ToTensor(),
])

In [5]:
train_dataset = datasets.MNIST(root='./data', train=True, download=True, transform=transform)
test_dataset = datasets.MNIST(root='./data', train=False, download=True, transform=transform)

In [6]:
# Define a simple CNN architecture
class SimpleCNN(nn.Module):
    def __init__(self):
        super(SimpleCNN, self).__init__()
        self.conv1 = nn.Conv2d(1, 32, 3, 1)
        self.conv2 = nn.Conv2d(32, 64, 3, 1)
        self.fc1 = nn.Linear(9216, 128)
        self.fc2 = nn.Linear(128, 10)
        
    def forward(self, x):
        x = F.relu(self.conv1(x))
        x = F.relu(self.conv2(x))
        x = F.max_pool2d(x, 2)
        x = torch.flatten(x, 1)
        x = F.relu(self.fc1(x))
        x = self.fc2(x)
        return x

In [7]:
# Training function
def train_model(model, train_loader, epochs=3, device='cpu'):
    model.train()
    optimizer = optim.Adam(model.parameters(), lr=0.001)
    criterion = nn.CrossEntropyLoss()
    
    for epoch in range(epochs):
        running_loss = 0.0
        for data, target in train_loader:
            data, target = data.to(device), target.to(device)
            optimizer.zero_grad()
            output = model(data)
            loss = criterion(output, target)
            loss.backward()
            optimizer.step()
            running_loss += loss.item()
        
        if (epoch + 1) % 1 == 0:
            print(f'Epoch [{epoch+1}/{epochs}], Loss: {running_loss/len(train_loader):.4f}')

In [8]:
# Evaluation function
def evaluate_model(model, test_loader, device='cpu'):
    model.eval()
    correct = 0
    total = 0
    all_predictions = []
    all_targets = []
    
    with torch.no_grad():
        for data, target in test_loader:
            data, target = data.to(device), target.to(device)
            output = model(data)
            pred = output.argmax(dim=1)
            correct += pred.eq(target).sum().item()
            total += target.size(0)
            all_predictions.extend(pred.cpu().numpy())
            all_targets.extend(target.cpu().numpy())
    
    accuracy = 100. * correct / total
    print(f'Test Accuracy: {accuracy:.2f}%')
    return accuracy, all_predictions, all_targets

In [9]:
# Setup device
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {device}")

Using device: cpu


In [10]:
# Create data loaders
train_loader = DataLoader(train_dataset, batch_size=128, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=1000, shuffle=False)

In [11]:
# Train baseline model
print("Training baseline model...")
baseline_model = SimpleCNN().to(device)
train_model(baseline_model, train_loader, epochs=10, device=device)

Training baseline model...
Epoch [1/10], Loss: 0.2057
Epoch [2/10], Loss: 0.0506
Epoch [3/10], Loss: 0.0318
Epoch [4/10], Loss: 0.0231
Epoch [5/10], Loss: 0.0168
Epoch [6/10], Loss: 0.0124
Epoch [7/10], Loss: 0.0113
Epoch [8/10], Loss: 0.0073
Epoch [9/10], Loss: 0.0070
Epoch [10/10], Loss: 0.0058


In [12]:
# Evaluate baseline model
print("\nBaseline model evaluation:")
baseline_acc, baseline_preds, baseline_targets = evaluate_model(baseline_model, test_loader, device)


Baseline model evaluation:
Test Accuracy: 98.87%


In [13]:
# Label Flipping Attack
class LabelFlippingAttack:
    def __init__(self, poison_ratio=0.2, target_class=3):
        self.poison_ratio = poison_ratio
        self.target_class = target_class
    
    def poison_dataset(self, dataset):
        """Flip labels of a portion of the dataset to a target class"""
        poisoned_dataset = deepcopy(dataset)
        total_samples = len(poisoned_dataset)
        num_poisoned = int(total_samples * self.poison_ratio)
        
        # Select random indices to poison
        indices_to_poison = random.sample(range(total_samples), num_poisoned)
        
        for idx in indices_to_poison:
            data, _ = poisoned_dataset[idx]
            # Flip the label to target class
            poisoned_dataset.targets[idx] = self.target_class
        
        print(f"Poisoned {num_poisoned} samples with random label flipping attack")
        return poisoned_dataset

In [21]:
# Optimized Label Flipping Attack
class OptimizedLabelFlippingAttack:
    def __init__(self, poison_ratio=0.2, target_class=3, selection_method='gradient'):
        self.poison_ratio = poison_ratio
        self.target_class = target_class
        self.selection_method = selection_method  # 'gradient', 'confidence', 'loss'
    
    def compute_gradient_importance(self, model, dataset, num_samples=10000):
        """Compute gradient-based importance scores for label flipping selection"""
        model.eval()
        importance_scores = []
        
        # Sample a subset for efficiency
        indices = random.sample(range(len(dataset)), min(num_samples, len(dataset)))
        
        for idx in indices:
            data, target = dataset[idx]
            data = data.unsqueeze(0).to(device)
            target = torch.tensor([target]).to(device)
            
            # Compute gradients with respect to input
            data.requires_grad_(True)
            output = model(data)
            loss = F.cross_entropy(output, target)
            loss.backward()
            
            # Compute gradient magnitude as importance score
            grad_magnitude = torch.norm(data.grad).item()
            importance_scores.append((idx, grad_magnitude))
            
            data.requires_grad_(False)
        
        # Sort by importance (highest gradient magnitude first)
        importance_scores.sort(key=lambda x: x[1], reverse=True)
        return importance_scores
    
    def compute_confidence_importance(self, model, dataset, num_samples=10000):
        """Compute confidence-based importance scores for label flipping selection"""
        model.eval()
        importance_scores = []
        
        # Sample a subset for efficiency
        indices = random.sample(range(len(dataset)), min(num_samples, len(dataset)))
        
        with torch.no_grad():
            for idx in indices:
                data, target = dataset[idx]
                data = data.unsqueeze(0).to(device)
                
                # Get model prediction and confidence
                output = model(data)
                probabilities = F.softmax(output, dim=1)
                confidence = probabilities.max().item()
                predicted_class = output.argmax().item()
                
                # Higher importance for samples that are confidently predicted correctly
                # These will have the most impact when flipped
                if predicted_class == target:
                    importance_score = confidence  # High confidence correct predictions
                else:
                    importance_score = 1 - confidence  # Low confidence incorrect predictions
                
                importance_scores.append((idx, importance_score))
        
        # Sort by importance (highest confidence first)
        importance_scores.sort(key=lambda x: x[1], reverse=True)
        return importance_scores
    
    def compute_loss_importance(self, model, dataset, num_samples=10000):
        """Compute loss-based importance scores for label flipping selection"""
        model.eval()
        importance_scores = []
        
        # Sample a subset for efficiency
        indices = random.sample(range(len(dataset)), min(num_samples, len(dataset)))
        
        with torch.no_grad():
            for idx in indices:
                data, target = dataset[idx]
                data = data.unsqueeze(0).to(device)
                target = torch.tensor([target]).to(device)
                
                # Compute loss
                output = model(data)
                loss = F.cross_entropy(output, target).item()
                
                # Lower loss means higher importance (more confident predictions)
                importance_score = 1.0 / (1.0 + loss)  # Transform to [0, 1] range
                importance_scores.append((idx, importance_score))
        
        # Sort by importance (highest first)
        importance_scores.sort(key=lambda x: x[1], reverse=True)
        return importance_scores
    
    def poison_dataset(self, dataset, model):
        """Poison dataset using optimized label flipping selection"""
        poisoned_dataset = deepcopy(dataset)
        total_samples = len(poisoned_dataset)
        num_poisoned = int(total_samples * self.poison_ratio)
        
        print(f"Computing {self.selection_method}-based importance scores...")
        
        if self.selection_method == 'gradient':
            importance_scores = self.compute_gradient_importance(model, dataset)
        elif self.selection_method == 'confidence':
            importance_scores = self.compute_confidence_importance(model, dataset)
        elif self.selection_method == 'loss':
            importance_scores = self.compute_loss_importance(model, dataset)
        else:
            raise ValueError(f"Unknown selection method: {self.selection_method}")
        
        # Select top-k most important samples for poisoning
        selected_indices = [idx for idx, _ in importance_scores[:num_poisoned]]
        
        print(f"Selected {len(selected_indices)} samples based on {self.selection_method} importance")
        
        # Flip labels of selected samples
        for idx in selected_indices:
            poisoned_dataset.targets[idx] = self.target_class
        
        print(f"Poisoned {len(selected_indices)} samples with optimized label flipping attack")
        return poisoned_dataset

In [None]:
# Backgradient Poisoning Attack
class BackgradientPoisoningAttack:
    def __init__(self, poison_ratio=0.2, target_class=3, optimization_steps=100, lr=0.01):
        self.poison_ratio = poison_ratio
        self.target_class = target_class
        self.optimization_steps = optimization_steps
        self.lr = lr
    
    def compute_gradient_importance(self, model, dataset, num_samples=10000):
        """Compute gradient-based importance scores for data selection"""
        model.eval()
        importance_scores = []
        
        # Sample a subset for efficiency
        indices = random.sample(range(len(dataset)), min(num_samples, len(dataset)))
        
        for idx in indices:
            data, target = dataset[idx]
            data = data.unsqueeze(0).to(device)
            target = torch.tensor([target]).to(device)
            
            # Compute gradients with respect to input
            data.requires_grad_(True)
            output = model(data)
            loss = F.cross_entropy(output, target)
            loss.backward()
            
            # Compute gradient magnitude as importance score
            grad_magnitude = torch.norm(data.grad).item()
            importance_scores.append((idx, grad_magnitude))
            
            data.requires_grad_(False)
        
        # Sort by importance (highest gradient magnitude first)
        importance_scores.sort(key=lambda x: x[1], reverse=True)
        return importance_scores
    
    def poison_dataset(self, dataset, model):
        """Poison dataset using gradient-based optimization"""
        poisoned_dataset = deepcopy(dataset)
        total_samples = len(poisoned_dataset)
        num_poisoned = int(total_samples * self.poison_ratio)
        
        print(f"Computing gradient-based importance scores...")
        importance_scores = self.compute_gradient_importance(model, dataset)
        
        # Select top-k most important samples for poisoning
        selected_indices = [idx for idx, _ in importance_scores[:num_poisoned]]
        
        print(f"Selected {len(selected_indices)} samples based on gradient importance")
        
        # Poison selected samples with optimized perturbations
        for idx in selected_indices:
            data, _ = poisoned_dataset[idx]
            data = data.unsqueeze(0).to(device)
            
            # Optimize perturbation to maximize loss
            perturbation = torch.randn_like(data) * 0.1
            perturbation.requires_grad_(True)
            
            optimizer = optim.Adam([perturbation], lr=self.lr)
            
            for step in range(self.optimization_steps):
                optimizer.zero_grad()
                
                # Add perturbation to data
                perturbed_data = torch.clamp(data + perturbation, 0, 1)
                
                # Compute output
                output = model(perturbed_data)
                
                # Create adversarial example that will be misclassified
                # We want to maximize the loss for the original class
                original_target = torch.tensor([poisoned_dataset.targets[idx]]).to(device)
                
                # Method 1: Maximize loss for original class
                adversarial_loss = -F.cross_entropy(output, original_target)
                
                # Method 2: Minimize confidence in correct prediction
                probs = F.softmax(output, dim=1)
                correct_prob = probs[0, poisoned_dataset.targets[idx]]
                confidence_loss = correct_prob
                
                # Method 3: Maximize confidence in wrong prediction (poisoning target)
                # Find the class with second highest probability (not the original)
                probs_sorted, indices = torch.sort(probs, descending=True)
                wrong_class = indices[0, 1] if indices[0, 0] == poisoned_dataset.targets[idx] else indices[0, 0]
                wrong_confidence_loss = -probs[0, wrong_class]  # Maximize confidence in wrong class
                
                # Combined loss - be more aggressive
                loss = adversarial_loss + confidence_loss + 0.5 * wrong_confidence_loss
                
                loss.backward()
                optimizer.step()
                
                # Project perturbation to valid range
                #with torch.no_grad(): # don't track the gradients
                #    perturbation.clamp_(-0.5, 0.5)  # Allow larger perturbations
            
            # Apply optimized perturbation
            final_perturbed_data = torch.clamp(data + perturbation, 0, 1)
            poisoned_dataset.data[idx] = (final_perturbed_data.squeeze() * 255).to(torch.uint8)
            poisoned_dataset.targets[idx] = self.target_class
        
        print(f"Poisoned {len(selected_indices)} samples with backgradient poisoning")
        return poisoned_dataset

In [16]:
# Witches' Brew Poisoning Attack
class WitchesBrewPoisoningAttack:
    def __init__(self, poison_ratio=0.2, target_class=3, optimization_steps=200, lr=0.01):
        self.poison_ratio = poison_ratio
        self.target_class = target_class
        self.optimization_steps = optimization_steps
        self.lr = lr
    
    def compute_influence_scores(self, model, dataset, num_samples=10000):
        """Compute influence-based scores for data selection"""
        model.eval()
        influence_scores = []
        
        # Sample a subset for efficiency
        indices = random.sample(range(len(dataset)), min(num_samples, len(dataset)))
        
        for idx in indices:
            data, target = dataset[idx]
            data = data.unsqueeze(0).to(device)
            target = torch.tensor([target]).to(device)
            
            # Compute influence using gradient-based approximation
            data.requires_grad_(True)
            output = model(data)
            loss = F.cross_entropy(output, target)
            loss.backward()
            
            # Compute influence score based on gradient magnitude and prediction confidence
            grad_magnitude = torch.norm(data.grad).item()
            confidence = F.softmax(output, dim=1).max().item()
            
            # Influence score combines gradient magnitude and prediction confidence
            influence_score = grad_magnitude * (1 - confidence)  # Higher for uncertain predictions
            influence_scores.append((idx, influence_score))
            
            data.requires_grad_(False)
        
        # Sort by influence score (highest first)
        influence_scores.sort(key=lambda x: x[1], reverse=True)
        return influence_scores
    
    def poison_dataset(self, dataset, model):
        """Poison dataset using witches' brew optimization"""
        poisoned_dataset = deepcopy(dataset)
        total_samples = len(poisoned_dataset)
        num_poisoned = int(total_samples * self.poison_ratio)
        
        print(f"Computing influence-based scores...")
        influence_scores = self.compute_influence_scores(model, dataset)
        
        # Select samples with highest influence scores
        selected_indices = [idx for idx, _ in influence_scores[:num_poisoned]]
        
        print(f"Selected {len(selected_indices)} samples based on influence scores")
        
        # Poison selected samples 
        for idx in selected_indices:
            data, _ = poisoned_dataset[idx]
            data = data.unsqueeze(0).to(device)
            
            # Initialize perturbation
            perturbation = torch.randn_like(data) * 0.1
            perturbation.requires_grad_(True)
            
            optimizer = optim.Adam([perturbation], lr=self.lr)
            
            for step in range(self.optimization_steps):
                optimizer.zero_grad()
                
                # Add perturbation to data
                perturbed_data = torch.clamp(data + perturbation, 0, 1)
                
                # Compute multiple objectives
                output = model(perturbed_data)
                target = torch.tensor([self.target_class]).to(device)
                
                # Primary objective: minimize loss for target class (make model confident in wrong prediction)
                target_loss = F.cross_entropy(output, target)
                
                # Secondary objective: minimize confidence in original class
                original_target = torch.tensor([poisoned_dataset.targets[idx]]).to(device)
                original_loss = F.cross_entropy(output, original_target)
                
                # Combined objective
                total_loss = target_loss + 0.5 * original_loss
                
                total_loss.backward()
                optimizer.step()
                
                # Project perturbation to valid range
                #with torch.no_grad():
                #    perturbation.clamp_(-0.5, 0.5)  # Allow larger perturbations
            
            # Apply optimized perturbation
            final_perturbed_data = torch.clamp(data + perturbation, 0, 1)
            poisoned_dataset.data[idx] = (final_perturbed_data.squeeze() * 255).to(torch.uint8)
            poisoned_dataset.targets[idx] = self.target_class
        
        print(f"Poisoned {len(selected_indices)} samples with witches' brew poisoning")
        return poisoned_dataset

In [17]:
# Create poisoned datasets
print("\nCreating poisoned datasets...")


Creating poisoned datasets...


In [18]:
# Random label flipping
label_flipping_attack = LabelFlippingAttack(poison_ratio=0.2, target_class=3)
poisoned_train_dataset = label_flipping_attack.poison_dataset(train_dataset)

Poisoned 12000 samples with random label flipping attack


In [22]:
# Gradient-optimized label flipping
gradient_flipping_attack = OptimizedLabelFlippingAttack(poison_ratio=0.2, target_class=3, selection_method='gradient')
gradient_poisoned_dataset = gradient_flipping_attack.poison_dataset(train_dataset, baseline_model)

Computing gradient-based importance scores...
Selected 10000 samples based on gradient importance
Poisoned 10000 samples with optimized label flipping attack


In [None]:
# Backgradient poisoning
backgradient_attack = BackgradientPoisoningAttack(poison_ratio=0.2, target_class=3)
backgradient_poisoned_dataset = backgradient_attack.poison_dataset(train_dataset, baseline_model)

Computing gradient-based importance scores...
Selected 10000 samples based on gradient importance


In [None]:
# Train models on poisoned data
print("\nTraining models on poisoned data...")


Training models on poisoned data...


In [None]:
# Train on random label flipped data
poisoned_train_loader = DataLoader(poisoned_train_dataset, batch_size=64, shuffle=True)
label_flipped_model = SimpleCNN().to(device)
train_model(label_flipped_model, poisoned_train_loader, epochs=10, device=device)
label_flipped_acc, label_flipped_preds, label_flip_targets = evaluate_model(label_flipped_model, test_loader, device)

Epoch [1/10], Loss: 0.6231
Epoch [2/10], Loss: 0.5088
Epoch [3/10], Loss: 0.4833
Epoch [4/10], Loss: 0.4656
Epoch [5/10], Loss: 0.4463
Epoch [6/10], Loss: 0.4225
Epoch [7/10], Loss: 0.3944
Epoch [8/10], Loss: 0.3575
Epoch [9/10], Loss: 0.3180
Epoch [10/10], Loss: 0.2774
Test Accuracy: 90.03%


In [26]:
# Train on gradient-optimized label flipped data
gradient_poisoned_loader = DataLoader(gradient_poisoned_dataset, batch_size=64, shuffle=True)
gradient_flipped_model = SimpleCNN().to(device)
train_model(gradient_flipped_model, gradient_poisoned_loader, epochs=10, device=device)
gradient_flipped_acc, gradient_flipped_preds, gradient_flip_targets = evaluate_model(gradient_flipped_model, test_loader, device)

Epoch [1/10], Loss: 0.5785
Epoch [2/10], Loss: 0.4625
Epoch [3/10], Loss: 0.4391
Epoch [4/10], Loss: 0.4207
Epoch [5/10], Loss: 0.4001
Epoch [6/10], Loss: 0.3760
Epoch [7/10], Loss: 0.3461
Epoch [8/10], Loss: 0.3122
Epoch [9/10], Loss: 0.2737
Epoch [10/10], Loss: 0.2363
Test Accuracy: 91.79%


In [27]:
from sklearn.metrics import confusion_matrix

In [30]:
confusion_matrix_gradient = confusion_matrix(gradient_flip_targets, gradient_flipped_preds)

In [31]:
confusion_matrix_gradient

array([[ 928,    0,    0,   48,    0,    0,    2,    1,    0,    1],
       [   0, 1090,    0,   44,    0,    0,    1,    0,    0,    0],
       [   1,    0,  899,  125,    1,    0,    1,    4,    0,    1],
       [   0,    1,    2, 1002,    0,    4,    0,    1,    0,    0],
       [   0,    0,    0,  104,  870,    0,    1,    0,    4,    3],
       [   1,    0,    0,  112,    0,  775,    2,    0,    2,    0],
       [   5,    0,    0,   67,    0,    4,  882,    0,    0,    0],
       [   0,    2,    6,   60,    0,    0,    0,  955,    3,    2],
       [   0,    0,    0,  107,    0,    0,    0,    0,  866,    1],
       [   0,    0,    0,   87,    3,    4,    0,    2,    1,  912]],
      dtype=int64)