<a href="https://colab.research.google.com/github/prisar/ai_notebooks/blob/main/nb_090.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Task
Run a reinforcement learning experiment by building a custom Gridworld environment from scratch. The agent should learn to navigate this 2D maze using a CNN-based Deep Q-Network (DQN). As a bonus, use pygame or Gymnasium to visually render the environment and the agent's learning process.

## Environment setup

### Subtask:
Install the necessary libraries, including `pygame` for visualization and `tensorflow` for the DQN model.


**Reasoning**:
I will install `pygame` and `tensorflow` with pip, combining both packages into a single command. NumPy, which the environment uses, is pulled in as a TensorFlow dependency.



In [None]:
!pip install pygame tensorflow



## Gridworld environment

### Subtask:
Create a custom Gridworld environment class. This class will handle the game logic, including the grid, agent, rewards, and state transitions.


**Reasoning**:
I will now create the `GridworldEnv` class with `__init__`, `reset`, `step`, and `render` methods. The `__init__` method sets up the grid, agent, goal, and obstacles. The `reset` method moves the agent back to its start cell and returns the initial state. The `step` method applies an action, keeps the agent inside the grid, and returns the new state, the reward, and a done flag. For now, the `render` method only prints the grid as an array.



In [None]:
import numpy as np

class GridworldEnv:
    def __init__(self, grid_size=(10, 10)):
        self.grid_size = grid_size
        self.agent_pos = [0, 0]
        self.goal_pos = [grid_size[0] - 1, grid_size[1] - 1]
        self.obstacles = [[1, 1], [1, 2], [2, 1], [2, 2], [5,5], [5,6], [6,5], [6,6]]
        self.state = self._get_state()

    def reset(self):
        self.agent_pos = [0, 0]
        self.state = self._get_state()
        return self.state

    def step(self, action):
        if action == 0:  # Up
            self.agent_pos[0] = max(0, self.agent_pos[0] - 1)
        elif action == 1:  # Down
            self.agent_pos[0] = min(self.grid_size[0] - 1, self.agent_pos[0] + 1)
        elif action == 2:  # Left
            self.agent_pos[1] = max(0, self.agent_pos[1] - 1)
        elif action == 3:  # Right
            self.agent_pos[1] = min(self.grid_size[1] - 1, self.agent_pos[1] + 1)

        self.state = self._get_state()
        done = False
        reward = -0.1  # Small negative reward for each step

        if self.agent_pos == self.goal_pos:
            reward = 10.0
            done = True
        elif self.agent_pos in self.obstacles:
            reward = -5.0
            done = True

        return self.state, reward, done, {}

    def render(self):
        # Text-only rendering: print the grid as an array (replaced by pygame drawing later)
        grid = np.zeros(self.grid_size)
        grid[self.agent_pos[0], self.agent_pos[1]] = 0.5  # Agent
        grid[self.goal_pos[0], self.goal_pos[1]] = 1.0  # Goal
        for obs in self.obstacles:
            grid[obs[0], obs[1]] = -1.0 # Obstacle
        print(grid)


    def _get_state(self):
        state = np.zeros(self.grid_size)
        state[self.agent_pos[0], self.agent_pos[1]] = 1
        return state
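
As a sanity check on the reward scheme in `step`, the best achievable return can be worked out by hand. The sketch below is a hypothetical helper, not part of the notebook's class. It runs a breadth-first search over the same 10x10 layout, assuming obstacle cells must be avoided because entering one ends the episode, and then applies the step cost of -0.1 and the goal reward of 10.0.

```python
from collections import deque

# Layout copied from GridworldEnv; the helper itself is illustrative only.
ROWS, COLS = 10, 10
START, GOAL = (0, 0), (9, 9)
OBSTACLES = {(1, 1), (1, 2), (2, 1), (2, 2), (5, 5), (5, 6), (6, 5), (6, 6)}
MOVES = [(-1, 0), (1, 0), (0, -1), (0, 1)]  # Up, Down, Left, Right


def shortest_path_length(start, goal):
    """Breadth-first search for the fewest moves from start to goal, avoiding obstacles."""
    queue = deque([(start, 0)])
    seen = {start}
    while queue:
        (row, col), dist = queue.popleft()
        if (row, col) == goal:
            return dist
        for d_row, d_col in MOVES:
            nxt = (row + d_row, col + d_col)
            in_bounds = 0 <= nxt[0] < ROWS and 0 <= nxt[1] < COLS
            if in_bounds and nxt not in OBSTACLES and nxt not in seen:
                seen.add(nxt)
                queue.append((nxt, dist + 1))
    return None  # Goal unreachable


steps = shortest_path_length(START, GOAL)
# Every move except the last costs 0.1; the final move earns the goal reward instead.
best_return = -0.1 * (steps - 1) + 10.0
print(steps, round(best_return, 2))
```

Under these assumptions the search reports 18 moves, so an optimal episode returns about 8.3, while an episode that runs 100 steps without terminating would return about -10.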

## CNN-based DQN model

### Subtask:
Define and build a Deep Q-Network (DQN) model using a Convolutional Neural Network (CNN) to process the grid-based state.


**Reasoning**:
I will now define the DQN model: import the required layers from `tensorflow.keras`, then write a function that builds the CNN. The model has two convolutional layers, a flatten layer, and two dense layers; the final dense layer outputs one Q-value per action. It is compiled with the Adam optimizer and a mean squared error loss.
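
To see what the flatten layer will receive, the layer sizes can be worked out without building the model. This is a small sketch, assuming a 10x10 single-channel input, four actions, and the Keras `Conv2D` defaults (stride 1, `'valid'` padding); `conv_out` is a hypothetical helper name.

```python
def conv_out(size, kernel, stride=1):
    """Output length along one axis for a convolution with 'valid' padding."""
    return (size - kernel) // stride + 1


height = width = 10          # Assumed grid size
channels = 1                 # Single-channel state

# First 3x3 convolution with 32 filters
h1, w1 = conv_out(height, 3), conv_out(width, 3)
params_conv1 = 3 * 3 * channels * 32 + 32

# Second 3x3 convolution with 64 filters
h2, w2 = conv_out(h1, 3), conv_out(w1, 3)
params_conv2 = 3 * 3 * 32 * 64 + 64

# Flattened feature vector and the two dense layers
flat = h2 * w2 * 64
params_dense1 = flat * 128 + 128
params_out = 128 * 4 + 4     # Four actions

total = params_conv1 + params_conv2 + params_dense1 + params_out
print((h1, w1), (h2, w2), flat, total)
```

With these settings the grid shrinks from 10x10 to 8x8 and then 6x6, the flatten layer emits 2304 values, and the first dense layer accounts for most of the roughly 314,000 parameters.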



In [None]:
from tensorflow.keras.layers import Input, Conv2D, Dense, Flatten
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import Adam

def create_dqn_model(input_shape, num_actions):
    """
    Creates a Deep Q-Network (DQN) model.

    Args:
        input_shape (tuple): The shape of the input state.
        num_actions (int): The number of possible actions.

    Returns:
        tensorflow.keras.models.Model: The compiled DQN model.
    """
    input_layer = Input(shape=input_shape)

    # Convolutional layers
    conv1 = Conv2D(32, (3, 3), activation='relu')(input_layer)
    conv2 = Conv2D(64, (3, 3), activation='relu')(conv1)

    # Flatten the output
    flatten = Flatten()(conv2)

    # Dense layers
    dense1 = Dense(128, activation='relu')(flatten)
    output_layer = Dense(num_actions, activation='linear')(dense1)

    # Create the model
    model = Model(inputs=input_layer, outputs=output_layer)

    # Compile the model
    model.compile(optimizer=Adam(), loss='mean_squared_error')

    return model
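
Because `input_shape` includes a channel dimension, the observation could carry more than the agent's position. The following is a sketch of one alternative encoding, not what the notebook uses: a hypothetical `encode_state` helper that places the agent, goal, and obstacles in separate channels, which would pair with `input_shape=(10, 10, 3)`.

```python
import numpy as np


def encode_state(grid_size, agent_pos, goal_pos, obstacles):
    """Return an (H, W, 3) float32 array: channel 0 agent, 1 goal, 2 obstacles."""
    state = np.zeros((grid_size[0], grid_size[1], 3), dtype=np.float32)
    state[agent_pos[0], agent_pos[1], 0] = 1.0
    state[goal_pos[0], goal_pos[1], 1] = 1.0
    for row, col in obstacles:
        state[row, col, 2] = 1.0
    return state


# Obstacle layout copied from GridworldEnv
obstacles = [[1, 1], [1, 2], [2, 1], [2, 2], [5, 5], [5, 6], [6, 5], [6, 6]]
state = encode_state((10, 10), [0, 0], [9, 9], obstacles)
batch = state[np.newaxis, ...]  # Add the batch dimension expected by the model
print(state.shape, batch.shape, state.sum(axis=(0, 1)))
```

With a fixed layout the goal and obstacle channels never change, so this encoding would mainly matter if the maze varied between episodes.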

## Agent training

### Subtask:
Implement the DQN training loop, including experience replay and target network updates.


**Reasoning**:
The training cell instantiates the environment and the main network, clones it into a target network, and then runs the main loop with epsilon-greedy action selection, experience replay, and periodic target network updates.
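
The learning signal in such a loop is the one-step Bellman target: the reward plus the discounted maximum Q-value of the next state from the target network, with the bootstrap term dropped on terminal transitions. Here is a minimal NumPy sketch on a made-up batch of three transitions; the Q-values are invented for illustration and `bellman_targets` is a hypothetical helper.

```python
import numpy as np


def bellman_targets(rewards, next_q_values, dones, gamma=0.99):
    """One-step TD targets: r + gamma * max_a' Q(s', a'), with no bootstrap on terminal steps."""
    rewards = np.asarray(rewards, dtype=np.float32)
    dones = np.asarray(dones, dtype=np.float32)
    best_next = np.max(next_q_values, axis=1)
    return rewards + gamma * best_next * (1.0 - dones)


# Invented target-network outputs for three next states and four actions
next_q = np.array([[0.0, 1.0, 0.5, 0.2],
                   [2.0, 0.0, 0.0, 0.0],
                   [3.0, 3.0, 3.0, 3.0]], dtype=np.float32)
rewards = (-0.1, 10.0, -5.0)      # Step cost, goal, obstacle
dones = (False, True, True)

targets = bellman_targets(rewards, next_q, dones)
print(targets)
```

Only the first transition bootstraps (-0.1 + 0.99 * 1.0); the two terminal transitions keep their raw rewards as targets.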



In [None]:
import random
from collections import deque
import tensorflow as tf

# Hyperparameters
num_episodes = 1000
max_steps_per_episode = 100
epsilon = 1.0  # Exploration rate
epsilon_decay_rate = 0.995
min_epsilon = 0.01
learning_rate = 0.001  # Matches Adam's default; not passed to the optimizer below
gamma = 0.99  # Discount factor
replay_buffer_size = 10000
batch_size = 32
target_update_frequency = 10

# Instantiate the environment and the main DQN model
env = GridworldEnv()
state_shape = (env.grid_size[0], env.grid_size[1], 1) # Add channel dimension
num_actions = 4 # Up, Down, Left, Right
main_dqn = create_dqn_model(state_shape, num_actions)

# Create the target network and copy initial weights
target_dqn = create_dqn_model(state_shape, num_actions)
target_dqn.set_weights(main_dqn.get_weights())

# Experience Replay Buffer
replay_buffer = deque(maxlen=replay_buffer_size)

# Training loop
for episode in range(num_episodes):
    state = env.reset()
    state = np.expand_dims(state, axis=-1) # Add channel dimension
    state = np.expand_dims(state, axis=0) # Add batch dimension
    total_reward = 0

    for step in range(max_steps_per_episode):
        # Epsilon-greedy action selection
        if random.uniform(0, 1) < epsilon:
            action = random.randrange(num_actions)
        else:
            q_values = main_dqn.predict(state, verbose=0)
            action = np.argmax(q_values[0])

        # Take the selected action
        next_state, reward, done, _ = env.step(action)
        next_state = np.expand_dims(next_state, axis=-1) # Add channel dimension
        next_state = np.expand_dims(next_state, axis=0) # Add batch dimension
        total_reward += reward

        # Store experience in replay buffer
        replay_buffer.append((state, action, reward, next_state, done))

        state = next_state

        # Train the main DQN model if buffer is large enough
        if len(replay_buffer) > batch_size:
            minibatch = random.sample(replay_buffer, batch_size)
            states, actions, rewards, next_states, dones = zip(*minibatch)

            states = np.vstack(states)
            next_states = np.vstack(next_states)

            # Next-state Q-values from the target network
            target_q_values = target_dqn.predict(next_states, verbose=0)

            # Bellman target; (1 - done) drops the bootstrap term on terminal transitions
            rewards = np.array(rewards, dtype=np.float32)
            dones = np.array(dones, dtype=np.float32)
            target_q = rewards + gamma * np.max(target_q_values, axis=1) * (1.0 - dones)

            # Create target Q-values for the main network
            with tf.GradientTape() as tape:
                current_q_values = main_dqn(states)
                one_hot_actions = tf.one_hot(actions, num_actions)
                predicted_q_values = tf.reduce_sum(current_q_values * one_hot_actions, axis=1)
                loss = tf.keras.losses.MSE(target_q, predicted_q_values)

            grads = tape.gradient(loss, main_dqn.trainable_variables)
            main_dqn.optimizer.apply_gradients(zip(grads, main_dqn.trainable_variables))


        if done:
            break

    # Decay epsilon
    epsilon = max(min_epsilon, epsilon * epsilon_decay_rate)

    # Update target network
    if episode % target_update_frequency == 0:
        target_dqn.set_weights(main_dqn.get_weights())

    if episode % 100 == 0:
        print(f"Episode {episode}: Total Reward = {total_reward}, Epsilon = {epsilon:.2f}")

print("Training finished.")

Episode 0: Total Reward = -5.2, Epsilon = 0.99


KeyboardInterrupt: 

## Visualization

### Subtask:
Use `pygame` to visualize the agent's learning process in the Gridworld environment.


**Reasoning**:
Import the pygame library and initialize it. Set up the display window.
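
Drawing the grid comes down to mapping a (row, column) cell to a pixel rectangle. The sketch below shows that mapping in plain Python, assuming 50-pixel cells; `cell_to_rect` is a hypothetical helper, and the tuple it returns uses the (left, top, width, height) order that `pygame.Rect` accepts.

```python
def cell_to_rect(row, col, cell_size=50):
    """Map a grid cell to (left, top, width, height) in pixels.

    Columns run along x and rows along y, so the indices swap order.
    """
    return (col * cell_size, row * cell_size, cell_size, cell_size)


grid_size = (10, 10)
window = (grid_size[1] * 50, grid_size[0] * 50)  # (width, height)

print(window)
print(cell_to_rect(0, 0))   # Agent start, top-left corner
print(cell_to_rect(9, 9))   # Goal, bottom-right corner
```

Keeping the row/column swap in one place makes it harder to draw a transposed maze by accident.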



In [None]:
import pygame

# Initialize pygame
pygame.init()

# Set up display dimensions
GRID_SIZE = env.grid_size
CELL_SIZE = 50
DISPLAY_WIDTH = GRID_SIZE[1] * CELL_SIZE
DISPLAY_HEIGHT = GRID_SIZE[0] * CELL_SIZE
screen = pygame.display.set_mode((DISPLAY_WIDTH, DISPLAY_HEIGHT))
pygame.display.set_caption("Gridworld DQN Training")

# Define colors
WHITE = (255, 255, 255)
BLACK = (0, 0, 0)
BLUE = (0, 0, 255)  # Agent
GREEN = (0, 255, 0) # Goal
RED = (255, 0, 0)   # Obstacle

In [None]:
import numpy as np
import pygame

class GridworldEnv:
    def __init__(self, grid_size=(10, 10)):
        self.grid_size = grid_size
        self.agent_pos = [0, 0]
        self.goal_pos = [grid_size[0] - 1, grid_size[1] - 1]
        self.obstacles = [[1, 1], [1, 2], [2, 1], [2, 2], [5,5], [5,6], [6,5], [6,6]]
        self.state = self._get_state()

    def reset(self):
        self.agent_pos = [0, 0]
        self.state = self._get_state()
        return self.state

    def step(self, action):
        if action == 0:  # Up
            self.agent_pos[0] = max(0, self.agent_pos[0] - 1)
        elif action == 1:  # Down
            self.agent_pos[0] = min(self.grid_size[0] - 1, self.agent_pos[0] + 1)
        elif action == 2:  # Left
            self.agent_pos[1] = max(0, self.agent_pos[1] - 1)
        elif action == 3:  # Right
            self.agent_pos[1] = min(self.grid_size[1] - 1, self.agent_pos[1] + 1)

        self.state = self._get_state()
        done = False
        reward = -0.1  # Small negative reward for each step

        if self.agent_pos == self.goal_pos:
            reward = 10.0
            done = True
        elif self.agent_pos in self.obstacles:
            reward = -5.0
            done = True

        return self.state, reward, done, {}

    def render(self, screen, cell_size):
        screen.fill(WHITE)
        for row in range(self.grid_size[0]):
            for col in range(self.grid_size[1]):
                rect = pygame.Rect(col * cell_size, row * cell_size, cell_size, cell_size)
                pygame.draw.rect(screen, BLACK, rect, 1)

        # Draw obstacles
        for obs in self.obstacles:
            rect = pygame.Rect(obs[1] * cell_size, obs[0] * cell_size, cell_size, cell_size)
            pygame.draw.rect(screen, RED, rect)

        # Draw goal
        goal_rect = pygame.Rect(self.goal_pos[1] * cell_size, self.goal_pos[0] * cell_size, cell_size, cell_size)
        pygame.draw.rect(screen, GREEN, goal_rect)

        # Draw agent
        agent_rect = pygame.Rect(self.agent_pos[1] * cell_size, self.agent_pos[0] * cell_size, cell_size, cell_size)
        pygame.draw.rect(screen, BLUE, agent_rect)

        pygame.display.flip()


    def _get_state(self):
        state = np.zeros(self.grid_size)
        state[self.agent_pos[0], self.agent_pos[1]] = 1
        return state

In [None]:
import numpy as np
import pygame
import random
from collections import deque
import tensorflow as tf
from tensorflow.keras.layers import Input, Conv2D, Dense, Flatten
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import Adam

class GridworldEnv:
    def __init__(self, grid_size=(10, 10)):
        self.grid_size = grid_size
        self.agent_pos = [0, 0]
        self.goal_pos = [grid_size[0] - 1, grid_size[1] - 1]
        self.obstacles = [[1, 1], [1, 2], [2, 1], [2, 2], [5,5], [5,6], [6,5], [6,6]]
        self.state = self._get_state()

    def reset(self):
        self.agent_pos = [0, 0]
        self.state = self._get_state()
        return self.state

    def step(self, action):
        if action == 0:  # Up
            self.agent_pos[0] = max(0, self.agent_pos[0] - 1)
        elif action == 1:  # Down
            self.agent_pos[0] = min(self.grid_size[0] - 1, self.agent_pos[0] + 1)
        elif action == 2:  # Left
            self.agent_pos[1] = max(0, self.agent_pos[1] - 1)
        elif action == 3:  # Right
            self.agent_pos[1] = min(self.grid_size[1] - 1, self.agent_pos[1] + 1)

        self.state = self._get_state()
        done = False
        reward = -0.1  # Small negative reward for each step

        if self.agent_pos == self.goal_pos:
            reward = 10.0
            done = True
        elif self.agent_pos in self.obstacles:
            reward = -5.0
            done = True

        return self.state, reward, done, {}

    def render(self, screen, cell_size):
        screen.fill(WHITE)
        for row in range(self.grid_size[0]):
            for col in range(self.grid_size[1]):
                rect = pygame.Rect(col * cell_size, row * cell_size, cell_size, cell_size)
                pygame.draw.rect(screen, BLACK, rect, 1)

        # Draw obstacles
        for obs in self.obstacles:
            rect = pygame.Rect(obs[1] * cell_size, obs[0] * cell_size, cell_size, cell_size)
            pygame.draw.rect(screen, RED, rect)

        # Draw goal
        goal_rect = pygame.Rect(self.goal_pos[1] * cell_size, self.goal_pos[0] * cell_size, cell_size, cell_size)
        pygame.draw.rect(screen, GREEN, goal_rect)

        # Draw agent
        agent_rect = pygame.Rect(self.agent_pos[1] * cell_size, self.agent_pos[0] * cell_size, cell_size, cell_size)
        pygame.draw.rect(screen, BLUE, agent_rect)

        pygame.display.flip()


    def _get_state(self):
        state = np.zeros(self.grid_size)
        state[self.agent_pos[0], self.agent_pos[1]] = 1
        return state

def create_dqn_model(input_shape, num_actions):
    """
    Creates a Deep Q-Network (DQN) model.

    Args:
        input_shape (tuple): The shape of the input state.
        num_actions (int): The number of possible actions.

    Returns:
        tensorflow.keras.models.Model: The compiled DQN model.
    """
    input_layer = Input(shape=input_shape)

    # Convolutional layers
    conv1 = Conv2D(32, (3, 3), activation='relu')(input_layer)
    conv2 = Conv2D(64, (3, 3), activation='relu')(conv1)

    # Flatten the output
    flatten = Flatten()(conv2)

    # Dense layers
    dense1 = Dense(128, activation='relu')(flatten)
    output_layer = Dense(num_actions, activation='linear')(dense1)

    # Create the model
    model = Model(inputs=input_layer, outputs=output_layer)

    # Compile the model
    model.compile(optimizer=Adam(), loss='mean_squared_error')

    return model

# Initialize pygame
pygame.init()

# Set up display dimensions
env = GridworldEnv()
GRID_SIZE = env.grid_size
CELL_SIZE = 50
DISPLAY_WIDTH = GRID_SIZE[1] * CELL_SIZE
DISPLAY_HEIGHT = GRID_SIZE[0] * CELL_SIZE
screen = pygame.display.set_mode((DISPLAY_WIDTH, DISPLAY_HEIGHT))
pygame.display.set_caption("Gridworld DQN Training")

# Define colors
WHITE = (255, 255, 255)
BLACK = (0, 0, 0)
BLUE = (0, 0, 255)  # Agent
GREEN = (0, 255, 0) # Goal
RED = (255, 0, 0)   # Obstacle

# Hyperparameters
num_episodes = 100
max_steps_per_episode = 100
epsilon = 1.0  # Exploration rate
epsilon_decay_rate = 0.995
min_epsilon = 0.01
learning_rate = 0.001  # Matches Adam's default; not passed to the optimizer below
gamma = 0.99  # Discount factor
replay_buffer_size = 10000
batch_size = 32
target_update_frequency = 10

# Instantiate the environment and the main DQN model
state_shape = (env.grid_size[0], env.grid_size[1], 1) # Add channel dimension
num_actions = 4 # Up, Down, Left, Right
main_dqn = create_dqn_model(state_shape, num_actions)

# Create the target network and copy initial weights
target_dqn = create_dqn_model(state_shape, num_actions)
target_dqn.set_weights(main_dqn.get_weights())

# Experience Replay Buffer
replay_buffer = deque(maxlen=replay_buffer_size)

# Training loop
for episode in range(num_episodes):
    state = env.reset()
    state = np.expand_dims(state, axis=-1) # Add channel dimension
    state = np.expand_dims(state, axis=0) # Add batch dimension
    total_reward = 0

    for step in range(max_steps_per_episode):
        # Epsilon-greedy action selection
        if random.uniform(0, 1) < epsilon:
            action = random.randrange(num_actions)
        else:
            q_values = main_dqn.predict(state, verbose=0)
            action = np.argmax(q_values[0])

        # Take the selected action
        next_state, reward, done, _ = env.step(action)
        next_state = np.expand_dims(next_state, axis=-1) # Add channel dimension
        next_state = np.expand_dims(next_state, axis=0) # Add batch dimension
        total_reward += reward

        # Store experience in replay buffer
        replay_buffer.append((state, action, reward, next_state, done))

        state = next_state

        # Render the environment
        pygame.event.pump()  # Let pygame process window events so the window stays responsive
        env.render(screen, CELL_SIZE)
        pygame.time.wait(100) # Add a small delay for visualization

        # Train the main DQN model if buffer is large enough
        if len(replay_buffer) > batch_size:
            minibatch = random.sample(replay_buffer, batch_size)
            states, actions, rewards, next_states, dones = zip(*minibatch)

            states = np.vstack(states)
            next_states = np.vstack(next_states)

            # Next-state Q-values from the target network
            target_q_values = target_dqn.predict(next_states, verbose=0)

            # Bellman target; (1 - done) drops the bootstrap term on terminal transitions
            rewards = np.array(rewards, dtype=np.float32)
            dones = np.array(dones, dtype=np.float32)
            target_q = rewards + gamma * np.max(target_q_values, axis=1) * (1.0 - dones)

            # Create target Q-values for the main network
            with tf.GradientTape() as tape:
                current_q_values = main_dqn(states)
                one_hot_actions = tf.one_hot(actions, num_actions)
                predicted_q_values = tf.reduce_sum(current_q_values * one_hot_actions, axis=1)
                loss = tf.keras.losses.MSE(target_q, predicted_q_values)

            grads = tape.gradient(loss, main_dqn.trainable_variables)
            main_dqn.optimizer.apply_gradients(zip(grads, main_dqn.trainable_variables))


        if done:
            break

    # Decay epsilon
    epsilon = max(min_epsilon, epsilon * epsilon_decay_rate)

    # Update target network
    if episode % target_update_frequency == 0:
        target_dqn.set_weights(main_dqn.get_weights())

    if episode % 10 == 0:
        print(f"Episode {episode}: Total Reward = {total_reward}, Epsilon = {epsilon:.2f}")

print("Training finished.")
pygame.quit()

pygame 2.6.1 (SDL 2.28.4, Python 3.11.13)
Hello from the pygame community. https://www.pygame.org/contribute.html
Episode 0: Total Reward = -6.0, Epsilon = 0.99
Episode 10: Total Reward = -5.2, Epsilon = 0.95
Episode 20: Total Reward = -6.0, Epsilon = 0.90
Episode 30: Total Reward = -10.899999999999995, Epsilon = 0.86
Episode 40: Total Reward = -6.7, Epsilon = 0.81
Episode 50: Total Reward = -6.3, Epsilon = 0.77
Episode 60: Total Reward = -6.4, Epsilon = 0.74
Episode 70: Total Reward = -5.4, Epsilon = 0.70
Episode 80: Total Reward = -6.2, Epsilon = 0.67
Episode 90: Total Reward = -9.99999999999998, Epsilon = 0.63
Training finished.
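
The epsilon values in this log follow directly from the multiplicative decay. Below is a short sketch, using the hyperparameters from the training cell, that reproduces the schedule and counts how many episodes it takes to reach `min_epsilon`; `epsilon_after` is a hypothetical helper.

```python
def epsilon_after(episodes, start=1.0, decay=0.995, floor=0.01):
    """Epsilon after a number of completed episodes under multiplicative decay."""
    eps = start
    for _ in range(episodes):
        eps = max(floor, eps * decay)
    return eps


# The log prints epsilon after the decay step, so episode 90 has seen 91 decays.
print(f"{epsilon_after(91):.2f}")

# Count episodes until the floor is reached.
episodes_to_floor = 0
eps = 1.0
while eps > 0.01:
    eps = max(0.01, eps * 0.995)
    episodes_to_floor += 1
print(episodes_to_floor)
```

The first print matches the 0.63 logged at episode 90, and the floor is reached only after 919 episodes. With 100 episodes epsilon never drops below about 0.6, so most actions in this run are still random.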

