In [245]:
import numpy as np
from copy import deepcopy
from typing import Tuple
from tqdm import tqdm
from collections import deque

In [237]:
class Gomoku:

    '''
    Gym-like environment for the game Gomoku (a.k.a. Five-in-a-Row).

    Two players (1 and 2) alternately place chips on a ``height`` x ``width``
    board. A player wins by getting ``len_to_win`` of their own chips in an
    unbroken horizontal, vertical or diagonal line.
    '''

    # Step vectors of the four line orientations:
    # right, down, down-right, down-left.
    # Scanning every cell as a potential line *start* in these four
    # directions covers all eight directions without duplicates.
    _DIRECTIONS = ((0, 1), (1, 0), (1, 1), (1, -1))

    def __init__(self, height=10, width=10, len_to_win=5, current_player=1):
        '''
        height, width  - board size
        len_to_win     - length of the line required to win
        current_player - player who moves first (1 or 2)
        '''
        self.height = height
        self.width = width
        self.len_to_win = len_to_win

        # Board for game.
        # Possible values 0, 1, 2
        # 0 - empty field
        # 1 - chip of 1st player
        # 2 - chip of 2nd player
        self.state = np.zeros((height, width))

        # Player's turn
        # 1 - turn of 1st player
        # 2 - turn of 2nd player
        self.current_player = current_player

    def _check_winner(self):
        '''
        Check whether the current player owns a complete line.

        Returns (True, player) if ``self.current_player`` has
        ``len_to_win`` chips in a row, otherwise (False, -1).
        '''
        player = self.current_player
        own = self.state == player
        rows, cols = own.shape
        span = self.len_to_win - 1

        for d_row, d_col in self._DIRECTIONS:
            # Only cells holding the player's chip can start a line
            for i, j in zip(*np.nonzero(own)):
                end_i = i + span * d_row
                end_j = j + span * d_col

                # The whole line must fit on the board
                if not (0 <= end_i < rows and 0 <= end_j < cols):
                    continue

                if all(own[i + k * d_row, j + k * d_col]
                       for k in range(1, self.len_to_win)):
                    return True, player

        return False, -1

    def _get_reward(self):

        '''
        Return rewards both for the 1st and for 2nd player.

        Returns (rewards, is_done): the winner gets 1 and the loser -1;
        while nobody has won the rewards are [0, 0] and is_done is False.
        '''

        has_winner, winner = self._check_winner()

        if has_winner:
            rewards = [-1, -1]
            rewards[winner - 1] = 1
            return rewards, True

        return [0, 0], False

    def available_actions(self):
        '''Return the list of (row, column) coordinates of empty cells.'''
        return list(zip(*np.where(self.state == 0)))

    def reset(self, current_player=1):

        '''Start the new game from initial position'''

        self.state = np.zeros((self.height, self.width))
        self.current_player = current_player

        return self.state

    def step(self, action: Tuple):
        '''
        Place a chip of the current player and pass the turn.

        action - tuple with (i, j) coordinates where the current
                 player places their chip

        Returns (state, rewards, is_done). The game is done when the move
        completed a winning line or when no empty cells are left (draw,
        rewards [0, 0]).

        Raises ValueError if the chosen cell is already occupied.
        '''
        if self.state[action] != 0:
            raise ValueError('Cell {} is already occupied'.format(action))

        self.state[action] = self.current_player
        rewards, is_done = self._get_reward()

        # A full board without a winner is a draw
        board_is_full = not np.any(self.state == 0)

        # Pass the turn to the other player: 1 -> 2, 2 -> 1
        self.current_player = 3 - self.current_player

        return self.state, rewards, is_done or board_is_full

    def render(self):
        '''Print the board, one row per line.'''
        print(*self.state, sep='\n')

In [238]:
def heuristic_value_fn(env):
    '''
    Calculate the longest line for each player which
    doesn't contain chips of the opponent.

    Every window of ``env.len_to_win`` consecutive cells (horizontal,
    vertical and both diagonals) is inspected. A window that contains an
    opponent's chip is ignored; otherwise it scores the number of the
    player's chips inside it.

    env - object with ``state`` (2-D board with values 0, 1, 2),
          ``height``, ``width`` and ``len_to_win`` attributes

    Returns [best_for_player_1, best_for_player_2].
    '''
    # Step vectors: right, down, down-right, down-left
    directions = ((0, 1), (1, 0), (1, 1), (1, -1))
    window = env.len_to_win

    longest_line = [0, 0]

    for player in (1, 2):

        opponent = 3 - player
        best = 0

        for d_row, d_col in directions:

            # d_row is never negative, so limiting the start row is enough
            # to keep the window inside the board vertically
            for i in range(env.height - (window - 1) * d_row):
                for j in range(env.width):

                    last_j = j + (window - 1) * d_col

                    if not 0 <= last_j < env.width:
                        continue

                    cells = [env.state[i + k * d_row][j + k * d_col]
                             for k in range(window)]

                    # The window can't become a winning line for `player`
                    if opponent in cells:
                        continue

                    best = max(best, cells.count(player))

        longest_line[player - 1] = best

    return longest_line
    
    
