PART 02

1.1) Monte Carlo method with exploring starts

In [7]:
import numpy as np
import random
import matplotlib.pyplot as plt

# Define the gridworld size and discount factor
grid_size = 5
gamma = 0.95
MAX_STEPS = 100  # Maximum number of steps per episode

# Define the rewards and transitions
rewards = np.full((grid_size, grid_size), -0.2)  # Initialize rewards with -0.2 for each move
rewards[0, 1] = 5   # Blue square
rewards[0, 4] = 2.5  # Green square

# Define transitions for special squares
transitions = {}
transitions[(0, 1)] = [(5, (4, 2))]   # Blue -> Red
transitions[(0, 4)] = [(2.5, (4, 2)), (2.5, (4, 4))]  # Green -> Red or Yellow

# Define terminal states
terminal_states = [(4, 0), (2, 4)]  # Black squares are terminal states

# Define the four possible actions
actions = [(-1, 0), (1, 0), (0, -1), (0, 1)]  # Up, down, left, right

np.random.seed(42)  # Set random seed for reproducibility
random.seed(42)

# Function to get the next state and reward
def get_next_state_reward(state, action):
    if state in terminal_states:
        return (0, state)  # No reward, remain in terminal state
    if state in transitions:
        transition = transitions[state]
        idx = np.random.choice(len(transition))  # Choose one of the possible transitions
        return transition[idx]
    else:
        next_state = (state[0] + action[0], state[1] + action[1])
        if next_state[0] < 0 or next_state[0] >= grid_size or next_state[1] < 0 or next_state[1] >= grid_size:
            return (-0.5, state)  # Stepping off the grid
        return (-0.2, next_state)  # Normal move

# Initialize policy and value function
π = np.ones((grid_size, grid_size, len(actions))) / len(actions)  # Equiprobable policy
Q = np.zeros((grid_size, grid_size, len(actions)))  # Initialize action-value function
Returns = [[[] for _ in range(len(actions))] for _ in range(grid_size * grid_size)]  # Store returns for each state-action pair

# Function to generate an episode using exploring starts
def generate_episode(π):
    S0 = (random.randint(0, grid_size - 1), random.randint(0, grid_size - 1))
    A0 = random.choice(actions)
    episode = [(S0, A0, 0)]  # (state, action, reward)
    
    state = S0
    action = A0
    steps = 0  # Add a step counter to prevent infinite loops
    while state not in terminal_states and steps < MAX_STEPS:  # Limit the number of steps to prevent infinite loops
        reward, next_state = get_next_state_reward(state, action)
        episode.append((state, action, reward))
        state = next_state
        if state in terminal_states:
            break
        action = random.choices(actions, π[state[0], state[1]])[0]
        steps += 1  # Increment the step counter
    
    return episode

# Monte Carlo method with exploring starts
def monte_carlo_es(π, Q, Returns, num_episodes):
    avg_returns_per_episode = []
    for episode_num in range(num_episodes):
        episode = generate_episode(π)
        G = 0  # Initialize return
        visited = set()  # Track visited state-action pairs
        
        for t in reversed(range(len(episode) - 1)):
            St, At, Rt = episode[t]
            G = gamma * G + episode[t + 1][2]  # episode[t + 1][2] is the reward
            state_action = (St, actions.index(At))
            if state_action not in visited:
                state_index = St[0] * grid_size + St[1]
                Returns[state_index][actions.index(At)].append(G)
                Q[St[0], St[1], actions.index(At)] = np.mean(Returns[state_index][actions.index(At)])
                visited.add(state_action)
        
        # Calculate and store the average return of the current episode
        avg_returns_per_episode.append(G)
        
        # Update policy
        for i in range(grid_size):
            for j in range(grid_size):
                best_action = np.argmax(Q[i, j])
                π[i, j] = np.eye(len(actions))[best_action]
    
    return π, Q

# Perform Monte Carlo with exploring starts
optimal_policy_es, optimal_Q_es = monte_carlo_es(π, Q, Returns, num_episodes=20000)

