# Imports

In [68]:
#! pip install keras==2.4.0
#! pip install tensorflow==2.4.1

import numpy as np
from itertools import combinations
from collections import deque
from copy import deepcopy
import random

import pickle

import tensorflow as tf
import tensorflow.keras as keras
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Flatten, Conv2D

from PIL import Image, ImageDraw
import imageio
import matplotlib.image as mpimg
import matplotlib.pyplot as plt

from tqdm import tqdm_notebook

# Hex Visualizer

In [50]:
class Styles:
    """Colour palette and geometry constants shared by the board renderer."""

    # --- colours ---
    WHITE = (255, 255, 255)
    RED = (255, 0, 0, 255)
    BLUE = (0, 0, 255, 255)
    BLACK = (0, 0, 0, 255)
    GREEN = (0, 255, 0, 255)

    PLAYER1, PLAYER2, EMPTY = RED, BLUE, BLACK

    # A cell's value is used directly as an index into this list.
    GAME_COLORS = [EMPTY, PLAYER1, PLAYER2]
    BGCOLOR = WHITE

    # --- grid lines ---
    LINECOLOR = BLACK
    LINEWIDTH = 10

    # --- geometry ---
    CELLRADIUS = 50
    # Must be at least 2 * CELLRADIUS, otherwise neighbouring dots overlap.
    CELLMARGIN = CELLRADIUS * 2 + 100
    # Must be at least CELLRADIUS, otherwise the grid overflows the image.
    IMAGEPADDING = 20 + CELLRADIUS

    # Passed as the ``duration`` argument when the GIF is written.
    GIF_FRAME_DELAY = 0.5


class HexVisualizer:
    """Renders Hex boards as image files and game histories as animated GIFs."""

    def render_image(self, board, file_path):
        """Draw ``board`` as a diamond-shaped grid and save it to ``file_path``.

        :param board: 2D board; each cell value is an index into Styles.GAME_COLORS
        :param file_path: destination of the image file. When it is a path,
                          missing parent directories are created first.
        """
        import os

        grid_dimensions = (len(board) - 1) * Styles.CELLMARGIN
        image_dimensions = grid_dimensions + 2 * Styles.IMAGEPADDING
        image_center = image_dimensions / 2

        # Calculate coordinates for all the board cells.
        cell_coordinates = get_cell_coordinates(board, image_center)

        image = create_image(image_dimensions, image_dimensions)
        canvas = ImageDraw.Draw(image)

        # Lines first so that the dots are painted on top of them.
        render_lines(cell_coordinates, canvas)
        render_dots(cell_coordinates, canvas)

        # rotate image -45deg to get the diamond shape
        rotated = image.rotate(-45, Image.NEAREST, expand=1, fillcolor=Styles.WHITE)
        # rotate() returned a new image, so the unrotated one can be released.
        image.close()

        try:
            # Saving used to fail when the target directory did not exist yet.
            if isinstance(file_path, (str, os.PathLike)):
                parent_dir = os.path.dirname(file_path)
                if parent_dir:
                    os.makedirs(parent_dir, exist_ok=True)
            rotated.save(file_path)
        finally:
            rotated.close()

    def render_gif(self, state_history, file_path):
        """Render every board in ``state_history`` and combine the frames into a GIF.

        :param state_history: iterable of states; element 0 of each state is the board
        :param file_path: destination path of the GIF (printed once it is written)

        The intermediate frames are written to ``out/tmp_img/img<i>.png``.
        """
        import os

        # The frame directory is created on demand; it used to be assumed to exist.
        os.makedirs('out/tmp_img', exist_ok=True)
        if isinstance(file_path, (str, os.PathLike)):
            gif_dir = os.path.dirname(file_path)
            if gif_dir:
                os.makedirs(gif_dir, exist_ok=True)

        images = []
        for i, state in enumerate(state_history):
            board = state[0]
            filename = f'out/tmp_img/img{i}.png'
            # Reuse this instance instead of building a new visualizer per frame.
            self.render_image(board, filename)
            images.append(imageio.imread(filename))
        imageio.mimsave(file_path, images, duration=Styles.GIF_FRAME_DELAY)
        print(file_path)


def get_cell_coordinates(board, image_center):
    """Compute the drawing position of every board cell.

    Returns a list with one entry per board row. Each entry is a list of
    ``(cell_value, x, y)`` tuples, one per cell of that row. Every row is
    centred horizontally around ``image_center``.
    """
    spacing = Styles.CELLMARGIN
    grid = []
    for row_index, row in enumerate(board):
        # All cells of a row share the same y coordinate.
        y = row_index * spacing + Styles.IMAGEPADDING
        row_width = (len(row) - 1) * spacing
        left_edge = image_center - (row_width / 2)
        grid.append([
            (cell, left_edge + (col_index * spacing), y)
            for col_index, cell in enumerate(row)
        ])
    return grid