class Node:

    '''
    Node of the search tree.

    state              - object passed to ``heuristic_value_fn`` when the
                         node is evaluated (the default heuristic reads
                         ``state``, ``height``, ``width`` and ``len_to_win``
                         attributes from it)
    depth              - depth value stored on the node
    is_terminal        - whether the node is a terminal position
    heuristic_value_fn - callable used to evaluate ``state``
    '''

    def __init__(self, state, depth=0, is_terminal=False, heuristic_value_fn=heuristic_value_fn):

        self.depth = depth
        self.state = state
        self.is_terminal = is_terminal
        # Keep the evaluation function: get_heuristic_value() looks it up
        # on the instance
        self.heuristic_value_fn = heuristic_value_fn

    def get_heuristic_value(self):

        '''Return ``heuristic_value_fn`` applied to the node's state.'''

        return self.heuristic_value_fn(self.state)
    
def alphabeta(env, node, parent, depth, alpha, beta, maximazingPlayer, visited_states):
    '''
    Minimax search with alpha-beta pruning.

    env              - environment positioned at ``node``; it is never
                       modified, every move is played on a deep copy
    node             - node evaluated when ``depth`` reaches 0
    parent           - parent of ``node`` (not used by the search itself)
    depth            - remaining search depth
    alpha, beta      - current pruning bounds
    maximazingPlayer - True if the player to move maximizes the value
    visited_states   - set of already expanded boards (tuples of row
                       tuples); boards found in it are skipped

    Returns the value of the position from the point of view of the
    maximizing side. If every child is skipped as already visited the
    initial bound (-inf / +inf) is returned.

    NOTE(review): skipping visited boards without storing their values
    can change the minimax result - confirm this is intended.
    '''
    # The maximizing side is the player to move at a maximizing node and
    # the opponent of the player to move at a minimizing node
    maximizer = env.current_player if maximazingPlayer else 3 - env.current_player

    def evaluate(leaf):
        scores = leaf.get_heuristic_value()

        # A scalar heuristic is used as is
        if np.ndim(scores) == 0:
            return scores

        # Per-player scores [player 1, player 2] are collapsed
        # into the advantage of the maximizing side
        return scores[maximizer - 1] - scores[2 - maximizer]

    if depth == 0:

        return evaluate(node)

    value = -float('inf') if maximazingPlayer else float('inf')

    for action in env.available_actions():

        env_copy = deepcopy(env)

        new_state, reward, is_terminal = env_copy.step(action)

        # ndarray rows are unhashable, convert the board to nested tuples
        state_key = tuple(map(tuple, new_state))

        if state_key in visited_states:

            continue

        visited_states.add(state_key)

        # The heuristic needs the whole environment, not just the board
        new_node = Node(env_copy, depth - 1, is_terminal)

        if is_terminal:

            # A finished game is scored by its real outcome
            outcome = reward[maximizer - 1]

            if outcome > 0:
                child_value = float('inf')
            elif outcome < 0:
                child_value = -float('inf')
            else:
                child_value = 0

        else:

            child_value = alphabeta(env_copy, new_node, node, depth - 1,
                                    alpha, beta, not maximazingPlayer, visited_states)

        if maximazingPlayer:

            value = max(value, child_value)

            if value >= beta:

                break

            alpha = max(alpha, value)

        else:

            value = min(value, child_value)

            if value <= alpha:

                break

            beta = min(beta, value)

    return value

In [None]:
def random_policy(actions):
    '''
    Pick one of the available actions uniformly at random.

    actions - non-empty sequence of actions, e.g. (row, column) tuples

    Returns one element of ``actions``.
    '''
    # np.random.choice treats a list of tuples as a 2-D array and
    # rejects it, so draw an index instead of an element
    return actions[np.random.randint(len(actions))]

