# RL and Advanced DL. Homework #2
<br>

Task text:
<br>

https://docs.google.com/document/d/1laNIbABgIdjLiwHbd0sl0l_A4qGZcuQaOlQNrza82bQ
<br>

In [5]:
import gym

import math
import random
from datetime import datetime
# import pickle
# import multiprocessing

import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torchvision.transforms as T
from torch.autograd import Variable

sns.set_style("whitegrid")
sns.set_palette("colorblind")
palette = sns.color_palette()
figsize = (17, 8)
legend_fontsize = 16

We start with definition of environment for Tic-tac-toe (with arbitrary biard size and number of elements in line to win).

In [None]:
N_ROWS, N_COLS, N_WIN = 3, 3, 3

class TicTacToe(gym.Env):
    def __init__(self, n_rows=N_ROWS, n_cols=N_COLS, n_win=N_WIN, clone=None):
        if clone is not None:
            self.n_rows, self.n_cols, self.n_win = clone.n_rows, clone.n_cols, clone.n_win
            self.board = copy.deepcopy(clone.board)
            self.curTurn = clone.curTurn
            self.emptySpaces = None
            self.boardHash = None
        else:
            self.n_rows = n_rows
            self.n_cols = n_cols
            self.n_win = n_win

            self.reset()

    def getEmptySpaces(self):
        if self.emptySpaces is None:
            res = np.where(self.board == 0)
            self.emptySpaces = np.array([ (i, j) for i,j in zip(res[0], res[1]) ])
        return self.emptySpaces

    def makeMove(self, player, i, j):
        self.board[i, j] = player
        self.emptySpaces = None
        self.boardHash = None

    def getHash(self):
        if self.boardHash is None:
            self.boardHash = ''.join(['%s' % (x+1) for x in self.board.reshape(self.n_rows * self.n_cols)])
        return self.boardHash

    def isTerminal(self):
        # проверим, не закончилась ли игра
        cur_marks, cur_p = np.where(self.board == self.curTurn), self.curTurn
        for i,j in zip(cur_marks[0], cur_marks[1]):
            win = False
            if i <= self.n_rows - self.n_win:
                if np.all(self.board[i:i+self.n_win, j] == cur_p):
                    win = True
            if not win:
                if j <= self.n_cols - self.n_win:
                    if np.all(self.board[i,j:j+self.n_win] == cur_p):
                        win = True
            if not win:
                if i <= self.n_rows - self.n_win and j <= self.n_cols - self.n_win:
                    if np.all(np.array([ self.board[i+k,j+k] == cur_p for k in range(self.n_win) ])):
                        win = True
            if not win:
                if i <= self.n_rows - self.n_win and j >= self.n_win-1:
                    if np.all(np.array([ self.board[i+k,j-k] == cur_p for k in range(self.n_win) ])):
                        win = True
            if win:
                self.gameOver = True
                return self.curTurn

        if len(self.getEmptySpaces()) == 0:
            self.gameOver = True
            return 0

        self.gameOver = False
        return None

    def printBoard(self):
        for i in range(0, self.n_rows):
            print('----'*(self.n_cols)+'-')
            out = '| '
            for j in range(0, self.n_cols):
                if self.board[i, j] == 1:
                    token = 'x'
                if self.board[i, j] == -1:
                    token = 'o'
                if self.board[i, j] == 0:
                    token = ' '
                out += token + ' | '
            print(out)
        print('----'*(self.n_cols)+'-')

    def getState(self):
        return (self.getHash(), self.getEmptySpaces(), self.curTurn)

    def action_from_int(self, action_int):
        return ( int(action_int / self.n_cols), int(action_int % self.n_cols))

    def int_from_action(self, action):
        return action[0] * self.n_cols + action[1]
    
    def step(self, action):
        if self.board[action[0], action[1]] != 0:
            return self.getState(), -10, True, {}
        self.makeMove(self.curTurn, action[0], action[1])
        reward = self.isTerminal()
        self.curTurn = -self.curTurn
        return self.getState(), 0 if reward is None else reward, reward is not None, {}

    def reset(self):
        self.board = np.zeros((self.n_rows, self.n_cols), dtype=int)
        self.boardHash = None
        self.gameOver = False
        self.emptySpaces = None
        self.curTurn = 1

