In [1]:
import gymnasium as gym
import numpy as np
from collections import defaultdict
from tqdm import tqdm
import cv2
import matplotlib.pyplot as plt
import pickle
import time

class ActionUncertaintyWrapper(gym.Wrapper):
    def __init__(self, env, uncertainty_prob=0.01):
        super().__init__(env)
        self.uncertainty_prob = uncertainty_prob
        self.action_space = env.action_space

    def step(self, action):
        if np.random.random() < self.uncertainty_prob:
            action = self.action_space.sample()
        return self.env.step(action)

class QLearning:
    def __init__(self, env, learning_rate=3e-4, gamma=0.99, epsilon_start=1.0, epsilon_end=0.01, epsilon_decay=0.9995):
        self.env = env
        self.n_actions = env.action_space.n
        self.learning_rate = learning_rate
        self.gamma = gamma
        self.epsilon = epsilon_start
        self.epsilon_end = epsilon_end
        self.epsilon_decay = epsilon_decay
        self.Q = defaultdict(lambda: np.zeros(self.n_actions))
        
    def preprocess_observation(self, observation):
        gray = cv2.cvtColor(observation, cv2.COLOR_RGB2GRAY)
        resized = cv2.resize(gray, (84, 84), interpolation=cv2.INTER_AREA)
        return resized

    def extract_features(self, preprocessed_obs):
        # Find the paddle
        paddle_row = np.argmax(preprocessed_obs[-20:].sum(axis=1))
        paddle_position = paddle_row + (preprocessed_obs.shape[0] - 20)

        # Find the ball
        ball_positions = np.argwhere(preprocessed_obs[10:-20] > 200)
        if len(ball_positions) > 0:
            ball_y, ball_x = ball_positions.mean(axis=0).astype(int)
            ball_y += 10 
        else:
            ball_x, ball_y = -1, -1  # Ball not found

        # Count remaining bricks
        bricks = np.sum(preprocessed_obs[10:30] > 0)

        # Create feature vector
        features = (
            paddle_position // 5,
            ball_x // 5,
            ball_y // 5,
            bricks // 5,
            (ball_x - paddle_position) // 5
        )

        return features

    def epsilon_greedy_policy(self, state):
        if np.random.random() < self.epsilon:
            return np.random.randint(self.n_actions)
        else:
            return np.argmax(self.Q[state])

    def train(self, n_episodes):
        episode_rewards = []
        episode_lengths = []

        for episode in tqdm(range(n_episodes)):
            obs, _ = self.env.reset()
            state = self.extract_features(self.preprocess_observation(obs))
            episode_reward = 0
            episode_length = 0
            last_bricks = state[3] * 5  # Store initial brick count

            while True:
                action = self.epsilon_greedy_policy(state)
                next_obs, reward, done, truncated, _ = self.env.step(action)
                next_state = self.extract_features(self.preprocess_observation(next_obs))
                
                # Reward shaping
                if next_state[3] * 5 < last_bricks:  # If a brick was destroyed
                    reward += 1
                    last_bricks = next_state[3] * 5
                
                if next_state[4] == 0:  # If ball is above the paddle
                    reward += 0.1

                # Q-learning update
                best_next_action = np.argmax(self.Q[next_state])
                td_target = reward + self.gamma * self.Q[next_state][best_next_action]
                td_error = td_target - self.Q[state][action]
                self.Q[state][action] += self.learning_rate * td_error

                state = next_state
                episode_reward += reward
                episode_length += 1

                if done or truncated:
                    break

            episode_rewards.append(episode_reward)
            episode_lengths.append(episode_length)

            # Decay epsilon and learning rate
            self.epsilon = max(self.epsilon_end, self.epsilon * self.epsilon_decay)
            self.learning_rate *= 0.9999

        return episode_rewards, episode_lengths

    def save_model(self, filename):
        with open(filename, 'wb') as f:
            pickle.dump(dict(self.Q), f)

    @classmethod
    def load_model(cls, env, filename):
        agent = cls(env)
        with open(filename, 'rb') as f:
            agent.Q = defaultdict(lambda: np.zeros(agent.n_actions), pickle.load(f))
        return agent

    def play(self, n_episodes=5):
        for episode in range(n_episodes):
            obs, _ = self.env.reset()
            state = self.extract_features(self.preprocess_observation(obs))
            episode_reward = 0

            while True:
                self.env.render()
                action = np.argmax(self.Q[state])
                next_obs, reward, done, truncated, _ = self.env.step(action)
                next_state = self.extract_features(self.preprocess_observation(next_obs))
                
                state = next_state
                episode_reward += reward

                if done or truncated:
                    break

            print(f"Episode {episode + 1} reward: {episode_reward}")
            time.sleep(1)  # Add a small delay between episodes

        self.env.close()


In [2]:
def plot_results(episode_rewards, episode_lengths):
    fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(10, 10))

    # Plot episode rewards
    ax1.plot(episode_rewards)
    ax1.set_title('Episode Rewards')
    ax1.set_xlabel('Episode')
    ax1.set_ylabel('Total Reward')

    # Plot episode lengths
    ax2.plot(episode_lengths)
    ax2.set_title('Episode Lengths')
    ax2.set_xlabel('Episode')
    ax2.set_ylabel('Steps')

    plt.tight_layout()
    plt.savefig('learning_curves.png')
    plt.close()

# Training
env = gym.make('Breakout-v4', render_mode='rgb_array')
env = ActionUncertaintyWrapper(env, uncertainty_prob=0.01)

agent = QLearning(env)
n_episodes = 5000
episode_rewards, episode_lengths = agent.train(n_episodes)

# Save the trained model
agent.save_model('breakout_q_learning.pkl')

# Plot results
plot_results(episode_rewards, episode_lengths)

# Close the training environment
env.close()


100%|██████████| 5000/5000 [08:43<00:00,  9.54it/s]


In [3]:
# Play the game
play_env = gym.make('Breakout-v4', render_mode='rgb_array')

play_env = ActionUncertaintyWrapper(play_env, uncertainty_prob=0.01)

# Load the trained model
loaded_agent = QLearning.load_model(play_env, 'breakout_q_learning.pkl')

# Play 5 episodes
loaded_agent.play(n_episodes=10)
play_env.close()

  logger.warn(


Episode 1 reward: 0.0
Episode 2 reward: 6.0
Episode 3 reward: 0.0
Episode 4 reward: 0.0
Episode 5 reward: 0.0
Episode 6 reward: 0.0
Episode 7 reward: 2.0
Episode 8 reward: 1.0
Episode 9 reward: 2.0
Episode 10 reward: 1.0
