In [1]:
import numpy as np
%load_ext autoreload
%autoreload 2
%matplotlib inline
from tic_env import TictactoeEnv, OptimalPlayer
from utils import play_game, Metric
import matplotlib.pyplot as plt
import pandas as pd
from tqdm.notebook import tqdm_notebook
import itertools
import pickle
import torch
import torch.nn as nn
from collections import namedtuple, deque
from copy import deepcopy
import random

# tabular Q-learning
The learning-from-experts agent and the self-learning agent were implemented separately. We therefore have different helper functions doing the same job.


## Learning from experts

In [None]:
#################################################################################
#-------------------------------------------- Helpers for learning from experts
def play_game(p1,p2):
    """Play one game between ``p1`` and ``p2`` on a fresh environment.

    Args:
        p1: player object exposing ``.player`` (its symbol) and ``.act(grid)``.
        p2: player object exposing ``.act(grid)``; it moves whenever it is
            not ``p1``'s turn.

    Returns:
        The ``TictactoeEnv`` instance after the game loop has stopped.
    """
    env = TictactoeEnv()
    grid, _, _ = env.observe()
    for _ in range(9):  # at most 9 moves are attempted per game
        mover = p1 if env.current_player == p1.player else p2
        grid, end, _ = env.step(mover.act(grid), print_grid=False)
        if end:
            break
    # Single exit point: the environment is returned even if the loop runs out,
    # so callers never receive an implicit None.
    return env

def Metric(policy,optimal=False):
    """Estimate ``(wins - losses) / games`` of ``policy`` over 5 x 500 games.

    Args:
        policy: callable invoked as ``policy(player=symbol)``; the returned
            object must expose ``.player`` and ``.act(grid)``.
        optimal: when truthy the opponent is ``OptimalPlayer(epsilon=0.)``,
            otherwise ``OptimalPlayer(epsilon=1.)``.

    Returns:
        ``(N_wins - N_losses) / N`` as a float, where ``N`` is 2500.
    """
    # Symbol schedule: the tested policy gets column 0, the opponent column 1.
    # Built once here instead of once per game (it never changes).
    Turns = np.array([['X','O']]*250+[['O','X']]*250)
    opponent_epsilon = 0. if optimal else 1.

    N_wins = 0
    N_losses = 0
    N = 0
    for _ in range(5):
        np.random.seed()  # re-seed numpy's global RNG for each batch of 500 games
        for i in range(500):
            player_test = OptimalPlayer(epsilon=opponent_epsilon, player=Turns[i,1])
            player_new = policy(player=Turns[i,0])
            env_end = play_game(player_new,player_test)
            if env_end.winner==player_new.player:
                N_wins+=1
            if env_end.winner==player_test.player:
                N_losses+=1
            N+=1

    return (N_wins - N_losses)/N

############################ Q-Learning from experts
def get_hash(state:np.array)->str:
    """Return the string form of the row-major flattened ``state`` (used as a dict key)."""
    flat_state = state.ravel(order='C')
    return str(flat_state)

def get_available_positions(state:np.array):
    """Return a 1-D array with the flat (row-major) indices whose absolute value is below 1."""
    is_free = np.abs(state.flatten(order='C')) < 1
    return np.flatnonzero(is_free)

def get_best_action(availables_positions:np.array,array_of_q_values:np.array):
    """Return (as a Python int) the available position with the highest Q-value.

    The overall arg-max of the first row is returned when it is available;
    otherwise the available positions are scanned in order and the first one
    holding the largest Q-value wins.
    """
    greedy = np.argmax(array_of_q_values,axis=1)[0]
    if greedy in availables_positions:
        return greedy.item()

    # The unconstrained arg-max is not playable: search among the available ones.
    best = availables_positions[0]
    for candidate in availables_positions[1:]:
        if array_of_q_values[:,candidate] > array_of_q_values[:,best]:
            best = candidate
    return best.item()

def get_action(epsilon,current_state,q_table):
    """Pick an epsilon-greedy action for ``current_state``.

    A uniform number in [0, 1) is drawn; if it is strictly greater than
    ``epsilon`` the best available action from ``q_table`` is returned,
    otherwise a random available position is returned.
    """
    candidates = get_available_positions(current_state)
    draw = np.random.uniform(0,1)
    state_key = get_hash(current_state)

    if draw <= epsilon:
        # Explore: shuffle in place and take the first entry.
        np.random.shuffle(candidates)
        return candidates[0].item()

    # Exploit: best move among the possible moves.
    return get_best_action(candidates,q_table[state_key])
    
def get_max_Q(q_table:dict,state:np.array):
    """Return the row-wise maximum Q-value stored for ``state``, or ``0.0`` if the state is unknown."""
    key = get_hash(state)
    if key not in q_table:
        return 0.0
    return np.max(q_table[key],axis=1)
    
def update_player(q_table,current_state,next_state,A,alpha,reward,gamma):
    """Apply one Q-learning update to ``Q(current_state, A)`` and return ``q_table``.

    When ``next_state`` is ``None`` the bootstrap term is ``0.0``; otherwise it
    is ``get_max_Q(q_table, next_state)``. The entry for ``current_state`` must
    already exist in ``q_table``.
    """
    state_key = get_hash(current_state)

    if next_state is None:
        bootstrap = 0.0
    else:
        bootstrap = get_max_Q(q_table,next_state)

    q_row = q_table[state_key]
    old_q = q_row[:,A]
    q_row[:,A] = old_q + alpha*(reward + gamma*bootstrap - old_q) # update Q(S,A)
    q_table[state_key] = q_row # store updated Q(S,A)

    return q_table

class agent:
    """Greedy player backed by a tabular ``q_table`` (dict keyed by ``get_hash`` strings)."""

    def __init__(self,q_table):
        self.q_table = q_table

    def act(self,grid):
        """Return the move for ``grid``: greedy for known states, random otherwise."""
        key = get_hash(grid)
        free_cells = get_available_positions(grid)

        if key not in self.q_table:
            #-- State never discovered: any available action can be taken
            np.random.shuffle(free_cells)
            return free_cells[0].item()

        #-- State present in the q_table
        return get_best_action(free_cells,self.q_table[key])

