In [2]:
import numpy as np
from ipywidgets import HBox, VBox, Label, Layout, Button, Output
from IPython.display import display
from functools import partial
import random
from tqdm import tqdm_notebook as tqdm
import matplotlib.pyplot as plt
%matplotlib inline
import pickle
import math
import copy
from enum import Enum
from abc import ABC, abstractmethod
from dataclasses import dataclass

In [3]:
class Mark(Enum):
    NO = 0
    X = 1
    O = 2

In [4]:
class View(ABC):
    @abstractmethod
    def __init__(self):
        pass
    
    @abstractmethod
    def update(self):
        pass

In [5]:
class JupyterView(View):
    def __init__(self):
        self.out = Output()
    
    def on_click(self, row, column, button):
        self.disable_all_buttons()
        self.turn_callback(row, column)
    
    def update(self, state):
        label = Label()
        if state.is_game_end:
            if state.winner == Mark.NO:
                label.value = "Draw!"
            else:
                label.value = state.winner.name + ' wins!'
        else:
            label.value = state.current_player.name + "'s turn"
        
        h, w = state.board.shape
        self.buttons = []
        buttons = []
        
        for r in range(h):
            line = []
            for c in range(w):
                button = Button(
                    description='',
                    layout=Layout(width='20px', height='20px', padding='0px')
                )
                
                button.on_click(partial(self.on_click, r, c))

                if state.board[r][c] == Mark.X.value:
                    button.description = Mark.X.name
                    button.disabled = True
                elif state.board[r][c] == Mark.O.value:
                    button.description = Mark.O.name
                    button.disabled = True
                elif state.is_game_end:
                    button.disabled = True
                
                line.append(button)
                self.buttons.append(button)
                
            buttons.append(HBox(line))
            
        self.out.clear_output()
        with self.out:
            display(VBox([label, VBox(buttons)]))
        
    def disable_all_buttons(self):
        for b in self.buttons:
            b.disabled = True
    


In [6]:
class Agent(ABC):
    @abstractmethod
    def __init__(self):
        pass
    
    @abstractmethod
    def get_action(self, state):
        pass
    
    @abstractmethod
    def win(self):
        pass
    
    @abstractmethod
    def loss(self):
        pass

In [7]:
class JupyterHumanAgent(Agent):
    def __init__(self):
        pass
    
    def get_action(self, state):
        return None
    
    def win(self, state):
        pass
    
    def loss(self, state):
        pass
    
    def draw(self, state):
        pass

In [8]:
@dataclass
class Player:
    mark: Mark
    agent: Agent

In [9]:
@dataclass
class GameState:
    board: np.ndarray
    current_player: Mark
    is_game_end: bool
    winner: Mark

