In [1]:
import utils
import torch
from torchvision import models, transforms
import torch.nn.functional as F
import torch.nn as nn
import json
import gzip

from utils import get_coarse_arrays, eliminate_elements_torch, extract_and_normalize, generate_list, extract_values

from PIL import Image, ImageEnhance
import os
from tqdm import tqdm

import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt

#with open(os.path.join(res_path, 'results.json'), 'r') as json_file:
#    loaded_results = json.load(json_file)

#with gzip.open(os.path.join(res_path, 'results.json'), 'r') as f:
#    data2 = json.load(f)

In [2]:
# Shared notebook state, all built by the project-local `utils` helpers.
# `classes` is indexed by fine-class id inside the attack function below.
classes = utils.get_imagenet_classes()
# Passed to the attack function as `weights`, where it is called on a PIL image
# to obtain the input tensor.
default_weights = utils.get_default_weights()
models_classif = utils.get_class_model_names()
# NOTE(review): `models` is the torchvision.models module imported in the first cell;
# presumably the helper uses it to instantiate the ensemble members - confirm in utils.
models_ = utils.get_models_ensamble(models)
# Coarse-category names and, per category, the fine-class indices grouped under it
# (index[0] is used further down as the target together with coarse_class='cat').
labels, index = get_coarse_arrays()

In [3]:
def ifgsm_attack(input, epsilon, data_grad, control = True, mode ='flip'):
    """Iterative FGSM step along the sign of a fixed gradient, plus an optional flipped control.

    Parameters
    ----------
    input : torch.Tensor
        Image batch to perturb. Must have at least 4 dimensions because the control
        perturbation is built by flipping/transposing dims 2 and 3.
        # assumes pixel values are scaled to [0, 1] (epsilon is divided by 255) - TODO confirm
    epsilon : int or float
        L-infinity budget expressed in 1/255 intensity levels.
    data_grad : torch.Tensor
        Gradient of the attack objective w.r.t. ``input``; only its sign is used and it
        is NOT recomputed between iterations.
    control : bool
        When true, also return a control image made of the same perturbation, flipped.
    mode : str
        Control construction strategy. Only ``'flip'`` is implemented.

    Returns
    -------
    (perturbed, pert_plot) when ``control`` is false, otherwise
    (perturbed, pert_plot, control_image, control_pert_plot) where
    ``pert_plot`` / ``control_pert_plot`` are the perturbations min-max rescaled to
    [0, 1] for display and ``control_image`` is ``input`` plus the flipped raw perturbation.

    Raises
    ------
    ValueError
        If ``control`` is true and ``mode`` is not ``'flip'``.
    """
    if control and mode != 'flip':
        raise ValueError(f"Unsupported control mode {mode!r}; only 'flip' is implemented.")

    step = 1 / 255                      # one intensity level per iteration (alpha = 1)
    budget = epsilon / 255
    n_steps = int(min([epsilon + 4, epsilon * 1.25]))  # iteration cap

    direction = data_grad.sign()
    pert_out = input.clone().detach()

    for _ in range(n_steps):
        candidate = pert_out + step * direction
        # Check the budget BEFORE committing the step so the result never leaves the
        # epsilon ball. Half a step of slack absorbs float rounding of (candidate - input),
        # which is always a whole number of steps.
        if torch.norm(candidate - input, p=float('inf')) > budget + 0.5 * step:
            break
        pert_out = candidate

    adv_pert = pert_out - input

    # Min-max rescale for display; a constant perturbation (e.g. zero gradient or zero
    # steps) would otherwise divide 0 by 0 and produce NaNs.
    pert_min = torch.min(adv_pert)
    pert_range = torch.max(adv_pert) - pert_min
    if pert_range == 0:
        adv_pert_plot = torch.zeros_like(adv_pert)
    else:
        adv_pert_plot = (adv_pert - pert_min) / pert_range

    if not control:
        return pert_out, adv_pert_plot

    # Each candidate control is the same spatial transform applied to both the display
    # perturbation and the raw perturbation, so the returned image and plot always agree.
    flips = {
        "Top-bottom": lambda x: torch.flip(x, [2]),
        "Diagonal": lambda x: torch.flip(x.transpose(2, 3), [2, 3]),
        "Right-left": lambda x: torch.flip(x, [3]),
    }
    announcements = {
        "Top-bottom": 'Control: Top Bottom',
        "Diagonal": 'Control: Diagonal',
        "Right-left": 'Control: Left Right',
    }

    # NOTE(review): the score compares the flipped display perturbation against `input`
    # (the image), not against the unflipped perturbation - kept as in the original; confirm intent.
    mse_values = {name: F.mse_loss(input, flip(adv_pert_plot)).item() for name, flip in flips.items()}
    print(mse_values)

    # Ties resolve to the first key in insertion order.
    chosen = max(mse_values, key=mse_values.get)
    control_pert_plot = flips[chosen](adv_pert_plot)
    control_pert = flips[chosen](adv_pert)
    print(announcements[chosen])
    control_image = control_pert + input

    return pert_out, adv_pert_plot, control_image, control_pert_plot

