In [51]:
!pip 

^C


In [30]:
# import gymnasium as gym
# from gymnasium import spaces
import gym
from gym import spaces

import numpy as np
from math import sqrt
import cv2
import time

from stable_baselines3 import PPO
from stable_baselines3.common.env_checker import check_env

In [100]:
class PacmanEnv(gym.Env):
    metadata = {"render_modes": ["human", "rgb_array"], "render_fps": 4}

    def __init__(self, render_mode=None, grid_size=[50, 50]):
        super(PacmanEnv, self).__init__()
        self.grid_size = grid_size
        
        # Define action and observation space
        # They must be gym.spaces objects
        # Example when using discrete actions:
        
        # [agent_x, agent_y, pellet_x, pellet_y, prev_agent_action] + 
        #     [left_wall_dist, right_wall_dist, top_wall_dist, bottom_wall_dist]
        self.observation_space = spaces.MultiDiscrete([
                self.grid_size[0], # agent_x
                self.grid_size[1], # agent_y
                self.grid_size[0], # pellet_x
                self.grid_size[1], # pellet_y
                4, #  prev_agent_action
                self.grid_size[0], # left_wall_dist
                self.grid_size[0], # right_wall_dist
                self.grid_size[1], # top_wall_dist
                self.grid_size[1], # bottom_wall_dist
            ])

        # We have 4 actions, corresponding to "left", "up", "right", "down"
        self.action_space = spaces.Discrete(4)
        
    def reset(self):
        self.grid = np.zeros(self.grid_size)
        # set wall/obstacle to 1
        self.grid[0, :] = 1
        self.grid[-1, :] = 1
        self.grid[:, -1] = 1
        self.grid[:, 0] = 1
        # the position of pacman and pellet will not be mapped on the grid.
        # instead, we will keep track of it independently
        # initiate pacman and pellet position
        self.pacman_position = [5, 5] #[np.random.randint(self.grid_size[0]), np.random.randint(self.grid_size[1])]
        self.pellet_position = [np.random.randint(1, self.grid_size[0]-1), np.random.randint(1, self.grid_size[1]-1)]
        self.score = 0
        self.prev_reward = 0
        self.action = 0
        
        self.done = False

        agent_x = self.pacman_position[0]
        agent_y = self.pacman_position[1]
        pellet_x = self.pellet_position[0]
        pellet_y = self.pellet_position[1]
        self.prev_action = 0
        left_wall_dist = agent_x - 1
        right_wall_dist = (self.grid_size[0]-2) - agent_x
        top_wall_dist = agent_y - 1
        bottom_wall_dist = (self.grid_size[1]-2) - agent_y

        # create observation:
        observation = [agent_x, agent_y, pellet_x, pellet_y, self.prev_action, 
                           left_wall_dist, right_wall_dist, top_wall_dist, bottom_wall_dist]
        observation = np.array(observation)

        return observation
    
    def step(self, action):
        if action == 0:
            self.pacman_position[0] -= 1
        elif action == 1:
            self.pacman_position[1] -= 1
        elif action == 2:
            self.pacman_position[0] += 1
        elif action == 3:
            self.pacman_position[1] += 1 
        
        # On collision kill the snake and print the score
        if self.grid[self.pacman_position[0], self.pacman_position[1]] == 1:
#             print('hit a wall')
            self.done = True
            self.reward = -10
        
        # reward is based on the distance between pacman and pellet
        grid_hypothenus = sqrt(self.grid_size[0]**2 + self.grid_size[1]**2)
        pacman_pellet_distance = sqrt(abs(self.pacman_position[0] - self.pellet_position[0])**2 + (self.pacman_position[1] - self.pellet_position[1])**2)
        self.total_reward = 1 - (pacman_pellet_distance/grid_hypothenus)
        self.reward = self.total_reward - self.prev_reward
        self.prev_reward = self.total_reward
            
        if self.pacman_position == self.pellet_position:
#             print('found pellet')
            self.reward = 5
            self.done = True

        agent_x = self.pacman_position[0]
        agent_y = self.pacman_position[1]
        pellet_x = self.pellet_position[0]
        pellet_y = self.pellet_position[1]
        self.prev_actions = action
        left_wall_dist = agent_x - 1
        right_wall_dist = (self.grid_size[0]-2) - agent_x
        top_wall_dist = agent_y - 1
        bottom_wall_dist = (self.grid_size[1]-2) - agent_y

        observation = [agent_x, agent_y, pellet_x, pellet_y, self.prev_action, 
                           left_wall_dist, right_wall_dist, top_wall_dist, bottom_wall_dist]
        observation = np.array(observation)
        
        info = {}
        return observation, self.reward, self.done, info
    
    def render(self, step_count):
        game_visual = np.zeros((self.grid_size[0], self.grid_size[1], 3), np.uint8)
        wall_pos = np.where(self.grid==1)
        for index, _ in enumerate(wall_pos[0]):
            game_visual[wall_pos[0][index], wall_pos[1][index]] = (89, 28, 17) 
        
        if self.pacman_position == self.pellet_position:
            game_visual[self.pellet_position[0], self.pellet_position[1]] = (0, 255, 0) 