def render_lines(cell_coordinates, canvas):
    """Draw the grid lines that connect neighbouring cells.

    For every cell a line is drawn to its right neighbour, from the cell below
    it to its right neighbour (the diagonal), and to the cell below it.
    Horizontal lines on the first and last row use the PLAYER1 colour and
    vertical lines on the first and last column use the PLAYER2 colour.
    """
    last_row = len(cell_coordinates) - 1
    for i, row in enumerate(cell_coordinates):
        has_row_below = i < last_row
        on_outer_row = i == 0 or i == last_row
        last_col = len(row) - 1

        for j, cell in enumerate(row):
            if j < last_col:
                # horizontal line to the right neighbour
                horizontal_color = Styles.PLAYER1 if on_outer_row else Styles.LINECOLOR
                cell_connecting_line(canvas, cell, row[j + 1], horizontal_color)

                # diagonal line from the cell below to the right neighbour
                if has_row_below:
                    cell_connecting_line(canvas, cell_coordinates[i + 1][j], row[j + 1], Styles.LINECOLOR)

            if has_row_below:
                # vertical line to the cell below
                on_outer_col = j == 0 or j == last_col
                vertical_color = Styles.PLAYER2 if on_outer_col else Styles.LINECOLOR
                cell_connecting_line(canvas, cell, cell_coordinates[i + 1][j], vertical_color)


def render_dots(cell_coordinates, canvas):
    """Draw one filled dot per cell, coloured by the cell's value."""
    every_cell = (entry for row in cell_coordinates for entry in row)
    for value, x, y in every_cell:
        draw_dot(canvas, [x, y], Styles.CELLRADIUS, Styles.GAME_COLORS[value])


def create_image(width, height):
    """Return a new RGB image of the given size filled with the background colour."""
    size = (width, height)
    return Image.new("RGB", size, Styles.BGCOLOR)


def draw_dot(drawing, center_coordinates, r, color, is_filled=True, width=1):
    """Draw a circle of radius ``r`` centred on ``center_coordinates``.

    :param drawing: object exposing an ``ellipse`` method (e.g. an ImageDraw canvas)
    :param center_coordinates: ``(x, y)`` of the circle centre
    :param r: radius of the circle
    :param color: fill colour, or outline colour when ``is_filled`` is False
    :param is_filled: when False a hollow dot is drawn on the background colour
    :param width: outline width, only used for hollow dots
    """
    cx, cy = center_coordinates

    # The ellipse is described by the upper-left and lower-right corners of its bounding box.
    bounding_box = [(cx - r, cy - r), (cx + r, cy + r)]

    if not is_filled:
        drawing.ellipse(bounding_box, fill=Styles.BGCOLOR, outline=color, width=width)
        return
    drawing.ellipse(bounding_box, fill=color)


def cell_connecting_line(drawing, cell1, cell2, color):
    """Draw a straight line between two ``(cell_value, x, y)`` entries."""
    endpoints = [(x, y) for _, x, y in (cell1, cell2)]
    drawing.line(endpoints, fill=color, width=Styles.LINEWIDTH)

# State Manager

In [51]:
class State_Manager:
    """Game rules for Hex on a square board.

    A state is a tuple ``(board, player)`` where ``player`` is the player whose
    turn it is. An action is a tuple ``((row, col), player)``.
    """

    def __init__(self, board_size):
        self.board_size = board_size

    def get_initial_board(self):
        """Return an empty ``board_size`` x ``board_size`` integer board (all zeros)."""
        side = self.board_size
        return np.zeros(shape=(side, side), dtype=int)

    def get_legal_actions(self, state):
        """
        return: a list of legal actions, one for every empty cell, in row-major order
        """
        board, player = state
        side = len(board)
        return [
            ((row, col), player)
            for row in range(side)
            for col in range(side)
            if board[row][col] == 0
        ]

    def get_score(self, state) -> float:
        """
        returns: 1 when the state is a winning state
        raises: Exception when the state is not a winning state
        """
        if not self.check_if_winning(state):
            raise Exception('Game not scored')
        return 1

    # action is the cell to put the player's piece in, plus the player: ((row, col), player)
    def get_child_state(self, state, action):
        """
        param: state = tuple(game_board, player)
        param: action = ((row, col), player)
        return: the child state that is reached when applying action to state

        The board of ``state`` is copied, the action's player is placed at
        (row, col) of the copy, and the copy is returned together with the id
        of the other player.
        """
        parent_board, _ = state
        (row, col), player = action

        child_board = deepcopy(parent_board)
        child_board[row][col] = player
        return (child_board, change_player(player))

    def check_if_winning(self, state):
        """Return True if the player who is NOT next to move has won.

        Player 1 is searched starting from row 0 and player 2 starting from
        column 0; the search itself is done by ``bfs``.
        """
        board, next_player = state
        player = change_player(next_player)
        side = len(board)

        # Build the initial BFS queue: the whole starting edge of the player.
        if player == 1:
            start_cells = [(0, i) for i in range(side)]
        elif player == 2:
            start_cells = [(i, 0) for i in range(side)]
        else:
            raise Exception("player must be either 1 or 2")

        return bfs(state, start_cells)



## Help Functions

In [52]:
def bfs(state, queue):
    """Return True if the player who just moved has a chain reaching their far edge.

    :param state: tuple ``(board, next_player)``; the player being checked is
                  the opponent of ``next_player``
    :param queue: candidate start cells ``(row, col)`` on the player's starting edge
    :return: True when a chain of the player's pieces connects one of the start
             cells with the last row (player 1) or the last column (player 2)
    """
    board, next_player = state
    player = change_player(next_player)
    board_size = len(board)

    # Only start cells that actually hold one of the player's pieces can begin a chain.
    start_cells = [(row, col) for (row, col) in queue if board[row][col] == player]

    # deque gives O(1) pops from the left; the set gives O(1) membership tests.
    frontier = deque(start_cells)
    # Cells are marked when they are enqueued so that no cell is queued twice.
    seen = set(start_cells)

    while frontier:
        row, col = frontier.popleft()

        # Testing the goal on the popped cell also covers a start cell that is
        # already on the far edge (a 1x1 board).
        if player == 1 and row == board_size - 1:
            return True
        if player == 2 and col == board_size - 1:
            return True

        for neighbor in get_neighbors(board, (row, col)):
            n_row, n_col = neighbor
            if neighbor not in seen and board[n_row][n_col] == player:
                seen.add(neighbor)
                frontier.append(neighbor)
    return False
        

def get_neighbors(board, coordinate):
    """Return the on-board neighbours of ``coordinate`` on the hex grid.

    The neighbours are listed in the order east, north-east, north, west,
    south-west, south; directions that would leave the board are omitted.
    """
    row, col = coordinate
    last_index = len(board) - 1

    can_go_north = row > 0
    can_go_south = row < last_index
    can_go_west = col > 0
    can_go_east = col < last_index

    candidates = [
        (can_go_east, (row, col + 1)),                       # east
        (can_go_east and can_go_north, (row - 1, col + 1)),  # north east
        (can_go_north, (row - 1, col)),                      # north
        (can_go_west, (row, col - 1)),                       # west
        (can_go_west and can_go_south, (row + 1, col - 1)),  # south west
        (can_go_south, (row + 1, col)),                      # south
    ]
    return [cell for is_on_board, cell in candidates if is_on_board]

def change_player(player):
    """Return the id of the other player: 2 for player 1, otherwise 1."""
    return 2 if player == 1 else 1

## Gameplay functions

In [53]:
def play_game(sm, p1_policy, p2_policy):
    """Play one game between two policies, with player 1 moving first.

    :param sm: state manager providing the game rules
    :param p1_policy: callable ``(state, node) -> (move, node)`` used for player 1
    :param p2_policy: callable ``(state, node) -> (move, node)`` used for player 2
    :return: ``(winner, state_history)``; ``winner`` is None if the loop ended
             without a winning move, and ``state_history`` holds every state a
             move was made from plus the winning state
    """
    state = (sm.get_initial_board(), 1)
    winner = None
    # Node returned by the previous policy call; handed to the next call.
    node = None
    state_history = []

    while len(sm.get_legal_actions(state)) > 0 and winner is None:
        state_history.append(state)

        _, current_player = state
        policy = p1_policy if current_player == 1 else p2_policy
        move, node = policy(state, node)

        # Continue from the resulting board with the other player to move.
        state = sm.get_child_state(state, move)

        if sm.check_if_winning(state):
            winner = current_player
            state_history.append(state)

    return winner, state_history


def random_player_move(state, root_node=None):
    """Policy that picks a uniformly random legal action.

    :param state: tuple ``(board, player)``
    :param root_node: unused; accepted so the signature matches the other policies
    :return: ``(action, None)``
    """
    board, _ = state
    # Size the state manager from the board itself instead of a hard-coded 4.
    sm = State_Manager(len(board))
    return random.choice(sm.get_legal_actions(state)), None

def human_input_move(state, root_node=None):
    """Policy that asks a human for the next move on standard input.

    The board and the full state are printed, then a coordinate of the form
    ``row,col`` is read.

    :param state: tuple ``(board, player)``
    :param root_node: unused; accepted so the signature matches the other policies
    :return: ``(((row, col), player), None)``
    """
    curr_board, player = state
    # HexVisualizer(curr_board, 'tmp.png')
    print(curr_board)
    print(state)

    raw_move = input('place piece at coordinate i.e: 2,0 >> ')
    row_text, col_text = raw_move.strip(' ').split(',')
    coordinate = (int(row_text), int(col_text))
    return (coordinate, player), None