In [None]:
def plot_board(env, pi, showtext=True, verbose=True, fontq=20, fontx=60):
    '''Рисуем доску с оценками из стратегии pi'''
    fig, ax = plt.subplots(1, 1, figsize=(10, 10))
    X, Y = np.meshgrid(np.arange(0, env.n_rows), np.arange(0, env.n_rows))
    Z = np.zeros((env.n_rows, env.n_cols)) + .01
    s, actions = env.getHash(), env.getEmptySpaces()
    if pi is not None and s in pi.Q:
        for i, a in enumerate(actions):
            Z[a[0], a[1]] = pi.Q[s][i]
    ax.set_xticks([])
    ax.set_yticks([])
    surf = ax.imshow(Z, cmap=plt.get_cmap('Accent', 10), vmin=-1, vmax=1)
    if showtext:
        for i,a in enumerate(actions):
            if pi is not None and s in pi.Q:
                ax.text( a[1] , a[0] , "%.3f" % pi.Q[s][i], fontsize=fontq, horizontalalignment='center', verticalalignment='center', color="w" )
    for i in range(env.n_rows):
        for j in range(env.n_cols):
            if env.board[i, j] == -1:
                ax.text(j, i, "O", fontsize=fontx, horizontalalignment='center', verticalalignment='center', color="w" )
            if env.board[i, j] == 1:
                ax.text(j, i, "X", fontsize=fontx, horizontalalignment='center', verticalalignment='center', color="w" )
#     cbar = plt.colorbar(surf, ticks=[0, 1])
    ax.grid(False)
    plt.show()

def get_and_print_move(env, pi, s, actions, random=False, verbose=True, fontq=20, fontx=60):
    '''Делаем ход, рисуем доску'''
    plot_board(env, pi, fontq=fontq, fontx=fontx)
    if verbose and (pi is not None):
        if s in pi.Q:
            for i,a in enumerate(actions):
                print(i, a, pi.Q[s][i])
        else:
            print("Стратегия не знает, что делать...")
    if random:
        return np.random.randint(len(actions))
    else:
        return pi.getActionGreedy(s, len(actions))

def plot_test_game(env, pi1, pi2, random_crosses=False, random_naughts=True, verbose=True, fontq=20, fontx=60):
    '''Играем тестовую партию между стратегиями или со случайными ходами, рисуем ход игры'''
    done = False
    env.reset()
    while not done:
        s, actions = env.getHash(), env.getEmptySpaces()
        if env.curTurn == 1:
            a = get_and_print_move(env, pi1, s, actions, random=random_crosses, verbose=verbose, fontq=fontq, fontx=fontx)
        else:
            a = get_and_print_move(env, pi2, s, actions, random=random_naughts, verbose=verbose, fontq=fontq, fontx=fontx)
        observation, reward, done, info = env.step(actions[a])
        if reward == 1:
            print("Крестики выиграли!")
            plot_board(env, None, showtext=False, fontq=fontq, fontx=fontx)
        if reward == -1:
            print("Нолики выиграли!")
            plot_board(env, None, showtext=False, fontq=fontq, fontx=fontx)

# I. Q-learning

The first method to try is classic table Q-learning.

The idea is to learn simultaneously both Q-functions for crosses and noughts. The algorithm is:
1. initialize Q1 and Q2 (for crosses and noughts respectively)
2. for given number of episodes run Q-learning for both players (as far as players act one after another, we update Q-functions in turns)

*Note.* Everywhere after in code I use following notation:
- `s` - current state
- `a` - current action
- `r` - reward
- `s_` - next state
- `a_` - next action

In [None]:
MAX_CARD_SCORE = 11
N_ACTIONS = 2


def init_Q():
    Q = {}
    for player_sum in range(MAX_PLAYER_SUM):
        for dealer_card in range(MAX_CARD_SCORE):
            for usable_ace in range(2):
                Q[(player_sum, dealer_card, usable_ace)] = np.random.randn(N_ACTIONS) if val is None else val * np.ones(N_ACTIONS)
    return Q


def compute_policy_by_Q(Q):
    pi = {}
    for s in Q:
        pi[s] = np.argmax(Q[s])
    return pi


def Q_learning_episode(env, pi, Q, alpha=0.05, temperature=0.0, gamma=0.9):
    s = env.reset()
    a = pi[s] if np.random.rand() > temperature else np.random.randint(N_ACTIONS)
    finished = False
    while not finished:
        s_, r, finished, _ = env.step(a)
        a_ = pi[s_] if np.random.rand() > temperature else np.random.randint(N_ACTIONS)
        Q[s][a] = Q[s][a] + alpha * (r + gamma * np.max(Q[s_]) - Q[s][a])
        s, a = s_, a_
    return Q, r

def Q_learning(env, n_episodes, alpha=0.1, temperature=0.0, gamma=0.95, verbose=False):
    sum_reward = 0
    mean_rewards = []
    Q = init_Q()
    pi = compute_policy_by_Q(Q)
    for cur_episode in tqdm(range(n_episodes), disable=not verbose):
        Q, r = Q_learning_episode(env, pi, Q, alpha=alpha, temperature=temperature, gamma=gamma)
        pi = compute_policy_by_Q(Q)
        sum_reward += r
        mean_rewards.append(sum_reward / (cur_episode + 1))
    return pi, Q, mean_rewards

In [None]:
def plot_durations(xs, labels):
    plt.figure(figsize=(10, 5))
    plt.xlabel('Episode number')
    plt.ylabel('Number of steps')
    for i,x in enumerate(xs):
        plt.plot(x, label=labels[i])
    plt.legend(loc="upper left")

# def plot_alg(array, alg_name, cut_first=10):
#     plt.figure(figsize=(18,8))
#     plt.title(f"{alg_name}: Mean reward for first N episodes", fontsize=20)
#     plt.xlabel("Episodes", fontsize=18)
#     plt.ylabel("Mean reward", fontsize=18)
#     plt.plot(array[cut_first:]);

# def plot_by_param(reward_arrays, params, param_name, alg_name, cut_first=10):
#     plt.figure(figsize=(18,9))
#     plt.title(f"{alg_name}: Mean reward for first N episodes", fontsize=20)
#     plt.xlabel("Episodes", fontsize=18)
#     plt.ylabel("Mean reward", fontsize=18)
#     assert len(reward_arrays) == len(params)
#     for array, param in zip(reward_arrays, params):
#         plt.plot(array[cut_first:], label=f"{param_name} = {param}")
#     plt.legend(fontsize=20);

# II. DQN

In [2]:
class ReplayMemory():
    def __init__(self, capacity):
        self.capacity = capacity
        self.memory = []
        self.position = 0

    def store(self, exptuple):
        if len(self.memory) < self.capacity:
            self.memory.append(None)
        self.memory[self.position] = exptuple
        self.position = (self.position + 1) % self.capacity
       
    def sample(self, batch_size):
        return random.sample(self.memory, batch_size)
    
    def __len__(self):
        return len(self.memory)

In [3]:
class Network(nn.Module):
    def __init__(self, layer_size=256):
        nn.Module.__init__(self)
        self.l1 = nn.Linear(4, layer_size)
        self.l2 = nn.Linear(layer_size, 2)

    def forward(self, x):
        x = F.relu(self.l1(x))
        x = self.l2(x)
        return x

In [None]:
class ConvNet(nn.Module):
    def __init__(self, n_channels=100, n_board=3):
        super().__init__()
        self.n_board = n_board
        self.conv = nn.Conv2d(in_channels=1, out_channels=n_channels, kernel_size=3, padding=0)
        self.fc = nn.Linear(in_features=n_channels, out_features=n_board*n_board)
    
    def forward(self, x):
        # (bs, n_board, n_board)
        x = self.conv(x.unsqueeze(1))
        x = torch.amax(x, dim=(2, 3))
        x = self.fc(x)
        return x

