In [1]:
import numpy as np
import random

In [17]:
class GridWorld:
    def __init__(self, shape, start, goal, traps, rewards):
        self.shape = shape  # (height, width)
        self.start = start  # (row, col)
        self.goal = goal  # (row, col)
        self.traps = traps  # list of (row, col) tuples
        self.rewards = rewards  # dictionary of (row, col): reward pairs

        self.states = self.get_states()
        self.actions = ['up', 'down', 'left', 'right']

    def get_states(self):
        # Return a list of all possible states (positions)
        states = []
        for row in range(self.shape[0]):
            for col in range(self.shape[1]):
                states.append((row, col))
        return states

    def transition_func(self, state, action):
        # Implement the transition function
        row, col = state
        if action == 'up':
            new_row = max(row - 1, 0)
            new_col = col
        elif action == 'down':
            new_row = min(row + 1, self.shape[0] - 1)
            new_col = col
        elif action == 'left':
            new_row = row
            new_col = max(col - 1, 0)
        elif action == 'right':
            new_row = row
            new_col = min(col + 1, self.shape[1] - 1)

        new_state = (new_row, new_col)
        reward = self.rewards.get(new_state, 0)
        is_terminal = (new_state == self.goal) or (new_state in self.traps)
        return new_state, reward, is_terminal
    
    def reset(self):
        self.current_state = self.start
        return self.current_state

In [18]:
shape = (4, 4)
start = (0, 0)
goal = (3, 3)
traps = [(1, 1)]
rewards = {goal: 1, (1, 1): -1}

env = GridWorld(shape, start, goal, traps, rewards)
print("States:", env.states)

States: [(0, 0), (0, 1), (0, 2), (0, 3), (1, 0), (1, 1), (1, 2), (1, 3), (2, 0), (2, 1), (2, 2), (2, 3), (3, 0), (3, 1), (3, 2), (3, 3)]


In this implementation, we define a q_learning function that takes the environment (env) and hyperparameters like learning rate (alpha), discount factor (gamma), exploration rate (epsilon), and the number of episodes (episodes).

The function initializes the Q-values to 0 for all state-action pairs. Then, for each episode, it starts from the initial state and iteratively updates the Q-values using the Q-learning update rule:


