In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F

from collections import namedtuple, deque
import random
random.seed(45)
import matplotlib.pyplot as plt
import numpy as np
import seaborn as sns

from google.colab import drive
drive.mount('/content/drive')

import sys
sys.path.insert(0,"/content/drive/MyDrive/ANN")

from tic_env import TictactoeEnv, OptimalPlayer

# if gpu is to be used
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [None]:
save_dir = "/content/drive/MyDrive/ANN/"

# Q-learning

In [None]:
env = TictactoeEnv()

In [None]:
class QlearningPlayer:
    '''
    Description:
        A class to implement a player in Tic-tac-toe based on the Q-learning algorithm
        combined with epsilon-greedy policy.
    
    Parameters:
        alpha: float, in [0, 1]. This is the learning rate that represents the 
               step length taken to update the estimation of Q. 
        epsilon: float, in [0, 1]. This is a value between 0-1 that indicates the
                probability of making a random action instead of the action that
                has the highest Q-value at any given time.
        discount_factor: float, in [0, 1]. This is a value that discounts future rewards
                         because they are less valuable than current rewards.
    '''
    def __init__(self, player='X',epsilon=0.05):
        self.discount_factor = 0.99
        self.alpha = 0.05
        self.epsilon = epsilon # changed 0.05 per epsilon that we pass it as parameter when creating instance
        self.player = player 
        self.player2value = {'X': 1, 'O': -1}
        #dictionary state -> Q-value
        self.Q_state_action = {}
        
    def set_player(self, player = 'O', j=-1):
        self.player = player
        if j != -1:
            self.player = 'O' if j % 2 == 0 else 'X' #the opposite is done in OptimalPlayer

    def empty(self, grid):
        '''return all empty positions'''
        avail = []
        for i in range(9):
            pos = (int(i/3), i % 3)
            if grid[pos] == 0:
                avail.append(pos)
        return avail
    
    def randomMove(self, grid):
        """ Choose a random move from the available options. """
        avail = self.empty(grid)
        return avail[random.randint(0, len(avail)-1)]
    
    def highestQMove(self, grid):
        """
        Choose the action which has the highest Q-value and is an available option.
        Also, return the corresponding Q-value.
        """
        
        avail = self.empty(grid)
        max_Q_value = -np.Inf
        highest_Q_move = avail[random.randint(0, len(avail)-1)]
        
        for action in avail:
            #obtain grid if this specific action is played
            next_grid = grid.copy()
            next_grid[action] = self.player2value[self.player]
            hash_next_grid = self.hashGrid(next_grid) 
            
            #obtain Q_value for this state
            Q_value = self.Q_state_action.get(hash_next_grid, 0) #default is 0
            
            #update max Q_value
            if Q_value >= max_Q_value:
                max_Q_value = Q_value
                highest_Q_move = action
                
        return max_Q_value, highest_Q_move
    
    def act(self, grid):
        """
        A touple is returned that represents (row, col) or 
        an int corresponding to the 9 possible places on the grid.
        """
        # choose action according to epsilon-greedy policy
        # i.e. whether to move in random or not
        if random.random() < self.epsilon: action = self.randomMove(grid)
        else: _, action = self.highestQMove(grid)
       
        return action
    
    def hashGrid(self, grid):
        """Hash the grid in order to be able to use it as a dictionary key"""
        return str(grid.reshape(3 * 3))
    
    def feedReward(self, reward, grid):
        """
        Update Q-values when the game end, i.e. a reward is obtained. 
        """
        hash_grid = self.hashGrid(grid)
        Q_final_grid_move = self.Q_state_action.get(hash_grid, 0)
        
        #when the game has ended we have a reward and no next state
        self.Q_state_action[hash_grid] = Q_final_grid_move + self.alpha * (reward - Q_final_grid_move) 
        
    def tdUpdate(self, grid_before, grid_after):
        """
        Update Q-values after an action that doesn't end the game, i.e. no reward is obtained.
        """
        #grid after is the grid after this player's move and after the opponent has played
        
        #when the game hasn't ended, we don't have a reward, but there is a next state
        hash_grid_before = self.hashGrid(grid_before)
        Q_grid_move = self.Q_state_action.get(hash_grid_before, 0)
        max_Q_value, _ = self.highestQMove(grid_after)
        
        self.Q_state_action[hash_grid_before] = Q_grid_move + self.alpha * (self.discount_factor * max_Q_value - Q_grid_move)     

# Question 1

In [None]:
player_opt = OptimalPlayer(epsilon=0.5, player = 'X')
player_q_learn = QlearningPlayer(player = 'O')

nr_games = 20000
reward_every_250_games = 0
average_rewards = {}


for i in range(1, nr_games + 1):
    
    if i % 250 == 0: 
        average_rewards[i] = reward_every_250_games / 250
        reward_every_250_games = 0
    
    env.reset()
    
    #switch the 1st player after every game
    player_opt.set_player(j=i)
    player_q_learn.set_player(j=i)
    
    grid, _, __ = env.observe()
    
    for j in range(9):
        
        if env.current_player == player_opt.player:
            move = player_opt.act(grid)
            grid_after_player_opt, end, winner = env.step(move, print_grid=False)
            #if j=0 here, then player_opt starts the game and we don't want to update
            #the empty grid state
            if not end and j != 0: player_q_learn.tdUpdate(grid, grid_after_player_opt)
        
        else:
            move = player_q_learn.act(grid)
            grid_after_player_q_learn, end, winner = env.step(move, print_grid=False)
        
        if end:           
            reward = env.reward(player = player_q_learn.player)
            reward_every_250_games += reward
            player_q_learn.feedReward(reward, grid_after_player_q_learn)
            env.reset()
            break
            
        else:
            #current_player has changed after step()
            if env.current_player == player_opt.player: grid = grid_after_player_q_learn
            else: grid = grid_after_player_opt

