3-a) Tic-Tac-Toe agent using the MCTS algorithm. The number of Monte Carlo simulations allowed for each board configuration must be configurable.

In [18]:
import numpy as np


class Transform:
    """Compose several board transformations into a single one.

    Every operation must provide ``transform(target)`` and
    ``reverse(target)`` methods.
    """

    def __init__(self, *operations):
        # Kept as a tuple, in the order the operations are applied.
        self.operations = operations

    def transform(self, target):
        """Apply each operation to ``target`` in order and return the result."""
        for op in self.operations:
            target = op.transform(target)
        return target

    def reverse(self, target):
        """Undo ``transform`` by reversing each operation, last one first."""
        # The builtin ``reversed`` is used so this method does not depend on
        # a module-level helper that happens to share its name.
        for op in reversed(self.operations):
            target = op.reverse(target)
        return target


class Identity:
    """No-op transformation: hands the matrix back exactly as received."""

    @staticmethod
    def transform(matrix2d):
        """Return ``matrix2d`` itself, untouched."""
        return matrix2d

    # Undoing a no-op is the same no-op, so both names share one function.
    reverse = transform


class Rotate90:
    """Rotate a 2-D matrix a fixed number of quarter turns with ``np.rot90``."""

    def __init__(self, number_of_rotations):
        self.op = np.rot90
        self.number_of_rotations = number_of_rotations

    def transform(self, matrix2d):
        """Return ``matrix2d`` rotated ``number_of_rotations`` times."""
        turns = self.number_of_rotations
        return self.op(matrix2d, turns)

    def reverse(self, transformed_matrix2d):
        """Rotate by the negated count to undo ``transform``."""
        turns_back = -self.number_of_rotations
        return self.op(transformed_matrix2d, turns_back)


class Flip:
    """Mirror a matrix using the flip function supplied at construction."""

    def __init__(self, op):
        # ``op`` is a callable taking a matrix and returning the flipped one.
        self.op = op

    def transform(self, matrix2d):
        """Return the result of applying the flip function to ``matrix2d``."""
        flipped = self.op(matrix2d)
        return flipped

    def reverse(self, transformed_matrix2d):
        """Undo the flip by applying the very same flip function again."""
        restored = self.op(transformed_matrix2d)
        return restored


def reverse(items):
    """Return a reversed copy of a sliceable sequence (same type as given)."""
    backwards = slice(None, None, -1)
    return items[backwards]

In [19]:
import random
import itertools
import numpy as np


# Board orientations tried when looking a position up in a BoardCache:
# the unchanged board, three rotations, two flips and two rotate-then-flip
# combinations.
TRANSFORMATIONS = [Identity(), Rotate90(1), Rotate90(2), Rotate90(3),
                   Flip(np.flipud), Flip(np.fliplr),
                   Transform(Rotate90(1), Flip(np.flipud)),
                   Transform(Rotate90(1), Flip(np.fliplr))]

# Side length of the square board and the matching 2-D shape.
BOARD_SIZE = 3
BOARD_DIMENSIONS = (BOARD_SIZE, BOARD_SIZE)

# Cell contents. X and O have opposite signs, so a line owned entirely by
# one player sums to +BOARD_SIZE or -BOARD_SIZE.
CELL_X = 1
CELL_O = -1
CELL_EMPTY = 0

# Values returned by Board.get_game_result().
RESULT_X_WINS = 1
RESULT_O_WINS = -1
RESULT_DRAW = 0
RESULT_NOT_OVER = 2

# Flat template of an empty board; Board() copies it rather than sharing it.
new_board = np.array([CELL_EMPTY] * BOARD_SIZE ** 2)


def play_game(x_strategy, o_strategy):
    """Play one game from the empty board and return the final Board.

    Each strategy is a callable that takes a Board and returns the Board
    after its move. X moves first and the two strategies then alternate.
    """
    position = Board()
    turn_order = itertools.cycle((x_strategy, o_strategy))

    while True:
        if position.is_gameover():
            return position
        make_move = next(turn_order)
        position = make_move(position)


