In [2]:
import numpy as np
import random

We set a default grid that is the question's grid. different grid pattern can be used in case of need.

In [3]:
defaultGrid = [['A','D','D','D','D','D','D','D','D'],
               ['D','W','W','W','D','W','W','W','D'],
               ['D','W','D','D','D','D','D','W','D'],
               ['D','D','D','W','E','W','D','D','D'],
               ['D','W','D','W','E','W','D','W','D'],
               ['D','W','D','D','W','D','D','W','D'],
               ['D','D','D','D','D','D','D','D','D']]
prev_state = defaultGrid

**The Environment**
---
Pacman env consists of 5 methods:

1. init:
> initialization of class's variables such as agents' configuration and number of total dots in the grid.
2. valid_action_space:
> every 4 possible actions are not always valid. this functions excludes movements that places the agent outside the grid (less that 0 and more that max number of total columns and rows) and those movements that lead to a wall which is indicated as 'W' in the grid.
3. count_dots:
> counts the dots. when equal to 0, the game is over.
4. step:
> checks if the game is over and takes an action (up, down, left, right) as an attribute transforms to the next state:
*   places the agent 'A' based on current (x, y) and after applyting the direction
*   set the prev place as 'E'
*   set the reward: if there are any dots 'D' in the next square agent will recieve a +1 reward and also dots count will increase - if the next square is empty the agents receives no reward (+0), if agent reaches a ghost 'G' it will be given a -1 reward.
5. reset: reset all parameters so that the agent can start training again.

In [4]:
class PacmanEnv():
    def __init__(self, agent_x = 0, agent_y = 0, g = defaultGrid):
        self.action_space = ['U', 'D', 'L', 'R']
        self.n_action_space = len(self.action_space)

        self.state = g
        self.prev_state = g
        self.x = agent_x
        self.y = agent_y
        self.originalGird, self.originalX, self.originalY = g, agent_x, agent_y

        self.state_observation = [self.x, self.y]
        self.max_r = len(self.state) - 1
        self.max_c = len(self.state[0]) - 1

        self.n_dots = self.count_dots()
        self.done = False

    def valid_action_space(self):
        valid_actions = []
        for action in self.action_space:
            if action == 'U' and self.x != 0 and self.state[self.x - 1][self.y] != 'W':
                valid_actions.append(action)
            elif action == 'D' and self.x != self.max_r and self.state[self.x + 1][self.y] != 'W':
                valid_actions.append(action)
            elif action == 'L' and self.y != 0 and self.state[self.x][self.y - 1] != 'W':
                valid_actions.append(action)
            elif action == 'R' and self.y != self.max_c and self.state[self.x][self.y + 1] != 'W':
                valid_actions.append(action)
        return valid_actions

    def count_dots(self):
        n = 0
        for i in range (self.max_r):
            for j in range (self.max_c):
                if self.state[i][j] == 'D':
                    n += 1
        return n

    def step(self, action):
        self.state[self.x][self.y] = 'E'
        reward = 0

        if action == 'U':
            if self.state[self.x - 1][self.y] == 'G':
                reward -= 1
                self.done = True
            elif self.state[self.x - 1][self.y] == 'D':
                reward += 1
                self.n_dots -= 1
            self.x -= 1
            self.state[self.x][self.y] = 'A'

        elif action == 'D':
            if self.state[self.x + 1][self.y] == 'G':
                reward -= 1
            elif self.state[self.x + 1][self.y] == 'D':
                reward += 1
                self.n_dots -= 1
            self.x += 1
            self.state[self.x][self.y] = 'A'

        elif action == 'L':
            if self.state[self.x][self.y - 1] == 'G':
                reward -= 1
            elif self.state[self.x][self.y - 1] == 'D':
                reward += 1
                self.n_dots -= 1
            self.y -= 1
            self.state[self.x][self.y] = 'A'

        elif action == 'R':
            if self.state[self.x][self.y + 1] == 'G':
                reward -= 1
            elif self.state[self.x][self.y + 1] == 'D':
                reward += 1
                self.n_dots -= 1
            self.y += 1
            self.state[self.x][self.y] = 'A'

        done = True if self.n_dots <= 0 else False
        return self.state, reward, done

    def reset(self):
        self.done = False
        self.state = defaultGrid
        self.prev_state = defaultGrid
        self.x = 0
        self.y = 0
        self.n_dots = self.count_dots()
        self.state_observation = [self.x, self.y]
        return self.state

**Q agent**
---
primarily makes Q-table. the Q-table is a 2D dictionary which the first key is a tuple of (state, agent_x, agent_y) and the second key is "action" and the value is the Q-val. this structures ensured the uniqueness of the table but the rows are obtained as the model learns. The Q-table looks something like this for a hypothetical 3x3 grid:

