# Natural Adversarial Observations for Atari RL 

With this notebook, you will be able to create natural-looking adversarial observations for Atari agents based on the DQN algorithm.
This repository comes with 3 already trained agents for Enduro, Road Runner and Breakout.

In [None]:
import gym
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import matplotlib.pyplot as plt
from collections import deque
import random
import pickle


from agent.AtariAgent import DQN, ReplayMemory

In [None]:
#---Helper functions
def preprocess(frame):
    """Collapse a colour frame to a single grayscale channel.

    Input: (210,160,3) image, pixel values between 0 and 255
    Return: (210,160) grayscale uint8 image, pixel values between 0 and 255
    """
    # Average over the colour axis, then truncate back to 8-bit integers.
    grayscale = np.mean(frame, axis=2)
    return grayscale.astype(np.uint8)

def frame_to_tensor(frame):
    """Scale a grayscale frame to [0, 1] and turn it into a float tensor.

    Input: (210,160) grayscale image, pixel values between 0 and 255
    Returns: float32 tensor of the image, pixel values between 0 and 1,
             placed on the module-level `device` (no gradient tracking)
    """
    scaled = frame / 255.
    return torch.tensor(scaled, dtype=torch.float32, device=device, requires_grad=False)

def deque_to_tensor(stack):
    """Turn a frame stack into a batched network input.

    Input: Deque of 4 (210,160) tensors, pixel values between 0 and 1
    Output: Tensor of shape (1,4,210,160) on the module-level `device`,
            ready for a forward pass through the network
    """
    frames = torch.stack(list(stack))
    # Add the leading batch dimension the network expects.
    return frames.to(device).unsqueeze(0)

def random_transition(model, env, lower_bound=0, upper_bound=2000):
    """Play the environment for a random number of steps and return the last frame stack.

    The first 4 frames are collected with random actions (the agent needs a
    full stack before it can act); afterwards the agent itself plays for a
    random number of steps drawn from [lower_bound, upper_bound].

    Input: model with `predict_action`, gym environment, inclusive step bounds
    Output: Tensor built by `deque_to_tensor` from the 4 most recent frames
    """
    n_agent_steps = random.randint(lower_bound, upper_bound)
    env.reset()
    transition = deque(maxlen=4)

    # Warm-up happens unconditionally so that a draw of 0 agent steps still
    # returns a full stack instead of failing on an empty deque.
    while len(transition) < 4:
        state, _, done, _ = env.step(env.action_space.sample())
        transition.append(frame_to_tensor(preprocess(state)))
        if done:
            env.reset()

    for _ in range(n_agent_steps):
        state, _, done, _ = env.step(model.predict_action(deque_to_tensor(transition)))
        transition.append(frame_to_tensor(preprocess(state)))
        if done:
            env.reset()

    return deque_to_tensor(transition)
    

def plot_images(images, n_rows, n_columns, figsize=(15,15), cmap='gray', grid=True, show=True, save=False, name=None):
    """Draw the first n_rows*n_columns entries of `images` in a grid of subplots.

    Input: indexable collection of images, grid layout, figure size and colormap;
           grid=False hides the axes, save=True writes the figure to `name`,
           show=True displays it
    """
    fig = plt.figure(figsize=figsize)
    hide_axes = (grid == False)
    for slot in range(n_rows * n_columns):
        img = images[slot]
        # Subplot positions are 1-based.
        fig.add_subplot(n_rows, n_columns, slot + 1)
        plt.imshow(img, cmap=cmap)
        if hide_axes:
            plt.axis('off')
    if save:
        plt.savefig(name, dpi=200)
    if show:
        plt.show()

