### DQN on SnL


In [64]:
# imports!!

## gym
import gym
from gym import spaces

## rest
import random
import numpy as np
import pickle
from tqdm import tqdm

In [65]:
# Sanity check of the observation layout: one die slot with 6 values followed
# by eight token slots with 101 values each (board squares 0-100).
token_slots = [101] * 8
check = spaces.MultiDiscrete([6] + token_slots)
check.sample()

array([ 1, 55, 53, 15, 50,  7, 31, 81, 14])

In [66]:
class Player:
    """One participant in the game: a move budget plus four token positions."""

    def __init__(self, symbol):
        # identifier of the player (the board uses 1 and 2)
        self.symbol = symbol

        # number of turns this player may still take
        self.moves = 10

        # one entry per token; every token starts at 0
        self.pos_token_array = np.zeros(4,)

    def get_score(self):
        """Return the player's score.

        A token sitting on square 100 contributes a flat 50 points; any other
        token contributes its current square number.
        """
        return sum(
            50 if square == 100 else square
            for square in self.pos_token_array
        )


In [180]:
class SnlBoard:
    """Two-player Snakes-and-Ladders board with four tokens per player.

    Player 1 is driven by the caller through ``perform_step``; player 2 is a
    random bot that moves a randomly chosen token. ``reset`` must be called
    before the first ``perform_step`` because it creates both players.
    """

    def __init__(self,printing=False):
        """Create a board.

        Args:
            printing: when true, every step prints a trace of the game.
        """
        self.print_info = printing

        # 100 positions available
        # player 2 is a random bot

        # placeholder; replaced by the (2, 4) starting state in ``reset``
        self.board = np.zeros(shape=(8,100))

        # die is stored zero-based [0-5]; -1 means "not rolled yet"
        self.die_val = -1
        self.total_positions = 100
        self.ties = 0

        self.token_home_reward = 20
        self.invalid_move_reward = -50
        self.game_won_reward = 100
        # Losing must be penalised: it used to be +100, i.e. identical to
        # winning and better than a tie.
        self.game_lost_reward = -100
        self.game_tie_reward = 50

        self.p1_wins = 0
        self.p2_wins = 0

        # symbol of the opposing player, keyed by player symbol
        self.opp = {1:2,2:1}

        self.info = dict()

    def reset(self,state):
        """Reset the board to its initial state.

        Args:
            state: numpy array of length 9, a sample of the environment's
                observation space: ``state[0]`` is the zero-based die value
                and ``state[1:]`` holds the eight token positions.
        """
        # get initial die value
        self.die_val = state[0]  # values [0 - 5]

        # starting positions, one row per player
        self.board = state[1:].reshape(2,4)

        # Start from a fresh info dict and only then record the starting
        # state; previously the dict was cleared after the entry was written,
        # so the entry was always lost.
        self.info = {'starting_state': self.board}

        self.p1 = Player(1)
        self.p2 = Player(2)

        if self.print_info:
            print('################################')
            print('environment state:')
            print('die value :{}\nboard state :\n{}'.format(self.die_val + 1, self.board))
            print('player 1 init: ', self.p1)
            print('player 2 init: ', self.p2)
            print('################################')

    # step for gym environment

    def perform_step(self, action):
        """Play one full round: player 1 moves, then player 2 moves.

        Args:
            action: index [0-3] of the player-1 token to move.

        Returns:
            Tuple ``(observation, reward, is_game_end, info)`` where
            ``observation`` is the die value rolled for the next round
            followed by the eight token positions, and ``info`` is an empty
            dict.
        """
        # roll only if no die value is available yet; normally the value
        # rolled at the end of the previous round (or given to reset) is used
        if self.die_val == -1:
            self.die_val = np.random.randint(0, 6)

        ######## player 1 plays
        reward = 0

        # action type [VALID | INVALID]
        action_type = self.player_plays(self.p1, action)
        self.p1.moves += -1

        # an invalid move still consumes the turn and is penalised
        if action_type == 'INVALID':
            reward = self.invalid_move_reward

        if self.print_info:
            print('######P1')
            print('action :',action)
            print('die: ', self.die_val + 1)
            print('board state: ', self.get_board_state())
            print('p1 score: ', self.p1.get_score())
            print('p1 token positions: ', self.p1.pos_token_array)
            print('action type: ', action_type)
            print('\n')

        ######## player 2 plays

        # roll die
        self.die_val = np.random.randint(0,6) # [ 0-5 ]

        # action type does not matter for p2
        self.player_plays(self.p2, np.random.randint(0,4))
        self.p2.moves += -1

        if self.print_info:
            print('######P2')
            print('die: ', self.die_val + 1)
            print('board state: ', self.get_board_state())
            print('p2 score: ', self.p2.get_score())
            print('p2 token positions: ', self.p2.pos_token_array)
            print('\n')

        ####### setup for gym

        # 1. get final state
        self.die_val = np.random.randint(0,6) # [ 0-5 ] die roll for next state
        # NOTE(review): token positions are floats, so this observation is a
        # float array — confirm that is acceptable for the consumer.
        observation = np.concatenate((np.array([self.die_val]), self.get_gym_state())) # observation for next state

        # 2. get final reward
        is_game_end = self.game_finished()

        if is_game_end: # rewards given at end of game
            reward += self.game_end_rewards()

        # shaping term: a quarter of the current score difference
        reward += (self.p1.get_score() - self.p2.get_score())/4

        # return step output
        return (observation,reward,is_game_end,{})

    def get_gym_state(self):
        """Return the eight token positions, player 1 first."""
        return np.concatenate((self.p1.pos_token_array,self.p2.pos_token_array))

    def game_end_info(self):
        """Return the info dict (starting state and, once over, the result)."""
        return self.info

    def is_invalid_move(self, current_position, new_position, active_player:"Player"):
        """Return True when ``new_position`` lies beyond square 100.

        ``current_position`` and ``active_player`` are accepted but not used.
        """
        return bool(new_position > 100)

    def game_end_rewards(self):
        """Record the result in ``self.info`` and return the end-of-game reward.

        The result is decided by comparing both players' scores; the reward is
        from player 1's point of view.
        """
        p1_score = self.p1.get_score()
        p2_score = self.p2.get_score()

        p1_won = bool(p1_score > p2_score)
        is_tie = bool(p1_score == p2_score)

        self.info['p1_won'] = p1_won
        self.info['p2_won'] = not (p1_won or is_tie)
        self.info['tie'] = is_tie

        if p1_won:
            return self.game_won_reward
        if is_tie:
            return self.game_tie_reward
        return self.game_lost_reward

    def get_board_state(self):
        """Return the eight token positions, player 1 first.

        The die value is not part of the returned array.
        """
        return np.concatenate((self.p1.pos_token_array.flatten(),self.p2.pos_token_array.flatten()))

    def player_plays(self, active_player:"Player", action):
        """Move token ``action`` of ``active_player``; return 'VALID' or 'INVALID'."""
        return self.board_update_after_turn(active_player, action)

    def board_update_after_turn(self, active_player:"Player", token_to_move):
        """Advance one token by the current die value and resolve the landing square.

        1. compute the new position of the token
        2. reject the move if it overshoots square 100
        3. apply a snake or ladder if the token lands on one
        4. if exactly one enemy token occupies the final square (and the
           square is not 100), send that enemy token back to 0

        Args:
            active_player: the player whose token moves.
            token_to_move: index [0-3] of the token to move.

        Returns:
            'INVALID' if the move overshoots the board (nothing is changed),
            otherwise 'VALID'.
        """
        current_token_position = active_player.pos_token_array[token_to_move]

        # die value is stored zero-based [0,5], so the roll is die_val + 1
        new_token_position = current_token_position + (self.die_val + 1)

        if self.is_invalid_move(current_token_position, new_token_position, active_player):
            return 'INVALID'

        # follow a snake or ladder, if any
        new_token_position, _ = self.snake_and_ladder(new_token_position)

        # count enemy tokens on the final square
        enemy_state, enemy_count = self.enemy_check(new_token_position,active_player)

        # update to new position
        active_player.pos_token_array[token_to_move] = new_token_position

        # a single enemy token is captured; two or more are left untouched
        if enemy_state and enemy_count == 1 and new_token_position != 100:
            opponent = self.p2 if self.opp[active_player.symbol] == 2 else self.p1
            captured = np.flatnonzero(opponent.pos_token_array == new_token_position)[0]
            opponent.pos_token_array[captured] = 0

        return 'VALID'

    def enemy_check(self, position, active_player:"Player"):
        """Count the opponent's tokens on ``position``.

        Args:
            position: board square to inspect.
            active_player: the player who is moving; the other player's
                tokens are counted.

        Returns:
            Tuple ``(found, count)``: whether any enemy token is on the
            square, and how many.
        """
        if active_player.symbol == 1:
            enemy_positions = self.p2.pos_token_array
        else:
            enemy_positions = self.p1.pos_token_array

        enemies = int(np.count_nonzero(enemy_positions == position))
        return (enemies > 0, enemies)

    def game_finished(self):
        """Return True once both players have used up all their moves.

        Only the move budget is checked; reaching square 100 does not end the
        game. ``<=`` is used so that a step taken after the budget ran out
        still reports the game as finished.
        """
        return bool(self.p1.moves <= 0 and self.p2.moves <= 0)

    def snake_and_ladder(self,position:int):
        """Resolve a landing square.

        Returns:
            Tuple ``(final_position, kind)`` where ``kind`` is 'snake',
            'ladder' or 'None'.
        """
        snakes = self.get_snakes()
        if position in snakes:
            return (snakes[position],'snake')

        ladders = self.get_ladders()
        if position in ladders:
            return (ladders[position],'ladder')

        return (position,'None')

    def get_snakes(self):
        """Return the snakes as ``{head_square: tail_square}``."""
        return {
            99:4,
            30:11,
            52:29,
            70:51,
            94:75,
        }

    def get_ladders(self):
        """Return the ladders as ``{bottom_square: top_square}``."""
        return {
            3:84,
            7:53,
            15:96,
            21:98,
            54:93,
        }

