In [1]:
# Creating RL MLP for Tic Tac Toe

In [2]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

import random
from IPython.display import display, clear_output
from time import sleep

import numpy as np
import time
import sys
from functools import reduce


In [3]:
# Select the compute device: the first CUDA GPU when PyTorch can see one,
# otherwise the CPU. Every model and tensor below is moved to this device.
if torch.cuda.is_available():
    device = torch.device("cuda:0")
else:
    device = torch.device("cpu")

In [4]:
class AgentNN(nn.Module):
    """Small fully connected network that scores every cell of the board.

    The input vector has ``2 + 3 * board_dim**2`` entries: two leading values
    followed by three values per board cell. The output has one raw
    (un-normalised) score per cell.

    Args:
        board_dim: Side length of the square board.
        hidden_size: Width of both hidden layers.
    """

    def __init__(self, board_dim=3, hidden_size=64):
        super().__init__()
        self.board_dim = board_dim
        self.hidden_size = hidden_size

        n_cells = board_dim ** 2
        n_features = 2 + n_cells * 3

        # Layers are created in input -> hidden -> out order so that seeded
        # initialisation and state-dict keys stay stable.
        self.input = nn.Linear(n_features, hidden_size)
        self.hidden = nn.Linear(hidden_size, hidden_size)
        self.out = nn.Linear(hidden_size, n_cells)

    def forward(self, x):
        """Return one raw score per board cell for the encoded position ``x``."""
        first_activation = F.relu(self.input(x))
        second_activation = F.relu(self.hidden(first_activation))
        return self.out(second_activation)

    def save(self, file_name):
        """Serialise the whole module object (not just its state dict) to ``file_name``."""
        torch.save(self, file_name)