def play_games(total_games, x_strategy, o_strategy, play_single_game=play_game):
    """Play ``total_games`` games and print the win/draw percentages.

    Args:
        total_games: Number of games to play.
        x_strategy: Callable playing X (takes a Board, returns a Board).
        o_strategy: Callable playing O (takes a Board, returns a Board).
        play_single_game: Callable that plays one game with the two
            strategies and returns the finished Board.

    When ``total_games`` is zero, every percentage is reported as 0.00
    instead of failing with a division by zero.
    """
    results = {
        RESULT_X_WINS: 0,
        RESULT_O_WINS: 0,
        RESULT_DRAW: 0
    }

    for _ in range(total_games):
        end_of_game = play_single_game(x_strategy, o_strategy)
        results[end_of_game.get_game_result()] += 1

    def as_percent(count):
        # Guard the empty run: there is nothing to divide by.
        if total_games <= 0:
            return 0.0
        return count / total_games * 100

    x_wins_percent = as_percent(results[RESULT_X_WINS])
    o_wins_percent = as_percent(results[RESULT_O_WINS])
    draw_percent = as_percent(results[RESULT_DRAW])

    print(f"x wins: {x_wins_percent:.2f}%")
    print(f"o wins: {o_wins_percent:.2f}%")
    print(f"draw  : {draw_percent:.2f}%")


def play_random_move(board):
    """Strategy that plays a randomly chosen valid move on ``board``."""
    return board.play_move(board.get_random_valid_move_index())


def is_even(value):
    """Tell whether ``value`` leaves no remainder when divided by two."""
    _, remainder = divmod(value, 2)
    return remainder == 0


def is_empty(values):
    """Return True when ``values`` is None or has length zero."""
    if values is None:
        return True
    return len(values) == 0


class Board:
    """A tic-tac-toe position stored as a flat array of cell values.

    ``play_move`` never changes this board; it returns a new ``Board``.
    An attempted illegal move is remembered in ``illegal_move`` and makes
    the game count as lost for the player who attempted it.
    """

    def __init__(self, board=None, illegal_move=None):
        """Wrap ``board`` (a flat cell array) or start from an empty board.

        Args:
            board: Flat array of cell values; stored as given, not copied.
                When None, a copy of the empty template is used.
            illegal_move: Index of an illegal move that led here, or None.
        """
        if board is None:
            self.board = np.copy(new_board)
        else:
            self.board = board

        self.illegal_move = illegal_move

        # 2-D view of the same cells, used for line checks and printing.
        self.board_2d = self.board.reshape(BOARD_DIMENSIONS)

    def get_game_result(self):
        """Return one of the RESULT_* constants for this position."""
        if self.illegal_move is not None:
            # The cells were left unchanged, so get_turn() still names the
            # player who attempted the illegal move; the other player wins.
            return RESULT_O_WINS if self.get_turn() == CELL_X else RESULT_X_WINS

        rows_cols_and_diagonals = get_rows_cols_and_diagonals(self.board_2d)

        sums = list(map(sum, rows_cols_and_diagonals))
        max_value = max(sums)
        min_value = min(sums)

        if max_value == BOARD_SIZE:
            return RESULT_X_WINS

        if min_value == -BOARD_SIZE:
            return RESULT_O_WINS

        if CELL_EMPTY not in self.board_2d:
            return RESULT_DRAW

        return RESULT_NOT_OVER

    def is_gameover(self):
        """Return True unless the result is RESULT_NOT_OVER."""
        return self.get_game_result() != RESULT_NOT_OVER

    def is_in_illegal_state(self):
        """Return True when this board records an attempted illegal move."""
        return self.illegal_move is not None

    def play_move(self, move_index):
        """Return the Board reached by playing ``move_index`` for the mover.

        If the cell is not empty, the returned Board has unchanged cells and
        ``illegal_move`` set to ``move_index``.
        """
        board_copy = np.copy(self.board)

        if move_index not in self.get_valid_move_indexes():
            return Board(board_copy, illegal_move=move_index)

        board_copy[move_index] = self.get_turn()
        return Board(board_copy)

    def get_turn(self):
        """Return CELL_X when an even number of cells is filled, else CELL_O."""
        non_zero = np.count_nonzero(self.board)
        return CELL_X if is_even(non_zero) else CELL_O

    def get_valid_move_indexes(self):
        """Return the flat indexes of all empty cells."""
        return ([i for i in range(self.board.size)
                 if self.board[i] == CELL_EMPTY])

    def get_illegal_move_indexes(self):
        """Return the flat indexes of all occupied cells."""
        return ([i for i in range(self.board.size)
                if self.board[i] != CELL_EMPTY])

    def get_random_valid_move_index(self):
        """Return a randomly chosen empty cell index."""
        return random.choice(self.get_valid_move_indexes())

    def print_board(self):
        """Print the text rendering of the board."""
        print(self.get_board_as_string())

    def get_board_as_string(self):
        """Render the board as text, one ``|``-separated line per row.

        The border width follows the number of columns, so the rendering
        is not tied to a 3x3 board.
        """
        rows, cols = self.board_2d.shape
        border = "-" * (2 * cols + 1) + "\n"

        lines = [border]
        for r in range(rows):
            symbols = [get_symbol(self.board_2d[r, c]) for c in range(cols)]
            lines.append("|" + "|".join(symbols) + "|\n")
        lines.append(border)

        return "".join(lines)


