<a href="https://colab.research.google.com/github/lowen2022/AlphaQuoridor/blob/main/quoridor_mid.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

1.dual network

The code defines a neural network architecture for a game (likely a 3x3 grid-based game with 6 input channels). The network is saved to a file (best.keras)

In [28]:
# ====================
# Creating the Dual Network
# ====================

# Importing packages
from tensorflow.keras.layers import Activation, Add, BatchNormalization, Conv2D, Dense, GlobalAveragePooling2D, Input
from tensorflow.keras.models import Model
from tensorflow.keras.regularizers import l2
from tensorflow.keras import backend as K
import os

# Preparing parameters
DN_FILTERS  = 128  # Number of kernels in the convolutional layer (256 in the original version)
DN_RESIDUAL_NUM =  16  # Number of residual blocks (19 in the original version)
DN_INPUT_SHAPE = (3, 3, 6)  # Input shape
DN_OUTPUT_SIZE = 9 + 4 * 2  # Number of actions (placement locations (3*3))

# Creating the convolutional layer
def conv(filters):
    return Conv2D(filters, 3, padding='same', use_bias=False,
        kernel_initializer='he_normal', kernel_regularizer=l2(0.0005))

# Creating the residual block
def residual_block():
    def f(x):
        sc = x
        x = conv(DN_FILTERS)(x)
        x = BatchNormalization()(x)
        x = Activation('relu')(x)
        x = conv(DN_FILTERS)(x)
        x = BatchNormalization()(x)
        x = Add()([x, sc])
        x = Activation('relu')(x)
        return x
    return f

# Creating the dual network
def dual_network():
    # Do nothing if the model is already created
    if os.path.exists('./model/best.keras'):
        return

    # Input layer
    input = Input(shape=DN_INPUT_SHAPE)

    # Convolutional layer
    x = conv(DN_FILTERS)(input)
    x = BatchNormalization()(x)
    x = Activation('relu')(x)

    # Residual blocks x 16
    for i in range(DN_RESIDUAL_NUM):
        x = residual_block()(x)

    # Pooling layer
    x = GlobalAveragePooling2D()(x)

    # Policy output
    p = Dense(DN_OUTPUT_SIZE, kernel_regularizer=l2(0.0005),
              activation='softmax', name='pi')(x)

    # Value output
    v = Dense(1, kernel_regularizer=l2(0.0005))(x)
    v = Activation('tanh', name='v')(v)

    # Creating the model
    model = Model(inputs=input, outputs=[p, v])

    # Saving the model
    os.makedirs('./model/', exist_ok=True)  # Create folder if it does not exist
    model.save('./model/best.keras')  # Best player's model

    # Clearing the model
    K.clear_session()
    del model

# Running the function
if __name__ == '__main__':
    dual_network()


2.Game

It implements a simplified simulation of the Quoridor game with a board size of 3x3 and each player initially has 1 wall. Quoridor is a strategy game where players need to move pieces to reach opposite targets while placing walls to block their opponents. The code includes game state management, legal action generation, multiple AI strategies such as random action, Alpha-Beta pruning, and Monte Carlo Tree Search MCTS, and provides visual output.

In [30]:
# ====================
# Quoridor (3 x 3), wall = 1
# ====================

# Importing packages
import random
import math
from collections import deque
import copy
from copy import deepcopy