def plot_results(transition, delta, figsize=(15,15), cmap='gray', grid=True, show=True, save=False, name=None):
    """Plot the perturbation, the clean input and the adversarial input as three rows.

    Input: `transition` and `delta` as arrays whose first axis indexes the
           frames (one subplot per frame); `name` is the file name prefix used
           when save=True ('_1.pdf' perturbation, '_2.pdf' input,
           '_3.pdf' adversarial example)
    Note: `grid` is accepted for signature compatibility only; the axes are
          always hidden, as before.
    Raises: ValueError if save=True but no `name` is given
    """
    if save and name is None:
        raise ValueError("plot_results needs a file name prefix in 'name' when save=True")

    figures = (
        (delta, '_1.pdf'),
        (transition, '_2.pdf'),
        (transition + delta, '_3.pdf'),
    )
    for frames, suffix in figures:
        # plot_images opens its own figure, so no extra figure is created here.
        plot_images(frames, n_rows=1, n_columns=frames.shape[0], figsize=figsize,
                    cmap=cmap, grid=False, show=False)
        if save:
            plt.savefig(name + suffix, dpi=200)
    if show:
        plt.show()
    
    
    
#--Fill the replay memory by letting the trained agent play
def fill_memory(env, model, memory, runs):
    """Let the agent play and store every step in the replay memory.

    Stops when the memory is full (memory.count reaches memory.size) or after
    `runs` finished episodes, whichever comes first. The first 3 steps use
    random actions because the agent needs a stack of 4 frames to predict.

    Input: gym environment, model with `predict_action`, replay memory with
           `add_experience`, maximum number of episodes
    Output: None; prints each episode reward and the average reward
    """
    eps_reward = 0
    rewards = []
    last_lives = 0

    states = deque(maxlen=4)
    state = env.reset()

    while memory.count < memory.size:
        # stack frames
        states.append(frame_to_tensor(preprocess(state)))
        if len(states) < 4:
            action = env.action_space.sample()
        else:
            # as soon as 4 frames are stacked, use them to predict the action
            action = model.predict_action(deque_to_tensor(states))
        # apply action to the environment
        next_state, reward, done, info = env.step(action)

        # A lost life is stored as a terminal step. The life counter has to be
        # refreshed on every step; otherwise every step after the first lost
        # life would compare against the stale, higher value and be flagged
        # terminal as well.
        lives = info['ale.lives']
        terminal_life_lost = True if lives < last_lives else done
        last_lives = lives

        memory.add_experience(action=action,
                   frame=preprocess(next_state),
                   reward=reward,
                   terminal=terminal_life_lost)

        eps_reward += reward
        state = next_state

        if done:
            state = env.reset()
            print("Total episode reward: ", eps_reward)
            rewards.append(eps_reward)
            eps_reward = 0
            if len(rewards) == runs:
                break

    print("Done!")
    if rewards:
        avg_reward_unaltered = np.average(rewards)
        print("Average reward over {} runs: {}".format(len(rewards), avg_reward_unaltered))
    else:
        # The memory filled up before a single episode ended; there is nothing to average.
        print("No episode finished before the memory was full, so no average reward is available.")


Depending on which agent you want to attack, first change the name of the game.

Also, if you plan on attacking the agent for Enduro, change the ```hidden``` parameter to 128; the agents for Breakout and Road Runner use 256.

In the following cell, all actions and their meanings are printed to give you an overview of what the actions will most likely do if applied to the environment.

In [None]:
#--Create environment and load trained agent
env = gym.make('Breakout-v4')
n_actions = env.action_space.n
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') 

model = DQN(n_actions, hidden=256)
model = model.to(device)
model.load_state_dict(torch.load("agent/Breakout.pth"))
model.eval()

!mkdir -p ./perturbations/Breakout/
!mkdir -p ./memory/
!mkdir -p ./plots/Breakout/diagrams/ics
!mkdir -p ./plots/Breakout/diagrams/ucs
!mkdir -p ./plots/Breakout/diagrams/cduap

In [None]:
#--Show which action each index of the action space stands for
action_meanings = env.unwrapped.get_action_meanings()
print(action_meanings)


Here we create and fill the replay buffer. The replay buffer is originally utilised during the training of the agents. It will also come in handy while creating the universal adversarial perturbations and to test the impact of our attack.

In [None]:
# the buffer holds up to 30000 steps; filling stops early after `runs` finished episodes
memory = ReplayMemory(size=30000)
runs = 10

fill_memory(env=env, model=model, memory=memory, runs=runs)
# save the memory for later use
#with open('./memory/Breakout.p','wb') as f: pickle.dump(memory, f)

# or load an already saved replay buffer (only unpickle files you created yourself)
#memory = pickle.load(open('./memory/Breakout.p', 'rb'))

## Input- and class-specific
With this function, you will be able to create a perturbation for each input to the agent. The perturbation is generated such that the agent will only perform a single action (the action defined with ```target_action```).

In [None]:
#--Input and class specific
def ics(model, transition, target_action, lr=3e-2, verbose=False, iterations=100):
    """Compute an input- and class-specific perturbation for a single transition.

    The perturbation is optimised with Adam on the cross-entropy between the
    Q-values of the adversarial example and `target_action`, until the agent
    picks `target_action` or `iterations` update steps have been made.

    Input: model with `predict_q` / `predict_action`, batched transition
           (first axis is the batch axis of size 1), index of the target action
    Output: perturbation tensor with the shape of `transition[0]`
            (requires_grad=True), on the same device as `transition`
    Raises: ValueError if `target_action` is not a valid action index
    """
    # The perturbation mirrors shape, dtype and device of the input it is added to.
    delta = torch.zeros_like(transition[0], requires_grad=True)
    opt = torch.optim.Adam([delta], lr=lr)

    pred_action = model.predict_action(transition).item()           # the 'original' action the agent chooses for the input
    q_vals = model.predict_q(transition + delta)                    # Q-values Q(s,a) for the adversarial example

    num_actions = q_vals.shape[-1]                                  # one Q-value per action
    if target_action not in range(num_actions):
        raise ValueError("The target action must be in {}".format(range(num_actions)))
    target = torch.tensor([target_action], dtype=torch.long, device=q_vals.device)

    step = 0
    while step < iterations and torch.argmax(q_vals).item() != target_action:
        loss = F.cross_entropy(q_vals, target)                      # loss between the Q-values and the target action
        opt.zero_grad()
        loss.backward()                                             # gradient w.r.t. the perturbation
        opt.step()                                                  # update the perturbation's pixel values
        q_vals = model.predict_q(transition + delta)                # re-evaluate the adversarial example
        step += 1
        if verbose and step % 100 == 0:
            print("Epoch: {}, Loss: {}, Predicted action: {}".format(step, loss.item(), torch.argmax(q_vals).item()))

    if verbose:
        print("Action {} changed to action {} with perturbation".format(pred_action, \
                                                                     model.predict_action(transition+delta).item()))
    return delta

First, we will get a random input, a stack of 4 consecutive, grayscaled frames from the environment, in this case called transition. 
Of course, we need a transition for which the agent will predict a different action than the one we want it to predict after the attack. 
Here, we set the target action to be action 0 and get a random transition from the environment.

In [None]:
target_action = pred = 0                                                    # set the desired target action
while pred == target_action:                                                # resample until the agent itself would not choose the target action
    transition = random_transition(model, env)                              # get a random transition from the environment
    pred = model.predict_action(transition).item()

# get the perturbation with the function defined above
delta_ics = ics(model, transition, target_action=target_action, lr=5e-3, iterations=100, verbose=True)

# If you want to save the perturbation and an image of it, uncomment the next lines
#with open('perturbations/Breakout/ics','wb') as f: pickle.dump(delta_ics.detach().cpu().numpy(), f)
#plt.imshow(200*delta_ics.detach().cpu().permute(1,2,0).numpy())
#plt.axis('off')
#plt.savefig('perturbations/Breakout/ics.png', dpi=200)

# plot the results; set save=True to write them to the desired location
plot_results(transition.detach().squeeze(0).cpu().numpy(), delta_ics.detach().cpu().numpy(), save=False, name='plots/Breakout/ics_2')
# print just for checking the values
print("Delta in range: ({},{})".format(torch.min(delta_ics.data).item(), torch.max(delta_ics.data).item()))