In [None]:
plt.plot(list(average_rewards.keys()), list(average_rewards.values()))
plt.title("Q-learning(0.05) vs Opt(0.5)")
plt.xlabel("Number of games played")
plt.ylabel("Average reward for every 250 games")
plt.ylim((-0.25, 1))
plt.grid()
plt.savefig(save_dir + "Q-learning(0.05) vs Opt(0.5).jpg", dpi=400)
plt.show()

# Question 2

In [None]:
e_max, e_min = 0.8, 0.1
n_expl = np.linspace(1, 20000, 5, dtype=int)

average_rewards_array = []

for n in n_expl:

    player_opt = OptimalPlayer(epsilon=0.5, player = 'X')
    player_q_learn = QlearningPlayer(player = 'O')

    nr_games = 20000
    reward_every_250_games = 0
    average_rewards = {}

    for i in range(1, nr_games + 1):
        
        if i%250 == 0: 
            average_rewards[i] = reward_every_250_games / 250
            reward_every_250_games = 0
        
        env.reset()
        
        # update epsilon
        player_q_learn.epsilon = np.max([e_min, e_max * (1 - i / n)])
        
        #switch the 1st player after every game
        player_opt.set_player(j=i)
        player_q_learn.set_player(j=i)
        
        grid, _, __ = env.observe()
            
        for j in range(9):
            
            if env.current_player == player_opt.player:
                move = player_opt.act(grid)
                grid_after_player_opt, end, winner = env.step(move, print_grid=False)
                
                if not end and j!=0: player_q_learn.tdUpdate(grid, grid_after_player_opt)
            
            else:
                move = player_q_learn.act(grid)
                grid_after_player_q_learn, end, winner = env.step(move, print_grid=False)

            
            if end:           
                reward = env.reward(player = player_q_learn.player)
                reward_every_250_games += reward
                player_q_learn.feedReward(reward, grid_after_player_q_learn)

                env.reset()
                
                break
                
            else:
                #current_player has changed after step
                if env.current_player == player_opt.player: grid = grid_after_player_q_learn
                else: grid = grid_after_player_opt

    average_rewards_array.append(average_rewards)              

In [None]:
fig = plt.figure(1, figsize=(15, 10))
for i,_ in enumerate(n_expl):
    plt.plot(list(average_rewards_array[i].keys()), list(average_rewards_array[i].values()))
plt.title(f"Average reward of Q-learning agent with decreasing exploration vs Opt(0.5) for every 250 games")
plt.legend(n_expl,title='${n^*}$ values')
plt.xlabel("Number of games")
plt.ylabel("Average reward")
plt.grid()
plt.ylim((0, 1))
plt.savefig(save_dir + "decreasing exploration.jpg", dpi=400)
plt.show()

# Question 3

In [None]:
def one_game(player_q_learn, second_player, i):

    env.reset()
    second_player.set_player(j=i); player_q_learn.set_player(j=i)
    grid, _, _ =  env.observe()

    for j in range(9):
        if env.current_player == second_player.player:
            move = second_player.act(grid)
        else:
            move = player_q_learn.act(grid)

        grid, end, _ = env.step(move, print_grid=False)

        if end:
            reward = env.reward(player = player_q_learn.player)
            break

    return reward

In [None]:
def computeTestM(player_q_learn):
    N = 500
    opt, ran = 0, 0
    player_opt = OptimalPlayer()
    player_opt.epsilon, player_q_learn.epsilon = 0, 0
    
    # First we compute Mopt
    for i in range(N):
        opt += one_game(player_q_learn,player_opt, i)
        
    # Then we compute Mrand
    player_opt.epsilon = 1
    for i in range(N):
        ran += one_game(player_q_learn,player_opt, i)    
    
    return opt / N, ran / N

In [None]:
e_max, e_min = 0.8, 0.1
n_expl = np.linspace(1, 20000, 5, dtype=int)

average_mopt_array = []
average_mran_array = []

for n in n_expl:
  player_opt = OptimalPlayer(epsilon=0.5, player = 'X')
  player_q_learn = QlearningPlayer(player = 'O')

  nr_games = 20000

  average_mopt = {}
  average_mran = {}

  for i in range(1, nr_games + 1):
      
      if i%250 == 0: 
          average_mopt[i], average_mran[i] = computeTestM(player_q_learn)
      
      env.reset()
      player_q_learn.epsilon = np.max([e_min, e_max*(1 - i / n)])
      player_opt.set_player(j=i)
      player_q_learn.set_player(j=i)
      grid, _, _ = env.observe()

      for j in range(9):
              if env.current_player == player_opt.player:
                  move = player_opt.act(grid)
                  grid_after_player_opt, end, winner = env.step(move, print_grid=False)
                  if not end and j != 0: player_q_learn.tdUpdate(grid, grid_after_player_opt)
              else:
                  move = player_q_learn.act(grid)
                  grid_after_player_q_learn, end, winner = env.step(move, print_grid=False)
              if end:           
                  reward = env.reward(player = player_q_learn.player)
                  reward_every_250_games += reward
                  player_q_learn.feedReward(reward, grid_after_player_q_learn)
                  break
              else:
                  if env.current_player == player_opt.player: grid = grid_after_player_q_learn
                  else: grid = grid_after_player_opt
  average_mopt_array.append(average_mopt)
  average_mran_array.append(average_mran)