def test_policy(eps_optimalplayer,q_table=None,verbose=False,DQN_policy_net=None):
    """Evaluate an agent over 500 games against ``OptimalPlayer(epsilon=eps_optimalplayer)``.

    The agent is the tabular ``agent(q_table)`` unless ``DQN_policy_net`` is
    given, in which case a ``deep_Q_player`` driven by that network is used.
    The agent plays 'X' for the first 250 games and 'O' for the remaining ones.
    A ``ValueError`` raised by ``env.step`` on a DQN move is counted as an
    illegal step and the game is awarded to the opponent.

    Returns:
        ``(M, num_illegal_steps)`` with ``M = (agent wins - opponent wins) / 500``.
    """
    env = TictactoeEnv() # environment

    use_dqn = DQN_policy_net is not None
    if use_dqn:
        agent_player = deep_Q_player()
    else:
        assert q_table is not None, "Provide q_table"
        agent_player = agent(q_table)

    #-- Holders
    wins_count = {'optimal_player': 0, 'agent': 0, 'draw': 0}
    players = {None: 'draw'}
    turns = np.array(['X','O'])
    num_illegal_steps = 0

    for episode in range(500):

        env.reset()
        np.random.seed(episode)

        #-- Agent takes turns[0] during the first half, turns[1] afterwards
        agent_index = 0 if episode < 250 else 1
        agent_symbol = turns[agent_index]
        optimal_symbol = turns[1 - agent_index]

        player_opt = OptimalPlayer(epsilon=eps_optimalplayer,player=optimal_symbol)
        players[optimal_symbol] = (player_opt,'optimal_player')
        players[agent_symbol] = (agent_player,'agent')

        for _ in range(9):

            turn = env.current_player
            grid = env.observe()[0]
            mover = players[turn][0]

            if use_dqn and turn == agent_symbol:
                #-- DQN agent move: an illegal move ends the game as a loss
                state = grid2tensor(grid,agent_symbol)
                move = mover.act(state,DQN_policy_net,0)
                try:
                    env.step(move,print_grid=False)
                except ValueError:
                    env.end = True
                    env.winner = optimal_symbol
                    num_illegal_steps += 1
            else:
                move = mover.act(grid)
                env.step(move,print_grid=False)

            #-- Keep playing until the game has finished
            if not env.end:
                continue

            if env.winner is None:
                wins_count['draw'] += 1
            else:
                winner = players[env.winner][1]
                wins_count[winner] += 1
            break

    M = (wins_count['agent']-wins_count['optimal_player'])/500

    if verbose :
        label = "M_opt" if eps_optimalplayer < 1 else "M_rand"
        print(label+" : ",M)
        print(wins_count,'\n')
        print('Number of illegal steps',num_illegal_steps)

    return M,num_illegal_steps