#### board test

1. Board Initialization

In [155]:
# Board initialisation test: build a tracing board and reset it to the
# all-zero starting position with a random die value in [0, 5].
board = SnlBoard(printing=True)

start_die = np.array([np.random.randint(0,6)])
start_tokens = np.zeros((2,4)).flatten()
initital_state = np.concatenate((start_die, start_tokens))

# show the state that is handed to reset, then reset the board
print('\n','**inital state** :',initital_state,'\n')
board.reset(initital_state)


 **inital state** : [2. 0. 0. 0. 0. 0. 0. 0. 0.] 

################################
environment state:
die value :3.0
board state :
[[0. 0. 0. 0.]
 [0. 0. 0. 0.]]
player 1 init:  <__main__.Player object at 0x12bc8bf10>
player 2 init:  <__main__.Player object at 0x16e5fac70>
################################


2. Perform a step action

In [125]:
# Play one round with a randomly chosen player-1 token (index 0-3).
random_action = np.random.randint(low=0, high=4)
print('\nrandom_action : ',random_action)

step_result = board.perform_step(random_action)
print(step_result)

# perform_step returns (observation, reward, done, info); keep the done flag
_, _, game_fin, _ = step_result



random_action :  2
######P1
die:  6.0
board state:  [0. 0. 6. 0. 0. 0. 0. 0.]
p1 score:  6.0
p1 token positions:  [0. 0. 6. 0.]
action type:  VALID


