# In [None]:
import os
import sys
os.environ["CUDA_DEVICE_ORDER"]="PCI_BUS_ID"
os.environ["CUDA_VISIBLE_DEVICES"]="0"
import numpy as np
import torch
from torchvision import models
import torch.backends.cudnn as cudnn
import random
from tqdm import tqdm
import matplotlib.pyplot as plt
import seaborn as sns
from utils_rb import *
from ramboattack import RamBoAtt
from HSJA_rb import HSJA
from SignOPT_rb import OPT_attack_sign_SGD
import torch.nn as nn
import torch.nn.functional as F
from PIL import Image

# Placeholder for the chosen action; reassigned once the agent selects one.
action = -1

# ---- DQN hyperparameters ----

# Replay memory and optimisation settings
MEMORY_CAPACITY = 2000           # Number of transitions the replay memory holds
BATCH_SIZE = 32                  # Transitions sampled per learning step
TARGET_REPLACE_ITER = 100        # Learning steps between target-network syncs
LR = 0.01                        # Learning rate handed to the optimizer
GAMMA = 0.9                      # Reward discount factor
EPSILON = 0.99                   # Probability of taking the greedy action

# Problem dimensions
N_ACTIONS = 10                   # Number of discrete actions the agent chooses from
# Two flattened images concatenated into one state vector.
# Assumes each image has 3 * 32 * 32 = 3072 values (CIFAR-10 sized) — TODO confirm.
N_STATES = 2 * 3072

class Net(nn.Module):
    """Two-layer fully connected network that maps a state to action values.

    Layout: ``N_STATES`` inputs -> 50 hidden units (ReLU) -> ``N_ACTIONS`` outputs.
    """

    @staticmethod
    def _dense(in_features, out_features):
        """Build a linear layer whose weights are re-drawn from N(mean=0, std=0.1)."""
        layer = nn.Linear(in_features, out_features)
        layer.weight.data.normal_(mean=0, std=0.1)   # Biases keep nn.Linear's default init
        return layer

    def __init__(self):
        super().__init__()
        hidden_units = 50
        self.fc1 = self._dense(N_STATES, hidden_units)   # Input layer -> hidden layer
        self.out = self._dense(hidden_units, N_ACTIONS)  # Hidden layer -> one value per action

    def forward(self, x):
        """Return the action values for state tensor ``x``."""
        hidden = F.relu(self.fc1(x))
        return self.out(hidden)

