In [1]:
import os
import json
import numpy as np

from kaggle_environments import make

import torch as T
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F

from torch.utils.data import DataLoader, TensorDataset

from sklearn.metrics import f1_score, accuracy_score

device = T.device('cuda' if T.cuda.is_available() else 'cpu')
print(f'Use {device} device')

Use cuda device


In [101]:
CACHE_DIR = '.'
TRAP_STATE = np.ones((3, 3), dtype=np.int)

In [3]:
class QMemory:
    def __init__(self, max_size, dims):
        self.count = 0
        self.max_size = max_size
        
        self.dones = np.zeros(max_size)
        self.actions = np.zeros(max_size)
        self.rewards = np.zeros(max_size)

        self.prev_states = np.zeros((max_size, *dims))
        self.next_states = np.zeros((max_size, *dims))
    
    def add(self, prev_state, action, next_state, reward, done):
        k = self.count % self.max_size
        self.count += 1
        
        self.dones[k] = done
        self.rewards[k] = reward
        self.actions[k] = action
        self.prev_states[k] = prev_state
        self.next_states[k] = next_state
            
    def sample(self, batch_size):
        siz = min(self.count, batch_size)
        idx = np.random.choice(siz, batch_size, replace=False)
        return self.prev_states[idx], self.actions[idx], self.next_states[idx], self.rewards[idx], self.dones[idx]  
    
class QEpsilon:
    def __init__(self, val=1, min=1e-4, dec=1e-5):
        self.val = val
        self.min = min
        self.dec = dec

    def decrease(self):
        self.val = max(self.val - self.dec, self.min)
        
    @property
    def value(self):
        return self.val
    

In [194]:
class QAgent:
    def __init__(self, dims=(3,3), n_actions=9, gamma=0.99, batch_size=64, mem_size=512, device=device, replace_rate=1000, eps=None):

        if eps is None:
            eps = QEpsilon(1, 1e-3, 1e-4)
        elif eps == 0:
            eps = QEpsilon(0, 0, 0)            
            
        self.device = device

        self.memory = QMemory(mem_size, dims)
        self.q_eval = self.create_q(dims, n_actions, device, True)
        self.q_next = self.create_q(dims, n_actions, device, False)
        self.action = np.arange(n_actions)

        self.gamma = gamma
        self.eps = eps
        self.batch_size = batch_size
        
        self.loss = nn.MSELoss()
        self.optimizer = optim.Adam(self.q_eval.parameters())
        
        self.batch_count = 0
        self.replace_rate = replace_rate
    
    @staticmethod
    def create_q(dims, n_actions, device, is_eval):
        q = nn.Sequential(
            nn.Conv2d(1, 64, 2),
            nn.Conv2d(64, 256, 2),
            nn.Flatten(),
            nn.Tanh(),
            nn.Linear(256, n_actions),
        ).to(device)
        if is_eval:
            q.eval()
        else:
            q.train()
        return q
    
    def save(self):
        T.save(self.q_eval.state_dict(), f'{CACHE_DIR}/eval.q')
        T.save(self.q_next.state_dict(), f'{CACHE_DIR}/next.q')
        
    def load(self):
        self.q_eval.load_state_dict(T.load(f'{CACHE_DIR}/eval.q'))
        self.q_next.load_state_dict(T.load(f'{CACHE_DIR}/next.q'))
        
    def tensor(self, array, dtype=T.float):
        return T.tensor(array, dtype=dtype, device=self.device)
    
    def __call__(self, board):
        if np.random.rand() <= self.eps.value:            
            action = np.random.choice(self.action)
        else:
            action = self.q_eval(self.tensor(board.reshape(1, 1, 3, 3))).argmax().item()
        return int(action)
    
    def learn(self, prev_state, action, next_state, reward, done):
        
        self.memory.add(prev_state, action, next_state, reward, done)
        if self.memory.count < self.batch_size:
            return

        
        self.batch_count += 1
        if self.batch_count >= self.replace_rate:
            self.batch_count = 0
            self.q_next.load_state_dict(self.q_eval.state_dict())
            
        prev_states, actions, next_states, rewards, dones = self.memory.sample(self.batch_size)
        
        batch_idx = np.arange(self.batch_size)
        q_pred = self.q_eval(self.tensor(prev_states.reshape(-1, 1, 3, 3)))[batch_idx, actions]
        q_next = self.q_next(self.tensor(next_states.reshape(-1, 1, 3, 3))).max(dim=1)[0]
        
        q_next[dones] = 0.0
        q_targ = self.tensor(rewards) + self.gamma * q_next
        
        loss = self.loss(q_pred.flatten(), q_targ.flatten())
        
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()
        
        self.eps.decrease()


In [196]:
class QTrainer:
    def __init__(self, env, other="random"):
        self.trainer = env.train([None, other])

    @staticmethod
    def to_state(obs):
        board = np.array(obs['board']).reshape(3,3)
        board[board == 2] = -1
        return board
    
    def reset(self):
        return self.to_state(self.trainer.reset())

    def step(self, action):
        state, reward, done, info = self.trainer.step(int(action))
        state = self.to_state(state)
        if done:
            state = TRAP_STATE
        return state, reward, done

In [None]:
n_games = 500000

env = make('tictactoe')
trainer = QTrainer(env)
agent = QAgent(batch_size=64, mem_size=1024, replace_rate=200)

wins, lost, teko, invs = 0, 0, 0, 0