In [10]:
class Game:
    def __init__(self, player1, player2, m=None, n=None, k=5, view=None, play=True):
        self.width = m
        self.height = n
        self.row = k # win row
       
        width = m
        height = n
        
        auto_first_turn = False
        if self.width is None and self.height is None:
            # infinte board
            # let first turn will be on center of the board
            # and accessible field will be doubled row
            auto_first_turn = True

        if self.width is None:
            width = self.row * 2 + 1
        
        if self.height is None:
            height = self.row * 2 + 1

        self.board = np.full((height, width), Mark.NO.value)
        
        self.player1 = Player(Mark.X, player1)
        self.player2 = Player(Mark.O, player2)

        
        if auto_first_turn:
            self.board[self.row, self.row] = self.player1.mark.value
            self.current_player = self.player2
        else:
            self.current_player = self.player1

        self.view = view
        if self.view is not None:
            self.view.turn_callback = self.apply_action
            
        if play:
            self.start()
            
    def apply_action(self, row, column):
        if self.board[row][column] != Mark.NO.value:
            raise RuntimeError('Cell already used')
        self.board[row][column] = self.current_player.mark.value
        
        # check for win
        self.winner = self.check_win()
        
        if self.winner != Mark.NO:
            self.end_game = True
            # obviously, the winner is current player
            self.winner = self.current_player.mark

            # distribution of elephants
            if self.current_player == self.player1:
                self.player1.agent.win(self.new_state())
                self.player2.agent.loss(self.new_state())
            elif self.current_player == self.player2:
                self.player1.agent.loss(self.new_state())
                self.player2.agent.win(self.new_state())
        
        # check for draw
        elif len(self.board[self.board != Mark.NO.value]) == 0:
            self.end_game = True
            self.player1.agent.draw(self.new_state())
            self.player2.agent.draw(self.new_state())
        else:
        # extend board
            if self.width is None or self.height is None:
                self.extend_board()
        
            if self.current_player == self.player1:
                self.current_player = self.player2
            elif self.current_player == self.player2:
                self.current_player = self.player1
            else:
                raise RuntimeError('Unknown player')

        self.state = self.new_state()
        self.update_view()
        
        if not self.end_game:
            self.get_action()
            
        
    def get_action(self):
        action = self.current_player.agent.get_action(self.state)
        if action is None:
            return # wait callback from UI
        else:
            row, column = action
            self.apply_action(row, column)

    def start(self):
        self.winner = Mark.NO
        self.end_game = False

        self.state = self.new_state()
        self.update_view()
        
        self.get_action()
        
    def new_state(self):
        state = GameState(self.board.copy(), self.current_player.mark, self.end_game, self.winner)
        return state

    def update_view(self):
        if self.view is not None:
            self.view.update(self.state)
            
    def check_win(self):
        h, w = self.board.shape
        
        # check rows
        for r in range(h):
            player = Mark.NO.value
            count = 0
            for c in range(w):
                player, count = self.check_cell(self.board[r, c], player, count)
                if count == self.row:
                    return player
                
                    
        # check columns
        for c in range(w):
            player = Mark.NO.value
            count = 0
            for r in range(h):
                player, count = self.check_cell(self.board[r, c], player, count)
                if count == self.row:
                    return player
        
        #check diagonal left to right
        #top of the board
        for cs in range(w - (self.row - 1)):
            c = cs
            player = Mark.NO.value
            count = 0
            r = 0
            while True:
                player, count = self.check_cell(self.board[r, c], player, count)
                if count == self.row:
                    return player
                r += 1
                c += 1
                if r == h or c == w:
                    break
                
        # bottom of the board
        for rs in range(1, h - (self.row - 1)):
            r = rs
            player = Mark.NO.value
            count = 0
            c = 0
            while True:
                player, count = self.check_cell(self.board[r, c], player, count)
                if count == self.row:
                    return player
                r += 1
                c += 1
                if r == h or c == w:
                    break
                    
        #check diagonal right to left
        #top of the board
        for cs in range(self.row - 1, w):
            c = cs
            player = Mark.NO.value
            count = 0
            r = 0
            while True:
                player, count = self.check_cell(self.board[r, c], player, count)
                if count == self.row:
                    return player
                c -= 1
                r += 1
                if c < 0 or r == h:
                    break
        
        # bottom of the board
        for rs in range(1, h - self.row + 1):
            r = rs
            player = Mark.NO.value
            count = 0
            c = w - 1
            while True:
                player, count = self.check_cell(self.board[r, c], player, count)
                if count == self.row:
                    return player
                c -= 1
                r += 1
                if c < 0 or r == h:
                    break
        
        return Mark.NO

    def check_cell(self, cell, player, count):
        if cell == Mark.NO.value:
            player = Mark.NO.value
            count = 0
        elif cell == player:
            count += 1
        else:
            player = cell
            count = 1

        return (player, count)

    def extend_board(self):
        top_rows = 0
        for i, r in enumerate(self.board):
            if r.sum() > 0:
                top_rows = i
                break
                
        bottom_rows = 0
        for i, r in enumerate(reversed(self.board)):
            if r.sum() > 0:
                bottom_rows = i
                break
                
        left_cols = 0
        for i, c in enumerate(self.board.T):
            if c.sum() > 0:
                left_cols = i
                break
                
        right_cols = 0
        for i, c in enumerate(reversed(self.board.T)):
            if c.sum() > 0:
                right_cols = i
                break
                
        add_top = self.row - top_rows
        add_bottom = self.row - bottom_rows
        add_left = self.row - left_cols
        add_right = self.row - right_cols
        
        top = np.zeros((add_top, self.board.shape[1]))
        bottom = np.zeros((add_bottom, self.board.shape[1]))
        
        left = np.zeros((self.board.shape[0] + add_top + add_bottom, add_left))
        right = np.zeros((self.board.shape[0] + add_top + add_bottom, add_right))
        
        self.board = np.c_[left, np.r_[top, self.board, bottom], right]

In [11]:
g = Game(player1=JupyterHumanAgent(), player2=JupyterHumanAgent(), view=JupyterView())
g.view.out

Output()

In [13]:
class Chooser(ABC):
    @abstractmethod
    def __init__(self):
        pass
    
    @abstractmethod
    def choose_action(self, state, actions):
        pass

In [14]:
class EpsilonGreedyChooser(Chooser):
    def __init__(self, epsilon, seed=None):
        self.seed = seed
        if self.seed is not None:
            random.seed(seed)
            self.random_state = random.getstate()
        
        self.epsilon = epsilon
    
    def choose_action(self, state, actions):
        """
        actions is list of (action, weight),
        """
            
        if self.epsilon > random.random():
            # exploration
            if self.seed is not None:
                random.setstate(self.random_state)
                
            action = random.choice(actions)
            
            if self.seed is not None:
                self.random_state = random.getstate()
        else:
            # exploitation
            max_weight = np.max([weight for action, weight in actions])
            action = random.choice([(action, weight) for action, weight in actions if weight == max_weight])
        
        return action