# def mcts_player_move(state, root_node=None):
#     mcts = MCTS(sm, 100, default_policy, uct, state, root_node)
#     best_move, prob_distribution, child_node = mcts.choose_move()
#     # print("\n\nState:\n", state, "\nPlayer:", player, "Best Move: ", best_move, "\nProb_dist:\n", np.round(prob_distribution, 3))
#     return best_move, child_node


# def play_multiple_games(n_games, p1_policy, p2_policy):
#     history = []
    
#     for i in range(n_games):
#         winner = play_game(sm, p1_policy, p2_policy)
#         history.append(winner)

#         p1_percent = 100*history.count(1)/len(history)
#         p2_percent = 100*history.count(2)/len(history)

#         print(f'GAME {i}: {winner} WINS  ==>  {round(p1_percent, 1)}% - {round(p2_percent, 1)}%')
#     return history

In [54]:
# play_game(State_Manager(4), random_player_move, human_input_move)

# Monte Carlo Tree Search
## MCTS Tree

In [55]:
class MCTS:
    """
    Monte Carlo Tree Search:

    Every simulation performs the following steps:
    1. Traverse - follow the tree policy down the existing tree
    2. Expand the leaf that was reached
    3. Rollout with the default policy
    4. Backpropagate the score

    After all simulations the most visited action of the root is chosen.
    """

    def __init__(self, state_manager, n_sim, default_policy, utility_function, state, root_node=None, c=1.5):
        """
        :param state_manager:    the state manager for the game
        :param n_sim:            number of simulations
        :param default_policy:   object used for expansion and rollout; must provide
                                 get_action(state, legal_actions, stochastic=True)
        :param utility_function: exploration bonus, called as utility_function(n_s=, n_sa=, c=)
        :param state:            =(game_board, player)
        :param root_node:        root node to reuse when the tree is pruned, or None
        :param c:                exploratory constant for utility function

        action = ((row, col), player)
        """
        self.state_manager = state_manager
        self.n_sim = n_sim
        self.default_policy = default_policy
        self.utility_function = utility_function
        self.state = state
        self.root_node = root_node
        self.c = c

    def choose_move(self):
        """
        :return best_move, prob_distribution, child_node

        Runs n_sim simulations of the four MCTS steps and then picks the root
        action with the most visits. child_node is the node reached by best_move.
        Raises ValueError (from max) if the root ends up with no edges.
        """
        # Reuse the supplied root node if there is one, otherwise start a new tree.
        if self.root_node is None:
            root_node = Node(self.state, self.state_manager)
        else:
            root_node = self.root_node

        for _ in range(self.n_sim):

            # 1. Traversing
            # ---------------
            node_action_list = self.traverse(root_node)

            # The node to expand is the one the last traversed action leads to.
            # With an empty traversal that is the root itself.
            if node_action_list:
                last_node, last_action = node_action_list[-1]
                next_node = last_node.children[last_action]
            else:
                next_node = root_node

            # 2. Expanding
            # ---------------
            # expand() returns None when next_node is an end state.
            node_action = self.expand(next_node)
            if node_action is not None:
                node_action_list.append(node_action)

            # 3. Rolling out
            # ---------------
            if node_action_list:
                score = self.rollout(node_action_list[-1])
            else:
                # Nothing was traversed or expanded: the root itself is an end state.
                score = self.state_manager.get_score(self.state)

            # 4. Backpropagating
            # ---------------
            self.back_propagate(node_action_list, score)

        # 5. Choosing the best move
        # ---------------
        # edges[action] = (value, visit_count); pick the action with most visits.
        best_move = max(root_node.edges, key=lambda action: root_node.edges[action][1])
        child_node = root_node.children[best_move]

        # Normalized probability distribution based on visit counts
        prob_distribution = self.get_prob_distribution(root_node)

        return best_move, prob_distribution, child_node

    def traverse(self, node):
        """
        :param node: root node of the traversal
        :return: List of tuples (node, action) in chronological order

        Follows the tree policy from node through already existing nodes until
        a node without children is reached.
        """
        node_action_list = []

        while len(node.children) > 0:
            best_action = self.choose_action(node)
            node_action_list.append((node, best_action))
            node = node.get_child(best_action)

        return node_action_list

    def expand(self, node):
        """
        :param node: the node to expand
        :return: (node, action), or None if node is an end state

        Generates children for the node and returns the node along with an
        action chosen by the default policy.
        """
        # No expansion if the node is an end state
        if self.state_manager.check_if_winning(node.state):
            return None

        actions = self.state_manager.get_legal_actions(node.state)
        node.set_child_states(actions, self.state_manager)

        # The default policy picks which of the new children to roll out from.
        best_action = self.default_policy.get_action(node.state, actions, stochastic=True)

        return node, best_action

    def rollout(self, node_action):
        """
        :param node_action: tuple(node, action)
        :return score

        Applies action to the node's state and then follows the default policy
        until an end state is reached. The score of the end state is negated
        once for every rollout move that was needed to reach it.
        """
        node, action = node_action
        state = self.state_manager.get_child_state(node.state, action)

        # Number of default-policy moves played; decides the sign of the score.
        counter = 0
        while not self.state_manager.check_if_winning(state):
            legal_actions = self.state_manager.get_legal_actions(state)
            best_action = self.default_policy.get_action(state, legal_actions, stochastic=True)
            state = self.state_manager.get_child_state(state, best_action)
            counter += 1

        return self.state_manager.get_score(state) * (-1) ** counter

    def back_propagate(self, node_action_list, score):
        """
        Backpropagating the score through the visited node-action pairs,
        starting at the last pair and flipping the sign at every step.
        """
        for i, (node, action) in enumerate(reversed(node_action_list)):
            # Alternating (+/-) score depending on which players turn it is
            node.visit(action, score * (-1) ** i)

    def choose_action(self, node):
        """
        Choose the best action from the current node according to score_move
        :return argmax(q_sa + u_sa)
        """
        n_s = node.visit_count
        node_state = node.state

        return max(
            node.children,
            key=lambda a: self.score_move(node=node, state=node_state, n_s=n_s, action=a),
        )

    def score_move(self, node, state, n_s, action):
        """
        Score an action of node as Q-value plus exploration bonus
        :param state: not used; kept for compatibility with existing callers
        :return (q_sa + u_sa)
        """
        # visit count of the edge
        n_sa = node.edges[action][1]

        u_sa = self.utility_function(n_s=n_s, n_sa=n_sa, c=self.c)
        q_sa = node.get_q(action)
        return q_sa + u_sa

    def get_prob_distribution(self, node):
        """
        Returning normalized probability distribution for possible actions, based on the visitcounts
        Shape of prob_distribution equals board shape
        If there are no visits at all an all-zero array is returned instead of dividing by zero.
        """
        visit_count = np.zeros_like(self.state[0])
        for action, (_value, visits) in node.edges.items():
            move = action[0]
            visit_count[move] = visits

        total_visits = visit_count.sum()
        if total_visits == 0:
            return visit_count.astype(float)
        return visit_count / total_visits

