# Game setup

In [2]:
import random
import copy
from collections import namedtuple, deque

import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import gym

from config_3_4_2 import config_3_4_2

In [3]:
# Seed every random number generator used in this notebook (Python, NumPy,
# PyTorch) with the same value so that runs are repeatable.
SEED = 100
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)

# Use the GPU whenever PyTorch reports one, otherwise fall back to the CPU.
DEVICE = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

In [4]:
from copy import deepcopy
import numpy as np


class Board:
    """Ball-sort puzzle board stored as a fixed-size integer grid.

    ``flasks`` is a ``(tubes, max_length)`` int16 array in which every row is
    one flask; slot 0 is the bottom of the flask and ``-1`` marks an unused
    slot.  ``idx[i]`` is the number of balls currently in flask ``i``, so the
    top ball of flask ``i`` sits at ``flasks[i][idx[i] - 1]``.

    Args:
        flasks: Initial layout.  The first ``color`` rows must each contain
            ``max_length`` colour ids; any further rows are ignored because
            the last ``empty`` flasks always start empty.
        color: Number of colours, which is also the number of initially
            full flasks.
        max_length: Capacity of each flask.
        empty: Number of additional, initially empty flasks.
    """

    def __init__(self, flasks, color, max_length, empty=2):
        self.color = color
        self.max_length = max_length
        self.empty = empty
        self.tubes = self.color + self.empty
        self.flasks = np.full((self.tubes, self.max_length), -1, dtype=np.int16)
        # Slice by the number of coloured flasks instead of ``[:-empty]``:
        # ``[:-0]`` is an empty slice, so ``empty=0`` used to fail, and this
        # form also accepts an input that omits the trailing empty rows.
        self.flasks[:self.color, :self.max_length] = flasks[:self.color]
        self.idx = np.zeros(self.tubes, dtype=np.uint16)
        self.idx[:self.color] = self.max_length
        # Moves played so far, as (source, destination) pairs.
        self.actions = []
        # String snapshots of the grids already visited by ``dfs_solve``.
        self.states = set()

    def __str__(self):
        return str(self.flasks)

    def __repr__(self):
        return str(self.flasks)

    def __iter__(self):
        return iter(self.flasks)

    def __len__(self):
        return len(self.flasks)

    def __getitem__(self, item):
        return self.flasks[item]

    def grid_to_state(self):
        """Return the board as a list of per-flask lists without the -1 padding."""
        return [[c for c in r if c != -1] for r in self]

    def state_to_grid(self, state):
        """Overwrite the grid and fill levels from a list of per-flask lists."""
        self.flasks.fill(-1)
        self.idx.fill(0)
        for i, r in enumerate(state):
            self.idx[i] = len(r)
            for j, c in enumerate(r):
                self.flasks[i][j] = c

    def is_flask_full(self, i):
        """Return whether flask ``i`` holds ``max_length`` balls."""
        return self.idx[i] == self.max_length

    def is_flask_empty(self, i):
        """Return whether flask ``i`` holds no balls."""
        return self.idx[i] == 0

    def has_one_color(self, i):
        """Return whether every slot of flask ``i`` equals its bottom slot.

        Padding slots take part in the comparison, so the result is only a
        colour check for a full flask.
        """
        return (self.flasks[i] == self.flasks[i][0]).all()

    def is_flask_solved(self, i):
        """Return True if flask ``i`` is empty, or full with a single colour."""
        return bool(
            self.is_flask_empty(i)
            or (self.is_flask_full(i) and self.has_one_color(i))
        )

    def top(self, i):
        """Return the top ball of flask ``i`` (the flask must not be empty)."""
        return self.flasks[i][self.idx[i]-1]

    def pop(self, i):
        """Remove and return the top ball of flask ``i``."""
        ball = self.flasks[i][self.idx[i]-1]
        self.flasks[i][self.idx[i] - 1] = -1
        self.idx[i] -= 1
        return ball

    def push(self, i, ball):
        """Place ``ball`` on top of flask ``i`` without checking legality."""
        self.flasks[i][self.idx[i]] = ball
        self.idx[i] += 1

    def is_push_allowed(self, i, ball):
        """Return True if ``ball`` may go on flask ``i``.

        A ball may be placed on an empty flask, or on a non-full flask whose
        top ball has the same colour.
        """
        return bool(
            self.is_flask_empty(i)
            or (self.top(i) == ball and not self.is_flask_full(i))
        )

    def is_invalid_init_state(self):
        """Return True if any flask is already full with a single colour."""
        return any(self.is_flask_full(i) and self.has_one_color(i) for i in range(self.tubes))

    def is_solved(self):
        """Return True if every flask is solved (see ``is_flask_solved``)."""
        return all(self.is_flask_solved(i) for i in range(self.tubes))

    def is_deadend(self):
        """Return True if the board is unsolved and no move is available."""
        return not self.is_solved() and not self.valid_actions()

    def reward(self):
        """Return 1 for a solved board, -1 for a dead end and 0 otherwise."""
        if self.is_solved():
            return 1
        if self.is_deadend():
            return -1
        return 0

    def valid_actions(self):
        """Return every legal move as a list of (source, destination) pairs.

        Solved flasks (which includes empty ones) are never used as a source.
        """
        actions = []
        for i in range(self.tubes):
            if self.is_flask_solved(i):
                continue
            top_i = self.top(i)
            for j in range(self.tubes):
                if i != j and self.is_push_allowed(j, top_i):
                    actions.append((i, j))
        return actions

    def play(self, action):
        """Move the top ball of ``action[0]`` onto ``action[1]`` and record it."""
        self.push(action[1], self.pop(action[0]))
        self.actions.append(action)

    def undo_action(self):
        """Revert the most recently recorded move."""
        action = self.actions.pop()
        self.push(action[0], self.pop(action[1]))

    def dfs_solve(self, steps=0, path=None):
        """Search depth-first for a sequence of moves that solves the board.

        Visited grids are remembered in ``self.states`` so no position is
        expanded twice.  On success the board is left in the solved position
        and ``self.actions`` ends with the moves that were played.

        Args:
            steps: Number of moves already played on the current branch.
            path: List that receives a deep copy of the board after each
                move on the current branch.  A fresh list is created when
                omitted, so separate calls never share a path.

        Returns:
            ``(solved, steps, path)``: whether a solution was found, the
            depth at which the search returned, and the list of board
            snapshots along the solving branch (empty on failure).
        """
        if path is None:
            path = []
        if self.is_solved():
            return True, steps, path
        # A dead end simply has no actions, so the loop below is skipped.
        for a in self.valid_actions():
            self.play(a)
            current_state = str(self)
            if current_state in self.states:
                self.undo_action()
                continue
            self.states.add(current_state)
            path.append(deepcopy(self))
            r = self.dfs_solve(steps+1, path)
            if r[0]:
                return r
            # Backtrack: the snapshot belongs to a branch that failed.
            path.pop()
            self.undo_action()
        return False, steps, path