In [5]:
class Board:
    """Square noughts-and-crosses board with one-hot encoded cells.

    ``self.board`` has shape ``(board_dim, board_dim, 3)``; the last axis is
    ``(is_empty, is_cross, is_nought)`` and exactly one entry per cell is 1.

    Attributes set by the game logic:
        game_ended: True once the game is over.
        result: ``'cross'``, ``'nought'`` or ``'draw'`` once the game is over,
            otherwise None.
        mark_illegal_move: ``(i, j)`` of the illegal move that ended the game,
            or None.

    Args:
        board_dim: Side length of the board.
        win_condition: Number of aligned marks needed to win. Defaults to
            ``board_dim``.
    """

    def __init__(self, board_dim=3, win_condition=None):
        self.board_dim = board_dim
        if win_condition is None:
            self.win_condition = board_dim
        else:
            self.win_condition = win_condition
        # Channel index on the last board axis: (is_empty, is_cross, is_nought)
        self.side_to_pos = {
            'empty': 0,
            'cross': 1,
            'nought': 2
        }
        self.board = np.zeros((board_dim, board_dim, 3))
        self.board[:, :, 0] = 1  # every cell starts empty
        self.empty_cells = board_dim * board_dim
        self.game_ended = False
        self.result = None

        self.mark_illegal_move = None

    def deepcopy(self):
        """Return an independent copy of the board, including the game status.

        The copy carries over ``game_ended``, ``result`` and
        ``mark_illegal_move`` so that a finished game stays finished.
        """
        b = Board(board_dim=self.board_dim, win_condition=self.win_condition)
        b.empty_cells = self.empty_cells
        b.board = np.copy(self.board)
        b.game_ended = self.game_ended
        b.result = self.result
        b.mark_illegal_move = self.mark_illegal_move
        return b

    def _count_run(self, i, j, di, dj, pos):
        """Count consecutive cells holding channel ``pos`` from (i, j), exclusive, along (di, dj)."""
        count = 0
        k = i + di
        l = j + dj
        while (0 <= k < self.board_dim and 0 <= l < self.board_dim
               and self.board[k][l][pos] == 1):
            count += 1
            k += di
            l += dj
        return count

    def check_win(self, i, j, side):
        """Update ``game_ended``/``result`` after ``side`` has marked cell (i, j).

        A line through (i, j) is looked for first (vertical, horizontal and
        both diagonals); only when there is none and the board is full is the
        game declared a draw. A winning move on the last empty cell is
        therefore a win, not a draw.

        Args:
            i: Row of the cell that was just marked.
            j: Column of the cell that was just marked.
            side: ``'cross'`` or ``'nought'``.
        """
        pos = self.side_to_pos[side]

        # One entry per line orientation; each is scanned in both directions.
        for di, dj in ((1, 0), (0, 1), (1, 1), (1, -1)):
            same_cell = (self._count_run(i, j, di, dj, pos)
                         + self._count_run(i, j, -di, -dj, pos))
            # ">=" (not "=="): a move joining two runs can create a line
            # longer than win_condition, which still wins.
            if same_cell >= (self.win_condition - 1):
                self.game_ended = True
                self.result = side
                return

        if self.empty_cells == 0:
            self.game_ended = True
            self.result = 'draw'

    def get_field_symbol(self, i, j):
        """Return ``' '``, ``'x'`` or ``'o'`` for cell (i, j).

        Returns None if none of the three channels of the cell equals 1.
        """
        pos_to_symbol = {0: ' ', 1: 'x', 2: 'o'}
        for pos, symbol in pos_to_symbol.items():
            if self.board[i][j][pos] == 1:
                return symbol

    def show(self):
        """Clear the cell output and print the board as ASCII art.

        The cell of an illegal move, if any, is wrapped in ``!``.
        """
        clear_output(wait=True)
        print('    ', end='')
        for i in range(self.board_dim):
            print(f"{i}   ", end='')
        print()
        for i in range(self.board_dim):
            print('    ', end='')
            print('-   ' * self.board_dim)
            print(f"{i} | ", end='')
            row = []
            for j in range(self.board_dim):
                if self.mark_illegal_move is not None and self.mark_illegal_move == (i, j):
                    row.append(f'!{self.get_field_symbol(i, j)}!')
                else:
                    row.append(self.get_field_symbol(i, j))
            print(' | '.join(row) + ' |')
        print('    ' + '-   ' * self.board_dim)

    @staticmethod
    def get_opponent(side):
        """Return the other side's name.

        Raises:
            ValueError: If ``side`` is neither ``'cross'`` nor ``'nought'``.
        """
        if side == 'cross':
            return 'nought'

        if side == 'nought':
            return 'cross'

        raise ValueError(f"Illegal side {side}")

    def change_cell(self, i, j, pos):
        """Overwrite cell (i, j) so that only channel ``pos`` is set."""
        self.board[i][j][0] = 0
        self.board[i][j][1] = 0
        self.board[i][j][2] = 0
        self.board[i][j][pos] = 1

    def make_move(self, move, side) -> None:
        """Place ``side``'s mark at ``move`` and update the game status.

        Moving onto a non-empty cell is illegal: the game ends immediately,
        the opponent is declared the winner and the cell is remembered in
        ``mark_illegal_move``; the board itself is left unchanged.

        Args:
            move: ``(row, column)`` pair.
            side: ``'cross'`` or ``'nought'``.
        """
        i = move[0]
        j = move[1]

        pos = self.side_to_pos[side]

        if self.board[i][j][0] != 1:  # illegal move
            self.game_ended = True
            self.result = self.get_opponent(side)
            self.mark_illegal_move = (i, j)
            return

        self.change_cell(i, j, pos)
        self.empty_cells -= 1

        # check_win handles both outcomes: win first, then draw on a full board.
        self.check_win(i, j, side)

    def make_move_x(self, move):
        """Play ``move`` for the cross side."""
        self.make_move(move, 'cross')

    def make_move_o(self, move):
        """Play ``move`` for the nought side."""
        self.make_move(move, 'nought')

In [6]:
from random import randint

class RandomAgent:
    """Baseline player that picks uniformly among the empty cells."""

    def __init__(self, board_dim):
        # Stored for reference only; get_move reads the size from the board it is given.
        self.board_dim = board_dim

    @staticmethod
    def get_move(board):
        """Return a random ``(row, column)`` whose empty channel (index 0) is set.

        Args:
            board: Object exposing ``board_dim`` and a ``board`` indexable as
                ``board[row][column][channel]``.
        """
        free_cells = [
            (row, col)
            for row in range(board.board_dim)
            for col in range(board.board_dim)
            if board.board[row][col][0] == 1
        ]
        chosen = randint(0, len(free_cells) - 1)
        return free_cells[chosen]