In [18]:
class QLearningApproximationAgent(Agent):
    def __init__(self, alfa, gamma, chooser, row=5, learning=True, weights=None, inf_field=True):
        self.alfa = alfa
        self.gamma = gamma
        self.chooser = chooser
        self.is_learning = learning
        self.row = row
        self.prev_state = None
        self.prev_action = None
        self.inf_field = inf_field
        
        
        self.features_count = ((((row - 2) * 2) + 1) * 2) - 1
        # for example, current player is X, and features will be count of each:
        # XXXXX, XXXX , OXXXX , XXX  , OXXX  , XX   , OXX   ,
        # OOOO , XOOOO , OOO  , XOOO  , OO   , XOO   .

        if weights is not None:
            self.weights = weights
        else:
            self.weights = np.zeros(self.features_count)
    
    def get_action(self, state):
        possible_actions = self.get_possible_actions(state)
        features = self.get_actions_features(state, possible_actions)
        
        if self.prev_state is not None and self.is_learning:
            revard = self.get_reward(self.prev_state, self.prev_action)
            self.td_learn(self.prev_state, self.prev_action, features, reward)
        
        weighted_actions = self.weigh_action(features)
            
        action = self.chooser.choose_action(state, weighted_actions)
        
        self.prev_state = state
        self.prev_action = action[0]
        
        return action[0]
    
    def get_actions_features(self, state, actions):
        features = []
        for action in possible_actions:
            new_state = copy.deepcopy(state)
            new_state.board[action[0], action[1]] = new_state.current_player.value
            features.append(action, self.get_features(new_state))
        return features
        
    
    
    def get_reward(self, prev_state, prev_action):
        if inf_field:
            # if board is infinite, real board will grow to infinity too
            # so, we need, that the agent move marks to center
            h, w = prev_state.board.shape
            v = np.where(prev_state.board != 0)
            center = np.array([np.mean(v[0]), np.mean(v[1])])
            d = np.linalg.norm(np.array([prev_action[0], prev_action[1]]) - center)
            if d < 10:
                return -1
            else:
                return np.floor(10 - d)
        else:
            return -1
    
    def get_possible_actions(self, state):
        f = np.where(state.board == Mark.NO.value)
        possible_actions = list(zip(f[0], f[1]))
        return possible_actions
    
    def get_features(self, state):
        directions = [(0, 1), (1, 1), (1, 0), (1, -1)]
        b = state.board
        h, w = b.shape
        #print(f'h, w: {h}, {w}')
        features = {}
        for i in range(2, self.row):
            features[(1, i, True)] = 0
            features[(2, i, True)] = 0
            features[(1, i, False)] = 0
            features[(2, i, False)] = 0
        features[(1, self.row, False)] = 0
        #rows = []
        v = np.zeros((h, w, len(directions)))
        
        for r in range(h):
            for c in range(w):
                #print(f'r, c, val: {r}, {c}, {b[r, c]}')
                if b[r, c] == Mark.NO.value: # empty cell, next
                    continue
                for d_i, (d_r, d_c) in enumerate(directions):
                    #print(f'direction: {(d_r, d_c, d_i)}')
                    if v[r, c, d_i] == 1: # we already count this cell in this direction
                        #print('already count')
                        continue
                    m = b[r, c] # memorize mark
                    #row = []
                    #row.append((r, c))
                    s = 1
                    cnt = 1
                    enemy_behind = False
                    
                    # look forward
                    nr = r
                    nc = c
                    steal = True
                    for _ in range(self.row):
                        nr += d_r
                        nc += d_c
                        if nr < 0 or nc < 0 or nr == h or nc == w: # beware of borders
                            #print('borders')
                            break
                            
                        #if v[nr, nc, d_i] == 1: # perhaps, it is impossible
                        #    break
                        
                        nm = b[nr, nc]
                        #print(f'cell: {(nr, nc)}, {nm}')
                        #row.append((nr, nc))
                        if nm == m: # same mark
                            cnt += 1
                            v[nr, nc, d_i] = 1
                            steal = True
                        elif nm != Mark.NO.value: # another mark
                            if steal:
                                enemy_behind = True
                            break
                        else:
                            steal = False # empty cell
                        s += 1
                        if s == self.row:
                            break
                    
                    #look backward:
                    nr = r
                    nc = c
                    for i in range(self.row):
                        nr -= d_r
                        nc -= d_c
                        if nr < 0 or nc < 0 or nr == h or nc == w: # beware of borders
                            break
                            
                        nm = b[nr, nc]
                        #row.append((nr, nc))
                        if nm == m: # same mark, it can't be
                            cnt += 1
                            v[nr, nc, d_i] = 1
                        elif nm != Mark.NO.value: # another mark
                            if i == 0:
                                enemy_behind = True
                            break
                        s += 1
                        if s >= self.row:
                            break
                        
                    if s < self.row: # between enemies
                        continue
                    
                    if cnt > 1:
                        if m == state.current_player.value:
                            #rows.append((row, (1, cnt, enemy_behind)))
                            features[(1, cnt, enemy_behind)] += 1
                        else:
                            #rows.append((row, (2, cnt, enemy_behind)))
                            features[(2, cnt, enemy_behind)] += 1
        #print(rows)
        return list(features.values())
    
    def weigh_actions(self, features):
        weighted_actions = []
        for action, feat in features:
            weighted_actions.append((action, self.get_Q_value(feat)))
            
        return weighted_actions
    
    def get_Q_value(self, features):
        q_value = np.dot(self.weights, features)
        return q_value
    
    def td_learn(self, prev_state, prev_action, features, reward):
        weighted_actions = self.weigh_actions(features)
        
        max_weight = np.max([weight for action, weight in weighted_actions])
        action = random.choice([(action, weight) for action, weight in actions if weight == max_weight])
        q_value = action[1]
        expected = reward + self.gamma * q_value
        
        prev_state.board[prev_action[0], prev_action[1]] = prev_state.current_player.value
        prev_feat = self.get_features(prev_state)
        current = self.get_Q_value(prev_feat)
        
        td = expected - current
        
        for i in range(self.features_count):
            self.weights[i] += self.alfa * td * prev_feat[i]
            
    def mc_learn(self, prev_state, prev_action, reward):
        prev_state.board[prev_action[0], prev_action[1]] = prev_state.current_player.value
        prev_feat = self.get_features(prev_state)
    
        q_value = self.get_Q_value(prev_feat)
        for i in range(self.features_count):
            self.weights[i] += self.alfa * (reward - q_value) * prev_feat[i]
        
    
    def loss(self, state):
        if self.is_learning:
            self.mc_learn(self.prev_state, self.prev_action, -100)
        
    
    def win(self, state):
        if self.is_learning:
            self.mc_learn(self.prev_state, self.prev_action, 100)
        
    
    def draw(self, state):
        if self.is_learning:
            self.mc_learn(self.prev_state, self.prev_action, -5)


