Implementation of Alpha Zero General.
This implementation is based on https://github.com/suragnair/alpha-zero-general/blob/master/Coach.py
Python shogi library is here https://github.com/gunyarakun/python-shogi

In [1]:
class Game():
    """
    This class specifies the base Game class. To define your own game, subclass
    this class and implement the functions below. This works when the game is
    two-player, adversarial and turn-based.
    Use 1 for player1 and -1 for player2.
    See othello/OthelloGame.py for an example implementation.
    """
    def __init__(self):
        pass

    def getInitBoard(self):
        """
        Returns:
            startBoard: a representation of the board (ideally this is the form
                        that will be the input to your neural network)
        """
        pass

    def getBoardSize(self):
        """
        Returns:
            (x,y): a tuple of board dimensions
        """
        pass

    def getActionSize(self):
        """
        Returns:
            actionSize: number of all possible actions
        """
        pass

    def getNextState(self, board, player, action):
        """
        Input:
            board: current board
            player: current player (1 or -1)
            action: action taken by current player
        Returns:
            nextBoard: board after applying action
            nextPlayer: player who plays in the next turn (should be -player)
        """
        pass

    def getValidMoves(self, board, player):
        """
        Input:
            board: current board
            player: current player
        Returns:
            validMoves: a binary vector of length self.getActionSize(), 1 for
                        moves that are valid from the current board and player,
                        0 for invalid moves
        """
        pass

    def getGameEnded(self, board, player):
        """
        Input:
            board: current board
            player: current player (1 or -1)
        Returns:
            r: 0 if game has not ended. 1 if player won, -1 if player lost,
               small non-zero value for draw.
               
        """
        pass

    def getCanonicalForm(self, board, player):
        """
        Input:
            board: current board
            player: current player (1 or -1)
        Returns:
            canonicalBoard: returns canonical form of board. The canonical form
                            should be independent of player. For e.g. in chess,
                            the canonical form can be chosen to be from the pov
                            of white. When the player is white, we can return
                            board as is. When the player is black, we can invert
                            the colors and return the board.
        """
        pass

    def getSymmetries(self, board, pi):
        """
        Input:
            board: current board
            pi: policy vector of size self.getActionSize()
        Returns:
            symmForms: a list of [(board,pi)] where each tuple is a symmetrical
                       form of the board and the corresponding pi vector. This
                       is used when training the neural network from examples.
        """
        pass

    def stringRepresentation(self, board):
        """
        Input:
            board: current board
        Returns:
            boardString: a quick conversion of board to a string format.
                         Required by MCTS for hashing.
        """
        pass


In [12]:
import shogi
import numpy as np

NUM_BOARD_CHANNEL = 28
NUM_PRISONER_CHANNEL = 14
NUM_REPETITION_CHANNEL = 3
NUM_BOARD_SIZE = 9

NUM_PRISONER = 7