In [7]:
class DoubleAgent:
    """Trains an ``AgentNN`` to play cross against a random nought player.

    Training is reward-weighted: for every cross move the network's raw
    per-cell scores are kept, and the loss is a soft-target cross-entropy whose
    target row is zero everywhere except at the chosen move, where it holds the
    game's reward. That equals ``-reward * log_softmax(scores)[move]``,
    averaged over all recorded moves.

    Args:
        hidden_size: Hidden width of a freshly created network (ignored when
            ``file_name`` is given).
        file_name: Optional path of a network previously written by
            ``AgentNN.save``.
        exploration_rate: Probability, during training, of playing the
            second-best-scored move instead of the best one.
        board_dim: Board side length for a freshly created network. When a
            network is loaded, its own ``board_dim`` is used instead.
    """

    def __init__(self, hidden_size=64, file_name=None, exploration_rate=0.1, board_dim=3) -> None:

        if file_name is not None:
            # NOTE(security): the file is a pickle of the whole module, so
            # loading it executes pickled code - only load trusted files.
            # weights_only=False is required for full-module pickles on
            # PyTorch versions where weights_only defaults to True.
            self.agent = torch.load(file_name, map_location=device, weights_only=False)
        else:
            self.agent = AgentNN(board_dim=board_dim, hidden_size=hidden_size)
        self.agent.to(device)
        self.board_dim = self.agent.board_dim
        self.agent_random = RandomAgent(self.board_dim)

        self.optimizer = optim.AdamW(self.agent.parameters(), amsgrad=True)

        # Flat network output index -> (row, column), in row-major order.
        self.remap_move = {
            i * self.board_dim + j: (i, j)
            for i in range(self.board_dim)
            for j in range(self.board_dim)
        }

        self.exploration_rate = exploration_rate

    def get_single_moves(self, show, is_eval=False):
        """Play one game (network as cross, random agent as nought).

        The network input is the side flag ``[1, 0]`` followed by the
        flattened one-hot board. The network is not prevented from choosing
        an occupied cell; ``Board`` then ends the game in favour of nought.

        Args:
            show: Print the board after every move, pausing one second.
            is_eval: When True, never explore (always play the arg-max move).

        Returns:
            Tuple ``(scores, moves, board)``: ``scores`` is a tensor of shape
            ``(n_cross_moves, board_dim**2)`` with the raw network output for
            every cross move, ``moves`` the list of chosen flat move indices,
            and ``board`` the finished ``Board``.
        """
        b = Board(board_dim=self.board_dim)
        turn = 'cross'
        if show:
            b.show()
            time.sleep(1)

        thoughts = []
        moves = []
        while not b.game_ended:
            if turn == 'cross':
                state = np.concatenate(([1], [0], b.board.flatten()))
                curr_data = torch.tensor(state, dtype=torch.float32, device=device)

                thought = self.agent(curr_data)
                thoughts.append(thought)

                if not is_eval and random.random() < self.exploration_rate:
                    # Exploration: play the second-best-scored move.
                    curr_move = torch.topk(thought, 2).indices[1].item()
                else:
                    curr_move = thought.argmax().item()
                moves.append(curr_move)

                b.make_move_x(self.remap_move[curr_move])
                turn = 'nought'
            else:
                # Nought always plays randomly. (A self-play variant would feed
                # the side flag [0, 1] plus the board to the same network.)
                b.make_move_o(self.agent_random.get_move(b))
                turn = 'cross'

            if show:
                b.show()
                time.sleep(1)

        # Cross moves first, so there is always at least one row to stack.
        return torch.stack(thoughts), moves, b

    def play(self, show=True, n_games=1, is_eval=False):
        """Play ``n_games`` games, then either report the score or train.

        Reward per game: ``-empty_cells`` when nought wins, ``+empty_cells``
        when cross wins, and 3 for a draw, where ``empty_cells`` is the number
        of empty cells left at the end of the game.

        Args:
            show: Print every position while playing.
            n_games: Number of games in this batch.
            is_eval: When True, play greedily without gradients, print the
                win-or-draw rate and do not update the network. When False,
                take one optimiser step on the whole batch.

        Returns:
            Fraction of games that cross won or drew (0.0 when
            ``n_games <= 0``).
        """
        if is_eval:
            self.agent.eval()

        n_cells = self.board_dim ** 2
        all_scores = []
        all_targets = []
        n_wins_or_draws = 0
        for _ in range(n_games):
            # No autograd graph is needed when only evaluating.
            with torch.set_grad_enabled(not is_eval):
                scores, moves, b = self.get_single_moves(show, is_eval=is_eval)

            if b.result == 'nought':
                game_revard = -1 * b.empty_cells
            elif b.result == 'cross':
                game_revard = 1 * b.empty_cells
                n_wins_or_draws += 1
            else:
                game_revard = 3
                n_wins_or_draws += 1

            # One row per cross move, one column per cell: the reward sits at
            # the chosen move so the class axis (dim 1) is the move axis.
            game_result = np.zeros((len(moves), n_cells), dtype=np.float32)
            game_result[np.arange(len(moves)), moves] = game_revard

            all_scores.append(scores)
            all_targets.append(torch.from_numpy(game_result).to(device))

        win_or_draw_rate = n_wins_or_draws / n_games if n_games > 0 else 0.0

        if is_eval:
            print(f'Wins or draws rate: {win_or_draw_rate}')
            self.agent.train()
            time.sleep(1)
        elif all_scores:
            loss = F.cross_entropy(torch.cat(all_scores), torch.cat(all_targets))
            self.optimizer.zero_grad()
            loss.backward()
            self.optimizer.step()

        return win_or_draw_rate

    def train(self, n_batches=100, batch_size=10, show_per_batch=None):
        """Run ``n_batches`` training batches of ``batch_size`` games each.

        Args:
            n_batches: Number of optimiser steps to take.
            batch_size: Games played per optimiser step.
            show_per_batch: When set, every ``show_per_batch``-th batch is
                preceded by one displayed game (which also takes a training
                step of its own).
        """
        for i in range(n_batches):
            if show_per_batch is not None and i % show_per_batch == 0:
                self.play(show=True, n_games=1)
            self.play(show=False, n_games=batch_size)

    def save(self, file_name):
        """Write the underlying network to ``file_name`` via ``AgentNN.save``."""
        self.agent.save(file_name)



