In [1]:
! rm -rf Reinforcement_Learning_Othello

In [2]:
! git clone https://github.com/naomiehl/Reinforcement_Learning_Othello

Cloning into 'Reinforcement_Learning_Othello'...
remote: Enumerating objects: 22, done.[K
remote: Counting objects: 100% (22/22), done.[K
remote: Compressing objects: 100% (20/20), done.[K
remote: Total 22 (delta 8), reused 0 (delta 0), pack-reused 0[K
Unpacking objects: 100% (22/22), done.


In [3]:
! cp Reinforcement_Learning_Othello/* .

In [27]:
from environment import OthelloEnv
import torch
from torch import nn
import torch.nn.functional as F
import numpy as np
import matplotlib.pyplot as plt
from tqdm.notebook import tqdm
from collections import namedtuple
from itertools import count
import random

In [74]:
env = OthelloEnv(n=8)
env.reset()
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
EPS_START = 0.9
EPS_END = 0.0
EPS_DECAY = 400
BATCH_SIZE = 128
NUM_EPISODES_EVAL = 100
GAMMA = 0.99
LR = 0.05
N_CHANNELS = 3

In [81]:
class DQN(nn.Module):
    def __init__(self, n=8, n_channels=N_CHANNELS):
        super(DQN, self).__init__()
        self.n_channels = n_channels
        self.convo = nn.Sequential(
            nn.Conv2d(n_channels, 4, 3, stride=1, padding=1),
            nn.LeakyReLU(),
            nn.BatchNorm2d(4),
            nn.Conv2d(4, 8, 3, stride=1, padding=1),
            nn.LeakyReLU(),
            nn.BatchNorm2d(8),
            nn.Conv2d(8, 16, 3, stride=1, padding=1),
            nn.LeakyReLU(),
            nn.BatchNorm2d(16),
        )

        self.head = nn.Sequential(
            nn.Linear(n*n*16, n*n),
            nn.LeakyReLU(),
            nn.BatchNorm1d(n*n),
            nn.Linear(n*n, n*n+1),
            nn.Hardtanh()
        )

    def forward(self, states):
        states = nn.functional.one_hot(states + 1, num_classes=self.n_channels)
        states = states.to(torch.float).transpose(2, -1).squeeze(1)

        x = self.convo(states)
        # x = states.to(torch.float)

        x = x.view(x.shape[0], -1)
        x = self.head(x)

        return x

In [82]:
class DQNAgent:
    def __init__(self, env, color, device=device, n_channels=3, lr=LR):
        self.q_model = DQN(env.n, n_channels).to(device)
        self.target_model = DQN(env.n, n_channels).to(device)
        self.update_target_model()
        self.target_model.eval()
        self.optimizer = torch.optim.Adam(self.q_model.parameters(), lr=lr)
        self.buffer = ReplayBuffer(10000)
        self.color = color
        self.steps_done = 0

    def draw_action(self, env, s, epsilon=None):
        self.steps_done += 1
        if epsilon is None:
            epsilon =  EPS_END + (EPS_START - EPS_END) * np.exp(-1. * self.steps_done / EPS_DECAY)

        s *= self.color

        with torch.no_grad():
            values = self.q_model(s).reshape(-1)
        valid_moves = env.get_valid_moves(self.color)
        if len(valid_moves) > 0:
            if np.random.rand() <= 1 - epsilon:
                valid_moves_ind = [env.coord2ind(p) for p in valid_moves]
                action = valid_moves_ind[torch.argmax(values[valid_moves_ind])]
                return env.ind2coord(action), values[action]
            else:
                action = valid_moves[np.random.randint(0, len(valid_moves))]
                return action , values[env.coord2ind(action)]
            
        else:
            return None, values[env.n * env.n]

    def update_target_model(self):
         self.target_model.load_state_dict(self.q_model.state_dict())

class RandomAgent:
    def __init__(self, color):
        self.color = color
    
    def draw_action(self, env, s, epsilon):
        valid_moves = env.get_valid_moves(self.color)
        if len(valid_moves) > 0:
            action = valid_moves[np.random.randint(0, len(valid_moves))]
            return action, 1. / len(valid_moves)
        else:
            return None, 0


class OthelloGame:
    def __init__(self, agent_white, agent_black):
        self.white = agent_white
        self.black = agent_black

    def get_agent(self, color):
        if color == 1:
            return self.white
        else:
            return self.black
    
    def sync(self, color_optimized, color_update):
        """Copy model state of agent color_optimized to agent color_update"""
        self.get_agent(color_update).q_model.load_state_dict(self.get_agent(color_optimized).q_model.state_dict())

In [93]:
Transition = namedtuple('Transition',
                        ('state', 'action', 'next_state', 'reward'))

class ReplayBuffer(object):
    def __init__(self, capacity):
        self.capacity = capacity
        self.memory = []
        self.position = 0

    def push(self, *args):
        """Saves a transition."""
        if len(self.memory) < self.capacity:
            self.memory.append(None)
        self.memory[self.position] = Transition(*args)
        self.position = (self.position + 1) % self.capacity

    def sample(self, batch_size):
        return random.sample(self.memory, batch_size)

    def __len__(self):
        return len(self.memory)

In [97]:
def optimize_model(agent, batch_size=BATCH_SIZE, device=device, gamma=GAMMA):
    agent.q_model.train()

    if len(agent.buffer) < batch_size:
        agent.q_model.eval()
        return

    transitions = agent.buffer.sample(batch_size)
    batch = Transition(*zip(*transitions))

    non_final_mask = torch.tensor(tuple(map(lambda s: s is not None,
                                            batch.next_state)), device=device, dtype=torch.bool)
    non_final_next_states = torch.cat([s for s in batch.next_state if s is not None])

    reward_batch = torch.tensor(batch.reward, device=device)
    state_batch = torch.cat(batch.state)
    action_batch = torch.cat(batch.action)

    state_action_values = agent.q_model(state_batch).gather(1, action_batch)

    next_state_values = torch.zeros(batch_size, device=device)
    next_state_values[non_final_mask] = agent.target_model(non_final_next_states).max(1)[0].detach()

    expected_state_action_values = next_state_values * gamma + reward_batch

    loss = F.smooth_l1_loss(state_action_values, expected_state_action_values.unsqueeze(1))

    agent.optimizer.zero_grad()
    loss.backward()
    # for param in agent.q_model.parameters():
    #     param.grad.data.clamp_(-1, 1)
    agent.optimizer.step()
    agent.q_model.eval()

def state_numpy_to_tensor(state, device=device):
    state = torch.from_numpy(state.astype(np.int64)).unsqueeze(0).unsqueeze(0)
    return state.to(device)

def train_one_episode(env, game, color, device=device, batch_size=BATCH_SIZE, gamma=GAMMA, epsilon=None):
    game.get_agent(color).q_model.eval()
    game.get_agent(-color).q_model.eval()

    state = env.reset()
    state = state_numpy_to_tensor(state)
    done = False

    while not done:
        # Player plays
        action, value = game.get_agent(env.turn).draw_action(env, state, epsilon)
        if action is not None:
            opp_state, reward, done, info = env.step(action)
            action = torch.tensor([[env.coord2ind(action)]], device=device, dtype=torch.int64)
            opp_state = state_numpy_to_tensor(opp_state)
        else:
            opp_state = state
            env.turn *= -1
            action = torch.tensor([[env.n*env.n]], device=device, dtype=torch.int64)

        if env.turn == color:
        # if next turn belongs to player color, then let him play, no need to update this agent
            state = opp_state
            continue
        
        # Opponent plays
        opp_action, _ = game.get_agent(env.turn).draw_action(env, opp_state, epsilon)
        if opp_action is not None and not done:
            new_state, reward, done, info = env.step(opp_action)
            new_state = state_numpy_to_tensor(new_state)

            if reward * color > 0:
                reward = 1.
            elif reward *color < 0:
                reward = -1.

            reward = torch.tensor([reward], device=device, dtype=torch.float)
        elif done:
            new_state = None
        else:
            new_state = opp_state
            env.turn *= -1
    
        game.get_agent(color).buffer.push(state * color, action, new_state * color if new_state is not None else None, reward)

        optimize_model(game.get_agent(color), batch_size, device, gamma)

        # Next turn
        state = new_state

def score_multi_episode(env, game, color, device=device, num_episodes=NUM_EPISODES, epsilon = .0):
    '''Trained agent plays against an agent with random policy'''
    game.get_agent(color).q_model.eval()
    num_success = 0
    num_cons_success = [0]
    results = []
    score = .0

    for i in range(num_episodes):
        state = env.reset()
        state = state_numpy_to_tensor(state)
        done = False

        while not done:
            if env.turn == color:
                action, value = game.get_agent(color).draw_action(env, state, epsilon)
            else:
                valid_moves = env.get_valid_moves(env.turn)
                if len(valid_moves) > 0:
                    action = valid_moves[np.random.randint(0, len(valid_moves))]
                else:
                    action = None
            if action is not None:
                state, reward, done, info = env.step(action)
                state = state_numpy_to_tensor(state)
            else:
                env.turn *= -1
            
        if env.score() * color > 0:
            num_success += 1
            num_cons_success[-1] += 1
            score += 1.
        else:
            num_cons_success.append(0)
            if env.score() != 0:
                score -= 1.

        results.append(reward)
    return num_success, max(num_cons_success), score, results

In [98]:
np.random.seed(0)
torch.manual_seed(0)
random.seed(0)

game = OthelloGame(DQNAgent(env, 1, lr=LR), DQNAgent(env, -1, lr=LR))
game.sync(1, -1)

In [None]:
color = 1
nb_episodes_per_agent = 1
target_update = 10
print_step = 500

for i in tqdm(range(200001)):
    train_one_episode(env, game, color)

    # if i % 100 == 0:
    #     print(env.render())
    #     print(env.score())

    if i % nb_episodes_per_agent == 0:
        game.sync(color, -color)  # Update model for the other player
        color *= -1
    
    if i % target_update == 0:
        game.get_agent(color).update_target_model()
        game.get_agent(-color).update_target_model()

    if i % print_step == 0:
        num_success, max_cons_success, score, _ = score_multi_episode(env, game, 1)
        print("White ... Episode: {}, Number of wins: {}, Max number of consecutive wins: {}, Total score: {:.1f}".format(i, num_success, max_cons_success, score))
        num_success, max_cons_success, score, _ = score_multi_episode(env, game, -1)
        print("Black ... Episode: {}, Number of wins: {}, Max number of consecutive wins: {}, Total score: {:.1f}".format(i, num_success, max_cons_success, score))

HBox(children=(FloatProgress(value=0.0, max=200001.0), HTML(value='')))

White ... Episode: 0, Number of wins: 74, Max number of consecutive wins: 12, Total score: 48.0
Black ... Episode: 0, Number of wins: 54, Max number of consecutive wins: 6, Total score: 12.0
White ... Episode: 500, Number of wins: 64, Max number of consecutive wins: 5, Total score: 29.0
Black ... Episode: 500, Number of wins: 39, Max number of consecutive wins: 5, Total score: -17.0
White ... Episode: 1000, Number of wins: 62, Max number of consecutive wins: 6, Total score: 29.0
Black ... Episode: 1000, Number of wins: 48, Max number of consecutive wins: 7, Total score: 1.0
White ... Episode: 1500, Number of wins: 65, Max number of consecutive wins: 13, Total score: 31.0
Black ... Episode: 1500, Number of wins: 63, Max number of consecutive wins: 11, Total score: 29.0
White ... Episode: 2000, Number of wins: 57, Max number of consecutive wins: 7, Total score: 17.0
Black ... Episode: 2000, Number of wins: 53, Max number of consecutive wins: 6, Total score: 8.0
White ... Episode: 2500, N

In [None]:
print(env.render())

In [None]:
from ipywidgets import widgets
from IPython.display import display
text = widgets.Text()
display(text)
state = env.reset()
print(env.render())
print(env.get_valid_moves(1))
print(env.score())
state = state_numpy_to_tensor(state)
game.get_agent(-1).q_model.eval()
action, value = game.get_agent(-1).draw_action(env, state, epsilon=.0)
state, reward, done, info = env.step(action)
print(env.render())
print(env.get_valid_moves(1))
print(reward)
print(value)

def handle_submit(sender):
    human_step(env, eval(text.value))

def human_step(env, action):
    if action is not None:
        state, _, done, info = env.step(action)
    else:
        pass
    state = state_numpy_to_tensor(state)
    action, _ = game.get_agent(-1).draw_action(env, state, epsilon=.0)
    state, reward, done, info = env.step(action)
    print(env.render())
    print(env.get_valid_moves(1))
    print(reward)

text.on_submit(handle_submit)

