# Grid-world environment testing

In [48]:
import numpy as np
import random

In [53]:
# Define the actions available in the grid-world
actions = ['left', 'right', 'up', 'down']

### ⚙️ Standard Grid-World env.

In [49]:
class GridWorld:
    def __init__(self, size, start, goal):
        self.size = size
        self.start = start
        self.state = start
        self.goal = goal
        self.reset_board()

    def reset_board(self):
        """Creates the grid with the goal position marked."""
        self.board = np.zeros((self.size, self.size))
        self.board[self.goal] = 6  # Goal position marked as "2"
        self.board[self.start] = 5  # Start position marked as "0"

    def is_valid(self, state):
        """Checks if the new state is within grid bounds."""
        x, y = state
        return 0 <= x < self.size and 0 <= y < self.size

    def reset(self):
        """Resets the agent to the starting position and resets the board."""
        self.state = self.start
        self.reset_board()

    def step(self, action):
        """Moves the agent in the given direction and returns the reward."""
        x, y = self.state

        if action == 'left':
            new_state = (x, y - 1)
        elif action == 'right':
            new_state = (x, y + 1)
        elif action == 'up':
            new_state = (x - 1, y)
        elif action == 'down':
            new_state = (x + 1, y)
        else:
            raise ValueError('Invalid action')

        # If the move is valid, update state; otherwise, penalize
        if self.is_valid(new_state):
            self.state = new_state
            reward = 0
        else:
            reward = -0.01

        # Check if the goal is reached and, if so, give reward and reset
        if self.state == self.goal:
            reward = 1
            self.reset()

        return reward

    def get_board(self):
        """Returns a board representation with the agent's current position."""
        board_copy = self.board.copy()
        if self.state != self.goal:
            board_copy[self.state] = 1  # Mark agent position
        return board_copy

### 💎 Enriched Grid-World env. (with diamonds!) 
The diamonds will serve as **rewards**: the agent must learn to collect as many diamonds as possible while also reaching the exit.

In [None]:
class GridWorld_diamonds(GridWorld):
    def __init__(self, size, start, goal, num_diamonds):
        super().__init__(size, start, goal)
        self.num_diamonds = num_diamonds
        self.diamonds = []
        self.reset_diamonds()

    def reset_diamonds(self):
        '''Randomly places diamonds on the grid, ensuring no overlap with start and goal.'''
        self.diamonds = []
        while len(self.diamonds) < self.num_diamonds:
            x = random.randint(0, self.size - 1)
            y = random.randint(0, self.size - 1)
            diamond_loc = (x, y)
            if diamond_loc != self.start and diamond_loc != self.goal and diamond_loc not in self.diamonds:
                self.diamonds.append(diamond_loc)
                self.board[diamond_loc] = 3  # Mark diamond position

    def reset(self):
        """Resets the agent to the starting position, resets the board, and repositions diamonds."""
        super().reset()
        self.reset_diamonds()
        #print(f"Diamonds placed at: {self.diamonds}\n")
    
    def step(self, action):
        reward = super().step(action)
        
        # Check if the agent collects a diamond
        if self.state in self.diamonds:
            self.diamonds.remove(self.state)
            self.board[self.state] = 0
            reward += 0.5  # Reward for collecting a diamond

        return reward        

## (contextual) Multi-armed bandits
Remember that the contextual bandit algorithm is aware of the current **state**, but each action doesn't affect the next state, just the reward.
The decision-making process is driven only by the action-values, if we don't consider any term that accounts for frequency, namely using an Upper-Confidence-Bound selection.

In [None]:
# Parameters
epsilon = 0.05  # Exploration rate
episodes = 1000
grid_size = 6

# Initialize the grid-world environment
gridworld = GridWorld(grid_size, (0, 0), (grid_size-1, grid_size-1))

def contextual_bandit(gridworld, epsilon=0.05, episodes=1000, opt_iv = False, ucb = False):
    """Contextual bandit algorithm for a stateless GridWorld agent."""
    
    # Reset the environment
    gridworld.reset()

    # Initialize action values and action counts
    action_values = {action: 0.0 for action in actions} if not opt_iv else {action: 1.5 for action in actions}
    action_counts = {action: 0 for action in actions}

    # Logging trackers
    diamonds_collected = 0
    total_goals_reached = 0
    total_reward = 0

    # Flag to check if the gridworld is a diamond world
    is_diamond = not isinstance(gridworld, GridWorld)

    print(f"--- Starting Contextual Bandit Run ---")
    print(f"Grid Size: {gridworld.size}x{gridworld.size}")
    print(f"Start Position: {gridworld.start}, Goal Position: {gridworld.goal}")
    print(f"Epsilon (exploration rate): {epsilon}")
    print(f"Total Episodes: {episodes}\n")

    # Run the bandit loop
    for episode in range(1, episodes + 1):
        print(f"\n📘 Episode {episode}/{episodes}")
        
        # Epsilon-greedy action selection
        explore = random.random() < epsilon
        if ucb: # UCB term: c * sqrt(log(n) / N(a))
            print("→ Using UCB for action selection")
            c = 2  # Exploration parameter for UCB
            ucb_terms = [c * np.sqrt(np.log(episode) / action_counts[action] if action_counts[action] > 0 else 1) for action in actions]

            # Combine action_values and ucb_terms into a dictionary of (action -> value + UCB term)
            combined_values = {
                action: action_values[action] + ucb_terms[i]
                for i, action in enumerate(actions)
            }
            action = random.choice(actions) if explore else max(combined_values, key=action_values.get)
        else:
            action = random.choice(actions) if explore else max(action_values, key=action_values.get)
        print(f"→ Action chosen: {action} ({'Exploration' if explore else 'Exploitation'})")

        # Take action in the environment
        reward = gridworld.step(action)
        action_counts[action] += 1

        # Update action value estimate incrementally:
        # Q_new = Q_old + (1/n) * (reward - Q_old)
        old_value = action_values[action]
        alpha = 1 / action_counts[action]
        action_values[action] += alpha * (reward - old_value)

        # Verbose feedback
        print(f"→ Reward received: {reward}")
        print(f"→ Updated Q[{action}]: {action_values[action]:.5f} (was {old_value:.5f})")
        print(f"→ Agent State: {gridworld.state}")
        print("→ Current Grid:")
        print(gridworld.get_board())

        # Reward-specific reactions
        if reward == 1:
            total_goals_reached += 1
            print("\n✅ GOAL REACHED!")
            print(f"🎯 Total goals reached so far: {total_goals_reached}")
            print(f"💎 Diamonds collected before goal: {diamonds_collected}") if is_diamond else None
            diamonds_collected = 0  # Reset after goal
        elif reward == 0.5:
            diamonds_collected += 1
            print("💎 Diamond collected!")
        elif reward == -0.01:
            print("🚫 Invalid move - wall hit.")

        # Accumulate total reward
        total_reward += reward

    # Final report
    print("\n=== Summary ===")
    print("Action Value Estimates:")
    for action, value in action_values.items():
        print(f"  {action:<5}: {value:.5f}")

    print("\nAction Selection Counts:")
    for action, count in action_counts.items():
        print(f"  {action:<5}: {count} times")

    print(f"\n🎯 Total Goals Reached: {total_goals_reached}")
    print(f"\n📈 Average Reward per Episode: {total_reward / episodes:.5f}")
    print("=== End of Bandit Run ===")

# Run the contextual bandit algo, with the standard grid-world
contextual_bandit(gridworld, epsilon, episodes=1000, opt_iv=True, ucb=False)    

--- Starting Contextual Bandit Run ---
Grid Size: 6x6
Start Position: (0, 0), Goal Position: (5, 5)
Epsilon (exploration rate): 0.05
Total Episodes: 1000


📘 Episode 1/1000
→ Action chosen: left (Exploitation)
→ Reward received: -0.01
→ Updated Q[left]: -0.01000 (was 1.50000)
→ Agent State: (0, 0)
→ Current Grid:
[[1. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 6.]]
🚫 Invalid move - wall hit.

📘 Episode 2/1000
→ Action chosen: right (Exploitation)
→ Reward received: 0
→ Updated Q[right]: 0.00000 (was 1.50000)
→ Agent State: (0, 1)
→ Current Grid:
[[5. 1. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 6.]]