class DQN(object):
    """Deep Q-learning agent: evaluation network, target network and replay memory.

    Every replay-memory row stores one transition laid out as
    ``[state (N_STATES values), action, reward, next_state (N_STATES values)]``.
    """

    def __init__(self):
        """Create both networks, the replay memory, the optimizer and the loss."""
        self.eval_net, self.target_net = Net(), Net()  # Evaluation network and target network
        self.learn_step_counter = 0              # Counts learn() calls; drives target-network syncs
        self.memory_counter = 0                  # Total number of transitions stored so far
        self.memory = np.zeros((MEMORY_CAPACITY, N_STATES * 2 + 2))  # One transition per row
        self.optimizer = torch.optim.Adam(self.eval_net.parameters(), lr=LR)  # Only eval_net is optimized
        self.loss_func = nn.MSELoss()            # Mean squared error between q_eval and q_target

    def choose_action(self, x):
        """Select an action for state ``x``.

        With probability ``EPSILON`` the action with the highest value predicted
        by the evaluation network is returned; otherwise an action is drawn
        uniformly at random.

        Args:
            x: Flat state of length ``N_STATES`` (anything ``torch.FloatTensor`` accepts).

        Returns:
            int: Index of the chosen action, in ``[0, N_ACTIONS)``.
        """
        x = torch.unsqueeze(torch.FloatTensor(x), 0)  # Add a batch dimension at dim=0
        if np.random.uniform() < EPSILON:        # Greedy branch
            with torch.no_grad():                # Pure inference: no autograd graph needed
                actions_value = self.eval_net(x)
            action = int(torch.max(actions_value, 1)[1][0])  # Index of the best action in the single row
        else:                                    # Exploration branch
            action = int(np.random.randint(0, N_ACTIONS))  # Uniform over [0, N_ACTIONS)
        return action

    def store_transition(self, s, a, r, s_):
        """Store the transition ``(s, a, r, s_)``, overwriting the oldest row once full.

        Args:
            s: Flat state of length ``N_STATES``.
            a: Action taken.
            r: Reward received.
            s_: Flat next state of length ``N_STATES``.
        """
        transition = np.hstack((s, [a, r], s_))  # One flat row: state, action, reward, next state
        index = self.memory_counter % MEMORY_CAPACITY  # Wrap around when the memory is full
        self.memory[index, :] = transition
        self.memory_counter += 1

    def learn(self):
        """Run one Q-learning update on a random batch drawn from the replay memory.

        Only rows that actually hold a stored transition are sampled, so calling
        this before the memory is full does not train on all-zero placeholder
        rows. Nothing happens when no transition has been stored yet.
        """
        filled = min(self.memory_counter, MEMORY_CAPACITY)  # Rows holding real transitions
        if filled == 0:
            return                               # Nothing to learn from yet

        # Sync the target network on the first call and then every TARGET_REPLACE_ITER calls
        if self.learn_step_counter % TARGET_REPLACE_ITER == 0:
            self.target_net.load_state_dict(self.eval_net.state_dict())
        self.learn_step_counter += 1

        # Sample BATCH_SIZE row indices with replacement from the filled part of the memory
        sample_index = np.random.choice(filled, BATCH_SIZE)
        b_memory = self.memory[sample_index, :]
        b_s = torch.FloatTensor(b_memory[:, :N_STATES])                       # (BATCH_SIZE, N_STATES) states
        b_a = torch.LongTensor(b_memory[:, N_STATES:N_STATES+1].astype(int))  # (BATCH_SIZE, 1); long for gather
        b_r = torch.FloatTensor(b_memory[:, N_STATES+1:N_STATES+2])           # (BATCH_SIZE, 1) rewards
        b_s_ = torch.FloatTensor(b_memory[:, -N_STATES:])                     # (BATCH_SIZE, N_STATES) next states

        # Q(s, a) of the actions actually taken, from the evaluation network
        q_eval = self.eval_net(b_s).gather(1, b_a)
        # Target-network values for the next states; detached so no gradient flows into it
        q_next = self.target_net(b_s_).detach()
        # Bellman target: reward plus discounted best next-state value
        q_target = b_r + GAMMA * q_next.max(1)[0].view(BATCH_SIZE, 1)
        loss = self.loss_func(q_eval, q_target)

        self.optimizer.zero_grad()               # Clear gradients left from the previous step
        loss.backward()                          # Backpropagate the loss
        self.optimizer.step()                    # Update the evaluation network
        
def imshow(img):
    """Display the first image of the batch tensor ``img`` with matplotlib.

    The image is moved to the CPU, reordered from channel-first to
    channel-last and clipped to [0, 1] before plotting.
    """
    chw = img[0].cpu().numpy()
    hwc = np.clip(chw.transpose(1, 2, 0), 0, 1)
    plt.imshow(hwc)
    plt.show()
    
def perturbation_heat_map(xo, xa):
    """Plot a heat map of the absolute difference between ``xo`` and ``xa``.

    The absolute difference is summed over dim 1 and the first item of the
    batch is drawn on a 5x5 inch figure without tick labels or colour bar.
    """
    _, ax = plt.subplots(figsize=(5, 5))
    magnitude = (xo - xa).abs().sum(dim=1).cpu()[0]
    sns.heatmap(magnitude, ax=ax, xticklabels=False, yticklabels=False, cbar=False)
    plt.show()

# a. Load dataset
# load_data is not imported by name; presumably it comes from `from utils_rb import *` — verify.

batch_size = 32
dataset = 'cifar10' 
datapath = '../datasets/cifar10'
testloader, testset = load_data(dataset, data_path=datapath, batch_size=batch_size)

# b. Load pre-trained model

# Use 'resnet50' for a pre-trained model from PyTorch, or 'cifar10' for the pre-trained CIFAR-10 model.
arch = 'cifar10' 

# None means the pre-trained PyTorch model or the default path is used. Otherwise set model_path = '...'.
model_path = None 

# unnorm = True means the pre-trained model was trained on data that was "not" normalized,
# so there is no need to un-normalize during inference (used for the CIFAR-10 model).
# NOTE(review): num_classes and unnorm are only assigned when dataset == 'cifar10';
# for any other dataset they stay undefined here unless the star import provides them.

if dataset == 'cifar10':
    num_classes = 10
    unnorm = True # True means the pre-trained model did "not" normalize data during training.
    
net = load_model(arch, model_path)
model_rb = PretrainedModel(net, dataset, unnorm)

bounds = [0, 1]
model_ex = PytorchModel_ex(net, bounds, num_classes, dataset, unnorm)

# c. Load evaluation set
targeted = True # True means targeted attack. False means untargeted attack.
# Available evaluation sets:
# 'balance', 'easyset' -> imagenet or cifar10
# 'hardset' -> imagenet
# 'hardset_A', 'hardset_B', 'hardset_D' -> cifar10
eval_set =  'hardset_A'