class BoardCache:
    """Dictionary of values keyed by board position.

    Values are stored under the bytes of the board exactly as given, while
    lookups try every orientation from
    ``get_symmetrical_board_orientations``.
    """

    def __init__(self):
        self.cache = dict()

    def set_for_position(self, board, o):
        """Store ``o`` under the raw bytes of ``board``'s 2-D array."""
        key = board.board_2d.tobytes()
        self.cache[key] = o

    def get_for_position(self, board):
        """Look ``board`` up under each of its orientations.

        Returns ``((value, transformation), True)`` for the first
        orientation with a non-None entry, otherwise ``(None, False)``.
        """
        for candidate, transformation in get_symmetrical_board_orientations(
                board.board_2d):
            hit = self.cache.get(candidate.tobytes())
            if hit is None:
                continue
            return (hit, transformation), True

        return None, False

    def reset(self):
        """Drop all entries by replacing the dictionary with a new one."""
        self.cache = dict()


def get_symmetrical_board_orientations(board_2d):
    """Return ``(transformed board, transformation)`` pairs.

    One pair is produced for every entry of ``TRANSFORMATIONS``, in order.
    """
    orientations = []
    for transformation in TRANSFORMATIONS:
        orientations.append((transformation.transform(board_2d),
                             transformation))
    return orientations


def get_rows_cols_and_diagonals(board_2d):
    """Return every row, column and both diagonals of ``board_2d`` as a list.

    The columns and the anti-diagonal are obtained as the rows and main
    diagonal of the board rotated with ``np.rot90``.
    """
    lines = get_rows_and_diagonal(board_2d)
    rotated = np.rot90(board_2d)
    return lines + get_rows_and_diagonal(rotated)


def get_rows_and_diagonal(board_2d):
    """Return a list of the rows of ``board_2d`` followed by its main diagonal."""
    row_count = board_2d.shape[0]
    lines = list(board_2d[range(row_count), :])
    lines.append(board_2d.diagonal())
    return lines


def get_symbol(cell):
    """Map a cell value to its display character: 'X', 'O' or '-'."""
    return 'X' if cell == CELL_X else 'O' if cell == CELL_O else '-'


def is_draw(board):
    """Return True when the game result of ``board`` is a draw."""
    outcome = board.get_game_result()
    return outcome == RESULT_DRAW

In [20]:
import math

# Shared cache of MCTS nodes keyed by board position; it is the default
# ``node_cache`` argument of the MCTS functions below.
nodecache = BoardCache()


class Node:
    """Playout statistics for one board position in the search."""

    def __init__(self):
        # Parent nodes, keyed by parent board position.
        self.parents = BoardCache()
        self.visits = self.wins = self.losses = self.draws = 0

    def add_parent_node(self, node_cache, parent_board):
        """Register the node for ``parent_board`` as a parent, once."""
        _, already_known = self.parents.get_for_position(parent_board)
        if already_known:
            return
        self.parents.set_for_position(
            parent_board, find_or_create_node(node_cache, parent_board))

    def get_total_visits_for_parent_nodes(self):
        """Return the summed visit counts of all registered parents."""
        return sum(parent.visits for parent in self.parents.cache.values())

    def value(self):
        """Return the share of visits that were wins or draws (0 if unvisited)."""
        if not self.visits:
            return 0
        return (self.wins + self.draws) / self.visits


def play_game_and_reset_playouts(x_strategy, o_strategy, node_cache=nodecache):
    """Play one game with ``node_cache`` emptied both before and after it.

    Returns the final Board of the game.
    """
    node_cache.reset()
    final_board = play_game(x_strategy, o_strategy)
    node_cache.reset()
    return final_board