######P2
die:  5
board state:  [0. 0. 6. 0. 0. 5. 0. 0.]
p2 score:  5.0
p2 token positions:  [0. 5. 0. 0.]


(array([2., 0., 0., 6., 0., 0., 5., 0., 0.]), 0.25, False, {})


3. Run a full Game

In [126]:
# Keep playing rounds with random player-1 actions until the board reports
# that the game has finished.
while not game_fin:
    random_action = np.random.randint(low=0, high=4)
    print('\nrandom_action : ',random_action)

    step_result = board.perform_step(random_action)
    print(step_result)
    print('################################################################')

    # third element of the step tuple is the done flag
    _, _, game_fin, _ = step_result


random_action :  1
######P1
die:  3
board state:  [ 0. 84.  6.  0.  0.  5.  0.  0.]
p1 score:  90.0
p1 token positions:  [ 0. 84.  6.  0.]
action type:  VALID


######P2
die:  4
board state:  [ 0. 84.  6.  0.  0.  5.  0.  4.]
p2 score:  9.0
p2 token positions:  [0. 5. 0. 4.]


(array([ 5.,  0., 84.,  6.,  0.,  0.,  5.,  0.,  4.]), 20.25, False, {})
################################################################

random_action :  1
######P1
die:  6
board state:  [ 0. 90.  6.  0.  0.  5.  0.  4.]
p1 score:  96.0
p1 token positions:  [ 0. 90.  6.  0.]
action type:  VALID


