#Training
This notebook trains a model to play extended version of TicTacToe (aka gomoku).
The approach chosen for this problem is a reinforcement learning method known as policy gradient (with the help of discounted rewards).

###Setup
Setting up the notebook, necessary python modules and google drive - where the file with the model will be saved.

In [0]:
%reload_ext autoreload
%autoreload 2
%matplotlib inline

In [0]:
from google.colab import drive
from pathlib import Path
import numpy as np
import torch
import random
import math

print(torch.__version__)

1.4.0


In [0]:
drive.mount("/content/gdrive", force_remount = True)
path = Path("/content/gdrive/My Drive/Projects/TicTacToe")
path.mkdir(parents=True, exist_ok=True)
modelPath = Path(str(path) + "/model.pt")
modelPath

Mounted at /content/gdrive


PosixPath('/content/gdrive/My Drive/Projects/TicTacToe/model.pt')

###Win detection
Function for detecting whether or not someone has won.

In [0]:
def winDetection(board, x, y, inALine, value):

    moveX = [1, 0, 1, 1]   #move in x direction
    moveY = [0, 1, 1, -1]  #move in y direction

    for i in range(4):
        sum = 1
        for j in range(1, inALine):
            if(board[y + j * moveY[i]][x + j * moveX[i]] != value):
               break
            sum += 1
        for j in range(1, inALine):
            if(board[y - j * moveY[i]][x - j * moveX[i]] != value):
               break
            sum += 1
        if sum >= inALine:
            return True
    
    #no winner yet
    return False

###Parameters
Meta paramters for the game.

In [0]:
boardGameSize = 5
boardTotalSize = boardGameSize + 2
inALine = 4
discountRate = 0.9

###Transform board to input into nn
Data from board must be transformed to match model's input.

In [0]:
def transformInput(board, turn):
  boardGameSize = len(board) - 2
  boardExtended = np.repeat(board[1:boardGameSize + 1, 1:boardGameSize + 1].flatten()[:, np.newaxis], 2, axis=1) #create new np array from two copies of sliced board
  boardExtended = boardExtended.transpose()
  
  if turn != 1: 
    boardExtended *= -1; #change perspective for red's move
  
  boardExtended[0][boardExtended[0] < 0] = 0 #filter first copy
  boardExtended[1][boardExtended[1] > 0] = 0 #filter second copy

  boardExtended[1] *= -1
  
  boardExtended = boardExtended.flatten()
  #boardExtended = boardExtended.reshape(1, len(boardExtended))
  
  return boardExtended

###Define model
Class with the model definition. In current version - a simple fully connected neural network.

In [0]:
class Model(torch.nn.Module):
  def __init__(self, sizeFirst, sizeHidden, sizeLast):
    super(Model, self).__init__()
    self.linear1 = torch.nn.Linear(sizeFirst, sizeHidden, bias = True)
    self.linear2 = torch.nn.Linear(sizeHidden, sizeLast, bias = True)

  def forward(self, x):
    hidden = self.linear1(x)
    hidden = torch.nn.functional.relu(hidden)

    y_pred = self.linear2(hidden)
    y_pred = torch.nn.functional.softmax(y_pred, dim=-1)
    
    return y_pred


model = Model((boardGameSize**2) * 2, 32, boardGameSize**2)

###Get action from nn's output
Filtering results, normalising and additionaly sampling at random.

In [0]:
def filterResults(res, board):
  res = res.reshape(boardGameSize**2)
  for i in range(boardGameSize):
    for j in range(boardGameSize):
      if board[i + 1][j + 1] != 0:
        res[i * boardGameSize + j] = 0
  return res

In [0]:
def normalize(res):
  temp = res.sum()
  for i in range(len(res)):
    res[i] /= temp
  return res

In [0]:
def sampleRandom(res, board):
  boardGameSize = len(board) - 2
  tempArray = np.random.permutation(boardGameSize**2)
  for i in range(len(tempArray)):
    if res[tempArray[i]] > 0:
      return tempArray[i]

###Play game
Big function with the game loop.