# Game state
class State:
    def __init__(self, board_size=3, num_walls=1, player=None, enemy=None, walls=None, depth=0):
        self.N = board_size
        N = self.N
        if N % 2 == 0:
            raise ValueError('The board size must be an odd number.')
        self.directions = [(-1, 0), (1, 0), (0, -1), (0, 1)]
        self.player = player if player != None else [0] * 2 # Position, number of walls
        self.enemy = enemy if enemy != None else [0] * 2
        self.walls = walls if walls != None else [0] * ((N - 1) ** 2)
        self.depth = depth
        self.draw_depth = 30

        if player == None or enemy == None:
            init_pos = N * (N - 1) + N // 2
            self.player[0] = init_pos
            self.player[1] = num_walls
            self.enemy[0] = init_pos
            self.enemy[1] = num_walls

     # Check if it's a loss
    def is_lose(self):
        if self.enemy[0] // self.N == 0:
            return True
        return False

    # Check if it's a draw
    def is_draw(self):
        return self.depth >= self.draw_depth

    # Check if the game is over
    def is_done(self):
        return self.is_lose() or self.is_draw()

    def pieces_array(self):
        N = self.N
        def pieces_of(pieces):
            tables = []

            table = [0] * (N ** 2)
            table[pieces[0]] = 1
            tables.append(table)

            table = [pieces[1]] * (N ** 2)
            tables.append(table)

            return tables

        def walls_of(walls):
            tables = []

            table_h = [0] * (N ** 2)
            table_v = [0] * (N ** 2)

            for wp in range((N - 1) ** 2):
                x, y = wp // (N - 1), wp % (N - 1)

                if x < (N - 1) // 2 and y < (N - 1) // 2:
                    pos = N * x + y
                elif x > (N - 1) // 2 and y < (N - 1) // 2:
                    pos = N * x + (y + 1)
                elif x < (N - 1) // 2 and y > (N - 1) // 2:
                    pos = N * (x + 1) + y
                else:
                    pos = N * (x + 1) + (y + 1)

                if walls[wp] == 1:
                    table_h[pos] = 1
                elif walls[wp] == 2:
                    table_v[pos] = 1

            tables.append(table_h)
            tables.append(table_v)

            return tables

        return [pieces_of(self.player), pieces_of(self.enemy), walls_of(self.walls)]

    def legal_actions(self):
        """
        0 - (N ** 2 - 1): Move to a position
        N ** 2- (N ** 2 + (N - 1) ** 2 - 1): Place a horizontal wall
        (N ** 2 + (N - 1) ** 2) - (N ** 2 + 2 * (N - 1) ** 2 - 1): Place a vertical wall
        """
        actions = []
        actions.extend(self.legal_actions_pos(self.player[0]))

        if self.player[1] > 0:
            for pos in range((self.N - 1) ** 2):
                actions.extend(self.legal_actions_wall(pos))

        return actions

    def legal_actions_pos(self, pos):
        actions = []

        N = self.N
        walls = self.walls
        ep = self.enemy[0]

        x, y = pos // N, pos % N
        for dx, dy in self.directions:
            nx, ny = x + dx, y + dy
            if 0 <= nx < N and 0 <= ny < N:
                np = N * nx + ny
                wp = (N - 1) * nx + ny

                if nx < x:
                    if y == 0:
                        if walls[wp] != 1:
                            if np + ep != N ** 2 - 1:
                                actions.append(np)
                            else:
                                if nx > 0 and walls[wp - (N - 1)] != 1:
                                    nnp = np - N
                                    actions.append(nnp)
                                elif (nx == 0 and walls[wp] != 2) or (nx > 0 and walls[wp - (N - 1)] != 2 and walls[wp] != 2):
                                    nnp = np + 1
                                    actions.append(nnp)
                    elif y == (N - 1):
                        if walls[wp - 1] != 1:
                            if np + ep != N ** 2 - 1:
                                actions.append(np)
                            else:
                                if nx > 0 and walls[wp - (N - 1) - 1] != 1:
                                    nnp = np -  N
                                    actions.append(nnp)
                                elif (nx == 0 and walls[wp - 1] != 2) or (nx > 0 and walls[wp - (N - 1) - 1] != 2 and walls[wp - 1] != 2):
                                    nnp = np - 1
                                    actions.append(nnp)
                    else:
                        if walls[wp - 1] != 1 and walls[wp] != 1:
                            if np + ep != N ** 2 - 1:
                                actions.append(np)
                            else:
                                if nx > 0 and walls[wp - (N - 1)] != 1 and walls[wp - (N - 1) - 1] != 1:
                                    nnp = np - N
                                    actions.append(nnp)
                                else:
                                    if (nx == 0 and walls[wp - 1] != 2) or (nx > 0 and walls[wp - (N - 1) - 1] != 2 and walls[wp - 1] != 2):
                                        nnp = np - 1
                                        actions.append(nnp)
                                    if (nx == 0 and walls[wp] != 2) or (nx > 0 and walls[wp - (N - 1)] != 2 and walls[wp] != 2):
                                        nnp = np + 1
                                        actions.append(nnp)
                if nx > x:
                    if y == 0:
                        if walls[wp - (N - 1)] != 1:
                            if np + ep != N ** 2 - 1:
                                actions.append(np)
                            else:
                                if nx < (N - 1) and walls[wp] != 1:
                                    nnp = np + N
                                    actions.append(nnp)
                                elif (nx == (N - 1) and walls[wp - (N - 1)] != 2) or (nx < (N - 1) and walls[wp - (N - 1)] != 2 and walls[wp] != 2):
                                    nnp = np + 1
                                    actions.append(nnp)
                    elif y == (N - 1):
                        if walls[wp - (N - 1) - 1] != 1:
                            if np + ep != N ** 2 - 1:
                                actions.append(np)
                            else:
                                if nx < (N - 1) and walls[wp - 1] != 1:
                                    nnp = np + N
                                    actions.append(nnp)
                                elif (nx == (N - 1) and walls[wp - (N - 1) - 1] != 2) or (nx < (N - 1) and walls[wp - (N - 1) - 1] != 2 and walls[wp - 1] != 2):
                                    nnp = np - 1
                                    actions.append(nnp)
                    else:
                        if walls[wp - (N - 1) - 1] != 1 and walls[wp - (N - 1)] != 1:
                            if np + ep != N ** 2 - 1:
                                actions.append(np)
                            else:
                                if nx < (N - 1) and walls[wp - 1] != 1 and walls[wp] != 1:
                                    nnp = np + N
                                    actions.append(nnp)
                                else:
                                    if (nx == (N - 1) and walls[wp - (N - 1) - 1] != 2) or (nx < (N - 1) and walls[wp - (N - 1) - 1] != 2 and walls[wp - 1] != 2):
                                        nnp = np - 1
                                        actions.append(nnp)
                                    if (nx == (N - 1) and walls[wp - (N - 1)] != 2) or (nx < (N - 1) and walls[wp - (N - 1)] != 2 and walls[wp] != 2):
                                        nnp = np + 1
                                        actions.append(nnp)
                if ny < y:
                    if x == 0:
                        if walls[wp] != 2:
                            if np + ep != N ** 2 - 1:
                                actions.append(np)
                            else:
                                if ny > 0 and walls[wp - 1] != 2:
                                    nnp = np - 1
                                    actions.append(nnp)
                                elif (ny == 0 and walls[wp] != 1) or (ny > 0 and walls[wp - 1] != 1 and walls[wp] != 1):
                                    nnp = np + N
                                    actions.append(nnp)
                    elif x == (N - 1):
                        if walls[wp - (N - 1)] != 2:
                            if np + ep != N ** 2 - 1:
                                actions.append(np)
                            else:
                                if ny > 0 and walls[wp - (N - 1) - 1] != 2:
                                    nnp = np - 1
                                    actions.append(nnp)
                                elif (ny == 0 and walls[wp - (N - 1)] != 1) or (ny > 0 and walls[wp - (N - 1) - 1] != 2 and walls[wp - (N - 1)] != 1):
                                    nnp = np - N
                                    actions.append(nnp)
                    else:
                        if walls[wp - (N - 1)] != 2 and walls[wp] != 2:
                            if np + ep != N ** 2 - 1:
                                actions.append(np)
                            else:
                                if ny > 0 and walls[wp - (N - 1) - 1] != 2 and walls[wp - 1] != 2:
                                    nnp = np - 1
                                    actions.append(nnp)
                                else:
                                    if (ny == 0 and walls[wp - (N - 1)] != 1) or (ny > 0 and walls[wp - (N - 1) - 1] != 2 and walls[wp - (N - 1)] != 1):
                                        nnp = np - N
                                        actions.append(nnp)
                                    if (ny == 0 and walls[wp] != 1) or (ny > 0 and (walls[wp - 1] != 1 or walls[wp] != 1)):
                                        nnp = np + N
                                        actions.append(nnp)
                if ny > y:
                    if x == 0:
                        if walls[wp - 1] != 2:
                            if np + ep != N ** 2 - 1:
                                actions.append(np)
                            else:
                                if ny < (N - 1) and walls[wp] != 2:
                                    nnp = np + 1
                                    actions.append(nnp)
                                elif (ny == (N - 1) and walls[wp - 1] != 1) or (ny < (N - 1) and walls[wp - 1] != 1 and walls[wp] != 1):
                                    nnp = np + N
                                    actions.append(nnp)
                    elif x == (N - 1):
                        if walls[wp - (N - 1) - 1] != 2:
                            if np + ep != N ** 2 - 1:
                                actions.append(np)
                            else:
                                if ny < (N - 1) and walls[wp - (N - 1)] != 2:
                                    nnp = np + 1
                                    actions.append(nnp)
                                elif (ny == (N - 1) and walls[wp - (N - 1) - 1] != 1) or (ny < (N - 1) and walls[wp - (N - 1) - 1] != 1 and walls[wp - (N - 1)] != 1):
                                    nnp = np - N
                                    actions.append(nnp)
                    else:
                        if walls[wp - (N - 1) - 1] != 2 and walls[wp - 1] != 2:
                            if np + ep != N ** 2 - 1:
                                actions.append(np)
                            else:
                                if ny < (N - 1) and walls[wp - (N - 1)] != 2 and walls[wp] != 2:
                                    nnp = np + 1
                                    actions.append(nnp)
                                else:
                                    if (ny == (N - 1) and walls[wp - (N - 1) - 1] != 1) or (ny < (N - 1) and walls[wp - (N - 1) - 1] != 1 and walls[wp - (N - 1)] != 1):
                                        nnp = np - N
                                        actions.append(nnp)
                                    if (ny == (N - 1) and walls[wp - 1] != 1) or (ny < (N - 1) and (walls[wp - 1] != 1 or walls[wp] != 1)):
                                        nnp = np + N
                                        actions.append(nnp)

        return actions

    def legal_actions_wall(self, pos):
        N = self.N
        walls = self.walls
        def can_place_wall(orientation, pos):
            if walls[pos] != 0:
                return False
            x, y = pos // (N - 1), pos % (N - 1)
            if orientation == 1:
                if y == 0:
                    if walls[pos + 1] == 1:
                        return False
                elif y == (N - 2):
                    if walls[pos - 1] == 1:
                        return False
                else:
                    if walls[pos - 1] == 1 or walls[pos + 1] == 1:
                        return False
            else:
                if x == 0:
                    if walls[pos + (N - 1)] == 2:
                        return False
                elif x == (N - 2):
                    if walls[pos - (N - 1)] == 2:
                        return False
                else:
                    if walls[pos - (N - 1)] == 2 or walls[pos + (N - 1)] == 2:
                        return False
            return True

        def can_reach_goal(orientation, pos):
            def bfs(state):
                queue = deque([state.player[0]])
                visited = set()
                while queue:
                    pos = queue.popleft()
                    nps = state.legal_actions_pos(pos)
                    for np in nps:
                        x, y = np // N, np % N
                        if y == 0:
                            return True
                        if np not in visited:
                            visited.add(np)
                            queue.append(np)
                return False

            self.walls[pos] = orientation

            player_state = State(board_size=N, player=self.player.copy(), enemy=self.enemy.copy(), walls=deepcopy(self.walls), depth=self.depth)

            can_reach_player = bfs(player_state)

            action = pos
            if orientation == 1:
                action += N ** 2
            else:
                action += N ** 2 + (N - 1) ** 2

            enemy_state = player_state.next(action)

            can_reach_enemy = bfs(enemy_state)

            self.walls[pos] = 0

            return can_reach_player and can_reach_enemy

        actions = []

        if can_place_wall(1, pos) and can_reach_goal(1, pos):
            actions.append(N ** 2 + pos)
        if can_place_wall(2, pos) and can_reach_goal(2, pos):
            actions.append(N ** 2 + (N - 1) ** 2 + pos)

        return actions

    def rotate_walls(self):
        N = self.N
        rotated_walls = [0] * len(self.walls)
        for i in range((N - 1) ** 2):
            rotated_walls[i] = self.walls[(N - 1) ** 2 - 1 - i]
        self.walls = rotated_walls

    def next(self, action):
        N = self.N
        # Create the next state
        state = State(board_size=N, player=self.player.copy(), enemy=self.enemy.copy(), walls=deepcopy(self.walls), depth=self.depth + 1)

        if action < N ** 2:
            # Move piece
            state.player[0] = action
        elif action < N ** 2 + (N - 1) ** 2:
            # Place horizontal wall
            pos = action - N ** 2
            state.walls[pos] = 1
            state.player[1] -= 1
        else:
            # Place vertical wall
            pos = action - N ** 2 - (N - 1) ** 2
            state.walls[pos] = 2
            state.player[1] -= 1

        state.rotate_walls()

        # Swap players
        state.player, state.enemy = state.enemy, state.player

        return state

    # Check if it's the first player's turn
    def is_first_player(self):
        return self.depth % 2 == 0

    def __str__(self):
        """Display the game state as a string."""
        N = self.N
        is_first_player = self.is_first_player()

        board = [['o'] * (2 * N - 1) for _ in range(2 * N - 1)]
        for i in range(2 * N - 1):
            for j in range(2 * N - 1):
                if i % 2 == 1 and j % 2 == 1:
                    board[i][j] = 'x'

        p_pos = self.player[0] if is_first_player else self.enemy[0]
        e_pos = self.enemy[0] if is_first_player else self.player[0]

        e_pos = N ** 2 - 1 - e_pos

        p_x, p_y = p_pos // N, p_pos % N
        e_x, e_y = e_pos // N, e_pos % N

        board[2 * p_x][2 * p_y] = 'P'
        board[2 * e_x][2 * e_y] = 'E'

        turn_info = "<Enemy's Turn>" if is_first_player else "<Player's Turn>"

        if not is_first_player:
            self.rotate_walls()

        # Set walls
        for i in range(N - 1):
            for j in range(N - 1):
                pos = i * (N - 1) + j
                if self.walls[pos] == 1:
                    board[2 * i + 1][2 * j] = '-'
                    board[2 * i + 1][2 * (j + 1)] = '-'
                if self.walls[pos] == 2:
                    board[2 * i][2 * j + 1] = '|'
                    board[2 * (i + 1)][2 * j + 1] = '|'

        if not is_first_player:
            self.rotate_walls()

        board_str = '\n'.join([''.join(row) for row in board])
        return turn_info + '\n' + board_str