## Node

In [56]:

class Node:
    """
    Nodes in the Monte Carlo Tree

    Nodes are stored in the class-level dictionary memo, keyed by the bytes of
    their board, in order to reuse them when the same board is reached again.
    """
    memo = dict()

    @staticmethod
    def clear_memo():
        """Remove every cached node from the shared memo."""
        # Must change the class attribute; assigning to a bare local name
        # (as was done before) left the cache untouched.
        Node.memo.clear()

    def __init__(self, state, state_manager):
        """
        :param state: tuple(game_board, player)
        :param state_manager: not used by the constructor; kept for compatibility
        """
        self.state = state

        # children = {action: Node(child_state)}
        # action = ((row, col), player)
        self.children = {}
        # edges = {action: (value, visit_count)}
        self.edges = {}
        self.visit_count = 1

    def get_child(self, action):
        """Return the child node reached by action."""
        return self.children[action]

    def set_child_states(self, actions, state_manager):
        """
        Used when Expanding nodes.
        Creates one child per action. If a node for the resulting board is
        already in the memo, that node is reused instead of creating a new one.
        """
        if not actions:
            return
        for action in actions:
            child_state = state_manager.get_child_state(self.state, action)
            # The raw board bytes are the key, so two different boards can
            # never collide the way two hash values could.
            board_key = child_state[0].tobytes()
            child = Node.memo.get(board_key)
            if child is None:
                child = Node(child_state, state_manager)
                Node.memo[board_key] = child
            self.children[action] = child

            # Edge visits start at 1 so that get_q never divides by zero.
            self.edges[action] = (0, 1)

    def is_final_state(self):
        """Return True if the node has no children."""
        return len(self.children) == 0

    def visit(self, action, value):
        """Count a visit of the node and of the edge action, adding value to the edge."""
        self.visit_count += 1
        edge_value, edge_visits = self.edges[action]
        self.edges[action] = (edge_value + value, edge_visits + 1)

    def get_q(self, action):
        """Return the accumulated value of the edge divided by its visit count."""
        edge_value, edge_visits = self.edges[action]
        return (1 / edge_visits) * edge_value

# ANET

In [57]:

"""
One-hot encode the board before entering it into the NN

Create a convolutionary 2D network with 2d input;
    1. One Hot Encoded board for player 1
    2. One Hot Encoded board for player 2
"""