class ShogiGame(Game):
    def __init__(self):
        self.board = shogi.Board()
    
    
    def getInitState(self):
        # return initialized state
        self.board.reset()
        return self.getState()
        
    
    def getStateShape(self):
        return (NUM_BOARD_CHANNEL + NUM_PRISONER_CHANNEL + NUM_REPETITION_CHANNEL, NUM_BOARD_SIZE, NUM_BOARD_SIZE)
    
    
    def getNextState(self, current_player, tensor_action):
        # TODO : DECIDE the representation of actions to aplly it into usi forms
        
        # string_action = self.getStringAction(tensor_action)
        # self.board.push(shogi.Move.from_usi(string_action)
        # return self.getState()
        """
            The action output of neural net is represented as 3-D tensor like [CHANNEL,ROW,COL] filled with possibility of each action.
            The representation is divided into two way.
            For first (64 + 2) * 2(For Promotion) CHANNELs, CHANNEL specify the Movement(QUEEN MOVE + KNIGHT MOVE). And other 7 CHANNELs, CHANNEL corresponds to prisoner.
            
        """
        
    def getStringAction(self, tensor_action):
        """
        args tensor_action :3-D tensor like [CHANNEL, ROW, COL]
        return :action(like "7g7f")
        """
        pass
    
    
    def getState(self, current_player=shogi.BLACK):
        """
        return: State as binary expression with numpy 3-dimensional array.
                First dimension stands for channels which corresponds to piece_type, 
                prisoners or special rules like repetation
                Second is for rows and third is for columns.
                
        """
        # First dimension size is 28 for pieces on the board, 14 for prisoners, 3 for repetision
        state = np.zeros(self.getStateShape(), dtype=np.int8)
        
        # BOARD STATE (from 0 to 27 CHANNELS)
        for cell in shogi.SQUARES:
            piece = self.board.piece_at(cell)
            if piece:
                # calculate channel number for the piece_type
                channel = piece.color * len(shogi.PIECE_TYPES) + (piece.piece_type - 1)
                # NOTE: col index starts from right to left. 
                row = cell // 9
                col = cell - row * 9
                state[channel,row,col] = 1

        # PRISONER CHANNEL (from 28 to 41 CHANNELS)
        prison = self.board.pieces_in_hand
        for color in shogi.COLORS:
            for piece_type in range(1, NUM_PRISONER + 1):
                channel = NUM_BOARD_CHANNEL + color * NUM_PRISONER + (piece_type - 1) 
                if prison[color][piece_type]:
                    print(channel)
                    state[channel,0,:prison[color][piece_type]] = 1
                
        # REPETITION CHANNEL (from 42 to 44 CHANNELS)
        repetition = self.board.transpositions[self.board.zobrist_hash()] - 1
        for repeat in range(repetition):
            state[NUM_BOARD_CHANNEL + NUM_PRISONER_CHANNEL + repeat] += 1
            if repeat > 4:
                print("GAME IS OVER")
                sys.exit(1)
        if current_player:
            state = self.changePerspective(state)
            
        return state
    
    
    def changePerspective(self, state):
        # return reversed state to represent it from opponent's perspective
        
        # reverse the perspective of board
        state[:NUM_BOARD_CHANNEL,:,:] = state[:NUM_BOARD_CHANNEL,::-1,::-1]
        temp = state[:len(shogi.PIECE_TYPES),:,:]
        state[:len(shogi.PIECE_TYPES),:,:] = state[len(shogi.PIECE_TYPES):NUM_BOARD_CHANNEL,:,:]
        state[len(shogi.PIECE_TYPES):NUM_BOARD_CHANNEL,:,:] = temp
        
        # reverse prisonors information
        temp = state[NUM_BOARD_CHANNEL:NUM_BOARD_CHANNEL + NUM_PRISONER,:,:]
        state[NUM_BOARD_CHANNEL:NUM_BOARD_CHANNEL + NUM_PRISONER,:,:] = state[NUM_BOARD_CHANNEL + NUM_PRISONER:NUM_BOARD_CHANNEL + NUM_PRISONER_CHANNEL]
        state[NUM_BOARD_CHANNEL + NUM_PRISONER:NUM_BOARD_CHANNEL + NUM_PRISONER_CHANNEL] = temp
        return state
    
    
    def __str__(self):
        return self.board.__str__()
        
game = ShogiGame()
game.board.push(shogi.Move.from_usi("7g7c+"))
print(game)
print(game.getState(current_player=shogi.BLACK)[28 + 14])


 l  n  s  g  k  g  s  n  l
 .  r  .  .  .  .  .  b  .
 p  p +P  p  p  p  p  p  p
 .  .  .  .  .  .  .  .  .
 .  .  .  .  .  .  .  .  .
 .  .  .  .  .  .  .  .  .
 P  P  .  P  P  P  P  P  P
 .  B  .  .  .  .  .  R  .
 L  N  S  G  K  G  S  N  L

 P*1
28
[[0 0 0 0 0 0 0 0 0]
 [0 0 0 0 0 0 0 0 0]
 [0 0 0 0 0 0 0 0 0]
 [0 0 0 0 0 0 0 0 0]
 [0 0 0 0 0 0 0 0 0]
 [0 0 0 0 0 0 0 0 0]
 [0 0 0 0 0 0 0 0 0]
 [0 0 0 0 0 0 0 0 0]
 [0 0 0 0 0 0 0 0 0]]


In [18]:
import shogi
class tempShogi():
    def __init__(self,):
        self.board = shogi.Board()
        
    def move(self, usi):
        self.board.push(shogi.Move.from_usi(usi))
        
        
    def __str__(self,):
        return self.board.__str__()