In [4]:
def _ensemble_mean_logits(models, normalize, image):
    """Return the mean over all models of ``model(normalize(image))``."""
    return torch.mean(torch.stack([model(normalize(image)) for model in models]), dim=0)


def _merge_coarse_scores(probs, c_index, coarse_idx):
    """Drop the fine-class entries listed in ``coarse_idx`` and append one summed score per group in ``c_index``."""
    group_scores = torch.zeros(size=(len(c_index),))
    for k, members in enumerate(c_index):
        group_scores[k] = torch.sum(probs[:, members])
    remaining = eliminate_elements_torch(probs[0], coarse_idx)
    return torch.cat((remaining, group_scores), dim=0)


def _rank_scores(scores, names):
    """Sort ``scores`` in descending order and return (names in that order, sorted scores)."""
    ordered_scores, order = torch.sort(scores, descending=True)
    return [names[j] for j in order], ordered_scores


def _top_class_probs(probs, classes, num_classes):
    """Map class name -> probability for the ``num_classes`` largest entries of ``probs[0]``."""
    top = probs[0].topk(num_classes)
    return {classes[idx.item()]: prob.item() for idx, prob in zip(top.indices, top.values)}


def _as_hwc_array(image_tensor):
    """Convert an image tensor to a detached numpy array with the channel axis last."""
    return image_tensor.squeeze().detach().cpu().numpy().transpose(1, 2, 0)


def _as_uint8_image(image_tensor):
    """Clip an image tensor to [0, 1] and convert it to an HxWxC uint8 array."""
    return (np.clip(_as_hwc_array(image_tensor), 0, 1) * 255).astype('uint8')