class ANET:
    """Actor network: a Keras model that maps an encoded board to move probabilities."""

    def __init__(self, epochs):
        """
        :param epochs: number of epochs used for every call to train
        """
        self.model = Sequential()
        self.epochs = epochs

    def initialize(self, board_shape, filters=[32,64, 64], kernel_sizes=[(3,3), (2,2), (1,1)], dense_shape=[15, 30], 
                   activation='relu', optimizer='SGD', padding='same', lossfunction='categorical_crossentropy'):
        """
        Initializing the Neural Network Model

        :param board_shape: (nRows, nCols)
        :param filters: number of filters of each Conv2D layer
        :param kernel_sizes: kernel size of each Conv2D layer (same length as filters)
        :param dense_shape: number of units of each hidden dense layer
        :param activation: activation passed to the Conv2D and hidden dense layers
        :param optimizer: optimizer passed to model.compile
        :param padding: padding passed to the Conv2D layers
        :param lossfunction: loss passed to model.compile

        The list defaults are only read, never modified.
        The network is fed inputs of shape (nRows, nCols, 2), one channel per
        player (see one_hot_encode). No input shape is passed to the first layer here.
        """
        # Convolutional layers
        for n_filters, kernel_size in zip(filters, kernel_sizes[:len(filters)]):
            self.model.add(Conv2D(n_filters, kernel_size=kernel_size, activation=activation, padding=padding))

        self.model.add(Flatten())

        # Dense layers
        for units in dense_shape:
            self.model.add(Dense(units, activation=activation))

        # One output per board cell.
        self.model.add(Dense(board_shape[0]*board_shape[1], activation='softmax'))
        self.model.compile(loss=lossfunction, optimizer=optimizer, metrics=['accuracy'])

    def get_action(self, state, legal_actions, stochastic=False):
        """
        Choose one of the legal actions based on the prediction of the model.

        :param state: tuple(game_board, player)
        :param legal_actions: list of actions ((row, col), player)
        :param stochastic: if True the action is sampled with the predictions as
                           weights, otherwise the action with the highest prediction is taken
        :return: one of the legal actions
        """
        board, player = state

        encoded_batch = np.array([self.one_hot_encode(state)])
        prediction = self.model(tf.stack(encoded_batch)).numpy()
        # Bring the prediction back to the orientation of the real board.
        prediction = self.pred_decode(prediction.reshape(board.shape), player)

        # Used for playing games
        if stochastic:
            weights = [prediction[cell] for cell, _ in legal_actions]
            return random.choices(population=list(legal_actions), weights=weights, k=1)[0]

        # max() always yields a legal action, also when every prediction is 0;
        # the earlier "> 0" scan returned a move of None in that case.
        return max(legal_actions, key=lambda a: prediction[a[0]])

    def train(self, minibatch):
        """
        :param minibatch: list of (state, prob_distribution)

        fitting the states and prob_distributions from training
        """
        X = [self.one_hot_encode(s) for s, _ in minibatch]
        # Targets are decoded the same way predictions are (see pred_decode).
        y = [self.pred_decode(p, s[1]).flatten() for s, p in minibatch]

        self.model.fit(np.stack(X), np.stack(y), batch_size=len(minibatch), epochs=self.epochs)

    def save_model(self, filepath):
        self.model.save(filepath=filepath)

    def load_model(self, filepath):
        self.model = keras.models.load_model(filepath=filepath)

    def set_model(self, model):
        self.model = model

    """
    Encoding and decoding of NN input and output
    """
    def pred_decode(self, prediction, player):
        """Return prediction unchanged for player 1 and transposed for any other player."""
        if player == 1:
            return prediction
        return prediction.T

    def one_hot_encode(self, state):
        """
        :param state on format tuple(game_board, player)
        :return float array of shape (board_size, board_size, 2): channel 0 marks
        the current player's pegs and channel 1 the other player's pegs. For a
        player other than 1 both channels are transposed.
        """
        board, player = state
        board = np.asarray(board)
        p1_board = board == 1
        p2_board = board == 2

        if player == 1:
            channels = (p1_board, p2_board)
        else:
            channels = (p2_board.T, p1_board.T)

        # Stacking on the last axis replaces the former cell-by-cell loop.
        return np.stack(channels, axis=-1).astype(float)

# Actor
For training the ANET