game = tempShogi()
game.move("7g7c+")
print(game)
game.board.piece_at(shogi.A5).piece_type

 l  n  s  g  k  g  s  n  l
 .  r  .  .  .  .  .  b  .
 p  p +P  p  p  p  p  p  p
 .  .  .  .  .  .  .  .  .
 .  .  .  .  .  .  .  .  .
 .  .  .  .  .  .  .  .  .
 P  P  .  P  P  P  P  P  P
 .  B  .  .  .  .  .  R  .
 L  N  S  G  K  G  S  N  L

 P*1


8

In [9]:
for i in range(0):
    print(i)

In [2]:
from collections import deque
import numpy as np
import torch
import time


class Coach():
    """
    This class executes the self-play + learning.
    It uses the functions defined in Game and NeuralNet.
    """
    
    def __init__(self, game, nnet, args):
        self.game = game
        self.nnet = nnet
        self.pnet = self.nnet.__class__(self.game)
        self.args = args
        self.mcts = MCTS(self.game, self.nnet, self.args)
        self.trainExamplesHistory = []
        self.skipFirstSelfPlay = False
        
    
    def executeEpisode(self):
        """
        This function executes one episode of self-play, starting with player 1.
        As the game is played, each turn is added as a training example to trainExamples.
        The game is played till the game ends.
        AFter game ends the outcome of the game is used to assign values to each Example in trainExamples
        
        
        It uses a temp(erature)=1 if episodeStep < tempThreshold, and thereafter uses temp=0
        
        Returns:
            trainExamples: a list of examples of the form(canonicalBoard, pi, v)
                            pi is the MCTS informed policy vector, v is +1 if the player won, else -1.
        """
        
        trainExamples = []
        state = self.game.getInitState()
        self.curPlayer = 0
        episodeStep = 0
        
        while True:
            episodeStep += 1
            canonicalState = self.game.getCanonicalForm(state, self,curPlayer)
            temp = int(episodeStep < self.args.tempThreshold)
            
            pi = self.mcts.getActionProb(canonicalState, temp=temp)
            sym = self.game.getSymmetries(canonicalState, pi)
            for s, p in sym:
                trainExamples.append([s, self.curPlayer, p, None])
                
            action = np.random.choice(len(pi), p=pi)
            state, self.curPlayer = self.game.getNextState(state, self.curPlayer, action)
            
            r = self.game.getGameEnded(state, self.curPlayer)
            
            if r!=0:
                return [(x[0],x[2], r*((-1) ** (x[1]!=self.curPlayer))) for x in trainExamples]
            
            
    def learn(self):
        """
        Performs numIters iterations with numEps episodes of self-play in each iteration.
        After Every iteration, it retrains neural network with examples in trainExamples.
        It then pits the new neural network against the old one and accepts it only if it wins >= updateThreshold fraction of games
        """
        
        for i in range(1, self.args.numIter+1):
            print('------ITER ' + str(i) + '------')
            if not self.skipFirstSelfPlay or i>1:
                iterationTrainExamples = deque([], maxlen=self.args.maxlenOfQueue)
                
                for eps in range(self.args.numEps):
                    # reset our MCTS by each episode.
                    self.mcts = MCTS(self.game, self.nnet, self.args)
                    iterationTrainExamples += self.executeEpisode()
                    
                # save the iteration examples to the history
                self.trainExamplesHistory.append(iterationTrainExamples)
            
            if len(self.trainExamplesHistory) > self.args.numItersForTrainExamplesHistory:
                print("len(trainExamplesHistory) =", len(self.trainExamplesHistory), " => remove the oldest trainExamples")
                self.trainExampleHistory.pop(0)
            # backup history to a file
            # warning! the examples were collected using the model from the previous iteration, so (i-1)
            self.saveTrainExamples(i-1)
            
            # shuffle examples before training
            trainExamples = []
            for e in self.trainExamplesHistory:
                trainExamples.extend(e)
            shuffle(trainExamples)
            
            # training new network, keeping a copy of the old one
            self.nnet.save_chackpoint(folder=self.args.checkpoint, filename='temp.pth.tar')
            self.pnet.load_chackpoint(folder=self.args.checkpoint, filename='temp.pth.tar')
            pmcts = MCTS(self.game, self.pnet, self.args)
            
            self.nnet.train(trainExamples)
            nmcts = MCTS(self.game, self.nnet, self.args)
            
            print('PITTING AGAINST PREVIOUS VERSION')
            arena = Arena(lambda x: np.argmax(pmcts.getActionProb(x, temp=0)),
                          lambda x: np.argmax(nmcts.getActionProb(x, temp=0)), self.game)
            pwins, nwins, draws = arena.playGames(self.args.arenaCompare)
            print('NEW/PREV WINS : %d / %d ; DRAWS : %d' % (nwins, pwins, draws))
            
            if pwins+nwins > 0 and float(nwins)/(pwins+nwins) < self.args.updateThreshold:
                print('REJECTING NEW MODEL')
                self.nnet.load_checkpoint(folder=self.args.checkpoint, filename='temp.pth.tar')
            else:
                print('ACCEPTING NEW MODEL')
                self.nnet.save_checkpoint(folder=self.args.checkpoint, filename=self.getCheckpointFile(i))
                self.nnet.save_checkpoint(folder=self.args.checkpoint, filename='best.pth.tar')
                
        
    def getCheckpointFile(self, iteration):
        return 'checkpoint_' + str(iteration) + '.pth.tar'
            
    
    def saveTrainExamples(self, iteration):
        folder = self.args.checkpoint
        if not os.path.exists(folder):
            os.makedirs(folder)
        filename = os.path.join(folder, self.getCheckpointFile(iteration) + '.examples')
        with open(filename, 'wb+') as f:
            Pickler(f).dump(self.trainExamplesHistory)
        f.closed
        
    
    def loadTrainExamples(self):
        modelFile = os.path.join(self.args.load_folder_file[0], self.args.load_folder_file[1])
        examplesFile = modelFile + '.examples'
        if not os.path.isfile(examplesFile):
            print(examplesFile)
            r = input("File with trainExamples not found. Continue? [y|n]")
            if r != "y":
                sys.exit()
        else:
            print("File with trainExamples found. Read it.")
            with open(examplesFile, "rb") as f:
                self.trainExamplesHistory = Unpickler(f).load()
            f.closed

            # examples based on the model were already collected (loaded)
            self.skipFirstSelfPlay = True

