# 0. Install Dependencies

# 1. Test Random Environment with OpenAI Gym

In [1]:
from gym import Env
from gym.spaces import Discrete, Box
import numpy as np
import random
import gym
from gym import spaces

In [20]:
class BaghchalEnv(gym.Env):
    metadata = {'render.modes': ['human', 'ascii','rgb_array']}
    
    def __init__(self):
        self.board_size = 5
        self.num_tigers = 4
        self.num_goats = 20
        self.goats_captured = 0
        self.winner = ''
        
        self.board = np.zeros((self.board_size, self.board_size), dtype=int)
        self.tigers_positions = []
        self.goats_positions = []
        self.turn = -1  # 1: tigers, -1: goats
        self.done = False
        self.moves_since_last_capture = 0
        self.max_moves_without_capture = 50

        self.action_space = spaces.Tuple((spaces.Discrete(self.board_size), spaces.Discrete(self.board_size)))
        self.observation_space = spaces.Box(low=-1, high=1, shape=(self.board_size, self.board_size), dtype=int)
    
    def reset(self):
        #super().reset()
        self.board = np.zeros((self.board_size, self.board_size), dtype=int)
        self.tigers_positions = [(0, 0), (0, 4), (4, 0), (4, 4)]
        self.goats_positions = []
        self.turn = -1 #goat move
        self.done = False
        self.goats_captured = 0
        self.winner = ''

        
        for i, j in self.tigers_positions:
            self.board[i, j] = 1
        
        self.previous_states = [self.board.copy()]
        
        return self.board.copy()

    def baghchal_reward(self, state):
        """
        Calculates the reward for a given state of the Baghchal game.
        
        Parameters:
        state (numpy.ndarray): The current state of the game.
        
        Returns:
        tuple: A tuple (reward_tigers, reward_goats) representing the rewards for the tigers and goats players, respectively.
        """
        if self.done:
            if self.winner == 'T':
                return(1,-1)
            elif self.winner=='G':
                return(-1,1)
            elif self.winner == 'D':
                return(0,0)
        return(0,0)
        # num_tigers = np.sum(state == 1)
        # num_goats = np.sum(state == -1)
        
        # if num_tigers < 4:  # if tigers have been captured
        #     return (-1, 1)
        
        # if num_goats < 5:  # if goats have captured more than half of the tigers
        #     return (1, -1)
        
        # return (0, 0)


    
    def step(self, action):
        # if self.done:
        #         if self.turn == 1:  # Tiger wins
        #             return self.board.copy(), 10, True, {}
        #         else:  # Goat wins
        #             return self.board.copy(), 1, True, {}
                
        #self.is_game_over()

        i, j = action
        
        if self.turn == 1:  # tigers turn
            if (i, j) in self.tigers_positions:
                return self.board.copy(), 0, False, {}
            if self.board[i, j] != 0:
                return self.board.copy(), 0, False, {}
            
            valid_moves = [(i-1, j), (i+1, j), (i, j-1), (i, j+1)]
            valid_moves = [(p, q) for p, q in valid_moves if 0 <= p < self.board_size and 0 <= q < self.board_size]
            
            for p, q in valid_moves:
                if self.board[p, q] == -1:
                    middle_i, middle_j = (i+p)//2, (j+q)//2
                    if (middle_i, middle_j) in self.tigers_positions:
                        self.board[i, j] = 1
                        self.board[middle_i, middle_j] = 0
                        self.tigers_positions.remove((middle_i, middle_j))
                        self.tigers_positions.append((i, j))
                        self.moves_since_last_capture = 0
                        self.goats_captured += 1
                        self.turn = -1
                        break
                        
            else:  # no goat was eaten
                self.board[i, j] = 1
                
                for p, q in self.tigers_positions:
                    if (p, q) != (i, j):
                        self.board[p, q] = 0
                        
                self.tigers_positions = [(i, j)]
                self.moves_since_last_capture += 1
                self.turn = -1
                
        else:  # goats turn
            if len(self.goats_positions) < 20:
                if self.board[i, j] != 0:
                    return self.board.copy(), 0, False, {}
                
                self.board[i, j] = -1
                self.goats_positions.append((i, j))
                self.turn = 1

            else:
                goat_found = False
                
                for p, q in [(i-1, j), (i+1, j), (i, j-1), (i, j+1)]:
                    if 0 <= p < self.board_size and 0 <= q < self.board_size and self.board[p, q] == -1:
                        if (p, q) not in self.goats_positions:
                            self.board[p, q] = -1
                            self.board[i, j] = 0
                            self.goats_positions.remove((i, j))
                            self.goats_positions.append((p, q))
                            goat_found = True
                            break
                            
                if not goat_found:
                    return self.board.copy(), (0,0), False, {}
                
                self.turn = 1
                
        # Check if the game is over after the move
        self.is_game_over()
        
        # Update the previous states list
        self.previous_states.append(self.board.copy())
        
        # Check if the game is a draw due to stalemate
        if self.moves_since_last_capture >= self.max_moves_without_capture:
            self.done = True
            self.winner = 'D'
            #return self.board.copy(),(0,0),self.done,{}
        
        # Return the board, reward, done flag and an empty dictionary
        reward_tigers, reward_goats = self.baghchal_reward(self.board)
        return self.board.copy(), (reward_tigers, reward_goats), self.done, {}
        



    def _rgb_array(self):
        """
        Return a numpy array representing the RGB image of the current state of the game.
        """
        board_rgb = np.zeros((self.board_size, self.board_size, 3), dtype=np.uint8)
        board_rgb[self.board == 1] = np.array([255, 0, 0])  # tigers positions marked with red color
        board_rgb[self.board == -1] = np.array([255, 255, 255])  # goats positions marked with white color
        board_rgb = np.rot90(board_rgb)
        return board_rgb

    def render(self, mode='human'):
        if mode == 'human':
            print(self.board)
        elif mode == 'ascii':
            print('  ' + ' '.join(str(i) for i in range(self.board_size)))
            for i in range(self.board_size):
                row = ''.join('.' if self.board[i, j] == 0 else 'T' if (i, j) in self.tigers_positions else 'G' for j in range(self.board_size))
                print(i, row)
        elif mode == 'rgb_array':
            return self._rgb_array()
        else:
            super(BaghchalEnv, self).render(mode=mode)


    def close(self):
        pass


    def is_game_over(self):
        # if len(self.goats_positions) == 0:
        #     self.done = True
        #     return True

        # Check if tigers have captured 5 goats
       # if len(self.tigers_positions) == 0 or (self.num_goats - len(self.goats_positions)) >= 5:
        if (self.goats_captured >= 5) and (self.turn==1):
            self.done = True
            self.winner = 'T'
            return True

        # Check if goats have blocked tigers from being able to move
        for i, j in self.tigers_positions:
            valid_moves = [(i-1, j), (i+1, j), (i, j-1), (i, j+1)]
            valid_moves = [(p, q) for p, q in valid_moves if 0 <= p < self.board_size and 0 <= q < self.board_size]
            for p, q in valid_moves:
                if self.board[p, q] == 0:
                    return False
            self.done = True
            self.winner = 'G'
            return True