def ensamble_attack_coarse_classes_sum_full(image_folder, models, weights, epsilon, classes, targeted = False, t = '0', coarse_class = 'nada', num_classes=1000, graph=False, folder=False, sorted = True,  attack = 'iFGSM',save_path = 0, control = True, save_original = [False, 'path']):
    """Generate adversarial (and optional control) images with an ensemble and score them per coarse class.

    For every file in ``image_folder`` the function averages the outputs of all
    ``models``, takes the gradient of the attack objective w.r.t. the input, builds the
    perturbed image with ``ifgsm_attack`` and re-evaluates the ensemble on it. Fine-class
    probabilities belonging to a coarse group (from ``get_coarse_arrays``) are summed
    into a single score for that group.

    Parameters
    ----------
    image_folder : str
        Directory whose entries are opened one by one with PIL.
    models : iterable
        Models of the ensemble; each is put in eval mode and called on the normalized input.
    weights : callable
        Preprocessing applied to the PIL image; its result is given a leading batch axis.
        # assumes it returns a (C, H, W) tensor in [0, 1] - TODO confirm
    epsilon : int or float
        Perturbation budget forwarded to ``ifgsm_attack``.
    classes : sequence of str
        Fine-class names indexed by class id.
    targeted : bool
        Targeted attack when true, untargeted otherwise.
    t : str, int, list or numpy.ndarray
        A string means "use the ensemble's own top class". An int is a single target
        class id (targeted only). A list/array of class ids defines the target set
        (targeted) or the set to move away from (untargeted).
    coarse_class : str
        Name of the coarse class used for the save filter and the report when ``t``
        has more than one element.
    num_classes : int
        Number of top fine classes stored in the per-image dictionaries.
    graph : bool
        Plot original / perturbed / control images.
    folder : bool
        Save images and gzip-compressed JSON results to disk; when ``t`` has more than
        one element, images whose attack did not meet the expected condition are skipped.
    sorted, attack
        Unused; kept for backward compatibility.
    save_path : str or 0
        Output directory; ``0`` selects an auto-named directory in the working directory.
    control : bool
        Also evaluate the flipped-perturbation control image.
    save_original : [bool, str]
        When the flag is true, the source image is saved into the given directory.

    Returns
    -------
    list of dict
        One entry per processed image with keys 'img_name', 'original', 'perturbed',
        'coarse_scores' and 'pert_coarse_scores'. The list is also returned if the
        maximum number of output folders is reached.
    """
    save_orig, orig_save_path = save_original

    c_classes, c_index = get_coarse_arrays()
    max_imgs = 200

    for model in models:
        model.eval()

    normalize = transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])

    # Prepare output directory
    if folder:
        if save_path == 0:
            suffix = 'targeted' if targeted else 'untargeted'
            output_dir = f"ensamble-attack_ensamble-c_epsilon-{epsilon}-{suffix}"
            os.makedirs(output_dir, exist_ok=True)
        else:
            output_dir = save_path

    # Loop-invariant: fine classes that are not merged, followed by the coarse class names.
    coarse_idx = np.hstack(c_index)
    merged_ids = set(coarse_idx.tolist())
    new_classes = [name for idx, name in enumerate(classes) if idx not in merged_ids] + list(c_classes)

    # The success filter and the coarse report only apply to a multi-class target set.
    t_has_many = hasattr(t, '__len__') and len(t) > 1

    results = []
    control_res = []
    print('Iterating over images in folder\n\n')

    image_names = os.listdir(image_folder)
    for i, image_name in enumerate(image_names):
        image_path = os.path.join(image_folder, image_name)
        print(f'\n\nImage {i + 1}/{len(image_names)}--------------------------------------------------------\nImage name:{image_name}')

        # Load and transform image
        img = Image.open(image_path)
        transformed_image = weights(img).unsqueeze(0)
        original_image = transformed_image.clone()
        transformed_image.requires_grad = True

        # Ensemble prediction on the clean image (every model contributes).
        try:
            original_output = _ensemble_mean_logits(models, normalize, transformed_image)
        except RuntimeError as e:
            # Images whose channel count does not broadcast against the 3-channel
            # normalization are skipped; anything else is a real error.
            if "doesn't match the broadcast shape" in str(e):
                print("Encountered specific RuntimeError. Moving to next element.")
                continue
            raise

        original_probs = torch.softmax(original_output, dim=1)

        # Attack objective. The perturbation follows +sign(grad), so a negated loss
        # pulls the prediction TOWARDS `target`, a positive loss pushes it away.
        loss = nn.CrossEntropyLoss()

        if targeted:
            if isinstance(t, str):
                target = original_output.max(1)[1]   # maximize original output
            elif isinstance(t, (int, np.integer)):
                target = torch.tensor([int(t)])      # maximize target t
            elif isinstance(t, (np.ndarray, list)):
                w = extract_and_normalize(original_probs, t)
                target = torch.tensor([generate_list(1000, t, w)])
            else:
                raise TypeError(f"Unsupported type for t: {type(t).__name__}")
            cost = -loss(original_output, target)
        elif not isinstance(t, str):
            non_t = extract_values(np.arange(1000), t)
            w = extract_and_normalize(original_probs, non_t)
            target = torch.tensor([generate_list(1000, non_t, w)])
            cost = -loss(original_output, target)
        else:
            target = original_output.max(1)[1]
            cost = loss(original_output, target)     # minimize original output

        for model in models:
            model.zero_grad()

        grad = torch.autograd.grad(cost, transformed_image)[0]

        # Generate adversarial image
        if control:
            perturbed_image, adv_pert, control_image, control_pert = ifgsm_attack(transformed_image, epsilon, grad, control = True)
        else:
            perturbed_image, adv_pert = ifgsm_attack(transformed_image, epsilon, grad, control = False)

        # Evaluation-only passes: no gradients are needed any more.
        with torch.no_grad():
            perturbed_probs = torch.softmax(_ensemble_mean_logits(models, normalize, perturbed_image), dim=1)
            if control:
                control_probs = torch.softmax(_ensemble_mean_logits(models, normalize, control_image), dim=1)

        # Coarse category scores, ranked from most to least probable.
        sorted_classes, sorted_probs = _rank_scores(
            _merge_coarse_scores(original_probs, c_index, coarse_idx), new_classes)
        sorted_classes_p, sorted_probs_p = _rank_scores(
            _merge_coarse_scores(perturbed_probs, c_index, coarse_idx), new_classes)

        if folder and t_has_many:
            perturbed_top_is_target = coarse_class == sorted_classes_p[0]
            print(f'Original top: {sorted_classes[0]}, Perturbed Top: {sorted_classes_p[0]}')
            # Targeted attacks must end on coarse_class, untargeted ones must leave it.
            if (targeted and not perturbed_top_is_target) or (not targeted and perturbed_top_is_target):
                print('Not verify the conditions expected')
                continue
            elif coarse_class == sorted_classes[0]:
                # The clean prediction already equals coarse_class.
                print('Verify condition but the T (true) class is = to A (adv) class')
                continue

        # Per-image dictionaries of fine-class and coarse-class probabilities.
        original_dict = _top_class_probs(original_probs, classes, num_classes)
        perturbed_dict = _top_class_probs(perturbed_probs, classes, num_classes)
        probs_scores = {key: value.item() for key, value in zip(sorted_classes, sorted_probs)}
        probs_p_scores = {key: value.item() for key, value in zip(sorted_classes_p, sorted_probs_p)}

        results.append({'img_name': image_name, 'original': original_dict, 'perturbed': perturbed_dict, 'coarse_scores': probs_scores, 'pert_coarse_scores': probs_p_scores})

        if control:
            control_dict = _top_class_probs(control_probs, classes, num_classes)
            sorted_classes_c, sorted_probs_c = _rank_scores(
                _merge_coarse_scores(control_probs, c_index, coarse_idx), new_classes)
            probs_c_scores = {key: value.item() for key, value in zip(sorted_classes_c, sorted_probs_c)}
            control_res.append({'ctrl': control_dict, 'ctrl_scores': probs_c_scores})

        # Save adversarial image
        if folder:
            # Use the first free "images_folder_<n>_epsilon_<eps>" slot.
            folder_path = None
            for counter in range(1, max_imgs + 1):
                candidate = os.path.join(output_dir, f"images_folder_{counter}_epsilon_{epsilon}")
                if not os.path.exists(candidate):
                    os.makedirs(candidate)
                    folder_path = candidate
                    break

            if folder_path is None:
                print(f'There are {max_imgs} in the folder: MAX NUMBER REACHED.')
                return results

            # The tensors are left untouched so that plotting below still sees [0, 1] floats.
            Image.fromarray(_as_uint8_image(perturbed_image)).resize((720,720)).save(os.path.join(folder_path, 'atk.png'), optimize=True)
            Image.fromarray(_as_uint8_image(adv_pert)).resize((720,720)).save(os.path.join(folder_path, 'pert.png'), optimize=True)

            if control:
                Image.fromarray(_as_uint8_image(control_pert)).resize((720,720)).save(os.path.join(folder_path, 'control_pert.png'), optimize=True)
                Image.fromarray(_as_uint8_image(control_image)).resize((720,720)).save(os.path.join(folder_path, 'control.png'), optimize=True)

            with gzip.open(os.path.join(folder_path, 'results.json.gz'), 'wt') as f:
                json.dump(results, f)

            if control:
                with gzip.open(os.path.join(folder_path, 'control.json.gz'), 'wt') as f:
                    json.dump(control_res, f)

            # Reaching this point with a multi-class target means the filter above passed.
            if t_has_many:
                if targeted:
                    print('SAVED IMAGE. Equal Target as Top Perturbed')
                else:
                    print('SAVED IMAGE. Different Top Perturbed from Original')

        if save_orig == True:
            img.save(os.path.join(orig_save_path, image_name))

        print(f'Epsilon: {epsilon}')
        top_name = sorted_classes[0]
        if targeted:
            if isinstance(t, str):
                report = f'Targeted Attack. Maximize Original Output\nOriginal prob: {probs_scores[top_name]},\nPerturbed prob: {probs_p_scores[top_name]}'
                if control:
                    report += f'\nControl prob: {probs_c_scores[top_name]}'
                print(report)
            elif t_has_many:
                print(f'Targeted Attack = {coarse_class}\nOriginal prob target: {probs_scores[coarse_class]},\nPerturbed prob target: {probs_p_scores[coarse_class]},\nPerturbed Top class & Prob: {sorted_classes_p[0]} & {sorted_probs_p[0]}')
                if control:
                    print(f'\nControl prob target: {probs_c_scores[coarse_class]}')
            elif isinstance(t, (int, np.integer)) or len(t) == 1:
                target_name = classes[int(t)] if isinstance(t, (int, np.integer)) else classes[t[0]]
                # .get: the fine class may have been merged into a coarse group.
                report = f'Targeted Attack = {target_name}\nOriginal prob target: {probs_scores.get(target_name)},\nPerturbed prob target: {probs_p_scores.get(target_name)}'
                if control:
                    report += f'\nControl prob target: {probs_c_scores.get(target_name)}'
                print(report)
        else:
            report = f'Untargeted Attack. Minimize Original Output\nOriginal prob target: {probs_scores[top_name]},\nPerturbed prob target: {probs_p_scores[top_name]},\nPerturbed Top class & Prob: {sorted_classes_p[0]} & {sorted_probs_p[0]}'
            if control:
                report += f'\nControl prob target: {probs_c_scores[top_name]}'
            print(report)

        if graph:
            def _panel_title(fine_label, fine, coarse_label, coarse):
                # First line: top fine class; second line: top coarse class.
                return (f"{fine_label}: {max(fine, key=fine.get)} - {np.round(max(fine.values()), 3)}"
                        + "\n"
                        + f"{coarse_label}: {max(coarse, key=coarse.get)} - {np.round(max(coarse.values()), 3)}")

            adv_pert_plot = np.clip(_as_hwc_array(adv_pert), 0, 1)

            fig, axs = plt.subplots(1, 3, figsize=(17, 5))
            axs[0].imshow(_as_hwc_array(original_image))
            axs[0].set_title(_panel_title("Original", original_dict, "Original", probs_scores), fontsize=7)
            axs[0].set_axis_off()

            axs[1].imshow(np.clip(_as_hwc_array(perturbed_image), 0, 1))
            axs[1].set_title(_panel_title("Perturbed", perturbed_dict, "Perturbed", probs_p_scores), fontsize=7)
            axs[1].set_axis_off()

            if control:
                axs[2].imshow(np.clip(_as_hwc_array(control_image), 0, 1))
                axs[2].set_title(_panel_title("Control Image", control_dict, "Control", probs_c_scores), fontsize=7)
            else:
                axs[2].imshow(adv_pert_plot)
                axs[2].set_title(f"Perturbation", fontsize=7)
            axs[2].set_axis_off()

            plt.subplots_adjust(wspace=0.5)  # Add space between the plots
            plt.show()
            plt.close(fig)

            if control:
                # Second figure: the perturbation next to its flipped control.
                fig_pert, axs_pert = plt.subplots(1, 2)
                axs_pert[0].imshow(adv_pert_plot)
                axs_pert[0].set_title(f"Perturbation", fontsize=7)
                axs_pert[0].set_axis_off()

                axs_pert[1].imshow(np.clip(_as_hwc_array(control_pert), 0, 1))
                axs_pert[1].set_title(f"Perturbation - Control", fontsize=7)
                axs_pert[1].set_axis_off()

                plt.subplots_adjust(wspace=0.5)  # Add space between the plots
                plt.show()
                plt.close(fig_pert)

    return results

