In [None]:
import torch
import torchvision.models as models

# Load the pre-trained ResNet-18 model
model = models.resnet18(pretrained=True)
# Freeze all the pre-trained layers
for param in model.parameters():
    param.requires_grad = True
num_classes = 10 # replace with the number of classes in your dataset
model.fc = torch.nn.Linear(model.fc.in_features, num_classes)

In [None]:
import os
import tarfile
from torchvision.datasets import CIFAR10
from torchvision import transforms
from torch.utils.data import DataLoader

# Set the root directory for your custom dataset
custom_dataset_root = "./"

# Download and extract CIFAR-10 dataset
cifar10_train = CIFAR10(root=custom_dataset_root, train=True, download=True, transform=transforms.ToTensor())
cifar10_test = CIFAR10(root=custom_dataset_root, train=False, download=True, transform=transforms.ToTensor())



# Create custom dataset directory structure
train_dir = os.path.join(custom_dataset_root, os.path.join("cifar_10", "train"))
val_dir = os.path.join(custom_dataset_root, os.path.join("cifar_10", "val"))

# Create subdirectories for each class in train and val
for i in range(10):
    class_name = cifar10_train.classes[i]
    os.makedirs(os.path.join(train_dir, class_name), exist_ok=True)
    os.makedirs(os.path.join(val_dir, class_name), exist_ok=True)

# Move images to the appropriate folders
for i in range(len(cifar10_train)):
    image, label = cifar10_train[i]
    class_name = cifar10_train.classes[label]
    folder_path = os.path.join(train_dir, class_name)

    image_path = os.path.join(folder_path, f"{i}.png")
    transforms.ToPILImage()(image).save(image_path)

for i in range(len(cifar10_test)):
    image, label = cifar10_test[i]
    class_name = cifar10_test.classes[label]
    folder_path = os.path.join(val_dir, class_name)

    image_path = os.path.join(folder_path, f"{i}.png")
    transforms.ToPILImage()(image).save(image_path)



In [2]:
from torchvision.datasets import ImageFolder
from torchvision.transforms import transforms
from torch.utils.data import DataLoader


# Define the transformations to apply to the images
transform = transforms.Compose([
    transforms.ToTensor(),
])

# Load the train and validation datasets
train_dataset = ImageFolder('./cifar_10/train', transform=transform)
val_dataset = ImageFolder('./cifar_10/val', transform=transform)

# Create data loaders for the train and validation datasets
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)


In [3]:
def train(model, train_loader, val_loader, criterion, optimizer, num_epochs):
    # Train the model for the specified number of epochs
    for epoch in range(num_epochs):
        # Set the model to train mode
        model.train()

        # Initialize the running loss and accuracy
        running_loss = 0.0
        running_corrects = 0

        # Iterate over the batches of the train loader
        for inputs, labels in train_loader:
            # Move the inputs and labels to the device
            inputs = inputs.to(device)
            labels = labels.to(device)

            # Zero the optimizer gradients
            optimizer.zero_grad()

            # Forward pass
            outputs = model(inputs)
            _, preds = torch.max(outputs, 1)
            loss = criterion(outputs, labels)

            # Backward pass and optimizer step
            loss.backward()
            optimizer.step()

            # Update the running loss and accuracy
            running_loss += loss.item() * inputs.size(0)
            running_corrects += torch.sum(preds == labels.data)

        # Calculate the train loss and accuracy
        train_loss = running_loss / len(train_dataset)
        train_acc = running_corrects.double() / len(train_dataset)

        # Set the model to evaluation mode
        model.eval()

        # Initialize the running loss and accuracy
        running_loss = 0.0
        running_corrects = 0

        # Iterate over the batches of the validation loader
        with torch.no_grad():
            for inputs, labels in val_loader:
                # Move the inputs and labels to the device
                inputs = inputs.to(device)
                labels = labels.to(device)

                # Forward pass
                outputs = model(inputs)
                _, preds = torch.max(outputs, 1)
                loss = criterion(outputs, labels)

                # Update the running loss and accuracy
                running_loss += loss.item() * inputs.size(0)
                running_corrects += torch.sum(preds == labels.data)

        # Calculate the validation loss and accuracy
        val_loss = running_loss / len(val_dataset)
        val_acc = running_corrects.double() / len(val_dataset)

        # Print the epoch results
    print('Epoch [{}/{}], train loss: {:.4f}, train acc: {:.4f}, val loss: {:.4f}, val acc: {:.4f}'
              .format(epoch+1, num_epochs, train_loss, train_acc, val_loss, val_acc))