# Randomly select an action
def random_action(state):
    legal_actions = state.legal_actions()
    action = random.randint(0, len(legal_actions) - 1)
    return legal_actions[action]

# Calculate state value using alpha-beta pruning
def alpha_beta(state, alpha, beta):
    # Loss is -1
    if state.is_lose():
        return -1

    # Draw is 0
    if state.is_draw():
        return 0

    # Calculate state values for legal actions
    for action in state.legal_actions():
        score = -alpha_beta(state.next(action), -beta, -alpha)
        if score > alpha:
            alpha = score

        # If the best score for the current node exceeds the parent node, stop the search
        if alpha >= beta:
            return alpha

    # Return the maximum value of the state values for legal actions
    return alpha

# Select an action using alpha-beta pruning
def alpha_beta_action(state):
    # Calculate state values for legal actions
    best_action = 0
    alpha = -float('inf')
    for action in state.legal_actions():
        score = -alpha_beta(state.next(action), -float('inf'), -alpha)
        if score > alpha:
            best_action = action
            alpha = score

    # Return the action with the maximum state value
    return best_action

# Playout
def playout(state):
    # Loss is -1
    if state.is_lose():
        return -1

    # Draw is 0
    if state.is_draw():
        return 0

    # Next state value
    return -playout(state.next(random_action(state)))

# Return the index of the maximum value
def argmax(collection):
    return collection.index(max(collection))

# Select an action using Monte Carlo Tree Search
def mcts_action(state):
    # Node for Monte Carlo Tree Search
    class node:
        # Initialization
        def __init__(self, state):
            self.state = state  # State
            self.w = 0  # Cumulative value
            self.n = 0  # Number of trials
            self.child_nodes = None  # Child nodes

        # Evaluation
        def evaluate(self):
            # When the game ends
            if self.state.is_done():
                # Get value from the game result
                value = -1 if self.state.is_lose() else 0  # Loss is -1, draw is 0

                # Update cumulative value and number of trials
                self.w += value
                self.n += 1
                return value

            # When there are no child nodes
            if not self.child_nodes:
                # Get value from playout
                value = playout(self.state)

                # Update cumulative value and number of trials
                self.w += value
                self.n += 1

                # Expand child nodes
                if self.n == 10:
                    self.expand()
                return value

            # When there are child nodes
            else:
                # Get value from evaluating the child node with the maximum UCB1
                value = -self.next_child_node().evaluate()

                # Update cumulative value and number of trials
                self.w += value
                self.n += 1
                return value

        # Expand child nodes
        def expand(self):
            legal_actions = self.state.legal_actions()
            self.child_nodes = []
            for action in legal_actions:
                self.child_nodes.append(node(self.state.next(action)))

        # Get the child node with the maximum UCB1
        def next_child_node(self):
            # Return the child node with n=0
            for child_node in self.child_nodes:
                if child_node.n == 0:
                    return child_node

            # Calculate UCB1
            t = 0
            for c in self.child_nodes:
                t += c.n
            ucb1_values = []
            for child_node in self.child_nodes:
                ucb1_values.append(-child_node.w/child_node.n + 2*(2*math.log(t)/child_node.n)**0.5)

            # Return the child node with the maximum UCB1
            return self.child_nodes[argmax(ucb1_values)]

    # Generate the root node
    root_node = node(state)
    root_node.expand()

    # Evaluate the root node 100 times
    for _ in range(100):
        root_node.evaluate()

    # Return the action with the maximum number of trials
    legal_actions = state.legal_actions()
    n_list = []
    for c in root_node.child_nodes:
        n_list.append(c.n)
    return legal_actions[argmax(n_list)]

