In [None]:
import gymnasium as gym
import numpy as np
import ipywidgets as widgets
import sys

sys.path.append('../')
import support_modules as sm

# Pacman

## Description

<div style="text-align: justify">    
A classic arcade game. Move Pac-Man around a maze, collecting food and avoiding ghosts — unless you eat a Power Pellet, in which case you can eat the ghosts too!
</div>

https://gymnasium.farama.org/environments/atari/pacman/

# Random policy

## Single episode

In [None]:
# Play one episode with uniformly random actions and accumulate its reward in `ep_reward`.
env = gym.make('ALE/Pacman-v5', render_mode='rgb_array')
ep_reward = 0
done = False

try:
    state, _ = env.reset()

    while not done:
        action = env.action_space.sample()
        state, reward, terminated, truncated, info = env.step(action)
        ep_reward += reward
        # The value returned by render() is not used in this cell.
        env.render()

        done = sm.evaluate_done(terminated, truncated)
finally:
    # Always release the environment, even if the episode raises or is interrupted.
    env.close()

## Exploratory run: 1000 episodes

In [None]:
# Random-policy baseline: play N_EPISODES episodes and report the mean episode reward.
N_EPISODES = 1000

env = gym.make('ALE/Pacman-v5', render_mode='rgb_array')

rewards = list()   # total reward of each episode
success = list()   # `terminated` flag of the final step of each episode

try:
    for episode in range(N_EPISODES):
        state, _ = env.reset()
        ep_reward = 0
        done = False

        while not done:
            action = env.action_space.sample()
            state, reward, terminated, truncated, info = env.step(action)
            ep_reward += reward

            done = sm.evaluate_done(terminated, truncated)

        rewards.append(ep_reward)
        success.append(terminated)
finally:
    # Always release the environment, even if the long run is interrupted part-way.
    env.close()

print(f'Average reward: {sum(rewards)/len(rewards)}')

# [BETA] Deep Q-Learning (DQN)

In [None]:
import numpy as np
import tensorflow as tf
from tensorflow.keras import layers, models, optimizers
from collections import deque
import random

class DQNAgent:
    """Deep Q-Network agent with experience replay and a separate target network.

    States handed to `act` and `remember` must carry a leading batch axis of
    size 1 (shape ``(1, *state_shape)``): `act` feeds the state straight to
    the network and `replay` strips that axis before stacking a mini-batch.

    Args:
        state_shape: Shape of a single state without the batch axis. It is
            passed to the network's input layer, which is followed by
            Conv2D layers.
        action_size: Number of discrete actions (size of the output layer).
        replay_buffer_size: Maximum number of stored transitions; the oldest
            ones are dropped once the buffer is full.
        batch_size: Number of transitions sampled per `replay` call.
        gamma: Discount factor applied to the bootstrapped next-state value.
        epsilon_start: Initial exploration probability.
        epsilon_end: Lower bound that epsilon is decayed towards.
        epsilon_decay: Multiplicative decay applied to epsilon per `replay`.
        learning_rate: Adam learning rate.
        target_update_freq: Copy the online weights into the target network
            every this many training steps; values <= 0 disable the copy.
    """

    def __init__(self, state_shape, action_size, replay_buffer_size=10000, batch_size=32,
                 gamma=0.99, epsilon_start=1.0, epsilon_end=0.01, epsilon_decay=0.995,
                 learning_rate=0.001, target_update_freq=100):
        self.state_shape = state_shape
        self.action_size = action_size
        self.replay_buffer = deque(maxlen=replay_buffer_size)
        self.batch_size = batch_size
        self.gamma = gamma
        self.epsilon = epsilon_start
        self.epsilon_end = epsilon_end
        self.epsilon_decay = epsilon_decay
        self.learning_rate = learning_rate
        self.target_update_freq = target_update_freq
        # Number of completed training steps. The target-network schedule is
        # driven by this counter rather than by the buffer length, because the
        # length stops changing once the deque reaches `maxlen`.
        self.train_steps = 0

        # Create main and target Q-networks, starting from identical weights
        self.q_network = self._build_q_network()
        self.target_q_network = self._build_q_network()
        self.update_target_network()

    def _build_q_network(self):
        """Build and compile the convolutional Q-network (one linear output per action)."""
        model = models.Sequential([
            layers.Input(shape=self.state_shape),
            layers.Conv2D(32, (8, 8), strides=(4, 4), activation='relu'),
            layers.Conv2D(64, (4, 4), strides=(2, 2), activation='relu'),
            layers.Conv2D(64, (3, 3), activation='relu'),
            layers.Flatten(),
            layers.Dense(512, activation='relu'),
            layers.Dense(self.action_size, activation=None)
        ])
        model.compile(loss='mse', optimizer=optimizers.Adam(learning_rate=self.learning_rate))
        return model

    def update_target_network(self):
        """Copy the online network's weights into the target network."""
        self.target_q_network.set_weights(self.q_network.get_weights())

    def act(self, state):
        """Pick an action epsilon-greedily.

        Args:
            state: A single state with a leading batch axis of size 1.

        Returns:
            The chosen action index as a plain `int`.
        """
        if np.random.rand() <= self.epsilon:
            return int(np.random.choice(self.action_size))
        # verbose=0 keeps the per-call progress bar out of the notebook output.
        q_values = self.q_network.predict(state, verbose=0)[0]
        return int(np.argmax(q_values))

    def remember(self, state, action, reward, next_state, done):
        """Append one transition to the replay buffer."""
        self.replay_buffer.append((state, action, reward, next_state, done))

    def replay(self):
        """Run one training step on a random mini-batch from the replay buffer.

        Does nothing until the buffer holds at least `batch_size` transitions.
        Afterwards it decays epsilon (never below `epsilon_end`) and, every
        `target_update_freq` training steps, refreshes the target network.
        """
        if len(self.replay_buffer) < self.batch_size:
            return

        batch = random.sample(self.replay_buffer, self.batch_size)

        # Strip the per-sample batch axis and stack, so each network is
        # queried once per step instead of once per transition.
        states = np.array([transition[0][0] for transition in batch])
        actions = np.array([transition[1] for transition in batch], dtype=int)
        rewards = np.array([transition[2] for transition in batch], dtype=float)
        next_states = np.array([transition[3][0] for transition in batch])
        dones = np.array([transition[4] for transition in batch], dtype=bool)

        next_q_values = self.target_q_network.predict(next_states, verbose=0)
        # Copy so the regression targets can be edited in place.
        targets = np.array(self.q_network.predict(states, verbose=0))

        # Finished transitions use the bare reward; the rest bootstrap from
        # the target network's best next-state value.
        bootstrapped = rewards + self.gamma * np.max(next_q_values, axis=1)
        td_targets = np.where(dones, rewards, bootstrapped)

        # Only the taken action's output is changed, so the MSE loss is zero
        # for every other action.
        targets[np.arange(len(batch)), actions] = td_targets
        self.q_network.fit(states, targets, epochs=1, verbose=0)

        if self.epsilon > self.epsilon_end:
            # Clamp so a decay step cannot undershoot the configured floor.
            self.epsilon = max(self.epsilon_end, self.epsilon * self.epsilon_decay)

        self.train_steps += 1
        if self.target_update_freq > 0 and self.train_steps % self.target_update_freq == 0:
            self.update_target_network()