In [58]:
class Actor:
    """
    Used for training the ANET
    """
    def __init__(self, state_manager, anet, rbuf_lim=100, c=1.5):
        """
        :param state_manager: the state manager for the game
        :param anet: the network to train; also used as default policy in MCTS
        :param rbuf_lim: maximum number of cases kept in the replay buffer
        :param c: exploratory constant handed to MCTS
        """
        self.sm = state_manager
        self.anet = anet
        # Bounded buffer: the oldest cases drop out once rbuf_lim is reached.
        self.RBUF = deque([], rbuf_lim)
        self.anet_parameters = []
        self.c = c

    def train_anet(self, n_games, n_sim, utility_function, save_interval, minibatch_size, filepath):
        """
        Play n_games games, training the ANET after each of them.

        :param n_games: number of games to play
        :param n_sim: number of MCTS simulations per move
        :param utility_function: utility function handed to MCTS
        :param save_interval: the model is saved every save_interval games
        :param minibatch_size: maximum number of cases per training batch
        :param filepath: directory prefix of the saved models
        """
        for i in range(1, n_games+1):
            print("Game", i)
            #Clearing the Node cache
            Node.clear_memo()

            self.play_game(save_interval, n_sim, utility_function, minibatch_size)

            # Save ANET's current parameters for later use in tournament play.
            if np.mod(i, save_interval) == 0:
                path = filepath + '/model_' + str(i)
                self.anet.save_model(path)
                # The with-block closes the file handle again.
                with open("rbuf.p", "wb") as rbuf_file:
                    pickle.dump(self.RBUF, rbuf_file)
        path = filepath + '/final_model'
        self.anet.save_model(path)
        print("Finished training")

    def play_game(self, save_interval, n_sim, utility_function, minibatch_size):
        """
        Play one game where every move is chosen by MCTS, store
        (state, prob_distribution) for every move in the replay buffer and
        train the ANET on a random minibatch afterwards.

        :param save_interval: not used here; kept for compatibility with callers
        :param n_sim: number of MCTS simulations per move
        :param utility_function: utility function handed to MCTS
        :param minibatch_size: maximum number of cases in the training batch
        """
        state = (self.sm.get_initial_board(), 1)

        root_node = None
        winner = None

        while len(self.sm.get_legal_actions(state)) > 0 and winner is None:
            # Use the utility function the caller asked for (uct used to be hard-coded).
            mcts = MCTS(self.sm, n_sim, self.anet, utility_function, state, root_node=root_node, c=self.c)
            best_move, prob_distribution, child_node = mcts.choose_move()

            self.RBUF.append((state, prob_distribution))

            child_state = self.sm.get_child_state(state, best_move)

            if self.sm.check_if_winning(child_state):
                _, next_player = child_state
                winner = change_player(next_player)

            # Continue from the child state and keep the matching subtree as new root.
            state = child_state
            root_node = child_node

        print("Winner: ", winner)
        #Train ANET on a random minibatch of cases from RBUF
        sample_size = min(len(self.RBUF), minibatch_size)
        minibatch = random.sample(list(self.RBUF), sample_size)
        self.anet.train(minibatch)

    def clear_RBUF(self):
        """Empty the replay buffer while keeping its size limit."""
        # Rebinding to [] would silently replace the bounded deque with an unbounded list.
        self.RBUF.clear()

In [59]:
# board_size = 4
# sm = State_Manager(board_size)
# anet = ANET(epochs=10)
# anet.initialize(board_shape=(board_size, board_size))

# actor = Actor(sm, anet, rbuf_lim=100)
# actor.train_anet(n_games=1, n_sim=75, utility_function=uct, save_interval=1, minibatch_size=10)

In [60]:
class TOPP:
    """Tournament in which every pair of saved ANET models plays G games."""

    def __init__(self, board_size, model_paths, G):
        """
        :param board_size: side length of the board the games are played on
        :param model_paths: paths of the saved models; one player is loaded per path
        :param G: number of games played between each pair of players
        """
        self.board_size = board_size
        self.anets = [self.init_anet(path) for path in model_paths]
        self.G = G

    def init_anet(self, path):
        """Load the model saved at path into a new ANET."""
        anet = ANET(epochs=None)
        anet.load_model(path)
        return anet

    def run_tournament(self, visualize=False):
        """
        Let every pair of players play G games against each other. The player
        with the lower index in self.anets always plays as player 1.

        :param visualize: if True a GIF of every game is written to out/
        :return: integer array with the number of wins per player, indexed like self.anets
        """
        # create pairs of matches between all the players
        match_pairs = combinations(range(len(self.anets)), 2)
        # Explicit integer vector; zeros_like on a list of model wrappers only
        # worked through an object-dtype array.
        score_table = np.zeros(len(self.anets), dtype=int)

        sm = State_Manager(self.board_size)

        def make_policy(anet):
            # Wrap an ANET as a play_game policy: (state, node) -> (action, node)
            def policy(state, root_node=None):
                legal_actions = sm.get_legal_actions(state)
                return anet.get_action(state, legal_actions, stochastic=True), None
            return policy

        for p1, p2 in match_pairs:
            p1_move = make_policy(self.anets[p1])
            p2_move = make_policy(self.anets[p2])

            # Play match G times
            for i in range(self.G):
                winner, state_history = play_game(sm, p1_move, p2_move)
                if visualize:
                    hv = HexVisualizer()
                    hv.render_gif(state_history, f'out/p{p1}-VS-p{p2}-{i}.gif')
                if winner == 1:
                    score_table[p1] += 1
                elif winner == 2:
                    score_table[p2] += 1
                else:
                    raise Exception('PLAYER 1 OR PLAYER 2 SHOULD HAVE WON!')
        return score_table

