In [285]:
from typing import Tuple, List, Optional, Dict
import numpy as np
import dataclasses

In [325]:
#data structure with the information about transitions
@dataclasses.dataclass
class Transition:
    state: Tuple[int, int]
    action: str
    next_state: Tuple[int, int]
    reward: float
    termination: bool

class PacEnv:
    _states: np.array #two-dimensional grid world
    _rewards: np.array #depend on ghosts met and points collected
    _action_semantics: List[str] #readable representation of the actions
    _actions: Dict[str, np.array] #for transitions in the state space
    _init_state: Tuple[int, int] #fixed state in the state space
    _current_state: Tuple[int, int] #current position of the agent
    _ghosts: Optional[List[Tuple[int, int]]] #to be set by user
    _transition_probabilities: np.array #encorage the exploration


    #parameters of the environment which may be set by user
    def __init__(self,
                rows: int,
                cols: int,
                step_cost: float,
                lifes: Optional[int],
                ghosts: Optional[List[Tuple[int, int]]] = None,
                walls: Optional[List[Tuple[int, int]]] = None) -> None:
        #the state space created
        self._states = np.array(np.zeros((rows, cols)), dtype=str)

        walls = [] if walls is None else walls
        
        #"p" stands for "points" which should be eaten by PacMan in order to reach his goal
        #initially every sell which is not the wall, the ghost or the agent includes point to be eaten
        for r in range(0, self._states.shape[0]):
            for c in range(0, self._states.shape[1]):
                self._states[r, c] = 'p'

        #assignings of the walls
        for r, c in walls:
            self._states[r, c] = 1

        #agent considered to get reward 1 for one eaten point and pay penaltie equal to step cost for transitions in empty cells
        self._rewards = (1-step_cost)*np.ones((rows, cols))

        #reward = -100 if the agent is killed by ghost
        for r, c in ghosts:
            self._rewards[r, c] = -100

        #initializing
        self._action_semantics = ['up', 'left', 'down', 'right']
        self._actions = np.array([[-1, 0], [0, -1], [1, 0], [0, 1]])
        self._init_state = (3, 2)
        self._current_state = self._init_state
        self._ghosts = ghosts
        self._step_cost = step_cost
        self._lifes = lifes
        self._copy_lifes = lifes

        #instead of chosen up/down actions agent could be replaced left/right with the probability = 0.1 and vice versa
        self._transition_probabilities = np.array([0.1, 0.8, 0.1])

    #to get list of actions  
    @property
    def actions(self) -> List[str]:
        return self._action_semantics

    #to get current state
    @property
    def current_state(self) -> Tuple[int, int]:
        return self._current_state

    #if agent have already eaten all of the points it will get reward = 100
    @property
    def reward(self) -> float:
        r, c = self._current_state
        return self._rewards[r, c] if self.termination == False else 100

    #checks the state of the environment: if all of the points are eaten, then goal is reached -> termination
    @property
    def termination(self) -> bool:
        grid = np.array(self._states, dtype = str)
        l = list(grid)
        goal_flag = True
        for i in l:
          if 'p' in set(i):
              goal_flag = False
        return goal_flag

    #represents the agent and the ghosts on the instance of the environment
    def render(self) -> None:
        grid = np.array(self._states, dtype = str)
        r, c = self._current_state
        grid[r, c] = 'x'

        for r, c in self._ghosts:
            grid[r, c] = 'g'

        print(grid)

    #manipulations with the indexes to provide correct transitions of the agent inside the environmental boundaries
    #structure of the return of that function provides a lack of possibility to go through walls
    def _transition(self, state: Tuple[int, int], a: np.array) -> Tuple[int, int]:
        n_actions = len(self._actions)
        a = self._actions[a + n_actions if a < 0 else a % n_actions]
        new_r = max(0, min(self._states.shape[0] - 1, state[0] + a[0]))
        new_c = max(0, min(self._states.shape[1] - 1, state[1] + a[1]))
        return (new_r, new_c) if (self._states[new_r, new_c] == 0. or self._states[new_r, new_c] == 'p') else state

    #the main transitional mechanism
    def step(self, action: str) -> Transition:
        a_idx = self._action_semantics.index(action) #get an index given semantics of action

        rnd = np.random.rand()
        chosen_action = a_idx + np.random.choice([-1, 0, 1], p=self._transition_probabilities) #exploitation/exploration trade-off
        prev_state = self._current_state 
        r, c = self._current_state
        #The PacMan game's agent usually has three lifes (may be set by user in my implementation) each of which may be lost by ghost
        if self._rewards[r, c] == -100:
            self._lifes -= 1
        #step itself
        self._current_state = self._transition(self._current_state, chosen_action)
        #once the cell has been visited it has no points to be eaten anymore
        self._states[r, c] = 0.
        #in the future in such cells agent will not get a reward for eating a points, only step cost for transition
        self._rewards[r, c] = -self._step_cost
        #the result of the transition
        return Transition(state = prev_state,
                          action = action,
                          next_state = self._current_state,
                          reward = self.reward,
                          termination = self.termination)
    
    #there are two modes for reseting:
    #1)if the agent lost all its lifes then: agent in the initial state, all of the points aren't eaten, agent has all lifes again
    #2)if the agent is still having lifes after reset it appears to be in the initial state, but eaten previously points are still eaten 
    def reset(self) -> None:
        if self._lifes <= 0:
          self._current_state = self._init_state
          for r in range(0, self._states.shape[0]):
              for c in range(0, self._states.shape[1]):
                  if (self._states[r, c] == "0.0") or (self._states[r, c] == 'x'):
                      self._states[r, c] = 'p'
          self._lifes = self._copy_lifes
        else:
          self._current_state = self._init_state 

    #these two functions are used for the construction of Q-table
    def state_space_size(self) -> int:
        return np.prod(list(self._states.shape))
    
    def action_space_size(self) -> int:
        return len(self._actions)