######P2
die:  4
board state:  [ 0. 90.  6.  0.  0.  9.  0.  4.]
p2 score:  13.0
p2 token positions:  [0. 9. 0. 4.]


(array([ 5.,  0., 90.,  6.,  0.,  0.,  9.,  0.,  4.]), 20.75, False, {})
################################################################

random_action :  2
######P1
die:  6
board state:  [ 0. 90. 12.  0.  0.  9.  0.  4.]
p1 score:  102.0
p1 token positions:  [ 0. 90. 12.  0.]
action type:  VALID


##

#### Setup GYM environment

In [181]:
import gym
import numpy as np
from gym import spaces


class SNL_env(gym.Env):
    """Gym environment wrapping ``SnlBoard``.

    The action is the index of the player-1 token to move. The observation is
    the zero-based die value followed by the eight token positions.
    """

    # render() only implements the 'console' mode, so that is the mode that
    # is advertised (it previously listed 'human', which render() rejects).
    metadata = {"render.modes": ["console"]}

    def __init__(self, printing=False):
        """Create the environment.

        Args:
            printing: forwarded to ``SnlBoard``; enables its step trace.
        """
        super().__init__()

        # one action per movable token
        self.action_space = spaces.Discrete(4)

        # die value [0-5] followed by 8 token positions [0-100]
        self.observation_space = spaces.MultiDiscrete([6] + [101] * 8)
        self.SNLBoard = SnlBoard(printing)

    def step(self, action):
        """Play one round (player 1, then player 2).

        Returns:
            Tuple ``(observation, reward, done, info)`` as produced by
            ``SnlBoard.perform_step``, with the observation cast to the dtype
            of ``observation_space``.
        """
        observation, reward, done, info = self.SNLBoard.perform_step(action)

        # the board works with float positions; the declared space is integer
        observation = np.asarray(observation).astype(self.observation_space.dtype)

        return observation, reward, done, info

    def reset(self):
        """Start a new game and return the initial observation.

        All tokens start at 0 and the die value is drawn uniformly from [0, 5].
        """
        # build the state directly in the dtype of the observation space
        state = np.zeros(9, dtype=self.observation_space.dtype)
        state[0] = np.random.randint(0,6)

        # reset board | set initial state
        self.SNLBoard.reset(state)

        return state

    def render(self, mode='console'):
        """Return the board's game-end info dict; only 'console' is supported.

        Raises:
            NotImplementedError: for any mode other than 'console'.
        """
        if mode != 'console':
            raise NotImplementedError()

        return self.SNLBoard.game_end_info()
        

#### Modeling

In [165]:
# A trailing comma in an unparenthesised ``from ... import`` list is a
# SyntaxError, so the names are listed without it.
from stable_baselines3 import A2C, PPO
from stable_baselines3.common.env_checker import check_env