#The `__init__` method initializes the environment, with the board size, number of tigers, and number of goats. The board is represented as a numpy array of integers, where 0 means empty, 1 means tiger, and -1 means goat. The `turn` variable keeps track of whose turn it is (tigers or goats), and the `done` variable is set to `True` when the game is over.

#The `reset` method resets the environment to its initial state, with the tigers and goats positioned in their starting positions.

#The `step` method receives an action (a tuple of integers representing a row and a column), and updates the board state accordingly. If the action is invalid (e.g. trying to move to an occupied cell), no change is made to the board. If it is a valid move, and the tigers can eat a goat, they do so and receive a reward of 1. If a goat reaches the other side of the board, the game is over and the goats win with a reward of 10.

#The `render` method can display the board in either "human" or "ascii" mode. In "human" mode, it prints the numpy array to the console. In "ascii" mode, it prints a more visually appealing representation of the board.

#The `close` method is empty, as it is not needed for this environment.

In [21]:
env = BaghchalEnv()

In [22]:
env.observation_space.sample()

array([[-1,  1,  1,  0,  1],
       [ 1,  1, -1,  1, -1],
       [ 0,  1,  1,  1, -1],
       [-1, -1,  1,  1, -1],
       [-1,  0,  1,  1,  1]])

In [23]:
episodes = 10
for episode in range(1, episodes+1):
    state = env.reset()
    done = False
    score = 0 
    
    while not done:
        env.render()
        action = env.action_space.sample()
        n_state, reward, done, info = env.step(action)

    print('Episode:{} Tiger_Score:{} Goat_Score{}'.format(episode, reward[0], reward[1]))

