In [1]:
import numpy as np
import matplotlib.pyplot as plt
import torch
from torch.utils.tensorboard import SummaryWriter

from board import TicTacToe_Board
from DQN import DQN
from Agent import RandomAgent, AIAgent
from hard_code import hardcode

# Replay memory

In [2]:
from collections import namedtuple
import random
# Replay memory: a bounded store of past transitions used for experience replay.
Transition = namedtuple(
    'Transition', ('state', 'action', 'reward', 'next_state', 'done'))


class ReplayMemory:
    """Fixed-capacity buffer of ``Transition`` records with uniform sampling.

    Once ``capacity`` transitions have been stored, every new transition
    overwrites the oldest one, so the buffer never holds more than
    ``capacity`` entries.
    """

    def __init__(self, capacity):
        """Create an empty buffer.

        Args:
            capacity: maximum number of transitions kept (must be > 0).

        Raises:
            ValueError: if ``capacity`` is not positive.
        """
        if capacity <= 0:
            raise ValueError("capacity must be a positive integer")
        self.capacity = capacity
        self.memory = []
        # Index of the slot the next transition is written to once full.
        self.position = 0

    def add(self, *args):
        """Store one transition; ``args`` are the ``Transition`` fields in order."""
        transition = Transition(*args)
        if len(self.memory) < self.capacity:
            self.memory.append(transition)
        else:
            # Buffer is full: overwrite the oldest entry (ring buffer).
            self.memory[self.position] = transition
        self.position = (self.position + 1) % self.capacity

    def sample(self, batch):
        """Return ``batch`` distinct stored transitions chosen uniformly at random.

        Raises:
            ValueError: (from ``random.sample``) if fewer than ``batch``
                transitions are stored.
        """
        return random.sample(self.memory, batch)

    def __len__(self):
        """Number of transitions currently stored."""
        return len(self.memory)

# Optimizer procedure

In [3]:
def optimize(policy, target, memory, BATCH, optimizer, gamma=None):
    """Run one DQN optimization step on a batch sampled from ``memory``.

    Args:
        policy: network being trained; called on a ``(BATCH, -1)`` state batch.
        target: network used to evaluate the next states (not updated here).
        memory: object supporting ``len()`` and ``sample(BATCH)``; each sample
            is a 5-tuple ``(state, action, reward, next_state, done)``.
        BATCH: number of transitions per optimization step.
        optimizer: optimizer over ``policy``'s parameters.
        gamma: discount factor; when ``None`` the module-level ``GAMMA`` is used.

    Returns:
        The scalar loss as a Python float, or ``None`` when ``memory`` holds
        fewer than ``BATCH`` transitions (no step is taken).
    """
    if len(memory) < BATCH:
        return None
    if gamma is None:
        gamma = GAMMA

    transitions = memory.sample(BATCH)
    # Transpose the list of transitions into one tuple per field.
    states, actions, rewards, next_states, dones = zip(*transitions)

    state_batch = torch.cat(states).reshape(BATCH, -1)
    action_batch = torch.cat(actions)
    reward_batch = torch.cat(rewards)
    next_state_batch = torch.cat(next_states).reshape(BATCH, -1)

    # Q-value of the action that was actually taken in each state.
    q_taken = policy(state_batch).gather(1, action_batch)

    # The bootstrap target is a constant w.r.t. the policy parameters, so no
    # gradient is tracked through the target network.
    with torch.no_grad():
        next_q = target(next_state_batch).max(1)[0]
        done_mask = torch.tensor([bool(flag) for flag in dones],
                                 dtype=torch.bool, device=next_q.device)
        # Terminal transitions have no future value. Only those entries are
        # zeroed (the whole batch must not be wiped because one is terminal).
        next_q = next_q.masked_fill(done_mask, 0.)
        td_target = (reward_batch + gamma * next_q).view(BATCH, 1)

    loss = torch.mean(torch.pow(q_taken - td_target, 2))

    optimizer.zero_grad()
    loss.backward()
    # Clamp every gradient element to [-1, 1]; parameters without a gradient
    # are skipped.
    torch.nn.utils.clip_grad_value_(policy.parameters(), 1.0)
    optimizer.step()
    return loss.item()

In [4]:
# Training hyperparameters.
EPISODES = 200000   # bound of the episode loop
GAMMA = 0.99        # discount factor applied to the target network's value
CAPACITY = 10000    # capacity passed to ReplayMemory
BATCH = 100         # transitions sampled per optimization step
UPDATE = 10         # episodes between copies of the policy weights into the target
IT_PRINT = 100      # episodes between TensorBoard log points

In [5]:
# Epsilon-greedy exploration parameters: the exploration rate starts at EPSmax
# and decays exponentially towards EPSmin at rate `decay` per step.
EPSmin = 0.1
EPSmax = 0.6
decay = 0.005

In [6]:
def greedyPolicy(epoch, EPSmax, EPSmin, decay):
    """Exponentially decayed exploration rate.

    Returns ``EPSmax`` at ``epoch == 0`` and approaches ``EPSmin`` as
    ``epoch`` grows, following ``exp(-decay * epoch)``.
    """
    eps_span = EPSmax - EPSmin
    decay_factor = np.exp(-decay * epoch)
    return eps_span * decay_factor + EPSmin