#             game_visual = cv2.putText(game_visual, f'1111', (-5, 6), cv2.FONT_HERSHEY_SIMPLEX, 
#                    1, (255,255,255), 2, cv2.LINE_AA)
        else:
            game_visual[self.pacman_position[0], self.pacman_position[1]] = (255, 255, 255) 
            game_visual[self.pellet_position[0], self.pellet_position[1]] = (0, 0, 255) 
            
        game_visual = game_visual.repeat(10, axis=0).repeat(2, axis=1)
        cv2.namedWindow("Pacman Simplified", cv2.WINDOW_NORMAL)
        cv2.resizeWindow("Pacman Simplified", 500, 500)
        cv2.imshow('Pacman Simplified', game_visual)
        cv2.waitKey(1)
    
    def close(self):
        cv2.destroyAllWindows()
        print('done')
        pass

In [101]:
env = PacmanEnv(grid_size=[10, 10])
check_env(env)

In [102]:
model = PPO('MlpPolicy', env, verbose=1, tensorboard_log='logs')
model.learn(total_timesteps=40000, tb_log_name='PPO')
# model.save()

Using cpu device
Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.
Logging to logs\PPO_2
---------------------------------
| rollout/           |          |
|    ep_len_mean     | 19.4     |
|    ep_rew_mean     | 1.83     |
| time/              |          |
|    fps             | 1527     |
|    iterations      | 1        |
|    time_elapsed    | 1        |
|    total_timesteps | 2048     |
---------------------------------
-----------------------------------------
| rollout/                |             |
|    ep_len_mean          | 21.7        |
|    ep_rew_mean          | 1.92        |
| time/                   |             |
|    fps                  | 1074        |
|    iterations           | 2           |
|    time_elapsed         | 3           |
|    total_timesteps      | 4096        |
| train/                  |             |
|    approx_kl            | 0.008365192 |
|    clip_fraction        | 0.0573      |
|    clip_range           | 0.2         

----------------------------------------
| rollout/                |            |
|    ep_len_mean          | 9.57       |
|    ep_rew_mean          | 5.66       |
| time/                   |            |
|    fps                  | 867        |
|    iterations           | 11         |
|    time_elapsed         | 25         |
|    total_timesteps      | 22528      |
| train/                  |            |
|    approx_kl            | 0.01855303 |
|    clip_fraction        | 0.211      |
|    clip_range           | 0.2        |
|    entropy_loss         | -1.03      |
|    explained_variance   | 0.165      |
|    learning_rate        | 0.0003     |
|    loss                 | 0.659      |
|    n_updates            | 100        |
|    policy_gradient_loss | -0.0239    |
|    value_loss           | 0.881      |
----------------------------------------
-----------------------------------------
| rollout/                |             |
|    ep_len_mean          | 8.94        |
|    ep_rew_m

<stable_baselines3.ppo.ppo.PPO at 0x1f4385b6dc0>

In [104]:
episodes = 100
for eps in range(episodes):
    observation = env.reset()
    done = False
    step_count = 0
    env.render(step_count)
    while not done:
#         action = env.action_space.sample()
        action = model.predict(observation)[0]
        observation, reward, done, info = env.step(action)
        env.render(step_count)
        time.sleep(0.1)
#         print(f'total reward: {env.total_reward}')
        step_count+=1
    print(step_count)
    time.sleep(2)
env.close()

7
4
10
3
5
3
1
4
4
5
8
4
5
4
2
5
4
5
6


KeyboardInterrupt: 

In [210]:
observation = env.reset()
episodes = 10
for eps in range(episodes):
    action = env.action_space.sample()  # agent policy that uses the observation and info
    observation, reward, done, info = env.step(action)
    time.sleep(0.1)
    print(f'total reward: {env.total_reward}')
    if done:
        break

print(env.reward)
env.close()

total reward: 0.7298148782778742
total reward: 0.72
total reward: 0.7298148782778742
total reward: 0.72
total reward: 0.7298148782778742
total reward: 0.719286623047636
total reward: 0.7098276374290617
total reward: 0.6993340724325419
total reward: 0.6898387516145836
total reward: 0.7
0.010161248385416322
done


In [14]:
env.pellet_position == [3, 4]

True

In [13]:
env.pacman_position

[6, 4]