import numpy as np
import cv2

def preprocess_screen(screen):
    """Convert an RGB frame into a scaled 84x84 grayscale array with a batch axis.

    Args:
        screen: Image array accepted by `cv2.cvtColor` with `COLOR_RGB2GRAY`
            (presumably an 8-bit RGB frame; the division by 255 assumes that range).

    Returns:
        Array of shape (1, 84, 84) holding the resized grayscale frame divided by 255.
    """
    grayscale = cv2.cvtColor(screen, cv2.COLOR_RGB2GRAY)
    downsampled = cv2.resize(grayscale, (84, 84))
    # Rescale the pixel values and prepend the batch axis in a single expression.
    return (downsampled / 255.0)[np.newaxis, ...]


In [None]:
# Create the Pacman environment
env = gym.make('ALE/Pacman-v5', render_mode='rgb_array')

# Set the number of episodes for training
num_episodes = 1000

# Shape of one state as the Q-network sees it: an 84x84 grayscale frame with one channel
state_shape = (84, 84, 1)


def observation_to_state(observation):
    """Turn an environment frame into a (1, 84, 84, 1) network input.

    `preprocess_screen` returns (1, 84, 84); the Conv2D layers of the agent
    also need a trailing channel axis. Assumes the environment emits RGB
    frames (the ALE default observation type) — confirm if `obs_type` changes.
    """
    return preprocess_screen(observation)[..., np.newaxis]


# Initialize the DQN agent
agent = DQNAgent(state_shape, env.action_space.n)

try:
    # Iterate over episodes
    for episode in range(num_episodes):
        # Gymnasium's reset() returns (observation, info)
        observation, _ = env.reset()
        state = observation_to_state(observation)
        done = False
        total_reward = 0

        # Iterate over time steps within the episode
        while not done:
            # Choose an action using the DQN agent's epsilon-greedy policy
            action = agent.act(state)

            # Gymnasium's step() returns five values; termination and truncation are separate
            observation, reward, terminated, truncated, _ = env.step(action)
            next_state = observation_to_state(observation)

            # Store only `terminated` as the done flag: a truncated episode did not reach
            # a terminal state, so its last transition should still bootstrap.
            agent.remember(state, action, reward, next_state, terminated)

            # Update the agent's Q-network by sampling experiences from memory
            agent.replay()

            # Update the current state
            state = next_state

            # Accumulate the total reward for the episode
            total_reward += reward

            # Same episode-end helper as the random-policy cells
            done = sm.evaluate_done(terminated, truncated)

        # Print the total reward achieved in the episode
        print(f"Episode {episode + 1}: Total Reward = {total_reward}")
finally:
    # Close the environment even if training is interrupted
    env.close()


# Stable Baselines 3