In [None]:
# Alternative view: the 4 stacked frames are shown as the channels of a single image
transition_full = transition.squeeze(0)[:4].detach().cpu().permute(1,2,0).numpy()
delta_full = delta_ics[:4].detach().cpu().permute(1,2,0).numpy()
# the perturbation is scaled by 200 for display only, otherwise it would be invisible
panels = (transition_full, 200*delta_full, transition_full+delta_full)
plot_images(panels, n_rows=1, n_columns=3, grid=False, save=False, name='plots/Breakout/ics_1.pdf') # set save to True if you want to save the image
clean_input = transition[0].unsqueeze(0)
action_ori = model.predict_action(clean_input)
action_delta = model.predict_action(delta_ics.unsqueeze(0))
action_perturbed = model.predict_action(clean_input + delta_ics)
print("\t    Action {}\t\t\t       Action {}\t\t\t\t   Action {}".format(action_ori.item(), action_delta.item(), action_perturbed.item()))

Here we declare the test procedure for the perturbation. The goal is to find perturbations under which the agent performs no worse than -50% compared to an unattacked agent; otherwise, the attack will most likely be detected. For more details, take a look at the 'Result' chapter of the thesis.

By lowering the pixel values of the perturbation to certain percentages, we approximate the calculated attack if their pixel values were restricted to stay in \[-epsilon \* intensity, epsilon \* intensity\]. For the ICS attack, the pixel values are not explicitly restricted, since the attack creates usually very tiny perturbations, but for the following attacks, the epsilon is initially set.

In the case of the ICS attack, the perturbation is calculated for each transition stored in the memory. Additionally, the original ('correct' -- although one does never know what the true correct action is in RL) action the agent predicted for it is stored in the memory and will be utilised for comparison.
Now the adversarial example will be generated with the above function. The pixel values of this calculated perturbation are the ones at 100% intensity. Now we let the agent predict an action for all perturbations at 100%, 75%, 50%, 25% and 5% and compare it to the original, stored action. If the predicted action equals the original one, the counter in the corresponding list is incremented. Later we calculate the 'agreement rate' -- how many actions were still equal to the original ones after the attack, in percent. A low agreement rate on all actions except the one we want the agent to predict (in the above case, action 0) is good.

In [None]:
def test_perturbation(mode, perturbation=None, runs=10, target_action=None):
    """Measure how often the agent still predicts the stored action under attack.

    For every transition in the global `memory`, the perturbation is applied at
    100%, 75%, 50%, 25% and 5% intensity and the agent's prediction is compared
    to the action stored in the memory.

    Input: mode       -- 'ics', 'ucs' or 'cduap'
           perturbation -- tensor to test (required for 'ucs' and 'cduap';
                           for 'ics' it is computed per transition)
           runs       -- unused, kept for backward compatibility
           target_action -- action the ICS attack should enforce (required for 'ics')
    Output: tuple of 5 lists (100%, 75%, 50%, 25%, 5%) with the agreement rate
            in percent per action; NaN for actions that never occur in the
            memory. Returns None (after printing a hint) on invalid arguments.
    """
    if mode not in ['ics', 'ucs', 'cduap']:                     # choose the attack from ICS, UCS or CDUAP
        print("Please select one of these methods: 'ics', 'ucs', 'cduap'")
        return
    if mode == 'ics' and target_action is None:
        print("Choose an action to shift to with the perturbation!")
        print("Possible actions for the current game: ", range(n_actions))
        return
    if mode in ['ucs', 'cduap'] and perturbation is None:
        print("Test routine needs a perturbation to work on!")
        return

    intensities = (1.0, 0.75, 0.5, 0.25, 0.05)
    still_correct = [[0] * n_actions for _ in intensities]      # per intensity: how often each action was still predicted 'correctly'
    amount = [0] * n_actions                                    # how often each action occurs in the memory

    for i in range(memory.count - 4):
        # built on the selected device so the test also runs without a GPU
        current_transition = torch.tensor(memory._get_state(i+3) / 255., dtype=torch.float32, device=device)
        # NOTE(review): the state is read at index i+3 but the action at index i;
        # confirm against ReplayMemory that these belong to the same step.
        correct = memory.actions[i]
        amount[correct] += 1

        if mode == 'ics':
            # ICS perturbations are specific to each input; UCS and CD-UAP use the one handed over
            perturbation = ics(model, current_transition.unsqueeze(0), target_action=target_action, lr=5e-4)

        with torch.no_grad():                                   # predictions only, no gradients needed
            for counts, intensity in zip(still_correct, intensities):
                prediction = model.predict_action((current_transition + perturbation * intensity).unsqueeze(0))
                if prediction == correct:
                    counts[correct] += 1

    # Agreement rate in percent; an action that never occurs has no defined rate.
    agreement_rates = tuple(
        [(hits / seen) * 100 if seen else float('nan') for hits, seen in zip(counts, amount)]
        for counts in still_correct
    )

    print("Done!")

    return agreement_rates