# Instantiate the env
env = SNL_env()
# Check that the environment follows the interface expected by
# stable-baselines3; problems are reported as warnings or assertion errors.
check_env(env, warn=True)

In [166]:
# Train an A2C agent with an MLP policy on a fresh environment for one
# million timesteps, then store it under the name "A2CMlP".
env = SNL_env()
model = A2C(policy='MlpPolicy', env=env)
model = model.learn(total_timesteps=1_000_000)
model.save("A2CMlP")

In [185]:
# Train a PPO agent with an MLP policy on a fresh environment for one
# million timesteps, then store it under the name "PPO".
env = SNL_env()
model = PPO(policy='MlpPolicy', env=env)
model = model.learn(total_timesteps=1_000_000)
model.save("PPO")

#### SnL

**MultiDiscrete** _observation space_ for SnL 

**Discrete** _action space_ for SnL [only 4 actions are allowed]

- The state is represented as follows (one 8-entry vector, one entry per token, for each of the 100 board positions):

    [
        [0,0,0,0,0,0,0,0],[0,0,0,0,0,0,0,0] ..... ..... x 100
    ]

- reward should be very bad for an illegal action
- reward should be given proportionally to the difference of score between the players [handles snake and ladder case as well]

In [66]:
# Alternative observation layout for SnL: one die slot (6 values) followed by
# 800 three-valued slots.
cell_slots = [3] * 800
observation_space = spaces.MultiDiscrete([6] + cell_slots)
observation_space.sample()

# action space for SnL
# action_space = spaces.Discrete(4)

array([4, 0, 2, 1, 2, 2, 2, 0, 1, 1, 1, 2, 1, 1, 2, 2, 2, 2, 2, 0, 2, 1,
       1, 0, 0, 0, 2, 0, 2, 2, 2, 0, 2, 0, 2, 1, 2, 1, 0, 0, 0, 1, 1, 2,
       0, 2, 1, 2, 1, 1, 0, 1, 0, 0, 0, 2, 1, 0, 2, 0, 2, 1, 0, 0, 1, 0,
       2, 0, 1, 2, 0, 1, 2, 0, 1, 0, 2, 1, 0, 2, 1, 2, 0, 0, 1, 0, 0, 1,
       2, 1, 2, 2, 2, 1, 1, 1, 1, 0, 2, 2, 1, 1, 0, 2, 0, 1, 2, 2, 1, 2,
       2, 1, 1, 1, 2, 2, 2, 0, 0, 0, 0, 1, 2, 2, 0, 0, 2, 2, 2, 0, 0, 1,
       0, 1, 0, 1, 1, 2, 1, 1, 1, 0, 1, 2, 0, 0, 2, 0, 1, 1, 0, 1, 1, 0,
       0, 0, 1, 2, 2, 1, 2, 0, 1, 2, 2, 0, 2, 0, 1, 2, 2, 1, 1, 1, 2, 0,
       0, 2, 1, 0, 0, 2, 0, 0, 1, 2, 1, 2, 1, 2, 0, 2, 0, 1, 2, 0, 0, 1,
       1, 2, 1, 0, 1, 0, 2, 2, 2, 0, 0, 1, 2, 1, 0, 2, 0, 1, 0, 1, 2, 1,
       0, 1, 2, 0, 0, 0, 0, 1, 0, 2, 2, 1, 1, 0, 0, 1, 0, 1, 0, 0, 2, 2,
       0, 2, 1, 2, 2, 2, 2, 0, 1, 0, 2, 0, 0, 0, 0, 2, 1, 1, 2, 2, 1, 1,
       1, 2, 2, 1, 0, 1, 1, 0, 0, 1, 1, 2, 2, 0, 2, 1, 1, 2, 2, 2, 0, 0,
       0, 2, 0, 1, 1, 0, 2, 1, 1, 0, 1, 2, 1, 0, 1,