class MCTS:

    '''
    Monte Carlo Tree Search.

    default_policy - policy stored for the simulation phase
    c_ucb          - exploration constant (stored, not used by the
                     methods visible here)
    n_simulations  - number of simulations (stored, not used by the
                     methods visible here)
    '''

    def __init__(self, default_policy, c_ucb=5, n_simulations=100):

        self.default_policy = default_policy
        self.c_ucb = c_ucb
        self.n_simulations = n_simulations


    def selection(self, env, root, tree, visited_states):
        '''
        Descend from ``root`` to the first state which is not in ``tree``.

        env            - environment positioned at ``root``; it is not
                         modified, moves are played on deep copies
        root           - key of the root state
        tree           - mapping from state key to its value
        visited_states - mapping from state key to its parent key,
                         updated in place along the selected path

        State keys are boards converted to tuples of row tuples.

        Returns (new_state, visited_states), or (root, visited_states)
        when no unexpanded state is reachable.
        '''
        # Each entry keeps the environment that corresponds to the state,
        # so children are always generated from the right position
        q = deque()
        q.append((root, env))

        while q:

            state, state_env = q.pop()

            candidate_states = []

            for a in state_env.available_actions():

                # Play the move on a copy to leave state_env untouched
                new_env = deepcopy(state_env)

                new_state, _, _ = new_env.step(a)

                # ndarray rows are unhashable, convert to nested tuples
                new_state = tuple(map(tuple, new_state))

                if new_state not in tree:

                    visited_states.update({new_state: state})

                    return new_state, visited_states

                if new_state not in visited_states:

                    candidate_states.append((new_state, new_env))

            if len(candidate_states) > 0:

                # Follow the child with the highest stored value
                best_state, best_env = max(candidate_states, key=lambda c: tree[c[0]])

                q.appendleft((best_state, best_env))
                visited_states.update({best_state: state})

        return root, visited_states


    # NOTE(review): the three functions below take no `self` although they
    # are defined inside the class, and they rely on names which are not
    # defined in the visible source (action_space, transition_function,
    # pursuer_transition, goal, get_reward). They look like they belong to
    # a different environment - confirm before using them with Gomoku.

    def game(env, state_e: Tuple, state_p: Tuple, default_policy: np.ndarray, max_steps=50):
        # Roll out at most max_steps steps from (state_e, state_p) and
        # return the list of visited state_e values together with the
        # final s_p

        trajectory = [state_e]

        s_e = state_e
        s_p = state_p

        for i in range(max_steps):

            u = action_space[default_policy[s_e]]

            new_state_e, _ = transition_function(env, s_e, u)

            new_state_p = pursuer_transition(env, s_e, s_p)

            trajectory.append(new_state_e)

            s_e = new_state_e

            s_p = new_state_p

            # Stop when new_state_e equals `goal` or new_state_p
            if new_state_e in (goal, new_state_p):

                break

        return trajectory, s_p


    def simulation(env, state_e: Tuple, state_p: Tuple, goal: Tuple, default_policy: np.ndarray, n_iters: int = 50):
        # Run n_iters rollouts and return the mean of their rewards.
        # NOTE(review): `game` is called as a bare name, which is not
        # resolvable from inside a class body - confirm intended scope.

        rewards = []

        for i in range(n_iters):

            trajectory, s_p = game(env, state_e, state_p, default_policy)

            reward = get_reward(trajectory, s_p, goal)

            rewards.append(reward)

        mean_reward = np.mean(rewards)

        return mean_reward


    def backpropagation(tree, new_state, visited_states, mean_reward):
        # Walk from new_state up through the parents recorded in
        # visited_states, raising each stored value to at least mean_reward.
        # NOTE(review): requires every state on the path to be present in
        # both `tree` and `visited_states`, with the chain ending in None.

        state = new_state

        while state is not None:

            tree[state] = max(tree[state], mean_reward)

            state = visited_states[state]


In [239]:
# Create an environment with the default settings:
# 10x10 board, 5 chips in a row to win, player 1 moves first
env = Gomoku()

In [240]:
# Evaluate the heuristic on the freshly created (empty) board
heuristic_value_fn(env)

[0, 0]

In [241]:
# Play eight moves; the players alternate, starting with player 1
env.step((0, 0))  # player 1
env.step((9, 9))  # player 2
env.step((0, 5))  # player 1
env.step((8, 8))  # player 2
env.step((9, 0))  # player 1
env.step((7, 7))  # player 2
env.step((3, 3))  # player 1
env.step((5, 5))  # player 2
# Evaluate the heuristic on the resulting position
heuristic_value_fn(env)

[2, 4]

In [242]:
# Print the current board, one row per line
env.render()

[1. 0. 0. 0. 0. 1. 0. 0. 0. 0.]
[0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
[0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
[0. 0. 0. 1. 0. 0. 0. 0. 0. 0.]
[0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
[0. 0. 0. 0. 0. 2. 0. 0. 0. 0.]
[0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
[0. 0. 0. 0. 0. 0. 0. 2. 0. 0.]
[0. 0. 0. 0. 0. 0. 0. 0. 2. 0.]
[1. 0. 0. 0. 0. 0. 0. 0. 0. 2.]


In [243]:
# Clear the board and give the first move back to player 1;
# reset() returns the new empty board
env.reset()

array([[0., 0., 0., 0., 0., 0., 0., 0., 0., 0.],
       [0., 0., 0., 0., 0., 0., 0., 0., 0., 0.],
       [0., 0., 0., 0., 0., 0., 0., 0., 0., 0.],
       [0., 0., 0., 0., 0., 0., 0., 0., 0., 0.],
       [0., 0., 0., 0., 0., 0., 0., 0., 0., 0.],
       [0., 0., 0., 0., 0., 0., 0., 0., 0., 0.],
       [0., 0., 0., 0., 0., 0., 0., 0., 0., 0.],
       [0., 0., 0., 0., 0., 0., 0., 0., 0., 0.],
       [0., 0., 0., 0., 0., 0., 0., 0., 0., 0.],
       [0., 0., 0., 0., 0., 0., 0., 0., 0., 0.]])

In [None]:
class Play:
    
    def __init__(self, env, player1, player2, n_episods):
        
        self.env = env
        self.player1 = player1
        self.player2 = player2
        
        self.n_episods = n_episods
        
        self.rewards = []
        
    def play(self):
        
        for i in tqdm(range(self.n_episods)):
            
            
            