In [None]:
# Compute the agreement rates of the ICS attack and print one list per intensity (100% down to 5%)
agreement_rate_100, agreement_rate_75, agreement_rate_50, agreement_rate_25, agreement_rate_5 = test_perturbation('ics', target_action=0)
print(agreement_rate_100, agreement_rate_75, agreement_rate_50, agreement_rate_25, agreement_rate_5, sep='\n')

In [None]:
# You can also create a diagram and save it
#plt.figure(figsize=(5, 3))
#plt.bar(env.unwrapped.get_action_meanings(), agreement_rate_5)
#plt.axis([-1, n_actions, 0, 105])

#plt.savefig('./plots/Breakout/diagrams/ics/accuracy_drop_5.pdf', dpi=200)
#plt.show()

## Universal, class specific attack
The universal and class specific attack is calculated on the whole memory we filled earlier. Since every perturbation is calculated on the same memory during the tests, they can be compared to each other.

The main idea resembles the ICS attack. You can choose an action that the agent should always predict. Now we get a minibatch of random transitions. The perturbation's pixel values are updated to minimise the loss between the agent's predictions and your chosen target action. Since the perturbation is now calculated to manipulate multiple inputs simultaneously, it might be visible in the final adversarial example. Therefore, the values are restricted to stay in \[-epsilon,epsilon\].