def play_mcts_move_with_live_playouts(board, node_cache=nodecache, num_playouts=200):
    """Run ``num_playouts`` playouts from ``board``, then play the MCTS move.

    ``num_playouts`` is the configurable simulation budget for this position.
    Progress output is turned off for these playouts.
    """
    perform_training_playouts(node_cache=node_cache, board=board,
                              num_playouts=num_playouts,
                              display_progress=False)
    return play_mcts_move(board, node_cache)

def play_mcts_move(board, node_cache=nodecache):
    """Play the valid move whose resulting node has the highest ``value()``."""
    candidates = get_move_index_node_pairs(board, node_cache)
    best_move_index, _ = max(candidates,
                             key=lambda candidate: candidate[1].value())
    return board.play_move(best_move_index)


def get_move_index_node_pairs(board, node_cache):
    """Pair each valid move index with the node of the board it leads to.

    Nodes missing from ``node_cache`` are created. Returns a ``zip`` iterator.
    """
    move_indexes = board.get_valid_move_indexes()
    child_nodes = []
    for move_index in move_indexes:
        child_board = board.play_move(move_index)
        child_nodes.append(find_or_create_node(node_cache, child_board))

    return zip(move_indexes, child_nodes)


def perform_training_playouts(node_cache=nodecache, board=None,
                              num_playouts=4000, display_progress=True):
    """Run ``num_playouts`` playouts starting from ``board``.

    Args:
        node_cache: Cache holding the node statistics to update.
        board: Starting position; a fresh empty Board when None. The
            default is created per call rather than once at definition
            time, so no Board instance is shared between calls.
        num_playouts: Number of playouts to run.
        display_progress: When True, print a progress line whenever the
            playout count is a multiple of ``num_playouts / 10``.
    """
    if board is None:
        board = Board()

    for game in range(num_playouts):
        perform_game_playout(node_cache, board)
        if display_progress is True and (game+1) % (num_playouts / 10) == 0:
            print(f"{game+1}/{num_playouts} playouts...")


def perform_game_playout(node_cache, board):
    """Play one game out from ``board`` and record the result on each node.

    Moves come from ``choose_move``. Every position visited, including the
    starting one, is passed to ``backpropagate``.
    """
    visited = [board]
    current = board

    while not current.is_gameover():
        current = current.play_move(choose_move(node_cache, current))
        visited.append(current)

    backpropagate(node_cache, current, visited)


def choose_move(node_cache, parent_board):
    """Return the valid move index with the highest ``calculate_value`` score."""
    scored_moves = calculate_values(node_cache, parent_board)
    best_move_index, _ = max(scored_moves, key=lambda scored: scored[1])
    return best_move_index


def calculate_values(node_cache, parent_board):
    """Pair every valid move of ``parent_board`` with its selection score.

    Returns a ``zip`` iterator of ``(move_index, value)`` pairs.
    """
    move_indexes = parent_board.get_valid_move_indexes()
    scores = []
    for move_index in move_indexes:
        child_board = parent_board.play_move(move_index)
        scores.append(calculate_value(node_cache, parent_board, child_board))
    return zip(move_indexes, scores)


def calculate_value(node_cache, parent_board, board):
    """Score ``board``, a child of ``parent_board``, for move selection.

    The child's node is created if needed and ``parent_board`` is registered
    as one of its parents. An unvisited child scores ``math.inf``. Otherwise
    the score is the node's value plus sqrt(2) * sqrt(ln(N) / n), where N is
    the summed visit count of the registered parents and n the child's own.
    """
    child = find_or_create_node(node_cache, board)
    child.add_parent_node(node_cache, parent_board)

    if child.visits == 0:
        return math.inf

    total_parent_visits = child.get_total_visits_for_parent_nodes()

    assert child.visits <= total_parent_visits, \
        "child node visits should be a subset of visits to the parent node "

    exploration_bonus = (math.sqrt(2.0)
                         * math.sqrt(math.log(total_parent_visits)
                                     / child.visits))

    return child.value() + exploration_bonus


def backpropagate(node_cache, final_board_position, game_history):
    """Record the finished game's result on the node of every visited board.

    A node counts a win when the game was won by the player who is NOT to
    move at that node's board, i.e. the player whose move produced it.

    The result is compared against the RESULT_* constants directly. This
    file defines ``is_win``/``is_loss`` twice with opposite perspectives,
    so relying on those global names would make the statistics depend on
    which definition is current when a playout runs.

    Args:
        node_cache: Cache in which every board of ``game_history`` has a node.
        final_board_position: The finished Board of the playout.
        game_history: All boards visited during the playout.

    Raises:
        ValueError: If ``final_board_position`` is not a finished game.
    """
    # The result is the same for every node, so compute it once.
    result = final_board_position.get_game_result()
    if result == RESULT_NOT_OVER:
        raise ValueError("Illegal game state")

    for board in game_history:
        node = find_node(node_cache, board)
        node.visits += 1

        # Result that counts as a win for the player who moved into `board`.
        winning_result = (RESULT_O_WINS if board.get_turn() == CELL_X
                          else RESULT_X_WINS)

        if result == winning_result:
            node.wins += 1
        elif result == RESULT_DRAW:
            node.draws += 1
        else:
            node.losses += 1


def find_node(node_cache, board):
    """Return the cached node for ``board``; the node is asserted to exist."""
    lookup, exists = node_cache.get_for_position(board)
    assert exists is True, "node must exist"
    cached_node, _ = lookup
    return cached_node


def find_or_create_node(node_cache, board):
    """Return the cached node for ``board``, creating and storing one if absent."""
    cached, found = node_cache.get_for_position(board)
    if found is not False:
        existing_node, _ = cached
        return existing_node

    fresh_node = Node()
    node_cache.set_for_position(board, fresh_node)
    return fresh_node


def is_win(player, board):
    """Return True when the game on ``board`` was won by ``player``'s opponent.

    In ``backpropagate`` ``player`` is the side to move at a node, so this
    is True when the side that moved into the node won.
    NOTE: a later cell of this file defines another ``is_win`` with the
    opposite perspective under the same name.
    """
    outcome = board.get_game_result()
    if player == CELL_X:
        return outcome == RESULT_O_WINS
    if player == CELL_O:
        return outcome == RESULT_X_WINS
    return False


def is_loss(player, board):
    """Return True when the game on ``board`` was won by ``player`` itself.

    Mirror image of the ``is_win`` defined just above: the result is judged
    from the viewpoint of ``player``'s opponent.
    NOTE: a later cell of this file defines another ``is_loss`` with the
    opposite perspective under the same name.
    """
    outcome = board.get_game_result()
    if player == CELL_X:
        return outcome == RESULT_X_WINS
    if player == CELL_O:
        return outcome == RESULT_O_WINS
    return False

Main

In [21]:
# Fill the shared node cache with statistics using the defaults of
# perform_training_playouts (4000 playouts from the empty board).
print("Training MCTS...")
perform_training_playouts()

Training MCTS...
400/4000 playouts...
800/4000 playouts...
1200/4000 playouts...
1600/4000 playouts...
2000/4000 playouts...
2400/4000 playouts...
2800/4000 playouts...
3200/4000 playouts...
3600/4000 playouts...
4000/4000 playouts...


(3-b) Test the efficacy of the MCTS agent by soliciting move recommendations for the following board configurations:

• A suitable board configuration in which the MCTS agent is one move away from win

**Soln:** When the number of simulations is less than 100, the MCTS agent lost every time. Only when the agent was trained with more than 1000 simulations did MCTS win on the last move.

• A suitable board configuration in which the MCTS agent is one move away from loss

**Soln:** When the number of simulations is less than 100, the MCTS agent lost every time. Only when the agent was trained with more than 1000 simulations did MCTS win on the last move.

• The board configuration where the opponent has made the first move and has occupied the centre square

**Soln:** When the number of simulations is less than 100 and the opponent took the centre square, MCTS lost every time. After increasing the number of simulations to 1000, MCTS drew the match every time.

3-c) Record the number of wins, losses and draws against Random agents

In [22]:

# 1000 games with the random agent playing X and the MCTS agent playing O.
print("")
print("Playing random vs MCTS:")
print("-----------------------")
play_games(1000, play_random_move, play_mcts_move)




Playing random vs MCTS:
-----------------------
x wins: 0.00%
o wins: 60.60%
draw  : 39.40%


In [23]:
import numpy as np
import statistics as stats
import random
import operator
import itertools
from collections import deque

# Rewards handed out at the end of a training game. A draw is rewarded
# exactly like a win; only a loss yields nothing.
WIN_VALUE = 1.0
DRAW_VALUE = 1.0
LOSS_VALUE = 0.0