In [3]:
import math
import numpy as np
EPS = 1e-8

class MCTS():
    """
    This class handles the MCTS tree.
    """
    
    def __init__(self, game, nnet, args):
        self.game = game
        self.nnet = nnet
        self.args = args
        self.Qsa = {}
        salf.Nsa = {}
        self.Ns = {}
        self.Ps = {}
        
        self.Es = {}
        self.Vs = {}
        
    def getActionProb(self, canonicalState, temp=1):
        """
        This function performs numMCTSSims simulations of MCTS starting from canonicalState.
        
        Returns:
            probs: a policy vector where the probability of the ith action is proportional to Nsa[(s,a)]**(1./temp)
        """
        
        for i in range(self.args.numMCTSSims):
            self.search(canonicalState)
            
        s = self.game.stringRepresentation(canonicalState)
        counts = [self.Nsa[(s,a)] if (s,a) in self.Nsa else 0 for a in range(self.game.getActionSize())]

        if temp==0:
            bestA = np.argmax(counts)
            probs = [0] * len(counts)
            probs[bestA] = 1
            return probs
        
        counts = [x ** (1./temp) for x in counts]
        probs = [x/float(sum(counts)) for x in counts]
        return probs
    
    
    def search(self, canonicalState):
        """
        This function perfroms one iteration of MCTS. It is recursively called
        till a leaf node is found. The action chosen at each node is one that
        has the maximum upper confidence bound as in the paper.
        
        Once a leaf node is found, the neural network is called to return an 
        initial policy P and a value V for the state. This value is propagated
        up the search path. In case the leaf node is a terminal state, the 
        outcome is propagated up the search path. The value of Ns, Nsa, Qsa are 
        updated.
        
        NOTE: the return values are the negative of the value of the current
        state. This is done since v is in [-1, 1] and if v is the value of a
        state for the current player, then its value is -v for the other player.
        
        Returns:
            v: the negative of the value of the current canonicalState
        """
        
        
        s = self.game.stringRepresentation(canonicalState)
        
        if s not in self.Es.keys():
            self.Es[s] = self.game.getGameEnded(canonicalState, 1)
        if self.Es[s] != 0:
            # terminal state
            return -self.Es[s]
        
        
        if s not in self.Ps.keys():
            """
            When you expand a leaf node
            """
            self.Ps[s], v = self.nnet.predict(canonicalState)
            valids = self.game.getValidMoves(canonicalState, 1)
            self.Ps[s] = self.Ps[s]*valids
            sum_Ps_s = np.sum(self.Ps[s])
            if sum_Ps_s > 0:
                self.Ps[s] /= sum_Ps_s
            else:
                """
                If all the valid moves were masked make all valid moves equally probable.
                
                NOTE: All valid moves may be masked if either your NNet architecture is insufficient or you've get overfitting
                If you have got dozens or hundreds of these messages you should pay attention to your NNet and or training process
                """
                print("All valid moves were masked, do wokaround.")
                self.Ps[s] = self.Ps[s] + valids
                self.Ps[s] /= np.sum(self.Ps[s])
                
            self.Vs[s] = valids
            self.Ns[s] = 0
            return -v
        
        """
        When you arrive at a leaf node which you have been to.
        self.Vs[s] is already assigned because you have ever been to this node before.
        """
        valids = self.Vs[s]
        cur_best = -float('inf')
        best_act = -1
        
        # pick the action with highest upper confidence bound
        for a in range(self.game.getActionSize()):
            if valids[a]:
                if (s, a) in self.Qsa:
                    u = self.Qsa[(s,a)] + self.args.cpuct*self.Ps[s][a] * math.sqrt(self.Ns[s]) / (1 + self.Nsa[(s, a)])
                else:
                    u = self.args.cpuct * self.Ps[s][a] * math.sqrt(self.Ns[s] + EPS)
                    
                if u > cur_best:
                    cur_best = u
                    best_act = a
                    
        a = best_act
        next_s, next_player = self.game.getNextState(canonicalState, 1, a)
        next_s = self.game.getCanonicalForm(next_s, next_player)
        
        v = self.search(next_s)
        
        if (s,a) in self.Qsa:
            self.Qsa[(s, a)] = (self.Nsa[(s, a)] * self.Qsa[(s, a)] + v) / (self.Nsa[(s, a)] + 1)
            self.Nsa[(s, a)] += 1
        else:
            self.Qsa[(s,a)] = v
            self.Nsa[(s, a)] = 1
            
        self.Ns[s] += 1
        return -v
    
    