> qtable = {('ADW;DDD;DWE', 0, 0):{'U': 0, 'R': 0, 'L':0.5, 'R', 0.6}


1. init:
> initialization of class's variables.
2. updateTable:
> adds the new tuple along with action and corresponding q-val to the table. if the key does not exist, the state and all 4 actions are initialized and if the key already exists, it will be updated for the specific action key.
3. getKeyTup:
> together with "encode" and "getAgentPos" methods, form the tuple to act as key in qtable dict.
4. encode:
> encode the current grid matrix a.k.a state to a string of form: ow1; row2; row3; ...
5. decode:
> turn the string_state back to the 2D array form.
6. getArgMax:
> for a state, the possible actions are calculated and are stored in "directions". all valid actions which depends on current state are converted to the valid input form for the qtable (also those that the agent has not seen yet are added to the table and the value is initialized as 0). the method returns the max q-value.
7. getAgentPos:
> searches the grid to find the agent's position.
8. getQval:
> given the state and the action, this function returns the Q-value using the dict structure. first the state is converted to the tuple format and then the value is obtained: val = qtable[(state, x, y)][action]
9. printQtable

In [5]:
class AgentQ():
    def __init__(self, grid):
        self.qtable = {}
        self.max_r = len(grid) - 1
        self.max_c = len(grid[0]) - 1

    def updateTable(self, state, action, qval):
        x, y = self.getAgentPos(state)
        stateTup = self.getKeyTup(state, x, y)

        if stateTup not in self.qtable.keys():
            self.qtable[stateTup] = {'U':0, 'D':0, 'L':0, 'R':0}
        self.qtable[stateTup].update({action : qval})

    def getKeyTup(self, state, x, y):
        state_str = self.encode(state)
        return (state_str, x, y)


    def encode(self, state):
        s = ""
        n = len(state)
        m = len(state[0])
        for i in range(0, n):
            for j in range(0, m):
                s += state[i][j]
            s += ';'
        return s

    def decode(self, str_state):
        row = []
        s = []
        for i in str_state:
            if i == ';':
                s.append(row)
                row = []
            else:
                row.append(i)
        return s

    def getArgMax(self, state, directions):
        mx = ""
        mx_val = -1000000
        n = state
        x, y = self.getAgentPos(state)
        for dir in directions:
            tup = ()
            tmp = 0
            if dir == 'U':
                tup = self.getKeyTup(state, x - 1, y)
            elif dir == 'D':
                tup = self.getKeyTup(state, x + 1, y)
            elif dir == 'R':
                tup = self.getKeyTup(state, x, y + 1)
            elif dir == 'L':
                tup = self.getKeyTup(state, x, y - 1)

            if tup not in self.qtable.keys():
                self.qtable[tup] = {'U':0, 'D':0, 'L':0, 'R':0}
                tmp = 0
            else:
                tmp = self.qtable[tup][dir]

            if tmp >= mx_val:
                mx_val = tmp
                mx = dir
        return [mx, mx_val]


    def getAgentPos(self, state):
        n = len(state)
        m = len(state[0])
        for i in range(n):
            for j in range(m):
                if state[i][j] == 'A':
                    return i, j;

    def getQval(self, state, action):
        x, y = self.getAgentPos(state)
        tup = self.getKeyTup(state, x, y)
        if tup not in self.qtable.keys() and x >= 0 and y >= 0:
            self.qtable[tup] = {'U':0, 'D':0, 'L':0, 'R':0}
            return 0
        else:
            return self.qtable[tup][action]

    def printQtable(self):
        for state in self.qtable.keys():
            print(state)
            print(self.qtable[state])

**Training**
--
**gamma**: discounting rate, it make the future values less important than current state

**alpha**: learning rate

**epsilon**: treshold to exploration rate

*for* **loop**

"exp_exp_tradeoff" desides if the agent should choose the next action randomly or make the best decision based on q-values. the action is chosen and env.step executeed the action. now we are in the new state and qval is calculated using bellman equation. the value in the table is updated andwe proceed to the next state till we collect all dots.

In [6]:
env = PacmanEnv()
action_space_size = env.n_action_space
qtable = AgentQ(env.state)

total_episodes = 1
alpha = 0.8
max_steps = 2                # Max steps per episode
gamma = 0.95                  # Discounting rate

# Exploration parameters
epsilon = 1.0                 # Exploration rate
max_epsilon = 1.0             # Exploration probability at start
min_epsilon = 0.01            # Minimum exploration probability
decay_rate = 0.005

rewards = []

# 2 For life or until learning is stopped
for episode in range(total_episodes):
    # Reset the environment
    old_state = env.reset()
    done = False
    total_rewards = 0
    step = 0

    while not done:

        step += 1
        valid_actions = env.valid_action_space()

        exp_exp_tradeoff = random.uniform(0, 1)

        if exp_exp_tradeoff > epsilon:
            action = qtable.getArgMax(old_state, valid_actions)[0]
        else:
            action = valid_actions[random.randint(0, len(valid_actions) - 1)]


        new_state, reward, done = env.step(action)

        print("state:")
        print(prev_state)

        print("valid actions")
        print(valid_actions)

        print("action")
        print(action)

        print("reward")
        print(reward)

        print("new state:")
        print(new_state)

        valid_actions_new = env.valid_action_space()

        print("valid  newactions")
        print(valid_actions_new)


        qval = (1 - alpha) * qtable.getQval(old_state, action) + alpha * (reward + gamma * qtable.getArgMax(new_state, valid_actions_new)[1])

        print("qval")
        print(qval)
        qtable.updateTable(old_state, action, qval)
        print("qtable")
        qtable.printQtable()

        total_rewards += reward

        # Our new state is state
        old_state = new_state

    # Reduce epsilon (because we need less and less exploration)
    epsilon = min_epsilon + (max_epsilon - min_epsilon) * np.exp(-decay_rate * episode)
    rewards.append(total_rewards)


state:
[['E', 'A', 'D', 'D', 'D', 'D', 'D', 'D', 'D'], ['D', 'W', 'W', 'W', 'D', 'W', 'W', 'W', 'D'], ['D', 'W', 'D', 'D', 'D', 'D', 'D', 'W', 'D'], ['D', 'D', 'D', 'W', 'E', 'W', 'D', 'D', 'D'], ['D', 'W', 'D', 'W', 'E', 'W', 'D', 'W', 'D'], ['D', 'W', 'D', 'D', 'W', 'D', 'D', 'W', 'D'], ['D', 'D', 'D', 'D', 'D', 'D', 'D', 'D', 'D']]
valid actions
['D', 'R']
action
R
reward
1
new state:
[['E', 'A', 'D', 'D', 'D', 'D', 'D', 'D', 'D'], ['D', 'W', 'W', 'W', 'D', 'W', 'W', 'W', 'D'], ['D', 'W', 'D', 'D', 'D', 'D', 'D', 'W', 'D'], ['D', 'D', 'D', 'W', 'E', 'W', 'D', 'D', 'D'], ['D', 'W', 'D', 'W', 'E', 'W', 'D', 'W', 'D'], ['D', 'W', 'D', 'D', 'W', 'D', 'D', 'W', 'D'], ['D', 'D', 'D', 'D', 'D', 'D', 'D', 'D', 'D']]
valid  newactions
['L', 'R']
qval
0.8
qtable
('EADDDDDDD;DWWWDWWWD;DWDDDDDWD;DDDWEWDDD;DWDWEWDWD;DWDDWDDWD;DDDDDDDDD;', 0, 1)
{'U': 0, 'D': 0, 'L': 0, 'R': 0.8}
('EADDDDDDD;DWWWDWWWD;DWDDDDDWD;DDDWEWDDD;DWDWEWDWD;DWDDWDDWD;DDDDDDDDD;', 0, 0)
{'U': 0, 'D': 0, 'L': 0, 'R': 0}
('EA

In [7]:
qtable.printQtable()
print ("Score over time: " +  str(sum(rewards)/total_episodes))
print(rewards)

('EADDDDDDD;DWWWDWWWD;DWDDDDDWD;DDDWEWDDD;DWDWEWDWD;DWDDWDDWD;DDDDDDDDD;', 0, 1)
{'U': 0, 'D': 0, 'L': 0, 'R': 0.8}
('EADDDDDDD;DWWWDWWWD;DWDDDDDWD;DDDWEWDDD;DWDWEWDWD;DWDDWDDWD;DDDDDDDDD;', 0, 0)
{'U': 0, 'D': 0, 'L': 0, 'R': 0}
('EADDDDDDD;DWWWDWWWD;DWDDDDDWD;DDDWEWDDD;DWDWEWDWD;DWDDWDDWD;DDDDDDDDD;', 0, 2)
{'U': 0, 'D': 0, 'L': 0, 'R': 0}
('EEADDDDDD;DWWWDWWWD;DWDDDDDWD;DDDWEWDDD;DWDWEWDWD;DWDDWDDWD;DDDDDDDDD;', 0, 2)
{'U': 0, 'D': 0, 'L': 0, 'R': 0.8}
('EEADDDDDD;DWWWDWWWD;DWDDDDDWD;DDDWEWDDD;DWDWEWDWD;DWDDWDDWD;DDDDDDDDD;', 0, 1)
{'U': 0, 'D': 0, 'L': 0, 'R': 0}
('EEADDDDDD;DWWWDWWWD;DWDDDDDWD;DDDWEWDDD;DWDWEWDWD;DWDDWDDWD;DDDDDDDDD;', 0, 3)
{'U': 0, 'D': 0, 'L': 0, 'R': 0}
('EAEDDDDDD;DWWWDWWWD;DWDDDDDWD;DDDWEWDDD;DWDWEWDWD;DWDDWDDWD;DDDDDDDDD;', 0, 1)
{'U': 0, 'D': 0, 'L': 0.0, 'R': 0}
('EAEDDDDDD;DWWWDWWWD;DWDDDDDWD;DDDWEWDDD;DWDWEWDWD;DWDDWDDWD;DDDDDDDDD;', 0, 0)
{'U': 0, 'D': 0, 'L': 0, 'R': 0}
('EAEDDDDDD;DWWWDWWWD;DWDDDDDWD;DDDWEWDDD;DWDWEWDWD;DWDDWDDWD;DDDDDDDDD;', 0, 2)
{