# Q-value reported for positions that are not in a table yet.
INITIAL_Q_VALUES_FOR_X = 0.0
INITIAL_Q_VALUES_FOR_O = 0.0


class QTable:
    """Q-values keyed by the position that a move leads to.

    The values live in a ``BoardCache``, so lookups also match the other
    orientations of a position.
    """

    def __init__(self):
        self.qtable = BoardCache()

    def get_q_values(self, board):
        """Return ``{move_index: q_value}`` for every valid move of ``board``."""
        return {move_index: self.get_q_value(board, move_index)
                for move_index in board.get_valid_move_indexes()}

    def get_q_value(self, board, move_index):
        """Return the Q-value of playing ``move_index`` on ``board``.

        Falls back to ``get_initial_q_value`` for positions not stored yet.
        """
        new_position = board.play_move(move_index)
        result, found = self.qtable.get_for_position(new_position)
        if found is not True:
            return get_initial_q_value(new_position)

        stored_q_value, _ = result
        return stored_q_value

    def update_q_value(self, board, move_index, qvalue):
        """Store ``qvalue`` for playing ``move_index`` on ``board``.

        If the resulting position is already stored under another
        orientation, that existing entry is overwritten instead of a new
        one being added.
        """
        new_position = board.play_move(move_index)
        target_position = new_position

        result, found = self.qtable.get_for_position(new_position)
        if found is True:
            _, transformation = result
            # Re-create the position in the orientation used as the key.
            target_position = Board(
                transformation.transform(new_position.board_2d).flatten())

        self.qtable.set_for_position(target_position, qvalue)

    def get_move_index_and_max_q_value(self, board):
        """Return the ``(move_index, q_value)`` pair with the largest Q-value."""
        candidates = self.get_q_values(board).items()
        return max(candidates, key=lambda candidate: candidate[1])

    def print_q_values(self):
        """Print the number of stored entries, then each board with its value."""
        entries = self.qtable.cache
        print(f"num q_values = {len(entries)}")
        for raw_key, stored_value in entries.items():
            cells = np.frombuffer(raw_key, dtype=int)
            Board(cells).print_board()
            print(f"qvalue = {stored_value}")


def get_initial_q_value(board):
    """Return the starting Q-value for the position ``board``.

    When O is to move on ``board`` the X initial value is used; otherwise
    the O initial value is used.
    """
    if board.get_turn() == CELL_O:
        return INITIAL_Q_VALUES_FOR_X
    return INITIAL_Q_VALUES_FOR_O


# Single-table setup used whenever a function receives q_tables=None.
qtables = [QTable()]

# Two-table setup; presumably meant for double Q-learning — it is not
# referenced by the code visible here (TODO confirm).
double_qtables = [QTable(), QTable()]


def create_q_table_player(q_tables):
    """Return a strategy (Board -> Board) that plays from ``q_tables``."""
    return lambda board: play_q_table_move(board, q_tables)


def play_q_table_move(board, q_tables=None):
    """Play the move with the best average Q-value, with no exploration.

    Uses the module-level ``qtables`` when ``q_tables`` is None.
    """
    tables = qtables if q_tables is None else q_tables
    best_move_index = choose_move_index(tables, board, 0)
    return board.play_move(best_move_index)


def choose_move_index(q_tables, board, epsilon):
    """Pick a move index for ``board`` using an epsilon-greedy rule.

    When ``epsilon`` is positive, a uniform random draw below ``epsilon``
    selects a random valid move. Otherwise the move with the highest
    average Q-value across ``q_tables`` is returned.
    """
    # The random draw only happens for a positive epsilon.
    if epsilon > 0 and np.random.uniform() < epsilon:
        return board.get_random_valid_move_index()

    scored_moves = get_move_average_q_value_pairs(q_tables, board)
    best_move_index, _ = max(scored_moves, key=lambda scored: scored[1])
    return best_move_index


def get_move_average_q_value_pairs(q_tables, board):
    """Return ``(move_index, mean Q-value over all tables)`` pairs.

    The pairs are ordered by ascending move index.
    """
    pairs = []
    for move_index in sorted(q_tables[0].get_q_values(board)):
        per_table = gather_q_values_for_move(q_tables, board, move_index)
        pairs.append((move_index, stats.mean(per_table)))
    return pairs