Q(s, a) += alpha * (reward + gamma * max_a'(Q(s', a')) - Q(s, a))
After training, the function returns the learned Q-values.

We then create a policy by selecting the action with the maximum Q-value for each state. Finally, we print the optimal policy by iterating over the grid and printing the optimal action for each state.

In [4]:
def q_learning(env, alpha=0.1, gamma=0.9, epsilon=0.1, episodes=10000):
    # Initialize Q-values
    q_values = {}
    for state in env.states:
        q_values[state] = {action: 0.0 for action in env.actions}

    for episode in range(episodes):
        state = env.start
        done = False

        while not done:
            # Choose an action using epsilon-greedy
            if random.random() < epsilon:
                action = random.choice(env.actions)
            else:
                action = max(q_values[state], key=q_values[state].get)

            # Take the action and observe the new state, reward, and terminal flag
            new_state, reward, done = env.transition_func(state, action)

            # Update Q-value
            q_values[state][action] += alpha * (
                reward + gamma * max(q_values[new_state].values()) - q_values[state][action]
            )

            state = new_state

    return q_values

# Train the agent
q_values = q_learning(env)

# Print the optimal policy
policy = {}
for state in env.states:
    policy[state] = max(q_values[state], key=q_values[state].get)

print("Optimal Policy:")
for row in range(env.shape[0]):
    for col in range(env.shape[1]):
        state = (row, col)
        action = policy[state]
        print(f"{action[0].upper()}", end=" ")
    print()

Optimal Policy:
R R R D 
U U U D 
U D U D 
U R R U 


In the basic implementation, we used a tabular representation to store the Q-values for each state-action pair. However, this approach becomes intractable for problems with large or continuous state spaces, as the memory requirements grow exponentially.

Neural networks can be used as function approximators to represent the Q-value function Q(s, a). Instead of storing individual Q-values for each state-action pair, we can train a neural network to approximate the Q-value function, taking the state and action as inputs and outputting the corresponding Q-value.

In [6]:
import torch
import torch.nn as nn

class QNetwork(nn.Module):
    def __init__(self, state_dim, action_dim, hidden_dim):
        super(QNetwork, self).__init__()
        self.fc1 = nn.Linear(state_dim + action_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, hidden_dim)
        self.fc3 = nn.Linear(hidden_dim, 1)

    def forward(self, state, action):
        x = torch.cat([state, action], dim=1)
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        q_value = self.fc3(x)
        return q_value

# Example usage
state_dim = 4  # Dimensions of the state space
action_dim = 2  # Dimensions of the action space
hidden_dim = 32

q_network = QNetwork(state_dim, action_dim, hidden_dim)
state = torch.randn(1, state_dim)  # Sample state
action = torch.randn(1, action_dim)  # Sample action
q_value = q_network(state, action)

In [34]:
import numpy as np

class GridWorld:
    def __init__(self, shape, start, goal, traps, rewards):
        self.shape = shape  # (height, width)
        self.start = start  # (row, col)
        self.goal = goal  # (row, col)
        self.traps = traps  # list of (row, col) tuples
        self.rewards = rewards  # dictionary of (row, col): reward pairs

        self.states = self.get_states()
        self.actions = [0, 1, 2, 3]  # up, down, left, right
        self.action_space = range(len(self.actions))
        self.state_dim = 2  # (row, col)
        self.action_dim = 1  # one-hot encoding of actions
        self.current_state = start

    def get_states(self):
        # Return a list of all possible states (positions)
        states = []
        for row in range(self.shape[0]):
            for col in range(self.shape[1]):
                states.append((row, col))
        return states

    def transition_func(self, state, action):
        # Implement the transition function
        row, col = state
        if action == 0:  # up
            new_row = max(row - 1, 0)
            new_col = col
        elif action == 1:  # down
            new_row = min(row + 1, self.shape[0] - 1)
            new_col = col
        elif action == 2:  # left
            new_row = row
            new_col = max(col - 1, 0)
        elif action == 3:  # right
            new_row = row
            new_col = min(col + 1, self.shape[1] - 1)

        next_state = (new_row, new_col)
        reward = self.rewards.get(next_state, 0)
        is_terminal = (next_state == self.goal) or (next_state in self.traps)
        self.current_state = next_state
        return next_state, reward, is_terminal

    def reset(self):
        # Reset the environment to the start state
        return self.start

    def step(self, action):
        # Take a step in the environment
        state = self.current_state
        next_state, reward, is_terminal = self.transition_func(state, action)
        self.current_state = next_state
        return next_state, reward, is_terminal, {}

# Example Grid World
shape = (4, 4)
start = (0, 0)
goal = (3, 3)
traps = [(1, 1)]
rewards = {goal: 1, (1, 1): -1}

env = GridWorld(shape, start, goal, traps, rewards)

In [59]:
import torch
import torch.nn as nn
import torch.optim as optim
import random
from collections import deque

class QNetwork(nn.Module):
    def __init__(self, state_dim, action_dim, hidden_dim):
        super(QNetwork, self).__init__()
        self.fc1 = nn.Linear(state_dim + action_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, hidden_dim)
        self.fc3 = nn.Linear(hidden_dim, 1)

    def forward(self, state, action):
        x = torch.cat([state, action], dim=1)
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        q_value = self.fc3(x)
        return q_value

# Hyperparameters
state_dim = 4
action_dim = 2
hidden_dim = 32
batch_size = 64
gamma = 0.99
epsilon = 1.0
epsilon_min = 0.01
epsilon_decay = 0.995
replay_buffer_size = 10000
update_target_every = 1000
num_episodes = 10000

# Initialize networks
q_network = QNetwork(state_dim, action_dim, hidden_dim)
target_network = QNetwork(state_dim, action_dim, hidden_dim)
target_network.load_state_dict(q_network.state_dict())  # Initialize target network weights
optimizer = optim.Adam(q_network.parameters())
replay_buffer = deque(maxlen=replay_buffer_size)

# Training loop
for episode in range(num_episodes):
    state = env.reset()
    done = False
    episode_reward = 0

    while not done:
        # Choose action using epsilon-greedy policy
        if random.random() < epsilon:
            action = random.choice(list(env.action_space))  # Explore
        else:
            state_tensor = torch.tensor(state, dtype=torch.float32).unsqueeze(0)
            q_values = q_network(state_tensor)
            action = torch.argmax(q_values).item()  # Exploit

        # Take action and observe next state and reward
        next_state, reward, done, _ = env.step(action)
        episode_reward += reward

        # Store transition in replay buffer
        replay_buffer.append((state, action, reward, next_state, done))

        # Sample batch from replay buffer
        if len(replay_buffer) >= batch_size:
            batch = random.sample(replay_buffer, batch_size)
            states, actions, rewards, next_states, dones = zip(*batch)

        # Compute target Q-values
        next_state_tensor = torch.tensor(next_state, dtype=torch.float32).unsqueeze(0)
        if action < action_dim:
            action_tensor = torch.nn.functional.one_hot(torch.tensor(action), num_classes=action_dim)
        else:
            print(f"Invalid action: {action}")

        action_tensor = action_tensor.unsqueeze(0)
        print(next_state_tensor.shape, action_tensor.shape)
        target_q_values = target_network(next_state_tensor, action_tensor).max(dim=1)[0]
        target_q_values = (rewards + gamma * target_q_values * (1 - dones))

        # Compute Q-values for current state-action pairs
        state_tensor = torch.tensor(states, dtype=torch.float32)
        action_tensor = torch.tensor(actions, dtype=torch.int64)
        q_values = q_network(state_tensor, action_tensor.unsqueeze(1)).squeeze()

        # Compute loss and update Q-network
        loss = nn.MSELoss()(q_values, target_q_values.detach())
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # Update target network weights
        if episode % update_target_every == 0:
            target_network.load_state_dict(q_network.state_dict())

        # Update epsilon
        epsilon = max(epsilon_min, epsilon * epsilon_decay)

        state = next_state

    print(f"Episode {episode}, Reward: {episode_reward}")

Invalid action: 2
torch.Size([1, 2]) torch.Size([1, 1, 2])


RuntimeError: Tensors must have same number of dimensions: got 2 and 3