# Print the optimal policy using text
action_symbols = {(-1, 0): '↑', (1, 0): '↓', (0, -1): '←', (0, 1): '→'}
print("Optimal Policy with Exploring Starts:")
for i in range(grid_size):
    row = []
    for j in range(grid_size):
        best_action_idx = np.argmax(optimal_policy_es[i, j])
        best_action = actions[best_action_idx]
        row.append(action_symbols[best_action])
    print(" ".join(row))


Optimal Policy with Exploring Starts:
→ ↑ ← → ↑
→ ↑ ↑ ← ↑
→ ↑ ← ↑ ↑
→ → ↑ ↑ ←
↑ ↑ ↑ ↑ ↑


1.2). Monte Carlo method without Exploring Starts but using ϵ-soft approach

In [8]:
import numpy as np
import random
import matplotlib.pyplot as plt

# Define the gridworld size and discount factor
grid_size = 5
gamma = 0.95
epsilon = 0.1

# Define the rewards and transitions
rewards = np.full((grid_size, grid_size), -0.2)
rewards[0, 1] = 5   # Blue square
rewards[0, 4] = 2.5  # Green square

# Define transitions for special squares
transitions = {}
transitions[(0, 1)] = [(5, (4, 2))]   # Blue -> Red
transitions[(0, 4)] = [(2.5, (4, 2)), (2.5, (4, 4))]  # Green -> Red or Yellow

# Define terminal states
terminal_states = [(4, 0), (2, 4)]

# Define the four possible actions
actions = [(-1, 0), (1, 0), (0, -1), (0, 1)]
action_indices = {(-1, 0): 0, (1, 0): 1, (0, -1): 2, (0, 1): 3}

np.random.seed(42)
random.seed(42)

# Function to get the next state and reward
def get_next_state_reward(state, action):
    if state in terminal_states:
        return (0, state)
    if state in transitions:
        transition = transitions[state]
        idx = np.random.choice(len(transition))
        return transition[idx]
    else:
        next_state = (state[0] + action[0], state[1] + action[1])
        if next_state[0] < 0 or next_state[0] >= grid_size or next_state[1] < 0 or next_state[1] >= grid_size:
            return (-0.5, state)  # Stepping off the grid
        return (-0.2, next_state)  # Normal move

# Initialize policy and value function
π = np.ones((grid_size, grid_size, len(actions))) / len(actions)  # ϵ-soft policy
Q = np.zeros((grid_size, grid_size, len(actions)))  # Initialize action-value function
Returns = [[[] for _ in range(len(actions))] for _ in range(grid_size * grid_size)]  # Store returns for each state-action pair

# Function to generate an episode following the epsilon-soft policy
def generate_episode(π):
    state = (random.randint(0, grid_size - 1), random.randint(0, grid_size - 1))
    episode = []
    
    steps = 0  # Add a step counter to prevent infinite loops
    while state not in terminal_states and steps < 100:  # Limit the number of steps to prevent infinite loops
        if random.uniform(0, 1) < epsilon:
            action = random.choice(actions)  # Random action with probability epsilon
        else:
            action = random.choices(actions, π[state[0], state[1]])[0]  # Greedy action with probability 1-epsilon
        reward, next_state = get_next_state_reward(state, action)
        episode.append((state, action, reward))
        state = next_state
        steps += 1  # Increment the step counter
    
    return episode

# Monte Carlo method with epsilon-soft policy
def monte_carlo_eps_soft(π, Q, Returns, num_episodes):
    for episode_num in range(num_episodes):
        episode = generate_episode(π)
        G = 0  # Initialize return
        visited = set()  # Track visited state-action pairs
        
        for t in reversed(range(len(episode))):
            state, action, reward = episode[t]
            G = gamma * G + reward
            state_action = (state, actions.index(action))
            if state_action not in visited:
                state_index = state[0] * grid_size + state[1]
                Returns[state_index][actions.index(action)].append(G)
                Q[state[0], state[1], actions.index(action)] = np.mean(Returns[state_index][actions.index(action)])
                visited.add(state_action)
                
                best_action = np.argmax(Q[state[0], state[1]])
                for a in range(len(actions)):
                    if a == best_action:
                        π[state[0], state[1], a] = 1 - epsilon + epsilon / len(actions)
                    else:
                        π[state[0], state[1], a] = epsilon / len(actions)
    
    return π, Q