In [12]:
# Create an agent with a freshly initialised network (no file_name given) and 1024 hidden units.
a = DoubleAgent(hidden_size=1024)


In [13]:
# Sanity check: run the network once on an empty board and look at its raw per-cell outputs.
b = Board()
# Input layout: [1, 0] followed by the flattened one-hot board - the same
# layout DoubleAgent builds for the cross player.
curr_data = torch.FloatTensor(np.concatenate(([1], [0], b.board.flatten()))).to(device)
a.agent(curr_data)

tensor([-52797.1953, -50200.4727, -76704.4844, -51112.2070, -50424.5781,
        -50710.7695, -50444.6094, -50239.3750, -50402.1719], device='cuda:0',
       grad_fn=<AddBackward0>)

In [51]:
# Full training run, starting from a freshly initialised agent.
a = DoubleAgent(hidden_size=1024)
#a.play()
N_EPOCHS = 1000

for e in range(N_EPOCHS):
    # Train on 100 batches of 10 games each.
    a.train(n_batches=100, batch_size=10)
    # Display a single game (with the default is_eval=False this is also a training step).
    a.play(show=True)
    # Evaluate on 1000 games; prints the win-or-draw rate.
    a.play(n_games=1000, show=False, is_eval=True)
    # Checkpoint after every epoch.
    # NOTE(review): presumably the 'data/' directory must already exist - confirm.
    a.save(f'data/double_agent_10_for_draw_{e}')

    0   1   2   
    -   -   -   
0 |   |   |   |
    -   -   -   
1 |   | o |   |
    -   -   -   
2 |   | !x! |   |
    -   -   -   
Wins or draws rate: 0.0