for game_no in range(n_games):
    done = False
    prev_state = trainer.reset()
    while not done: 
        action = agent(prev_state)
        next_state, reward, done = trainer.step(action)
        
        if reward == 1:
            wins += 1
        elif reward == -1:
            lost += 1
        elif reward == 0:
            teko += 1
        else:
            invs += 1
            reward = -1

        agent.learn(prev_state, action, next_state, reward, done)
        prev_state = next_state

    if game_no % 1000 == 999:
        print(f'Game {game_no + 1}. Wins: {wins}, Loose: {lost}, Teko: {teko}, Invalid Moves: {invs}, Eps: {agent.eps.value}')
        wins, lost, teko, invs = (0, 0, 0, 0)
        

Game 1000. Wins: 26, Loose: 50, Teko: 1997, Invalid Moves: 924, Eps: 0
Game 2000. Wins: 9, Loose: 90, Teko: 2131, Invalid Moves: 900, Eps: 0
Game 3000. Wins: 96, Loose: 70, Teko: 2461, Invalid Moves: 834, Eps: 0
Game 4000. Wins: 398, Loose: 33, Teko: 2208, Invalid Moves: 569, Eps: 0
Game 5000. Wins: 584, Loose: 53, Teko: 2212, Invalid Moves: 363, Eps: 0
Game 6000. Wins: 547, Loose: 40, Teko: 2265, Invalid Moves: 412, Eps: 0
Game 7000. Wins: 379, Loose: 43, Teko: 2066, Invalid Moves: 578, Eps: 0
Game 8000. Wins: 564, Loose: 29, Teko: 2257, Invalid Moves: 407, Eps: 0
Game 9000. Wins: 487, Loose: 42, Teko: 2259, Invalid Moves: 471, Eps: 0
Game 10000. Wins: 566, Loose: 40, Teko: 2304, Invalid Moves: 394, Eps: 0
Game 11000. Wins: 505, Loose: 61, Teko: 2405, Invalid Moves: 434, Eps: 0
Game 12000. Wins: 659, Loose: 20, Teko: 2131, Invalid Moves: 321, Eps: 0
Game 13000. Wins: 571, Loose: 17, Teko: 2166, Invalid Moves: 412, Eps: 0
Game 14000. Wins: 737, Loose: 8, Teko: 2168, Invalid Moves: 255,

In [180]:
class QClassicAgent:
    def __init__(self, alpha=0.1, gamma=0.9):
        self.alpha = alpha
        self.gamma = gamma
        self.Q = np.random.rand(3 ** 9, 9)
        self.eps = QEpsilon(1, 0, 1e-4)

        self.Q[self.to_state(TRAP_STATE), :] = 0
        
    @staticmethod
    def to_state(board):
        return ((3 ** np.arange(9)) * (board.flatten() + 1)).sum()
    
    def __call__(self, board):
        if np.random.rand() < self.eps.value:
            return np.random.randint(9)
        s = self.to_state(board)
        a = self.Q[s, :].argmax()
        return int(a)
    
    def learn(self, prev_state, action, next_state, reward, done):
        a0 = action
        if done:
            next_state = TRAP_STATE
            
        s0 = self.to_state(prev_state)
        s1 = self.to_state(next_state)
        
            
        q_upd = reward + self.gamma * self.Q[s1, :].max()
        
        self.Q[s0, a0] = (1 - self.alpha)*self.Q[s0, a0] + self.alpha*q_upd        
        self.eps.decrease()

In [181]:
n_games = 50000

env = make('tictactoe')
trainer = QTrainer(env)
agent = QClassicAgent()

wins, lost, teko, invs = 0, 0, 0, 0

for game_no in range(n_games):
    done = False
    prev_state = trainer.reset()
    while not done: 
        action = agent(prev_state)
        next_state, reward, done = trainer.step(action)
        
        if reward == 1:
            wins += 1
        elif reward == -1:
            lost += 1
        elif reward == 0:
            teko += 1
        else:
            invs += 1
            reward = -1
#            next_state = np.zeros((3,3)) - 100

        agent.learn(prev_state, action, next_state, reward, done)
        prev_state = next_state

    if game_no % 1000 == 999:
        print(f'Game {game_no + 1}. Wins: {wins}, Loose: {lost}, Teko: {teko}, Invalid Moves: {invs}, Eps: {agent.eps.value}')
        wins, lost, teko, invs = (0, 0, 0, 0)
        

Game 1000. Wins: 91, Loose: 70, Teko: 2275, Invalid Moves: 837, Eps: 0.672700000000036
Game 2000. Wins: 106, Loose: 85, Teko: 2394, Invalid Moves: 807, Eps: 0.3335000000000734
Game 3000. Wins: 160, Loose: 116, Teko: 2696, Invalid Moves: 721, Eps: 0
Game 4000. Wins: 262, Loose: 113, Teko: 2853, Invalid Moves: 614, Eps: 0
Game 5000. Wins: 325, Loose: 106, Teko: 2875, Invalid Moves: 551, Eps: 0
Game 6000. Wins: 422, Loose: 83, Teko: 2960, Invalid Moves: 475, Eps: 0
Game 7000. Wins: 543, Loose: 70, Teko: 2907, Invalid Moves: 375, Eps: 0
Game 8000. Wins: 588, Loose: 48, Teko: 2873, Invalid Moves: 340, Eps: 0
Game 9000. Wins: 675, Loose: 44, Teko: 2887, Invalid Moves: 263, Eps: 0
Game 10000. Wins: 601, Loose: 57, Teko: 2890, Invalid Moves: 321, Eps: 0
Game 11000. Wins: 815, Loose: 21, Teko: 2797, Invalid Moves: 139, Eps: 0
Game 12000. Wins: 882, Loose: 3, Teko: 2763, Invalid Moves: 88, Eps: 0
Game 13000. Wins: 883, Loose: 6, Teko: 2730, Invalid Moves: 76, Eps: 0
Game 14000. Wins: 902, Loose: