In [1]:
import numpy as np

# Grid size
GRID_SIZE = 5
NUM_STATES = GRID_SIZE * GRID_SIZE

# Define special states (row, col)
BLUE = (0, 1)
GREEN = (0, 3)
RED = (4, 1)
YELLOW = (4, 3)

# Convert 2D state to 1D index
def to_index(pos):
    return pos[0] * GRID_SIZE + pos[1]

# Discount factor
gamma = 0.95

# Actions: up, down, left, right
actions = {
    0: (-1, 0),  # Up
    1: (1, 0),   # Down
    2: (0, -1),  # Left
    3: (0, 1)    # Right
}

action_probs = [0.25, 0.25, 0.25, 0.25]  # Equiprobable random policy


In [2]:
def step(state, action):
    row, col = divmod(state, GRID_SIZE)

    # Special states
    if (row, col) == BLUE:
        return to_index(RED), 5.0

    if (row, col) == GREEN:
        return (
            to_index(RED) if np.random.rand() < 0.5 else to_index(YELLOW),
            2.5,
        )

    # Attempt normal movement
    d_row, d_col = actions[action]
    new_row, new_col = row + d_row, col + d_col

    # Check for boundary
    if not (0 <= new_row < GRID_SIZE and 0 <= new_col < GRID_SIZE):
        return state, -0.5  # Attempted to step off the grid

    return to_index((new_row, new_col)), 0.0  # Normal move


In [3]:
def solve_bellman_explicit():
    A = np.zeros((NUM_STATES, NUM_STATES))
    b = np.zeros(NUM_STATES)

    for s in range(NUM_STATES):
        row, col = divmod(s, GRID_SIZE)

        if (row, col) == BLUE:
            s_prime = to_index(RED)
            A[s, s_prime] = gamma
            b[s] = 5.0
            continue

        if (row, col) == GREEN:
            s_red = to_index(RED)
            s_yellow = to_index(YELLOW)
            A[s, s_red] += 0.5 * gamma
            A[s, s_yellow] += 0.5 * gamma
            b[s] = 2.5
            continue

        for a in range(4):
            s_prime, r = step(s, a)
            A[s, s_prime] += action_probs[a] * gamma
            b[s] += action_probs[a] * r

    V = np.linalg.solve(np.eye(NUM_STATES) - A, b)
    return V.reshape(GRID_SIZE, GRID_SIZE)


In [4]:
V_explicit = solve_bellman_explicit()
print("Value Function (Explicit Bellman Solution):")
print(np.round(V_explicit, 2))


Value Function (Explicit Bellman Solution):
[[ 1.73  4.07  1.88  1.52  0.16]
 [ 0.81  1.45  0.96  0.54 -0.11]
 [-0.04  0.27  0.16 -0.1  -0.53]
 [-0.7  -0.44 -0.43 -0.61 -0.95]
 [-1.23 -0.98 -0.94 -1.08 -1.4 ]]


In [5]:
def iterative_policy_evaluation(theta=1e-6, max_iters=1000):
    V = np.zeros(NUM_STATES)

    for _ in range(max_iters):
        delta = 0
        V_new = np.zeros(NUM_STATES)

        for s in range(NUM_STATES):
            row, col = divmod(s, GRID_SIZE)

            if (row, col) == BLUE:
                V_new[s] = 5.0 + gamma * V[to_index(RED)]
                continue

            if (row, col) == GREEN:
                V_new[s] = 2.5 + gamma * 0.5 * (V[to_index(RED)] + V[to_index(YELLOW)])
                continue

            value = 0.0
            for a in range(4):
                s_prime, r = step(s, a)
                value += 0.25 * (r + gamma * V[s_prime])
            V_new[s] = value

            delta = max(delta, abs(V[s] - V_new[s]))

        V = V_new
        if delta < theta:
            break

    return V.reshape(GRID_SIZE, GRID_SIZE)


In [6]:
V_iterative = iterative_policy_evaluation()
print("Value Function (Iterative Evaluation):")
print(np.round(V_iterative, 2))


Value Function (Iterative Evaluation):
[[ 1.73  4.07  1.88  1.52  0.16]
 [ 0.82  1.45  0.96  0.54 -0.11]
 [-0.04  0.27  0.16 -0.1  -0.53]
 [-0.7  -0.44 -0.43 -0.61 -0.95]
 [-1.23 -0.98 -0.94 -1.08 -1.4 ]]