In [None]:
fig = plt.figure(1, figsize=(15, 10))
colors = ['orange', 'purple', 'green','red', 'pink']
for i,n in enumerate(n_expl):
    plt.plot(list(average_mopt_array[i].keys()), list(average_mopt_array[i].values()), label=n, linestyle='-', color=colors[i])
    plt.plot(list(average_mran_array[i].keys()), list(average_mran_array[i].values()), label=n, linestyle='--', color=colors[i])
plt.title(r"$M_{rand}$ (--) and $M_{opt}$ (-) for different values of $n^*$")
plt.legend(title='${n^*}$ values')
plt.xlabel("Number of games")
plt.ylabel('Mean reward')
plt.grid()
plt.savefig(save_dir + "M_opt and M_rand n*.jpg", dpi=400)
plt.show()

# Question 4

In [None]:
nr_games = 20000
n = 5000
e_max, e_min = 0.8, 0.1
eps_opt = np.linspace(0,1,5) 
average_mopt_array = []
average_mran_array = []

for e_opt in eps_opt:

    player_opt = OptimalPlayer(epsilon=e_opt, player='X')
    player_q_learn = QlearningPlayer(player='O')

    average_mopt = {}
    average_mran = {}

    for i in range(1, nr_games + 1):
        
        if i%250 == 0: 
            average_mopt[i], average_mran[i] = computeTestM(player_q_learn)
        
        env.reset()

        player_q_learn.epsilon = np.max([e_min, e_max * (1 - i / n)])

        player_opt.set_player(j=i)
        player_q_learn.set_player(j=i)

        grid, _, _ = env.observe()

        for j in range(9):
                if env.current_player == player_opt.player:
                    move = player_opt.act(grid)
                    grid_after_player_opt, end, winner = env.step(move, print_grid=False)
                    if not end: player_q_learn.tdUpdate(grid, grid_after_player_opt)
                else:
                    move = player_q_learn.act(grid)
                    grid_after_player_q_learn, end, winner = env.step(move, print_grid=False)
                if end:           
                    reward = env.reward(player=player_q_learn.player)
                    reward_every_250_games += reward
                    player_q_learn.feedReward(reward, grid_after_player_q_learn)
                    break
                else:
                    if env.current_player == player_opt.player: grid = grid_after_player_q_learn
                    else: grid = grid_after_player_opt

    average_mopt_array.append(average_mopt)
    average_mran_array.append(average_mran)

In [None]:
fig = plt.figure(1, figsize=(15, 10))
colors = ['orange', 'purple', 'green','red', 'pink']
for i,n in enumerate(eps_opt):
    plt.plot(list(average_mopt_array[i].keys()), list(average_mopt_array[i].values()), label=n, linestyle='-', color=colors[i])
    plt.plot(list(average_mran_array[i].keys()), list(average_mran_array[i].values()), label=n, linestyle='--', color=colors[i])
plt.title(r"$M_{rand}$ (--) and $M_{opt}$ (-) for different values of $\epsilon_{opt}$ with $n^*=5000$")
plt.legend(title='$\epsilon_{opt}$ values')
plt.xlabel("Number of games")
plt.ylabel('Mean reward')
plt.grid()
plt.savefig(save_dir + "M_opt and M_rand epsilon.jpg", dpi=400)
plt.show()

#Question 7

In [None]:
eps_opt = [0.2, 0.4, 0.6, 0.8]

nr_games = 20000
average_mopt_array = []
average_mran_array = []

for e_opt in eps_opt:

    player_q1 = QlearningPlayer(epsilon=e_opt)
    player_q2 = QlearningPlayer(epsilon=e_opt)
    player_q1.Q_state_action = player_q2.Q_state_action

    average_mopt = {}
    average_mran = {}

    for i in range(1, nr_games + 1):
        
        if i%250 == 0: 
          average_mopt[i], average_mran[i] = computeTestM(player_q1)
        
        env.reset()
        player_q2.set_player(j=i)
        player_q1.set_player(j=i+1)

        grid, _, _ = env.observe()

        for j in range(9):
                if env.current_player == player_q2.player:
                    move = player_q2.act(grid)
                    grid_after_player_q2, end, winner = env.step(move, print_grid=False)
                    if not end and j != 0: player_q1.tdUpdate(grid, grid_after_player_q2)
                    
                else:
                    move = player_q1.act(grid)
                    grid_after_player_q1, end, winner = env.step(move, print_grid=False)
                    if not end and j != 0: player_q2.tdUpdate(grid, grid_after_player_q1)
                if end: 
                            
                    reward_q1 = env.reward(player = player_q1.player)
                    player_q1.feedReward(reward_q1, grid_after_player_q1)

                    reward_q2 = env.reward(player = player_q2.player)
                    player_q2.feedReward(reward_q2, grid_after_player_q2) 
                    break
                else:
                    if env.current_player == player_q2.player: grid = grid_after_player_q1
                    else: grid = grid_after_player_q2
    average_mopt_array.append(average_mopt)
    average_mran_array.append(average_mran)