# Initialize policy and value function for epsilon-soft
policy_eps_soft = np.ones((grid_size, grid_size, len(actions))) / len(actions)
Q_eps_soft = np.zeros((grid_size, grid_size, len(actions)))
returns_eps_soft = [[[] for _ in range(len(actions))] for _ in range(grid_size * grid_size)]

# Perform Monte Carlo with epsilon-soft policy
optimal_policy_eps_soft, optimal_Q_eps_soft = monte_carlo_eps_soft(policy_eps_soft, Q_eps_soft, returns_eps_soft, num_episodes=20000)

# Print the optimal policy using text
action_symbols = {(-1, 0): '↑', (1, 0): '↓', (0, -1): '←', (0, 1): '→'}
print("Optimal Policy with Epsilon-Soft Policy:")
for i in range(grid_size):
    row = []
    for j in range(grid_size):
        best_action_idx = np.argmax(optimal_policy_eps_soft[i, j])
        best_action = actions[best_action_idx]
        row.append(action_symbols[best_action])
    print(" ".join(row))


Optimal Policy with Epsilon-Soft Policy:
↑ ↓ ← ← ↓
→ ↑ ↑ ↑ ←
↑ ↑ ↑ ↑ ↑
↑ ← ↑ ← ←
↑ ↓ ↑ ↑ ↑


2.1). Behaviour policy with equiprobable moves to learn an optimal policy.

In [11]:
import numpy as np
import random
import matplotlib.pyplot as plt

# Define the gridworld size and discount factor
grid_size = 5
gamma = 0.95
alpha = 0.1  # Learning rate
epsilon = 0.1  # Epsilon for epsilon-greedy policy

# Define the rewards and transitions
rewards = np.full((grid_size, grid_size), -0.2)
rewards[0, 1] = 5   # Blue square
rewards[0, 4] = 2.5  # Green square

# Define transitions for special squares
transitions = {}
transitions[(0, 1)] = [(5, (4, 2))]   # Blue -> Red
transitions[(0, 4)] = [(2.5, (4, 2)), (2.5, (4, 4))]  # Green -> Red or Yellow

# Define terminal states
terminal_states = [(4, 0), (2, 4)]

# Define the four possible actions
actions = [(-1, 0), (1, 0), (0, -1), (0, 1)]
action_indices = {(-1, 0): 0, (1, 0): 1, (0, -1): 2, (0, 1): 3}

np.random.seed(42)
random.seed(42)

# Function to get the next state and reward
def get_next_state_reward(state, action):
    if state in terminal_states:
        return (0, state)
    if state in transitions:
        transition = transitions[state]
        idx = np.random.choice(len(transition))
        return transition[idx]
    else:
        next_state = (state[0] + action[0], state[1] + action[1])
        if next_state[0] < 0 or next_state[0] >= grid_size or next_state[1] < 0 or next_state[1] >= grid_size:
            return (-0.5, state)  # Stepping off the grid
        return (-0.2, next_state)  # Normal move

# Initialize the behavior policy and value function
behavior_policy = np.ones((grid_size, grid_size, len(actions))) / len(actions)  # Equiprobable policy
target_policy = np.ones((grid_size, grid_size, len(actions))) / len(actions)  # Initial target policy
Q = np.zeros((grid_size, grid_size, len(actions)))  # Initialize action-value function
Returns = [[[] for _ in range(len(actions))] for _ in range(grid_size * grid_size)]  # Store returns for each state-action pair

# Function to generate an episode following the behavior policy
def generate_episode(policy):
    state = (random.randint(0, grid_size - 1), random.randint(0, grid_size - 1))
    episode = []
    
    steps = 0  # Add a step counter to prevent infinite loops
    while state not in terminal_states and steps < 100:  # Limit the number of steps to prevent infinite loops
        action = random.choices(actions, policy[state[0], state[1]])[0]  # Choose action based on the policy
        reward, next_state = get_next_state_reward(state, action)
        episode.append((state, action, reward))
        state = next_state
        steps += 1  # Increment the step counter
    
    return episode