In [11]:
# Example puzzle: three colours, flasks holding four balls each, and two
# spare flasks that start empty.
num_color, max_len, empty = 3, 4, 2

board = Board(
    flasks=[
        [0, 1, 2, 0],
        [1, 1, 2, 0],
        [2, 0, 1, 2],
        [],
        [],
    ],
    color=num_color,
    max_length=max_len,
    empty=empty,
)

In [31]:
class FCN(nn.Module):
    """Three-layer fully connected network over a flattened board.

    The input is flattened to ``(num_color + empty) * max_len`` features per
    sample and the output has one value per ordered pair of distinct flasks,
    i.e. ``(num_color + empty) * (num_color + empty - 1)`` values.
    """

    def __init__(self, num_color, max_len, empty=2) -> None:
        super().__init__()
        tubes = num_color + empty
        hidden_units = 128
        self.fc1 = nn.Linear(tubes * max_len, hidden_units)
        self.fc2 = nn.Linear(hidden_units, hidden_units)
        self.fc3 = nn.Linear(hidden_units, tubes * (tubes - 1))
        self.relu = torch.nn.ReLU()

    def forward(self, x):
        """Map a batch of boards (first dimension = batch) to per-action values."""
        # Keep the batch dimension and flatten everything else.
        flat = x.view(x.size(0), -1)
        hidden = self.relu(self.fc1(flat))
        hidden = self.relu(self.fc2(hidden))
        return self.fc3(hidden)

In [33]:
# Float tensor copy of the board grid; unused slots keep their -1 padding.
x = torch.FloatTensor(board.flasks)
x

tensor([[ 0.,  1.,  2.,  0.],
        [ 1.,  1.,  2.,  0.],
        [ 2.,  0.,  1.,  2.],
        [-1., -1., -1., -1.],
        [-1., -1., -1., -1.]])

In [34]:
# Smoke test: run one board through an untrained network.  The leading
# axis added by indexing with None is the batch dimension.
fcn = FCN(num_color=num_color, max_len=max_len, empty=empty)
r = fcn(x[None])
(r.shape, r)

(torch.Size([1, 20]),
 tensor([[-0.0093, -0.1958, -0.0582, -0.2112,  0.1820, -0.1105,  0.1671,  0.0668,
           0.0835,  0.0258, -0.0201,  0.0192, -0.0273, -0.1663, -0.0302, -0.0223,
           0.0126,  0.0068, -0.4020, -0.0320]], grad_fn=<AddmmBackward>))

In [None]:
class DeepQAgent:
    """Epsilon-greedy deep Q-learning agent for the ball-sort board.

    The agent owns an online network (``model``), which is trained, and a
    target network (``target_model``), which is only refreshed on request so
    that the bootstrap targets stay stable between refreshes.

    Actions are ``(source, destination)`` flask pairs.  The network has
    ``tubes * (tubes - 1)`` outputs, so a pair is mapped to the flat index
    ``source * (tubes - 1) + offset`` where ``offset`` counts destinations
    while skipping the source flask itself.  Transitions stored in the replay
    buffer are expected to use the same flat index for their action —
    NOTE(review): confirm against the code that fills the buffer.

    Args:
        epsilon: Probability of choosing a random legal move in ``act``.
        num_color: Number of colours on the board.
        max_len: Capacity of each flask.
        empty: Number of spare flasks.
        batch_size: Stored as ``_batch_size``; ``train`` itself uses the
            batch size of the replay buffer it is given.
        lr: Learning rate of the Adam optimizer.
        delta: Threshold of the Huber loss.
        gamma: Discount factor.
    """

    def __init__(self, epsilon, num_color, max_len, empty, batch_size=64, lr=5e-4, delta=1.0, gamma=0.99):
        self.epsilon = epsilon

        # Online network: produces the Q-values used to select actions.
        self.model = FCN(num_color, max_len, empty)
        # Target network: produces the Q-values of the next state.  It starts
        # as a copy of the online network and is only updated on request.
        self.target_model = FCN(num_color, max_len, empty)
        self.target_model.load_state_dict(self.model.state_dict())

        # optimizer.
        self._lr = lr
        self.optimizer = torch.optim.Adam(
            self.model.parameters(),
            lr=self._lr,
        )

        # define criterion function.
        self.criterion = torch.nn.HuberLoss(delta=delta)

        # fixed parameters.
        self._gamma = gamma
        self._batch_size = batch_size
        # (number of source flasks, number of destinations per source).
        self._num_actions = (num_color + empty, num_color + empty - 1)

    def _encode_action(self, action):
        """Return the flat network output index of a (source, destination) pair."""
        src, dst = action
        # Destinations are numbered with the source flask left out.
        return src * self._num_actions[1] + (dst if dst < src else dst - 1)

    def _decode_action(self, index):
        """Return the (source, destination) pair of a flat network output index."""
        src, offset = divmod(index, self._num_actions[1])
        dst = offset if offset < src else offset + 1
        return (src, dst)

    def act(self, board):
        """Choose a legal move for ``board`` with an epsilon-greedy policy.

        Args:
            board: Object exposing ``valid_actions()`` (list of
                ``(source, destination)`` pairs) and a ``flasks`` grid.

        Returns:
            A ``(source, destination)`` tuple taken from the legal moves.

        Raises:
            ValueError: If the board has no legal move.
        """
        allowed_actions = board.valid_actions()
        if not allowed_actions:
            raise ValueError("No valid action is available for this board.")

        if random.random() < self.epsilon:
            # take random actions.
            return random.choice(allowed_actions)

        device = next(self.model.parameters()).device
        state = torch.as_tensor(board.flasks, dtype=torch.float32, device=device)
        with torch.no_grad():
            q_values = self.model(state.unsqueeze(0)).squeeze(0)

        # Only legal moves may win the argmax: every other entry is -inf.
        allowed_idx = torch.tensor(
            [self._encode_action(a) for a in allowed_actions],
            dtype=torch.long,
            device=device,
        )
        masked = torch.full_like(q_values, float('-inf'))
        masked[allowed_idx] = q_values[allowed_idx]
        return self._decode_action(int(torch.argmax(masked).item()))

    def train(self, replay_buffer, update_target=False):
        """Run one optimisation step on a batch sampled from ``replay_buffer``.

        Each stored transition is ``(state, action, next_state, reward)``;
        ``next_state`` is None for terminal transitions.  States, actions and
        rewards are concatenated with ``torch.cat``, so they must already be
        tensors with a leading batch dimension of size one.

        Args:
            replay_buffer: Buffer providing ``batch_size``, ``__len__`` and
                ``sample(batch_size)``.
            update_target: When True, copy the online weights into the
                target network after the step.

        Returns:
            The detached loss tensor, or None if the buffer does not yet
            hold a full batch.
        """
        batch_size = replay_buffer.batch_size
        if len(replay_buffer) < batch_size:
            return None
        transitions = replay_buffer.sample(batch_size)
        batch = Transition(*zip(*transitions))

        state_batch = torch.cat(batch.state)
        action_batch = torch.cat(batch.action)
        reward_batch = torch.cat(batch.reward)

        state_action_values = self.model(state_batch).gather(1, action_batch.unsqueeze(1))
        device = state_action_values.device

        # Terminal transitions keep a next-state value of zero.
        non_final_mask = torch.tensor(
            [s is not None for s in batch.next_state],
            dtype=torch.bool,
            device=device,
        )
        non_final_next_states = [s.unsqueeze(0) for s in batch.next_state if s is not None]

        next_state_values = torch.zeros(batch_size, device=device)
        # torch.cat rejects an empty list, so skip the target network when
        # every sampled transition is terminal.
        if non_final_next_states:
            with torch.no_grad():
                next_q = self.target_model(torch.cat(non_final_next_states, 0))
            next_state_values[non_final_mask] = next_q.max(1)[0]

        expected_state_action_values = (next_state_values * self._gamma) + reward_batch.to(device)

        # calculate loss.
        loss = self.criterion(state_action_values, expected_state_action_values.unsqueeze(1))

        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

        if update_target:
            self.target_model.load_state_dict(self.model.state_dict())

        return loss.detach()


# One replay-memory record: the state, the action taken in it, the state
# that followed, and the reward received.
Transition = namedtuple('Transition', 'state action next_state reward')


class Buffer:
    """Fixed-capacity replay memory holding ``Transition`` records.

    Once ``capacity`` records are stored, each new record evicts the oldest
    one.  ``batch_size`` is only stored here for the code that samples.
    """

    def __init__(self, capacity, batch_size):
        self.batch_size = batch_size
        self.memory = deque(maxlen=capacity)

    def push(self, *args):
        """Store one transition built from ``args``."""
        record = Transition(*args)
        self.memory.append(record)

    def sample(self, batch_size):
        """Return ``batch_size`` distinct stored transitions chosen at random."""
        chosen = random.sample(self.memory, batch_size)
        return chosen

    def __len__(self):
        """Return the number of transitions currently stored."""
        return len(self.memory)

In [None]:
def validation(epsilon_opt, epsilon_learn, DeepQLearner, epoch=500):
    """Play ``epoch`` evaluation games and return the learner's mean reward.

    The learner plays against a fresh ``OptimalPlayer`` in every game.  The
    two players swap symbols after the first half of the games.  Game ``i``
    reseeds NumPy's global generator with ``i``, so repeated evaluations use
    the same seeds.  A move that ``check_available`` rejects ends the game
    and is scored as a win for the opponent.

    Note that the learner is modified in place: its ``epsilon`` and
    ``player`` attributes are overwritten and not restored.

    Args:
        epsilon_opt: ``epsilon`` passed to ``OptimalPlayer``.
        epsilon_learn: ``epsilon`` assigned to the learner for evaluation.
        DeepQLearner: Agent under evaluation; must provide ``act(state)``.
        epoch: Number of games to play.

    Returns:
        The sum of the learner's end-of-game rewards divided by ``epoch``.
    """
    env = TictactoeEnv()
    reward_sum = 0

    player_learn = DeepQLearner
    player_learn.epsilon = epsilon_learn

    # Swap the symbols halfway through.  This used to be a hard-coded 250,
    # which is only half of the default epoch=500.
    swap_at = epoch // 2

    for i in range(epoch):
        np.random.seed(i)

        if i < swap_at:
            Turns = np.array(['O','X'])
        else:
            Turns = np.array(['X','O'])

        env.reset()
        grid, _, __ = env.observe()
        player_opt = OptimalPlayer(epsilon=epsilon_opt, player=Turns[0])
        player_learn.player = Turns[1]

        for j in range(9):
            if env.current_player == player_opt.player:
                action = player_opt.act(grid)
                grid, end, winner = env.step(action, print_grid=False)
            else:
                state = grid2state(grid, player_learn.player).to(DEVICE)
                action = player_learn.act(state)
                if check_available(grid2state(grid, player_learn.player), action.item()):
                    grid, end, winner = env.step(action.item(), print_grid=False)
                else:
                    # End the game if the agent takes an unavailable action;
                    # crediting the opponent with the win makes env.reward
                    # below score the game as a loss for the learner.
                    end = True
                    env.winner = player_opt.player

            if end:
                reward_sum += env.reward(player=player_learn.player)
                env.reset()
                break

    return reward_sum/epoch

In [None]:
def run_simulation(epsilon_opt, buffer_capacity=10000, batch_size=64, epoch=20000, plot_interval=250, 
                   colour='black', epsilon_learn_init=None, n_star=None, fig_widge=None, mode=None):
    """Train a ``DeepQAgent`` against an ``OptimalPlayer`` for ``epoch`` games.

    The two players swap symbols before every game.  Each learner move is
    stored in a replay buffer once its outcome is known (after the
    opponent's reply, or at the end of the game) and the learner trains on a
    sampled batch after each stored transition.  A move rejected by
    ``check_available`` ends the game with reward -1.

    Args:
        epsilon_opt: ``epsilon`` passed to ``OptimalPlayer``.
        buffer_capacity: Capacity of the replay buffer.
        batch_size: Batch size of the replay buffer.
        epoch: Number of games to play.
        plot_interval: Number of games between two statistics/plot updates.
        colour: Trace colour forwarded to ``init_figure_widge``.
        epsilon_learn_init: Fixed exploration rate of the learner.  When
            None, the rate decays as
            ``max(0.1, 0.8 * (1 - (game + 1) / n_star))``.
        n_star: Decay horizon used when ``epsilon_learn_init`` is None.
        fig_widge: Optional figure widget that receives the curves; when
            None, nothing is plotted.
        mode: When it contains ``'validate'``, the plotted curves are the
            ``validation`` scores against ``epsilon_opt`` 0 and 1; otherwise
            they are the average training reward and loss.

    Returns:
        ``(average_rewards, average_losses)``: one entry per completed plot
        interval.  A loss entry is NaN if no training step ran in that
        interval.
    """
    any_epsilon = (epsilon_learn_init is None) and (n_star is None)
    assert not any_epsilon, "Both epsilon_learn and n_star is None! At least having one of them."

    env = TictactoeEnv()        
    Turns = np.array(['X','O'])

    # rendering in figure widge
    if fig_widge is not None:
        init_figure_widge(fig_widge, mode, colour, 
                          epsilon_opt=epsilon_opt, 
                          epsilon_learn=epsilon_learn_init, 
                          n_star=n_star)
        
    replay_memory = Buffer(capacity=buffer_capacity, batch_size=batch_size)
    player_opt = OptimalPlayer(epsilon=epsilon_opt, player=Turns[0])
    # NOTE(review): the DeepQAgent class visible in this file takes
    # (epsilon, num_color, max_len, empty, ...) and no `player` argument;
    # confirm which agent class this call is meant to construct.
    player_learn = DeepQAgent(epsilon=epsilon_learn_init, player=Turns[1])

    epsilon_min = 0.1
    epsilon_max = 0.8

    reward_sum = 0
    loss_sum = 0
    loss_step = 0
    average_rewards = []
    average_losses = []

    for i in range(epoch):
        if epsilon_learn_init is None:
            assert n_star is not None, "In this setting, we use n_star to tune the epsilon_learn."
            epsilon_learn = max(epsilon_min, epsilon_max*(1-(i+1)/n_star))
        else:
            epsilon_learn = epsilon_learn_init
        
        player_learn.epsilon = epsilon_learn

        env.reset()
        grid, _, _ = env.observe()

        # Switch Order
        Turns = Turns[::-1]
        player_opt.player = Turns[0]
        player_learn.player = Turns[1]
        env.current_player  = 'X'

        state = None
        next_state = None

        for j in range(9):
            if env.current_player == player_opt.player:
                action_opt = player_opt.act(grid)
                grid, end, winner = env.step(action_opt, print_grid=False)
                
                # Get reward
                reward = env.reward(player=player_learn.player)
                
                # When opt opens the game (j == 0) the learner has not moved
                # yet, so there is no transition to complete.
                if j > 0:
                    next_state = grid2state(grid, player_learn.player)
            else:
                state = grid2state(grid, player_learn.player)
                action = player_learn.act(state)
                action = action.to(DEVICE)

                # Check the availability of current action.
                if check_available(grid2state(grid, player_learn.player), action.item()):
                    grid, end, winner = env.step(action.item(), print_grid=False)

                    # Get reward.
                    reward = env.reward(player=player_learn.player)
                else:
                    # End the game if the agent takes an unavailable action
                    end = True
                    # Give the agent a negative reward
                    reward = -1
                    
                    # Opponent wins the games
                    env.winner = player_opt.player

            if not end:
                # next_state is only set once opt has answered a learner move.
                if next_state is not None:
                    replay_memory.push(state.unsqueeze(0), action, next_state, torch.tensor([reward], device=DEVICE))
                    if len(replay_memory) >= replay_memory.batch_size:
                        loss = player_learn.train(replay_memory)
                        loss_step += 1
                        loss_sum += loss
                    next_state = None
                    
            if end:
                # Once the game ends, the update is the same no matter which
                # player moved first.
                if env.winner == player_opt.player:
                    reward = -1
                elif env.winner == player_learn.player:
                    # our agent wins.
                    reward = 1
                else:
                    # Draw
                    reward = 0

                # Terminal transition: there is no next state.
                next_state = None
                # Update target model every 500 epoch
                update_target = False

                if (i+1) % 500 == 0:
                    update_target = True
                replay_memory.push(state.unsqueeze(0), action, next_state, torch.tensor([reward], device=DEVICE))
                if len(replay_memory) >= replay_memory.batch_size:
                    loss = player_learn.train(replay_memory, update_target)
                    loss_step += 1
                    loss_sum += loss
                    
                reward_sum += reward
                env.reset()
                break       
            
        #############################
        ######### Plot ##############

        if (i+1) % plot_interval == 0:
            # calculate average reward at the end of the current interval.
            average_reward = reward_sum / plot_interval  
            # No training step happens until the buffer holds a full batch.
            average_loss = loss_sum / loss_step if loss_step else float('nan')
            
            if fig_widge is not None:
                idx = len(fig_widge.data)-2

                # `mode` may be None, which is treated as "not validating".
                if 'validate' not in (mode or ''):
                    fig_widge.data[idx].x = np.append(fig_widge.data[idx].x, i+1)[0:]
                    fig_widge.data[idx].y = np.append(fig_widge.data[idx].y, average_reward)[0:]
                    
                    fig_widge.data[idx+1].x = np.append(fig_widge.data[idx+1].x, i+1)[0:]
                    fig_widge.data[idx+1].y = np.append(fig_widge.data[idx+1].y, float(average_loss))[0:]
                    
                else:
                    M_opt = validation(0, 0, player_learn, epoch=500)
                    M_rand = validation(1, 0, player_learn, epoch=500)
                    
                    fig_widge.data[idx].x = np.append(fig_widge.data[idx].x, i+1)[0:]
                    fig_widge.data[idx].y = np.append(fig_widge.data[idx].y, M_opt)[0:]
                    fig_widge.data[idx+1].x = np.append(fig_widge.data[idx+1].x, i+1)[0:]
                    fig_widge.data[idx+1].y = np.append(fig_widge.data[idx+1].y, M_rand)[0:]
                    fig_widge.layout.title.text = f'Epoch {i+1}, M_opt = {M_opt}, M_rand = {M_rand}'

            average_rewards.append(average_reward)
            average_losses.append(average_loss)
            # Start the next interval from scratch; the step counter has to
            # be reset together with the loss sum or later averages shrink.
            reward_sum = 0
            loss_sum = 0
            loss_step = 0

    return average_rewards, average_losses

In [None]:
def self_practice_simulation(epsilon_opt=None, epoch=20000, plot_interval=250, 
                   colour='black', epsilon_learn_init=None, n_star=None, fig_widge=None, mode=None):
    """Train two agents that share one model by letting them play each other.

    Both agents write into the same replay buffer and swap symbols before
    every game.  Rewards and losses are tracked for ``player_A`` only.  At
    the end of each plot interval the running averages are printed and, if a
    figure widget is given, ``player_A`` is scored with ``validation``
    against ``epsilon_opt`` 0 and 1 and the scores are appended to the plot.

    Args:
        epsilon_opt: Not used by this function.
        epoch: Number of games to play.
        plot_interval: Number of games between two statistics/plot updates.
        colour: Colour of the traces added to ``fig_widge``.
        epsilon_learn_init: Fixed exploration rate of both agents.  When
            None, the rate decays as
            ``max(0.1, 0.8 * (1 - (game + 1) / n_star))``.
        n_star: Decay horizon used when ``epsilon_learn_init`` is None.
        fig_widge: Optional figure widget that receives the curves; when
            None, nothing is plotted and no validation games are played.
        mode: Not used by this function.

    Returns:
        ``(shared_model, replay_memory)``.
    """
    any_epsilon = (epsilon_learn_init is None) and (n_star is None)
    assert not any_epsilon, "Both epsilon_learn and n_star is None! At least having one of them."

    env = TictactoeEnv()        
    Turns = np.array(['X','O'])

    # rendering in figure widge
    if fig_widge is not None:
        if n_star is None:
            fig_widge.add_scatter(x=[0], y=[0], name=f'M-opt vs. Q-agent, epsilon = {epsilon_learn_init}', marker=dict(color=colour))
            fig_widge.add_scatter(x=[0], y=[0], name=f'M-rand vs. Q-agent, epsilon = {epsilon_learn_init}', marker=dict(color=colour), line=dict(dash='dot'))
        else:
            fig_widge.add_scatter(x=[0], y=[0], name=f'M-opt vs. Q-agent, n* = {n_star}', marker=dict(color=colour))
            fig_widge.add_scatter(x=[0], y=[0], name=f'M-rand vs. Q-agent, n* = {n_star}', marker=dict(color=colour), line=dict(dash='dot'))
        
    replay_memory = Buffer(capacity=10000, batch_size=64)
    # NOTE(review): the FCN and DeepQAgent classes visible in this file have
    # different constructor signatures (FCN needs num_color/max_len,
    # DeepQAgent has no `player`/`model` arguments); confirm which classes
    # these calls are meant to use.
    shared_model = FCN()
    # Use player_A as our agent. Need to exchange order when using player_B.
    player_A = DeepQAgent(epsilon=epsilon_learn_init, player='O', model=shared_model)
    player_B = DeepQAgent(epsilon=epsilon_learn_init, player='X', model=shared_model)

    epsilon_min = 0.1
    epsilon_max = 0.8

    reward_sum_A = 0
    loss_sum_A = 0
    loss_step_A = 0
    average_rewards_A = []
    average_losses_A = []

    for i in range(epoch):
        if epsilon_learn_init is None:
            assert n_star is not None, "In this setting, we use n_star to tune the epsilon_learn."
            epsilon_learn = max(epsilon_min, epsilon_max*(1-(i+1)/n_star))
        else:
            epsilon_learn = epsilon_learn_init
        
        player_A.epsilon = epsilon_learn
        player_B.epsilon = epsilon_learn

        env.reset()
        grid, _, _ = env.observe()

        # Switch Order
        Turns = Turns[::-1]
        player_A.player = Turns[0]
        player_B.player = Turns[1]
        env.current_player  = 'X'

        state_A = None
        next_state_A = None
        state_B = None
        next_state_B = None

        for j in range(9):
            if env.current_player == player_A.player:
                # state_A is observed from the perspective of A.
                state_A = grid2state(grid, player_A.player)
                action_A = player_A.act(state_A)
                action_A = action_A.to(DEVICE)
                
                if check_available(grid2state(grid, player_A.player), action_A.item()):
                    grid, end, winner = env.step(action_A.item(), print_grid=False)

                    # Get reward.
                    reward_A = env.reward(player=player_A.player)
                    # A's move completes B's previous transition; on the very
                    # first move (j == 0) B has not played yet.
                    if j > 0:
                        next_state_B = grid2state(grid, player_B.player)
                else:
                    # End the game if the agent takes an unavailable action
                    end = True
                    # Give the agent a negative reward
                    reward_A = -1
                    
                    # B wins the game
                    env.winner = player_B.player

            else:
                # state_B is observed from the perspective of B.
                state_B = grid2state(grid, player_B.player)
                action_B = player_B.act(state_B)
                action_B = action_B.to(DEVICE)

                if check_available(grid2state(grid, player_B.player), action_B.item()):
                    grid, end, winner = env.step(action_B.item(), print_grid=False)

                    # Get reward.
                    reward_B = env.reward(player=player_B.player)
                    # B's move completes A's previous transition; on the very
                    # first move (j == 0) A has not played yet.
                    if j > 0:
                        next_state_A = grid2state(grid, player_A.player)
                else:
                    # End the game if the agent takes an unavailable action
                    end = True
                    # Give the agent a negative reward
                    reward_B = -1
                    
                    # A wins the game
                    env.winner = player_A.player
    
            if not end:
                # A next state only exists once the other agent has replied.
                if next_state_A is not None:
                    replay_memory.push(state_A.unsqueeze(0), action_A, next_state_A, torch.tensor([reward_A], device=DEVICE))
                    if len(replay_memory) >= replay_memory.batch_size:
                        loss_A = player_A.train(replay_memory)
                        loss_sum_A += loss_A
                        loss_step_A += 1
                    next_state_A = None
                
                if next_state_B is not None:
                    replay_memory.push(state_B.unsqueeze(0), action_B, next_state_B, torch.tensor([reward_B], device=DEVICE))
                    if len(replay_memory) >= replay_memory.batch_size:
                        _ = player_B.train(replay_memory)
                    next_state_B = None
                          
            if end:
                # Once the game ends, the update is the same no matter which
                # player moved first.
                if env.winner == player_A.player:
                    reward_A = 1
                    reward_B = -1
                elif env.winner == player_B.player:
                    reward_A = -1
                    reward_B = 1
                else:
                    # Draw
                    reward_A = 0
                    reward_B = 0
                
                # Terminal transitions carry no next state, so the bootstrap
                # term q(s', a') drops out of their target.
                next_state_A = None
                next_state_B = None
                
                replay_memory.push(state_A.unsqueeze(0), action_A, next_state_A, torch.tensor([reward_A], device=DEVICE))
                replay_memory.push(state_B.unsqueeze(0), action_B, next_state_B, torch.tensor([reward_B], device=DEVICE))
                
                # Update target model every 500 epoch.
                update_target = False

                if (i+1) % 500 == 0:
                    update_target = True

                # Training model when game over, and replay_memory size >= batch size
                if len(replay_memory) >= replay_memory.batch_size:
                    loss_A = player_A.train(replay_memory, update_target)
                    loss_sum_A += loss_A
                    loss_step_A += 1
                    _ = player_B.train(replay_memory, update_target)

                    
                # always focus on the average reward of player_A.
                reward_sum_A += reward_A
                env.reset()
                break       
            
        #############################
        ######### Plot ##############

        if (i+1) % plot_interval == 0:
            # calculate average reward at the end of the current interval.
            average_reward_A = reward_sum_A / plot_interval  
            # No training step happens until the buffer holds a full batch.
            average_loss_A = loss_sum_A / loss_step_A if loss_step_A else float('nan')
            print(average_reward_A)  
            print(average_loss_A)
            
            # The validation scores are only used for the plot.
            if fig_widge is not None:
                M_opt_1 = validation(0, 0, player_A, epoch=500)
                M_rand_1 = validation(1, 0, player_A, epoch=500)

                idx = len(fig_widge.data)-2
                fig_widge.data[idx].x = np.append(fig_widge.data[idx].x, i+1)[0:]
                fig_widge.data[idx].y = np.append(fig_widge.data[idx].y, M_opt_1)[0:]
                fig_widge.data[idx+1].x = np.append(fig_widge.data[idx+1].x, i+1)[0:]
                fig_widge.data[idx+1].y = np.append(fig_widge.data[idx+1].y, M_rand_1)[0:]
                fig_widge.layout.title.text = f'Epoch {i+1}, M_opt = {M_opt_1}, M_rand = {M_rand_1}'

            average_rewards_A.append(average_reward_A)
            average_losses_A.append(average_loss_A)
            # Start the next interval from scratch.
            reward_sum_A = 0
            loss_sum_A = 0
            loss_step_A = 0

    return (shared_model, replay_memory)