In [None]:
fig = plt.figure(1, figsize=(15, 10))
colors = ['orange', 'purple', 'green','red', 'pink']
for i,n in enumerate(eps_opt):
    plt.plot(list(average_mopt_array[i].keys()), list(average_mopt_array[i].values()), label=n, linestyle='-', color=colors[i])
    plt.plot(list(average_mran_array[i].keys()), list(average_mran_array[i].values()), label=n, linestyle='--', color=colors[i])
plt.title(r"$M_{rand}$ (--) and $M_{opt}$ (-) for different values of $\epsilon$")
plt.legend(title='$\epsilon$ values')
plt.xlabel("Number of games")
plt.ylabel('Mean reward')
plt.ylim((-1, 1))
plt.grid()
plt.savefig(save_dir + "Learning by self-practice M_opt and M_rand fixed epsilon.jpg", dpi=400)
plt.show()

# Question 8

In [None]:
nr_games = 20000
n_array = np.linspace(1,20000,5,dtype=int)

average_mopt_array_q8 = []
average_mran_array_q8 = []

for n in n_array:

    player_q1 = QlearningPlayer(player = 'O')
    player_q2 = QlearningPlayer(player = 'X')
    
    # both players use the same set of Q-values and update the same set of Q-values. 
    Q_state_action = {}
    player_q1.Q_state_action = Q_state_action
    player_q2.Q_state_action = Q_state_action

    average_mopt = {}
    average_mran = {}

    for i in range(1, nr_games + 1):
        
        if i%250 == 0: 
            average_mopt[i], average_mran[i] = computeTestM(player_q1)
        
        env.reset()

        # update epsilon
        player_q1.epsilon = np.max([e_min, e_max*(1-i/n)])
        player_q2.epsilon = np.max([e_min, e_max*(1-i/n)])
        
        #switch the 1st player after every game

        player_q2.set_player(j=i)
        player_q1.set_player(j=i+1)

        grid, _, _ = env.observe()

        for j in range(9):
                if env.current_player == player_q2.player:
                    move = player_q2.act(grid)
                    grid_after_player_q2, end, winner = env.step(move, print_grid=False)
                    if not end and j != 0: player_q1.tdUpdate(grid, grid_after_player_q2)
                else:
                    move = player_q1.act(grid)
                    grid_after_player_q1, end, winner = env.step(move, print_grid=False)
                    if not end and j != 0: player_q2.tdUpdate(grid, grid_after_player_q1)
                if end:           
                    reward_q1 = env.reward(player = player_q1.player)
                    player_q1.feedReward(reward_q1, grid_after_player_q1)

                    reward_q2 = env.reward(player = player_q2.player)
                    player_q2.feedReward(reward_q2, grid_after_player_q2)

                    break
                else:
                    #current_player has changed after step
                    if env.current_player == player_q2.player: grid = grid_after_player_q1
                    else: grid = grid_after_player_q2

    average_mopt_array_q8.append(average_mopt)
    average_mran_array_q8.append(average_mran)

In [None]:
fig = plt.figure(1, figsize=(15, 10))
colors = ['orange', 'purple', 'green','red', 'pink']
for i,n in enumerate(n_array):
    plt.plot(list(average_mopt_array_q8[i].keys()), list(average_mopt_array_q8[i].values()), label=n, linestyle='-', color=colors[i])
    plt.plot(list(average_mran_array_q8[i].keys()), list(average_mran_array_q8[i].values()), label=n, linestyle='--', color=colors[i])
plt.title(r"$M_{rand}$ (--) and $M_{opt}$ (-) for different values of $n^*$")
plt.legend(title='$n^*$ values')
plt.xlabel("Number of games")
plt.ylabel('Mean reward')
plt.ylim((-1, 1))
plt.grid()
plt.savefig(os.getcwd() + "\\plots_and_outputs\\plot_q8_2.jpg", dpi=400)
plt.show()

# Question 10

In [None]:
grids = np.array([[[1.,0.,0.],[0.,-1.,0.],[0.,0.,0.]],
                [[-1.,-1.,0.],[1.,0.,0.],[0.,1.,0.]],
               [[-1.,0.,0.],[0.,-1.,0.],[1.,1.,0.]]])

In [None]:
positions = np.array([[(i,j) for j in range(3)] for i in range(3)])
positions = positions.reshape(-1,2)
positions

In [None]:
fig, ax = plt.subplots(1,3, figsize=(21,6))
labels= np.array([[['X','',''],['','O',''],['','','',]],
        [['O','O',''],['X','',''],['','X','']],
        [['O','',''],['','O',''],['X','X','']]])

for i,g in enumerate(grids):
    aval = player_q1.empty(g)
    qvalues = np.zeros((3,3))
    for pos in aval:
        next_grid = g.copy()
        next_grid[pos] = 1
        hash_next_grid = player_q1.hashGrid(next_grid)
        qvalues[pos] = player_q1.Q_state_action.get(hash_next_grid,0)
    print(qvalues)
    sns.heatmap(qvalues,ax=ax[i], cmap="Blues",  annot=labels[i], annot_kws={'fontsize': 25}, fmt='s')
    ax[i].axis('off') 

plt.suptitle('Heatmap of Q-Values (player X) in 3 different states')
plt.savefig(save_dir + "q10.jpg", dpi=400)
plt.show()

# Deep Q-Learning

- `Transition` - a named tuple representing a single transition in our environment. It essentially maps (state, action) pairs to their (next_state, reward) result, with the state being the screen difference image as described later on.

- `ReplayMemory` - a cyclic buffer of bounded size that holds the transitions observed recently. It also implements a `.sample()` method for selecting a random batch of transitions for training.

In [None]:
Transition = namedtuple('Transition',
                        ('state', 'action', 'next_state', 'reward'))


class ReplayMemory(object):

    def __init__(self, capacity):
        self.memory = deque([],maxlen=capacity)

    def push(self, state, action, next_state, reward):
        """Save a transition"""
        state = torch.tensor(state, device=device).unsqueeze(0)
        action = torch.tensor([action], device=device)
        reward = torch.tensor([reward], device=device)
        self.memory.append(Transition(state, action, next_state, reward))

    def sample(self, batch_size):
        return random.sample(self.memory, batch_size)

    def __len__(self):
        return len(self.memory)

In [None]:
class DQN(nn.Module):

    def __init__(self):
        super(DQN, self).__init__()
        self.fc1 = nn.Linear(2*3*3, 128)
        self.fc2 = nn.Linear(128, 128)
        self.fc3 = nn.Linear(128, 9)

    # Called with either one element to determine next action, or a batch
    # during optimization. Returns Q-value of the corresponding action at state x_t
    def forward(self, x_t):
      x_t = x_t.to(device).type(torch.cuda.FloatTensor)
      x_t = F.relu(self.fc1(x_t.view(-1, 2*3*3)))
      x_t = F.relu(self.fc2(x_t))
      return self.fc3(x_t) 

In [None]:
class DeepQlearningPlayer:
    '''
    Description:
        A class to implement a player in Tic-tac-toe based on the Deep-Q-learning algorithm
        combined with epsilon-greedy policy.
    '''
    def __init__(self, player='X',epsilon=0.05):
        self.discount_factor = 0.99
        self.alpha = 5e-4
        self.capacity = 10000
        self.gamma = 0.999
        self.epsilon = epsilon
        self.player = player 
        self.batch_size = 64

        self.policy_net = DQN().to(device)
        self.target_net = DQN().to(device)
        self.target_net.load_state_dict(self.policy_net.state_dict())
        self.target_net.eval()

        self.optimizer = optim.Adam(self.policy_net.parameters(), lr=self.alpha)
        self.memory = ReplayMemory(self.capacity)

        self.loss_every_250_games = 0


    def set_player(self, player = 'O', j=-1):
        self.player = player
        if j != -1:
            self.player = 'O' if j % 2 == 0 else 'X' #the opposite is done in OptimalPlayer


    def randomMove(self, x_t):
        """ Choose a random move from the available ones"""
        avail = []
        for i in range(9):
            pos = (int(i/3), i % 3)
            if x_t[0][pos] == 0 and x_t[1][pos] == 0:
                avail.append(i)
        
        move = random.randint(0, len(avail) - 1)
        return move


    def highestQMove(self, x_t):
        """
        Choose the action which has the highest Q-value predicted by the policy net.
        """
        x_t = torch.tensor(x_t, dtype=torch.float64)
        
        with torch.no_grad():
              # t.max(1) will return largest column value of each row.
              # second column on max result is index of where max element was
              # found, so we pick action with the larger expected reward.
              move = self.policy_net(x_t).max(1)[1].item()

        return move
    

    def act(self, x_t):
        """
        A int between 0 and 8 is returned that represents the move on the grid.
        """

        # choose action according to epsilon-greedy policy
        # i.e. whether to move in random or not
        if random.random() < self.epsilon: action = self.randomMove(x_t)
        else: action = self.highestQMove(x_t)
        
        return action

    
    def optimize_model(self, add_to_total_loss):

      if len(self.memory) < self.batch_size:
          return


      #sample random mini-batch of transitions from the replay buffer  
      transitions = self.memory.sample(self.batch_size)

      # Transpose the batch. This converts batch-array of Transitions
      # to Transition of batch-arrays.
      batch = Transition(*zip(*transitions))

      # Compute a mask of non-final states and concatenate the batch elements
      # (a final state would've been the one after which simulation ended)
      non_final_mask = torch.tensor(tuple(map(lambda s: s is not None,
                                            batch.next_state)), device=device, dtype=torch.bool)
      
      non_final_next_states = torch.cat([s for s in batch.next_state if s is not None])
      
      state_batch = torch.cat(batch.state)
      action_batch = torch.cat(batch.action)
      reward_batch = torch.cat(batch.reward)


      # Compute Q(s_t, a) - the model computes Q(s_t), then we select the
      # columns of actions taken. These are the actions which would've been taken
      # for each batch state according to policy_net
      state_action_values = self.policy_net(state_batch).gather(1, action_batch.view(-1, 1))

      # Compute V(s_{t+1}) for all next states.
      # Expected values of actions for non_final_next_states are computed based
      # on the "older" target_net; selecting their best reward with max(1)[0].
      # This is merged based on the mask, such that we'll have either the expected
      # state value or 0 in case the state was final.
      next_state_values = torch.zeros(self.batch_size, device=device)
      next_state_values[non_final_mask] = self.target_net(non_final_next_states).max(1)[0].detach()
      # Compute the expected Q values
      expected_state_action_values = (next_state_values * self.gamma) + reward_batch

      # Compute Huber loss
      criterion = nn.SmoothL1Loss()
      loss = criterion(state_action_values, expected_state_action_values.unsqueeze(1))

      if add_to_total_loss: self.loss_every_250_games += loss

      # Optimize the model
      self.optimizer.zero_grad()
      loss.backward()
      for param in self.policy_net.parameters():
          param.grad.data.clamp_(-1, 1)
      self.optimizer.step()

    def optimize_latest_transition(self, add_to_total_loss, latest_transition):
      """Train but without the replay buffer and with a batch size of 1.
        At every step, update the network by using only the latest transition."""
      
      if latest_transition.next_state != None:
        next_state_value = self.target_net(latest_transition.next_state).max(1)[0].detach().item()
      else: next_state_value = torch.zeros(1, device=device)
   
      action_tensor = torch.tensor([latest_transition.action], device=device)
      state_action_value = self.policy_net(torch.tensor(latest_transition.state, device=device)).gather(1, action_tensor.view(-1, 1))[0]
      
      expected_state_action_value = (next_state_value * self.gamma) + torch.tensor([latest_transition.reward], device=device)

      # Compute Huber loss
      criterion = nn.SmoothL1Loss()
      loss = criterion(state_action_value, expected_state_action_value)

      if add_to_total_loss: self.loss_every_250_games += loss

      # Optimize the model
      self.optimizer.zero_grad()
      loss.backward()
      for param in self.policy_net.parameters():
          param.grad.data.clamp_(-1, 1)
      self.optimizer.step()