For Test

In [14]:
import torch
import numpy as np
tensor = torch.zeros(139,9,9)
tensor[0][0][0] = 1
print(tensor)

tensor([[[1., 0., 0.,  ..., 0., 0., 0.],
         [0., 0., 0.,  ..., 0., 0., 0.],
         [0., 0., 0.,  ..., 0., 0., 0.],
         ...,
         [0., 0., 0.,  ..., 0., 0., 0.],
         [0., 0., 0.,  ..., 0., 0., 0.],
         [0., 0., 0.,  ..., 0., 0., 0.]],

        [[0., 0., 0.,  ..., 0., 0., 0.],
         [0., 0., 0.,  ..., 0., 0., 0.],
         [0., 0., 0.,  ..., 0., 0., 0.],
         ...,
         [0., 0., 0.,  ..., 0., 0., 0.],
         [0., 0., 0.,  ..., 0., 0., 0.],
         [0., 0., 0.,  ..., 0., 0., 0.]],

        [[0., 0., 0.,  ..., 0., 0., 0.],
         [0., 0., 0.,  ..., 0., 0., 0.],
         [0., 0., 0.,  ..., 0., 0., 0.],
         ...,
         [0., 0., 0.,  ..., 0., 0., 0.],
         [0., 0., 0.,  ..., 0., 0., 0.],
         [0., 0., 0.,  ..., 0., 0., 0.]],

        ...,

        [[0., 0., 0.,  ..., 0., 0., 0.],
         [0., 0., 0.,  ..., 0., 0., 0.],
         [0., 0., 0.,  ..., 0., 0., 0.],
         ...,
         [0., 0., 0.,  ..., 0., 0., 0.],
         [0., 0., 0., 