In [1]:
import numpy as np
import random
from collections import defaultdict
import math
import gym


class PacManRL:
    def __init__(self, num_episodes=1000, gamma=0.99, alpha=0.5, epsilon=1.0, epsilon_decay=0.01, epsilon_min=0.01):
        self.q_table = {}
        self.num_episodes = num_episodes
        self.gamma = gamma
        self.alpha = alpha
        self.epsilon = epsilon
        self.epsilon_decay = epsilon_decay
        self.epsilon_min = epsilon_min
        # Define the action space and initialize the Q-table
        self.actions = ['up', 'down', 'left', 'right']
        self.q_table = defaultdict(lambda: np.zeros(len(self.actions)))

        # Define features for state representation
        self.food_left = None
        self.food_eaten = None

    def reset(self):
        self.food_left = None
        self.food_eaten = None
        
        
    def get_state(self, game_state):
        # Extract relevant information from the game state to represent it as a state
        if isinstance(game_state, dict):
            pacman_pos = game_state.get('pacman_pos', ())
            food_pos = tuple(map(tuple, np.argwhere(game_state['food'])))
            ghost_pos = tuple(map(tuple, game_state['ghost_pos']))
        else:
            pacman_pos = ()
            food_pos = ()
            ghost_pos = ()

        # Compute the remaining food left
        if self.food_left is None:
            self.food_left = len(food_pos)
        else:
            self.food_left -= self.food_eaten

        # Compute the distance to the nearest food and the nearest ghost
        dist_to_food = np.inf
        for pos in food_pos:
            dist = np.linalg.norm(np.array(pacman_pos) - np.array(pos))
            if dist < dist_to_food:
                dist_to_food = dist

        dist_to_ghost = np.inf
        for pos in ghost_pos:
            dist = np.linalg.norm(np.array(pacman_pos) - np.array(pos))
            if dist < dist_to_ghost:
                dist_to_ghost = dist

        # Compute the new features
        self.food_eaten = self.food_left - len(food_pos)
        features = {
            'bias': 1.0,
            'food_distance': dist_to_food,
            'ghost_distance': dist_to_ghost
        }

        return {'pacman_pos': pacman_pos, 'food_pos': food_pos, 'ghost_pos': ghost_pos}



#     def get_state(self, game_state):
#         # Extract relevant information from the game state to represent it as a state
#         pacman_pos = tuple(game_state['pacman_pos'])
#         food_pos = tuple(map(tuple, np.argwhere(game_state['food'])))
#         ghost_pos = tuple(map(tuple, game_state['ghost_pos']))

#         # Compute the remaining food left
#         if self.food_left is None:
#             self.food_left = len(food_pos)
#         else:
#             self.food_left -= self.food_eaten

#         # Compute the distance to the nearest food and the nearest ghost
#         dist_to_food = np.inf
#         for pos in food_pos:
#             dist = np.linalg.norm(np.array(pacman_pos) - np.array(pos))
#             if dist < dist_to_food:
#                 dist_to_food = dist

#         dist_to_ghost = np.inf
#         for pos in ghost_pos:
#             dist = np.linalg.norm(np.array(pacman_pos) - np.array(pos))
#             if dist < dist_to_ghost:
#                 dist_to_ghost = dist

#         # Compute the new features
#         self.food_eaten = self.food_left - len(food_pos)
#         features = {
#             'bias': 1.0,
#             'food_distance': dist_to_food,
#             'ghost_distance': dist_to_ghost
#         }

#         pacman_pos = tuple(game_state.get('pacman_pos', ()))
#         food_pos = tuple(map(tuple, np.argwhere(game_state['food'])))
#         ghost_pos = tuple(map(tuple, game_state['ghost_pos']))
#         return {'pacman_pos': pacman_pos, 'food_pos': food_pos, 'ghost_pos': ghost_pos}




    def choose_action(self, state, epsilon):
        # Choose an action using an epsilon-greedy policy
        if np.random.uniform(0, 1) < epsilon:
            action = np.random.choice(self.actions)
        else:
            q_values = self.q_table.get(state, {a: 0 for a in self.actions})
            max_q_value = max(q_values.values())
            actions_with_max_q_value = [a for a, q in q_values.items() if q == max_q_value]
            action = np.random.choice(actions_with_max_q_value)
        return action




    def update_q(self, state, action, next_state, reward, alpha, gamma):
        # Update Q-value for the given state-action pair
        td_target = reward + gamma * np.max(self.q_table[next_state])
        td_error = td_target - self.q_table[state][self.actions.index(action)]
        self.q_table[state][self.actions.index(action)] += alpha * td_error

    def run_episode(self, env):
        # Reset the environment and initialize the state
        obs = env.reset()
        state = self.get_state(obs)

        done = False
        total_reward = 0

        while not done:
            # Choose an action and take a step in the environment
            action = self.choose_action(state, self.epsilon)
            obs, reward, done = env.step(action)
            total_reward += reward

            # Update the state and Q-table
            next_state = self.get_state(obs)
            self.update_q(state, action, next_state, reward, self.alpha, self.discount_factor)

            state = next_state

        return total_reward

    def train(self, env):
        # Initialize the Q-table
        self.q_table = {}

        for i in range(self.num_episodes):
            obs = env.reset()
            state = tuple(self.get_state(obs).items())
            done = False
            total_reward = 0
            epsilon = self.get_epsilon(i)

            while not done:
                # Choose an action and take a step in the environment
                action = self.choose_action(state, epsilon)
                next_obs, reward, done = env.step(action)
                total_reward += reward

                # Update the Q-table
                next_state = tuple(self.get_state(next_obs).items())
                self.update_q_table(state, action, reward, next_state)
                state = next_state

            # Update the learning rate and epsilon
            self.lr = self.lr * self.lr_decay
            self.eps = self.eps * self.eps_decay

        return self.q_table
    
    
    def get_epsilon(self, episode):
        return max(self.epsilon_min, min(self.epsilon, 1.0 - math.log10((episode + 1) * self.epsilon_decay)))


    
    
    
    
    



In [2]:
# import urllib.request
# urllib.request.urlretrieve('http://www.atarimania.com/roms/Roms.rar','Roms.rar')
# !pip install unrar
# !unrar x Roms.rar
# !mkdir rars
# !mv HC\ ROMS.zip   rars
# !mv ROMS.zip  rars
# !python -m atari_py.import_roms rars

In [3]:
# print('done')

In [4]:
# !python -m atari_py.import_roms /path/to/roms/


In [5]:
# Create the environment
# !python -m atari_py.import_roms Roms
# env = gym.make("MsPacman-v4")
# rom_path = "/path/to/roms/ms_pacman.bin" 
# env = gym.make("ALE/MsPacman-v5", rom_path=rom_path)

# env = gym.make("ALE/MsPacman-v5")
env = gym.make("MsPacman-v5")

# Create the agent
pacman_rl = PacManRL(num_episodes=1000, epsilon_min=0.01)


# Train the agent
q_table = pacman_rl.train(env)

# Test the agent
obs = env.reset()
done = False
total_reward = 0
while not done:
    state = pacman_rl.get_state(obs)
    action = np.argmax(q_table[state])
    obs, reward, done, _ = env.step(action)
    total_reward += reward

print(f"Total reward: {total_reward}")


VersionNotFound: Environment version `v5` for environment `MsPacman` doesn't exist. It provides versioned environments: [ `v0`, `v4` ].