ID_set = get_evalset(dataset, targeted, eval_set)

# Agent whose chosen action later becomes the label handed to the attack.
dqn = DQN()

# Index of the sample inside the evaluation set (values tried so far: 0, 1, 2, 10, 20, 50, 123).
i = 2
query_limit = 50000
D = np.zeros(query_limit + 2000)
nquery = 0
o = ID_set[i, 1]  # oID: position of the source image in the test set

# 0. Source image, with a leading batch dimension, on the GPU
oimg, olabel = testset[o]
oimg = oimg.unsqueeze(0).cuda()

# 1. Starting image (only exists for a targeted attack)
if not targeted:
    tlabel = None
    y_targ = np.array([olabel])
else:
    # Column 3 is the index across the whole dataset (tID);
    # column 4 would be the sample index inside its class, not across the dataset.
    t = ID_set[i, 3]
    tlabel = ID_set[i, 2]
    timg, _ = testset[t]
    timg = timg.unsqueeze(0).cuda()
    y_targ = np.array([tlabel])
    
# ============= 2. setup ==============
# NOTE(review): delta and len_T are only assigned when dataset == 'cifar10';
# for any other dataset they stay undefined here unless the star import provides them.
if dataset == 'cifar10':
    delta = 1e-2
    len_T = 500

# Settings passed positionally to the HSJA constructor below.
# Their exact meaning is defined in HSJA_rb, which is not visible here.
constraint = 'l2'
num_iterations = 150
gamma = 1.0
stepsize_search = 'geometric_progression'
max_num_evals = 1e4
init_num_evals = 100
verbose = True
auto_terminate = False

# NOTE(review): this rebuilds model_ex with the same arguments as the earlier
# construction; looks redundant unless a fresh wrapper is wanted — confirm.
model_ex = PytorchModel_ex(net, bounds, num_classes, dataset, unnorm)
module = HSJA(model_ex, constraint, num_iterations, gamma, stepsize_search, max_num_evals, init_num_evals, verbose, delta, len_T)

# Use the DQN to pick the label that is handed to the attack.
# The state is the flattened source image followed by the flattened starting image.
oimg_np = oimg.cpu().numpy()
if targeted:
    timg_np = timg.cpu().numpy()
    start_flat = timg_np.flatten()
else:
    # No starting image exists for an untargeted attack. Fill its half of the
    # state with zeros so the state keeps the length the DQN was built for.
    # NOTE(review): zero placeholder is a stopgap — confirm the intended untargeted state.
    timg = None
    timg_np = None
    start_flat = np.zeros(oimg_np.size, dtype=oimg_np.dtype)
state = np.concatenate((oimg_np.flatten(), start_flat))
action = dqn.choose_action(state)
new_label = action  # The DQN action is used directly as the label

# Both modes pass query_limit before auto_terminate; previously the untargeted
# call omitted query_limit, which shifted auto_terminate into its position.
# NOTE(review): in untargeted mode the label passed is still the DQN's choice,
# not the source label held in y_targ — confirm this is intended.
adv, nqry, Dt = module.hsja(oimg_np, np.array([new_label]), timg_np, targeted, query_limit, auto_terminate)

# Back to a float tensor with a leading batch dimension, on the GPU
adv = torch.unsqueeze(torch.from_numpy(adv).float(), 0).cuda()

print('Source image:')
imshow(oimg)

# A starting image (and its index t) only exists for a targeted attack;
# reloading it unconditionally raised a NameError in untargeted mode.
if targeted:
    print('Starting image:')
    timg, _ = testset[t]
    timg = torch.unsqueeze(timg, 0).cuda()
    imshow(timg)

print('Adversarial Example:')
imshow(adv)

def save_img(img, filename):
    """Save the first image of the batch tensor ``img`` to ``filename``.

    The image is moved to the CPU, reordered from channel-first to
    channel-last, clipped to [0, 1], scaled by 255 and cast to uint8
    (the cast truncates the fractional part) before PIL writes it.
    """
    chw = img[0].cpu().numpy()
    hwc = np.clip(chw.transpose(1, 2, 0), 0, 1)
    pixels = (hwc * 255).astype(np.uint8)
    Image.fromarray(pixels).save(filename)
# Save the adversarial example to 'adv.jpg' and display it.
# NOTE(review): adv was already displayed just before save_img was defined,
# so this second imshow looks redundant — confirm before removing.
save_img(adv, 'adv.jpg')
imshow(adv)

# Visualize where the adversarial example differs from the source image.
print('Perturbation Heat Map:')
perturbation_heat_map(oimg, adv)