The state with the highest value is (0, 1), which is the blue square. This is expected since every action in this state gives a reward of 5 and causes a jump to the red square. States close to it, such as (0, 0), (0, 2), and (0, 3), also have high values because the agent can reach the blue or green square quickly from them. Therefore, the result is not surprising given the reward structure.


In [8]:
def initialize_policy_and_value():
    policy = np.zeros(NUM_STATES, dtype=int)
    V = np.zeros(NUM_STATES)
    return policy, V


In [9]:
def policy_evaluation(policy, gamma=0.95, theta=1e-3, max_iters=100):
    V = np.zeros(NUM_STATES)

    for i in range(max_iters):
        delta = 0
        for s in range(NUM_STATES):
            row, col = divmod(s, GRID_SIZE)

            if (row, col) == BLUE:
                V[s] = 5 + gamma * V[to_index(RED)]
                continue
            elif (row, col) == GREEN:
                V[s] = 2.5 + gamma * 0.5 * (V[to_index(RED)] + V[to_index(YELLOW)])
                continue

            a = policy[s]
            s_prime, r = step(s, a)
            new_value = r + gamma * V[s_prime]
            delta = max(delta, abs(V[s] - new_value))
            V[s] = new_value

        if delta < theta:
            break

    return V


In [10]:
def policy_improvement(V, gamma=0.95):
    policy_stable = True
    new_policy = np.zeros(NUM_STATES, dtype=int)

    for s in range(NUM_STATES):
        row, col = divmod(s, GRID_SIZE)

        if (row, col) in [BLUE, GREEN]:
            new_policy[s] = 0
            continue

        action_values = []
        for a in range(4):
            s_prime, r = step(s, a)
            value = r + gamma * V[s_prime]
            action_values.append(value)

        best_action = np.argmax(action_values)

        if best_action != new_policy[s]:
            policy_stable = False

        new_policy[s] = best_action

    return new_policy, policy_stable


In [11]:
def policy_iteration(gamma=0.95, max_outer_iters=200):
    policy, V = initialize_policy_and_value()

    for i in range(max_outer_iters):
        V = policy_evaluation(policy, gamma)
        policy, stable = policy_improvement(V, gamma)
        if stable:
            print(f"Policy converged after {i+1} iterations.")
            break
    else:
        print("Warning: Reached max iterations without convergence.")

    return policy.reshape(GRID_SIZE, GRID_SIZE), V.reshape(GRID_SIZE, GRID_SIZE)


In [12]:
policy_opt, V_opt = policy_iteration()

action_symbols = ['↑', '↓', '←', '→']
policy_symbols = np.vectorize(lambda x: action_symbols[x])(policy_opt)

print("Optimal Policy:")
print(policy_symbols)

print("\nOptimal Value Function:")
print(np.round(V_opt, 2))


Optimal Policy:
[['→' '↑' '←' '↑' '←']
 ['→' '↑' '↑' '←' '←']
 ['→' '↑' '↑' '↑' '↑']
 ['→' '↑' '↑' '↑' '↑']
 ['→' '↑' '↑' '↑' '↑']]

Optimal Value Function:
[[20.99 22.1  21.   18.77 17.83]
 [19.94 21.   19.95 18.95 18.  ]
 [18.95 19.95 18.95 18.   17.1 ]
 [18.   18.95 18.   17.1  16.25]
 [17.1  18.   17.1  16.25 15.43]]


In [13]:
def value_iteration(gamma=0.95, theta=1e-3, max_iters=1000):
    V = np.zeros(NUM_STATES)

    for i in range(max_iters):
        delta = 0
        V_new = np.zeros_like(V)

        for s in range(NUM_STATES):
            row, col = divmod(s, GRID_SIZE)

            if (row, col) == BLUE:
                V_new[s] = 5 + gamma * V[to_index(RED)]
                continue
            elif (row, col) == GREEN:
                V_new[s] = 2.5 + gamma * 0.5 * (V[to_index(RED)] + V[to_index(YELLOW)])
                continue

            max_value = float('-inf')
            for a in range(4):
                s_prime, r = step(s, a)
                val = r + gamma * V[s_prime]
                max_value = max(max_value, val)

            V_new[s] = max_value
            delta = max(delta, abs(V[s] - V_new[s]))

        V = V_new

        if delta < theta:
            print(f"Converged at iteration {i}")
            break

    return V