# Running the function
if __name__ == '__main__':
    # Generate the state
    state = State()

    # Loop until the game ends
    while True:
        # When the game ends
        if state.is_done():
            break

        # Get the next state
        state = state.next(random_action(state))

        # Display as a string
        print(state)
        print()


<Player's Turn>
ooEoo
oxoxo
ooPoo
oxoxo
ooooo

<Enemy's Turn>
ooooE
oxoxo
ooPoo
oxoxo
ooooo

<Player's Turn>
ooooE
oxoxo
ooooo
oxoxo
ooPoo

<Enemy's Turn>
ooEoo
oxoxo
ooooo
oxoxo
ooPoo

<Player's Turn>
ooE|o
oxoxo
ooo|o
oxoxo
ooPoo

<Enemy's Turn>
o|E|o
oxoxo
o|o|o
oxoxo
ooPoo

<Player's Turn>
o|E|o
oxoxo
o|o|o
oxoxo
Poooo

<Enemy's Turn>
o|o|o
oxoxo
o|E|o
oxoxo
Poooo

<Player's Turn>
o|o|o
oxoxo
o|E|o
oxoxo
ooPoo

<Enemy's Turn>
o|o|o
oxoxo
o|o|o
oxoxo
ooPoE



pv_mcts

This code implements a reinforcement learning strategy based on Monte Carlo Tree Search (MCTS) that combines a neural network (called PV-MCTS, or Policy-Value MCTS) to select actions. It is based on the State class (assuming Quoridor game state) and
the neural network model in dual_network (dual network, providing policy and value prediction)