In [16]:
max_row = 5
num_feat = ((((max_row - 2) * 2) + 1) * 2) - 1
weights = np.zeros(num_feat)
agent1 = QLearningApproximationAgent(0.5, 0.5, EpsilonGreedyChooser(0.05), weights=weights)
agent2 = QLearningApproximationAgent(0.5, 0.5, EpsilonGreedyChooser(0.05), weights=weights)
g = Game(player1=agent1, player2=agent2, view=JupyterView())
g.view.out

NameError: name 'QLearningApproximationAgent' is not defined

In [20]:
max_row = 5
num_feat = ((((max_row - 2) * 2) + 1) * 2) - 1
weights = np.zeros(num_feat)
agent1 = QLearningApproximationAgent(0.5, 0.5, EpsilonGreedyChooser(0.05), weights=weights)
agent2 = QLearningApproximationAgent(0.5, 0.5, EpsilonGreedyChooser(0.05), weights=weights)
g = Game(player1=agent1, player2=JupyterHumanAgent(), view=JupyterView())
g.view.out

Output()

In [None]:
q = QLearningApproximationAgent(0.5, 0.5, EpsilonGreedyChooser(0.05))

In [None]:
g.state

In [None]:
q.get_features(g.state)

In [None]:
q.get_action(g.state)

In [None]:
v = np.where(g.state.board != 0)
v

In [None]:
center = np.array([np.mean(v[0]), np.mean(v[1])])
center

In [None]:
h, w = g.state.board.shape
h, w

In [None]:
corners = [[0, 0], [0, w-1], [h-1, w-1], [h-1, 0]]

In [None]:
for c in corners:
    print(c)
    print(np.linalg.norm(c - center))

In [None]:
max_dist = np.max([np.linalg.norm(c - center) for c in corners])
max_dist

In [None]:
reward = np.zeros((h, w))
for r in range(h):
    for c in range(w):
        d = np.linalg.norm(np.array([r, c]) - center)
        if d < 10:
            rew = -1
        else:
            rew = np.floor(10 - d)
        reward[r, c] = rew
reward

In [None]:
np.mean([3.0,5.0])

### TODO:
* UI переделать на jp_proxy_widget
* Выделить апроксимирующую модель в отдельный класс