In [None]:
env = TictactoeEnv()

# Question 11

In [None]:
eps = 0.1

player_opt = OptimalPlayer(epsilon=0.5, player = 'X')
player_deep_q_learn = DeepQlearningPlayer(player = 'O',epsilon=eps)

nr_games = 20000
reward_every_250_games = 0

average_rewards = {}
average_loss = {}
update_target_network = 500
end = False

for i in range(1, nr_games + 1):
  # Update the target network, copying all weights and biases in DQN
  if i % update_target_network == 0:
    player_deep_q_learn.target_net.load_state_dict(player_deep_q_learn.policy_net.state_dict())

  if i % 250 == 0: 
      average_rewards[i] = reward_every_250_games / 250
      average_loss[i] = player_deep_q_learn.loss_every_250_games / 250
      reward_every_250_games = 0
      player_deep_q_learn.loss_every_250_games = 0
  
  env.reset()
  #switch the 1st player after every game
  player_opt.set_player(j=i)
  player_deep_q_learn.set_player(j=i)
  
  grid, _, __ = env.observe()
  x_t = np.stack([grid, grid])
  
  for j in range(9):

    if env.current_player == player_opt.player:
      move = player_opt.act(grid)
      x_t[1][move] = 1 #update state

      grid_after_player_opt, end, winner = env.step(move, print_grid=False)
      #if j=0 here, then player_opt starts the game, so we don't have a current_state yet
      if not end and j != 0:
        next_state = torch.tensor(x_t.copy(), device=device).unsqueeze(0)
        reward = 0
        player_deep_q_learn.memory.push(current_state, move_deep, next_state, reward)
        player_deep_q_learn.optimize_model(add_to_total_loss=False) #optimize model but game not ended, so don't add to total loss
      if end: reward = env.reward(player=player_deep_q_learn.player)
    
    else:
      current_state = x_t.copy() 
      move_deep = player_deep_q_learn.act(current_state)
      move_deep_pos = (int(move_deep/3), move_deep % 3)

      if current_state[0][move_deep_pos] == 1 or current_state[1][move_deep_pos] == 1: #unavailable position
        reward = -1
        end = True
      else:
        x_t[0][move_deep_pos] = 1
        grid_after_player_deep_q_learn, end, winner = env.step(move_deep_pos, print_grid=False)
        if end: reward = env.reward(player=player_deep_q_learn.player)
        
    if end:
      
      next_state = None
      reward_every_250_games += reward
      player_deep_q_learn.memory.push(current_state, move_deep, next_state, reward)
      player_deep_q_learn.optimize_model(add_to_total_loss=True)#game ended, so add to total loss
      env.reset()
      break

    else:
        #current_player has changed after step
        if env.current_player == player_opt.player: grid = grid_after_player_deep_q_learn
        else: grid = grid_after_player_opt      

In [None]:
fig, axs = plt.subplots(2, figsize=(10, 7))
fig.suptitle("DQN agent with $\epsilon=0.1$ vs Opt(0.5)")
axs[0].plot(list(average_rewards.keys()), list(average_rewards.values()))
axs[0].set_xlabel("Number of games played")
axs[0].set_ylabel("Average reward")
axs[0].set_ylim((-1, 1))
axs[0].grid()
axs[1].plot(list(average_loss.keys()), [x.item() for x in list(average_loss.values())])
axs[1].set_xlabel("Number of games played")
axs[1].set_ylabel("Average loss ")
axs[1].grid()
plt.savefig(save_dir + "DQN vs Opt(0.5).jpg", dpi=400)
plt.show()

#Question 12


In [None]:
eps = 0.1

player_opt = OptimalPlayer(epsilon=0.5, player = 'X')
player_deep_q_learn = DeepQlearningPlayer(player = 'O',epsilon=eps)

nr_games = 20000
reward_every_250_games = 0

average_rewards = {}
average_loss = {}
update_target_network = 500
end = False