In [61]:
# model_paths = ['NN_test/model_20', 'NN_test/model_10']
# topp = TOPP(PARAMS.board_size, model_paths, 100)
# scores = topp.run_tournament()
# print(scores)

# Utility Function

In [62]:
def uct(n_s, n_sa, c=1):
    """Exploration bonus: c * sqrt(log(n_s) / (1 + n_sa)).

    :param n_s: visit count of the node
    :param n_sa: visit count of the edge
    :param c: exploratory constant
    """
    exploration_ratio = (np.log(n_s)) / (1 + n_sa)
    return c * np.sqrt(exploration_ratio)

# Game Controller

In [77]:
class GameController:
    """Holds all settings and drives ANET training and the tournament (TOPP)."""

    def __init__(self):
        # ---- ANET ----
        self.epochs = 10
        self.lr_schedule = keras.optimizers.schedules.ExponentialDecay(
            initial_learning_rate=1e-2,
            decay_steps=10000,
            decay_rate=0.9,
        )
        self.optimizer = keras.optimizers.SGD(learning_rate=self.lr_schedule)

        # Layout of the Conv2D layers
        self.filters = [32, 64, 64]
        self.kernel_sizes = [(3, 3), (2, 2), (1, 1)]

        # Layout of the dense layers
        self.dense_shape = [15, 30]

        self.activation = 'relu'
        self.padding = 'same'
        self.lossfunction = 'categorical_crossentropy'
        self.learning_rate = 0.02

        # ---- General ----
        self.board_size = 5

        # ---- TOPP ----
        # G = number of games between each pair of models
        # M = number of saved models
        self.G = 10
        self.M = 1

        self.visualize_TOPP = True
        self.visualize_training = False

        # ---- Actor / training of the ANET ----
        self.rbuf_lim = 100
        self.c = 1.5
        self.n_games = 100
        self.n_sim = 100
        self.utility_function = uct
        self.minibatch_size = 10

        self.file_path = 'DemoModel'

    def run_TOPP(self, model_paths=None):
        """Run a tournament between saved models and print the number of wins.

        :param model_paths: paths of the models that take part. If None, models
                            are trained first and saved at regular intervals.
        """
        if model_paths is None:
            print('----Training ANET ----')
            # interval at which models are saved
            save_interval = int(self.n_games / self.M)
            # paths of the M models that the training will save
            checkpoints = range(save_interval, self.n_games + 1, save_interval)
            model_paths = [f'{self.file_path}/model_{i}' for i in checkpoints]
            self.train_model(save_interval=save_interval)
            print("----Training Complete ----")

        tournament = TOPP(self.board_size, model_paths, self.G)
        scores = tournament.run_tournament(visualize=self.visualize_TOPP)

        # A player is named after the last component of its model path.
        player_names = [path.split('/')[-1] for path in model_paths]
        print('==== TOPP RESULTS ====')
        for player, score in zip(player_names, scores):
            print(f'  {player}: {score} WINS')
        print('======================')

    def train_model(self, save_interval):
        """Build a new ANET from the settings and train it with an Actor.

        :param save_interval: the model is saved every save_interval games
        """
        board_shape = (self.board_size, self.board_size)

        anet = ANET(self.epochs)
        anet.initialize(
            board_shape=board_shape,
            filters=self.filters,
            kernel_sizes=self.kernel_sizes,
            dense_shape=self.dense_shape,
            activation=self.activation,
            optimizer=self.optimizer,
            padding=self.padding,
            lossfunction=self.lossfunction,
        )

        actor = Actor(
            state_manager=State_Manager(self.board_size),
            anet=anet,
            rbuf_lim=self.rbuf_lim,
            c=self.c,
        )
        actor.train_anet(self.n_games, self.n_sim, self.utility_function, save_interval,
                         self.minibatch_size, self.file_path)

        if not self.visualize_training:
            return
        # Element 0 of every replay buffer case is the state.
        state_history = [case[0] for case in actor.RBUF]
        HexVisualizer().render_gif(state_history, 'out/rbuf.gif')

In [78]:
# Run a tournament between two already saved models. Because model_paths is
# given, run_TOPP skips training and goes straight to the tournament.
gc = GameController()
gc.run_TOPP(model_paths=['ProModel-5x5/model_10', 'ProModel-5x5/model_1000'])

out/p0-VS-p1-0.gif
out/p0-VS-p1-1.gif
out/p0-VS-p1-2.gif
out/p0-VS-p1-3.gif
out/p0-VS-p1-4.gif
out/p0-VS-p1-5.gif
out/p0-VS-p1-6.gif
out/p0-VS-p1-7.gif
out/p0-VS-p1-8.gif
out/p0-VS-p1-9.gif
==== TOPP RESULTS ====
  model_10: 1 WINS
  model_1000: 9 WINS


In [79]:
# gc = GameController()
# gc.run_TOPP(model_paths=['ProModel/model_10', 'ProModel/model_100', 'ProModel/model_1000'])