def select_move(policy, state, EPS=0., n_actions=9):
    """Pick an action epsilon-greedily.

    Args:
        policy: callable mapping a state tensor to a 1-D tensor of action values.
        state: board state, convertible with ``torch.Tensor(state)``.
        EPS: probability of choosing a uniformly random action instead of the
            greedy one.
        n_actions: number of actions to draw from when exploring. The default
            of 9 matches the output size the policy is built with in this
            notebook (``DQN(9,105,60,9)``).

    Returns:
        A ``(1, 1)`` integer tensor holding the chosen action index.
    """
    if random.random() < EPS:
        # Explore over the full action range [0, n_actions).
        return torch.randint(0, n_actions, (1, 1))
    # Greedy choice: no gradient is needed for action selection.
    with torch.no_grad():
        policy_value = policy(torch.Tensor(state))
    return policy_value.max(0)[1].view(1, 1)

In [7]:
# Build the policy network (the one being trained) and the target network
# (used to evaluate next states in `optimize`). Both get identical constructor
# arguments.
# NOTE(review): DQN's constructor is not visible here; the arguments are
# presumably (input size, two hidden sizes, output size) - confirm in DQN.py.
policy = DQN(9,105,60,9)
target = DQN(9,105,60,9)
# Start the target network with the same weights as the policy network.
target.load_state_dict(policy.state_dict())
# Switch the target network to evaluation mode.
target.eval()

# Optimizer over the policy network's parameters only.
optimizer = torch.optim.RMSprop(policy.parameters(), lr=0.0025)

In [8]:
from IPython.display import clear_output

# Initialize board:
board = TicTacToe_Board()

# Initialize AI player (the learner):
player1 = AIAgent(policy, target=target, train=True)

# Opponents: the hard-coded player, and a random player that occasionally
# replaces it for a single move.
player2 = hardcode()
player3 = RandomAgent()
OPPONENT_RANDOM_PROB = 5e-2  # chance that the opponent plays a random move

# Initialize replay memory.
replaymemory = ReplayMemory(CAPACITY)

loss_list, reward_list = [], []
epoch = 0  # counts agent moves across all episodes; drives the epsilon decay
win, drawn, lost, cheat, total = 0, 0, 0, 0, 0
loss = None  # stays None until the replay memory holds at least BATCH transitions

# Settings for Tensorboard:
writer = SummaryWriter(log_dir='tictactoe_train')
try:
    # Start training loop. The range is inclusive of EPISODES so that exactly
    # EPISODES episodes are played and the final log point is written.
    for episode in range(1, EPISODES + 1):
        total_reward = 0
        board.reset()
        state = board.get_state().copy()

        while not board.end:
            # Player one makes its move:
            EPS = greedyPolicy(epoch, EPSmax, EPSmin, decay)
            action = player1.select_action_train(state, EPS=EPS)
            index = board.action2index(action.item())
            new_state, reward = board.play(index[0], index[1])

            if board.end or reward == -10:
                # The agent either finished the game or tried to cheat
                # (reward -10). The episode stops here in both cases, so the
                # transition is stored as terminal; otherwise the optimizer
                # would bootstrap from a state that is never played on.
                replaymemory.add(torch.Tensor(state),
                                 action,
                                 torch.Tensor([reward]),
                                 torch.Tensor(new_state),
                                 True)
                board.end = True
            else:
                # Second player makes its move.
                # Get available actions from board:
                avail_act = board.avail_actions(new_state)
                # With a small probability play a random move, otherwise the
                # hard-coded one.
                if random.random() < OPPONENT_RANDOM_PROB:
                    rnd_act = player3.select_action(new_state, avail_act)
                else:
                    rnd_act = player2.select_action(new_state, avail_act)
                index = board.action2index(rnd_act)
                new_state, reward = board.play(index[0], index[1])

                # Store the agent's move together with the board as it looks
                # after the opponent's reply.
                replaymemory.add(torch.Tensor(state),
                                 action,
                                 torch.Tensor([reward]),
                                 torch.Tensor(new_state),
                                 board.end)

            state[:] = new_state

            # Optimization step (returns None while the memory is too small):
            loss = optimize(player1.policy, player1.target, replaymemory,
                            BATCH, optimizer)

            epoch += 1
            total_reward += reward

        # Periodically copy the policy weights into the target network.
        if episode % UPDATE == 0:
            player1.target.load_state_dict(player1.policy.state_dict())

        # Statistics after episode, classified by the final reward.
        total += 1
        if reward == -10:
            cheat += 1
        elif reward == 100:
            win += 1
        elif reward == -1:
            lost += 1
        elif reward == 0:
            drawn += 1

        if episode % IT_PRINT == 0:
            writer.add_scalars('Statistics', {'Win': win / total,
                                              'Loss': lost / total,
                                              'Drawn': drawn / total,
                                              'Cheat': cheat / total}, episode)
            # No loss exists until the first real optimization step has run.
            if loss is not None:
                writer.add_scalar('Loss', loss, episode)
finally:
    # Flush and close the event file even if training is interrupted.
    writer.close()

# Save model
torch.save(player1.policy.state_dict(), 'AIagent.pth')