for i in range(1, nr_games + 1):
  # Update the target network, copying all weights and biases in DQN
  if i % update_target_network == 0:
    player_deep_q_learn.target_net.load_state_dict(player_deep_q_learn.policy_net.state_dict())

  if i % 250 == 0: 
      average_rewards[i] = reward_every_250_games / 250
      average_loss[i] = player_deep_q_learn.loss_every_250_games / 250
      reward_every_250_games = 0
      player_deep_q_learn.loss_every_250_games = 0
  
  env.reset()

  #switch the 1st player after every game
  player_opt.set_player(j=i)
  player_deep_q_learn.set_player(j=i)
  
  grid, _, __ = env.observe()
  x_t = np.stack([grid, grid])
  
  for j in range(9):

    if env.current_player == player_opt.player:
      move = player_opt.act(grid)
      x_t[1][move] = 1 
      grid_after_player_opt, end, winner = env.step(move, print_grid=False)
      if not end and j != 0:
        next_state = torch.tensor(x_t.copy(), device=device).unsqueeze(0)
        reward = 0
        transition = Transition(current_state, move_deep, next_state, reward)
        player_deep_q_learn.optimize_latest_transition(False, transition)
      if end: reward = env.reward(player=player_deep_q_learn.player)
    
    else:
      current_state = x_t.copy() 
      move_deep = player_deep_q_learn.act(current_state)
      move_deep_pos = (int(move_deep/3), move_deep % 3)

      if current_state[0][move_deep_pos] == 1 or current_state[1][move_deep_pos] == 1: #unavailable position
        reward = -1
        end = True
      else:
        x_t[0][move_deep_pos] = 1
        grid_after_player_deep_q_learn, end, winner = env.step(move_deep_pos, print_grid=False)
        if end: reward = env.reward(player=player_deep_q_learn.player)
        
    if end:
      next_state = None
      reward_every_250_games += reward
      transition = Transition(current_state, move_deep, next_state, reward)
      player_deep_q_learn.optimize_latest_transition(True, transition)
      env.reset()
      break

    else:
        #current_player has changed after step
        if env.current_player == player_opt.player: grid = grid_after_player_deep_q_learn
        else: grid = grid_after_player_opt      

In [None]:
fig, axs = plt.subplots(2, figsize=(10, 7))
fig.suptitle("DQN agent with $\epsilon=0.1$, no replay buffer vs Opt(0.5)")
axs[0].plot(list(average_rewards.keys()), list(average_rewards.values()))
axs[0].set_xlabel("Number of games played")
axs[0].set_ylabel("Average reward")
axs[0].set_ylim((-1, 1))
axs[0].grid()
axs[1].plot(list(average_loss.keys()), [x.item() for x in list(average_loss.values())])
axs[1].set_xlabel("Number of games played")
axs[1].set_ylabel("Average loss ")
axs[1].grid()
plt.savefig(save_dir + "DQN no buffer vs Opt(0.5).jpg", dpi=400)
plt.show()

# Question 13

In [None]:
def one_game(player_deep_q_learn, player_opt, i):

    env.reset()
    player_opt.set_player(j=i); player_deep_q_learn.set_player(j=i)
    grid, _, _ =  env.observe()
    x_t = np.stack([grid, grid])
    reward = 0

    for j in range(9):
        if env.current_player == player_opt.player:
            move = player_opt.act(grid)
            grid, end, _ = env.step(move, print_grid=False)
            x_t[1][move] = 1
        else:
          current_state = x_t.copy() 
          move_deep = player_deep_q_learn.act(current_state)
          move_deep_pos = (int(move_deep/3), move_deep % 3)

          if current_state[0][move_deep_pos] == 1 or current_state[1][move_deep_pos] == 1: 
            end = True
            reward = -1
          else:
            x_t[0][move_deep_pos] = 1
            grid, end, _ = env.step(move_deep_pos, print_grid=False)

        if end:
            if reward != -1: reward = env.reward(player=player_deep_q_learn.player)
            break

    return reward

In [None]:
def computeTestM(player_deep_q_learn):
    N = 500
    opt, ran = 0, 0
    player_opt = OptimalPlayer()
    player_opt.epsilon, player_deep_q_learn.epsilon = 0, 0
    
    # First we compute Mopt
    for i in range(N):
        opt += one_game(player_deep_q_learn, player_opt, i)
        
    # Then we compute Mrand
    player_opt.epsilon = 1
    for i in range(N):
        ran += one_game(player_deep_q_learn, player_opt, i)    
    
    return opt / N, ran / N

In [None]:
e_max, e_min = 0.8, 0.1
n_expl = np.linspace(1, 20000, 5, dtype=int)

average_mopt_array = []
average_mran_array = []

for n in n_expl:
  average_mopt = {}
  average_mran = {}

  player_opt = OptimalPlayer(epsilon=0.5)
  player_deep_q_learn = DeepQlearningPlayer()

  nr_games = 20000
  update_target_network = 500
  end = False

  for i in range(1, nr_games + 1):
    player_deep_q_learn.epsilon = np.max([e_min, e_max * (1 - i / n)])

    if i % update_target_network == 0:
      player_deep_q_learn.target_net.load_state_dict(player_deep_q_learn.policy_net.state_dict())

    if i%250 == 0: 
          average_mopt[i], average_mran[i] = computeTestM(player_deep_q_learn)
    
    env.reset()
    player_opt.set_player(j=i)
    player_deep_q_learn.set_player(j=i)
    
    grid, _, __ = env.observe()
    x_t = np.stack([grid, grid])
    
    for j in range(9):

      if env.current_player == player_opt.player:
        move = player_opt.act(grid)
        x_t[1][move] = 1
        grid_after_player_opt, end, winner = env.step(move, print_grid=False)
        if not end and j != 0:
          next_state = torch.tensor(x_t.copy(), device=device).unsqueeze(0)
          reward = 0
          player_deep_q_learn.memory.push(current_state, move_deep, next_state, reward)
          player_deep_q_learn.optimize_model(add_to_total_loss=False)
        if end: reward = env.reward(player=player_deep_q_learn.player)
      
      else:
        current_state = x_t.copy() 
        move_deep = player_deep_q_learn.act(current_state)
        move_deep_pos = (int(move_deep/3), move_deep % 3)

        if current_state[0][move_deep_pos] == 1 or current_state[1][move_deep_pos] == 1:
          reward = -1
          end = True
        else:
          x_t[0][move_deep_pos] = 1
          grid_after_player_deep_q_learn, end, winner = env.step(move_deep_pos, print_grid=False)
          if end: reward = env.reward(player=player_deep_q_learn.player)
          
      if end:
        next_state = None
        reward_every_250_games += reward
        player_deep_q_learn.memory.push(current_state, move_deep, next_state, reward)
        player_deep_q_learn.optimize_model(add_to_total_loss=True)
        env.reset()
        break

      else:
          if env.current_player == player_opt.player: grid = grid_after_player_deep_q_learn
          else: grid = grid_after_player_opt
  average_mopt_array.append(average_mopt)
  average_mran_array.append(average_mran)      