📘 Episode 3/1000
→ Action chosen: up (Exploitation)
→ Reward received: -0.01
→ Updated Q[up]: -0.01000 (was 1.50000)
→ Agent State: (0, 1)
→ Current Grid:
[[5. 1. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0.]
 [0. 0.

In [57]:
gw_diamonds = GridWorld_diamonds(grid_size, (0, 0), (grid_size-1, grid_size-1), num_diamonds=3)

# Run the contextual bandit algo, with the diamond grid-world
contextual_bandit(gw_diamonds, epsilon, episodes=1000)

Diamonds placed at: [(4, 3), (3, 1), (0, 1)]

--- Starting Contextual Bandit Run ---
Grid Size: 6x6
Start Position: (0, 0), Goal Position: (5, 5)
Epsilon (exploration rate): 0.05
Total Episodes: 1000


📘 Episode 1/1000
→ Action chosen: left (Exploitation)
→ Reward received: -0.01
→ Updated Q[left]: -0.01000 (was 0.00000)
→ Agent State: (0, 0)
→ Current Grid:
[[1. 3. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0.]
 [0. 3. 0. 0. 0. 0.]
 [0. 0. 0. 3. 0. 0.]
 [0. 0. 0. 0. 0. 6.]]
🚫 Invalid move - wall hit.

📘 Episode 2/1000
→ Action chosen: right (Exploitation)
→ Reward received: 0.5
→ Updated Q[right]: 0.50000 (was 0.00000)
→ Agent State: (0, 1)
→ Current Grid:
[[5. 1. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0.]
 [0. 3. 0. 0. 0. 0.]
 [0. 0. 0. 3. 0. 0.]
 [0. 0. 0. 0. 0. 6.]]
💎 Diamond collected!

📘 Episode 3/1000
→ Action chosen: right (Exploitation)
→ Reward received: 0
→ Updated Q[right]: 0.25000 (was 0.50000)
→ Agent State: (0, 2)
→ Current Grid:
[[5. 0. 1. 0. 0. 0.]


## Temporal-difference methods

In [None]:
actions = ['left', 'right', 'up', 'down']  # Define the actions available in the grid-world

def epsilon_greedy(state, Q_values, epsilon=0.1):
    """Epsilon-greedy policy for action selection."""
    if random.random() < epsilon:
        return random.choice(actions)  # Explore
    else:
        return max(actions, key=lambda a:Q_values.get((state, a), 0))  # Exploit best action

def random_policy():
    """Random policy for action selection."""
    return random.choice(actions)  # Random action 

# The policy needs to simulate what next_state would be, for each action,
# without actually stepping in the environment yet.
def epsilon_greedy_policy(state, state_value, epsilon=0.1):
    """Epsilon-greedy policy using state values."""
    if random.random() < epsilon:
        return random.choice(actions)  # Explore
    else:
        # Choose the action leading to the highest-valued neighbor state
        best_action = None
        best_value = -float('inf')
        x, y = state

        for action in actions:
            if action == 'up':
                next_state = (x - 1, y)
            elif action == 'down':
                next_state = (x + 1, y)
            elif action == 'left':
                next_state = (x, y - 1)
            elif action == 'right':
                next_state = (x, y + 1)
            
            if gridworld.is_valid(next_state):
                value = state_value[next_state]
                if value > best_value:
                    best_value = value
                    best_action = action
        
        return best_action if best_action else random.choice(actions)    

### TD(0): policy evaluation

In [None]:
# TD(0) algorithm for estimating state values   
def td_zero(gridworld, episodes, policy):
    alpha = 0.1
    gamma = 0.7
    state_value = np.zeros((grid_size, grid_size))

    for _ in range(episodes):
        gridworld.reset()
        state = gridworld.start

        for _ in range(100):  # max steps per episode
            action = policy(state, state_value, epsilon=0.1)
            reward = gridworld.step(action)
            next_state = gridworld.state

            # TD(0) update
            state_value[state] += alpha * (reward + gamma * state_value[next_state] - state_value[state])
            state = next_state

            if state == gridworld.goal:
                break

    # Final state value table
    print("\n📊 Final state value function:")
    for row in state_value:
        print(" ".join(f"{v:>8.5f}" for v in row))

In [59]:
grid_size = 6
gridworld = GridWorld(grid_size, (0, 0), (grid_size-1, grid_size-1))
td_zero(gridworld, episodes = 1000, policy=epsilon_greedy_policy) # TD(0) only looks one step ahead when updating the value of a state.


📊 Final state value function:
-0.00038  0.00011 -0.00075 -0.00052 -0.00022 -0.00065
-0.00087  0.00020  0.00009  0.00019  0.00002 -0.00029
-0.00014  0.00018  0.00020  0.00018  0.00019  0.00020
-0.00009  0.00020  0.00019  0.00019  0.00019 -0.00062
-0.00071 -0.00006  0.00020  0.00018  0.00019 -0.00030
-0.00046 -0.00079 -0.00002  0.00020 -0.00156  0.00000


❗ The goal state doesn’t have a high value — it has zero value — because there are no future rewards after it.

### Policy control

#### SARSA

In [None]:
def sarsa(gridworld, policy, episodes):
    """SARSA algorithm for on-policy learning."""
    alpha = 0.1  # Learning rate
    gamma = 0.7  # Discount factor
    Q_values = {} # Lazy initialization of Q-values

    # Loop through episodes
    for episode in range(episodes):
        gridworld.reset()
        state = gridworld.start
        action = policy(state, Q_values)

        # Loop through steps in the episode
        for _ in range(100): # steps per episode
            reward = gridworld.step(action)
            next_state = gridworld.state
            next_action = policy(next_state, Q_values)

            # SARSA update
            Q_values[(state, action)] = Q_values.get((state, action), 0.0) + alpha * (
                reward + gamma * Q_values.get((next_state, next_action), 0.0) - Q_values.get((state, action), 0.0))
            
            state = next_state
            action = next_action

        # Check if the goal is reached
        if gridworld.state == gridworld.goal:
            break

    return Q_values

In [66]:
# Example usage of the GridWorld class and the learning algorithms
gridworld = GridWorld(grid_size, (0, 0), (5, 5))

# Run the learning algorithms
q_values = sarsa(gridworld, policy=epsilon_greedy, episodes=1000)

print("Q-values after training:")
for (state, action), value in q_values.items():
    print(f"State: {state}, Action: {action}, Q-value: {value}")
    
max_state = max(q_values, key=q_values.get)
print(f"\nMax Q-value state: {max_state}, Value: {q_values[max_state]}")

Q-values after training:
State: (0, 0), Action: left, Q-value: 0.01045086124764508
State: (0, 0), Action: right, Q-value: 0.03293683942666527
State: (0, 1), Action: left, Q-value: 0.014707748966010107
State: (0, 1), Action: right, Q-value: 0.04010135676401819
State: (0, 2), Action: down, Q-value: 0.06473156943227487
State: (1, 2), Action: left, Q-value: 0.04353998110427265
State: (1, 1), Action: left, Q-value: 0.029717184015158257
State: (1, 0), Action: left, Q-value: 0.016984777483033682
State: (1, 0), Action: right, Q-value: 0.04325655711181308
State: (1, 1), Action: up, Q-value: 0.02977296263177732
State: (0, 0), Action: down, Q-value: 0.026763818708386514
State: (1, 0), Action: up, Q-value: 0.01963558623255038
State: (0, 1), Action: up, Q-value: 0.017996158172225062
State: (0, 0), Action: up, Q-value: 0.011042796755872537
State: (0, 2), Action: left, Q-value: 0.008292948574338283
State: (0, 2), Action: right, Q-value: 0.011777900076288504
State: (0, 3), Action: left, Q-value: 0.017

In [77]:
# Example usage of the GridWorld class and the learning algorithms
gridworld = GridWorld_diamonds(grid_size, (0, 0), (5, 5), num_diamonds=3)

# Run the learning algorithms
q_values = sarsa(gridworld, policy=epsilon_greedy, episodes=1000)

print("Q-values after training:")
for (state, action), value in q_values.items():
    print(f"State: {state}, Action: {action}, Q-value: {value}")
    
max_state = max(q_values, key=q_values.get)
print(f"\nMax Q-value state: {max_state}, Value: {q_values[max_state]}")

Q-values after training:
State: (0, 0), Action: left, Q-value: 0.09875247470354409
State: (0, 0), Action: right, Q-value: 0.11349477865967755
State: (0, 1), Action: left, Q-value: 0.0988122736640191
State: (0, 1), Action: right, Q-value: 0.12175226521680887
State: (0, 2), Action: left, Q-value: 0.0519583156466827
State: (0, 2), Action: up, Q-value: 0.008010052955418496
State: (0, 2), Action: down, Q-value: 0.09385984754663058
State: (1, 2), Action: left, Q-value: 0.11032482962831021
State: (1, 1), Action: left, Q-value: 0.11587320469611165
State: (1, 0), Action: left, Q-value: 0.11639275001585983
State: (1, 0), Action: right, Q-value: 0.1344397115428005
State: (1, 1), Action: down, Q-value: 0.1784324644543239
State: (2, 1), Action: left, Q-value: 0.14062557888372296
State: (2, 0), Action: left, Q-value: 0.0909233617248472
State: (2, 0), Action: right, Q-value: 0.2187233990997037
State: (0, 2), Action: right, Q-value: 0.008655596743256855
State: (0, 3), Action: down, Q-value: 0.00922913

#### Q-learning

In [71]:
def q_learning(gridworld, policy, episodes):
    """Q-learning for Off-policy Learning."""
    alpha = 0.1  # Learning rate
    gamma = 0.7  # Discount factor
    Q_values = {}  # Dictionary to store Q-values

     # Loop through episodes
    for _ in range(episodes):
        gridworld.reset()
        state = gridworld.start

        # Loop through steps in the episode
        for _ in range(100): # steps per episode
            action = policy(state, Q_values)
            reward = gridworld.step(action)
            next_state = gridworld.state
            next_action = max(actions, key=lambda a: Q_values.get((next_state, a), 0.0)) # Off-policy action selection (greedy)

            # Q-learning update
            Q_values[(state, action)] = Q_values.get((state, action), 0.0) + alpha * (
                reward + gamma * Q_values.get((next_state, next_action), 0.0) - Q_values.get((state, action), 0.0))
            
            state = next_state

        # Check if the goal is reached
        if gridworld.state == gridworld.goal:
            break

    return Q_values

##### Standard Gridworld

In [72]:
# Example usage of the GridWorld class and the learning algorithms
gridworld = GridWorld(grid_size, (0, 0), (5, 5))

# Run the learning algorithms
q_values = q_learning(gridworld, policy=epsilon_greedy, episodes=1000)

print("Q-values after training:")
for (state, action), value in q_values.items():
    print(f"State: {state}, Action: {action}, Q-value: {value}")
    
max_state = max(q_values, key=q_values.get)
print(f"\nMax Q-value state: {max_state}, Value: {q_values[max_state]}")

Q-values after training:
State: (0, 0), Action: left, Q-value: -0.009999999999999992
State: (0, 0), Action: right, Q-value: 0.0
State: (0, 1), Action: left, Q-value: 0.0
State: (0, 0), Action: down, Q-value: 0.0
State: (1, 0), Action: left, Q-value: -0.009999999999999988
State: (1, 0), Action: right, Q-value: 0.0
State: (1, 1), Action: left, Q-value: 0.0
State: (1, 1), Action: right, Q-value: 0.0
State: (1, 2), Action: left, Q-value: 0.0
State: (1, 1), Action: down, Q-value: 0.0
State: (2, 1), Action: left, Q-value: 0.0
State: (2, 0), Action: left, Q-value: -0.009999997424145314
State: (2, 0), Action: right, Q-value: 0.0
State: (2, 0), Action: up, Q-value: 0.0
State: (2, 1), Action: down, Q-value: 0.0
State: (3, 1), Action: down, Q-value: 0.0
State: (4, 1), Action: left, Q-value: 0.0
State: (4, 0), Action: left, Q-value: -0.008499053647030009
State: (0, 1), Action: up, Q-value: -0.009999999999999992
State: (0, 1), Action: right, Q-value: 0.0
State: (0, 2), Action: left, Q-value: 0.0
St

>If the reward is only at the goal and the agent doesn’t reach it in early episodes, all Q-values will update using only zero or negative rewards, so there’s no positive signal to propagate.
This happens because the only reward can be obtained reaching the goal, that is quite difficult to reach without exploring. In this way, the agent learns just to minimize damage, not reach a reward, because the Q-values are mostly negative.

##### Diamonds Gridworld

In [78]:
# Example usage of the GridWorld class and the learning algorithms
gridworld = GridWorld_diamonds(grid_size, (0, 0), (5, 5), num_diamonds=3)

# Run the learning algorithms
q_values = q_learning(gridworld, policy=epsilon_greedy, episodes=1000)

print("Q-values after training:")
for (state, action), value in q_values.items():
    print(f"State: {state}, Action: {action}, Q-value: {value}")
    
max_state = max(q_values, key=q_values.get)
print(f"\nMax Q-value state: {max_state}, Value: {q_values[max_state]}")

Q-values after training:
State: (0, 0), Action: left, Q-value: 0.11645552940520185
State: (0, 0), Action: right, Q-value: 0.1730146238564294
State: (0, 1), Action: left, Q-value: 0.10314146289627221
State: (0, 1), Action: right, Q-value: 0.09223978628205648
State: (0, 2), Action: left, Q-value: 0.14456104500187966
State: (0, 0), Action: down, Q-value: 0.15487770571358786
State: (1, 0), Action: left, Q-value: 0.12208954389542381
State: (1, 0), Action: right, Q-value: 0.15696580951375155
State: (1, 1), Action: left, Q-value: 0.1433548938795396
State: (1, 0), Action: up, Q-value: 0.1339348086496393
State: (0, 0), Action: up, Q-value: 0.12262016409362758
State: (0, 1), Action: down, Q-value: 0.21901679345266076
State: (1, 1), Action: down, Q-value: 0.31668207423834505
State: (2, 1), Action: left, Q-value: 0.1927825729869129
State: (2, 0), Action: left, Q-value: 0.12015753463310869
State: (2, 0), Action: up, Q-value: 0.12178477882888589
State: (1, 0), Action: down, Q-value: 0.24547283044985

>Indeed, if we use the GridWorld with diamonds as environment, which contains way more possible intermediate rewards (the diamonds themselves, as 0.5) the agent is more encouraged to explore, even though in a indirect way, being able to reach the goal.

## Value-function approximation via DL techniques

In [51]:
from tensorflow.keras import models
from tensorflow.keras.layers import Flatten, Dense, Conv2D
from tensorflow.keras.optimizers import Adam
from collections import deque

With a DQN, I'm going to replace the tabular state-value function with a neural network that approximates the Q-value function Q(s, a).

### DQN agent for GridWorld

In [None]:
# Naive DQN implementation for GridWorld
class DQN:
    def __init__(self, state_shape, n_actions, gamma=0.99, lr=0.001):
        self.state_shape = state_shape
        self.n_actions = n_actions
        self.gamma = gamma
        self.model = self.build_model(state_shape, n_actions, lr)

    def build_model(self, state_shape, n_actions, lr):
        """Builds a simple feedforward neural network for DQN."""
        model = models.Sequential()
        model.add(Flatten(input_shape=state_shape))
        model.add(Dense(64, activation='relu'))
        model.add(Dense(64, activation='relu'))
        model.add(Dense(n_actions, activation='linear'))

        model.compile(optimizer=Adam(learning_rate=lr), loss='mse')
        return model    

    def build_conv_model(self, state_shape, n_actions, lr):
        """Builds a convolutional neural network for DQN."""
        model = models.Sequential()
        model.add(Conv2D(32, (3, 3), activation='relu', input_shape=state_shape))
        model.add(Conv2D(64, (3, 3), activation='relu'))
        model.add(Flatten())
        model.add(Dense(64, activation='relu'))
        model.add(Dense(n_actions, activation='linear'))

        model.compile(optimizer=Adam(learning_rate=lr), loss='mse')
        return model

    def epsilon_greedy_policy(self, state, epsilon=0.1):
        """Epsilon-greedy policy for action selection."""
        if np.random.rand() < epsilon:
            return random.choice(range(self.n_actions))
        else:
            q_values = self.model.predict(np.array([state]), verbose=0)[0]
            return np.argmax(q_values)

    def train(self, env, episodes=500, max_steps=100, epsilon=0.1):
        """Naive DQN training without experience replay buffer."""
        for _ in range(episodes):
            env.reset()
            state = env.get_board()

            for _ in range(max_steps):
                action_idx = self.epsilon_greedy_policy(state, epsilon)
                action = actions[action_idx] # e.g. action[0] = 'left'
                reward = env.step(action)
                next_state = env.get_board()
                done = (env.state == env.goal)

                # Q-learning target (no replay buffer)
                q_values = self.model.predict(np.array([state]), verbose=0)[0] # in q-learning they are rnd initialized
                next_q_values = self.model.predict(np.array([next_state]), verbose=0)[0]
                target = q_values.copy()

                # Update the target for the action taken
                if done: # If the episode is done, we only consider the immediate reward
                    target[action_idx] = reward
                else:
                    target[action_idx] = reward + self.gamma * np.max(next_q_values)

                # Train the model
                self.model.fit(np.array([state]), np.array([target]), epochs=1, verbose=0)

                # Update state
                state = next_state
                if done:
                    break
        print("Naive DQN training complete.")
    
    def train_replay_buffer(self, env, episodes=500, max_steps=100, epsilon=0.1, batch_size=32):
        """DQN with experience replay buffer."""

        # Initialize the replay buffer
        replay_buffer = deque(maxlen=2000)

        for _ in range(episodes):
            env.reset()
            state = env.get_board()

            for _ in range(max_steps):
                action_idx = self.epsilon_greedy_policy(state, epsilon)
                action = actions[action_idx]
                reward = env.step(action)
                next_state = env.get_board()
                done = (env.state == env.goal)

                # Store the experience in the replay buffer
                replay_buffer.append((state, action_idx, reward, next_state, done))

                # If the replay buffer is not full, skip training, otherwise sample a batch
                # The model only starts training once there is enough data in the buffer to sample a meaningful batch,
                if len(replay_buffer) < batch_size:
                    state = next_state
                    if done:
                        break
                    continue

                # Sample a batch from the replay buffer
                batch = random.sample(replay_buffer, batch_size)
                batch_states, batch_action_idxs, batch_rewards, batch_next_states, batch_dones = zip(*batch)
                batch_states = np.array(batch_states)
                batch_action_idxs = np.array(batch_action_idxs)
                batch_rewards = np.array(batch_rewards)
                batch_next_states = np.array(batch_next_states)
                batch_dones = np.array(batch_dones)
                
                # Q-learning target (with replay buffer)
                q_values = self.model.predict(batch_states, verbose=0)
                next_q_values = self.model.predict(batch_next_states, verbose=0)
                targets = q_values.copy()
                
                # Update the target for the actions taken, for each sample in the batch
                for i in range(batch_size):
                    if batch_dones[i]:
                        targets[i][batch_action_idxs[i]] = batch_rewards[i]
                    else:
                        targets[i][batch_action_idxs[i]] = batch_rewards[i] + self.gamma * np.max(next_q_values[i])

                # Train the model
                self.model.fit(batch_states, targets, epochs=1, verbose=0)

                # Update state
                state = next_state
                if done:
                    break
        print("DQN (with replay buffer) training complete.")

#### ⚙️ Standard Grid-World env.

##### Example usage of the DQN class: naive

In [61]:
# Example usage of the NaiveDQN class
grid_size = 7  # Define the grid size
actions = ['left', 'right', 'up', 'down']  # Define the actions available in the grid-world
gridworld = GridWorld(grid_size, (0, 0), (grid_size-1, grid_size-1))

# Initialize and train the Naive DQN agent
naive_dqn = DQN(state_shape=(grid_size, grid_size), n_actions=len(actions))
naive_dqn.train(gridworld, episodes=10, max_steps=120)

Naive DQN training complete.


In [62]:
gridworld.reset()  # Reset the environment before making predictions

# Simulate some steps in the grid-world
gridworld.step("right")
gridworld.step("down")
gridworld.step("down")
gridworld.step("right")
gridworld.step("right")
gridworld.step("down")
gridworld.step("down")
gridworld.step("right")
gridworld.step("down")
gridworld.step("down")
gridworld.step("right")

# Print the current state of the grid-world and predict Q-values
print(f"Current state:\n{gridworld.get_board()}")  # Print the current board state
print("\nPredicting Q-values for the current state:")
pred_q_values = naive_dqn.model.predict(np.array([gridworld.get_board()]), verbose=0)  # Predict Q-values for the current state
for action, q_value in zip(actions, pred_q_values[0]):
    print(f"Action: {action}, Q-value: {q_value:.4f}")

Current state:
[[5. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 1. 6.]]

Predicting Q-values for the current state:
Action: left, Q-value: 1.2429
Action: right, Q-value: 1.1693
Action: up, Q-value: 1.0924
Action: down, Q-value: 1.1578


> As we can see, the naive DQN algo (without the experience replay mechanism) doesn't learn good Q-values.

##### Example usage of the DQN class with experience replay

In [None]:
# Example usage of the DQN class with experience replay
grid_size = 7  # Define the grid size
gridworld = GridWorld(grid_size, (0, 0), (grid_size-1, grid_size-1))

# Initialize and train the Naive DQN agent
dqn = DQN(state_shape=(grid_size, grid_size), n_actions=len(actions))
dqn.train_replay_buffer(gridworld, episodes=10, max_steps=120, epsilon=0.1, batch_size=32)

DQN (with replay buffer) training complete.


##### Printing learned Q-values

In [None]:
gridworld.reset()  # Reset the environment before making predictions

# Simulate some steps in the grid-world
gridworld.step("right")
gridworld.step("down")
gridworld.step("down")
gridworld.step("right")
gridworld.step("right")
gridworld.step("down")
gridworld.step("down")
gridworld.step("right")
gridworld.step("down")
gridworld.step("down")
gridworld.step("right")

# Print the current state of the grid-world and predict Q-values
print(f"Current state:\n{gridworld.get_board()}")  # Print the current board state
print("\nPredicting Q-values for the current state:")
pred_q_values = naive_dqn.model.predict(np.array([gridworld.get_board()]), verbose=0)  # Predict Q-values for the current state
for action, q_value in zip(actions, pred_q_values[0]):
    print(f"Action: {action}, Q-value: {q_value:.4f}")

Current state:
[[5. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 1. 6.]]

Predicting Q-values for the current state:
Action: left, Q-value: 9.0078
Action: right, Q-value: 9.2409
Action: up, Q-value: 8.8651
Action: down, Q-value: 9.1393


> The DQN algo is behaving quite well: it prefers the optimal action "right" in that state, such that the Q-values reflect the expected future reward.

#### 💎 Diamond Grid-World env.

##### Example usage of the DQN class

In [64]:
# Example usage of the NaiveDQN class
grid_size = 7  # Define the grid size
actions = ['left', 'right', 'up', 'down']  # Define the actions available in the grid-world
gridworld = GridWorld_diamonds(grid_size, (0, 0), (grid_size-1, grid_size-1), num_diamonds=int(0.1* grid_size**2))

# Initialize and train the DQN agent
dqn = DQN(state_shape=(grid_size, grid_size), n_actions=len(actions))
dqn.train_replay_buffer(gridworld, episodes=10, max_steps=120)

DQN (with replay buffer) training complete.


##### Printing learned Q-values

In [67]:
gridworld.reset()  # Reset the environment before making predictions

# Simulate some steps in the grid-world
gridworld.step("right")
gridworld.step("down")
gridworld.step("down")
gridworld.step("right")

# Print the current state of the grid-world and predict Q-values
print(f"Current state:\n{gridworld.get_board()}")  # Print the current board state
print("\nPredicting Q-values for the current state:")
pred_q_values = dqn.model.predict(np.array([gridworld.get_board()]), verbose=0)  # Predict Q-values for the current state
for action, q_value in zip(actions, pred_q_values[0]):
    print(f"Action: {action}, Q-value: {q_value:.4f}")

Current state:
[[5. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 1. 0. 0. 0. 3.]
 [0. 0. 3. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 3. 0.]
 [0. 0. 0. 0. 0. 0. 6.]]

Predicting Q-values for the current state:
Action: left, Q-value: 305.1731
Action: right, Q-value: 303.2148
Action: up, Q-value: 303.3550
Action: down, Q-value: 297.1938


> Since the reward landscape is more complex and noisy, it makes harder for the agent to learn the optimal path to the goal. Hence, the learned Q-values are do not reflect the expected cumulative reward. Moreover, they are much higher than the Q-values predicted within the standard environment because the expected sum of future reward can be much larger due to of the intermediate rewards that come from the collection of the diamonds.

## Policy gradient methods
In this section, we will experiment the so-called policy gradient methods, where the agent will learn to approximate its policy without relying on the Q-values. Basically, it will directly approximate the policy, without using Q-values as a proxy to update it.

### REINFORCE

REINFORCE is a Monte Carlo policy gradient method. In its most basic version, it uses the cumulative reward \( G_t \) as the target for updating the policy. An improved version, REINFORCE with baseline, uses a state-value function to reduce the variance of the updates. Unlike A2C, this state-value function is not updated using bootstrapping (i.e., it does not exploit the value estimates of subsequent states).

### A2C

Advantage Actor-Critic (A2C) is a one-step temporal-difference policy gradient method. It uses bootstrapping to estimate the state-value function, which serves as the "critic" to evaluate the actions suggested by the "actor" (the policy). The critic helps reduce the variance of the policy gradient updates, making learning more stable and efficient compared to pure policy gradient methods like REINFORCE.

### PPO