def gather_q_values_for_move(q_tables, board, move_index):
    """Collect the Q-value of ``move_index`` on ``board`` from every table."""
    collected = []
    for q_table in q_tables:
        collected.append(q_table.get_q_value(board, move_index))
    return collected


def play_training_games_x(total_games=7000, q_tables=None,
                          learning_rate=0.4, discount_factor=1.0, epsilon=0.7,
                          o_strategies=None):
    """Train the Q-tables as player X.

    Defaults: the module-level ``qtables`` and a random-move opponent.
    """
    tables = qtables if q_tables is None else q_tables
    opponents = [play_random_move] if o_strategies is None else o_strategies

    play_training_games(total_games, tables, CELL_X, learning_rate,
                        discount_factor, epsilon,
                        x_strategies=None, o_strategies=opponents)


def play_training_games_o(total_games=7000, q_tables=None,
                          learning_rate=0.4, discount_factor=1.0, epsilon=0.7,
                          x_strategies=None):
    """Train the Q-tables as player O.

    Defaults: the module-level ``qtables`` and a random-move opponent.
    """
    tables = qtables if q_tables is None else q_tables
    opponents = [play_random_move] if x_strategies is None else x_strategies

    play_training_games(total_games, tables, CELL_O, learning_rate,
                        discount_factor, epsilon,
                        x_strategies=opponents, o_strategies=None)


def play_training_games(total_games, q_tables, q_table_player, learning_rate,
                        discount_factor, epsilon, x_strategies, o_strategies):
    """Play ``total_games`` training games and update the Q-tables after each.

    A side whose strategy list is empty or None is played by a fresh
    training player (see ``create_training_player``) every game; the other
    side cycles through its strategy list. Each time the game count is a
    multiple of ``total_games / 10``, epsilon is lowered by 0.1 (never below
    0) and a progress line is printed.
    """
    x_rotation = itertools.cycle(x_strategies) if x_strategies else None
    o_rotation = itertools.cycle(o_strategies) if o_strategies else None

    for games_played in range(1, total_games + 1):
        # Filled by the training player while the game is being played.
        move_history = deque()

        if x_rotation is None:
            x_player = create_training_player(q_tables, move_history, epsilon)
        else:
            x_player = next(x_rotation)

        if o_rotation is None:
            o_player = create_training_player(q_tables, move_history, epsilon)
        else:
            o_player = next(o_rotation)

        play_training_game(q_tables, move_history, q_table_player,
                           x_player, o_player, learning_rate,
                           discount_factor)

        if games_played % (total_games / 10) == 0:
            epsilon = max(0, epsilon - 0.1)
            print(f"{games_played}/{total_games} games, using epsilon={epsilon}...")


def play_training_game(q_tables, move_history, q_table_player, x_strategy,
                       o_strategy, learning_rate, discount_factor):
    """Play one game, then update the Q-tables from the recorded moves."""
    finished_board = play_game(x_strategy, o_strategy)

    update_training_gameover(q_tables, move_history, q_table_player,
                             finished_board, learning_rate, discount_factor)


def update_training_gameover(q_tables, move_history, q_table_player, board,
                             learning_rate, discount_factor):
    """Update the Q-tables from one finished training game.

    Args:
        q_tables: List of QTable objects being trained.
        move_history: Sequence of (board before the move, move index) pairs
            recorded by the training player, newest move first.
        q_table_player: CELL_X or CELL_O, the side being trained.
        board: The finished Board of the game.
        learning_rate: Weight given to the new estimate.
        discount_factor: Weight given to the best next Q-value.

    NOTE(review): ``move_history[0]`` is read unconditionally, so an empty
    history would raise IndexError — confirm callers always record a move.
    """
    game_result_reward = get_game_result_value(q_table_player, board)

    # move history is in reverse-chronological order - last to first
    # The final move receives the game reward, with no future value, in
    # every table.
    next_position, move_index = move_history[0]
    for q_table in q_tables:
        current_q_value = q_table.get_q_value(next_position, move_index)
        new_q_value = calculate_new_q_value(current_q_value, game_result_reward,
                                            0.0, learning_rate, discount_factor)

        q_table.update_q_value(next_position, move_index, new_q_value)

    # Walk back through the earlier moves. Each one gets no direct reward,
    # only the discounted value of the best move from the position that
    # followed it in the history.
    for (position, move_index) in list(move_history)[1:]:
        # One table selects the best next move, the other supplies its
        # value; with a single table both roles fall to the same table.
        current_q_table, next_q_table = get_shuffled_q_tables(q_tables)

        max_next_move_index, _ = current_q_table.get_move_index_and_max_q_value(
            next_position)

        max_next_q_value = next_q_table.get_q_value(next_position,
                                                    max_next_move_index)

        current_q_value = current_q_table.get_q_value(position, move_index)
        new_q_value = calculate_new_q_value(current_q_value, 0.0,
                                            max_next_q_value, learning_rate,
                                            discount_factor)
        current_q_table.update_q_value(position, move_index, new_q_value)

        # This position is the "next" one for the move processed after it.
        next_position = position