In [14]:
def extract_policy_from_value(V, gamma=0.95):
    policy = np.zeros(NUM_STATES, dtype=int)

    for s in range(NUM_STATES):
        row, col = divmod(s, GRID_SIZE)

        if (row, col) in [BLUE, GREEN]:
            policy[s] = 0
            continue

        best_action = 0
        best_value = float('-inf')
        for a in range(4):
            s_prime, r = step(s, a)
            val = r + gamma * V[s_prime]
            if val > best_value:
                best_value = val
                best_action = a

        policy[s] = best_action

    return policy


In [15]:
V_vi = value_iteration()
policy_vi = extract_policy_from_value(V_vi)

V_vi = V_vi.reshape(GRID_SIZE, GRID_SIZE)
policy_vi = policy_vi.reshape(GRID_SIZE, GRID_SIZE)

print("Optimal Value Function from Value Iteration:")
print(np.round(V_vi, 2))

action_symbols = ['↑', '↓', '←', '→']
policy_symbols = np.vectorize(lambda x: action_symbols[x])(policy_vi)

print("\nOptimal Policy (Value Iteration):")
print(policy_symbols)


Converged at iteration 0
Optimal Value Function from Value Iteration:
[[0.  5.  0.  2.5 0. ]
 [0.  0.  0.  0.  0. ]
 [0.  0.  0.  0.  0. ]
 [0.  0.  0.  0.  0. ]
 [0.  0.  0.  0.  0. ]]

Optimal Policy (Value Iteration):
[['→' '↑' '←' '↑' '←']
 ['↑' '↑' '↑' '↑' '↑']
 ['↑' '↑' '↑' '↑' '↑']
 ['↑' '↑' '↑' '↑' '↑']
 ['↑' '↑' '↑' '↑' '↑']]


In [16]:
# Terminal states: Black squares
TERMINAL_STATES = [to_index((2, 2)), to_index((4, 4))]

def is_terminal(state):
    return state in TERMINAL_STATES

# New step function for Part 2
def step2(state, action):
    if is_terminal(state):
        return state, 0, True  # Stays in terminal, episode ends

    row, col = divmod(state, GRID_SIZE)

    if (row, col) == BLUE:
        return to_index(RED), 5.0, False

    if (row, col) == GREEN:
        target = RED if np.random.rand() < 0.5 else YELLOW
        return to_index(target), 2.5, False

    d_row, d_col = actions[action]
    new_row, new_col = row + d_row, col + d_col

    # Off-grid move
    if not (0 <= new_row < GRID_SIZE and 0 <= new_col < GRID_SIZE):
        return state, -0.5, False

    new_state = to_index((new_row, new_col))
    done = is_terminal(new_state)
    return new_state, -0.2, done


In [17]:
import random
from collections import defaultdict

def generate_episode_es(policy, gamma=0.95):
    states = list(range(NUM_STATES))
    start_state = random.choice([s for s in states if not is_terminal(s)])
    start_action = random.choice(list(actions.keys()))
    
    episode = []
    state = start_state
    action = start_action
    done = False

    while not done:
        next_state, reward, done = step2(state, action)
        episode.append((state, action, reward))
        if done:
            break
        action = random.choices(list(actions.keys()), weights=policy[state])[0]
        state = next_state

    return episode


In [18]:
def mc_exploring_starts(num_episodes=1000, gamma=0.95):
    Q = defaultdict(lambda: np.zeros(4))
    returns = defaultdict(list)

    policy = {s: [0.25] * 4 for s in range(NUM_STATES)}

    for episode_idx in range(num_episodes):
        episode = generate_episode_es(policy, gamma)
        G = 0
        visited = set()

        for t in reversed(range(len(episode))):
            s, a, r = episode[t]
            G = gamma * G + r

            if (s, a) not in visited:
                returns[(s, a)].append(G)
                Q[s][a] = np.mean(returns[(s, a)])
                visited.add((s, a))

        for s in policy:
            if is_terminal(s):
                continue
            best_a = np.argmax(Q[s])
            policy[s] = [1 if a == best_a else 0 for a in range(4)]

    return Q, policy
