#  Introduction

This is a bare-bones, proof-of-concept implementation of the **AlphaZero** algorithm. In this notebook the algorithm is tested on the **TicTacToe** board game.

Outside of this notebook it was also tested on the more complex game of **Othello**, where after about one week of training it started to achieve reasonable performance, including some "human-like" behaviours.

The main components of the algorithm are:
* _Monte-Carlo Tree Search_, used for look-ahead
* a _Convolutional Neural Network_ with Policy and Value heads

<div class="alert alert-warning">

**NOTE:** this code works, but it has not been tidied up in any way. All debugging, logging, assertion and testing code was stripped out when copying to this notebook.

When I have some time I plan to clean up this code, add a proper description and include the Othello training code.
    
</div>

You should be able to run this notebook end-to-end and play against the trained agent at the end. It worked on my machine :)

# Imports

In [1]:
import os
import sys
import time
import math
import datetime
import collections

import numpy as np

from keras.models import *
from keras.layers import *
from keras.optimizers import *

Using TensorFlow backend.


# Board

No dependencies so far

In [2]:
class Board():
    """Represents single game board state, pieces, currently available moves etc.
    
    Attributes:
        n (int): size of the board, default: 3
        pieces (2-nested-list-of-int): board state, with following properties:
            allowed values are: -1 is black (X), +1 is white (O), 0 is empty
            e.g. 3-size empty board would be: [[0, 0, 0], [0, 0, 0], [0, 0, 0]]
    """
    
    def __init__(self, n=3):
        "Set up initial board configuration."

        self.n = n
        # Create the empty board array.
        self.pieces = [None]*self.n
        for i in range(self.n):
            self.pieces[i] = [0]*self.n

    # add [][] indexer syntax to the Board
    def __getitem__(self, index): 
        return self.pieces[index]
    
    def get_legal_moves(self, color):
        """Returns all the legal moves for the given color.
        
        Params:
            color: unused
            
        Returns:
            list-of-tuple-of-int: all legal moves in form: [(0, 0), ...]
        """
        moves = set()  # stores the legal moves.

        # Get all the empty squares (color==0)
        for y in range(self.n):
            for x in range(self.n):
                if self[x][y]==0:
                    newmove = (x,y)
                    moves.add(newmove)
        return list(moves)
    
    def has_legal_moves(self):
        for y in range(self.n):
            for x in range(self.n):
                if self[x][y]==0:
                    return True
        return False
    
    def is_win(self, color):
        """Check whether the given player has filled a complete row, column or diagonal.
        
        Params:
            color (int): -1 for black, 1 for white
            
        Returns:
            bool: True if 'color' won the game, False if game ongoing, draw or loss
        """
        win = self.n
        # check y-strips
        for y in range(self.n):
            count = 0
            for x in range(self.n):
                if self[x][y]==color:
                    count += 1
            if count==win:
                return True
        # check x-strips
        for x in range(self.n):
            count = 0
            for y in range(self.n):
                if self[x][y]==color:
                    count += 1
            if count==win:
                return True
        # check two diagonal strips
        count = 0
        for d in range(self.n):
            if self[d][d]==color:
                count += 1
        if count==win:
            return True
        count = 0
        for d in range(self.n):
            if self[d][self.n-d-1]==color:
                count += 1
        if count==win:
            return True
        
        return False
    
    def execute_move(self, move, color):
        """Perform the given move on the board; 
        color gives the color of the piece to play (1=white,-1=black)
        """

        (x,y) = move

        # Add the piece to the empty square.
        assert self[x][y] == 0
        self[x][y] = color

**Examples**

In [3]:
board = Board()

In [4]:
board.pieces

[[0, 0, 0], [0, 0, 0], [0, 0, 0]]

In [5]:
np.array(board.pieces)

array([[0, 0, 0],
       [0, 0, 0],
       [0, 0, 0]])

In [6]:
board[2][2]

0

In [7]:
board.get_legal_moves(-1)

[(0, 1), (1, 2), (0, 0), (0, 2), (2, 1), (2, 0), (2, 2), (1, 0), (1, 1)]

In [8]:
board.pieces

[[0, 0, 0], [0, 0, 0], [0, 0, 0]]

In [9]:
board.execute_move((1, 1), -1)

In [10]:
board.pieces

[[0, 0, 0], [0, -1, 0], [0, 0, 0]]
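
The win check in `Board.is_win` counts pieces along every row, column and both diagonals. As a rough standalone sketch of the same idea (the helper `has_full_line` is hypothetical and not part of the notebook), the lines can be enumerated once and tested with `all`:

```python
def has_full_line(pieces, color):
    """Return True if `color` fills a whole row, column or diagonal.

    `pieces` is a square nested list, as in Board.pieces.
    This helper is illustrative only; the notebook uses Board.is_win.
    """
    n = len(pieces)
    lines = []
    lines.extend([(i, j) for j in range(n)] for i in range(n))  # first index fixed
    lines.extend([(i, j) for i in range(n)] for j in range(n))  # second index fixed
    lines.append([(d, d) for d in range(n)])                    # main diagonal
    lines.append([(d, n - d - 1) for d in range(n)])            # anti-diagonal
    return any(all(pieces[i][j] == color for i, j in line) for line in lines)


pieces = [[-1,  1,  0],
          [ 1, -1,  0],
          [ 0,  1, -1]]
print(has_full_line(pieces, -1))  # True (main diagonal)
print(has_full_line(pieces, 1))   # False
```

Like `is_win`, this requires the whole line of length `n` to be filled, so on a 3x3 board it is the usual three-in-a-row rule.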

# Game

Internally uses Board class

In [11]:
"""
Game class implementation for the game of TicTacToe.
Based on OthelloGame; getGameEnded() was adapted to the new rules.
"""
class TicTacToeGame:
    """
    This class specifies the base Game class. To define your own game, subclass
    this class and implement the functions below. This works when the game is
    two-player, adversarial and turn-based.

    Use 1 for player1 and -1 for player2.

    See othello/OthelloGame.py for an example implementation.
    """
    def __init__(self, n=3):
        self.n = n

    def getInitBoard(self):
        """
        Returns:
            startBoard: a representation of the board (ideally this is the form
                        that will be the input to your neural network)
        """
        # return initial board (numpy board)
        b = Board(self.n)
        return np.array(b.pieces)

    def getBoardSize(self):
        """
        Returns:
            (x,y): a tuple of board dimensions
        """
        # (a,b) tuple
        return (self.n, self.n)

    def getActionSize(self):
        """
        Returns:
            actionSize: number of all possible actions
        """
        # return number of actions
        return self.n*self.n + 1

    def getNextState(self, board, player, action):
        """
        Input:
            board: current board
            player: current player (1 or -1)
            action: action taken by current player

        Returns:
            nextBoard: board after applying action
            nextPlayer: player who plays in the next turn (should be -player)
        """
        # if player takes action on board, return next (board,player)
        # action must be a valid move
        if action == self.n*self.n:
            return (board, -player)
        b = Board(self.n)
        b.pieces = np.copy(board)
        move = (int(action/self.n), action%self.n)
        b.execute_move(move, player)
        return (b.pieces, -player)

    def getValidMoves(self, board, player):
        """
        Input:
            board: current board
            player: current player

        Returns:
            validMoves: a binary vector of length self.getActionSize(), 1 for
                        moves that are valid from the current board and player,
                        0 for invalid moves
        """
        # return a fixed size binary vector
        valids = [0]*self.getActionSize()
        b = Board(self.n)
        b.pieces = np.copy(board)
        legalMoves =  b.get_legal_moves(player)
        if len(legalMoves)==0:
            valids[-1]=1
            return np.array(valids)
        for x, y in legalMoves:
            valids[self.n*x+y]=1
        return np.array(valids)

    def getGameEnded(self, board, player):
        """
        Input:
            board: current board
            player: current player (1 or -1)

        Returns:
            r: 0 if game has not ended. 1 if player won, -1 if player lost,
               small non-zero value for draw.
               
        """
        # return 0 if not ended, 1 if player 1 won, -1 if player 1 lost
        # player = 1
        b = Board(self.n)
        b.pieces = np.copy(board)

        if b.is_win(player):
            return 1
        if b.is_win(-player):
            return -1
        if b.has_legal_moves():
            return 0
        # a draw gets a small non-zero value
        return 1e-4

    def getCanonicalForm(self, board, player):
        """
        Input:
            board: current board
            player: current player (1 or -1)

        Returns:
            canonicalBoard: returns canonical form of board. The canonical form
                            should be independent of player. For e.g. in chess,
                            the canonical form can be chosen to be from the pov
                            of white. When the player is white, we can return
                            board as is. When the player is black, we can invert
                            the colors and return the board.
        """
        # return state if player==1, else return -state if player==-1
        return player*board

    def getSymmetries(self, board, pi):
        """
        Input:
            board: current board
            pi: policy vector of size self.getActionSize()

        Returns:
            symmForms: a list of [(board,pi)] where each tuple is a symmetrical
                       form of the board and the corresponding pi vector. This
                       is used when training the neural network from examples.
        """
        # mirror, rotational
        assert(len(pi) == self.n**2+1)  # 1 for pass
        pi_board = np.reshape(pi[:-1], (self.n, self.n))
        l = []

        for i in range(1, 5):
            for j in [True, False]:
                newB = np.rot90(board, i)
                newPi = np.rot90(pi_board, i)
                if j:
                    newB = np.fliplr(newB)
                    newPi = np.fliplr(newPi)
                l += [(newB, list(newPi.ravel()) + [pi[-1]])]
        return l

    def stringRepresentation(self, board):
        """
        Input:
            board: current board

        Returns:
            boardString: a quick conversion of board to a string format.
                         Required by MCTS for hashing.
        """
        # n x n numpy array (canonical board)
        return board.tobytes()  # raw bytes of the array; tostring() is deprecated in favour of tobytes()

def display(board):
    n = board.shape[0]

    print("   ", end="")
    for y in range(n):
        print (y,"", end="")
    print("")
    print("  ", end="")
    for _ in range(n):
        print ("-", end="-")
    print("--")
    for y in range(n):
        print(y, "|",end="")    # print the row #
        for x in range(n):
            piece = board[y][x]    # get the piece to print
            if piece == -1: print("X ",end="")
            elif piece == 1: print("O ",end="")
            else: print("- ",end="")
        print("|")

    print("  ", end="")
    for _ in range(n):
        print ("-", end="-")
    print("--")


In [12]:
game = TicTacToeGame(3)

In [13]:
game.getInitBoard()

array([[0, 0, 0],
       [0, 0, 0],
       [0, 0, 0]])

In [14]:
game.getBoardSize()

(3, 3)

In [15]:
game.getActionSize()

10

In [16]:
board.pieces

[[0, 0, 0], [0, -1, 0], [0, 0, 0]]

In [17]:
action = 4
n = 3
(int(action/n), action%n)

(1, 1)
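
The cell above decodes a flat action index into board coordinates. A minimal sketch of the round trip, assuming the same row-major layout as `getNextState` and `getValidMoves` (the helper names `decode_action` and `encode_move` are made up for illustration):

```python
def decode_action(action, n):
    """Map a flat action index to (x, y).

    Index n*n is the extra 'pass' slot, which has no board square.
    """
    if action == n * n:
        return None
    return divmod(action, n)  # equivalent to (action // n, action % n)


def encode_move(x, y, n):
    """Inverse mapping, as used when filling the valid-moves vector."""
    return n * x + y


n = 3
print(decode_action(4, n))    # (1, 1)
print(encode_move(1, 1, n))   # 4
print(decode_action(9, n))    # None
```

Integer division via `divmod` (or `//`) avoids the detour through float division that `int(action/n)` takes; for boards of this size both give the same result.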

In [18]:
tmp = game.getInitBoard()
nextBoard, nextState = game.getNextState(board=tmp, player=1, action=4)
print('nb', nextBoard)
print('ns', nextState)

nb [[0 0 0]
 [0 1 0]
 [0 0 0]]
ns -1


In [19]:
tmp = game.getInitBoard()
game.getValidMoves(board=tmp, player=-1)

array([1, 1, 1, 1, 1, 1, 1, 1, 1, 0])
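
The binary vector returned by `getValidMoves` is later used in `MCTS.search` to zero out illegal moves in the network's policy and renormalise what is left. A plain-Python sketch of that step, including the uniform fallback for the case where every valid move was given zero probability (the function name `mask_and_normalize` is an assumption, not notebook code):

```python
def mask_and_normalize(policy, valids):
    """Zero out invalid moves, then rescale so the probabilities sum to 1.

    Assumes `valids` contains at least one 1, which getValidMoves
    guarantees through the extra 'pass' slot.
    """
    masked = [p * v for p, v in zip(policy, valids)]
    total = sum(masked)
    if total > 0:
        return [p / total for p in masked]
    # All valid moves had zero probability: fall back to uniform over valid moves.
    n_valid = sum(valids)
    return [v / n_valid for v in valids]


print(mask_and_normalize([0.5, 0.3, 0.2], [1, 0, 1]))
print(mask_and_normalize([0.0, 1.0, 0.0], [1, 0, 1]))  # [0.5, 0.0, 0.5]
```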

In [20]:
tmp = np.array([[1, -1, 1],
                [1, -1, 1],
                [-1, 1, -1]])
game.getGameEnded(board=tmp, player=-1)

0.0001

In [21]:
tmp = np.array([[ 0,  0,  0],
                [ 0, -1,  0],
                [ 0, -1,  1]])
game.getCanonicalForm(board=tmp, player=-1)

array([[ 0,  0,  0],
       [ 0,  1,  0],
       [ 0,  1, -1]])
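
The canonical form is just the board multiplied by the current player, so the player to move always sees their own pieces as `+1`. A small sketch with nested lists instead of NumPy arrays (the standalone function `canonical_form` is illustrative only):

```python
def canonical_form(board, player):
    """Flip piece signs so that `player`'s pieces become +1."""
    return [[player * cell for cell in row] for row in board]


board = [[0,  0, 0],
         [0, -1, 0],
         [0, -1, 1]]

flipped = canonical_form(board, -1)
print(flipped)                       # [[0, 0, 0], [0, 1, 0], [0, 1, -1]]
print(canonical_form(flipped, -1))   # back to the original board
```

Because the transformation is its own inverse, applying it twice with `player=-1` returns the original board.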

In [22]:
tmp = np.array([[ 0,  0,  0],
                [ 0, -1,  0],
                [ 0, -1,  1]])
pi = np.array([1, 0, 0, 0, 0, 0, 0, 0, 0, 0])
syms = game.getSymmetries(board=tmp, pi=pi)
for b, p in syms:
    print('board')
    print(b)
    print('pi', p)
    print('---')

board
[[ 1  0  0]
 [-1 -1  0]
 [ 0  0  0]]
pi [0, 0, 0, 0, 0, 0, 0, 0, 1, 0]
---
board
[[ 0  0  1]
 [ 0 -1 -1]
 [ 0  0  0]]
pi [0, 0, 0, 0, 0, 0, 1, 0, 0, 0]
---
board
[[ 0 -1  1]
 [ 0 -1  0]
 [ 0  0  0]]
pi [0, 0, 0, 0, 0, 0, 1, 0, 0, 0]
---
board
[[ 1 -1  0]
 [ 0 -1  0]
 [ 0  0  0]]
pi [0, 0, 0, 0, 0, 0, 0, 0, 1, 0]
---
board
[[ 0  0  0]
 [ 0 -1 -1]
 [ 0  0  1]]
pi [1, 0, 0, 0, 0, 0, 0, 0, 0, 0]
---
board
[[ 0  0  0]
 [-1 -1  0]
 [ 1  0  0]]
pi [0, 0, 1, 0, 0, 0, 0, 0, 0, 0]
---
board
[[ 0  0  0]
 [ 0 -1  0]
 [ 1 -1  0]]
pi [0, 0, 1, 0, 0, 0, 0, 0, 0, 0]
---
board
[[ 0  0  0]
 [ 0 -1  0]
 [ 0 -1  1]]
pi [1, 0, 0, 0, 0, 0, 0, 0, 0, 0]
---
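
`getSymmetries` always returns eight (board, pi) pairs: four rotations, each with and without a mirror flip. For boards that are themselves symmetric, some of those pairs coincide. The sketch below counts the distinct boards in plain Python; `rotate`, `mirror` and `distinct_symmetries` are hypothetical helpers, and the rotation direction differs from `np.rot90`, which does not change the resulting set.

```python
def rotate(grid):
    """Rotate a square grid by 90 degrees clockwise."""
    return [list(row) for row in zip(*grid[::-1])]


def mirror(grid):
    """Flip a grid left to right."""
    return [row[::-1] for row in grid]


def distinct_symmetries(grid):
    """Return the set of distinct boards among the 8 rotations/reflections."""
    forms = set()
    current = [list(row) for row in grid]
    for _ in range(4):
        current = rotate(current)
        for candidate in (current, mirror(current)):
            forms.add(tuple(map(tuple, candidate)))
    return forms


asymmetric = [[0, 0, 0], [0, -1, 0], [0, -1, 1]]
corner = [[1, 0, 0], [0, 0, 0], [0, 0, 0]]
empty = [[0, 0, 0], [0, 0, 0], [0, 0, 0]]

print(len(distinct_symmetries(asymmetric)))  # 8
print(len(distinct_symmetries(corner)))      # 4
print(len(distinct_symmetries(empty)))       # 1
```

So for early-game positions the eight training examples produced per board contain repeats, which is harmless but worth knowing when counting examples.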


In [23]:
tmp = np.array([[ 0,  0,  0],
                [ 0, -1,  0],
                [ 0, -1,  1]])
game.stringRepresentation(board=tmp)

b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xff\xff\xff\xff\xff\xff\xff\xff\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xff\xff\xff\xff\xff\xff\xff\xff\x01\x00\x00\x00\x00\x00\x00\x00'
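
The byte string above is what MCTS uses as a dictionary key for a position. As a rough illustration of why raw bytes work for that purpose, the sketch below builds a comparable key with the standard-library `array` module as a stand-in for the NumPy call (`board_key` is a hypothetical helper, and 8-byte signed integers are assumed to mirror the 72-byte output above):

```python
from array import array


def board_key(board):
    """Flatten a nested-list board into raw bytes of 8-byte signed integers."""
    flat = [cell for row in board for cell in row]
    return array('q', flat).tobytes()


visits = {}  # plays the role of a table such as MCTS.Ns

board = [[0, 0, 0], [0, -1, 0], [0, -1, 1]]
visits[board_key(board)] = visits.get(board_key(board), 0) + 1

same_position = [row[:] for row in board]  # equal content, different object
visits[board_key(same_position)] += 1

print(visits[board_key(board)])  # 2
print(len(board_key(board)))     # 72 (9 cells * 8 bytes)
```

Equal positions map to the same key regardless of object identity. Note that such a key carries only the cell values, not the board shape or integer width, so all boards in one search should share both.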

# Players

In [24]:
"""
Random and Human-interacting players for the game of TicTacToe.

Author: Evgeny Tyurin, github.com/evg-tyurin
Date: Jan 5, 2018.

Based on the OthelloPlayers by Surag Nair.

"""
class RandomPlayer():
    def __init__(self, game):
        self.game = game

    def play(self, board):
        a = np.random.randint(self.game.getActionSize())
        valids = self.game.getValidMoves(board, 1)
        while valids[a]!=1:
            a = np.random.randint(self.game.getActionSize())
        return a


class HumanTicTacToePlayer():
    def __init__(self, game):
        self.game = game

    def play(self, board):
        # display(board)
        valid = self.game.getValidMoves(board, 1)
        for i in range(len(valid)):
            if valid[i]:
                print(int(i/self.game.n), int(i%self.game.n))
        while True: 
            # Python 3.x
            a = input()
            # Python 2.x 
            # a = raw_input()

            x,y = [int(x) for x in a.split(' ')]
            a = self.game.n * x + y if x!= -1 else self.game.n ** 2
            if valid[a]:
                break
            else:
                print('Invalid')

        return a


In [25]:
rp = RandomPlayer(game)

In [26]:
tmp = game.getInitBoard()
desired_action = rp.play(tmp)
print(desired_action)

1
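
`RandomPlayer.play` draws random indices until it hits a valid one. An alternative sketch, not the notebook's implementation, picks directly from the valid indices so that it needs only one draw (`random_valid_action` is a made-up name and it uses the standard-library `random` module instead of `np.random`):

```python
import random


def random_valid_action(valids, rng=random):
    """Pick uniformly among the indices whose entry in `valids` is non-zero."""
    candidates = [a for a, ok in enumerate(valids) if ok]
    return rng.choice(candidates)


valids = [1, 0, 0, 1, 0, 0, 0, 1, 0, 0]   # only squares 0, 3 and 7 are free
rng = random.Random(0)                    # seeded generator for repeatability
picks = {random_valid_action(valids, rng) for _ in range(200)}
print(sorted(picks))
```

Both approaches sample uniformly over valid moves; the difference only matters when few moves remain and rejection sampling needs many retries.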


# TicTacToeNNet

In [27]:
class dotdict(dict):
    def __getattr__(self, name):
        return self[name]
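
`dotdict` is a thin convenience wrapper that lets hyperparameters be read as attributes. A short usage sketch (the class is repeated so the snippet runs on its own; the keys are example values):

```python
class dotdict(dict):
    def __getattr__(self, name):
        return self[name]


args = dotdict({'lr': 0.001, 'epochs': 10})
print(args.lr)          # 0.001
print(args['epochs'])   # 10

# A missing key surfaces as KeyError, not AttributeError.
try:
    args.momentum
except KeyError as err:
    print('missing key:', err)   # missing key: 'momentum'
```

One consequence of raising `KeyError` is that `hasattr(args, 'momentum')` raises as well instead of returning `False`, so membership is better tested with `'momentum' in args`.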

In [28]:
"""
NeuralNet for the game of TicTacToe.

Author: Evgeny Tyurin, github.com/evg-tyurin
Date: Jan 5, 2018.

Based on the OthelloNNet by SourKream and Surag Nair.
"""
class TicTacToeNNet():
    def __init__(self, game, args):
        # game params
        self.board_x, self.board_y = game.getBoardSize()
        self.action_size = game.getActionSize()
        self.args = args

        # Neural Net
        self.input_boards = Input(shape=(self.board_x, self.board_y))    # s: batch_size x board_x x board_y

        x_image = Reshape((self.board_x, self.board_y, 1))(self.input_boards)                # batch_size  x board_x x board_y x 1
        # input -> Conv2D -> BatchNorm -> Activation
        h_conv1 = Activation('relu')(BatchNormalization(axis=3)(Conv2D(args.num_channels, 3, padding='same')(x_image)))         # batch_size  x board_x x board_y x num_channels
        h_conv2 = Activation('relu')(BatchNormalization(axis=3)(Conv2D(args.num_channels, 3, padding='same')(h_conv1)))         # batch_size  x board_x x board_y x num_channels
        h_conv3 = Activation('relu')(BatchNormalization(axis=3)(Conv2D(args.num_channels, 3, padding='same')(h_conv2)))        # batch_size  x (board_x) x (board_y) x num_channels
        h_conv4 = Activation('relu')(BatchNormalization(axis=3)(Conv2D(args.num_channels, 3, padding='valid')(h_conv3)))        # batch_size  x (board_x-2) x (board_y-2) x num_channels
        h_conv4_flat = Flatten()(h_conv4)       
        # flat -> Dense -> BatchNorm -> Activation -> Dropout
        s_fc1 = Dropout(args.dropout)(Activation('relu')(BatchNormalization(axis=1)(Dense(1024)(h_conv4_flat))))  # batch_size x 1024
        s_fc2 = Dropout(args.dropout)(Activation('relu')(BatchNormalization(axis=1)(Dense(512)(s_fc1))))          # batch_size x 512
        # in -> Dense -> Softmax
        self.pi = Dense(self.action_size, activation='softmax', name='pi')(s_fc2)   # batch_size x self.action_size
        self.v = Dense(1, activation='tanh', name='v')(s_fc2)                    # batch_size x 1

        self.model = Model(inputs=self.input_boards, outputs=[self.pi, self.v])
        self.model.compile(loss=['categorical_crossentropy','mean_squared_error'], optimizer=Adam(args.lr))


In [29]:
# class Game:
#     def getBoardSize(self):
#         return (3, 3)
#     def getActionSize(self):
#         return 3 * 3 + 1

In [30]:
args = dotdict({
    'lr': 0.001,
    'dropout': 0.3,
    'epochs': 10,
    'batch_size': 64,
    'cuda': False,
    'num_channels': 512,
})

In [31]:
game = TicTacToeGame(n=3)
nnet = TicTacToeNNet(game, args)

Instructions for updating:
Colocations handled automatically by placer.
Instructions for updating:
Please use `rate` instead of `keep_prob`. Rate should be set to `rate = 1 - keep_prob`.


In [32]:
x = np.array([[[0, 0, 0],
               [0, 0, 0],
               [0, 0, 0]]])

x = np.random.randn(1, 3, 3)

pi, v = nnet.model.predict(x)
print('pi', pi)
print('v', v)

pi [[0.09965038 0.10018878 0.09965935 0.10008251 0.09986302 0.09960129
  0.10009368 0.10007407 0.10069098 0.10009595]]
v [[-0.00376045]]
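
The shape comments in `TicTacToeNNet` can be checked with a little arithmetic: three `'same'` convolutions keep the 3x3 board size, and the final `'valid'` 3x3 convolution shrinks it to 1x1. A sketch of that calculation, assuming stride 1 (the Keras default) and `num_channels=512` from `args`; `conv_output_size` is a hypothetical helper:

```python
def conv_output_size(size, kernel, padding):
    """Output length along one axis of a stride-1 convolution."""
    if padding == 'same':
        return size
    if padding == 'valid':
        return size - kernel + 1
    raise ValueError('unknown padding: {}'.format(padding))


size = 3            # TicTacToe board side
num_channels = 512  # value used in args above
for padding in ['same', 'same', 'same', 'valid']:
    size = conv_output_size(size, kernel=3, padding=padding)

flat_features = size * size * num_channels
print(size, flat_features)   # 1 512
```

So the `Flatten` layer feeds 512 features into `Dense(1024)`. On a board smaller than 3x3 the `'valid'` layer would have no output positions left, which is one reason the architecture is tied to `n >= 3`.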


# NNetWrapper

In [33]:
class NNetWrapper:
    """
    This class specifies the base NeuralNet class. To define your own neural
    network, subclass this class and implement the functions below. The neural
    network does not consider the current player, and instead only deals with
    the canonical form of the board.

    See othello/NNet.py for an example implementation.
    """
    def __init__(self, game, args):
        self.nnet = TicTacToeNNet(game, args)
        self.board_x, self.board_y = game.getBoardSize()
        self.action_size = game.getActionSize()

    def train(self, examples):
        """
        This function trains the neural network with examples obtained from
        self-play.

        Input:
            examples: a list of training examples, where each example is of form
                      (board, pi, v). pi is the MCTS informed policy vector for
                      the given board, and v is its value. The examples has
                      board in its canonical form.
        """
        # examples: list of examples, each example is of form (board, pi, v)
        input_boards, target_pis, target_vs = list(zip(*examples))
        input_boards = np.asarray(input_boards)
        target_pis = np.asarray(target_pis)
        target_vs = np.asarray(target_vs)
        self.nnet.model.fit(x = input_boards, y = [target_pis, target_vs], batch_size = self.nnet.args.batch_size, epochs = self.nnet.args.epochs)

    def predict(self, board):
        """
        Input:
            board: current board in its canonical form.

        Returns:
            pi: a policy vector for the current board- a numpy array of length
                game.getActionSize
            v: a float in [-1,1] that gives the value of the current board
        """
        # board: np array with board
        
        # timing
        start = time.time()

        # preparing input
        board = board[np.newaxis, :, :]

        # run
        pi, v = self.nnet.model.predict(board)

        #print('PREDICTION TIME TAKEN : {0:03f}'.format(time.time()-start))
        return pi[0], v[0]

    def save_checkpoint(self, folder='checkpoint', filename='checkpoint.pth.tar'):
        """
        Saves the current neural network (with its parameters) in
        folder/filename
        """
        filepath = os.path.join(folder, filename)
        if not os.path.exists(folder):
            print("Checkpoint Directory does not exist! Making directory {}".format(folder))
            os.mkdir(folder)
        else:
            print("Checkpoint Directory exists! ")
        self.nnet.model.save_weights(filepath)

    def load_checkpoint(self, folder='checkpoint', filename='checkpoint.pth.tar'):
        """
        Loads parameters of the neural network from folder/filename
        """
        # https://github.com/pytorch/examples/blob/master/imagenet/main.py#L98
        filepath = os.path.join(folder, filename)
        if not os.path.exists(filepath):
            raise FileNotFoundError("No model in path '{}'".format(filepath))
        self.nnet.model.load_weights(filepath)


In [34]:
args = dotdict({
    'lr': 0.001,
    'dropout': 0.3,
    'epochs': 10,
    'batch_size': 64,
    'cuda': False,
    'num_channels': 512,
})

In [35]:
game = TicTacToeGame(n=3)

In [36]:
nnet_wrap = NNetWrapper(game, args)

In [37]:
nnet_wrap.board_x

3

In [38]:
nnet_wrap.action_size

10

In [39]:
bb = game.getInitBoard()
p, v = nnet_wrap.predict(bb)
print(p)
print(v)

[0.1 0.1 0.1 0.1 0.1 0.1 0.1 0.1 0.1 0.1]
[0.]
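
`NNetWrapper.train` receives a list of `(board, pi, v)` tuples and splits it into three parallel sequences with `zip(*examples)`. A tiny sketch of that transposition with made-up example data:

```python
# Two made-up training examples: (board, policy over 10 actions, value)
examples = [
    ([[0, 0, 0], [0, 1, 0], [0, 0, 0]], [0.1] * 10, 1),
    ([[0, 0, 0], [0, -1, 0], [0, 0, 1]], [0.0] * 9 + [1.0], -1),
]

# zip(*examples) turns a list of triples into three tuples.
input_boards, target_pis, target_vs = zip(*examples)

print(len(input_boards), len(target_pis))  # 2 2
print(target_vs)                           # (1, -1)
```

In the notebook each of the three tuples is then converted with `np.asarray` before being passed to `model.fit`.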


# MCTS

In [40]:
EPS = 1e-8
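
`EPS` is used in the `search` method of the `MCTS` class when scoring edges that have not been visited yet. On the first selection at a node the parent visit count is 0, so without `EPS` every score would be exactly 0 and the strict `>` comparison would always keep the first valid action; with `EPS` the action with the highest prior wins. A sketch of the two scoring branches, where `puct_score` is a hypothetical helper and `cpuct=1.0` is an assumed value:

```python
import math

EPS = 1e-8


def puct_score(q, prior, parent_visits, edge_visits, cpuct=1.0, visited=True):
    """Upper confidence score for one edge, mirroring the two branches in search()."""
    if visited:
        return q + cpuct * prior * math.sqrt(parent_visits) / (1 + edge_visits)
    # Unvisited edge: Q is treated as 0, EPS keeps the prior ordering at Ns == 0.
    return cpuct * prior * math.sqrt(parent_visits + EPS)


priors = [0.2, 0.5, 0.3]

with_eps = [puct_score(0.0, p, 0, 0, visited=False) for p in priors]
without_eps = [1.0 * p * math.sqrt(0) for p in priors]

print(with_eps.index(max(with_eps)))        # 1 (highest prior)
print(without_eps.index(max(without_eps)))  # 0 (all scores tie at 0.0)
```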

In [41]:
class MCTS():
    """
    This class handles the MCTS tree.
    """

    def __init__(self, game, nnet, args):
        self.game = game
        self.nnet = nnet
        self.args = args
        self.Qsa = {}       # stores Q values for s,a (as defined in the paper)
        self.Nsa = {}       # stores #times edge s,a was visited
        self.Ns = {}        # stores #times board s was visited
        self.Ps = {}        # stores initial policy (returned by neural net)

        self.Es = {}        # stores game.getGameEnded result for board s
        self.Vs = {}        # stores game.getValidMoves for board s

    def getActionProb(self, canonicalBoard, temp=1):
        """
        This function performs numMCTSSims simulations of MCTS starting from
        canonicalBoard.

        Returns:
            probs: a policy vector where the probability of the ith action is
                   proportional to Nsa[(s,a)]**(1./temp)
        """
        for i in range(self.args.numMCTSSims):
            self.search(canonicalBoard)

        s = self.game.stringRepresentation(canonicalBoard)
        counts = [self.Nsa[(s,a)] if (s,a) in self.Nsa else 0 for a in range(self.game.getActionSize())]

        if temp==0:
            bestA = np.argmax(counts)
            probs = [0]*len(counts)
            probs[bestA]=1
            return probs

        counts = [x**(1./temp) for x in counts]
        probs = [x/float(sum(counts)) for x in counts]
        return probs


    def search(self, canonicalBoard):
        """
        This function performs one iteration of MCTS. It is recursively called
        till a leaf node is found. The action chosen at each node is one that
        has the maximum upper confidence bound as in the paper.

        Once a leaf node is found, the neural network is called to return an
        initial policy P and a value v for the state. This value is propagated
        up the search path. In case the leaf node is a terminal state, the
        outcome is propagated up the search path. The values of Ns, Nsa, Qsa are
        updated.

        NOTE: the return values are the negative of the value of the current
        state. This is done since v is in [-1,1] and if v is the value of a
        state for the current player, then its value is -v for the other player.

        Returns:
            v: the negative of the value of the current canonicalBoard
        """

        s = self.game.stringRepresentation(canonicalBoard)

        if s not in self.Es:
            self.Es[s] = self.game.getGameEnded(canonicalBoard, 1)
        if self.Es[s]!=0:
            # terminal node
            return -self.Es[s]

        if s not in self.Ps:
            # leaf node
            self.Ps[s], v = self.nnet.predict(canonicalBoard)
            valids = self.game.getValidMoves(canonicalBoard, 1)
            self.Ps[s] = self.Ps[s]*valids      # masking invalid moves
            sum_Ps_s = np.sum(self.Ps[s])
            if sum_Ps_s > 0:
                self.Ps[s] /= sum_Ps_s    # renormalize
            else:
                # if all valid moves were masked make all valid moves equally probable
                
                # NB! All valid moves may be masked if your NNet architecture is insufficient, if you have overfitting, or for some other reason.
                # If you get dozens or hundreds of these messages, you should pay attention to your NNet and/or training process.
                print("All valid moves were masked, do workaround.")
                self.Ps[s] = self.Ps[s] + valids
                self.Ps[s] /= np.sum(self.Ps[s])

            self.Vs[s] = valids
            self.Ns[s] = 0
            return -v

        valids = self.Vs[s]
        cur_best = -float('inf')
        best_act = -1

        # pick the action with the highest upper confidence bound
        for a in range(self.game.getActionSize()):
            if valids[a]:
                if (s,a) in self.Qsa:
                    u = self.Qsa[(s,a)] + self.args.cpuct*self.Ps[s][a]*math.sqrt(self.Ns[s])/(1+self.Nsa[(s,a)])
                else:
                    u = self.args.cpuct*self.Ps[s][a]*math.sqrt(self.Ns[s] + EPS)     # Q = 0 ?

                if u > cur_best:
                    cur_best = u
                    best_act = a

        a = best_act
        next_s, next_player = self.game.getNextState(canonicalBoard, 1, a)
        next_s = self.game.getCanonicalForm(next_s, next_player)

        v = self.search(next_s)

        if (s,a) in self.Qsa:
            self.Qsa[(s,a)] = (self.Nsa[(s,a)]*self.Qsa[(s,a)] + v)/(self.Nsa[(s,a)]+1)
            self.Nsa[(s,a)] += 1

        else:
            self.Qsa[(s,a)] = v
            self.Nsa[(s,a)] = 1

        self.Ns[s] += 1
        return -v
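
Two small pieces of arithmetic in the class above are easy to check in isolation: the conversion of visit counts into a policy in `getActionProb`, and the incremental mean used to update `Qsa`. The sketch below restates both in plain Python; the function names are assumptions for illustration.

```python
def visit_counts_to_probs(counts, temp=1.0):
    """Turn visit counts into probabilities proportional to count**(1/temp)."""
    if temp == 0:
        best = counts.index(max(counts))  # first maximum, like np.argmax
        return [1.0 if i == best else 0.0 for i in range(len(counts))]
    scaled = [c ** (1.0 / temp) for c in counts]
    total = float(sum(scaled))
    return [s / total for s in scaled]


def update_mean(q, n, v):
    """Fold a new value v into a mean q that was built from n samples."""
    return (n * q + v) / (n + 1), n + 1


counts = [10, 30, 60]
print(visit_counts_to_probs(counts, temp=1.0))   # proportional to the counts
print(visit_counts_to_probs(counts, temp=0.5))   # sharper: counts are squared
print(visit_counts_to_probs(counts, temp=0))     # [0.0, 0.0, 1.0]
print(update_mean(0.5, 2, -1.0))                 # (0.0, 3)
```

Lower temperatures concentrate probability on the most visited move. As in `getActionProb`, the non-zero-temperature branch divides by the sum of counts, so it needs at least one visited edge; since the first `search` call on a new root only expands it, this suggests using at least two simulations per move.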


# Arena

In [42]:
HIDE_CURSOR = '\x1b[?25l'
SHOW_CURSOR = '\x1b[?25h'

In [43]:
class WritelnMixin(object):
    hide_cursor = False

    def __init__(self, message=None, **kwargs):
        super(WritelnMixin, self).__init__(**kwargs)
        if message:
            self.message = message

        if self.file.isatty() and self.hide_cursor:
            print(HIDE_CURSOR, end='', file=self.file)

    def clearln(self):
        #if self.file.isatty():
            print('\r\x1b[K', end='', file=self.file)

    def writeln(self, line): 
        #if self.file.isatty():
            self.clearln()
            print(line, end='', file=self.file)
            self.file.flush()

    def finish(self):
        #if self.file.isatty(): - snair, to print to stderr logfile
            print(file=self.file)
            if self.hide_cursor:
                print(SHOW_CURSOR, end='', file=self.file)

In [44]:
class Infinite(object):
    file = sys.stdout
    sma_window = 10         # Simple Moving Average window

    def __init__(self, *args, **kwargs):
        self.index = 0
        self.start_ts = time.time()
        self.avg = 0
        self._ts = self.start_ts
        self._xput = collections.deque(maxlen=self.sma_window)
        for key, val in kwargs.items():
            setattr(self, key, val)

    def __getitem__(self, key):
        if key.startswith('_'):
            return None
        return getattr(self, key, None)

    @property
    def elapsed(self):
        return int(time.time() - self.start_ts)

    @property
    def elapsed_td(self):
        return datetime.timedelta(seconds=self.elapsed)

    def update_avg(self, n, dt):
        if n > 0:
            self._xput.append(dt / n)
            self.avg = sum(self._xput) / len(self._xput)

    def update(self):
        pass

    def start(self):
        pass

    def finish(self):
        pass

    def next(self, n=1):
        now = time.time()
        dt = now - self._ts
        self.update_avg(n, dt)
        self._ts = now
        self.index = self.index + n
        self.update()

    def iter(self, it):
        try:
            for x in it:
                yield x
                self.next()
        finally:
            self.finish()

In [45]:
class Progress(Infinite):
    def __init__(self, *args, **kwargs):
        super(Progress, self).__init__(*args, **kwargs)
        self.max = kwargs.get('max', 100)

    @property
    def eta(self):
        return int(math.ceil(self.avg * self.remaining))

    @property
    def eta_td(self):
        return datetime.timedelta(seconds=self.eta)

    @property
    def percent(self):
        return self.progress * 100

    @property
    def progress(self):
        return min(1, self.index / self.max)

    @property
    def remaining(self):
        return max(self.max - self.index, 0)

    def start(self):
        self.update()

    def goto(self, index):
        incr = index - self.index
        self.next(incr)

    def iter(self, it):
        try:
            self.max = len(it)
        except TypeError:
            pass

        try:
            for x in it:
                yield x
                self.next()
        finally:
            self.finish()


In [46]:
class Bar(WritelnMixin, Progress):
    width = 32
    message = ''
    suffix = '%(index)d/%(max)d'
    bar_prefix = ' |'
    bar_suffix = '| '
    empty_fill = ' '
    fill = '#'
    hide_cursor = True

    def update(self):
        filled_length = int(self.width * self.progress)
        empty_length = self.width - filled_length

        message = self.message % self
        bar = self.fill * filled_length
        empty = self.empty_fill * empty_length
        suffix = self.suffix % self
        line = ''.join([message, self.bar_prefix, bar, empty, self.bar_suffix,
                        suffix])
        self.writeln(line)

In [47]:
class AverageMeter(object):
    """Computes and stores the average and current value
       Imported from https://github.com/pytorch/examples/blob/master/imagenet/main.py#L247-L262
    """
    def __init__(self):
        self.reset()

    def reset(self):
        self.val = 0
        self.avg = 0
        self.sum = 0
        self.count = 0

    def update(self, val, n=1):
        self.val = val
        self.sum += val * n
        self.count += n
        self.avg = self.sum / self.count

In [48]:
class Arena():
    """
    An Arena class where any 2 agents can be pit against each other.
    """
    def __init__(self, player1, player2, game, display=None):
        """
        Input:
            player 1,2: two functions that take a board as input and return an action
            game: Game object
            display: a function that takes board as input and prints it (e.g.
                     display in othello/OthelloGame). Is necessary for verbose
                     mode.

        see othello/OthelloPlayers.py for an example. See pit.py for pitting
        human players/other baselines with each other.
        """
        self.player1 = player1
        self.player2 = player2
        self.game = game
        self.display = display

    def playGame(self, verbose=False):
        """
        Executes one episode of a game.

        Returns:
            either
                winner: player who won the game (1 if player1, -1 if player2)
            or
                draw result returned from the game that is neither 1, -1, nor 0.
        """
        players = [self.player2, None, self.player1]
        curPlayer = 1
        board = self.game.getInitBoard()
        it = 0
        while self.game.getGameEnded(board, curPlayer)==0:
            it+=1
            if verbose:
                assert(self.display)
                print("Turn ", str(it), "Player ", str(curPlayer))
                self.display(board)
            action = players[curPlayer+1](self.game.getCanonicalForm(board, curPlayer))

            valids = self.game.getValidMoves(self.game.getCanonicalForm(board, curPlayer),1)

            if valids[action]==0:
                print(action)
                assert valids[action] >0
            board, curPlayer = self.game.getNextState(board, curPlayer, action)
        if verbose:
            assert(self.display)
            print("Game over: Turn ", str(it), "Result ", str(self.game.getGameEnded(board, 1)))
            self.display(board)
        return self.game.getGameEnded(board, 1)

    def playGames(self, num, verbose=False):
        """
        Plays num games in which player1 starts num/2 games and player2 starts
        num/2 games.

        Returns:
            oneWon: games won by player1
            twoWon: games won by player2
            draws:  games won by nobody
        """
        eps_time = AverageMeter()
        bar = Bar('Arena.playGames', max=num)
        end = time.time()
        eps = 0
        maxeps = int(num)

        num = int(num/2)
        oneWon = 0
        twoWon = 0
        draws = 0
        for _ in range(num):
            gameResult = self.playGame(verbose=verbose)
            if gameResult==1:
                oneWon+=1
            elif gameResult==-1:
                twoWon+=1
            else:
                draws+=1
            # bookkeeping + plot progress
            eps += 1
            eps_time.update(time.time() - end)
            end = time.time()
            # eps was already incremented above, so report it as-is
            bar.suffix = '({eps}/{maxeps}) Eps Time: {et:.3f}s | Total: {total:} | ETA: {eta:}'.format(
                eps=eps, maxeps=maxeps, et=eps_time.avg, total=bar.elapsed_td, eta=bar.eta_td)
            bar.next()

        self.player1, self.player2 = self.player2, self.player1
        
        for _ in range(num):
            gameResult = self.playGame(verbose=verbose)
            if gameResult==-1:
                oneWon+=1                
            elif gameResult==1:
                twoWon+=1
            else:
                draws+=1
            # bookkeeping + plot progress
            eps += 1
            eps_time.update(time.time() - end)
            end = time.time()
            # eps was already incremented above; maxeps is the total, num is only half of it
            bar.suffix = '({eps}/{maxeps}) Eps Time: {et:.3f}s | Total: {total:} | ETA: {eta:}'.format(
                eps=eps, maxeps=maxeps, et=eps_time.avg, total=bar.elapsed_td, eta=bar.eta_td)
            bar.next()
            
        bar.finish()

        return oneWon, twoWon, draws
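
The bookkeeping in `playGames` is easy to misread because the two agents swap seats halfway through. Below is a minimal sketch of the same tally, using hypothetical helper names (`games_per_side`, `tally`) that are not part of the notebook. It assumes, as the `playGame` docstring says, that a result of 1 means the agent currently in the `player1` seat won.

```python
def games_per_side(num):
    """Number of games each agent gets to move first, as in playGames()."""
    return int(num / 2)


def tally(first_half, second_half):
    """Attribute raw playGame() results to the original player1 / player2.

    first_half:  results while the original player1 moves first
    second_half: results after the two agents have swapped seats
    playGame() reports 1 when the first mover wins and -1 when the second
    mover wins; any other value counts as a draw.
    """
    one_won = two_won = draws = 0
    for result in first_half:
        if result == 1:
            one_won += 1
        elif result == -1:
            two_won += 1
        else:
            draws += 1
    for result in second_half:
        if result == -1:      # original player1 is now the second mover
            one_won += 1
        elif result == 1:
            two_won += 1
        else:
            draws += 1
    return one_won, two_won, draws


print(games_per_side(3))      # an odd request loses one game
print(tally([1], [0.0001]))   # the two results from the PIT log
```

This is why `playGames(3)` in the PIT section plays only two games and returns `(1, 0, 1)`: one win for the first agent and one draw.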


# Coach
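
The least obvious line in `Coach.executeEpisode` is the last one, which turns the final result `r` into a value target for every recorded position. A minimal sketch of that sign logic follows. `assign_outcomes` is a hypothetical helper, and the records are simplified to `(board, player, pi)` tuples with placeholder strings instead of real boards and policies.

```python
def assign_outcomes(history, r, cur_player):
    """Turn (board, player, pi) records into (board, pi, v) training examples.

    r is the game result as seen by cur_player, the side to move in the
    terminal position. Records made by the other side get the opposite sign.
    """
    return [(board, pi, r if player == cur_player else -r)
            for board, player, pi in history]


# Placeholder boards/policies; players alternate 1, -1, 1.
history = [("b0", 1, "pi0"), ("b1", -1, "pi1"), ("b2", 1, "pi2")]

# Player 1 made the winning move, so it is player -1's turn and r = -1.
examples = assign_outcomes(history, r=-1, cur_player=-1)
print([v for _, _, v in examples])
```

Positions seen by the eventual winner end up with a positive target and positions seen by the loser with a negative one, which is what the expression `r*((-1)**(x[1]!=self.curPlayer))` computes.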

In [51]:
import pickle
import random

In [52]:
class Coach():
    """
    This class executes the self-play + learning. It uses the functions defined
    in Game and NeuralNet. args are specified in main.py.
    """
    def __init__(self, game, nnet, args, args_nnet):
        self.game = game
        self.nnet = nnet
        self.pnet = self.nnet.__class__(self.game, args_nnet)  # the competitor network
        self.args = args
        self.mcts = MCTS(self.game, self.nnet, self.args)
        self.trainExamplesHistory = []    # history of examples from args.numItersForTrainExamplesHistory latest iterations
        self.skipFirstSelfPlay = False # can be overridden in loadTrainExamples()

    def executeEpisode(self):
        """
        This function executes one episode of self-play, starting with player 1.
        As the game is played, each turn is added as a training example to
        trainExamples. The game is played till the game ends. After the game
        ends, the outcome of the game is used to assign values to each example
        in trainExamples.

        It uses a temp=1 if episodeStep < tempThreshold, and thereafter
        uses temp=0.

        Returns:
            trainExamples: a list of examples of the form (canonicalBoard,pi,v)
                           pi is the MCTS informed policy vector, v is +1 if
                           the player eventually won the game, else -1.
        """
        trainExamples = []
        board = self.game.getInitBoard()
        self.curPlayer = 1
        episodeStep = 0

        while True:
            episodeStep += 1
            canonicalBoard = self.game.getCanonicalForm(board,self.curPlayer)
            temp = int(episodeStep < self.args.tempThreshold)

            pi = self.mcts.getActionProb(canonicalBoard, temp=temp)
            sym = self.game.getSymmetries(canonicalBoard, pi)
            for b,p in sym:
                trainExamples.append([b, self.curPlayer, p, None])

            action = np.random.choice(len(pi), p=pi)
            board, self.curPlayer = self.game.getNextState(board, self.curPlayer, action)

            r = self.game.getGameEnded(board, self.curPlayer)

            if r!=0:
                return [(x[0],x[2],r*((-1)**(x[1]!=self.curPlayer))) for x in trainExamples]

    def learn(self):
        """
        Performs numIters iterations with numEps episodes of self-play in each
        iteration. After every iteration, it retrains neural network with
        examples in trainExamples (which has a maximum length of maxlenOfQueue).
        It then pits the new neural network against the old one and accepts it
        only if it wins >= updateThreshold fraction of games.
        """

        for i in range(1, self.args.numIters+1):
            # bookkeeping
            print('------ITER ' + str(i) + '------')
            # examples of the iteration
            if not self.skipFirstSelfPlay or i>1:
                iterationTrainExamples = collections.deque([], maxlen=self.args.maxlenOfQueue)
    
                eps_time = AverageMeter()
                bar = Bar('Self Play', max=self.args.numEps)
                end = time.time()
    
                for eps in range(self.args.numEps):
                    self.mcts = MCTS(self.game, self.nnet, self.args)   # reset search tree
                    iterationTrainExamples += self.executeEpisode()
    
                    # bookkeeping + plot progress
                    eps_time.update(time.time() - end)
                    end = time.time()
                    bar.suffix  = '({eps}/{maxeps}) Eps Time: {et:.3f}s | Total: {total:} | ETA: {eta:}'.format(eps=eps+1, maxeps=self.args.numEps, et=eps_time.avg,
                                                                                                               total=bar.elapsed_td, eta=bar.eta_td)
                    bar.next()
                bar.finish()

                # save the iteration examples to the history 
                self.trainExamplesHistory.append(iterationTrainExamples)
                
            if len(self.trainExamplesHistory) > self.args.numItersForTrainExamplesHistory:
                print("len(trainExamplesHistory) =", len(self.trainExamplesHistory), " => remove the oldest trainExamples")
                self.trainExamplesHistory.pop(0)
            # backup history to a file
            # NB! the examples were collected using the model from the previous iteration, so (i-1)  
            self.saveTrainExamples(i-1)
            
            # shuffle examples before training
            trainExamples = []
            for e in self.trainExamplesHistory:
                trainExamples.extend(e)
            random.shuffle(trainExamples)

            # training new network, keeping a copy of the old one
            self.nnet.save_checkpoint(folder=self.args.checkpoint, filename='temp.pth.tar')
            self.pnet.load_checkpoint(folder=self.args.checkpoint, filename='temp.pth.tar')
            pmcts = MCTS(self.game, self.pnet, self.args)
            
            self.nnet.train(trainExamples)
            nmcts = MCTS(self.game, self.nnet, self.args)

            print('PITTING AGAINST PREVIOUS VERSION')
            arena = Arena(lambda x: np.argmax(pmcts.getActionProb(x, temp=0)),
                          lambda x: np.argmax(nmcts.getActionProb(x, temp=0)), self.game)
            pwins, nwins, draws = arena.playGames(self.args.arenaCompare)

            print('NEW/PREV WINS : %d / %d ; DRAWS : %d' % (nwins, pwins, draws))
            if pwins+nwins == 0 or float(nwins)/(pwins+nwins) < self.args.updateThreshold:
                print('REJECTING NEW MODEL')
                self.nnet.load_checkpoint(folder=self.args.checkpoint, filename='temp.pth.tar')
            else:
                print('ACCEPTING NEW MODEL')
                self.nnet.save_checkpoint(folder=self.args.checkpoint, filename=self.getCheckpointFile(i))
                self.nnet.save_checkpoint(folder=self.args.checkpoint, filename='best.pth.tar')                

    def getCheckpointFile(self, iteration):
        return 'checkpoint_' + str(iteration) + '.pth.tar'

    def saveTrainExamples(self, iteration):
        folder = self.args.checkpoint
        if not os.path.exists(folder):
            os.makedirs(folder)
        filename = os.path.join(folder, self.getCheckpointFile(iteration)+".examples")
        # the with-block closes the file on exit
        with open(filename, "wb+") as f:
            pickle.Pickler(f).dump(self.trainExamplesHistory)

    def loadTrainExamples(self):
        modelFile = os.path.join(self.args.load_folder_file[0], self.args.load_folder_file[1])
        examplesFile = modelFile+".examples"
        if not os.path.isfile(examplesFile):
            print(examplesFile)
            r = input("File with trainExamples not found. Continue? [y|n]")
            if r != "y":
                sys.exit()
        else:
            print("File with trainExamples found. Read it.")
            # the with-block closes the file on exit
            with open(examplesFile, "rb") as f:
                self.trainExamplesHistory = pickle.Unpickler(f).load()
            # examples based on the model were already collected (loaded)
            self.skipFirstSelfPlay = True
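
The accept/reject test at the end of `learn()` can be read in isolation. Here is a small sketch of it; `accept_new_model` is a hypothetical name, and the numbers in the example are the ones printed for iteration 1 in the training log of this notebook.

```python
def accept_new_model(nwins, pwins, update_threshold):
    """Same test as in Coach.learn(): draws are ignored entirely."""
    decisive = nwins + pwins
    if decisive == 0:
        return False          # only draws: keep the previous network
    return nwins / decisive >= update_threshold


# Iteration 1 of the log: NEW/PREV WINS : 11 / 7 ; DRAWS : 22
print(accept_new_model(11, 7, 0.6))   # 11/18 is about 0.611
```

Because draws do not count, a TicTacToe match with many draws can be decided by a handful of decisive games, so the outcome is noisy for small `arenaCompare` values.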


In [53]:
args_net = dotdict({
    'lr': 0.001,
    'dropout': 0.3,
    'epochs': 10,
    'batch_size': 64,
    'cuda': False,
    'num_channels': 512,
})

In [56]:
args_coach = dotdict({
    'numIters': 5,
    'numEps': 25,
    'tempThreshold': 15,
    'updateThreshold': 0.6,
    'maxlenOfQueue': 200000,
    'numMCTSSims': 25,
    'arenaCompare': 40,
    'cpuct': 1,

    'checkpoint': './temp/',
    'load_model': False,
    'load_folder_file': ('./pretrained_models/tictactoe/keras','best.pth.tar'),
    'numItersForTrainExamplesHistory': 10,

})
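
`numItersForTrainExamplesHistory` controls a sliding window over past iterations. A minimal sketch of that window is below, with a hypothetical helper `push_iteration` and a window of 3 chosen only to make the trimming visible. With the values configured above (5 iterations, window of 10) the trimming branch should never fire in a fresh run.

```python
def push_iteration(history, iteration_examples, max_iters):
    """Append one iteration's examples and drop the oldest beyond max_iters."""
    history.append(iteration_examples)
    if len(history) > max_iters:
        history.pop(0)
    return history


history = []
for i in range(1, 6):                      # five iterations, as numIters=5
    push_iteration(history, ["examples from iter %d" % i], max_iters=3)

# Flatten the surviving iterations into one training set, as learn() does.
train_examples = [e for iteration in history for e in iteration]
print(train_examples)
```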

In [57]:
g = TicTacToeGame(n=3)
nnet = NNetWrapper(g, args_net)

# path = '/home/marcin/Repos/alpha-zero-general/pretrained_models/tictactoe/keras/'
# nnet.load_checkpoint(path, 'best-25eps-25sim-10epch.pth.tar')

c = Coach(g, nnet, args_coach, args_net)

In [58]:
c.learn()

------ITER 1------
Self Play |################################| (25/25) Eps Time: 0.509s | Total: 0:00:12 | ETA: 0:00:01
Checkpoint Directory exists!
Instructions for updating:
Use tf.cast instead.
Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10
PITTING AGAINST PREVIOUS VERSION
Arena.playGames |################################| (41/20) Eps Time: 0.308s | Total: 0:00:12 | ETA: 0:00:01
NEW/PREV WINS : 11 / 7 ; DRAWS : 22
ACCEPTING NEW MODEL
Checkpoint Directory exists! 
Checkpoint Directory exists! 
------ITER 2------
Self Play |################################| (25/25) Eps Time: 0.297s | Total: 0:00:07 | ETA: 0:00:01
Checkpoint Directory exists!
Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10
PITTING AGAINST PREVIOUS VERSION
Arena.playGames |################################| (41/20) Eps Time: 0.233s | Total: 0:00:09 | ETA: 0:00:01


# PIT

In [59]:
g = TicTacToeGame(n=3)

rp = RandomPlayer(g).play
hp = HumanTicTacToePlayer(g).play

In [60]:
args = dotdict({
    'lr': 0.001,
    'dropout': 0.3,
    'epochs': 10,
    'batch_size': 64,
    'cuda': False,
    'num_channels': 512,
})

In [61]:
n1 = NNetWrapper(g, args)
# path = '/home/marcin/Repos/alpha-zero-general/pretrained_models/tictactoe/keras/'
path = 'temp/'
n1.load_checkpoint(path, 'best.pth.tar')
args1 = dotdict({'numMCTSSims': 25, 'cpuct':1.0})
mcts1 = MCTS(g, n1, args1)
n1p = lambda x: np.argmax(mcts1.getActionProb(x, temp=0))
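
The agent above always calls `getActionProb` with `temp=0`, while self-play uses `temp=1` for the first moves. A sketch of what the temperature does follows. It assumes visit counts are turned into probabilities in the usual AlphaZero way; `counts_to_policy` is a hypothetical stand-in, and the real `getActionProb` in the MCTS section may differ in details such as tie-breaking.

```python
import numpy as np


def counts_to_policy(counts, temp):
    """Convert MCTS visit counts into a move distribution (illustrative)."""
    counts = np.asarray(counts, dtype=float)
    if temp == 0:
        # Greedy: all probability mass on the most visited move.
        probs = np.zeros_like(counts)
        probs[np.argmax(counts)] = 1.0
        return probs
    scaled = counts ** (1.0 / temp)
    return scaled / scaled.sum()


visits = [10, 5, 5, 0]
print(counts_to_policy(visits, temp=1))  # proportional to visit counts
print(counts_to_policy(visits, temp=0))  # one-hot on the best move
```

With `temp=0` the result is one-hot, so `np.argmax` in `n1p` simply recovers the most visited move and the trained agent plays deterministically for a given search result.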

In [62]:
arena = Arena(n1p, hp, g, display=display)

In [63]:
print(arena.playGames(3, verbose=True))

Turn  1 Player  1
   0 1 2 
  --------
0 |- - - |
1 |- - - |
2 |- - - |
  --------
Turn  2 Player  -1
   0 1 2 
  --------
0 |O - - |
1 |- - - |
2 |- - - |
  --------
0 1
0 2
1 0
1 1
1 2
2 0
2 1
2 2


 1 1


Turn  3 Player  1
   0 1 2 
  --------
0 |O - - |
1 |- X - |
2 |- - - |
  --------
Turn  4 Player  -1
   0 1 2 
  --------
0 |O - - |
1 |- X O |
2 |- - - |
  --------
0 1
0 2
1 0
2 0
2 1
2 2


 2 0


Turn  5 Player  1
   0 1 2 
  --------
0 |O - - |
1 |- X O |
2 |X - - |
  --------
Turn  6 Player  -1
   0 1 2 
  --------
0 |O - O |
1 |- X O |
2 |X - - |
  --------
0 1
1 0
2 1
2 2


 2 2


Turn  7 Player  1
   0 1 2 
  --------
0 |O - O |
1 |- X O |
2 |X - X |
  --------
Game over: Turn  7 Result  1
   0 1 2 
  --------
0 |O O O |
1 |- X O |
2 |X - X |
  --------
Arena.playGames |##########                      | (2/3) Eps Time: 25.220s | Total: 0:00:25 | ETA: 0:00:00
Turn  1 Player  1
   0 1 2 
  --------
0 |- - - |
1 |- - - |
2 |- - - |
  --------
0 0
0 1
0 2
1 0
1 1
1 2
2 0
2 1
2 2


 1 1


Turn  2 Player  -1
   0 1 2 
  --------
0 |- - - |
1 |- O - |
2 |- - - |
  --------
Turn  3 Player  1
   0 1 2 
  --------
0 |- - - |
1 |- O - |
2 |- - X |
  --------
0 0
0 1
0 2
1 0
1 2
2 0
2 1


 0 0


Turn  4 Player  -1
   0 1 2 
  --------
0 |O - - |
1 |- O - |
2 |- - X |
  --------
Turn  5 Player  1
   0 1 2 
  --------
0 |O - - |
1 |- O - |
2 |X - X |
  --------
0 1
0 2
1 0
1 2
2 1


 2 1


Turn  6 Player  -1
   0 1 2 
  --------
0 |O - - |
1 |- O - |
2 |X O X |
  --------
Turn  7 Player  1
   0 1 2 
  --------
0 |O X - |
1 |- O - |
2 |X O X |
  --------
0 2
1 0
1 2


 1 2


Turn  8 Player  -1
   0 1 2 
  --------
0 |O X - |
1 |- O O |
2 |X O X |
  --------
Turn  9 Player  1
   0 1 2 
  --------
0 |O X - |
1 |X O O |
2 |X O X |
  --------
0 2


 0 2


Game over: Turn  9 Result  0.0001
   0 1 2 
  --------
0 |O X O |
1 |X O O |
2 |X O X |
  --------
Arena.playGames |#####################           | (3/1) Eps Time: 20.361s | Total: 0:00:40 | ETA: 0:00:51
(1, 0, 1)
