# Adversarial samples generator
Generate the adversarial samples to test the models

**Authors**

[Marco Alecci](https://github.com/MarcoAlecci)

[Francesco Marchiori](https://github.com/FrancescoMarchiori)

[Luca Martinelli](https://github.com/luca-martinelli-09)

[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/luca-martinelli-09/learn-the-art/blob/main/FGSM_tuning/tuning.ipynb)

In [None]:
import os

# @markdown ## Setup project
# @markdown This section will download the datasets from GitHub to use for the training phase

# Clone the project repository only when no "./datasets" folder is present in
# the working directory, then move into the cloned folder (IPython `!`/`%` commands).
if not os.path.exists("./datasets"):
    !git clone "https://github.com/luca-martinelli-09/learn-the-art.git"

    %cd learn-the-art/

In [None]:
import sys
# True when the google.colab module has already been loaded in this interpreter
IN_COLAB = 'google.colab' in sys.modules

# Maps each dataset key to the name of its sub-folder inside the models directory
datasetToFolder = {"ddg": "ddg", "bing": "bing", "google": "google"}
# Stays None outside Colab; the models directory then falls back to a local path
googleModelsDir = None

if IN_COLAB:
  !pip install torchattacks

  # Mount Google Drive so the trained models stored there can be read
  from google.colab import drive
  drive.mount('/content/drive')

  googleModelsDir = "/content/drive/MyDrive/Università/Magistrale/II Anno/I Semestre/Advanced Topics in Computer and Network Security/Project/Models"
  
  # The model sub-folders on Drive use these capitalised names
  datasetToFolder = {"ddg": "DuckDuckGo", "bing": "Bing", "google": "Google"}

In [None]:
import os
import time
import torch
import torchvision
import numpy as np
import torchvision
from torchvision import transforms
from torch.utils.data import DataLoader
from PIL import Image

from torchattacks import FGSM

from imageLimitedDataset import ImageLimitedDataset

# Report the library versions in use for this run
print("PyTorch Version:", torch.__version__)
print("Torchvision Version:", torchvision.__version__)

In [None]:
# Detect if we have a GPU available: use the first CUDA device when there is
# one, otherwise fall back to the CPU. `device` is read by the functions below.
print("CUDA available:", torch.cuda.is_available())
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

In [None]:
# @markdown Shuffle the dataset
shuffleDataset = False  # @param {type: "boolean"}

# @markdown Reduce the size of the dataset
datasetSize = 100  # @param {type: "integer"}

# @markdown Decide whether to adapt the attack or not. When True, adversarial samples are tested after the generation in order to see if the accuracy of the same model on which it has been trained goes below a threshold, otherwise reiterate the whole process with a different eps value
adapt = False  # @param {type: "boolean"}

# NOTE: accThreshold, epsStep and maxEpsilon are only read by the generation
# cell when `adapt` is True; with `adapt` False a single pass at `epsilon` runs.

# @markdown Threshold for the maximum value of the accuracy
accThreshold = 0.3 # @param {type: "number"}

# @markdown How much to increase the value of eps when using adapting attack
epsStep = 0.05 # @param {type: "number"}

# @markdown Set the value of epsilon
epsilon = 0.05  # @param {type: "number"}

# @markdown Set the maximum value of epsilon
maxEpsilon = 0.2  # @param {type: "number"}

In [None]:
# Source datasets to attack. A falsy value (None or an empty list) makes the
# generation cell fall back to every sub-folder of `datasetsDir`.
# datasetsGenerateOnly = ["google"] # Use this if you want to select only one dataset from bing, ddg or google
datasetsGenerateOnly = ["bing", "google", "ddg"] # Use this if you want to get all the datasets

# Model architectures to attack; compared against the "model_name" stored in each model file.
# modelsGenerateOnly = ["vgg"] # Use this if you want to select only one model from alexnet, resnet or vgg, None if select all
modelsGenerateOnly = ["alexnet", "resnet", "vgg"] # Use this if you want to select all the models

In [None]:
# Output root for the generated adversarial images
adversarialDir = "./adversarial_samples"
# Root of the source datasets; each dataset is expected to hold a "test" sub-folder
datasetsDir = "../datasets"
# Prefer the Google Drive folder when it was set (Colab), else the local models folder
modelsDir = googleModelsDir if googleModelsDir else "../models"

inputSize = 224 # Specified for alexnet, resnet, vgg

In [None]:
SEED = 151836


def setSeed(seed):
    """Seed the NumPy and PyTorch (CPU and CUDA) random generators with `seed`."""
    np.random.seed(seed)

    # CPU generator first, then the current CUDA device, then every CUDA device
    for seedFn in (torch.manual_seed,
                   torch.cuda.manual_seed,
                   torch.cuda.manual_seed_all):
        seedFn(seed)


setSeed(SEED)

In [None]:
def getSubDirs(dir):
    """Return the names (not full paths) of the entries of `dir` that are directories."""
    subDirs = []
    for entry in os.listdir(dir):
        if os.path.isdir(os.path.join(dir, entry)):
            subDirs.append(entry)
    return subDirs

In [None]:
def getClassPercents(sizes):
    """Return, for each entry of `sizes`, its share of the total as a rounded integer percentage."""
    totalSize = np.sum(np.array(sizes))
    return [int(round((size / totalSize) * 100)) for size in sizes]

In [None]:
def saveMathAdversarials(dataloader, classes, fileNames, attack, saveDir, shuffled=False):
    """Craft adversarial images for every batch and save them as JPEG files.

    Each image is written to `saveDir/<class name>/`, where the class name is
    `classes[label]`.

    Args:
        dataloader: iterable yielding `(images, labels)` batches.
        classes: sequence of class names indexed by label.
        fileNames: sequence whose i-th entry holds the source path of the i-th
            sample as its first element; only read when `shuffled` is False.
        attack: callable invoked as `attack(images, labels)` that returns the
            adversarial images for the batch.
        saveDir: root folder for the output images.
        shuffled: when True the samples no longer follow the order of
            `fileNames`, so images are named after their running index
            ("0.jpg", "1.jpg", ...) instead of their source file name.
    """
    # Build the converter once instead of once per image
    toPIL = transforms.ToPILImage()

    i = 0
    for images, labels in dataloader:
        adversarials = attack(images, labels)

        for adversarial, label in zip(adversarials, labels):
            image = toPIL(adversarial).convert("RGB")
            path = os.path.join(saveDir, classes[label])
            os.makedirs(path, exist_ok=True)

            # The index must be converted explicitly: int + str raises TypeError
            imageName = str(i) + ".jpg" if shuffled else os.path.basename(fileNames[i][0])
            # Always encoded as JPEG, whatever extension the source name carries
            image.save(os.path.join(path, imageName), "JPEG")

            i += 1

            if i % 20 == 0:
                print("Sample #", i)

In [None]:
def printGPUStats():
    """Print the device in use and, for a CUDA device, its name and memory usage in GB."""
    print('Using device:', device)
    print()

    # Nothing more to report unless running on cuda
    if device.type != 'cuda':
        return

    bytesPerGB = 1024 ** 3

    print(torch.cuda.get_device_name(0))
    print('[💻 MEMORY USAGE]')

    allocatedGB = round(torch.cuda.memory_allocated(0) / bytesPerGB, 1)
    print('[📌 ALLOCATED]', allocatedGB, 'GB')

    reservedGB = round(torch.cuda.memory_reserved(0) / bytesPerGB, 1)
    print('[🧮 CACHED]', reservedGB, 'GB')

In [None]:
def getBestScores(hist, key, min=False):
    """Return the entry of `hist` with the best `key` value, together with its index.

    "Best" means the highest value, or the lowest one when `min` is True.
    """
    values = np.array([entry[key] for entry in hist])
    bestIndex = np.argmin(values) if min else np.argmax(values)

    return hist[bestIndex], bestIndex

In [None]:
def getMeanAndSDT(dataloader):
    """Return the per-channel mean and standard deviation of the data in `dataloader`.

    Per-batch statistics are averaged with equal weight for every batch.
    `dataloader` yields `(data, _)` pairs; `data` is reduced over dims 0, 2 and 3.
    """
    sumOfMeans = 0
    sumOfSquareMeans = 0
    batchCount = 0

    for batch, _ in dataloader:
        # Reduce over batch, height and width, keeping the channel axis
        sumOfMeans = sumOfMeans + batch.mean(dim=[0, 2, 3])
        sumOfSquareMeans = sumOfSquareMeans + (batch ** 2).mean(dim=[0, 2, 3])
        batchCount += 1

    mean = sumOfMeans / batchCount
    secondMoment = sumOfSquareMeans / batchCount

    # std = sqrt(E[X^2] - (E[X])^2)
    std = (secondMoment - mean ** 2) ** 0.5

    return mean, std

In [None]:
def getScores(labels, predicted):
    """Return accuracy, precision, recall and F1 for 0/1 `labels` and `predicted` tensors.

    Class 1 is treated as the positive class.
    """
    negLabels = 1 - labels
    negPredicted = 1 - predicted

    # Confusion-matrix counts (tn is computed but not needed by the metrics below)
    tp = (labels * predicted).sum()
    tn = (negLabels * negPredicted).sum()
    fp = (negLabels * predicted).sum()
    fn = (labels * negPredicted).sum()

    acc = torch.sum(predicted == labels) / len(predicted)
    precision = tp / (tp + fp)
    recall = tp / (tp + fn)
    f1 = 2 * (precision * recall) / (precision + recall)

    return acc, precision, recall, f1

In [None]:
def evaluateModel(model, dataloader):
    """Run `model` over `dataloader` and return its scores as NumPy values.

    The predicted class of each sample is the index of its highest output.
    Returns a dict with the keys "acc", "precision", "recall" and "f1".
    """
    model.eval()

    # Start from empty tensors on the target device and grow them batch by batch
    allPreds = torch.tensor([]).to(device, non_blocking=True)
    allTargets = torch.tensor([]).to(device, non_blocking=True)

    for batchInputs, batchLabels in dataloader:
        batchInputs = batchInputs.to(device, non_blocking=True)
        batchLabels = batchLabels.to(device, non_blocking=True)

        # Inference only: no gradients are needed
        with torch.no_grad():
            batchPreds = torch.max(model(batchInputs), 1)[1]

        allPreds = torch.cat([allPreds, batchPreds], dim=0)
        allTargets = torch.cat([allTargets, batchLabels], dim=0)

    metricNames = ("acc", "precision", "recall", "f1")
    metricValues = getScores(allTargets, allPreds)

    return {name: value.cpu().numpy() for name, value in zip(metricNames, metricValues)}

In [None]:
def evaluateModelsOnDataset(datasetFolder, datasetInfo):
    """Evaluate every model found under `modelsDir` on the images in `datasetFolder`.

    The images are normalised with the per-channel mean and standard deviation
    computed from `datasetFolder` itself.

    Args:
        datasetFolder: image folder with one sub-folder per class.
        datasetInfo: dict describing how the dataset was produced; the keys
            "dataset", "math", "attack", "model" and "balancing" are copied
            into every result row.

    Returns:
        A list with one dict per evaluated model, holding the dataset
        description, the model description and its acc/precision/recall/f1.
    """
    modelsEvals = []

    # Get the images and calculate mean and standard deviation
    imageDataset = torchvision.datasets.ImageFolder(
        datasetFolder, transform=transforms.Compose([transforms.ToTensor()]))

    for cls in imageDataset.classes:
        cls_index = imageDataset.class_to_idx[cls]
        num_cls = np.count_nonzero(
            np.array(imageDataset.targets) == cls_index)

        print("\t[🧮 # ELEMENTS] {}: {}".format(cls, num_cls))

    imageDataloader = DataLoader(imageDataset, batch_size=128)

    mean, std = getMeanAndSDT(imageDataloader)

    # Setup for normalization
    dataTransform = transforms.Compose([
        transforms.Resize(inputSize),
        transforms.ToTensor(),
        transforms.Normalize(mean, std)
    ])

    testDataset = ImageLimitedDataset(
        datasetFolder, transform=dataTransform, use_cache=True, check_images=False)

    setSeed(SEED)
    testDataLoader = DataLoader(
        testDataset, batch_size=64, shuffle=True, num_workers=0, pin_memory=True)

    # Evaluate every model
    for root, _, fnames in sorted(os.walk(modelsDir, followlinks=True)):
        for fname in sorted(fnames):
            path = os.path.join(root, fname)

            # Files that cannot be loaded are skipped on purpose (the models
            # folder may hold other files). Only `Exception` is caught so that
            # Ctrl-C / SystemExit still stop the run.
            # NOTE(review): torch.load unpickles arbitrary objects, so
            # `modelsDir` must only contain trusted files. Recent PyTorch
            # releases may also reject full-model checkpoints by default
            # (weights_only), which would be skipped silently here - confirm.
            try:
                modelData = torch.load(path)
            except Exception:
                continue

            modelDataset = modelData["dataset"]
            modelName = modelData["model_name"]
            modelPercents = "/".join(
                [str(x) for x in getClassPercents(modelData["dataset_sizes"])])

            print()
            print("[🧮 EVALUATING] {} - {} {}".format(
                modelDataset,
                modelName,
                modelPercents
            ))

            modelToTest = modelData["model"]
            modelToTest = modelToTest.to(device, non_blocking=True)

            scores = evaluateModel(modelToTest, testDataLoader)

            modelsEvals.append({
                # How the adversarial dataset was generated
                "dataset": datasetInfo["dataset"],
                "isMath": datasetInfo["math"],
                "attack": datasetInfo["attack"],
                "advModel": datasetInfo["model"],
                "advBalancing": datasetInfo["balancing"],

                # The model under evaluation and its scores
                "model": modelName,
                "modelDataset": modelDataset,
                "balancing": modelPercents,
                "acc": scores["acc"],
                "precision": scores["precision"],
                "recall": scores["recall"],
                "f1": scores["f1"],
            })

            print("\tAcc: {:.4f}".format(scores["acc"]))
            print("\tPre: {:.4f}".format(scores["precision"]))
            print("\tRec: {:.4f}".format(scores["recall"]))
            print("\tF-Score: {:.4f}".format(scores["f1"]))

            torch.cuda.empty_cache()

    return modelsEvals

In [None]:
# Timing log: the generation cell appends one dict per generated adversarial set
timesEvaluations = []

In [None]:
def _generateAdversarialSet(attack, attacker, dataset, modelDataset, modelName,
                            modelPercents, mathAttacksDir, testDataset,
                            testDataLoader, shownEps=None):
    """Craft, save and time one adversarial set; return the folder it was saved in.

    The images go to `mathAttacksDir/<attack>/<modelName>/<modelPercents>` and a
    timing record is appended to `timesEvaluations`. When `shownEps` is given,
    it is printed as the epsilon value in use.
    """
    saveDir = os.path.join(mathAttacksDir, attack, modelName, modelPercents)
    os.makedirs(saveDir, exist_ok=True)

    currentTime = time.time()
    print("[⚔️ ADVERSARIAL] {} - {} - {} {}".format(
        attack,
        modelDataset,
        modelName,
        modelPercents
    ))
    if shownEps is not None:
        print("[🏴‍☠️ EPSILON VALUE] {}".format(shownEps))

    setSeed(SEED)
    saveMathAdversarials(testDataLoader, testDataset.classes,
                         testDataset.imgs, attacker, saveDir, shuffled=shuffleDataset)

    elapsedTime = time.time() - currentTime
    print("Elapsed seconds:", elapsedTime)
    timesEvaluations.append({
        "dataset": dataset,
        "math": True,
        "attack": attack,
        "model": modelName,
        "modelDataset": modelDataset,
        "balancing": modelPercents.replace("_", "/"),
        "time": elapsedTime,
    })

    torch.cuda.empty_cache()

    return saveDir


def _evaluateOnAdversarialSet(model, advDir):
    """Return the scores of `model` on the images stored in `advDir`.

    The images are normalised with the per-channel mean and standard deviation
    computed from `advDir` itself.
    """
    imageDataset = torchvision.datasets.ImageFolder(
        advDir, transform=transforms.Compose([transforms.ToTensor()]))
    imageDataloader = DataLoader(imageDataset, batch_size=128)

    mean, std = getMeanAndSDT(imageDataloader)

    # Setup for normalization
    dataTransform = transforms.Compose([
        transforms.Resize(inputSize),
        transforms.ToTensor(),
        transforms.Normalize(mean, std)
    ])

    advDataset = ImageLimitedDataset(
        advDir, transform=dataTransform, use_cache=True, check_images=False)

    setSeed(SEED)
    advDataLoader = DataLoader(
        advDataset, batch_size=64, shuffle=True, num_workers=0, pin_memory=True)

    return evaluateModel(model, advDataLoader)


datasetsToGenerate = datasetsGenerateOnly if datasetsGenerateOnly else getSubDirs(datasetsDir)

# Attack name -> constructor, called as constructor(model, eps=...)
attackBuilders = {
    "FGSM": FGSM,
}

for dataset in datasetsToGenerate:
    print("\n" + "-" * 15)
    print("[🗃️ SOURCE DATASET] {}".format(dataset))

    datasetDir = os.path.join(datasetsDir, dataset)
    testDir = os.path.join(datasetDir, "test")

    datasetAdvDir = os.path.join(adversarialDir, dataset)
    mathAttacksDir = os.path.join(datasetAdvDir, "math")
    os.makedirs(mathAttacksDir, exist_ok=True)

    # Only the first `datasetSize` samples of the test set are attacked
    toTensor = transforms.Compose([transforms.ToTensor()])
    testDataset = ImageLimitedDataset(
        testDir, transform=toTensor, slices=[slice(0, datasetSize)], use_cache=False, check_images=False)

    setSeed(SEED)
    testDataLoader = DataLoader(
        testDataset, batch_size=16, num_workers=0, shuffle=shuffleDataset)

    datasetModelsDir = os.path.join(modelsDir, datasetToFolder[dataset])
    for root, _, fnames in sorted(os.walk(datasetModelsDir, followlinks=True)):
        for fname in sorted(fnames):
            path = os.path.join(root, fname)

            # Files that cannot be loaded are skipped on purpose; only
            # `Exception` is caught so Ctrl-C / SystemExit still stop the run.
            # NOTE(review): torch.load unpickles arbitrary objects, so the
            # models folder must only contain trusted files.
            try:
                modelData = torch.load(path)
            except Exception:
                continue

            modelDataset = modelData["dataset"]
            modelName = modelData["model_name"]

            # `modelsGenerateOnly = None` selects every model
            if modelsGenerateOnly is not None and modelName not in modelsGenerateOnly:
                torch.cuda.empty_cache()
                continue

            modelPercents = "_".join(
                [str(x) for x in getClassPercents(modelData["dataset_sizes"])])
            model = modelData["model"].to(device)

            for attack, buildAttack in attackBuilders.items():
                if not adapt:
                    # Single pass with the configured epsilon
                    _generateAdversarialSet(
                        attack, buildAttack(model, eps=epsilon), dataset,
                        modelDataset, modelName, modelPercents, mathAttacksDir,
                        testDataset, testDataLoader)
                    continue

                # Adaptive attack: raise epsilon until the accuracy of the
                # attacked model drops to the threshold or epsilon exceeds its
                # maximum. `scores` starts as None so that the first pass
                # always runs and no result from a previous model is reused.
                eps_it = epsilon
                scores = None

                while eps_it <= maxEpsilon and (scores is None or scores["acc"] > accThreshold):
                    saveDir = _generateAdversarialSet(
                        attack, buildAttack(model, eps=eps_it), dataset,
                        modelDataset, modelName, modelPercents, mathAttacksDir,
                        testDataset, testDataLoader, shownEps=eps_it)

                    print("[🧮 EVALUATING] {} - {} {}".format(
                        modelDataset,
                        modelName,
                        modelPercents
                    ))

                    scores = _evaluateOnAdversarialSet(model, saveDir)
                    print("[📏 ACCURACY]", scores["acc"])
                    print()

                    eps_it = eps_it + epsStep

In [None]:
modelsEvals = []

print("[🧠 MODELS EVALUATION - FGSM w/ EPS = {}]".format(epsilon))

# Walk adversarialDir/<dataset>/math/<attack>/<model>/<balancing> and evaluate
# every model on each adversarial dataset found at the deepest level
for dataset in getSubDirs(adversarialDir):
    mathAdvDir = os.path.join(adversarialDir, dataset, "math")

    # Datasets without math attacks are ignored
    if not os.path.exists(mathAdvDir):
        continue

    for attack in getSubDirs(mathAdvDir):
        attackDir = os.path.join(mathAdvDir, attack)

        for advModel in getSubDirs(attackDir):
            advModelDir = os.path.join(attackDir, advModel)

            for advBalancing in getSubDirs(advModelDir):
                print("\n" + "-" * 15)
                print("[🗃️ ADVERSARIAL DATASET] {}/{}/{}/{}".format(dataset, attack, advModel, advBalancing))

                # Folder names use "_" where the reported balancing uses "/"
                advDatasetInfo = {
                    "dataset": dataset,
                    "math": True,
                    "attack": attack,
                    "balancing": advBalancing.replace("_", "/"),
                    "model": advModel,
                }

                modelsEvals.extend(evaluateModelsOnDataset(
                    os.path.join(advModelDir, advBalancing), advDatasetInfo))

In [None]:
import pandas as pd

# One row per (adversarial dataset, evaluated model) pair collected above
modelsEvalsDF = pd.DataFrame(modelsEvals)

In [None]:
modelsEvalsDF

In [None]:
# Save the evaluation table as CSV; the path is relative to the working directory
modelsEvalsDF.to_csv("FGSM_Evaluations.csv")