In [11]:
# Smoke test: ask for 10 games between two random agents and count the returned entries.
# NOTE(review): play_n_games is not defined in any visible cell - presumably it
# comes from a project module imported elsewhere; confirm before relying on Run All.
len(play_n_games(RandomAgent(3), RandomAgent(3), 10))

10

In [23]:
from torch.multiprocessing import Pool, Process, set_start_method

# Force the 'spawn' start method - the one PyTorch documents for using CUDA in
# worker processes. force=True allows re-running this cell.
set_start_method('spawn', force=True)

# Play two batches of 100 games in parallel and print the raw results.
# The pool is created only inside the `with` block so that its workers are
# always shut down; a second, unmanaged Pool would leak its processes.
# NOTE(review): play_n_games and IntelligentAgent are not defined in any
# visible cell - presumably imported from a project module; confirm.
with Pool(processes=2) as pool:
    multiple_results = [pool.apply_async(play_n_games, (IntelligentAgent(), RandomAgent(3), 100)) for i in range(2)]
    print([res.get() for res in multiple_results])



[[(-1, ([(<utils.Board object at 0x7efd75df0610>, (1, 2)), (<utils.Board object at 0x7efd71c5a090>, (1, 2)), (<utils.Board object at 0x7efd70c43d10>, (1, 0))], [(<utils.Board object at 0x7efd70c43990>, (1, 1)), (<utils.Board object at 0x7efd71bff3d0>, (0, 2))])), (-1, ([(<utils.Board object at 0x7efd712b1710>, (1, 0)), (<utils.Board object at 0x7efd712b2ed0>, (1, 0))], [(<utils.Board object at 0x7efd712b0190>, (1, 2))])), (-1, ([(<utils.Board object at 0x7efd712b16d0>, (1, 0)), (<utils.Board object at 0x7efd71c436d0>, (1, 2)), (<utils.Board object at 0x7efd71c42710>, (1, 0))], [(<utils.Board object at 0x7efd71c41e50>, (2, 2)), (<utils.Board object at 0x7efd71c415d0>, (2, 1))])), (-1, ([(<utils.Board object at 0x7efd71b779d0>, (1, 0)), (<utils.Board object at 0x7efd71b74ed0>, (1, 0))], [(<utils.Board object at 0x7efd71b77790>, (2, 2))])), (-1, ([(<utils.Board object at 0x7efd71b75c90>, (1, 0)), (<utils.Board object at 0x7efd71b74750>, (0, 1)), (<utils.Board object at 0x7efd71b75f10>, (1

In [22]:
START = 0
NUM_EPOCHS = 15
NUM_GAMES = 3  # games played by each parallel task
NUM_WORKERS = 8  # pool processes and parallel tasks per epoch
PRINT_TIME = False
FILE_PREFIX = 'test_cross'
SIDE = 'cross'
FILED_SIZE = 3  # board side length


# NOTE(review): IntelligentAgent and play_n_games are not defined in any
# visible cell - presumably imported from a project module; confirm.
start_time = time.perf_counter()
for epoch in range(START, START + NUM_EPOCHS):
    # From the second epoch on, continue from the checkpoint of the previous epoch.
    file_name = None
    if epoch > 0:
        file_name = f'data/{FILE_PREFIX}_test_nn_{epoch - 1}'
    agent = IntelligentAgent(nn_filename=file_name, hidden_size=4096, board_dim=FILED_SIZE)
    random_agent = RandomAgent(FILED_SIZE)

    wins = 0
    draws = 0
    total_moves = []
    results = []
    start_batch_time = time.perf_counter()
    with Pool(processes=NUM_WORKERS) as pool:

        # Epoch 0 collects games between two random players; later epochs
        # pit an agent loaded from the previous checkpoint against random.
        if epoch == 0:
            multiple_results = [pool.apply_async(play_n_games, (random_agent, random_agent, NUM_GAMES)) for _ in range(NUM_WORKERS)]
        else:
            multiple_results = [pool.apply_async(play_n_games, (IntelligentAgent(nn_filename=file_name, hidden_size=4096, board_dim=FILED_SIZE), random_agent, NUM_GAMES)) for _ in range(NUM_WORKERS)]
        for r in multiple_results:
            partiotional_result = r.get()
            for x in partiotional_result:
                res, moves = x[0], x[1]
                if res == 1:
                    wins += 1
                if res == 0:
                    draws += 1
                    res = 1  # a draw is labelled like a win for training

                # Every outcome contributes the first move list, labelled with
                # the (possibly relabelled) result.
                # presumably moves[0] holds the first player's moves - confirm against play_n_games
                total_moves += moves[0]
                results += [res] * len(moves[0])

    if PRINT_TIME:
        print(f'Total time: {time.perf_counter() - start_time}, Time for creating current batch: {time.perf_counter() - start_batch_time}')
        start_train_time = time.perf_counter()

    agent.train(total_moves, results)

    if PRINT_TIME:
        print(f'Total time: {time.perf_counter() - start_time}, Time for training current batch: {time.perf_counter() - start_train_time}')


    # clear tmp variables
    total_moves = []
    results = []

    if PRINT_TIME:
        start_batch_time = time.perf_counter()


    games_played = NUM_GAMES * NUM_WORKERS
    print(f'Total time: {time.perf_counter() - start_time}, Win rate {wins / games_played} Draw rate {draws / games_played} Win or draw rate {(draws + wins) / games_played}')
    agent.save(f'data/{FILE_PREFIX}_test_nn_{epoch}')

Total time: 1.1449478029999227, Win rate 0.625 Draw rate 0.0 Win or draw rate 0.625
Total time: 12.045603569999912, Win rate 0.0 Draw rate 0.0 Win or draw rate 0.0
Total time: 22.957473987999947, Win rate 0.0 Draw rate 0.0 Win or draw rate 0.0
Total time: 33.9161569479993, Win rate 0.0 Draw rate 0.0 Win or draw rate 0.0
Total time: 44.912877117999415, Win rate 0.0 Draw rate 0.0 Win or draw rate 0.0
Total time: 56.13179768199916, Win rate 0.0 Draw rate 0.0 Win or draw rate 0.0
Total time: 67.10688208800002, Win rate 0.0 Draw rate 0.0 Win or draw rate 0.0
Total time: 78.20594420499947, Win rate 0.0 Draw rate 0.0 Win or draw rate 0.0
Total time: 89.23685048099924, Win rate 0.0 Draw rate 0.0 Win or draw rate 0.0
Total time: 100.2242379219997, Win rate 0.0 Draw rate 0.0 Win or draw rate 0.0
Total time: 111.04986624799949, Win rate 0.0 Draw rate 0.0 Win or draw rate 0.0
Total time: 121.9362800129993, Win rate 0.0 Draw rate 0.0 Win or draw rate 0.0
Total time: 132.80976037899927, Win rate 0.0

In [None]:
# Measure how often the loaded network (playing x) beats a random player over 1000 games.
# NOTE(review): IntelligentAgent and play_single_game are not defined in any
# visible cell - presumably imported from a project module; confirm.
agent_o = RandomAgent(3)
# NOTE(review): 'test_nn_9' does not match the 'data/{FILE_PREFIX}_test_nn_{epoch}'
# pattern used when saving in the training cell - verify the path.
agent_x = IntelligentAgent(nn_filename='test_nn_9')
wins = 0
games = 1000
for i in range(games):
    res, _ = play_single_game(agent_x, agent_o)
    # Only a result of 1 is counted as a win here.
    if res == 1:
        wins += 1
print(f'Win rate {wins / games}')

In [None]:
# Watch one displayed game: loaded network as x against a random o.
# The commented-out lines are alternative pairings.
# NOTE(review): IntelligentAgent and play_single_game are not defined in any
# visible cell - presumably imported from a project module; confirm.
# agent_o = IntelligentAgent(nn_filename='test_nn_9')
agent_x = IntelligentAgent(nn_filename='test_nn_73')
agent_o = RandomAgent(3)
# agent_x = RandomAgent(3)
play_single_game(agent_x, agent_o, show=True)

In [None]:
# Our NN should return an output the size of the board, holding the probability that each cell is the best move.
# We need to collect every step our agent takes during a game.
# How do we backpropagate using this information?