In [319]:
#initial situation
my_env = PacEnv(rows=6, cols=6, step_cost = 0.04, lifes = 0, ghosts=[(4, 0), (1, 4)], walls=[(1,1), (2,1), (1,2), (4,4), (3,4), (4,3), (2,4)])
my_env.render()
print(my_env.current_state)
print(my_env.reward)
print(my_env.termination)
print(my_env.actions)


[['p' 'p' 'p' 'p' 'p' 'p']
 ['p' '1' '1' 'p' 'g' 'p']
 ['p' '1' 'p' 'p' '1' 'p']
 ['p' 'p' 'x' 'p' '1' 'p']
 ['g' 'p' 'p' '1' '1' 'p']
 ['p' 'p' 'p' 'p' 'p' 'p']]
(3, 2)
0.96
False
['up', 'left', 'down', 'right']


In [320]:
#test where I created an instance of the environment and executed a sequence of the 5th random actions
my_env.reset()
rnd = np.random.rand()
for i in range(0, 5):
    a = np.random.choice(["up", "down", "right", "left"], p=[0.25, 0.25, 0.25, 0.25])   
    print(a)
    print(my_env.step(a))
    my_env.render()
my_env.reset()
my_env.render()

down
Transition(state=(3, 2), action='down', next_state=(4, 2), reward=0.96, termination=False)
[['p' 'p' 'p' 'p' 'p' 'p']
 ['p' '1' '1' 'p' 'g' 'p']
 ['p' '1' 'p' 'p' '1' 'p']
 ['p' 'p' '0.0' 'p' '1' 'p']
 ['g' 'p' 'x' '1' '1' 'p']
 ['p' 'p' 'p' 'p' 'p' 'p']]
left
Transition(state=(4, 2), action='left', next_state=(4, 1), reward=0.96, termination=False)
[['p' 'p' 'p' 'p' 'p' 'p']
 ['p' '1' '1' 'p' 'g' 'p']
 ['p' '1' 'p' 'p' '1' 'p']
 ['p' 'p' '0.0' 'p' '1' 'p']
 ['g' 'x' '0.0' '1' '1' 'p']
 ['p' 'p' 'p' 'p' 'p' 'p']]
left
Transition(state=(4, 1), action='left', next_state=(4, 0), reward=-100.0, termination=False)
[['p' 'p' 'p' 'p' 'p' 'p']
 ['p' '1' '1' 'p' 'g' 'p']
 ['p' '1' 'p' 'p' '1' 'p']
 ['p' 'p' '0.0' 'p' '1' 'p']
 ['g' '0.0' '0.0' '1' '1' 'p']
 ['p' 'p' 'p' 'p' 'p' 'p']]
right
Transition(state=(4, 0), action='right', next_state=(4, 0), reward=-0.04, termination=False)
[['p' 'p' 'p' 'p' 'p' 'p']
 ['p' '1' '1' 'p' 'g' 'p']
 ['p' '1' 'p' 'p' '1' 'p']
 ['p' 'p' '0.0' 'p' '1' 'p']


## **Policy iteration by Q-learning algorithm**


In [327]:
#list of the parameters and constants
gamma = 1
epsilon = 0.1
alpha = 0.01
state_space_size = my_env.state_space_size()
action_space_size = my_env.action_space_size()
actions = my_env.actions
n_episodes = 1000
episode_size = 200
visited = {}

#action-value function to be updated
q: Dict[int, List[float]] = {i: [0. for a in actions] for i in range(state_space_size)}

#dict of actions which are taken by epsilon-greedy policy
def policy(epsilon: float, actions: List[str], episode: int) -> Dict[int, str]:
    if episode == n_episodes - 1:
        epsilon = 0.0
    pi = {i: actions[np.argmax(q[i])] if np.random.rand() >= epsilon 
          else np.random.choice(actions, p=[0.25, 0.25, 0.25, 0.25]) for i in q.keys()}
    return pi

#implementational cycle
for episode in range(n_episodes):
    if episode%100==0:
      print(f'Episode {episode}')

    #get a set of actions for this episode    
    my_env.reset()
    pi = policy(epsilon, actions, episode)
    
    for stage in range(episode_size):
        #check for visited states
        s = my_env.current_state
        if s in visited:
            s = visited[s]
        else:
            visited[s] = len(visited)
            s = visited[s]
        #step
        action = pi[s]
        transition = my_env.step(action)
        a_idx = actions.index(action)

        next_s = transition.next_state
        if next_s in visited:
            next_s = visited[next_s]
        else:
            visited[next_s] = len(visited)
            next_s = visited[next_s]
        #updating
        if transition.termination:
            q[s][a_idx] += alpha*(transition.reward - q[s][a_idx])
            #interrupting of the cycle once the termination occasioned
            break
        else:
            q[s][a_idx] += + alpha*(transition.reward + gamma * np.max(q[next_state]) - q[s][a_idx])


Episode 0
Episode 100
Episode 200


KeyboardInterrupt: ignored