<figure>
  <img
  src="https://www.wei-siyi.com/posts/mcts_port/header.png"
  alt="Asymmetric tree growth">
  <figcaption>Asymmetric tree growth</figcaption>
</figure>

## Monte Carlo Tree Search with board games

Monte Carlo Tree Search (MCTS) is a well-known technique that is widely used in zero-sum games. First introduced in 2006, it is a heuristic search algorithm for decision processes that combines ideas from tree data structures, game theory, and Monte Carlo sampling.

MCTS is closely related to minimax (MinMax) tree search. Minimax is a search algorithm that "maximizes the minimum gain": in short, it was designed for zero-sum games to maximize the worst-case reward at every decision. However, minimax is an exhaustive algorithm. It searches every possible end state of the game, determines the winner, and returns that result to the parent nodes. Minimax can achieve a very high winning rate, but it has one critical flaw: its time complexity is exponential, $O(b^m)$, where $b$ is the branching factor (the number of legal moves per position) and $m$ is the maximum depth of the game tree. This limits its use in board games with large search spaces, such as Go. MCTS was introduced to reduce this cost. One of the most famous examples, AlphaGo, combined MCTS with deep learning to improve on plain Monte Carlo sampling and achieved astounding results.
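
To get a feel for how quickly $O(b^m)$ grows, the sketch below counts the nodes of a full game tree. It assumes the same branching factor `b` at every level, which real games do not have, and the helper name `full_tree_nodes` is made up for this illustration.

```python
def full_tree_nodes(b, m):
    """Number of nodes in a tree with branching factor b and depth m (root at depth 0)."""
    return sum(b ** d for d in range(m + 1))

# Each extra level multiplies the size of the last layer by b.
for depth in (2, 4, 8):
    print(depth, full_tree_nodes(5, depth))
```

Even with only five moves per position, doubling the depth from 4 to 8 multiplies the node count several hundred times over, which is why an exhaustive search quickly becomes impractical.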

The core idea of Monte Carlo Tree Search is simple: use Monte Carlo sampling to reduce the time complexity. A minimax (MinMax) tree expands every possible end state and evaluates all of them. We can avoid this with Monte Carlo methods. At each incomplete game state, we expand all the possible moves and then estimate the winning rate of each move by running simulations from it. The estimated winning rate converges to the true winning rate as the number of simulations increases.
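
The convergence claim can be illustrated with a toy experiment. In the sketch below a biased coin stands in for a full game rollout; the function name `estimate_win_rate` and the true rate of 0.6 are assumptions chosen only for this example.

```python
import random

def estimate_win_rate(true_rate, n_simulations, seed=0):
    """Estimate a win rate by averaging n_simulations random win/loss outcomes."""
    rng = random.Random(seed)
    wins = sum(rng.random() < true_rate for _ in range(n_simulations))
    return wins / n_simulations

# More simulations give an estimate that tends to sit closer to the true rate.
for n in (10, 1_000, 100_000):
    print(n, estimate_win_rate(0.6, n))
```

With only a handful of simulations the estimate can be far off, while a large number of simulations should land close to the true rate. MCTS relies on the same effect when it scores a move by its rollouts.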

## Simple Demo of MCTS with 4x4 Gomoku

In the following example, we can see that MCTS plays really well against human players.



## Principle of Operation for MCTS

First, let us formulate the process. If we treat Monte Carlo Tree Search as a black-box function, its input is the current game state and its output is the next move. Taking tic-tac-toe as an example, the input is the game board and the output is the estimated optimal move for that board. Each iteration of MCTS consists of four main steps, followed by a final step that picks the move:
* **Selection Process:** Selection starts from the root node R, which represents the input (the current game state). We then keep selecting successive child nodes **strategically** until a leaf node L is reached. A leaf node is any node from which no simulation has been started yet.
* **Expansion Process:** Unless L is already an end state (WIN=1, LOSE=-1, or DRAW=0), we expand L by creating all of its possible child nodes. We then randomly select one child node C, which represents the game state after one possible move from L.
* **Simulation Process:** To estimate the winning rate from game state C, we run a Monte Carlo simulation (a rollout) from C until the game ends.
* **Back Propagation:** We use the result of the simulation to update the statistics of every node on the path from C back to R.
* **Optimization:** After all iterations, we return the move from the current game state with the highest estimated winning rate.

![](https://www.wei-siyi.com/posts/mcts_port/MCTS_process.png)

In [1]:
import numpy as np
from copy import deepcopy
import matplotlib.pyplot as plt
import time

class Imcts(object):
    def select(self):
        """
        Selection Process: Select the next node for expansion
        """
        pass

    def expand(self):
        """
        Expansion Process: Expand the current node
        """
        pass

    def simulate(self):
        '''
        Simulation Process: Simulate the from current state for multiple times
        until we get a reasonable estimate of the winning probability
        '''
        pass

    def backprop(self):
        '''
        Back Propagation: Update the simulate information from current node to root node
        '''
        pass

    def run(self):
        '''
        Optimization: Run the whole process and return the optimized move
        '''
        pass

Next, we define a data structure for the nodes of the tree. The number of children of each node is limited only by the rules of the game, so a node may have many children.
We also need to store the node's parent, as well as the board and the player to move at this node.
MCTS also needs the number of times the node has been visited and the number of times it has won. We will explain why they are needed later; they are very important.

In [2]:
class node():
    def __init__(self, parent, board, player, action = None, n = 0, w = 0, q = 0):
        self.child = [] # The children of this node (a list filled in by expand)
        self.parent = parent # The parent of this node
        self.board = board # The game board (n x n NumPy array) at this node
        self.player = player # The player to move at this board state
        self.action = action # The action that leads from the parent's board to this board
        self.n = n # The number of times this node has been visited
        self.w = w # The accumulated reward (+1 win, -1 loss, 0 draw), updated by backprop
        self.q = q # The average reward w / n, used as the winning-rate estimate

**MCTS Constructor**: 

When we create the MCTS object, we need to set some important parameters that will be used later. They include the maximum number of simulations for each decision, the maximum depth of the search, and **the exploration constant**. We will explain how these parameters are used later.

In [3]:
class MCTS(Imcts):
    def __init__(self, iterations=100, max_depth=5, game_board=None, win_mark=3, player=None, explore_constant=np.sqrt(2)):
        self.iterations = iterations #The number of simulations for each decision
        self.max_depth = max_depth #The maximum selection depth; run() stops early once it is exceeded
        self.explore_constant = explore_constant #Exploration constant for UCT

        self.win_mark = win_mark #How many pieces in a line win the game
        self.game_board = game_board #Current game board
        self.player = player #Which player moves next
        self.total_n = 0 #Total number of simulations

        self.size = len(game_board) #The size of the game board
        
        self.tree = node(None, self.game_board, self.player) #Tree structure to store the data

**Selection**:

Next, following the description of selection above, we want to select the next node for expansion. However, this is actually a hard question.

Suppose we have already run many simulations and face the following scenario. The first node has 10000 simulations and a winning rate of about 80%. The second node has only 100 simulations and a winning rate of 60%. If we want to pick one of them to explore further, which is the better choice?

The answer is not obvious. You may want to say the first one is definitely better. However, the first node has already been explored a lot, so its estimated winning rate is unlikely to change much. The second node's current winning rate is lower, but with so few simulations its estimate is still uncertain, so it remains worth exploring.

This is called the exploration-exploitation tradeoff. To address it, Levente Kocsis and Csaba Szepesvári proposed applying the UCB1 formula to tree search, a method known as UCT (Upper Confidence bounds applied to Trees). Let $w_i$ be the number of wins and $n_i$ the number of simulations recorded for child node $i$, and let $N_i$ be the total number of simulations run through its parent. The UCB1 value of node $i$ is then:

$$UCB = \frac{w_i}{n_i} + c\sqrt{\frac{\ln N_i}{n_i}}$$

Here $c$ is the exploration parameter, which is usually set to $\sqrt{2}$. The formula is quite intuitive. The first term is the winning rate observed for the move so far (exploitation). The second term grows when the move has received only a small share of the simulations (exploration). We pick the move with the largest sum, which favors moves that have a high winning rate, few simulations, or both. Such moves are the most worth exploring. Note that the implementation below uses the global simulation counter `self.total_n` in place of $N_i$.
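
The two-node scenario described earlier can be plugged into the formula. The sketch below is only an illustration: the function name `ucb1` is made up, the win counts (8000 of 10000 and 60 of 100) are derived from the stated winning rates, and the total is assumed to be the sum of both nodes' simulations.

```python
import math

def ucb1(wins, visits, total_visits, c=math.sqrt(2)):
    """UCB1 score: observed win rate plus an exploration bonus."""
    return wins / visits + c * math.sqrt(math.log(total_visits) / visits)

total = 10_000 + 100
well_explored = ucb1(wins=8_000, visits=10_000, total_visits=total)  # roughly 0.84
less_explored = ucb1(wins=60, visits=100, total_visits=total)        # roughly 1.03
print(well_explored, less_explored)
```

Under these assumptions the second node gets the higher score despite its lower winning rate, because its exploration bonus is about ten times larger.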

In [4]:
    def select(self):
        """
        Selection Process: Select the next node for expansion
        """
        node_founded = False
        current_node = self.tree
        depth = 0

        while not node_founded:
            child_nodes = current_node.child
            depth += 1

            if(len(child_nodes) == 0): #If this node has not been expanded yet
                node_founded = True
            else:                     #Otherwise, pick the child with the highest UCT value and keep searching
                max_uct_value = -100.0
                for child_node in child_nodes:
                    w = child_node.w
                    n = child_node.n
                    total_n = self.total_n
                    if(n == 0): n = 1e-4 #Avoid division by zero for unvisited children

                    #UCB formula
                    exploitation_value = w / n
                    exploration_value  = np.sqrt(np.log(total_n)/n)
                    uct_value = exploitation_value + self.explore_constant * exploration_value

                    if uct_value > max_uct_value:
                        max_uct_value = uct_value
                        current_node = child_node
        return depth, current_node

**Expansion**:

For the expansion process, we follow the description above: we expand node L by creating all of its possible child nodes, then randomly select one child node C, which represents the game state after one possible move from L.

In [5]:
    def expand(self, leaf_node):
        """
        Expansion Process: Expand the current node
        """
        leaf_board = leaf_node.board
        winner = get_winners(leaf_board, self.win_mark)
        avaliable_actions, _ = get_valid_actions(leaf_board)

        current_node = leaf_node
        if (winner is None): # Not a terminal state
            avaliable_childs = []
            for action in avaliable_actions: #Grow all the avaliable actions
                board = deepcopy(leaf_node.board)
                current_player = leaf_node.player
                
                #Update the childnode's board status
                if(current_player == 'o'):
                    next_player = 'x'
                    board[action] = 1
                else:
                    next_player = 'o'
                    board[action] = -1

                new_child = node(current_node, board, next_player, action=action)
                avaliable_childs.append(new_child)
            
            #Randomly select one child to be the node for simulation
            current_node.child = avaliable_childs
            random_index = np.random.randint(low=0, high=len(avaliable_childs), size=1)
            current_node = avaliable_childs[random_index[0]]
        
        return current_node 

**Simulation**:

Once we have randomly selected a child node C, we want to estimate the winning rate from game state C. We run a Monte Carlo simulation from that state until the game ends. Here we use the simplest rollout policy: place pieces at random until one player wins or the board is full. This policy is not very effective, and it can be replaced with a more efficient one.

In [6]:
    def simulate(self, child_node):
        self.total_n += 1 #Total simulation counter
        board = deepcopy(child_node.board)
        current_player = child_node.player #The player who moves next from this node
        winner = get_winners(board, self.win_mark)
        simulate_time = 0
        
        #Randomly place pieces until the game ends (win or draw)
        while(winner is None):
            #Choose the rollout policy (self.IS is set in the full constructor below)
            if(self.IS):
                avaliable_actions, runtime = get_neighbor_actions(board, False) #Improved MCTS
            else:
                avaliable_actions, runtime = get_valid_actions(board, False) #Naive MCTS

            #Random rollout: pick one of the candidate moves uniformly
            random_index = np.random.randint(low=0, high=len(avaliable_actions), size=1)[0]
            action = avaliable_actions[random_index]
            simulate_time += runtime

            #Place the current player's mark ('o' = 1, 'x' = -1, as in expand), then switch sides
            if current_player == 'o':
                board[action] = 1
                current_player = 'x'
            else:
                board[action] = -1
                current_player = 'o'

            winner = get_winners(board, self.win_mark)
        
        return winner, simulate_time

**Backprop**:

Finally, we propagate the result of the simulation back up the tree. For every node on the path from the simulated node to the root, we update the number of simulations, the accumulated reward, and the winning rate.

In [7]:
    def backprop(self, child_node, winner):
        player = self.tree.player

        if(winner == "draw"):
            reward = 0
        elif(winner == player):
            reward = 1
        else:
            reward = -1
        
        #Update gaming info from child node all the way to the root node
        finished = False
        while(not finished):
            child_node.n += 1
            child_node.w += reward
            child_node.q = child_node.w / child_node.n 
            parent_node = child_node.parent
            if(parent_node is not None):
                child_node = parent_node
            else:
                finished = True
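
To see what this update does to the statistics, here is a small trace on a three-node path. `TinyNode` is a hypothetical stand-in that keeps only the parent link and the `n`, `w`, `q` fields, and the two rewards are made-up simulation results.

```python
class TinyNode:
    """Hypothetical stand-in for the tree node: parent link plus n, w, q statistics."""
    def __init__(self, parent=None):
        self.parent = parent
        self.n = 0    # visit count
        self.w = 0    # accumulated reward
        self.q = 0.0  # average reward, w / n

def backprop(node, reward):
    """Add one simulation result to node and to every ancestor up to the root."""
    while node is not None:
        node.n += 1
        node.w += reward
        node.q = node.w / node.n
        node = node.parent

root = TinyNode()
child = TinyNode(parent=root)
leaf = TinyNode(parent=child)

backprop(leaf, +1)   # a simulated win reached through the leaf
backprop(child, -1)  # a simulated loss that started from the child
print(root.n, root.w, root.q)
print(child.n, child.w, child.q)
print(leaf.n, leaf.w, leaf.q)
```

The root and the child each record both results, so their average reward drops back to zero, while the leaf only records the win that passed through it.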

**Optimization**:

Then we simply repeat the selection, expansion, simulation, and backpropagation steps for the given number of iterations. At the end we return the action with the highest estimated winning probability. This function uses some helper functions that we will define later.

In [8]:
    def run(self, self_compete = False): #Self compete on/off, the return type is different.
        start_time = time.time()
        simulate_time = 0

        for i in range(self.iterations):
            depth, selected_node = self.select()
            child_node = self.expand(selected_node)
            winner, runtime = self.simulate(child_node)
            simulate_time += runtime

            self.backprop(child_node, winner)

            if(depth > self.max_depth): #If depth over max depth, kill the loop and assess based on current information
                break
        
        #Select the best action
        current_node = self.tree
        possible_nodes = current_node.child
        best_q = -100
        for node in possible_nodes:
            if(node.q > best_q):
                best_q = node.q
                best_action = node.action
        # self.p_plot(possible_nodes)  # Probability plot; p_plot is not defined in this post, so the call is disabled
        run_time = time.time() - start_time - simulate_time

        if(not self_compete): #Visualization requires an int action type
            best_action = best_action[0] * self.size + best_action[1]

        return best_action, best_q, depth, run_time

## Some helper functions

We have now defined the tree nodes and the MCTS class for our game board. However, a few helper functions are still needed.

The first determines the winner of the current game state. We need to check whether n pieces lie in a line, which may be horizontal, vertical, or diagonal. This is easy when n pieces must line up on an n x n board. But what if only (n-1) pieces in a line are needed to win on an n x n board? To handle this case, we slide a window of size win_mark x win_mark over the board and check each window for a winner.

Another helper returns the valid actions for a game state. You certainly do not want a player to place a piece on a position that is already occupied. To avoid this, we collect all the available (empty) positions of the current state.

In [9]:
def get_winners(board, win_mark):
    """
    get the winner of this board
    arg:
    - board state
    return:
    "o", "x", "draw", None (not ended)
    """

    #return who wins
    def __who_wins(sums, win_mark):
        if np.any(sums == win_mark): return 'o'
        if np.any(sums == -win_mark): return 'x'
        return None

    #Check a single window for a winner
    def __is_terminal_in_conv(leaf_state, win_mark):
        # check row/col
        for axis in range(2):
            sums = np.sum(leaf_state, axis=axis)
            result = __who_wins(sums, win_mark)
            if result is not None: return result
        # check diagonal
        for order in [-1,1]:
            diags_sum = np.sum(np.diag(leaf_state[::order]))
            result = __who_wins(diags_sum, win_mark)
            if result is not None: return result
        return None

    n_rows_board = len(board)
    #Use a shift window to check the winner
    window_positions = range(n_rows_board - win_mark + 1)

    #Check every win_mark x win_mark window on the board for a winner
    for row in window_positions:
        for col in window_positions:
            window = board[row:row+win_mark, col:col+win_mark]
            winner = __is_terminal_in_conv(window, win_mark)
            if winner is not None:
                return winner

    if not np.any(board == 0):
        return 'draw'
    return None

def get_valid_actions(board, timer = False):
    '''
    return all possible action in current leaf state
    in:
    - board
    out:
    - set of possible actions (row,col) - start from 0
    '''
    start_time = time.time()
    actions = []
    state_size = len(board)

    for i in range(state_size):
        for j in range(state_size):
            if board[i][j] == 0:
                actions.append((i, j))
    runtime = time.time() - start_time

    if(timer): return actions, runtime
    else: return actions, 0

However, considering all valid actions may not be an effective choice, since some positions have much less value to explore. For example, in tic-tac-toe or Gomoku there are only two strategies: block the opponent's line or extend your own. Either way, you have to place a piece next to an occupied position. In other words, we can restrict the available actions to the empty positions that have at least one non-empty neighbor:

In [10]:
def get_neighbor_actions(board, timer = False):
    '''
    return all neighbor actions in the current leaf state
    in:
    - board
    out:
    - set of possible actions (row,col) - start from 0
    '''
    start_time = time.time()
    actions = set()
    valid_actions, _ = get_valid_actions(board)
    state_size = len(board)

    for i in range(state_size):
        for j in range(state_size):
            if board[i][j] != 0:
                for index_a in range(i-1, i+2):
                    for index_b in range(j-1,j+2):
                        if((index_a, index_b) in valid_actions):
                            actions.add((index_a, index_b))
    run_time = time.time() - start_time

    if(timer): return list(actions), run_time
    else: return list(actions), 0
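
To show how much the neighbor filter can shrink the candidate set, the sketch below counts both sets on a 5x5 board with a single piece. It is a standalone re-implementation written for this illustration: the name `empty_neighbors` and the example board are assumptions, not part of the code above.

```python
import numpy as np

def empty_neighbors(board):
    """Empty cells that touch at least one occupied cell (8-neighborhood)."""
    rows, cols = board.shape
    cells = set()
    for i, j in np.argwhere(board != 0).tolist():
        for a in range(max(i - 1, 0), min(i + 2, rows)):
            for b in range(max(j - 1, 0), min(j + 2, cols)):
                if board[a, b] == 0:
                    cells.add((a, b))
    return cells

board = np.zeros((5, 5), dtype=int)
board[2, 2] = 1  # a single piece in the center
all_empty = int(np.sum(board == 0))
print(all_empty, len(empty_neighbors(board)))
```

Here the rollout would choose among 8 cells instead of 24. The gap narrows as the board fills up, so the benefit is largest early in the game.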

## Full code of MCTS

Now, with the full code of MCTS, we can play some tic-tac-toe.

In [11]:
class MCTS(Imcts):
    def __init__(self, iterations=100, max_depth=5, game_board=None, win_mark=3, player=None, explore_constant=np.sqrt(2), IS = True):
        self.iterations = iterations
        self.max_depth = max_depth
        self.explore_constant = explore_constant

        self.win_mark = win_mark
        self.game_board = game_board
        self.player = player
        self.total_n = 0

        self.size = len(game_board)
        
        self.tree = node(None, self.game_board, self.player)
        self.IS = IS #Importance sampling on / off

    def select(self):
        """
        Select the next node for expansion
        """
        node_founded = False
        current_node = self.tree
        depth = 0
        
        while not node_founded:
            child_nodes = current_node.child
            depth += 1

            if(len(child_nodes) == 0): #If this node is not yet explored
                node_founded = True
            else:                     #Else, select one from the child (UCT) node and keep searching
                max_uct_value = -100.0
                for child_node in child_nodes:
                    w = child_node.w
                    n = child_node.n
                    total_n = self.total_n
                    if(n == 0): n = 1e-4 #Avoid numerical error

                    #UCT formula
                    exploitation_value = w / n
                    exploration_value  = np.sqrt(np.log(total_n)/n)
                    uct_value = exploitation_value + self.explore_constant * exploration_value

                    if uct_value > max_uct_value:
                        max_uct_value = uct_value
                        current_node = child_node
        return depth, current_node

    def expand(self, leaf_node):
        """
        Expand the current node
        """
        leaf_board = leaf_node.board
        winner = get_winners(leaf_board, self.win_mark)
        avaliable_actions, _ = get_valid_actions(leaf_board)

        current_node = leaf_node
        if (winner is None): # Not a terminal state
            avaliable_childs = []
            for action in avaliable_actions: #Grow all the avaliable actions
                board = deepcopy(leaf_node.board)
                current_player = leaf_node.player

                if(current_player == 'o'):
                    next_player = 'x'
                    board[action] = 1
                else:
                    next_player = 'o'
                    board[action] = -1

                new_child = node(current_node, board, next_player, action=action)
                avaliable_childs.append(new_child)
            current_node.child = avaliable_childs
            random_index = np.random.randint(low=0, high=len(avaliable_childs), size=1)
            current_node = avaliable_childs[random_index[0]]
        
        return current_node 


    def simulate(self, child_node):
        self.total_n += 1
        board = deepcopy(child_node.board)
        current_player = child_node.player #The player who moves next from this node
        winner = get_winners(board, self.win_mark)
        simulate_time = 0

        while(winner is None):
            if(self.IS): avaliable_actions, runtime = get_neighbor_actions(board, False) #Improved MCTS
            else: avaliable_actions, runtime = get_valid_actions(board, False) #Naive MCTS

            #Random rollout: pick one of the candidate moves uniformly
            random_index = np.random.randint(low=0, high=len(avaliable_actions), size=1)[0]
            action = avaliable_actions[random_index]
            simulate_time += runtime

            #Place the current player's mark ('o' = 1, 'x' = -1, as in expand), then switch sides
            if current_player == 'o':
                board[action] = 1
                current_player = 'x'
            else:
                board[action] = -1
                current_player = 'o'

            winner = get_winners(board, self.win_mark)
        
        return winner, simulate_time

    def backprop(self, child_node, winner):
        player = self.tree.player

        if(winner == "draw"):
            reward = 0
        elif(winner == player):
            reward = 1
        else:
            reward = -1
        
        #Update gaming info from child node all the way to the root node
        finished = False
        while(not finished):
            child_node.n += 1
            child_node.w += reward
            child_node.q = child_node.w / child_node.n 
            parent_node = child_node.parent
            if(parent_node is not None):
                child_node = parent_node
            else:
                finished = True

    def run(self, self_compete = False): #Self compete on/off, the return type is different.
        start_time = time.time()
        simulate_time = 0

        for i in range(self.iterations):
            depth, selected_node = self.select()
            child_node = self.expand(selected_node)
            winner, runtime = self.simulate(child_node)
            simulate_time += runtime
            self.backprop(child_node, winner)

            if(depth > self.max_depth): #If depth over max depth, kill the loop and assess based on current information
                break
        
        #Select the best action
        current_node = self.tree
        possible_nodes = current_node.child
        best_q = -100
        for node in possible_nodes:
            if(node.q > best_q):
                best_q = node.q
                best_action = node.action
        run_time = time.time() - start_time - simulate_time

        if(not self_compete): #Visulization require int action type
            best_action = best_action[0] * self.size + best_action[1]

        return best_action, best_q, depth, run_time


For a simple visualization, we can let the MCTS with the plain rollout policy play against the MCTS with the improved position selection. You should see that the improved MCTS often has the higher winning probability, but feel free to check it yourself. For the better visualization and the interactive play shown in the video, you can view my code on GitHub.

In [12]:
current_player = 1
mcts_player = -1
win_mark = 4
board_shape = [5, 5]
winner_map = {"x": "mcts_rollout", "o": "mcts_improved", "draw": "nobody (draw)"}
player_mark = {1: "o", -1: "x"} #MCTS compares players as 'o' / 'x'; 'o' places 1 and 'x' places -1
winner = None

game_board = np.zeros(board_shape, dtype=int)


#Self play
while winner is None:
    print("----------------------------")
    if current_player == mcts_player: #Plain random rollout player ('x', IS=False)
        print("MCTS Rollout Turn")
        mcts = MCTS(iterations=400, max_depth=30, win_mark=win_mark, game_board=game_board, player=player_mark[current_player], IS=False)
        best_action, best_q, depth, run_time = mcts.run(self_compete=True)
        game_board[best_action] = -1
    else: #Neighbor-restricted rollout player ('o', IS=True)
        mcts = MCTS(iterations=400, max_depth=30, win_mark=win_mark, game_board=game_board, player=player_mark[current_player], IS=True)
        print("MCTS Improved Position Turn")
        best_action, best_q, depth, run_time = mcts.run(self_compete=True)
        game_board[best_action] = 1
    
    print(game_board)
    winner = get_winners(game_board, win_mark)
    current_player = -current_player

print("The winner is "+winner_map[winner])

----------------------------
MCTS Improved Position Turn
[[0 0 0 0 0]
 [0 0 0 0 0]
 [0 0 1 0 0]
 [0 0 0 0 0]
 [0 0 0 0 0]]
----------------------------
MCTS Rollout Turn
[[ 0  0  0  0  0]
 [ 0  0  0 -1  0]
 [ 0  0  1  0  0]
 [ 0  0  0  0  0]
 [ 0  0  0  0  0]]
----------------------------
MCTS Improved Position Turn
[[ 0  0  0  0  0]
 [ 0  0  0 -1  0]
 [ 0  0  1  0  0]
 [ 0  0  0  0  0]
 [ 1  0  0  0  0]]
----------------------------
MCTS Rollout Turn
[[ 0 -1  0  0  0]
 [ 0  0  0 -1  0]
 [ 0  0  1  0  0]
 [ 0  0  0  0  0]
 [ 1  0  0  0  0]]
----------------------------
MCTS Improved Position Turn
[[ 0 -1  0  0  0]
 [ 0  0  0 -1  0]
 [ 0  0  1  0  0]
 [ 0  0  0  0  0]
 [ 1  0  1  0  0]]
----------------------------
MCTS Rollout Turn
[[ 0 -1  0  0  0]
 [ 0  0  0 -1  0]
 [ 0  0  1  0  0]
 [ 0  0  0  0  0]
 [ 1  0  1 -1  0]]
----------------------------
MCTS Improved Position Turn
[[ 0 -1  0  0  0]
 [ 0  0  0 -1  0]
 [ 0  0  1  0  0]
 [ 1  0  0  0  0]
 [ 1  0  1 -1  0]]
-------------------