In [None]:
fig = plt.figure(1, figsize=(15, 10))
colors = ['orange', 'purple', 'green','red', 'pink']
for i,n in enumerate(n_expl):
    plt.plot(list(average_mopt_array[i].keys()), list(average_mopt_array[i].values()), label=n, linestyle='-', color=colors[i])
    plt.plot(list(average_mran_array[i].keys()), list(average_mran_array[i].values()), label=n, linestyle='--', color=colors[i])
plt.title(r"$M_{rand}$ (--) and $M_{opt}$ (-) for different values of $n^*$")
plt.legend(title='$n^*$ values')
plt.xlabel("Number of games")
plt.ylabel('Mean reward')
plt.ylim((-1, 1))
plt.grid()
plt.savefig(save_dir + "DQN vs Opt(0.5) M_opt and M_rand n*.jpg", dpi=400)
plt.show()

# Question 14

In [None]:
n = 5000
e_max, e_min = 0.8, 0.1
eps_opt = [0.2, 0.4, 0.6, 0.8]

average_mopt_array = []
average_mran_array = []

for eps in eps_opt:
  average_mopt = {}
  average_mran = {}

  player_opt = OptimalPlayer(epsilon=eps)
  player_deep_q_learn = DeepQlearningPlayer()

  nr_games = 20000
  update_target_network = 500
  end = False

  for i in range(1, nr_games + 1):
    player_deep_q_learn.epsilon = np.max([e_min, e_max * (1 - i / n)])

    if i % update_target_network == 0:
      player_deep_q_learn.target_net.load_state_dict(player_deep_q_learn.policy_net.state_dict())

    if i%250 == 0: 
          average_mopt[i], average_mran[i] = computeTestM(player_deep_q_learn)
    
    env.reset()
    player_opt.set_player(j=i)
    player_deep_q_learn.set_player(j=i)
    
    grid, _, __ = env.observe()
    x_t = np.stack([grid, grid])
    
    for j in range(9):

      if env.current_player == player_opt.player:
        move = player_opt.act(grid)
        x_t[1][move] = 1
        grid_after_player_opt, end, winner = env.step(move, print_grid=False)
        if not end and j != 0:
          next_state = torch.tensor(x_t.copy(), device=device).unsqueeze(0)
          reward = 0
          player_deep_q_learn.memory.push(current_state, move_deep, next_state, reward)
          player_deep_q_learn.optimize_model(add_to_total_loss=False)
        if end: reward = env.reward(player=player_deep_q_learn.player)
      
      else:
        current_state = x_t.copy() 
        move_deep = player_deep_q_learn.act(current_state)
        move_deep_pos = (int(move_deep / 3), move_deep % 3)

        if current_state[0][move_deep_pos] == 1 or current_state[1][move_deep_pos] == 1:
          reward = -1
          end = True
        else:
          x_t[0][move_deep_pos] = 1
          grid_after_player_deep_q_learn, end, winner = env.step(move_deep_pos, print_grid=False)
          if end: reward = env.reward(player=player_deep_q_learn.player)
          
      if end:
        next_state = None
        reward_every_250_games += reward
        player_deep_q_learn.memory.push(current_state, move_deep, next_state, reward)
        player_deep_q_learn.optimize_model(add_to_total_loss=True)
        env.reset()
        break

      else:
          if env.current_player == player_opt.player: grid = grid_after_player_deep_q_learn
          else: grid = grid_after_player_opt
  average_mopt_array.append(average_mopt)
  average_mran_array.append(average_mran)      

In [None]:
fig = plt.figure(1, figsize=(15, 10))
colors = ['orange', 'purple', 'green','red', 'pink']
for i,n in enumerate(eps_opt):
    plt.plot(list(average_mopt_array[i].keys()), list(average_mopt_array[i].values()), label=n, linestyle='-', color=colors[i])
    plt.plot(list(average_mran_array[i].keys()), list(average_mran_array[i].values()), label=n, linestyle='--', color=colors[i])
plt.title(r"$M_{rand}$ (--) and $M_{opt}$ (-) for different values of $\epsilon$ and $n^*=5000$")
plt.legend(title='$\epsilon$ values')
plt.xlabel("Number of games")
plt.ylabel('Mean reward')
plt.ylim((-1, 1))
plt.grid()
plt.savefig(save_dir + "DQN vs Opt(eps_opt) M_opt and M_rand fixed n*.jpg", dpi=400)
plt.show()