# Monte Carlo method with importance sampling
def monte_carlo_importance_sampling(behavior_policy, target_policy, Q, Returns, num_episodes):
    for episode_num in range(num_episodes):
        episode = generate_episode(behavior_policy)
        G = 0  # Initialize return
        
        for t in reversed(range(len(episode))):
            state, action, reward = episode[t]
            G = gamma * G + reward
            state_action = (state, actions.index(action))
            state_index = state[0] * grid_size + state[1]
            
            if state_action not in [(e[0], actions.index(e[1])) for e in episode[:t]]:
                Returns[state_index][actions.index(action)].append(G)
                Q[state[0], state[1], actions.index(action)] = np.mean(Returns[state_index][actions.index(action)])
                
                best_action = np.argmax(Q[state[0], state[1]])
                for a in range(len(actions)):
                    if a == best_action:
                        target_policy[state[0], state[1], a] = 1 - epsilon + epsilon / len(actions)
                    else:
                        target_policy[state[0], state[1], a] = epsilon / len(actions)
    
    return target_policy, Q

# Initialize policy and value function for importance sampling
policy_importance_sampling = np.ones((grid_size, grid_size, len(actions))) / len(actions)
Q_importance_sampling = np.zeros((grid_size, grid_size, len(actions)))
returns_importance_sampling = [[[] for _ in range(len(actions))] for _ in range(grid_size * grid_size)]

# Perform Monte Carlo with importance sampling
optimal_policy_importance_sampling, optimal_Q_importance_sampling = monte_carlo_importance_sampling(behavior_policy, policy_importance_sampling, Q_importance_sampling, returns_importance_sampling, num_episodes=20000)

# Print the optimal policy using text
action_symbols = {(-1, 0): '↑', (1, 0): '↓', (0, -1): '←', (0, 1): '→'}
print("Optimal Policy with Importance Sampling:")
for i in range(grid_size):
    row = []
    for j in range(grid_size):
        best_action_idx = np.argmax(optimal_policy_importance_sampling[i, j])
        best_action = actions[best_action_idx]
        row.append(action_symbols[best_action])
    print(" ".join(row))

# Print the Q-values for each state-action pair
#print("\nQ-values:")
#for i in range(grid_size):
    #for j in range(grid_size):
        #print(f"State ({i},{j}): {Q_importance_sampling[i, j]}")



Optimal Policy with Importance Sampling:
→ ↑ ← → ←
↑ ↑ ↑ ↑ ↑
↑ ↑ ↑ → ↑
↓ ← ↑ ↑ ↑
↑ ← ← ↑ ↑


3.1). Permute the locations of the green and blue squares with probability 0.1, while preserving the rewards and transition structure as before. Use policy iteration to determine a suitable policy 

In [37]:
import numpy as np
import random
import matplotlib.pyplot as plt

# Define the gridworld size and discount factor
grid_size = 5
gamma = 0.95

# Define the rewards and transitions
rewards = np.full((grid_size, grid_size), -0.2)
initial_blue_pos = (0, 1)
initial_green_pos = (0, 4)
rewards[initial_blue_pos] = 5   # Initial Blue square
rewards[initial_green_pos] = 2.5  # Initial Green square

# Define terminal states
terminal_states = [(4, 0), (2, 4)]

# Define the four possible actions
actions = [(-1, 0), (1, 0), (0, -1), (0, 1)]
action_indices = {(-1, 0): 0, (1, 0): 1, (0, -1): 2, (0, 1): 3}

# Set random seeds for reproducibility
np.random.seed(42)
random.seed(42)

# Function to get the next state and reward with permutation
def get_next_state_reward(state, action, blue_pos, green_pos):
    if state in terminal_states:
        return (0, state, blue_pos, green_pos)
    # Swap positions of blue and green squares with 0.1 probability
    if random.uniform(0, 1) < 0.1:
        blue_pos, green_pos = green_pos, blue_pos
    next_state = (state[0] + action[0], state[1] + action[1])
    if next_state[0] < 0 or next_state[0] >= grid_size or next_state[1] < 0 or next_state[1] >= grid_size:
        return (-0.5, state, blue_pos, green_pos)  # Stepping off the grid
    if next_state == blue_pos:
        return (5, next_state, blue_pos, green_pos)  # Reward for Blue square
    if next_state == green_pos:
        return (2.5, next_state, blue_pos, green_pos)  # Reward for Green square
    return (-0.2, next_state, blue_pos, green_pos)  # Normal move