In [35]:
# ====================
# Monte Carlo Tree Search Implementation
# ====================

# Import packages


from math import sqrt
from tensorflow.keras.models import load_model
from pathlib import Path
import numpy as np
from copy import deepcopy
import random

# Prepare parameters
PV_EVALUATE_COUNT = 50 # Number of simulations per inference (original is 1600)

# Inference
def predict(model, state):
    # Reshape input data for inference
    a, b, c = DN_INPUT_SHAPE
    x = np.array(state.pieces_array())
    x = x.reshape(c, a, b).transpose(1, 2, 0).reshape(1, a, b, c)

    # Inference
    y = model.predict(x, batch_size=1)

    # Get policy
    policies = y[0][0][list(state.legal_actions())] # Only legal moves
    policies /= np.sum(policies) if np.sum(policies) else 1 # Convert to a probability distribution summing to 1

    # Get value
    value = y[1][0][0]
    return policies, value

# Convert list of nodes to list of scores
def nodes_to_scores(nodes):
    scores = []
    for c in nodes:
        scores.append(c.n)
    return scores

# Get Monte Carlo Tree Search scores
def pv_mcts_scores(model, state, temperature):
    # Define Monte Carlo Tree Search node
    class Node:
        # Initialize node
        def __init__(self, state, p):
            self.state = state # State
            self.p = p # Policy
            self.w = 0 # Cumulative value
            self.n = 0 # Number of simulations
            self.child_nodes = None  # Child nodes

        # Calculate value of the state
        def evaluate(self):
            # If the game is over
            if self.state.is_done():
                # Get value from the game result
                value = -1 if self.state.is_lose() else 0

                # Update cumulative value and number of simulations
                self.w += value
                self.n += 1
                return value

            # If there are no child nodes
            if not self.child_nodes:
                # Get policy and value from neural network inference
                policies, value = predict(model, self.state)

                # Update cumulative value and number of simulations
                self.w += value
                self.n += 1

                # Expand child nodes
                self.child_nodes = []
                for action, policy in zip(self.state.legal_actions(), policies):
                    self.child_nodes.append(Node(self.state.next(action), policy))
                return value

            # If there are child nodes
            else:
                # Get value from the evaluation of the child node with the maximum arc evaluation value
                value = -self.next_child_node().evaluate()

                # Update cumulative value and number of simulations
                self.w += value
                self.n += 1
                return value

        # Get child node with the maximum arc evaluation value
        def next_child_node(self):
            # Calculate arc evaluation value
            C_PUCT = 1.0
            t = sum(nodes_to_scores(self.child_nodes))
            pucb_values = []
            for child_node in self.child_nodes:
                pucb_values.append((-child_node.w / child_node.n if child_node.n else 0.0) +
                    C_PUCT * child_node.p * sqrt(t) / (1 + child_node.n))

            # Return child node with the maximum arc evaluation value
            return self.child_nodes[np.argmax(pucb_values)]

    # Create a node for the current state
    root_node = Node(state, 0)

    # Perform multiple evaluations
    for _ in range(PV_EVALUATE_COUNT):
        root_node.evaluate()

    # Probability distribution of legal moves
    scores = nodes_to_scores(root_node.child_nodes)
    if temperature == 0: # Only the maximum value is 1
        action = np.argmax(scores)
        scores = np.zeros(len(scores))
        scores[action] = 1
    else: # Add variation with Boltzmann distribution
        scores = boltzman(scores, temperature)
    return scores