[[1 0 0 0 1]
 [0 0 0 0 0]
 [0 0 0 0 0]
 [0 0 0 0 0]
 [1 0 0 0 1]]
[[ 1  0  0  0  1]
 [ 0  0  0  0  0]
 [ 0 -1  0  0  0]
 [ 0  0  0  0  0]
 [ 1  0  0  0  1]]
[[ 0  0  0  0  0]
 [ 0  0  0  0  0]
 [ 0 -1  0  1  0]
 [ 0  0  0  0  0]
 [ 0  0  0  0  0]]
[[ 0  0  0  0  0]
 [ 0  0  0  0  0]
 [ 0 -1  0  1  0]
 [ 0  0  0 -1  0]
 [ 0  0  0  0  0]]
[[ 1  0  0  0  0]
 [ 0  0  0  0  0]
 [ 0 -1  0  0  0]
 [ 0  0  0 -1  0]
 [ 0  0  0  0  0]]
[[ 1  0  0  0  0]
 [ 0  0  0  0  0]
 [ 0 -1  0  0  0]
 [ 0  0  0 -1 -1]
 [ 0  0  0  0  0]]
[[ 1  0  0  0  0]
 [ 0  0  0  0  0]
 [ 0 -1  0  0  0]
 [ 0  0  0 -1 -1]
 [ 0  0  0  0  0]]
[[ 0  0  0  0  0]
 [ 0  0  0  0  0]
 [ 0 -1  0  1  0]
 [ 0  0  0 -1 -1]
 [ 0  0  0  0  0]]
[[ 0  0  0  0  0]
 [ 0  0  0  0  0]
 [ 0 -1 -1  1  0]
 [ 0  0  0 -1 -1]
 [ 0  0  0  0  0]]
[[ 0  0  0  1  0]
 [ 0  0  0  0  0]
 [ 0 -1 -1  0  0]
 [ 0  0  0 -1 -1]
 [ 0  0  0  0  0]]
[[ 0  0  0  1  0]
 [ 0  0  0  0  0]
 [ 0 -1 -1  0  0]
 [ 0  0 -1 -1 -1]
 [ 0  0  0  0  0]]
[[ 0  0  0  0  0]
 [ 0  

# 2. Create a Deep Learning Model with Keras

In [6]:
import numpy as np
import tensorflow
import keras
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Flatten
from tensorflow.keras.optimizers import Adam

In [7]:
# states = env.observation_space.shape
# actions = env.action_space.n

states = env.observation_space.shape
actions = env.action_space[0].n * env.action_space[1].n  # or actions = env.action_space[0].n ** 2



In [8]:
actions

25

In [9]:
def build_model(states, actions):
    model = Sequential()    
    model.add(Dense(24, activation='relu', input_shape=states))
    model.add(Dense(24, activation='relu'))
    model.add(Dense(actions, activation='linear'))
    return model

In [64]:
del model 

In [10]:
model = build_model(states, actions)

Metal device set to: Apple M1

systemMemory: 16.00 GB
maxCacheSize: 5.33 GB



2023-02-18 21:19:47.554874: I tensorflow/core/common_runtime/pluggable_device/pluggable_device_factory.cc:305] Could not identify NUMA node of platform GPU ID 0, defaulting to 0. Your kernel may not have been built with NUMA support.
2023-02-18 21:19:47.555024: I tensorflow/core/common_runtime/pluggable_device/pluggable_device_factory.cc:271] Created TensorFlow device (/job:localhost/replica:0/task:0/device:GPU:0 with 0 MB memory) -> physical PluggableDevice (device: 0, name: METAL, pci bus id: <undefined>)


In [11]:
model.summary()

Model: "sequential"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 dense (Dense)               (None, 5, 24)             144       
                                                                 
 dense_1 (Dense)             (None, 5, 24)             600       
                                                                 
 dense_2 (Dense)             (None, 5, 25)             625       
                                                                 
Total params: 1,369
Trainable params: 1,369
Non-trainable params: 0
_________________________________________________________________


# 3. Build Agent with Keras-RL

In [18]:
pip install keras-rl