# Function to get the next state and reward without permutation
def get_next_state_reward_fixed(state, action, blue_pos, green_pos):
    if state in terminal_states:
        return (0, state, blue_pos, green_pos)
    next_state = (state[0] + action[0], state[1] + action[1])
    if next_state[0] < 0 or next_state[0] >= grid_size or next_state[1] < 0 or next_state[1] >= grid_size:
        return (-0.5, state, blue_pos, green_pos)  # Stepping off the grid
    if next_state == blue_pos:
        return (5, next_state, blue_pos, green_pos)  # Reward for Blue square
    if next_state == green_pos:
        return (2.5, next_state, blue_pos, green_pos)  # Reward for Green square
    return (-0.2, next_state, blue_pos, green_pos)  # Normal move

# Initialize policy and value function
policy = np.ones((grid_size, grid_size, len(actions))) / len(actions)
V = np.zeros((grid_size, grid_size))

# Policy evaluation function
def policy_evaluation(policy, V, blue_pos, green_pos, theta=1e-6, max_iterations=1000):
    for _ in range(max_iterations):
        delta = 0
        for i in range(grid_size):
            for j in range(grid_size):
                v = V[i, j]
                new_v = 0
                for a in range(len(actions)):
                    action = actions[a]
                    prob = policy[i, j, a]
                    reward, next_state, new_blue_pos, new_green_pos = get_next_state_reward((i, j), action, blue_pos, green_pos)
                    new_v += prob * (reward + gamma * V[next_state[0], next_state[1]])
                V[i, j] = new_v
                delta = max(delta, abs(v - V[i, j]))
        if delta < theta:
            break
    return V

# Policy improvement function
def policy_improvement(V, blue_pos, green_pos):
    policy_stable = True
    new_policy = np.zeros((grid_size, grid_size, len(actions)))
    for i in range(grid_size):
        for j in range(grid_size):
            old_action = np.argmax(policy[i, j])
            action_values = np.zeros(len(actions))
            for a in range(len(actions)):
                action = actions[a]
                reward, next_state, new_blue_pos, new_green_pos = get_next_state_reward((i, j), action, blue_pos, green_pos)
                action_values[a] = reward + gamma * V[next_state[0], next_state[1]]
            new_action = np.argmax(action_values)
            if old_action != new_action:
                policy_stable = False
            new_policy[i, j] = np.eye(len(actions))[new_action]
    return new_policy, policy_stable

# Policy iteration function
def policy_iteration(policy, V, blue_pos, green_pos, max_iterations=1000):
    iteration = 0
    while iteration < max_iterations:
        #print(f"Iteration: {iteration}")
        V = policy_evaluation(policy, V, blue_pos, green_pos)
        policy, policy_stable = policy_improvement(V, blue_pos, green_pos)
        iteration += 1
        if policy_stable:
            break
    return policy, V

# Policy evaluation function for fixed squares
def policy_evaluation_fixed(policy, V, blue_pos, green_pos, theta=1e-6, max_iterations=1000):
    for _ in range(max_iterations):
        delta = 0
        for i in range(grid_size):
            for j in range(grid_size):
                v = V[i, j]
                new_v = 0
                for a in range(len(actions)):
                    action = actions[a]
                    prob = policy[i, j, a]
                    reward, next_state, new_blue_pos, new_green_pos = get_next_state_reward_fixed((i, j), action, blue_pos, green_pos)
                    new_v += prob * (reward + gamma * V[next_state[0], next_state[1]])
                V[i, j] = new_v
                delta = max(delta, abs(v - V[i, j]))
        if delta < theta:
            break
    return V