In [0]:
#mode=1: selfplay
#mode=2: agent vs random opponent
def playGame(model, boardGameSize, inALine, discountRate = 0.9, mode = 1):
  
  #inits
  
  board = np.zeros([boardTotalSize, boardTotalSize], dtype = int)
  
  nMoves = 0
  isWinnerFound = False
  winner = 0
  turn = 1
  
  listOfBoardStates = []
  listOfDecisions = []
  
  random.seed()
  
  #game loop
  
  while not isWinnerFound:
  
    #check draw
    if nMoves == boardGameSize**2:
      break
    nMoves += 1
    
    #transform and save input
    nnInput = transformInput(board, turn)
    nnInput = torch.from_numpy(nnInput).float()
    listOfBoardStates.append(nnInput)
    
    #calculate probability distribution
    res = model(nnInput)
    res = filterResults(res, board)
    res = normalize(res)
    
    
    #sample move (agent)
    m = torch.distributions.Categorical(res)
    index = m.sample()
    
    #sample move (random opponent)
    if mode == 2 and turn == -1:
      index = sampleRandom(res, board)
    
    #make move
    a = (index // boardGameSize) + 1
    b = (index % boardGameSize) + 1
    if board[a][b] != 0:
      print("Noooo!")
    board[a][b] = turn
    
    #save move
    listOfDecisions.append(index)
    
    #check if someone won
    if winDetection(board, b, a, inALine, turn):
      isWinnerFound = True
      winner = turn
    
    #change turn
    turn *= -1
  
  listOfLabels = torch.tensor(listOfDecisions)
  return listOfBoardStates, listOfLabels, winner

###Calculate rewards and save data from game

In [0]:
def calculateRewards(listOfBoardStates, winner, discountRate = 0.9):
  rewards = torch.zeros([len(listOfBoardStates)], dtype=float)
  if winner != 0:
    rewards[-1] = 1
    rewards[-2] = -1
    for i in range(len(listOfBoardStates) - 3, -1, -1):
      rewards[i] = rewards[i + 2] * discountRate
  
    rewards -= torch.mean(rewards)
    rewards /= torch.std(rewards)
  
  return rewards

###Play against random opponent
Playing against an opponent whose decisions are completely random.

In [0]:
def playAgainstRandom(nRounds):
  winCounter = 0

  for i in range(nRounds):
    _, __, winner = playGame(model, boardGameSize, inALine, mode=2)
    if winner == 1:
      winCounter += 1
  
  return winCounter / float(nRounds)

###Calculate time for a set of games
Benchmarking model's performance across the set of a 100 games against random opponent.

In [0]:
import time

startTime = time.time()
score = playAgainstRandom(100)
endTime = time.time()

score, endTime - startTime

(0.58, 2.4014534950256348)

###Train

In [0]:
#parameters for training
baseLR = 1e-3
epoch = 0
nEpochs = (10 ** 4)
validateEvery = 10 ** 3
nRoundsOfValidation = 10 ** 2
decayEvery = 3 * (10 ** 3)

In [0]:
#optimizer
optimizer = torch.optim.Adam(model.parameters(), lr = baseLR)

In [0]:
#training across one batch - one full game
def train(model, optimizer, data):
  for input, lb, rew in data:
    optimizer.zero_grad()
    result = model(input)
    m = torch.distributions.Categorical(result)
    loss = -m.log_prob(lb) * rew
    
    if loss == loss:
      loss.backward()
      optimizer.step()

In [0]:
#dynamic learning rate
def adjustLR(optimizer, epoch, baseLR, decayEvery):
  for g in optimizer.param_groups:
    g['lr'] = baseLR * (0.1 ** (epoch // decayEvery))

In [0]:
#training
for i in range(nEpochs):
  listOfBoardStates, listOfDecisions, winner = playGame(model, boardGameSize, inALine, discountRate)
  rewards = calculateRewards(listOfBoardStates, winner, discountRate)
  zipper = zip(listOfBoardStates, listOfDecisions, rewards)
  data = list(zipper)
  
  train(model, optimizer, data)
  epoch += 1
  
  adjustLR(optimizer, epoch, baseLR, decayEvery)

  if epoch % validateEvery == 0:
    score = playAgainstRandom(nRoundsOfValidation)
    print(epoch, score)
    for param in optimizer.param_groups:
      print("%.7f" % param['lr'])

1000 0.79
0.0010000
2000 0.75
0.0010000
3000 0.79
0.0001000
4000 0.79
0.0001000
5000 0.84
0.0001000
6000 0.84
0.0000100
7000 0.87
0.0000100
8000 0.86
0.0000100
9000 0.86
0.0000010
10000 0.86
0.0000010


In [0]:
#save model
torch.save(model.state_dict(), modelPath)

In [0]:
#print parameter values
for name, param in model.named_parameters():
  if param.requires_grad:
    print(name, param.data)