In [None]:
def q_learning(epsilon,num_episodes:int,env:TictactoeEnv,path_save:str,
               eps_opt=0.5,alpha=0.05,gamma=0.99,render=False,test=False,tag='',wand_name='Tabular-Q'):
    """Train a tabular Q-learning agent against ``OptimalPlayer(epsilon=eps_opt)``.

    Args:
        epsilon: callable mapping the episode number (starting at 1) to the
            agent's exploration rate for that episode.
        num_episodes: number of training games.
        env: environment reused (and reset) for every game.
        path_save: if not ``None``, the final q_table is pickled to this path.
        eps_opt: exploration rate of the opponent.
        alpha: learning rate.
        gamma: discount factor.
        render: print the winner and render the board at the end of each game.
        test: evaluate with ``test_policy`` every 250 episodes.
        tag, wand_name: only used by the (commented-out) wandb logging.

    Returns:
        ``(q_table, wins_count, agent_mean_rewards, M_opts, M_rands)``.
        ``M_opts`` / ``M_rands`` hold one scalar score per evaluation.
    """
    q_table = dict()
    turns = np.array(['X','O'])

    #-- Holders
    wins_count = {'optimal_player': 0, 'agent': 0, 'draw': 0}
    players = dict()
    M_opts = list()
    M_rands = list()
    accumulate_reward = 0
    agent_mean_rewards = [0]*int(num_episodes//250)

    #-- Init
    #wandb.init(tags=[tag],project='ANN',entity='fadelmamar',
    #           name=wand_name, config={'alpha':alpha,'gamma':gamma,'eps_opt':eps_opt})

    for episode in range(1,num_episodes+1):

        # NOTE(review): this bookkeeping runs before game number `episode` is
        # played, so the first window averages 249 games' rewards over 250.
        if episode % 250 == 0 :
            agent_mean_rewards[int(episode//250)-1] = accumulate_reward/250
            #wandb.log({'average_reward':accumulate_reward/250})
            accumulate_reward = 0 # reset

            if test:
                # test_policy returns (M, num_illegal_steps); only the score is
                # stored so that M_opts / M_rands are plain lists of numbers.
                M_opt, _ = test_policy(0,q_table=q_table,verbose=False)
                M_rand, _ = test_policy(1,q_table=q_table,verbose=False)
                M_opts.append(M_opt)
                M_rands.append(M_rand)
                #wandb.log({'M_opt':M_opt,'M_rand':M_rand})

        env.reset()
        #-- Swap the symbols from one game to the next
        if episode % 2 == 0:
            turns[0] = 'X'
            turns[1] = 'O'
        else:
            turns[0] = 'O'
            turns[1] = 'X'

        player_opt = OptimalPlayer(epsilon=eps_opt,player=turns[0])
        agent_learner = turns[1]
        players[turns[0]]='optimal_player'
        players[agent_learner]='agent'

        current_state = None
        A = None

        for _ in range(9) : # The game takes at most 9 steps to finish

            #-- Agent plays
            if env.current_player == agent_learner :
                current_state = env.observe()[0]
                current_state_hash = get_hash(current_state)

                # Add current_state in q_table if needed
                if current_state_hash not in q_table:
                    q_table[current_state_hash]=np.zeros((1,9))

                A = get_action(epsilon(episode), current_state,q_table) #-- epsilon-greedy choice
                env.step(A,print_grid=False) #-- Take action A

            #-- Optimal player plays
            if not env.end :
                grid = env.observe()[0] #-- observe grid
                move = player_opt.act(grid) #-- get move
                env.step(move,print_grid=False) # optimal player takes a move

                #-- Non-terminal update once the opponent has answered.
                #   If the opponent's move ended the game, the terminal update
                #   below handles it; updating here as well would apply the
                #   same transition twice.
                if current_state is not None and not env.end :
                    agent_reward = env.reward(agent_learner)
                    next_state = env.observe()[0]
                    q_table = update_player(q_table,current_state,next_state,A,alpha,agent_reward,gamma)

            #-- Check whether the game has finished
            if env.end :

                agent_reward = env.reward(agent_learner)
                q_table = update_player(q_table,current_state,None,A,alpha,agent_reward,gamma) #-- Terminal update

                if env.winner is None :
                    winner = 'draw' # keeps the render message below well-defined
                else :
                    winner = players[env.winner]
                wins_count[winner] = wins_count[winner] + 1

                if render :
                    print(f"Episode {episode} ; Winner is {winner}.")
                    env.render()

                #-- accumulate rewards
                accumulate_reward += agent_reward

                break

        if episode % 5000 == 0 :
            print(f"\nEpisode : {episode}")
            print(wins_count)

    #--save
    if path_save is not None:
        with open(path_save,'wb') as file:
            pickle.dump(q_table, file)

    #wandb.finish()

    return q_table,wins_count,agent_mean_rewards,M_opts,M_rands


In [None]:
#-- Question 1: fixed exploration rate of 0.3 for 20'000 training games
def eps_1(x):
    """Constant exploration schedule: 0.3 whatever the episode number."""
    return 0.3

env = TictactoeEnv()
q_table,wins_count,agent_mean_rewards,M_opts,M_rands = q_learning(
    epsilon=eps_1,
    num_episodes=int(20e3),
    eps_opt=0.5,
    env=env,
    path_save=None,
    alpha=0.05,
    tag='Q.1',
    gamma=0.99,
    render=False,
    test=False,
    wand_name='Tabular-Q',
)

In [None]:
#-- Question 2 and 3: linearly decreasing exploration rate for several N_star
eps_min=0.1
eps_max=0.8
env = TictactoeEnv()
test=True
for N_star in (1,10e3,20e3,30e3,40e3):
    print('--'*20,'>',N_star)
    # Decays from eps_max and is floored at eps_min
    eps_2=lambda x : max([eps_min,eps_max*(1-x/N_star)])
    q_table,wins_count,agent_mean_rewards,M_opts,M_rands = q_learning(
        epsilon=eps_2,
        num_episodes=int(20e3),
        env=env,
        tag='Q.2-Q.3',
        wand_name=f'TabularQ--Nstar-{N_star}',
        path_save=None,
        alpha=0.05,
        gamma=0.99,
        render=False,
        test=test,
    )

In [None]:
#-- Additional : plotting epsilon profiles (one curve per N_star, episodes 1..20000)
eps_min=0.1
eps_max=0.8
for N_star in (1,10e3,20e3,30e3,40e3):
    eps_2=lambda x : max([eps_min,eps_max*(1-x/N_star)])
    profile = [eps_2(episode) for episode in range(1,20000+1)]
    plt.plot(profile,label=f'N_star {N_star}')
plt.title("Epsilons")
plt.legend()
plt.show()

In [None]:
#-- Q.4 Select best N_star
# Trains one agent per opponent exploration rate and keeps every run's scores.
# Relies on eps_min / eps_max defined by the Question 2-3 cell.
if True :
    env = TictactoeEnv()
    test=True
    N_star = 10e3 # to update
    Mopts=list()    # one M_opts list per eps_opt, in loop order
    Mrands=list()   # one M_rands list per eps_opt, in loop order
    data_q4=dict()  # eps_opt -> (M_rands, M_opts), the layout read by the Q.5 cell
    # The schedule does not depend on eps, so it is defined once.
    eps_2=lambda x : max([eps_min,eps_max*(1-x/N_star)])
    for eps in [0,0.25,0.5,0.75,1]:
        q_table,wins_count,agent_mean_rewards,M_opts,M_rands = q_learning(epsilon=eps_2,eps_opt=eps,num_episodes=int(20e3),
                                                                          tag=f'Q.4--Nstar-{N_star}', # f-prefix was missing
                                                                          wand_name=f'TabularQ--eps_opt-{eps}',
                                                                          env=env,path_save=None,alpha=0.05,gamma=0.99,render=False,test=test)
        # Store the results of this run; they were previously overwritten by the next one.
        Mopts.append(M_opts)
        Mrands.append(M_rands)
        data_q4[eps]=(M_rands,M_opts)

In [None]:
#-- Q.5: best scores over all runs stored in data_q4 (index 1 is read as M_opt, index 0 as M_rand)
print(
    f"The maximum M_opt is {max(max(a[1]) for a in data_q4.values())}  \n"
    f"The maximum M_rand is {max(max(a[0]) for a in data_q4.values())}"
)

## Learning by self-practice

In [None]:
# State table with all 3**9 cell combinations.
# Entry i holds the base-3 digits of i (least significant first), each digit
# translated to a cell value: 0 -> -1, 1 -> 1, 2 -> 0.
boards = [
    [(-1, 1, 0)[(index // 3**position) % 3] for position in range(9)]
    for index in range(3**9)
]

In [3]:
#################################################################################
#-------------------------------------------- Helpers for self-learning

def dec_exp(n, n_, e_min=.1, e_max=.8):
    """Linearly decreasing exploration rate ``e_max*(1 - n/n_)``, never below ``e_min``."""
    decayed = e_max*(1- n/n_)
    return decayed if decayed > e_min else e_min


def get_state(grid,player,boards=boards):
    """Return the index in ``boards`` of ``grid`` seen from ``player``'s side.

    The flattened grid is multiplied by 1 for 'X' and by -1 for 'O', so the
    sign of every mark changes with the player; the state is the position of
    the resulting list in the ``boards`` table.
    """
    multiplier = {'X':1,'O':-1}[player.player]
    cells = multiplier*grid.flatten().astype('int32')
    return boards.index(cells.tolist())

class Q_player:
    """Epsilon-greedy player reading its action values from a Q-table array."""

    def __init__(self,player='X', epsilon=0):
        self.epsilon=epsilon #exploration rate
        self.player=player

    def empty(self, grid):
        """Return the flat indices (0..8) of the cells of ``grid`` equal to 0."""
        #adapted from the tic_env.py provided script
        cells=grid.flatten()
        return [i for i in range(9) if cells[i] == 0]

    def act(self,state, grid, q_table):
        """Return an action for row ``state`` of ``q_table`` given the current ``grid``.

        The greedy action is drawn at random among the best available
        positions; with probability ``epsilon`` it is replaced by a random
        available position.
        """
        avail=self.empty(grid)

        #positions that are not available keep a score of -inf,
        #so they will never be chosen by the agent
        row = q_table[state,:]
        action_scores = np.full(row.shape,-np.inf)
        action_scores[avail] = row[avail]

        best_positions = np.flatnonzero(action_scores == action_scores.max())
        action = np.random.choice(best_positions)

        explore = np.random.random() < self.epsilon
        if explore:
            action = np.random.choice(avail)

        return action

def Metric_selfQ(policy,Q_table,optimal=False):
    """Return ``(wins - losses) / 500`` of a Q-table policy over 500 test games.

    ``policy`` is called as ``policy(player=..., epsilon=0)``. The opponent is
    ``OptimalPlayer(epsilon=0.)`` when ``optimal`` is truthy and
    ``OptimalPlayer(epsilon=1.)`` otherwise. The policy is given 'X' for the
    first 250 games and 'O' for the last 250.
    """
    Turns = np.array([['X','O']]*250+[['O','X']]*250)
    opponent_epsilon = 0. if optimal else 1.

    N_wins=0
    N_losses=0
    N=0
    for i in range(500):
        np.random.seed()

        player_test = OptimalPlayer(epsilon=opponent_epsilon, player=Turns[i,1])
        player_new = policy(player=Turns[i,0],epsilon=0)

        env=TictactoeEnv()
        while not env.end:
            agent_to_move = env.current_player == player_new.player
            if agent_to_move:
                state=get_state(env.grid,player_new)
                move = player_new.act(state,env.grid,Q_table)
            else:
                move = player_test.act(env.grid)

            #flat indices are converted to (row, column) before stepping
            if not isinstance(move,tuple):
                move=(int(move/3),move%3)
            env.step(move, print_grid=False)

        if env.winner==player_new.player:
            N_wins+=1
        if env.winner==player_test.player:
            N_losses+=1
        N+=1
        env.reset()
    return (N_wins - N_losses)/N

def play_game_selfQ(env,Q_table,p1,p2,alpha=0.1,gamma=0.99):
    """Play one self-learning game between ``p1`` and ``p2`` and update ``Q_table`` in place.

    Both players read and write the same table. After each move the previous
    mover (if it has already played) gets a bootstrapped update from the new
    board; when the game ends the last mover gets an update without bootstrap.
    Returns ``env``.
    """
    def td_update(state, action, reward, next_state=None):
        #one Q-learning step; no bootstrap term when next_state is None
        target = reward
        if next_state is not None:
            target = reward + gamma*(np.max(Q_table[next_state,:]))
        Q_table[state,action] += alpha*(target - Q_table[state,action])

    state1 = new_state1 = state2 = new_state2 = None

    while not env.end:
        p1_to_move = env.current_player == p1.player

        if p1_to_move:
            state1=get_state(env.grid,p1)
            action1=p1.act(state1,env.grid,Q_table)
            env.step(int(action1), print_grid=False)

            if state2 is None:
                continue

            #p2 learns from the board that results from p1's move
            new_state2=get_state(env.grid,p2)
            td_update(state2, action2, env.reward(p2.player), new_state2)
            if env.end:
                td_update(state1, action1, env.reward(p1.player))
        else:
            state2=get_state(env.grid,p2)
            action2=p2.act(state2,env.grid,Q_table)
            env.step(int(action2), print_grid=False)

            #p1 learns from the board that results from p2's move
            if state1 is not None:
                new_state1=get_state(env.grid,p1)
                td_update(state1, action1, env.reward(p1.player), new_state1)

            if env.end:
                td_update(state2, action2, env.reward(p2.player))

    return env


NameError: name 'boards' is not defined

In [None]:
#this cell trains the agent by self-practice for 20'000 games

decreasing_exp_rate=False #change this variable to true for decreasing exploration rate

# Number of self-play games per configuration. The loop used to be hard-coded
# to range(10), which contradicts the 20'000 games announced above and yields
# a single metric evaluation (only at game 0).
n_games=20000
n_states=3**9
n_actions=9
M_opts=pd.DataFrame()
M_rands=pd.DataFrame()
exp_rates=[0,0.25,0.5,0.75,0.98]
dec_speeds=[1,10000,20000,30000,40000]
for i in range(len(exp_rates)):
    M_opt=[]
    M_rand=[]
    p1=Q_player(player='X')
    p2=Q_player(player='O')
    lr=.05
    Q_table=np.zeros((n_states,n_actions))
    env=TictactoeEnv()
    for game in tqdm_notebook(range(n_games),unit='game'):
        if decreasing_exp_rate:
            eps=dec_exp(game, dec_speeds[i])
        else:
            eps=exp_rates[i]

        #both players explore at the same rate
        p1.epsilon=eps
        p2.epsilon=eps

        if game%250==0:
            #log metrics every 250 games
            print(game,'eps',eps)
            M_opt.append(Metric_selfQ(Q_player, Q_table, optimal=True))
            M_rand.append(Metric_selfQ(Q_player, Q_table, optimal=False))
            print(f'M_opt = {M_opt[-1]}, M_rand = {M_rand[-1]}')

        #play game
        env_end=play_game_selfQ(env,Q_table,p1,p2,alpha=lr)
        env.reset()

    #log scores: one column per configuration
    if decreasing_exp_rate:
        M_opts[dec_speeds[i]]=M_opt
        M_rands[dec_speeds[i]]=M_rand
    else:
        M_opts[exp_rates[i]]=M_opt
        M_rands[exp_rates[i]]=M_rand

    #save Q_table
    Q_table_pd=pd.DataFrame(Q_table)
    if decreasing_exp_rate:
        Q_table_pd.to_pickle(f'Q_table_dec_exp_nstar_{dec_speeds[i]}.pkl')
    else:
        Q_table_pd.to_pickle(f'Q_table_fixed_eps_{exp_rates[i]}.pkl')
#save scores in pickle format
if decreasing_exp_rate:
    M_opts.to_pickle('Q8_m_opts_dec_eps.pkl')
    M_rands.to_pickle('Q8_m_rands_dec_eps.pkl')
else:
    M_opts.to_pickle('Q7_m_opts_fixed_eps.pkl')
    M_rands.to_pickle('Q7_m_rands_fixed_eps.pkl')

print('max m_rands: \n',M_rands.max())
print('max m_opts:\n', M_opts.max())
    


# DQN

The policy-update cells were adapted from the PyTorch DQN tutorial.

## Learning from Expert

In [4]:
##################################### Helpers ###################################
class DQN(nn.Module):
    """Fully connected Q-network: 18 inputs, three hidden layers of 128 units (ReLU), 9 outputs."""

    def __init__(self):
        super().__init__()

        hidden = 128
        self.fc = nn.Sequential(
            nn.Linear(3*3*2, hidden), nn.ReLU(),
            nn.Linear(hidden, hidden), nn.ReLU(),
            nn.Linear(hidden, hidden), nn.ReLU(),
            nn.Linear(hidden, 9),
        )

    def forward(self,x):
        """Return the network output for the input batch ``x``."""
        return self.fc(x)

class ReplayMemory(object):
    """Bounded buffer of transitions that also carries the training bookkeeping."""

    def __init__(self, capacity,batch_size):
        self.memory = deque(maxlen=capacity)  # oldest transitions are dropped first
        self.Transition = namedtuple('Transition',('state', 'action', 'next_state', 'reward'))
        self.device='cpu'
        self.batch_size=batch_size
        self.step = 0
        self.accumulated_loss = 0
        self.optimizer = None

    def push(self, *args):
        """Save a transition"""
        transition = self.Transition(*args)
        self.memory.append(transition)

    def sample(self):
        """Return ``batch_size`` stored transitions drawn without replacement."""
        return random.sample(self.memory, self.batch_size)

    def __len__(self):
        return len(self.memory)

class deep_Q_player:
    """Epsilon-greedy player whose action scores come from a network passed at call time."""

    def __init__(self):
        self.player=''

    def act(self,state,model:nn.Module,epsilon:float):
        """Return the arg-max action of ``model(state)``, or a random one in 0..8 with probability ``epsilon``."""
        with torch.no_grad():
            action_scores = model(state)

        greedy_action = action_scores.max(1)[1].item()

        if np.random.random() >= epsilon:
            return greedy_action
        return np.random.choice(range(9)).item()

def grid2tensor(grid:np.array,player:str):
    """Encode ``grid`` as a float tensor of shape (1, 18) from ``player``'s point of view.

    The first 9 entries flag the cells equal to ``player``'s value (1 for 'X',
    -1 otherwise), the last 9 flag the cells holding the opposite value.
    """
    own_value = 1 if player=='X' else -1

    own_plane = np.where(grid==own_value,1,0)
    other_plane = np.where(grid==-own_value,1,0)

    encoded = np.concatenate((own_plane.flatten(),other_plane.flatten()))

    return torch.tensor(encoded,dtype=torch.float64).view(-1,18).float()

def test_policy(eps_optimalplayer,q_table=None,verbose=False,DQN_policy_net=None):
    """Evaluate an agent over 500 games against ``OptimalPlayer(epsilon=eps_optimalplayer)``.

    The agent is the tabular ``agent(q_table)`` unless ``DQN_policy_net`` is
    given, in which case a ``deep_Q_player`` driven by that network is used.
    The agent plays 'X' for the first 250 games and 'O' for the remaining ones.
    A ``ValueError`` raised by ``env.step`` on a DQN move is counted as an
    illegal step and the game is awarded to the opponent.

    Returns:
        ``(M, num_illegal_steps)`` with ``M = (agent wins - opponent wins) / 500``.
    """
    env = TictactoeEnv() # environment

    use_dqn = DQN_policy_net is not None
    if use_dqn:
        agent_player = deep_Q_player()
    else:
        assert q_table is not None, "Provide q_table"
        agent_player = agent(q_table)

    #-- Holders
    wins_count = {'optimal_player': 0, 'agent': 0, 'draw': 0}
    players = {None: 'draw'}
    turns = np.array(['X','O'])
    num_illegal_steps = 0

    for episode in range(500):

        env.reset()
        np.random.seed(episode)

        #-- Agent takes turns[0] during the first half, turns[1] afterwards
        agent_index = 0 if episode < 250 else 1
        agent_symbol = turns[agent_index]
        optimal_symbol = turns[1 - agent_index]

        player_opt = OptimalPlayer(epsilon=eps_optimalplayer,player=optimal_symbol)
        players[optimal_symbol] = (player_opt,'optimal_player')
        players[agent_symbol] = (agent_player,'agent')

        for _ in range(9):

            turn = env.current_player
            grid = env.observe()[0]
            mover = players[turn][0]

            if use_dqn and turn == agent_symbol:
                #-- DQN agent move: an illegal move ends the game as a loss
                state = grid2tensor(grid,agent_symbol)
                move = mover.act(state,DQN_policy_net,0)
                try:
                    env.step(move,print_grid=False)
                except ValueError:
                    env.end = True
                    env.winner = optimal_symbol
                    num_illegal_steps += 1
            else:
                move = mover.act(grid)
                env.step(move,print_grid=False)

            #-- Keep playing until the game has finished
            if not env.end:
                continue

            if env.winner is None:
                wins_count['draw'] += 1
            else:
                winner = players[env.winner][1]
                wins_count[winner] += 1
            break

    M = (wins_count['agent']-wins_count['optimal_player'])/500

    if verbose :
        label = "M_opt" if eps_optimalplayer < 1 else "M_rand"
        print(label+" : ",M)
        print(wins_count,'\n')
        print('Number of illegal steps',num_illegal_steps)

    return M,num_illegal_steps

def update_policy(policy_net:nn.Module,
                  target_net:nn.Module,
                  memory:"ReplayMemory",
                  criterion=nn.SmoothL1Loss(),# F.huber_loss,
                  gamma=0.99,
                  online_update=False,online_state_action_reward=(None,None,None,None)):
    """Run one optimisation step of ``policy_net`` towards the TD target.

    Args:
        policy_net: network being trained.
        target_net: network used to evaluate the next state.
        memory: replay memory; also provides ``device``, ``batch_size``,
            ``optimizer`` and the ``step`` / ``accumulated_loss`` counters,
            which are updated here.
        criterion: loss between predicted Q-values and targets.
        gamma: discount factor.
        online_update: if true, learn from the single transition given in
            ``online_state_action_reward`` instead of a sampled batch.
        online_state_action_reward: ``(state, next_state, action, reward)``
            (note the order); ``next_state`` is ``None`` for a terminal move.

    Returns:
        ``False`` when the memory holds fewer than ``batch_size`` transitions
        (batch mode only, nothing is updated), ``True`` otherwise.
    """
    if online_update :
        #-- Compute Q values
        state,next_state,action,reward = online_state_action_reward
        state=state.to(memory.device)
        state_action_values = policy_net(state)[:,action] # take Q(state,action)

        next_state_values=torch.tensor([0.0],device=memory.device)
        if next_state is not None:
            next_state = next_state.to(memory.device)
            next_state_values = target_net(next_state).max(1)[0].detach() # take max Q(state',action')

        #-- Compute target
        target = reward + gamma*next_state_values

        #-- Update gradients
        memory.optimizer.zero_grad()
        loss = criterion(state_action_values,target)
        loss.backward()
        # The call parentheses were missing, so the weights were never updated
        # in online mode.
        memory.optimizer.step()
        memory.step += 1

        #-- Log
        #wandb.log({'loss':loss.item(),'reward':reward,'Step':memory.step})

    else:
        if len(memory) < memory.batch_size:
            return False

        #-- Sample Transitions
        transitions = memory.sample()

        #-- Turn the list of transitions into one transition of batches
        batch = memory.Transition(*zip(*transitions))

        non_final_mask = torch.tensor(tuple(map(lambda s: s is not None, batch.next_state)), device=memory.device, dtype=torch.bool)
        non_final_next_states = [s for s in batch.next_state if s is not None]

        state_batch = torch.cat(batch.state).to(memory.device)
        action_batch = torch.cat(batch.action).to(memory.device)
        reward_batch = torch.cat(batch.reward).to(memory.device)

        #-- Compute Q values
        memory.optimizer.zero_grad()
        state_action_values = policy_net(state_batch).gather(1, action_batch.unsqueeze(1))

        #-- Compute target (terminal transitions keep a next-state value of 0)
        next_state_values = torch.zeros(memory.batch_size,device=memory.device)
        if non_final_next_states:
            # torch.cat raises on an empty list, which happens when every
            # sampled transition is terminal.
            next_states = torch.cat(non_final_next_states).to(memory.device)
            next_state_values[non_final_mask] = target_net(next_states).max(1)[0].detach()
        target = reward_batch + gamma*next_state_values

        #-- Update gradients
        loss = criterion(state_action_values,target.unsqueeze(1))#,reduction='mean')
        loss.backward()
        memory.optimizer.step()
        memory.step += 1

        #-- Log
        #wandb.log({'loss':loss,'mean_batch_reward':reward_batch.float().mean(),'Step':memory.step})

    memory.accumulated_loss += loss.item()
    return True # loss.item()

In [None]:
def deep_q_learning(epsilon,num_episodes:int,
                    env:TictactoeEnv,
                    path_save:str,
                    eps_opt=0.5,gamma=0.99,
                    render=False,test=False,online_update=False,wandb_tag="DQN"):
    """Train a DQN agent by playing games against an ``OptimalPlayer``.

    Args:
        epsilon: callable; ``epsilon(episode)`` (episode is 1-based) gives the
            exploration rate passed to the agent for that episode.
        num_episodes: number of training games.
        env: environment instance; it is reset at the start of every episode.
        path_save: not used by this function (kept for call compatibility).
        eps_opt: ``epsilon`` given to the opponent ``OptimalPlayer``.
        gamma: discount factor forwarded to ``update_policy``.
        render: if True, print the outcome and the final board of every game.
        test: if True, call ``test_policy`` after every 250 episodes and
            collect the two returned scores.
        online_update: if True, ``update_policy`` is called on the latest
            transition only; otherwise it is called once per move pair in its
            replay-memory mode.
        wandb_tag: not used by this function (kept for call compatibility).

    Returns:
        ``(wins_count, agent_mean_rewards, M_opts, M_rands)`` where
        ``wins_count`` maps ``'optimal_player'``/``'agent'``/``'draw'`` to
        game counts, ``agent_mean_rewards`` holds the agent's mean reward per
        block of 250 episodes, and ``M_opts``/``M_rands`` hold the
        ``test_policy`` scores (empty lists when ``test`` is False).
    """
    #-- agent
    agent = deep_Q_player()

    #-- Initialize Q networks
    policy_net = DQN()
    target_net = DQN()
    target_net.load_state_dict(policy_net.state_dict())

    #-- Initialize hyperparameters
    # `gamma` is taken from the caller; it must not be re-assigned here.
    batch_size=64
    lr=5e-4
    memory = ReplayMemory(10000,batch_size)
    memory.optimizer = torch.optim.Adam(policy_net.parameters(),lr=lr) # optimizer
    policy_net.to(memory.device)
    target_net.to(memory.device)

    #-- Holder
    wins_count = {'optimal_player':0,'agent':0,'draw':0}
    players = dict()
    M_opts = list()
    M_rands = list()
    accumulate_reward = 0
    agent_mean_rewards = [0]*int(num_episodes//250)
    num_illegal_actions = 0
    turns = np.array(['X','O'])

    for episode in range(1,num_episodes+1):

        env.reset()
        #-- Swap the marks every game: turns[0] is the opponent, turns[1] the agent
        if episode % 2 == 0 :
            turns[0] = 'X'
            turns[1] = 'O'
        else:
            turns[0] = 'O'
            turns[1] = 'X'

        player_opt = OptimalPlayer(epsilon=eps_opt,player=turns[0])
        agent_learner = turns[1]
        players[turns[0]]='optimal_player'
        players[turns[1]]='agent'

        #-- Last (state, action) of the agent that still awaits its outcome
        current_state = None
        A = None # action

        for j in range(9):

            #-- Agent plays
            if env.current_player == turns[1] :

                current_state = grid2tensor(env.observe()[0],agent_learner)
                A = agent.act(current_state, policy_net,epsilon(episode))  #----- Choose action A with epsilon greedy
                #wandb.log({'action':A})

                #----- Take action A
                try :
                    env.step(A,print_grid=False)

                #----- End game when agent moves illegaly
                except ValueError :
                    num_illegal_actions += 1
                    #wandb.log({'num_illegal_actions':num_illegal_actions})
                    env.end = True #-- Terminating game
                    env.winner = turns[0] # optimal player

            #-- Optimal player plays
            if not env.end :

                grid = env.observe()[0] #-- observe grid
                move = player_opt.act(grid) #-- get move
                env.step(move,print_grid=False) # optimal player takes a move

                #-- Update agent and Replay buffer
                # Only non-terminal transitions are handled here; if the
                # opponent's move ended the game, the terminal transition
                # (next state None) is stored and learned from below, so it
                # is not also learned from with a bogus bootstrap target.
                if current_state is not None and not env.end :
                    next_state = grid2tensor(env.observe()[0],agent_learner)
                    agent_reward = env.reward(agent_learner)
                    memory.push(current_state, torch.tensor([A]), next_state, torch.tensor([agent_reward]))

                    if online_update:
                        update_policy(policy_net,target_net,memory,gamma=gamma,
                                      online_update=True,online_state_action_reward=(current_state,next_state,A,agent_reward))

            #-- Update policy offline if applicable
            if not online_update :
                update_policy(policy_net,target_net, memory,gamma=gamma, online_update=False)

            #-- Chek that the game has finished
            if env.end :
                agent_reward = env.reward(agent_learner)
                memory.push(current_state, torch.tensor([A]), None, torch.tensor([agent_reward])) #-- Store in Replay buffer

                if online_update:
                    update_policy(policy_net,target_net,memory,gamma=gamma,
                                  online_update=True,online_state_action_reward=(current_state,None,A,agent_reward))
                #-- Logging
                outcome = players[env.winner] if env.winner is not None else 'draw'
                wins_count[outcome] = wins_count[outcome] + 1
                accumulate_reward += agent_reward

                #-- Render
                if render :
                    print(f"Episode {episode} ; Winner is {outcome}.")
                    env.render()

                break # stop for-loop

        #-- Close the 250-episode window once its last game has been played,
        #   so that every window averages exactly 250 games.
        if episode % 250 == 0 :
            agent_mean_rewards[int(episode//250)-1] = accumulate_reward/250

            accumulate_reward = 0 # reset
            memory.accumulated_loss = 0 # reset

            if test:
                # first positional argument is 0 for M_opt and 1 for M_rand
                # (presumably the test opponent's epsilon -- confirm in test_policy)
                M_opt,_ = test_policy(0,q_table=None,DQN_policy_net=policy_net,verbose=False)
                M_rand,_ = test_policy(1,q_table=None,DQN_policy_net=policy_net,verbose=False)
                M_opts.append(M_opt)
                M_rands.append(M_rand)

        #-- Log results
        if episode % 5000 == 0 :
            print(f"\nEpisode : {episode}")
            print(wins_count)

        #-- Upddate target network
        if episode % 500 == 0:
            target_net.load_state_dict(deepcopy(policy_net.state_dict()))
            target_net.eval()


    return wins_count,agent_mean_rewards,M_opts,M_rands


In [None]:
#-- Q.11 & Q.12 : constant exploration rate, first without then with online updates
def eps_1(x):
    # exploration schedule: same rate for every episode
    return 0.3


test = False
for do in (False, True):
    env = TictactoeEnv()
    wins_count, agent_mean_rewards, M_opts, M_rands = deep_q_learning(
        epsilon=eps_1,
        num_episodes=int(20e3),
        eps_opt=0.5,
        env=env,
        path_save=None,
        gamma=0.99,
        render=False,
        test=test,
        wandb_tag="V3",
        online_update=do,
    )

In [None]:
#-- Q.13 : decaying exploration rate, one training run per decay horizon N_star
eps_min = 0.1
eps_max = 0.8

env = TictactoeEnv()
test = True
for N_star in (1, 10e3, 20e3, 30e3, 40e3):
    print('-'*20,' N_star : ',N_star,'-'*20)

    def eps_2(x):
        # linear decay from eps_max, floored at eps_min (uses the current N_star)
        return max(eps_min, eps_max * (1 - x / N_star))

    wins_count, agent_mean_rewards, M_opts, M_rands = deep_q_learning(
        epsilon=eps_2,
        num_episodes=int(20e3),
        eps_opt=0.5,
        env=env,
        path_save=None,
        gamma=0.99,
        render=False,
        test=test,
        wandb_tag=f"V3--{int(N_star)}",
        online_update=False,
    )

In [None]:
#- Q.14 : fixed decay horizon, one training run per opponent exploration rate
eps_min = 0.1
eps_max = 0.8

env = TictactoeEnv()
test = True
N_star = 1 # best N_star from Q.13


def eps_2(x):
    # linear decay from eps_max, floored at eps_min
    return max(eps_min, eps_max * (1 - x / N_star))


for eps_opt in (0, 0.25, 0.5, 0.75, 1):
    print('-'*20,' eps_opt : ',eps_opt,'-'*20)
    wins_count, agent_mean_rewards, M_opts, M_rands = deep_q_learning(
        epsilon=eps_2,
        num_episodes=int(20e3),
        eps_opt=eps_opt,
        env=env,
        path_save=None,
        gamma=0.99,
        render=False,
        test=test,
        wandb_tag=f"V3--eps_opt:{eps_opt}",
        online_update=False,
    )

## Self-learning

In [6]:
class ReplayMemory_self(object):
    """Bounded buffer of transitions with uniform random sampling.

    Entries are kept in a ``deque`` created with ``maxlen=capacity``, so
    appending to a full buffer drops the oldest entry.
    """

    def __init__(self, capacity):
        self.memory = deque(maxlen=capacity)

    def __len__(self):
        return len(self.memory)

    def __getitem__(self, idx):
        return self.memory[idx]

    def push(self, *args):
        """Save a transition"""
        record = Transition(*args)
        self.memory.append(record)

    def sample(self, batch_size):
        """Return ``batch_size`` stored entries drawn without replacement."""
        return random.sample(self.memory, batch_size)

# One replay-memory record: which player acted, the state it saw, the action
# it chose, a validity tag, the reward it received and the following state.
Transition = namedtuple(
    'T',
    ['player', 'state', 'action', 'validity', 'reward', 'new_state'],
)
def dec_exp(n, n_, e_min=.1, e_max=.8):
    """Return the exploration rate for game ``n`` under a linear decay.

    The rate starts at ``e_max`` for ``n == 0``, decreases linearly with
    ``n / n_`` and never goes below ``e_min``.

    Args:
        n: index of the current game.
        n_: decay horizon (number of games); must be non-zero.
        e_min: lower bound of the exploration rate.
        e_max: exploration rate at ``n == 0``.
    """
    return max(e_min, e_max * (1 - n / n_))
    
def Metric_deep_Q_self(policy,model,optimal=False):
    """Score a greedy DQN player over 500 games against an ``OptimalPlayer``.

    The tested player uses 'X' in the first 250 games and 'O' in the last
    250. An illegal move by the tested player ends the game as a loss.

    Args:
        policy: player class, instantiated as ``policy(player=..., epsilon=0)``;
            its ``act(state, model)`` must return a single board index 0..8
            (a one-element tensor or an int).
        model: network handed to ``act``.
        optimal: if True the opponent is ``OptimalPlayer(epsilon=0.)``,
            otherwise ``OptimalPlayer(epsilon=1.)``.

    Returns:
        ``(wins - losses) / 500`` for the tested player.
    """
    N_wins=0
    N_losses=0
    N=0
    Turns = np.array([['X','O']]*250+[['O','X']]*250)
    for i in range(500):
        np.random.seed()

        player_test = OptimalPlayer(epsilon=0. if optimal else 1., player=Turns[i,1])
        player_new = policy(player=Turns[i,0],epsilon=0)
        env=TictactoeEnv()   # fresh board for every game
        while not env.end:
            if env.current_player == player_new.player:
                state=grid2tensor_self(env.grid,player_new.player)
                with torch.no_grad():   # evaluation only, no graph needed
                    action = player_new.act(state,model)
                # plain (row, col) ints: never hand a tensor to the env
                move = divmod(int(action), 3)
                if not env.check_valid(move):
                    env.winner=player_test.player
                    env.end=True
                    break
            else:
                move = player_test.act(env.grid)
                if not isinstance(move,tuple):
                    move=(int(move/3),move%3)

            env.step(move, print_grid=False)

        if env.winner==player_new.player:
            N_wins+=1
        if env.winner==player_test.player:
            N_losses+=1
        N+=1
    return (N_wins - N_losses)/N

class deep_Q_player_self:
    """Epsilon-greedy player that picks board cells from a Q-network."""

    def __init__(self, player='X', epsilon=.0):
        self.epsilon = epsilon
        self.player = player

    def act(self, state, model):
        """Return the chosen cell index as a tensor of shape (1, 1).

        The network is always evaluated; with probability ``epsilon`` its
        greedy choice is replaced by a uniformly random index in 0..8.
        """
        greedy = model(state).max(1)[1].view(-1, 1)
        explore = np.random.random() < self.epsilon
        if not explore:
            return greedy
        return torch.tensor([[random.randrange(9)]])


def grid2tensor_self(grid,player):
    """Encode a 3x3 board as a (3, 3, 2) float tensor seen from ``player``.

    Channel 0 marks the cells held by ``player`` and channel 1 the cells held
    by the opponent; 'X' cells are stored as 1 and 'O' cells as -1 in ``grid``.
    """
    board = torch.from_numpy(grid)
    if player == 'X':
        own_mark, rival_mark = 1, -1
    if player == 'O':
        own_mark, rival_mark = -1, 1
    encoded = torch.zeros(3, 3, 2)
    encoded[:, :, 0] = (board == own_mark).float()
    encoded[:, :, 1] = (board == rival_mark).float()
    return encoded

def tensor2grid(tensor,player='X'):
    """Decode a (3, 3, 2) state tensor back into a 3x3 integer board.

    Channel 0 holds the cells of ``player`` and channel 1 the cells of the
    opponent. On the returned board 'X' cells are 1, 'O' cells are -1 and
    empty cells are 0.

    Args:
        tensor: state tensor of shape (3, 3, 2), or None.
        player: 'X' or 'O', the player from whose point of view ``tensor``
            was encoded.

    Returns:
        A 3x3 integer numpy array, or None when ``tensor`` is None.

    Raises:
        ValueError: if ``player`` is neither 'X' nor 'O'.
    """
    if tensor is None:
        return None
    if player=='X':
        own_value, op_value = 1, -1
    elif player=='O':
        own_value, op_value = -1, 1
    else:
        raise ValueError(f"player must be 'X' or 'O', got {player!r}")

    grid_new=torch.zeros(3,3)
    player_index=torch.nonzero(tensor[:,:,0]==1,as_tuple=False)
    op_index=torch.nonzero(tensor[:,:,1]==1,as_tuple=False)
    grid_new[player_index[:,0],player_index[:,1]]=own_value
    grid_new[op_index[:,0],op_index[:,1]]=op_value
    return grid_new.int().numpy()


def play_game_self_deepQ(env,model,p1,p2):
    """Play one game between ``p1`` and ``p2`` sharing ``model``; log the moves.

    Each player acts when ``env.current_player`` equals its ``player`` mark.
    A legal move is applied to ``env``; an illegal move is logged with reward
    -1 and an empty next-state tensor, and stops the game (``env.end`` is not
    modified in that case).

    Args:
        env: game environment, played from its current position.
        model: network handed to each player's ``act``.
        p1, p2: players exposing ``player`` and ``act(state, model)``; ``act``
            must return a single board index 0..8 (one-element tensor or int).

    Returns:
        ``(env, R_t)`` where ``R_t`` is the list of
        ``(state, action, reward, new_state)`` tuples in the order played.
    """
    R_t=[]
    illegal=False
    while not env.end and not illegal:
        for player in (p1,p2):
            # p1's move may have ended the game: do not query the model again
            if env.end or env.current_player != player.player:
                continue

            # States are encoded with grid2tensor_self throughout, like the
            # rest of the self-learning helpers.
            state=grid2tensor_self(env.grid,player.player)
            action=player.act(state,model)
            position=divmod(int(action),3)   # plain (row, col) ints for the env

            if not env.check_valid(position):
                R_t.append((state,action,torch.tensor([-1]),torch.tensor([])))
                illegal=True
                break

            env.step(position, print_grid=False)
            reward=torch.tensor([env.reward(player.player)])
            new_state=grid2tensor_self(env.grid,player.player)
            R_t.append((state,action,reward,new_state))

    return env, R_t

def render(grid):
    """Print a 3x3 board, one ``|a b c|`` line per row ('-' marks empty cells).

    Prints ``None`` when ``grid`` is None.
    """
    if grid is None:
        print(None)
        return
    symbols = {0: '-', 1: 'X', -1: 'O'}
    for row in range(3):
        cells = [symbols[int(grid[row, col])] for col in range(3)]
        print('|' + ' '.join(cells) + '|')
class DQN_self(nn.Module):
    """Fully connected Q-network: 18 inputs, two hidden layers of 256, 9 outputs."""

    def __init__(self):
        super().__init__()
        hidden = 256
        # kept as a Sequential named `fc` so the parameter names stay
        # fc.0.*, fc.2.*, fc.4.*
        self.fc = nn.Sequential(
            nn.Linear(3 * 3 * 2, hidden),
            nn.ReLU(),
            nn.Linear(hidden, hidden),
            nn.ReLU(),
            nn.Linear(hidden, 9),
        )

    def forward(self, x):
        """Flatten the input to rows of 18 values and return one row of 9 outputs per input row."""
        flat = x.view(-1, 18)
        return self.fc(flat)


In [7]:
# Self-play training of DQN_self: both players share one policy network and
# feed a single replay memory. One full training run is done per setting.
decreasing_exp_rate=True # True: decaying exploration rate (one run per dec_speeds entry); False: fixed rate (one run per exp_rates entry)



exp_rates=[0.1,0.25,0.5,0.75,0.98]
dec_speeds=[10000,20000,30000,40000]


M_rands=pd.DataFrame()
M_opts=pd.DataFrame()

# Loop over the list that is actually indexed below: the two lists have
# different lengths, so len(exp_rates) would overrun dec_speeds.
n_runs=len(dec_speeds) if decreasing_exp_rate else len(exp_rates)

for i in range(n_runs):

    policy=DQN_self()
    target=DQN_self()
    target.load_state_dict(policy.state_dict()) #target is initialized as a copy of policy
    target.eval()
    criterion=torch.nn.HuberLoss()
    optimizer=torch.optim.Adam(policy.parameters(),lr=5e-4)
    gamma=.99
    batch_size=64
    game=0

    if decreasing_exp_rate:
        eps=dec_exp(game,dec_speeds[i])
    else:
        eps=exp_rates[i]

    p1=deep_Q_player_self('X',epsilon=eps)
    p2=deep_Q_player_self('O',epsilon=eps)

    env=TictactoeEnv()

    # last state seen by each player whose transition is still incomplete
    state1, state2=(None,None)

    m_rand=[]
    m_opt=[]
    losses=[]
    updates=0
    test,update=(False,False)
    t_test,t_updt=(0,0)

    R=ReplayMemory_self(10000)

    # One pass of this loop is (at most) one move of each player, not a game.
    # NOTE(review): training stops after 2000 games while dec_speeds go up to
    # 40000 -- confirm the intended number of games.
    while game <=2000:

        #update exploration rates
        if decreasing_exp_rate:
            eps=dec_exp(game,dec_speeds[i])
        else:
            eps=exp_rates[i]

        p1.epsilon=eps
        p2.epsilon=eps

        #test and update conditions setup
        # t_test / t_updt make sure testing and the target update happen only
        # once while `game` sits on a multiple of 250; they are re-armed on
        # the following game.
        if (game-1)%250==0:
            t_test=0
            t_updt=0
        if (game%250==0) & (t_test==0):
            test=True
        if (game%250==0) & (t_updt==0):
            update=True

        if test:
            print(eps,'game',game)
            m_opt.append(Metric_deep_Q_self(deep_Q_player_self,policy,optimal=True))
            m_rand.append(Metric_deep_Q_self(deep_Q_player_self,policy,optimal=False))
            print(f'M_opt = {m_opt[-1]}, M_rand = {m_rand[-1]}')
            t_test+=1
            test=False

        #-------------------------play action--------------------------------#
        invalid=False   #bool variable to know when an illegal move is done

        if env.current_player==p1.player:

            state1=grid2tensor_self(env.grid,player=p1.player)
            action1=p1.act(state1,model=policy)

            if env.check_valid(int(action1)):   #make move only if valid
                env.step(int(action1))

                # p1's reply completes p2's pending transition. If this move
                # ended the game, both terminal transitions are stored once
                # in the end-of-game block below instead.
                if (state2 is not None) and (not env.end):
                    next_state2=grid2tensor_self(env.grid,player=p2.player)
                    reward2=torch.tensor([env.reward(p2.player)])
                    R.push(2,state2,action2,'valid',reward2,next_state2)

            else:
                #handling invalid moves
                env.end=True
                invalid=True

                reward1=torch.tensor([-1])
                R.push(1,state1,action1,'invalid',reward1,None)

                reward2=torch.tensor([env.reward(p2.player)])
                R.push(2,state2,action2,'valid',reward2,None)


        if (env.current_player==p2.player) & (not env.end):

            state2=grid2tensor_self(env.grid,player=p2.player)
            action2=p2.act(state2,policy)

            if env.check_valid(int(action2)):   #make move only if valid
                env.step(int(action2))

                # p2's reply completes p1's pending transition (non-terminal only)
                if not env.end:
                    next_state1=grid2tensor_self(env.grid,player=p1.player)
                    reward1=torch.tensor([env.reward(p1.player)])
                    R.push(1,state1,action1,'valid',reward1,next_state1)

            else:
                env.end=True
                invalid=True

                reward2=torch.tensor([-1])
                R.push(2,state2,action2,'invalid',reward2,None)

                reward1=torch.tensor([env.reward(p1.player)])
                R.push(1,state1,action1,'valid',reward1,None)


        if env.end and not invalid:
            # Game finished by a legal move (win or tie): the last transition
            # of BOTH players is terminal, so it is stored exactly once with
            # no next state and the target reduces to the reward.
            reward1=torch.tensor([env.reward(p1.player)])
            reward2=torch.tensor([env.reward(p2.player)])
            R.push(1,state1,action1,'valid',reward1,None)
            R.push(2,state2,action2,'valid',reward2,None)


        if env.end:
            state1, state2=(None,None)
            env.reset()
            game+=1

#-------------------------------------update policy----------------------------------------#
        if len(R) > batch_size:
            sample=R.sample(batch_size)
            sample=Transition(*zip(*sample))

            non_final_mask = torch.tensor(tuple(map(lambda s: s is not None,
                                                sample.new_state)))
            non_final_new_states = [s for s in sample.new_state
                                                if s is not None]
            states=torch.cat([state.unsqueeze(0)for state in sample.state])
            actions=torch.cat(sample.action)
            rewards=torch.cat(sample.reward)

            s_a_vals=policy(states).gather(1, actions)

            # terminal transitions keep a next-state value of 0
            new_state_vals=torch.zeros(batch_size)
            if non_final_new_states:   # torch.cat fails on an empty list
                with torch.no_grad():  # the target network is never trained
                    new_state_vals[non_final_mask]=target(torch.cat(non_final_new_states)).max(1)[0]


            expected=new_state_vals*gamma+rewards
            loss=criterion(s_a_vals,expected.unsqueeze(1))
            losses.append(loss.item())
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

        if update:  #update target
            target.load_state_dict(policy.state_dict())
            t_updt+=1
            update=False
            updates+=1

    if decreasing_exp_rate:
        M_opts[dec_speeds[i]]=m_opt
        M_rands[dec_speeds[i]]=m_rand
    else:
        M_opts[eps]=m_opt
        M_rands[eps]=m_rand

    #save policy
    if decreasing_exp_rate:
        torch.save(policy.state_dict(),f'DQN_self_lr_decreasing_{dec_speeds[i]}.pkl')
    else:
        torch.save(policy.state_dict(),f'DQN_self_lr_fixed_{eps}.pkl')

#save scores in pickle format
if decreasing_exp_rate:
    M_opts.to_pickle('Q17_m_opts_dec_eps.pkl')
    M_rands.to_pickle('Q17_m_rands_dec_eps.pkl')
else:
    M_opts.to_pickle('Q16_m_opts_fixed_eps.pkl')
    M_rands.to_pickle('Q16_m_rands_fixed_eps.pkl')

print('max m_rands: \n',M_rands.max())
print('max m_opts:\n', M_opts.max())

0.8 game 0
M_opt = -1.0, M_rand = -1.0
0.78 game 250
M_opt = -1.0, M_rand = 0.316
0.76 game 500


KeyboardInterrupt: 