# Action selection with Monte Carlo Tree Search
def pv_mcts_action(model, temperature=0):
    def pv_mcts_action(state):
        scores = pv_mcts_scores(model, deepcopy(state), temperature)

        return np.random.choice(state.legal_actions(), p=scores)
    return pv_mcts_action

# Boltzmann distribution
def boltzman(xs, temperature):
    xs = [x ** (1 / temperature) for x in xs]
    return [x / sum(xs) for x in xs]

def random_action():
    def random_action(state):
        legal_actions = state.legal_actions()
        action = random.randint(0, len(legal_actions) - 1)

        return legal_actions[action]
    return random_action

# Confirm operation
if __name__ == '__main__':
    # Load model
    path = sorted(Path('./model').glob('*.keras'))[-1]
    model = load_model(str(path))

    # Generate state
    state = State()

    # Create function to get actions with Monte Carlo Tree Search
    next_action = pv_mcts_action(model, 1.0)

    # Loop until the game is over
    while True:
        # If the game is over
        if state.is_done():
            break

        # Get action
        action = next_action(state)

        # Get next state
        state = state.next(action)

        # Print state
        print(state)


[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 876ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 45ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 41ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 41ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 53ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 42ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 43ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 43ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 43ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 42ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 43ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 41ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 42ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 4

KeyboardInterrupt: 

3.human player and UI

In [40]:
# Importing necessary packages and modules


from tensorflow.keras.models import load_model
from IPython.display import clear_output
import time

# Loading the best player's model
model = load_model('./model/best.keras')

class GameConsole:
    def __init__(self, model=None):

        self.state = State()
        self.N = self.state.N


        self.next_action = pv_mcts_action(model) if model else random_action

    def draw_board(self):

        print(self.state)

    def human_turn(self):

        if self.state.is_done() or not self.state.is_first_player():
            return False


        legal_actions = self.state.legal_actions()
        print("合法动作：", legal_actions)
        print("动作说明：")
        print(f"  0 ~ {self.N**2 - 1}: 移动到对应位置")
        print(f"  {self.N**2} ~ {self.N**2 + (self.N-1)**2 - 1}: 放置水平墙")
        print(f"  {self.N**2 + (self.N-1)**2} ~ {self.N**2 + 2*(self.N-1)**2 - 1}: 放置垂直墙")


        action = int(input("请输入你的动作（输入动作编号）："))


        if action not in legal_actions:
            print("非法动作，请重新输入！")
            return False


        self.state = self.state.next(action)
        return True

    def ai_turn(self):
        if self.state.is_done():
            return


        action = self.next_action(self.state)
        print(f"AI 选择的动作：{action}")


        self.state = self.state.next(action)
        time.sleep(1)

    def display_result(self):
        if self.state.is_lose():
            if self.state.is_first_player():
                print("你输了！")
            else:
                print("你赢了！")
        elif self.state.is_draw():
            print("平局！")

    def run(self):
        while True:
            clear_output(wait=True)
            self.draw_board()

            if self.state.is_done():
                self.display_result()
                break

            # 玩家回合
            if self.state.is_first_player():
                while not self.human_turn():
                    pass
            else:
                self.ai_turn()

# 运行游戏
if __name__ == '__main__':
    game = GameConsole(model=model)
    game.run()

<Enemy's Turn>
ooEoo
ox-x-
ooooo
oxoxo
Poooo
合法动作： [3, 7, 13, 11, 15, 12, 16]
动作说明：
  0 ~ 8: 移动到对应位置
  9 ~ 12: 放置水平墙
  13 ~ 16: 放置垂直墙


KeyboardInterrupt: Interrupted by user

4. Train: self_play

This code implements Self-Play, a common method of training data generation in reinforcement learning, especially in strategy games (such as Quoridor). It generates training data (state, strategy, and value) by letting a neural network model fight itself, and then saves this data for subsequent model training to use

In [45]:
# ====================
# Self-Play Part
# ====================

# Importing packages

from datetime import datetime
from tensorflow.keras.models import load_model
from tensorflow.keras import backend as K
from pathlib import Path
import numpy as np
import pickle
import os
from copy import deepcopy

# Preparing parameters
SP_GAME_COUNT = 50  # Number of games for self-play (25000 in the original version)
SP_TEMPERATURE = 1.0  # Temperature parameter for Boltzmann distribution

# Value of the first player
def first_player_value(ended_state):
    # 1: First player wins, -1: First player loses, 0: Draw
    if ended_state.is_lose():
        return -1 if ended_state.is_first_player() else 1
    return 0

# Saving training data
def write_data(history):
    now = datetime.now()
    os.makedirs('./data/', exist_ok=True)  # Create folder if it does not exist
    path = './data/{:04}{:02}{:02}{:02}{:02}{:02}.history'.format(
        now.year, now.month, now.day, now.hour, now.minute, now.second)
    with open(path, mode='wb') as f:
        pickle.dump(history, f)

# Executing one game
def play(model):
    # Training data
    history = []

    # Generating the state
    state = State()

    while True:
        # When the game ends
        if state.is_done():
            break

        # Getting the probability distribution of legal moves
        scores = pv_mcts_scores(model, deepcopy(state), SP_TEMPERATURE)

        # Adding the state and policy to the training data
        policies = [0] * DN_OUTPUT_SIZE
        for action, policy in zip(state.legal_actions(), scores):
            policies[action] = policy
        history.append([state.pieces_array(), policies, None])

        # Getting the action
        action = np.random.choice(state.legal_actions(), p=scores)

        # Getting the next state
        state = state.next(action)

    # Adding the value to the training data
    value = first_player_value(state)
    for i in range(len(history)):
        history[i][2] = value
        value = -value
    return history

# Self-Play
def self_play():
    # Training data
    history = []

    # Loading the best player's model
    model = load_model('./model/best.keras')

    # Executing multiple games
    for i in range(SP_GAME_COUNT):
        # Executing one game
        h = play(model)
        history.extend(h)

        # Output
        print('\rSelfPlay {}/{}'.format(i+1, SP_GAME_COUNT), end='')
    print('')

    # Saving the training data
    write_data(history)

    # Clearing the model
    K.clear_session()
    del model

# Running the function
if __name__ == '__main__':
    self_play()



[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 877ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 43ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 43ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 45ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 43ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 40ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 41ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 43ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 46ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 42ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 23ms/step

KeyboardInterrupt: 

4.Train :Parameter Evaluation

This code implements an evaluation and update mechanism for a neural network model, which is used to compare the performance of two models (the latest model and the best model) in the game and update the "best player" model based on the results. It is evaluated based on Quoridor games and PV-MCTS.

In [43]:
# ====================
# New Parameter Evaluation Section
# ====================

# Import packages

from tensorflow.keras.models import load_model
from tensorflow.keras import backend as K
from pathlib import Path
from shutil import copy
import numpy as np

# Prepare parameters
EN_GAME_COUNT = 10 # Number of games per evaluation (originally 400)
EN_TEMPERATURE = 1.0 # Temperature of the Boltzmann distribution

# Points for the first player
def first_player_point(ended_state):
    # 1: first player wins, 0: first player loses, 0.5: draw
    if ended_state.is_lose():
        return 0 if ended_state.is_first_player() else 1
    return 0.5

# Execute one game
def play(next_actions):
    # Generate state
    state = State()

    # Loop until the game ends
    while True:
        # When the game ends
        if state.is_done():
            break

        # Get action
        next_action = next_actions[0] if state.is_first_player() else next_actions[1]
        action = next_action(state)

        # Get the next state
        state = state.next(action)

    # Return points for the first player
    return first_player_point(state)

# Replace the best player
def update_best_player():
    copy('./model/latest.keras', './model/best.keras')
    print('Change BestPlayer')

# Network evaluation
def evaluate_network():
    # Load the model of the latest player
    model0 = load_model('./model/latest.keras')

    # Load the model of the best player
    model1 = load_model('./model/best.keras')

    # Generate a function to select actions using PV MCTS
    next_action0 = pv_mcts_action(model0, EN_TEMPERATURE)
    next_action1 = pv_mcts_action(model1, EN_TEMPERATURE)
    next_actions = (next_action0, next_action1)

    # Repeat multiple matches
    total_point = 0
    for i in range(EN_GAME_COUNT):
        # Execute one game
        if i % 2 == 0:
            total_point += play(next_actions)
        else:
            total_point += 1 - play(list(reversed(next_actions)))

        # Output
        print('\rEvaluate {}/{}'.format(i + 1, EN_GAME_COUNT), end='')
    print('')

    # Calculate average points
    average_point = total_point / EN_GAME_COUNT
    print('AveragePoint', average_point)

    # Clear models
    K.clear_session()
    del model0
    del model1

    # Replace the best player
    if average_point > 0.5:
        update_best_player()
        return True
    else:
        return False

# Operation check
if __name__ == '__main__':
    evaluate_network()


KeyboardInterrupt: 