# Policy improvement function for fixed squares
def policy_improvement_fixed(V, blue_pos, green_pos):
    policy_stable = True
    new_policy = np.zeros((grid_size, grid_size, len(actions)))
    for i in range(grid_size):
        for j in range(grid_size):
            old_action = np.argmax(policy[i, j])
            action_values = np.zeros(len(actions))
            for a in range(len(actions)):
                action = actions[a]
                reward, next_state, new_blue_pos, new_green_pos = get_next_state_reward_fixed((i, j), action, blue_pos, green_pos)
                action_values[a] = reward + gamma * V[next_state[0], next_state[1]]
            new_action = np.argmax(action_values)
            if old_action != new_action:
                policy_stable = False
            new_policy[i, j] = np.eye(len(actions))[new_action]
    return new_policy, policy_stable

# Policy iteration function for fixed squares
def policy_iteration_fixed(policy, V, blue_pos, green_pos, max_iterations=1000):
    iteration = 0
    while iteration < max_iterations:
        #print(f"Iteration: {iteration}")
        V = policy_evaluation_fixed(policy, V, blue_pos, green_pos)
        policy, policy_stable = policy_improvement_fixed(V, blue_pos, green_pos)
        iteration += 1
        if policy_stable:
            break
    return policy, V

# Initial positions of blue and green squares
blue_pos = (0, 1)
green_pos = (0, 4)

# Perform policy iteration for permutation scenario
print("Policy Iteration with Permutation:")
optimal_policy_permutation, optimal_V_permutation = policy_iteration(policy.copy(), V.copy(), blue_pos, green_pos)

# Perform policy iteration for fixed squares scenario
print("\nPolicy Iteration without Permutation:")
optimal_policy_fixed, optimal_V_fixed = policy_iteration_fixed(policy.copy(), V.copy(), blue_pos, green_pos)

# Print the optimal policy for permutation scenario
action_symbols = {(-1, 0): '↑', (1, 0): '↓', (0, -1): '←', (0, 1): '→'}
print("\nOptimal Policy with Permutation (using arrows):")
for i in range(grid_size):
    row = []
    for j in range(grid_size):
        best_action_idx = np.argmax(optimal_policy_permutation[i, j])
        best_action = actions[best_action_idx]
        row.append(action_symbols[best_action])
    print(" ".join(row))

# Print the optimal value function for permutation scenario
print("\nOptimal Value Function with Permutation:")
print(optimal_V_permutation)

# Print the optimal policy for fixed squares scenario
print("\nOptimal Policy without Permutation (using arrows):")
for i in range(grid_size):
    row = []
    for j in range(grid_size):
        best_action_idx = np.argmax(optimal_policy_fixed[i, j])
        best_action = actions[best_action_idx]
        row.append(action_symbols[best_action])
    print(" ".join(row))

# Print the optimal value function for fixed squares scenario
print("\nOptimal Value Function without Permutation:")
print(optimal_V_fixed)


Policy Iteration with Permutation:

Policy Iteration without Permutation:

Optimal Policy with Permutation (using arrows):
→ → ← ← ←
↑ ↑ ↑ ← ↑
↑ ↑ ↑ ← ↑
↑ ↑ ↑ ← ←
↑ ↑ ↑ ← ↑

Optimal Value Function with Permutation:
[[45.39788919 42.92799473 45.78159499 24.01287014 22.61222663]
 [42.92799473 43.28159499 43.29251524 22.61222663 23.9816153 ]
 [40.58159499 40.91751524 40.92788948 21.2816153   0.        ]
 [38.35251524 38.67163948 38.681495   20.01753453 18.81665781]
 [ 0.         36.5380575  36.54742025 18.81665781 17.67582492]]

Optimal Policy without Permutation (using arrows):
→ ↓ ← ← ←
↑ ↑ ↑ ↑ ↑
↑ ↑ ↑ ↑ ↑
↑ ↑ ↑ ↑ ←
↑ ↑ ↑ ↑ ↑

Optimal Value Function without Permutation:
[[49.33333333 46.66666667 49.33333333 46.66666667 44.13333333]
 [46.66666667 49.33333333 46.66666667 44.13333333 44.42666667]
 [44.13333333 46.66666667 44.13333333 41.72666667  0.        ]
 [41.72666667 44.13333333 41.72666667 39.44033333 37.26831667]
 [ 0.         41.72666667 39.44033333 37.26831667 35.20490083]]