def calculate_new_q_value(current_q_value, reward, max_next_q_value,
                          learning_rate, discount_factor):
    """Blend the current Q-value with the newly observed target.

    Computes (1 - learning_rate) * current_q_value
    + learning_rate * (reward + discount_factor * max_next_q_value).
    """
    target = reward + discount_factor * max_next_q_value
    return (1 - learning_rate) * current_q_value + learning_rate * target


def get_shuffled_q_tables(q_tables):
    """Return two tables taken from a randomly shuffled copy of ``q_tables``.

    The shuffled copy is cycled, so a single-table list yields that table
    twice. ``q_tables`` itself is left in its original order.
    """
    shuffled = q_tables.copy()
    random.shuffle(shuffled)
    rotation = itertools.cycle(shuffled)

    return next(rotation), next(rotation)


def create_training_player(q_tables, move_history, epsilon):
    """Return a strategy that plays epsilon-greedily and logs its moves.

    Each ``(board, move_index)`` pair is pushed onto the left of
    ``move_history``, so the newest move always comes first.
    """
    def play(board):
        chosen_index = choose_move_index(q_tables, board, epsilon)
        move_history.appendleft((board, chosen_index))
        return board.play_move(chosen_index)

    return play


def get_game_result_value(player, board):
    """Return the training reward for ``player`` on the finished ``board``.

    Gives WIN_VALUE, LOSS_VALUE or DRAW_VALUE, or None when the board is
    none of a win, a loss or a draw.
    """
    if is_win(player, board):
        reward = WIN_VALUE
    elif is_loss(player, board):
        reward = LOSS_VALUE
    elif is_draw(board):
        reward = DRAW_VALUE
    else:
        reward = None
    return reward


def is_win(player, board):
    """Return True when ``player`` (CELL_X or CELL_O) won the game on ``board``.

    NOTE: this replaces the earlier ``is_win`` of this file, which judged
    the result from the opponent's point of view.
    """
    outcome = board.get_game_result()
    if player == CELL_O:
        return outcome == RESULT_O_WINS
    if player == CELL_X:
        return outcome == RESULT_X_WINS
    return False


def is_loss(player, board):
    """Return True when ``player`` (CELL_X or CELL_O) lost the game on ``board``.

    NOTE: this replaces the earlier ``is_loss`` of this file, which judged
    the result from the opponent's point of view.
    """
    outcome = board.get_game_result()
    if player == CELL_O:
        return outcome == RESULT_X_WINS
    if player == CELL_X:
        return outcome == RESULT_O_WINS
    return False



3-c) Record the number of wins, losses and draws against Safe (QTable) agents

In [24]:
# 1000 games with the Q-table agent playing X and the MCTS agent playing O.
# The header is printed before the games so that it labels the results
# that follow it, as in the other match-up cells.
# NOTE(review): no Q-table training call is visible before this point, so
# the tables may still hold only their initial values — confirm.
print("")
print("Playing qtable vs MCTS:")
print("-------------------------")
play_games(1000, play_q_table_move, play_mcts_move)

-------------------------
x wins: 0.00%
o wins: 100.00%
draw  : 0.00%

Playing qtable vs MCTS:
---------------------------------


3-d) Record the number of wins, losses and draws when the MCTS agent plays 1000 games against itself

In [25]:
# 1000 games with the MCTS agent playing both X and O.
print("")
print("Playing MCTS vs MCTS:")
print("---------------------")
play_games(1000, play_mcts_move, play_mcts_move)
print("")


Playing MCTS vs MCTS:
---------------------
x wins: 0.00%
o wins: 0.00%
draw  : 100.00%

