## 1. Q-Learning

### 1.1 Define the State and Action Space

The state space is a $6\times 6$ grid world, where

- the start state is at $(0,0)$, the upper-left corner
- the goal state is at $(5,5)$, the lower-right corner

The grid also contains some obstacles, which the agent cannot enter.

Each state is a (row, column) pair, and the four actions move the agent one cell right, left, down, or up.

In [14]:
import numpy as np
import random

# Define the grid world environment
grid_size = 6
obstacles = [(1, 1), (1, 2), (2, 2), (3,2)]  # Define obstacles
start_state = (0, 0)
goal_state = (5, 5)

# Define actions
action_set = [(0, 1), (0, -1), (1, 0), (-1, 0)]  # Right, Left, Down, Up

# Plot the grid world
def plot_grid():
    for i in range(grid_size):
        for j in range(grid_size):
            if (i, j) == start_state:
                print("S", end=" ")
            elif (i, j) == goal_state:
                print("G", end=" ")
            elif (i, j) in obstacles:
                print("X", end=" ")
            else:
                print(".", end=" ")
        print()

plot_grid()


S . . . . . 
. X X . . . 
. . X . . . 
. . X . . . 
. . . . . . 
. . . . . G 


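Two helper functions describe the environment dynamics:

- `is_valid_state` checks whether a state is valid, i.e., inside the grid and not an obstacle
- `get_next_state_stochastic` returns the next state given the current state and action; the intended action is executed with probability 0.85, and otherwise the agent slips to one of the two perpendicular directions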

In [15]:
def is_valid_state(state):
    x, y = state
    return (0 <= x < grid_size) and (0 <= y < grid_size) and (state not in obstacles)

# Modification for (a): stochastic transitions
def get_next_state_stochastic(current_state, action):
    # With probability 0.85 take the intended action; otherwise slip to one
    # of the two perpendicular directions (probabilities 0.05 and 0.10)
    p = random.random()
    if p < 0.85:
        chosen = action
    elif p < 0.90:
        chosen = (action[1], -action[0])
    else:
        chosen = (-action[1], action[0])
    next_state = (current_state[0] + chosen[0], current_state[1] + chosen[1])

    if is_valid_state(next_state):
        # If the next state is valid
        return next_state
    else:
        # Otherwise, stay in the same place
        return current_state
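
To see which directions the two slip branches produce, the following standalone sketch applies the same two tuple rotations to each action. It is an illustration only; the helper names `rotate_a` and `rotate_b` and the `names` lookup are hypothetical and not part of the notebook.

```python
# Standalone sketch: which direction does each slip branch produce?
action_set = [(0, 1), (0, -1), (1, 0), (-1, 0)]  # Right, Left, Down, Up
names = {(0, 1): "Right", (0, -1): "Left", (1, 0): "Down", (-1, 0): "Up"}

def rotate_a(action):
    # Same expression as the `p < 0.90` branch (probability 0.05)
    return (action[1], -action[0])

def rotate_b(action):
    # Same expression as the `else` branch (probability 0.10)
    return (-action[1], action[0])

slip_table = {names[a]: (names[rotate_a(a)], names[rotate_b(a)]) for a in action_set}
for intended, (slip_a, slip_b) in slip_table.items():
    print(f"{intended}: 0.85 {intended}, 0.05 {slip_a}, 0.10 {slip_b}")
```

For every intended action, both slip directions are perpendicular to it, so the agent never moves directly backwards.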

### 1.2 Reward Function

The reward function distinguishes three cases:

- Achieve the goal
- Hit an obstacle
- Move one step: note that a negative value here means the agent is penalized for every step it takes, which encourages shorter paths.

In [16]:
def get_reward(state):
    if state == goal_state:
        return 10
    elif state in obstacles:
        return -5
    else:
        return -1
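
One consequence of combining this reward with the transition function is worth checking: an invalid move leaves the agent where it is, so `get_reward` is only ever called on valid states. The sketch below repeats the definitions so it runs on its own and enumerates every reward the agent can receive; `reachable_rewards` is a hypothetical name used only here.

```python
# Standalone sketch: enumerate every reward the agent can actually receive.
grid_size = 6
obstacles = [(1, 1), (1, 2), (2, 2), (3, 2)]
goal_state = (5, 5)
action_set = [(0, 1), (0, -1), (1, 0), (-1, 0)]  # Right, Left, Down, Up

def is_valid_state(state):
    x, y = state
    return 0 <= x < grid_size and 0 <= y < grid_size and state not in obstacles

def get_reward(state):
    if state == goal_state:
        return 10
    elif state in obstacles:
        return -5
    return -1

reachable_rewards = set()
for i in range(grid_size):
    for j in range(grid_size):
        state = (i, j)
        if not is_valid_state(state) or state == goal_state:
            continue
        # Whatever the slip outcome, the executed move is one of the four unit moves
        for move in action_set:
            candidate = (state[0] + move[0], state[1] + move[1])
            next_state = candidate if is_valid_state(candidate) else state
            reachable_rewards.add(get_reward(next_state))

print(sorted(reachable_rewards))  # [-1, 10]
```

Under these definitions the $-5$ branch is never triggered: bumping into an obstacle costs the same $-1$ as any other step. Penalizing collisions would require computing the reward from the attempted cell rather than the resulting state, which is a possible variation and not what the notebook does.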

### 1.3 Define the $Q(s,a)$ Function

Define the Q function $Q(s,a)$. When both the environment and the actions are discrete, we can use a table to represent this $Q$ function.

Note that all entries are initialized to $0$.

In [17]:
# Initialize Q-table
q_table = np.zeros((grid_size, grid_size, 4))  # 4 actions, in action_set order: right, left, down, up
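
As a small illustration of how the table is addressed, the sketch below looks up and writes one entry. The first two indices are the row and column of the state, and the third is the position of the action in `action_set`. The value `1.5` is arbitrary and only there to make the entry visible.

```python
import numpy as np

grid_size = 6
action_set = [(0, 1), (0, -1), (1, 0), (-1, 0)]  # Right, Left, Down, Up
q_table = np.zeros((grid_size, grid_size, len(action_set)))

state = (2, 3)
action = (1, 0)  # Down
action_index = action_set.index(action)  # position of the action in action_set

q_table[state[0], state[1], action_index] = 1.5  # illustrative value only
print(q_table.shape)  # (6, 6, 4)
print(q_table[state[0], state[1], :])
```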

### 1.4 $\epsilon$-greedy Policy

Since $Q(s,a)$ is not yet the optimal function during learning, a typical way of constructing a policy is to use an $\epsilon$-greedy strategy:

$$a = \left\{\begin{array}{ll} \arg\max_{a'} Q(s,a') & \text{with probability}~ 1-\epsilon \\ \text{a random action} & \text{with probability}~ \epsilon\\ \end{array} \right.$$

This construction explicitly balances exploration against exploitation.

Note that the actions are indexed in the order of `action_set`: {right, left, down, up}.



In [18]:
def choose_action(state, epsilon):
    if random.uniform(0, 1) < epsilon:
        return random.choice(action_set)  # Explore
    else:
        return action_set[np.argmax(q_table[state[0], state[1], :])]  # Exploit
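
One detail of the greedy branch: `np.argmax` returns the first maximal index, so while all Q-values of a state are still tied (for example, all zero at the start), the greedy choice is always index 0, i.e., Right. The following is a minimal sketch of a tie-breaking alternative, not something the notebook uses; `greedy_index_random_ties` is a hypothetical helper.

```python
import random
import numpy as np

def greedy_index_random_ties(q_values, rng=random):
    """Return the index of a maximal entry, choosing uniformly among ties."""
    q_values = np.asarray(q_values)
    best = np.flatnonzero(q_values == q_values.max())
    return int(rng.choice(list(best)))

zeros = np.zeros(4)
print(np.argmax(zeros))  # 0: the first index wins on ties

# With random tie-breaking, different seeds can yield different indices
picks = {greedy_index_random_ties(zeros, random.Random(seed)) for seed in range(50)}
print(sorted(picks))
```

With $\epsilon = 0.1$ the random branch already provides some exploration, so this is a refinement rather than a requirement.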

### 1.5 Learning

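After each transition, the Q-table is updated with the following rule, where $\eta$ is the learning rate and $\gamma$ is the discount factor:

$$Q(s_t, a_t)\leftarrow Q(s_t, a_t) + \eta\cdot (r_t + \gamma \max_{a}Q(s_{t+1}, a) - Q(s_t, a_t))$$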
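
As a worked instance of this rule, the sketch below applies a single update with $\eta = 0.8$ and $\gamma = 0.95$, the values set in the hyperparameters cell. The function `q_update` is a hypothetical helper that operates on scalars rather than on the table.

```python
def q_update(q_sa, reward, max_q_next, eta=0.8, gamma=0.95):
    """One tabular Q-learning update for a single (state, action) entry."""
    td_target = reward + gamma * max_q_next
    return q_sa + eta * (td_target - q_sa)

# Ordinary step from a fresh table: reward -1, all Q-values still 0
step_update = q_update(q_sa=0.0, reward=-1, max_q_next=0.0)

# Step into the goal: reward 10; the goal's own Q-values stay 0 because
# the training loop never performs an update from the goal state
goal_update = q_update(q_sa=0.0, reward=10, max_q_next=0.0)

print(step_update, goal_update)  # -0.8 8.0
```

Repeating the goal update from $8.0$ gives $8.0 + 0.8\cdot(10 - 8.0) = 9.6$, so the entry for the action that enters the goal approaches $10$ over repeated visits.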

In [19]:
# Hyperparameters
learning_rate = 0.8
discount_factor = 0.95
epsilon = 0.1  # Exploration rate
num_episodes = 1000

# Q-learning algorithm
for episode in range(num_episodes):
    current_state = start_state
    while current_state != goal_state:
        action = choose_action(current_state, epsilon)

        # Modification for (b)
        next_state = get_next_state_stochastic(current_state, action)

        reward = get_reward(next_state)
        action_index = action_set.index(action)

        q_table[current_state[0], current_state[1], action_index] = q_table[current_state[0], current_state[1], action_index] + learning_rate * (
                reward + discount_factor * np.max(q_table[next_state[0], next_state[1], :]) - q_table[current_state[0], current_state[1], action_index]
        )
        current_state = next_state

print("DONE")

DONE


Print the learned $Q(s,a)$ table.

In [20]:
print("The four actions in order are: Right, Left, Down, Up")
for i in range(grid_size):
    for j in range(grid_size):
        print(f"At state ({i}, {j}), {q_table[i,j,:]}")

The four actions in order are: Right, Left, Down, Up
At state (0, 0), [-2.55886739 -2.91836362 -1.29154499 -2.94193794]
At state (0, 1), [-0.95870975 -2.80853581 -1.7933103  -2.63842151]
At state (0, 2), [ 0.43357907 -2.02591185 -1.70618109 -1.88528106]
At state (0, 3), [-0.09397467 -0.94005127  1.8116372  -0.86293304]
At state (0, 4), [-2.68346967 -2.40373599  0.76463684 -2.15188991]
At state (0, 5), [-2.72132741 -4.30976021  2.50526258 -3.32724556]
At state (1, 0), [-2.43506935 -2.35269613 -0.53093328 -3.03487833]
At state (1, 1), [0. 0. 0. 0.]
At state (1, 2), [0. 0. 0. 0.]
At state (1, 3), [ 1.95803909  0.39929958  3.02078726 -0.89488129]
At state (1, 4), [ 3.83217962 -1.70417489  1.60918463  0.1393552 ]
At state (1, 5), [-1.74481358 -2.43258474  4.52058278  0.89716346]
At state (2, 0), [-2.9931751  -2.31522414  0.63601675 -1.4996405 ]
At state (2, 1), [-3.13536995 -3.16868888  1.92414162 -3.07845969]
At state (2, 2), [0. 0. 0. 0.]
At state (2, 3), [2.38662578 2.09964221 2.99714673

### 1.6 Evaluation

In [21]:
# Example usage: Find the best path from start to goal
current_state = start_state
path = [current_state]
while current_state != goal_state:
    q_values = q_table[current_state[0], current_state[1], :]
    print(f"At state {current_state}: {q_values}")
    action = action_set[np.argmax(q_values)]

    # Modification for (b)
    current_state = get_next_state_stochastic(current_state, action)

    path.append(current_state)
print("Optimal Path:\n")

# Plot the path
def plot_grid():
    for i in range(grid_size):
        for j in range(grid_size):
            if (i, j) == start_state:
                print("S", end=" ")
            elif (i, j) == goal_state:
                print("G", end=" ")
            elif (i, j) in obstacles:
                print("X", end=" ")
            elif (i, j) in path:
                print("-", end=" ")
            else:
                print(".", end=" ")
        print()

plot_grid()

At state (0, 0): [-2.55886739 -2.91836362 -1.29154499 -2.94193794]
At state (1, 0): [-2.43506935 -2.35269613 -0.53093328 -3.03487833]
At state (2, 0): [-2.9931751  -2.31522414  0.63601675 -1.4996405 ]
At state (3, 0): [ 0.82805264 -2.76924873  1.61809346 -1.23072092]
At state (4, 0): [ 2.26944933 -1.51570786  1.72424042  0.30480781]
At state (4, 1): [2.2562402  1.14206205 3.95479168 0.89612123]
At state (5, 1): [5.58876659 0.13440572 0.59297569 0.75110866]
At state (5, 2): [7.057894   3.39993174 4.11750157 4.33147611]
At state (5, 3): [8.49999326 4.72046093 6.1117117  5.28583618]
At state (5, 4): [9.99999995 6.85008015 6.97076873 6.95089051]
Optimal Path:

S . . . . . 
- X X . . . 
- . X . . . 
- . X . . . 
- - . . . . 
. - - - - G 
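
Because the transitions are stochastic, the path above is one sampled rollout of the greedy policy. A minimal sketch of evaluating over many rollouts, with a step cap to guard against very long episodes, is shown below. It is self-contained and uses a hand-made placeholder table `demo_q` instead of the trained `q_table`; `step`, `greedy_rollout`, and `demo_q` are hypothetical names.

```python
import random
import numpy as np

grid_size = 6
obstacles = [(1, 1), (1, 2), (2, 2), (3, 2)]
start_state, goal_state = (0, 0), (5, 5)
action_set = [(0, 1), (0, -1), (1, 0), (-1, 0)]  # Right, Left, Down, Up

def is_valid_state(state):
    x, y = state
    return 0 <= x < grid_size and 0 <= y < grid_size and state not in obstacles

def step(state, action, rng):
    # Same slip probabilities as get_next_state_stochastic
    p = rng.random()
    if p < 0.85:
        chosen = action
    elif p < 0.90:
        chosen = (action[1], -action[0])
    else:
        chosen = (-action[1], action[0])
    candidate = (state[0] + chosen[0], state[1] + chosen[1])
    return candidate if is_valid_state(candidate) else state

def greedy_rollout(q_table, rng, max_steps=200):
    """Follow argmax actions; return (reached_goal, number_of_steps)."""
    state = start_state
    for t in range(max_steps):
        if state == goal_state:
            return True, t
        action = action_set[int(np.argmax(q_table[state[0], state[1], :]))]
        state = step(state, action, rng)
    return state == goal_state, max_steps

# Placeholder table (NOT the trained one): prefer Down, and Right on the bottom row
demo_q = np.zeros((grid_size, grid_size, 4))
demo_q[:, :, 2] = 1.0
demo_q[grid_size - 1, :, 0] = 2.0

rng = random.Random(0)
results = [greedy_rollout(demo_q, rng) for _ in range(200)]
success_rate = sum(ok for ok, _ in results) / len(results)
mean_steps = sum(n for _, n in results) / len(results)
print(f"success rate: {success_rate:.2f}, mean steps: {mean_steps:.1f}")
```

Passing the trained `q_table` in place of `demo_q` would give an estimate of how reliably the learned greedy policy reaches the goal.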


## 2. Policy Gradient

To make the gradient computation easier, this code uses PyTorch.

### 2.1 Define Policy Function

In [22]:
import torch

import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F

# Define the policy network
class PolicyNetwork(nn.Module):
    def __init__(self, input_size, output_size):
        super(PolicyNetwork, self).__init__()
        self.fc1 = nn.Linear(input_size, 24)
        self.fc2 = nn.Linear(24, output_size)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = self.fc2(x)
        return F.softmax(x, dim=-1)

# Initialize policy network

# Modification for (c)
input_size = 2 + len(action_set)

output_size = 4  # right, left, down, up (same order as action_set)
policy = PolicyNetwork(input_size, output_size)
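
The input size `2 + len(action_set)` corresponds to the two grid coordinates plus a one-hot vector for the previous action, which is how the training loop builds its input. The sketch below shows that encoding in plain Python, without PyTorch; `encode_observation` is a hypothetical helper that mirrors the inline code.

```python
action_set = [(0, 1), (0, -1), (1, 0), (-1, 0)]  # Right, Left, Down, Up

def encode_observation(state, last_action):
    """Concatenate (row, col) with a one-hot vector for the previous action."""
    one_hot = [0] * len(action_set)
    if last_action is not None:
        one_hot[action_set.index(last_action)] = 1
    return [state[0], state[1]] + one_hot

print(encode_observation((0, 0), None))    # [0, 0, 0, 0, 0, 0]
print(encode_observation((2, 3), (1, 0)))  # [2, 3, 0, 0, 1, 0]
```

At the first step of an episode there is no previous action, so the one-hot part is all zeros.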


### 2.2 Hyper-parameters

In [23]:
# Hyperparameters
num_episodes = 3000 # number of episodes (examples) for training
max_steps = 100 # maximum steps per episode, to avoid infinite loops
gamma = 0.99 # discount factor
optimizer = optim.Adam(policy.parameters(), lr=0.01)

### 2.3 Learning with Policy Gradient

The following code block implements the three steps explained in class:

- Step 1: sample from policy function
- Step 2: calculate the return
- Step 3: calculate the gradient and update the policy function
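
Step 2 can be illustrated on a tiny example. The sketch below computes the discounted return $G_t = r_t + \gamma G_{t+1}$ by scanning the rewards backwards, as the training loop does, for a hypothetical three-step episode with rewards $-1, -1, 10$ and $\gamma = 0.99$. The helper `discounted_returns` is an illustrative name.

```python
def discounted_returns(rewards, gamma):
    """Compute G_t = r_t + gamma * G_{t+1} by scanning the rewards backwards."""
    returns = []
    G = 0.0
    for r in reversed(rewards):
        G = r + gamma * G
        returns.append(G)
    returns.reverse()
    return returns

example_returns = discounted_returns([-1, -1, 10], gamma=0.99)
print([round(g, 3) for g in example_returns])  # [7.811, 8.9, 10.0]
```

Steps closer to the goal receive larger returns, so in Step 3 their log-probabilities are weighted more strongly.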

In [24]:
# Storing intermediate results
all_rewards = []
all_lengths = []

# Policy Gradient algorithm
for episode in range(num_episodes):
    states = []
    actions = []
    rewards = []

    current_state = start_state
    episode_reward = 0

    # Modification for (c)
    last_action = None

    # ===========================
    # Step 1: Sample from the policy function
    for step in range(max_steps):

        # Modification for (c)
        one_hot = [0] * len(action_set)
        if last_action is not None:
            one_hot[action_set.index(last_action)] = 1
        state_tensor = torch.FloatTensor([current_state[0], current_state[1]] + one_hot)

        # Get action probabilities
        probs = policy(state_tensor)

        # Sample action
        action_dist = torch.distributions.Categorical(probs)
        action_idx = action_dist.sample()
        # print(f"action_idx: {action_idx}")
        action = action_set[action_idx]

        # Take action

        # Modification for (c)
        next_state = get_next_state_stochastic(current_state, action)

        reward = get_reward(next_state)

        # Store experience
        states.append(state_tensor)
        actions.append(action_idx)
        rewards.append(reward)

        episode_reward += reward
        current_state = next_state

        # Modification for (c): remember the action so it is part of the next input
        last_action = action

        if current_state == goal_state:
            break

    # ===========================
    # Step 2: Calculate returns
    returns = []
    G = 0
    for r in reversed(rewards):
        G = r + gamma * G
        returns.insert(0, G)
    returns = torch.FloatTensor(returns)

    # Normalize returns
    if len(returns) > 1:
        returns = (returns - returns.mean()) / (returns.std() + 1e-9)

    # =============================
    # Step 3: Calculate the gradient and update the policy
    policy_loss = []
    for log_prob, G in zip([torch.log(policy(states[i])[actions[i]]) for i in range(len(states))], returns):
        policy_loss.append(-log_prob * G)
    policy_loss = torch.stack(policy_loss).sum()

    # Update policy
    optimizer.zero_grad()
    policy_loss.backward()
    optimizer.step()

    all_rewards.append(episode_reward)
    all_lengths.append(step + 1)

    if episode % 500 == 0:
        print(f"Episode {episode}, Reward: {episode_reward}, Steps: {step+1}")
print("DONE")

Episode 0, Reward: -83, Steps: 94
Episode 500, Reward: -15, Steps: 26
Episode 1000, Reward: 0, Steps: 11
Episode 1500, Reward: 0, Steps: 11
Episode 2000, Reward: -7, Steps: 18
Episode 2500, Reward: 1, Steps: 10
DONE


### 2.4 Evaluation

In [25]:
current_state = start_state
pg_path = [current_state]

# Modification for (c): there is no previous action at the start of the episode
last_action = None

while current_state != goal_state and len(pg_path) < grid_size*grid_size:

    # Modification for (c)
    one_hot = [0] * len(action_set)
    if last_action is not None:
        one_hot[action_set.index(last_action)] = 1
    state_tensor = torch.FloatTensor([current_state[0], current_state[1]] + one_hot)

    probs = policy(state_tensor)
    action_idx = probs.argmax().item()
    action = action_set[action_idx]

    # Modification for (c)
    current_state = get_next_state_stochastic(current_state, action)

    pg_path.append(current_state)

    # Modification for (c)
    last_action = action

print("Policy Gradient Path:")
for i in range(grid_size):
    for j in range(grid_size):
        if (i, j) == start_state:
            print("S", end=" ")
        elif (i, j) == goal_state:
            print("G", end=" ")
        elif (i, j) in obstacles:
            print("X", end=" ")
        elif (i, j) in pg_path:
            print("+", end=" ")
        else:
            print(".", end=" ")
    print()

Policy Gradient Path:
S + + + + . 
. X X . + . 
. . X . + . 
. . X . + . 
. . . . + + 
. . . . . G 