### Target Cat

In [5]:
# Output directory for the saved attack folders (passed as `save_path` in the next cell).
# NOTE(review): absolute, machine-specific Windows paths - the notebook only runs as-is on the
# author's machine; consider deriving these from a configurable base directory.
save_t_cats = r'C:\Users\Usuario\Documents\Trini\Facultad\Tesis\Código Personal\Results\Exp 3\Target Cat\Epsilon 2'
# Directory that receives a copy of each source image that reaches the save step
# (passed as the second element of `save_original`).
save_original_success = r'C:\Users\Usuario\Documents\Trini\Facultad\Tesis\Código Personal\Results\Exp 3\Target Cat\Good Images For Target'
# Image folders to iterate over in the next cell.
rndm_folders = [r'C:\Users\Usuario\Documents\Trini\Facultad\Tesis\Código Personal\image_net\imagenet_random_images_kaggle\imagenet\val', r'C:\Users\Usuario\Documents\Trini\Facultad\Tesis\Código Personal\image_net\class_images_image_net\00151', r'C:\Users\Usuario\Documents\Trini\Facultad\Tesis\Código Personal\image_net\class_images_image_net\00153']

In [None]:
# Targeted attack towards the coarse class 'cat' (epsilon = 2) on every image folder.
# `test` is rebound on each pass, so it ends up holding the last folder's results only.
for f in rndm_folders:
    test = ensamble_attack_coarse_classes_sum_full(
        f,
        models_,
        default_weights,
        2,
        classes,
        targeted=True,
        t=index[0],
        coarse_class='cat',
        graph=False,
        folder=True,
        save_path=save_t_cats,
        save_original=[True, save_original_success],
        control=True,
    )