# Initialize a game

In [None]:
%matplotlib notebook
import random
from collections import deque
from copy import copy
from math import *

import numpy as np
import matplotlib.pyplot as plt
from tqdm import tqdm

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

from ConnectN import ConnectN
from Play import Play
import MCTS
from MCTS import device

In [None]:
game_setting = {'size':(6,6), 'N':4, 'pie_rule':True}
game = ConnectN(**game_setting)

In [None]:
gameplay=Play(ConnectN(**game_setting),
              player1=None,
              player2=None)

# Define our policy

Please go ahead and define your own policy! See if you can train it under 1000 games and with only 1000 steps of exploration in each move.

In [None]:
class Policy(nn.Module):

    def __init__(self, game):
        super(Policy, self).__init__()

        # input = 6x6 board
        # convert to 5x5x8
        self.conv1 = nn.Conv2d(1, 16, kernel_size=2, stride=1, bias=False)
        # 5x5x16 to 3x3x32
        self.conv2 = nn.Conv2d(16, 32, kernel_size=3, stride=1, bias=False)

        self.size = 3 * 3 * 32
        
        # the part for actions
        self.fc_action1 = nn.Linear(self.size, self.size//4)
        self.fc_action2 = nn.Linear(self.size//4, 36)
        
        # the part for the value function
        self.fc_value1 = nn.Linear(self.size, self.size//6)
        self.fc_value2 = nn.Linear(self.size//6, 1)
        self.tanh_value = nn.Tanh()
        
    def forward(self, x):

        y = F.leaky_relu(self.conv1(x))
        y = F.leaky_relu(self.conv2(y))
        y = y.view(-1, self.size)
        
        # action head
        a = self.fc_action2(F.leaky_relu(self.fc_action1(y)))
        
        avail = (torch.abs(x.squeeze())!=1).type(torch.FloatTensor)
        avail = avail.view(-1, 36)
        maxa = torch.max(a)
        exp = avail*torch.exp(a-maxa)
        prob = exp/torch.sum(exp)
        
        # value head
        value = self.tanh_value(self.fc_value2(F.leaky_relu( self.fc_value1(y) )))
        return prob.view(6,6), value

# Define a MCTS player for Play

In [None]:
game_setting = {'size':(6,6), 'N':4}
game = ConnectN(**game_setting)
policy = Policy(game)

def Policy_Player_MCTS(game):
    mytree = MCTS.Node(copy(game))
    for _ in range(1000):
        mytree.explore(policy)       
    mytreenext, (v, nn_v, p, nn_p) = mytree.next(temperature=0.1)
    return mytreenext.game.last_move

def Random_Player(game):
    return random.choice(game.available_moves())

# Play a game against a random policy

In [None]:
gameplay=Play(ConnectN(**game_setting),
              player1=Policy_Player_MCTS,
              player2=None)

# Training

In [None]:
# initialize our alphazero agent and optimizer
game=ConnectN(**game_setting)
policy = Policy(game)
optimizer = optim.Adam(policy.parameters(), lr=0.01, weight_decay=1.0e-5)

Beware, training is **VERY VERY** slow!!

In [None]:
# train our agent
episodes = 2000
outcomes = []
policy_loss = []
Nmax = 1000

with tqdm(total=episodes, unit='episode') as pbar:
    for episode in range(episodes):

        mytree = MCTS.Node(game)
        logterm = []
        vterm = []

        while mytree.outcome is None:
            for _ in range(Nmax):
                mytree.explore(policy)
                if mytree.N >= Nmax:
                    break

            current_player = mytree.game.player
            mytree, (v, nn_v, p, nn_p) = mytree.next()
            mytree.detach_mother()

            loglist = torch.log(nn_p)*p
            constant = torch.where(p>0, p*torch.log(p), torch.tensor(0.).to(device))
            logterm.append(-torch.sum(loglist-constant))

            vterm.append(nn_v*current_player)

        # we compute the "policy_loss" for computing gradient
        outcome = mytree.outcome
        outcomes.append(outcome)
        loss = torch.sum((torch.stack(vterm)-outcome)**2 + torch.stack(logterm))
        
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        
        policy_loss.append(float(loss))
        if (episode%500) == 0:
            torch.save(policy, f'6-6-4-pie-{episode:d}.mypolicy')

        del loss

# Challenge
Pit the AI you just trained against the provided policy ('6-6-4-pie.policy')

In [None]:
challenge_policy = torch.load('6-6-4-pie.policy')

def Challenge_Player_MCTS(game):
    mytree = MCTS.Node(copy(game))
    for _ in range(1000):
        mytree.explore(challenge_policy)
    mytreenext, (v, nn_v, p, nn_p) = mytree.next(temperature=0.1)
    return mytreenext.game.last_move

# Let the game begin!

In [None]:
gameplay=Play(ConnectN(**game_setting),
              player2=Policy_Player_MCTS,
              player1=Challenge_Player_MCTS)