In [None]:
def ucs(model, env, memory, target_action, epochs=400, lr=3e-3, batch_size=32, epsilon=5e-3, verbose=True):
    """Compute a universal, class-specific perturbation on minibatches from the memory.

    One perturbation is optimised with Adam so that the agent predicts
    `target_action` for every sampled input; after each step its values are
    clipped to [-epsilon, epsilon].

    Input: model with `predict_q` / `predict_action`, replay memory with
           `get_minibatch`, index of the target action, optimiser settings.
           `env` and `batch_size` are unused and kept for backward
           compatibility (the batch size is whatever `get_minibatch` returns).
    Output: (states of the last minibatch as returned by the memory, perturbation tensor)
    Raises: ValueError if `epochs` is smaller than 1
    """
    if epochs < 1:
        raise ValueError("epochs must be at least 1")
    target_index = int(target_action)
    log_every = max(1, epochs // 10)                                 # about 10 progress lines, also for small epoch counts

    delta = torch.zeros((4,210,160), requires_grad=True, device=device)  # initialize perturbation
    opt = torch.optim.Adam([delta], lr=lr)                               # and optimizer

    for i in range(epochs):
        states, _, _, _, _ = memory.get_minibatch()                      # get a random sample of inputs
        batch = torch.tensor(states / 255., dtype=torch.float32, device=device)
        q_vals = model.predict_q(batch + delta)                          # Q-values for the adversarial examples
        # one target label per sampled input, sized by the actual minibatch
        targets = torch.full((q_vals.shape[0],), target_index, dtype=torch.long, device=device)
        loss = F.cross_entropy(q_vals, targets)

        if verbose and i % log_every == 0:
            print("Epoch: {}, Loss: {}, Predicted actions: {}".format(i, loss, torch.argmax(q_vals, dim=1).cpu().numpy()))

        opt.zero_grad()
        loss.backward()                                                  # gradient w.r.t. the perturbation
        opt.step()                                                       # update the perturbation

        delta.data.clamp_(-epsilon, epsilon)                             # clip the pixel values to stay in the allowed range

    # The example is scaled to [0, 1] so that it is on the same scale as the perturbation.
    example = states[0] / 255.
    example_tensor = torch.tensor(example, dtype=torch.float32, device=device)
    plot_results(example, delta.detach().cpu().numpy(), save=False, name='plots/Breakout/ucs_2_'+str(target_index)) # plot the images, save if needed

    print("Action {} changed to action {} with perturbation.".format(model.predict_action(example_tensor.unsqueeze(0)).item(), \
                                                                     model.predict_action((example_tensor+delta).unsqueeze(0)).item()))

    print("Delta in range: ({},{})".format(torch.min(delta.data).item(), torch.max(delta.data).item()))
    return states, delta

In [None]:
# universal perturbation that pushes the agent towards action 0
transitions, delta_ucs = ucs(model, env, memory, target_action=0, epochs=100, lr=3e-4, epsilon=7e-3)
#with open('perturbations/Breakout/ucs_0','wb') as f: pickle.dump(delta_ucs.detach().cpu().numpy(), f)          # uncomment these lines if you want to save
#plt.imshow(200*delta_ucs.detach().cpu().permute(1,2,0).numpy())                                                # the perturbation and an image of it
#plt.axis('off')
#plt.savefig('perturbations/Breakout/ucs_0.png', dpi=200)

In [None]:
# Another way to print and save the perturbation
transition_full = transitions[0]/255.
delta_full = delta_ucs.detach().cpu().permute(1,2,0).numpy()
transition_hwc = np.transpose(transition_full, axes=(1,2,0))                # frames become the channels of one image
# the perturbation is scaled by 200 for display only; set save to True if you want to save these images
plot_images((transition_hwc, 200*delta_full, transition_hwc+delta_full), n_rows=1, n_columns=3, grid=False, save=False, name='plots/Breakout/ucs_1.pdf')
# built on the selected device so the cell also runs without a GPU
transition_tensor = torch.tensor(transition_full, dtype=torch.float32, device=device)
action_ori = model.predict_action(transition_tensor.unsqueeze(0))
action_delta = model.predict_action(delta_ucs.unsqueeze(0))
action_perturbed = model.predict_action((transition_tensor+delta_ucs).unsqueeze(0))
print("\t    Action {}\t\t\t       Action {}\t\t\t\t   Action {}".format(action_ori.item(), action_delta.item(), action_perturbed.item()))

In [None]:
# Compute the agreement rates of the UCS attack and print one list per intensity (100% down to 5%)
agreement_rate_100, agreement_rate_75, agreement_rate_50, agreement_rate_25, agreement_rate_5 = test_perturbation('ucs', perturbation=delta_ucs)
print(agreement_rate_100, agreement_rate_75, agreement_rate_50, agreement_rate_25, agreement_rate_5, sep='\n')

In [None]:
# You can also create a diagram and save it
#plt.figure(figsize=(5, 3))
#plt.bar(env.unwrapped.get_action_meanings(), agreement_rate_5)
#plt.axis([-1, n_actions, 0, 105])
#plt.savefig('./plots/Breakout/diagrams/ucs/accuracy_drop_5.pdf', dpi=200)
#plt.show()

## CD-UAP for Atari games
The CD-UAP was originally introduced by [Zhang et al.](https://doi.org/10.1609/AAAI.V34I04.6154) and now modified and applied to RL and Atari games.

The idea is to lower the agent's confidence in predicting certain actions (targeted actions) while not creating too much impact on the other actions from the action space (not-targeted actions). To achieve this, the loss term needs to consist of two separate loss functions: one that reduces the loss between the predictions for the adversarial examples of the not-targeted inputs and their original actions, and one that shifts the predictions from a targeted class to another.

### Example
For Enduro, we set the targeted actions to be (1,7,8), so whenever the agent would choose action 1 for the original input, it chooses any other action for the adversarial example instead. On the other hand, if it chooses action 2 for the original input, the action 2 will be chosen for the final adversarial example as well.


First, we get two equally-sized batches of randomly sampled targeted and not-targeted transitions. We then save the original, stored actions of the not-targeted inputs. These are used to minimize the loss $L_{nt}$ between the predicted Q-values and the 'correct' actions. Afterwards, the second loss term is calculated. For that, the best and second best action are calculated for the original targeted input. We select the Q-value for the adversarial example at the index of the best action ($L_c$) and the Q-value at the index of the second best action ($L_i$). We want to shift the prediction towards the second best guess, away from the original action, which is why $L_{t} = L_c - L_i$ (set to 0 wherever it becomes negative). Finally, the perturbation's values are updated on both loss terms $L_{t}$ and $L_{nt}$.


In [None]:
def cd_uap(model, env, memory, target_actions, epochs=100, lr=3e-3, batch_size=32, epsilon=5e-3, verbose=True):
    """Compute a class-discriminative universal perturbation (CD-UAP).

    Each epoch samples batch_size/2 targeted inputs (stored action in
    `target_actions`) and batch_size/2 not-targeted inputs. The loss keeps the
    stored action for the not-targeted inputs (cross-entropy) and pushes the
    targeted inputs from the agent's best action towards its second best
    action. After each step the perturbation is clipped to [-epsilon, epsilon].

    Input: model with `predict_q` / `predict_action`, replay memory with
           `get_minibatch`, collection of targeted action indices, optimiser
           settings. `env` is unused and kept for backward compatibility.
    Output: (deque with the last batch of targeted inputs, perturbation tensor)
    Raises: ValueError if `epochs` is smaller than 1;
            RuntimeError if the memory does not yield enough targeted and
            not-targeted inputs
    """
    if epochs < 1:
        raise ValueError("epochs must be at least 1")
    half_batch = max(1, batch_size // 2)
    log_every = max(1, epochs // 10)                             # about 10 progress lines, also for small epoch counts
    max_draws = 1000                                             # upper bound on minibatches sampled per epoch

    delta = torch.zeros((4,210,160), requires_grad=True, device=device)
    opt = torch.optim.Adam([delta], lr=lr)

    for i in range(epochs):
        frames_t = deque(maxlen=half_batch)                      # targeted inputs
        frames_nt = deque(maxlen=half_batch)                     # not-targeted inputs
        actions_nt = deque(maxlen=half_batch)                    # and their original actions

        draws = 0
        while len(frames_t) < half_batch or len(frames_nt) < half_batch:
            if draws >= max_draws:
                # without this guard the loop never ends if one of the two groups is missing from the memory
                raise RuntimeError("Could not sample enough targeted and not-targeted transitions from the memory")
            draws += 1
            states, actions, _, _, _ = memory.get_minibatch()
            states = torch.tensor(states / 255., dtype=torch.float32, device=device)
            actions = torch.as_tensor(actions, dtype=torch.long, device=device)

            for frame, action in zip(states, actions):
                if action.item() in target_actions:
                    frames_t.append(frame)
                else:
                    frames_nt.append(frame)
                    actions_nt.append(action)

        batch_t = torch.stack(list(frames_t))
        batch_nt = torch.stack(list(frames_nt))
        labels_nt = torch.stack(list(actions_nt))

        q_vals_nt = model.predict_q(batch_nt + delta)            # Q-values for the not-targeted adversarial examples
        loss_nt = F.cross_entropy(q_vals_nt, labels_nt)          # keep the original action for not-targeted inputs

        with torch.no_grad():                                    # only the action ranking of the clean inputs is needed
            q_vals_t = model.predict_q(batch_t)
        c, _i = torch.split(torch.topk(q_vals_t, 2)[1], (1,1), dim=1)   # best and second best action of the clean targeted inputs

        q_vals_t_adv = model.predict_q(batch_t + delta)          # Q-values for the targeted adversarial examples
        L_c = q_vals_t_adv.gather(1, c)                          # Q-value at the index of the best action
        L_i = q_vals_t_adv.gather(1, _i)                         # Q-value at the index of the second best action
        loss_t = torch.clamp(L_c - L_i, min=0).sum()             # no further push once the second best action has won

        loss = (loss_t + loss_nt)                                # add loss terms to single loss

        if verbose and i % log_every == 0:
            # shows the actions currently predicted for the targeted adversarial examples
            print("Epoch: {}, Loss: {}, Predicted actions: {}".format(i, loss, torch.argmax(q_vals_t_adv, dim=1).cpu().numpy()))

        opt.zero_grad()
        loss.backward()                                          # gradient w.r.t. the perturbation
        opt.step()                                               # update the values of the perturbation

        delta.data.clamp_(-epsilon, epsilon)                     # restrict the values of the perturbation

    plot_results(frames_t[0].detach().squeeze(0).cpu().numpy(), delta.detach().cpu().numpy(), save=False, name='plots/Breakout/cduap_2')# plot results and set save if necessary
    #with open('perturbations/Breakout/cduap','wb') as f: pickle.dump(delta.detach().cpu().numpy(), f)  # uncomment these lines if you want to save the perturbation and
    #plt.imshow(200*delta.detach().cpu().permute(1,2,0).numpy())                                        # an image of it
    #plt.axis('off')
    #plt.savefig('perturbations/Breakout/cduap.png', dpi=200)

    print("Action {} changed to action {} with perturbation.".format(model.predict_action(frames_t[0].unsqueeze(0)).item(), \
                                                                     model.predict_action((frames_t[0]+delta).unsqueeze(0)).item()))

    print("Delta in range: ({},{})".format(torch.min(delta.data).item(), torch.max(delta.data).item()))
    return frames_t, delta                                       # targeted inputs are returned for plotting

In [None]:
# actions 2 and 3 are the targeted actions; all others should stay unaffected
transitions, delta_cd_uap = cd_uap(model, env, memory, target_actions=(2,3), epochs=400, lr=3e-4, epsilon=7e-3)

In [None]:
# Alternative view: the 4 stacked frames are shown as the channels of a single image (this cell saves the plot)
first_targeted = transitions[0]
transition_full = first_targeted.detach().cpu().permute(1,2,0).numpy()
delta_full = delta_cd_uap.detach().cpu().permute(1,2,0).numpy()
# the perturbation is scaled by 200 for display only
panels = (transition_full, 200*delta_full, transition_full+delta_full)
plot_images(panels, n_rows=1, n_columns=3, grid=False, save=True, name='plots/Breakout/cduap_1.pdf')
action_ori = model.predict_action(first_targeted.unsqueeze(0))
action_delta = model.predict_action(delta_cd_uap.unsqueeze(0))
action_perturbed = model.predict_action((first_targeted+delta_cd_uap).unsqueeze(0))
print("\t    Action {}\t\t\t       Action {}\t\t\t\t   Action {}".format(action_ori.item(), action_delta.item(), action_perturbed.item()))

In [None]:
# Compute the agreement rates of the CD-UAP attack and print one list per intensity (100% down to 5%)
agreement_rate_100, agreement_rate_75, agreement_rate_50, agreement_rate_25, agreement_rate_5 = test_perturbation('cduap', perturbation=delta_cd_uap)
print(agreement_rate_100, agreement_rate_75, agreement_rate_50, agreement_rate_25, agreement_rate_5, sep='\n')

In [None]:
# You can also create a diagram and save it
#plt.figure(figsize=(5, 3))
#plt.bar(env.unwrapped.get_action_meanings(), agreement_rate_5)
#plt.axis([-1, n_actions, 0, 105])


#plt.savefig('./plots/Breakout/diagrams/cduap/accuracy_drop_5.pdf', dpi=200)
#plt.show()