Collecting keras-rl
  Using cached keras-rl-0.4.2.tar.gz (40 kB)
  Preparing metadata (setup.py) ... [?25ldone
Building wheels for collected packages: keras-rl
  Building wheel for keras-rl (setup.py) ... [?25ldone
[?25h  Created wheel for keras-rl: filename=keras_rl-0.4.2-py3-none-any.whl size=48362 sha256=30b39882ab9e5a2156e25732139c2945008a04dfdde7be168eb9a15a459c718b
  Stored in directory: /Users/sachingiri/Library/Caches/pip/wheels/12/c5/34/a89a10839a1fe8f3e38fe7c9f81faa3f244b55c3b9a04a5b34
Successfully built keras-rl
Installing collected packages: keras-rl
Successfully installed keras-rl-0.4.2
Note: you may need to restart the kernel to use updated packages.


In [19]:
from rl.agents.dqn import DQNAgent
from rl.policy import BoltzmannQPolicy
from rl.memory import SequentialMemory


In [20]:
def build_agent(model, actions):
    policy = BoltzmannQPolicy()
    memory = SequentialMemory(limit=50000, window_length=1)
    dqn = DQNAgent(model=model, memory=memory, policy=policy, 
                  nb_actions=actions, nb_steps_warmup=10, target_model_update=1e-2)
    return dqn

In [23]:
dqn = build_agent(model, actions)
dqn.compile(Adam(lr=1e-3), metrics=['mae'])
dqn.fit(env, nb_steps=50000, visualize=False, verbose=1)

TypeError: Keras symbolic inputs/outputs do not implement `__len__`. You may be trying to pass Keras symbolic inputs/outputs to a TF API that does not register dispatching, preventing Keras from automatically converting the API call to a lambda layer in the Functional Model. This error will also get raised if you try asserting a symbolic input/output directly.

In [54]:
scores = dqn.test(env, nb_episodes=100, visualize=False)
print(np.mean(scores.history['episode_reward']))

Testing for 100 episodes ...
Episode 1: reward: -56.000, steps: 60
Episode 2: reward: -60.000, steps: 60
Episode 3: reward: -50.000, steps: 60
Episode 4: reward: -60.000, steps: 60
Episode 5: reward: -56.000, steps: 60
Episode 6: reward: -52.000, steps: 60
Episode 7: reward: -60.000, steps: 60
Episode 8: reward: -50.000, steps: 60
Episode 9: reward: -52.000, steps: 60
Episode 10: reward: -56.000, steps: 60
Episode 11: reward: -60.000, steps: 60
Episode 12: reward: -60.000, steps: 60
Episode 13: reward: -52.000, steps: 60
Episode 14: reward: -52.000, steps: 60
Episode 15: reward: -58.000, steps: 60
Episode 16: reward: -50.000, steps: 60
Episode 17: reward: -54.000, steps: 60
Episode 18: reward: -58.000, steps: 60
Episode 19: reward: -60.000, steps: 60
Episode 20: reward: -56.000, steps: 60
Episode 21: reward: -56.000, steps: 60
Episode 22: reward: -52.000, steps: 60
Episode 23: reward: -60.000, steps: 60
Episode 24: reward: -56.000, steps: 60
Episode 25: reward: -58.000, steps: 60
Episo

In [22]:
_ = dqn.test(env, nb_episodes=15, visualize=True)

NameError: name 'dqn' is not defined

# 4. Reloading Agent from Memory

In [30]:
dqn.save_weights('dqn_weights.h5f', overwrite=True)

In [31]:
del model
del dqn
del env

In [9]:
env = gym.make('CartPole-v0')
actions = env.action_space.n
states = env.observation_space.shape[0]
model = build_model(states, actions)
dqn = build_agent(model, actions)
dqn.compile(Adam(lr=1e-3), metrics=['mae'])

In [10]:
dqn.load_weights('dqn_weights.h5f')

In [11]:
_ = dqn.test(env, nb_episodes=5, visualize=True)

Testing for 5 episodes ...
Instructions for updating:
This property should not be used in TensorFlow 2.0, as updates are applied automatically.
Episode 1: reward: 200.000, steps: 200
Episode 2: reward: 200.000, steps: 200
Episode 3: reward: 200.000, steps: 200
Episode 4: reward: 200.000, steps: 200
Episode 5: reward: 200.000, steps: 200
