Shyam Patel
sjp0059
Assignment 5

Problem 2:

1: 
The state space (S) is the set of cells in the 25x25 grid; each non-blocked cell is a possible state.
The action space (A) is the set of moves the agent can make: Up, Down, Left, Right. There are NO diagonal moves.
The transition probabilities (P): all movement is deterministic, so there is no randomness. A valid move reaches the intended neighboring cell with probability 1. For an invalid move (off the grid or into a blocked cell), the agent stays in place with probability 1. Every other transition has probability 0.
The reward function (R): each move gives the agent a reward of -1.
The discount factor (γ): my first thought was to use 0, because from what I understand we just need the shortest path to the goal, there are no rewards on the way, and each step gives us -1, so we need to take the fewest steps. However, with γ = 0 every action is worth exactly -1, so the agent cannot tell which moves lead toward the goal. The code in this notebook therefore uses discount_factor = 0.95, which lets the -1 step costs accumulate so that states farther from the goal get lower values.
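To illustrate why the choice of discount factor matters, here is a minimal sketch on a hypothetical 5-cell corridor (not the 25x25 grid) with the goal at cell 0 and a reward of -1 per move. The function name `action_values` and its parameters are made up for this illustration. It compares the value of moving left (toward the goal) with moving right (away from it) for two discount factors.

```python
def action_values(position, gamma, length=5, sweeps=100):
    """Return (value of moving left, value of moving right) at `position`
    in a 1-D corridor whose goal is cell 0, after value iteration."""
    def step(s, move):
        # The goal is absorbing; moves past either end leave the agent in place.
        if s == 0:
            return s
        return min(max(s + move, 0), length - 1)

    values = [0.0] * length
    for _ in range(sweeps):
        for s in range(1, length):
            values[s] = max(-1 + gamma * values[step(s, m)] for m in (-1, 1))
    return tuple(-1 + gamma * values[step(position, m)] for m in (-1, 1))

left0, right0 = action_values(2, gamma=0.0)
left95, right95 = action_values(2, gamma=0.95)
print("gamma=0.00:", left0, right0)
print("gamma=0.95:", left95, right95)
```

With γ = 0 both moves are worth -1, so nothing distinguishes the move toward the goal. With γ = 0.95 the move toward the goal has the higher value, which is what policy improvement needs in order to pick it.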



In [1]:
import numpy as np

grid_size = 25
blocked = -1
empty = 0
goal = (0, 0)

actions = ['up', 'down', 'left', 'right']
action_effects = {'up': (-1, 0), 'down': (1, 0), 'left': (0, -1), 'right': (0, 1)}

discount_factor = 0.95
threshold = 0.001

# Random grid (each non-goal cell is blocked with probability 0.2) and initial random policy
np.random.seed()
grid = np.zeros((grid_size, grid_size), dtype=int)
for i in range(grid_size):
    for j in range(grid_size):
        if np.random.rand() < 0.2 and (i, j) != goal:
            grid[i, j] = blocked
grid[goal] = empty

policy = {}
value_function = {}

for x in range(grid_size):
    for y in range(grid_size):
        if grid[x, y] != blocked:
            policy[(x, y)] = np.random.choice(actions)
            value_function[(x, y)] = 0
            
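# Deterministic transition: return the state reached by taking `action` in `state`.
# Moves off the grid or into a blocked cell leave the agent in place; the goal is absorbing.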
def get_next_state(state, action):
    if state == goal:
        return state
    x, y = state
    dx, dy = action_effects[action]
    nx, ny = x + dx, y + dy
    if 0 <= nx < grid_size and 0 <= ny < grid_size and grid[nx, ny] != blocked:
        return (nx, ny)
    return state

# Policy Evaluation to calculate the value function for the policy.
def policy_evaluation(policy, value_function):
    while True:
        delta = 0
        for state in value_function:
            if state == goal:
                continue
            old_value = value_function[state]
            action = policy[state]
            next_state = get_next_state(state, action)
            new_value = -1 + discount_factor * value_function[next_state]
            value_function[state] = new_value
            delta = max(delta, abs(old_value - new_value))
        if delta < threshold:
            break

# Policy Improvement to update the policy based on the value function.
def policy_improvement(policy, value_function):
    policy_stable = True
    for state in value_function:
        if state == goal:
            continue
        old_policy = policy[state]
        best_action = None
        best_action_value = float('-inf')
        for action in actions:
            next_state = get_next_state(state, action)
            action_value = -1 + discount_factor * value_function[next_state]
            if action_value > best_action_value:
                best_action_value = action_value
                best_action = action
        policy[state] = best_action
        if best_action != old_policy:
            policy_stable = False
    return policy_stable

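# Policy iteration: alternate evaluation and improvement until the policy is stable.
# iteration_count counts the rounds in which the policy changed.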
iteration_count = 0
while True:
    policy_evaluation(policy, value_function)
    if policy_improvement(policy, value_function):
        break
    iteration_count += 1

print(f"Number of policy iterations: {iteration_count}")

# Visualization
def visualize(policy):
    for x in range(grid_size):
        for y in range(grid_size):
            state = (x, y)
            if state == goal:
                print(' G', end='')
            elif grid[x, y] == blocked:
                print(' B', end='')
            elif state in policy:
                arrow = ''
                if policy[state] == 'up':
                    arrow = '^'
                elif policy[state] == 'down':
                    arrow = 'v'
                elif policy[state] == 'left':
                    arrow = '<'
                elif policy[state] == 'right':
                    arrow = '>'
                print(f' {arrow}', end='')
        print()

print("\nOptimal policy:")
visualize(policy)


Number of policy iterations: 24

Optimal policy:
 G < B ^ < B v v v < v v v B v v v v v v v B > > v
 ^ ^ < B B v v v v B v v v B v v v v v v v < B B v
 ^ ^ ^ < < < < < < < < < < < < < < < < < < B B v v
 ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ B ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ < < < <
 ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ < ^ ^ ^ B ^ B B ^ ^ ^ ^ ^ ^ B
 ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ B ^ ^ < ^ < B ^ ^ ^ ^ B ^ <
 ^ ^ ^ B ^ B ^ ^ B B ^ < ^ ^ ^ ^ ^ < ^ B ^ ^ B ^ B
 ^ ^ ^ < ^ < B B B v ^ ^ ^ ^ ^ B ^ ^ B v B B v B ^
 B ^ ^ ^ ^ ^ < < < < ^ ^ B ^ B ^ B ^ < < < < < < B
 B B ^ B B ^ ^ ^ ^ ^ B ^ < B v B > ^ ^ ^ ^ ^ ^ ^ <
 B > ^ < < ^ ^ ^ ^ ^ < ^ ^ < < B ^ ^ ^ ^ ^ B ^ ^ ^
 v B ^ ^ ^ B B ^ ^ B ^ ^ B ^ ^ B ^ ^ ^ ^ B v ^ ^ ^
 > > ^ ^ ^ < < B B > ^ B B ^ B v B B ^ ^ < < B ^ B
 ^ B ^ ^ ^ ^ ^ B v B ^ < < ^ < < < < ^ ^ B ^ B ^ <
 ^ B B ^ ^ ^ ^ < < < ^ ^ ^ ^ ^ B ^ B ^ B v ^ < ^ ^
 B > > ^ ^ B ^ ^ B ^ B B ^ ^ ^ B ^ B ^ < < B ^ ^ ^
 > ^ ^ B B > ^ B B ^ < < B ^ ^ < ^ < ^ ^ ^ < ^ ^ B
 ^ B ^ < B ^ ^ < < ^ ^ ^ < ^ B ^ ^ ^ ^ ^ B ^ ^ ^ B
 ^ < B ^ < ^ B ^ ^ ^ ^ ^ ^ ^ < ^ 

Report:
a. I stored the grid as a NumPy array because it is easy to create and index compared to other libraries. The policy and value function are stored in dictionaries keyed by (row, column), so blocked cells are simply left out. NumPy arrays are a well-suited data structure for an MDP: they are efficient and support vectorized operations, which work well with the computational requirements of MDP solution algorithms.
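As an illustration of the vectorized operations mentioned above, here is a rough sketch of value iteration (not the policy iteration used in this notebook) that updates the whole grid at once with NumPy. The names `shifted` and `vectorized_values` and the boolean `blocked_mask` argument are assumptions made for this example; the reward of -1 per move and the discount factor of 0.95 match the notebook.

```python
import numpy as np

MOVES = [(-1, 0), (1, 0), (0, -1), (0, 1)]  # up, down, left, right

def shifted(arr, dx, dy, fill):
    """Return an array with result[i, j] == arr[i + dx, j + dy]; `fill` outside the grid."""
    padded = np.pad(arr, 1, constant_values=fill)
    n_rows, n_cols = arr.shape
    return padded[1 + dx:1 + dx + n_rows, 1 + dy:1 + dy + n_cols]

def vectorized_values(blocked_mask, goal=(0, 0), gamma=0.95, tol=1e-3):
    """Synchronous value iteration; blocked_mask is True where a cell is blocked."""
    values = np.zeros(blocked_mask.shape)
    while True:
        candidates = []
        for dx, dy in MOVES:
            # Off-grid targets are treated as blocked, so the agent stays in place.
            target_blocked = shifted(blocked_mask, dx, dy, True)
            target_values = shifted(values, dx, dy, 0.0)
            reached = np.where(target_blocked, values, target_values)
            candidates.append(-1 + gamma * reached)
        new_values = np.max(candidates, axis=0)
        new_values[goal] = 0.0          # the goal is absorbing with value 0
        new_values[blocked_mask] = 0.0  # placeholder; blocked cells are not states
        if np.max(np.abs(new_values - values)) < tol:
            return new_values
        values = new_values

open_grid = np.zeros((3, 3), dtype=bool)
walled_grid = open_grid.copy()
walled_grid[0, 1] = True  # block the cell directly right of the goal
v_open = vectorized_values(open_grid)
v_walled = vectorized_values(walled_grid)
print(v_open)
print(v_walled)
```

Each sweep touches every cell through array slicing instead of a Python loop over states, which is where NumPy pays off on larger grids.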



In [3]:
import numpy as np

grid_size = 25
blocked = -1
empty = 0
goal = (0, 0)

actions = ['up', 'down', 'left', 'right']
action_effects = {'up': (-1, 0), 'down': (1, 0), 'left': (0, -1), 'right': (0, 1)}

discount_factor = 0.95
threshold = 0.001

# Predefined grid with fixed obstacles (no randomly blocked cells)
grid = np.zeros((grid_size, grid_size), dtype=int)
grid[10:15, 10] = blocked  # Adding vertical obstacle
grid[20, 15:20] = blocked  # Adding horizontal obstacle
grid[5:10, 5:10] = blocked  # Adding square obstacle

grid[goal] = empty

policy = {}
value_function = {}

for x in range(grid_size):
    for y in range(grid_size):
        if grid[x, y] != blocked:
            policy[(x, y)] = np.random.choice(actions)  # Initialize policy randomly
            value_function[(x, y)] = 0  # Initialize value function to zeros

# Number of Iterations
iteration_count = 0
while True:
    policy_evaluation(policy, value_function)
    if policy_improvement(policy, value_function):
        break
    iteration_count += 1


# Visualization
def visualize(policy):
    for x in range(grid_size):
        for y in range(grid_size):
            state = (x, y)
            if state == goal:
                print(' G', end='')
            elif grid[x, y] == blocked:
                print(' B', end='')
            elif state in policy:
                arrow = ''
                if policy[state] == 'up':
                    arrow = '^'
                elif policy[state] == 'down':
                    arrow = 'v'
                elif policy[state] == 'left':
                    arrow = '<'
                elif policy[state] == 'right':
                    arrow = '>'
                print(f' {arrow}', end='')
        print()

print("Optimal policy:")
visualize(policy)

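# Function to simulate policy execution
def simulate_policy(policy, grid, start_state):
    current_state = start_state
    steps = 0
    visited = {current_state}  # States seen so far, used to detect cycles
    while current_state != goal:
        action = policy[current_state]
        next_state = get_next_state(current_state, action)
        # The policy is deterministic, so revisiting any state (including
        # staying in place after hitting a wall or blocked cell) means the
        # agent is looping and will never reach the goal.
        if next_state in visited:
            return float('inf')
        visited.add(next_state)
        current_state = next_state
        steps += 1
    return steps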

# Function to compute performance of a policy
def compute_policy_performance(policy, grid, start_states=None, num_trials=3):
    # Draw fresh random start states unless a shared list is passed in
    if start_states is None:
        start_states = [np.random.randint(0, grid_size, size=2) for _ in range(num_trials)]
    steps_to_goal = []
    for start_state in start_states:
        steps = simulate_policy(policy, grid, tuple(start_state))
        steps_to_goal.append(steps)
    median_steps = np.median(steps_to_goal)
    return median_steps

# Generate random start states once so that all tests share them
start_states = [np.random.randint(0, grid_size, size=2) for _ in range(3)]

# Run the simulation. `policy` started out random but has already been
# improved by the policy iteration loop above.
performance = compute_policy_performance(policy, grid, start_states)
print("Median number of steps to reach the goal state (Random):", performance)

grid[goal] = empty

# Function to initialize policy and value function
def initialize_policy_and_value(grid):
    policy = {}
    value_function = {}
    for x in range(grid_size):
        for y in range(grid_size):
            if grid[x, y] != blocked:
                policy[(x, y)] = np.random.choice(actions)  # Initialize policy randomly
                value_function[(x, y)] = 0  # Initialize value function to zeros
    return policy, value_function


# Function to run policy iteration from a fresh random policy and return the final policy
def run_policy_iteration(grid):
    policy, value_function = initialize_policy_and_value(grid)
    iteration_count = 0
    while True:
        policy_evaluation(policy, value_function)
        if policy_improvement(policy, value_function):
            break
        iteration_count += 1
    return policy

# Run policy iteration for intermediate policy
# (run_policy_iteration always runs until the policy is stable)
intermediate_policy = run_policy_iteration(grid)
intermediate_performance = compute_policy_performance(intermediate_policy, grid, start_states)
print("Median number of steps to reach the goal state (Intermediate):", intermediate_performance)

# Run policy iteration for optimal policy
optimal_policy = run_policy_iteration(grid)
optimal_performance = compute_policy_performance(optimal_policy, grid, start_states)
print("Median number of steps to reach the goal state (Optimal):", optimal_performance)


Optimal policy:
 G < < < < < < < < < < < < < < < < < < < < < < < <
 ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^
 ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^
 ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^
 ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^
 ^ ^ ^ ^ ^ B B B B B ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^
 ^ ^ ^ ^ ^ B B B B B ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^
 ^ ^ ^ ^ ^ B B B B B ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^
 ^ ^ ^ ^ ^ B B B B B ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^
 ^ ^ ^ ^ ^ B B B B B ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^
 ^ ^ ^ ^ ^ < < < < < B ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^
 ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ B ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^
 ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ B ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^
 ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ B ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^
 ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ B ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^
 ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ < ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^
 ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^
 ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^
 ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^
 ^ ^ ^ ^ ^ ^ ^ 

For the median tests, this took me a really long time because I was reusing my code from the previous section, which randomly generates the grid. The code kept running into an infinite loop. What I had not realized for a long time was that the agent kept moving back and forth between the same 2 states. This can happen when blocked cells leave the agent with only 1 place to move, so it never makes progress toward the goal. So, what I did was create a predefined maze where there is no chance of getting stuck in an infinite loop.
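One way to keep using a random grid would be to check in advance which cells can reach the goal at all. The sketch below is an illustration only, not part of the submitted solution: the function name `reachable_from_goal` is made up, and it assumes the same encoding as the notebook (blocked cells are -1, the goal is (0, 0)). It runs a breadth-first search outward from the goal; since moves work in both directions, any cell the search does not find is walled off and can never reach the goal.

```python
from collections import deque

import numpy as np

def reachable_from_goal(grid, goal=(0, 0), blocked=-1):
    """Return the set of non-blocked cells connected to `goal`
    by up/down/left/right moves."""
    n_rows, n_cols = grid.shape
    seen = {goal}
    queue = deque([goal])
    while queue:
        x, y = queue.popleft()
        for dx, dy in ((-1, 0), (1, 0), (0, -1), (0, 1)):
            nx, ny = x + dx, y + dy
            if (0 <= nx < n_rows and 0 <= ny < n_cols
                    and grid[nx, ny] != blocked and (nx, ny) not in seen):
                seen.add((nx, ny))
                queue.append((nx, ny))
    return seen

# Small example: the bottom-right cell is sealed off by two blocked cells.
demo_grid = np.zeros((3, 3), dtype=int)
demo_grid[1, 2] = -1
demo_grid[2, 1] = -1
reachable = reachable_from_goal(demo_grid)
print(sorted(reachable))
```

Start states could then be drawn only from the returned set, so a simulation never begins in a pocket with no route to the goal.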

The agent starts at a randomly generated position, and the same start positions are intended to be used for all 3 tests (Random, Intermediate, Optimal). One thing to note is that if the agent spawns on a blocked cell, the program will raise an error (a KeyError, since blocked cells have no policy entry), so simply rerun it until the agent starts on a non-blocked cell.
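The rerun could be avoided by drawing start positions only from non-blocked cells. The following is a hedged sketch rather than the code used above: `sample_start_states` and its `rng` parameter are hypothetical names, and the grid is rebuilt here with the same obstacles as the predefined maze so that the example is self-contained.

```python
import numpy as np

def sample_start_states(grid, num_trials=3, blocked=-1, rng=None):
    """Pick `num_trials` start cells uniformly (with replacement)
    from the cells of `grid` that are not blocked."""
    rng = np.random.default_rng() if rng is None else rng
    # np.argwhere returns the (row, column) index of every non-blocked cell
    free_cells = [(int(x), int(y)) for x, y in np.argwhere(grid != blocked)]
    picks = rng.choice(len(free_cells), size=num_trials)
    return [free_cells[i] for i in picks]

# Same obstacle layout as the predefined maze
maze = np.zeros((25, 25), dtype=int)
maze[10:15, 10] = -1
maze[20, 15:20] = -1
maze[5:10, 5:10] = -1

starts = sample_start_states(maze, num_trials=3, rng=np.random.default_rng(0))
print(starts)
```

Because the returned positions are plain (row, column) tuples, they can be used directly as keys into the policy dictionary.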