In [5]:
class TictactoeDQN():
    def __init__(self):
        self.env = gym.make('CartPole-v0')
        self.model = Network()
        self.memory = ReplayMemory(10000)
        self.optimizer = optim.Adam(self.model.parameters(), 0.001)
        self.steps_done = 0
        self.episode_durations = []
        
        self.gamma = 0.8
        self.batch_size = 64
        
        self.eps_init, self.eps_final, self.eps_decay = 0.9, 0.05, 200
        self.num_step = 0

    def select_greedy_action(self, state):
        return self.model(state).data.max(1)[1].view(1, 1)

    def select_action(self, state):
        sample = random.random()
        self.num_step += 1
        eps_threshold = self.eps_final + (self.eps_init - self.eps_final) * math.exp(-1. * self.num_step / self.eps_decay)
        if sample > eps_threshold:
            return self.select_greedy_action(state)
        else:
            return torch.tensor([[random.randrange(2)]], dtype=torch.int64)
        
    def run_episode(self, e=0, do_learning=True, greedy=False, render=False):
        state, num_step = self.env.reset(), 0
        while True:
            if render:
                self.env.render()

            state_tensor = torch.tensor([state], dtype=torch.float32)
            with torch.no_grad():
                if greedy:
                    action = self.select_greedy_action(state_tensor)
                else:
                    action = self.select_action(state_tensor)
            next_state, reward, done, _ = self.env.step(action.numpy()[0][0])
            next_state_tensor = torch.tensor([next_state], dtype=torch.float32)

            if done:
                reward = -1

            transition = (state_tensor, action, next_state_tensor, torch.tensor([reward], dtype=torch.float32))
            self.memory.store(transition)

            if do_learning:
                self.learn()

            state = next_state
            num_step += 1

            if done:
                print("\tepisode %d finished after %d steps" % (e, num_step))
                self.episode_durations.append(num_step)
                break

    def learn(self):
        if len(self.memory) < self.batch_size:
            return

        # берём мини-батч из памяти
        transitions = self.memory.sample(self.batch_size)
        batch_state, batch_action, batch_next_state, batch_reward = zip(*transitions)

        batch_state = Variable(torch.cat(batch_state))
        batch_action = Variable(torch.cat(batch_action))
        batch_reward = Variable(torch.cat(batch_reward))
        batch_next_state = Variable(torch.cat(batch_next_state))

        # считаем значения функции Q
        Q = self.model(batch_state).gather(1, batch_action).reshape([self.batch_size])

        # оцениваем ожидаемые значения после этого действия
        Qmax = self.model(batch_next_state).detach().max(1)[0]
        Qnext = batch_reward + (self.gamma * Qmax)

        # и хотим, чтобы Q было похоже на Qnext -- это и есть суть Q-обучения
        loss = F.smooth_l1_loss(Q, Qnext)

        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

In [7]:
dqn = TictactoeDQN()

print("%s\tStarting training for 300 episodes..." % (datetime.now().time()))
for e in range(300):
    dqn.run_episode(e)
print("%s\t\t...done!" % (datetime.now().time()))

14:05:14.544354	Starting training for 300 episodes...
	episode 0 finished after 11 steps
	episode 1 finished after 17 steps
	episode 2 finished after 42 steps
	episode 3 finished after 17 steps
	episode 4 finished after 15 steps
	episode 5 finished after 13 steps
	episode 6 finished after 12 steps
	episode 7 finished after 14 steps
	episode 8 finished after 12 steps
	episode 9 finished after 10 steps
	episode 10 finished after 9 steps
	episode 11 finished after 16 steps
	episode 12 finished after 10 steps
	episode 13 finished after 11 steps




	episode 14 finished after 12 steps
	episode 15 finished after 11 steps
	episode 16 finished after 11 steps
	episode 17 finished after 20 steps
	episode 18 finished after 9 steps
	episode 19 finished after 9 steps
	episode 20 finished after 12 steps
	episode 21 finished after 23 steps
	episode 22 finished after 64 steps
	episode 23 finished after 12 steps
	episode 24 finished after 26 steps
	episode 25 finished after 21 steps
	episode 26 finished after 24 steps
	episode 27 finished after 23 steps
	episode 28 finished after 65 steps
	episode 29 finished after 24 steps
	episode 30 finished after 43 steps
	episode 31 finished after 34 steps
	episode 32 finished after 55 steps
	episode 33 finished after 87 steps
	episode 34 finished after 41 steps
	episode 35 finished after 42 steps
	episode 36 finished after 145 steps
	episode 37 finished after 42 steps
	episode 38 finished after 46 steps
	episode 39 finished after 27 steps
	episode 40 finished after 59 steps
	episode 41 finished after 59

In [77]:
plot_durations([dqn.episode_durations], ['DQN'])

# III. Planning