In [4]:
# Set the device
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model.to(device)
# Define the loss function and optimizer
criterion = torch.nn.CrossEntropyLoss()

# Unfreeze all the layers and fine-tune the entire network for a few more epochs
for param in model.parameters():
    param.requires_grad = True
optimizer = torch.optim.SGD(model.parameters(), lr=0.001, momentum=0.9)
train(model, train_loader, val_loader, criterion, optimizer, num_epochs=20)
save_path = './resnet18_weights.pth'

# Save the model weights
torch.save(model.state_dict(), save_path)

Epoch [20/20], train loss: 0.0472, train acc: 0.9836, val loss: 0.8246, val acc: 0.8271


In [7]:
deer_images = []
cat_images = []

# Select 5 deer images
deer_indices = [i for i, label in enumerate(cifar10_test.targets) if label == 4][:5]
deer_images = [(cifar10_test[i][0], 4) for i in deer_indices]  # 0 represents the label for deer

# Select 300 cat images for now and will filter it down to 100 during poison image calculation
cat_indices = [i for i, label in enumerate(cifar10_test.targets) if label == 3][:300]
cat_images = [(cifar10_test[i][0], 3) for i in cat_indices]  # 1 represents the label for cat


In [8]:
def adam_one_step(model, m, v, t, currentImage, featRepTarget, learning_rate,
                  beta_1=0.9, beta_2=0.999, eps=1e-8) -> torch.Tensor:
    """one step adam optimization"""
    t += 1
    currentImage = currentImage.detach() # disconnect image from the current autograd graph
    currentImage.requires_grad_()

    with torch.enable_grad():
        logits = model(currentImage)
        target_logits = model(featRepTarget)
        loss = torch.norm(logits - target_logits)

    grad_t = torch.autograd.grad(loss, [currentImage])[0]
    m = beta_1 * m + (1-beta_1)*grad_t
    v = beta_2 * v + (1-beta_2)*grad_t**2
    m_hat = m/(1-beta_1**t)
    v_hat = v/(1-beta_2**t)
    with torch.no_grad():
        currentImage -= learning_rate*m_hat/(torch.sqrt(v_hat)+eps)
    return currentImage, m, v, t

def forward_step(model, img: torch.Tensor, target_image: torch.Tensor,
                 lr: float, target_logits) -> torch.Tensor:
    """helper function performing the forward step"""
    img = img.detach() # disconnect image from the current autograd graph
    img.requires_grad = True

    logits = model(img)
    loss = torch.norm(logits - target_logits)
    model.zero_grad()
    loss.backward()

    img_grad = img.grad.data
    perturbed_img = img - lr*img_grad
    return perturbed_img

def backward_step(img: torch.Tensor, base_img: torch.Tensor, lr: float, beta: float) -> torch.Tensor:
    """helper function to perform the backward step"""
    perturbed_img = (img + lr*beta*base_img) / (1 + beta*lr)
    perturbed_img = torch.clamp(perturbed_img, 0, 1) # to avoid clipping

    return perturbed_img

In [12]:
import copy
import numpy as np


def craft_clabel_poisons(model, target, bases, n_iter, lr, beta ,device):

    print("[ Initialize.. ]")
    # create a global variable and assign the device which should be used

    poisoned_images = torch.zeros((100, 3, 32,32))
    poisoned_labels = torch.zeros((100))

    target_image = target[0]
    target_image_class = target[1]
    target_image = torch.unsqueeze(target_image, axis = 0)

    # calculate the beta
    img_shape = np.squeeze(target_image).shape
    #beta = 0.25 * (2048 / float(img_shape[0] * img_shape[1] * img_shape[2]))**2
    #print("beta = {}".format(beta))

    # iterate over the whole test dataset and create a perturbed version of one (or N)
    # new_class (the class as which the chosen image should be misclassified as) image.
    adam = False
    current_pertube_count = 0
    for idx, (input, target_cls) in enumerate(bases):
      if(current_pertube_count < 100):
            difference = 100 # difference between base and target in feature space, will be updated
            base_image, target_image = input.to(device), target_image.to(device)
            base_image = torch.unsqueeze(base_image, axis = 0)

            old_image = base_image

            # Initializations
            num_m = 40
            last_m_objs = []
            decay_coef = 0.5 #decay coeffiencet of learning rate
            stopping_tol = 1e-10 #for the relative change
            learning_rate = lr #iniital learning rate for optimization
            rel_change_val = 1e5

            target_feat_rep = model(target_image).detach()





            old_feat_rep = model(base_image).detach() #also known as the old image
            old_obj = torch.linalg.norm(old_feat_rep - target_feat_rep) + \
                      beta*torch.linalg.norm(old_image - base_image)
            last_m_objs.append(old_obj)

            # perform the attack as described in the paper to optimize
            # || f(x)-f(t) ||^2 + beta * || x-b ||^2
            for iteration in range(n_iter):

                if adam:
                    new_image, m, v, t = adam_one_step(model, m, v, t, old_image, target_image,
                                                       learning_rate)
                else:
                    new_image = forward_step(model, old_image, target_image,
                                             learning_rate, copy.deepcopy(target_feat_rep))


                new_image = backward_step(new_image, old_image, learning_rate, beta)

                # check stopping condition:  compute relative change in image between iterations
                rel_change_val = torch.linalg.norm(new_image-old_image)/torch.linalg.norm(new_image)

                if (rel_change_val < stopping_tol) :
                    print("! reached the object threshold -> stopping optimization !")
                    break

                # compute new objective value
                new_feat_rep = model(new_image).detach()
                new_obj = torch.linalg.norm(new_feat_rep - target_feat_rep) + \
                          beta*torch.linalg.norm(new_image - base_image)

                #find the mean of the last M iterations
                avg_of_last_m = sum(last_m_objs)/float(min(num_m, iteration+1))
                # If the objective went up, then learning rate is too big.
                # Chop it, and throw out the latest iteration
                if new_obj >= avg_of_last_m and (iteration % num_m/2 == 0):
                    learning_rate *= decay_coef
                    new_image = old_image
                else:
                    old_image = new_image
                    old_obj = new_obj
                    old_feat_rep = new_feat_rep

                if iteration < num_m-1:
                    last_m_objs.append(new_obj)
                else:
                    #first remove the oldest obj then append the new obj
                    del last_m_objs[0]
                    last_m_objs.append(new_obj)

                # yes that's correct. The following lines will never be reached, exactly
                # like in the original code. But adam optimization makes everything worse anyway..
                if iteration > n_iter:
                    m = 0.
                    v = 0.
                    t = 0
                    adam = True

                difference = torch.linalg.norm(old_feat_rep - target_feat_rep)

            if difference < 3.5:
                poisoned_images[current_pertube_count] = old_image # old_image is overwritten
                poisoned_labels[current_pertube_count] = target_cls
                current_pertube_count += 1

    return poisoned_images, poisoned_labels

Note :- Contrary to the question, i have finetuned upto 5 epochs due to google colab timeout, and CPU takes hours to run, I had tried for 2 target images with 10 epochs and the result is more prominent , i.e the target class is misclassifed more times ,around twice than that of 5 epoch version (current one)

Numbers / result in this form represents,
k [1,5,10,25,50,100] ---  the no of poisoned image in dataset
['F', 'F', 'F', 'F', 'F', 'F'] --- whether the target image is misclassified to the cat class
[40, 36, 47, 44, 55, 46]  --  the no of missclassified image to the cat class

In [17]:
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    for name, param in model.named_parameters():
        if not name.startswith('fc'):
            param.requires_grad = False

    num_successful_attacks = 0
    num_total_attacks = 0
    k =[1, 5, 10, 25, 50, 100]
    for i in range(5):
          missc = []
          missc_targ = []
          # create a pertubed dataset with the chosen target / poison classes
          poisoned_data = craft_clabel_poisons(model, deer_images[i], cat_images, 100, 0.01,0.01 ,  device)
          for j in k:
            misclassified_count = 0
            # Load the state dictionary into the model

            for name, param in model.named_parameters():
              if not name.startswith('fc'):
                param.requires_grad = False

            model.load_state_dict(torch.load('/content/resnet18_weights.pth'))

            m = 900000
            for l in range(j):
              filename = f"X_{m}.png"
              image_path = os.path.join("./cifar_10/train/cat/", filename )
              transforms.ToPILImage()(poisoned_data[0][l]).save(image_path)
              m+=1
            train_dataset = ImageFolder('./cifar_10/train', transform=transform)
            val_dataset = ImageFolder('./cifar_10/val', transform=transform)
            train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
            val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)
            optimizer = torch.optim.SGD(model.parameters(), lr=0.001, momentum=0.9)
            train(model, train_loader, val_loader, criterion, optimizer, num_epochs=5)



            with torch.no_grad():
              model.eval()  # Set the model to evaluation mode

              if(torch.argmax(model(deer_images[i][0].unsqueeze(axis = 0).to(device))) == 3):
                missc_targ.append("T")
              else:
                missc_targ.append("F")

              for inputs, labels in val_loader:
                  # Move the inputs and labels to the device
                  inputs = inputs.to(device)
                  labels = labels.to(device)

                  # Forward pass
                  outputs = model(inputs)
                  _, preds = torch.max(outputs, 1)

                  # Count misclassifications where original class is 3 (cat) and predicted class is 4 (deer)
                  misclassified_count += torch.sum((labels == 4) & (preds == 3)).item()
            missc.append(misclassified_count)

          directory_path = "/content/cifar_10/train/cat/"

          # Iterate over files in the directory
          for filename in os.listdir(directory_path):
              # Check if the filename starts with 'X_'
              if filename.startswith("X_"):
                  file_path = os.path.join(directory_path, filename)

                  # Remove the file
                  os.remove(file_path)


          print(f"for target(deer) image {i}")
          print("k [1,5,10,25,50,100]")
          print(missc_targ)
          print(missc)


[ Initialize.. ]
Epoch [5/5], train loss: 0.0321, train acc: 0.9894, val loss: 0.8119, val acc: 0.8327
Epoch [5/5], train loss: 0.0343, train acc: 0.9893, val loss: 0.7974, val acc: 0.8332
Epoch [5/5], train loss: 0.0347, train acc: 0.9888, val loss: 0.8002, val acc: 0.8320
Epoch [5/5], train loss: 0.0384, train acc: 0.9874, val loss: 0.7988, val acc: 0.8332
Epoch [5/5], train loss: 0.0401, train acc: 0.9866, val loss: 0.8312, val acc: 0.8293
Epoch [5/5], train loss: 0.0404, train acc: 0.9862, val loss: 0.8019, val acc: 0.8312
for target(deer) image 0
k [1,5,10,25,50,100]
['F', 'F', 'F', 'F', 'F', 'F']
[40, 36, 47, 44, 55, 46]
[ Initialize.. ]
Epoch [5/5], train loss: 0.0347, train acc: 0.9882, val loss: 0.8247, val acc: 0.8311
Epoch [5/5], train loss: 0.0341, train acc: 0.9883, val loss: 0.8017, val acc: 0.8330
Epoch [5/5], train loss: 0.0362, train acc: 0.9881, val loss: 0.8121, val acc: 0.8330
Epoch [5/5], train loss: 0.0370, train acc: 0.9878, val loss: 0.8147, val acc: 0.8317
Epoc