<a href="https://colab.research.google.com/github/tanishqpanipat/RL/blob/main/Experiment7.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import random

# Arguments
REWARD = -0.01 # constant reward for non-terminal states
DISCOUNT = 0.8
MAX_ERROR = 10**(-3)

# Set up the initial environment
NUM_ACTIONS = 4
ACTIONS = [(1, 0), (0, -1), (-1, 0), (0, 1)] # Down, Left, Up, Right
NUM_ROW = 3
NUM_COL = 4
U = [[0, 0, 0, 1], [0, 0, 0, -1], [0, 0, 0, 0], [0, 0, 0, 0]]
policy = [[random.randint(0, 3) for j in range(NUM_COL)] for i in range(NUM_ROW)] # construct a random policy

# Visualization
def printEnvironment(arr, policy=False):
    res = ""
    for r in range(NUM_ROW):
        res += "|"
        for c in range(NUM_COL):
            if r == c == 1:
                val = "WALL"
            elif r <= 1 and c == 3:
                val = "+1" if r == 0 else "-1"
            else:
                val = ["Down", "Left", "Up", "Right"][arr[r][c]]
            res += " " + val[:5].ljust(5) + " |" # format
        res += "\n"
    print(res)

# Get the utility of the state reached by performing the given action from the given state
def getU(U, r, c, action):
    dr, dc = ACTIONS[action]
    newR, newC = r+dr, c+dc
    if newR < 0 or newC < 0 or newR >= NUM_ROW or newC >= NUM_COL or (newR == newC == 1): # collide with the boundary or the wall
        return U[r][c]
    else:
        return U[newR][newC]

# Calculate the utility of a state given an action
def calculateU(U, r, c, action):
    u = REWARD
    u += 0.1 * DISCOUNT * getU(U, r, c, (action-1)%4)
    u += 0.8 * DISCOUNT * getU(U, r, c, action)
    u += 0.1 * DISCOUNT * getU(U, r, c, (action+1)%4)
    return u

# Perform some simplified value iteration steps to get an approximation of the utilities
def policyEvaluation(policy, U):
    while True:
        nextU = [[0, 0, 0, 1], [0, 0, 0, -1], [0, 0, 0, 0], [0, 0, 0, 0]]
        error = 0
        for r in range(NUM_ROW):
            for c in range(NUM_COL):
                if (r <= 1 and c == 3) or (r == c == 1):
                    continue
                nextU[r][c] = calculateU(U, r, c, policy[r][c]) # simplified Bellman update
                error = max(error, abs(nextU[r][c]-U[r][c]))
        U = nextU
        if error < MAX_ERROR * (1-DISCOUNT) / DISCOUNT:
            break
    return U

def policyIteration(policy, U):
    print("During the policy iteration:\n")
    while True:
        U = policyEvaluation(policy, U)
        unchanged = True
        for r in range(NUM_ROW):
            for c in range(NUM_COL):
                if (r <= 1 and c == 3) or (r == c == 1):
                    continue
                maxAction, maxU = None, -float("inf")
                for action in range(NUM_ACTIONS):
                    u = calculateU(U, r, c, action)
                    if u > maxU:
                        maxAction, maxU = action, u
                if maxU > calculateU(U, r, c, policy[r][c]):
                    policy[r][c] = maxAction # the action that maximizes the utility
                    unchanged = False
        if unchanged:
            break
        printEnvironment(policy)
    return policy

# Print the initial environment
print("The initial random policy is:\n")
printEnvironment(policy)

# Policy iteration
policy = policyIteration(policy, U)

# Print the optimal policy
print("The optimal policy is:\n")
printEnvironment(policy)

The initial random policy is:

| Down  | Up    | Left  | +1    |
| Right | WALL  | Left  | -1    |
| Right | Left  | Left  | Down  |

During the policy iteration:

| Down  | Up    | Right | +1    |
| Right | WALL  | Left  | -1    |
| Right | Left  | Left  | Down  |

| Right | Right | Right | +1    |
| Up    | WALL  | Up    | -1    |
| Up    | Right | Up    | Down  |

| Right | Right | Right | +1    |
| Up    | WALL  | Up    | -1    |
| Up    | Right | Up    | Left  |

The optimal policy is:

| Right | Right | Right | +1    |
| Up    | WALL  | Up    | -1    |
| Up    | Right | Up    | Left  |



In [2]:
import random

# Arguments
REWARD = -0.01 # constant reward for non-terminal states
DISCOUNT = 0.8
MAX_ERROR = 10**(-3)

# Set up the initial environment
NUM_ACTIONS = 4
ACTIONS = [(1, 0), (0, -1), (-1, 0), (0, 1)] # Down, Left, Up, Right
NUM_ROW = 3
NUM_COL = 4
U = [[0, 0, 0, 1], [0, 0, 0, -1], [0, 0, 0, 0], [0, 0, 0, 0]]
policy = [[random.randint(0, 3) for j in range(NUM_COL)] for i in range(NUM_ROW)] # construct a random policy

# Visualization
def printEnvironment(arr, policy=False):
    res = ""
    for r in range(NUM_ROW):
        res += "|"
        for c in range(NUM_COL):
            if r == c == 1:
                val = "WALL"
            elif r <= 1 and c == 3:
                val = "+1" if r == 0 else "-1"
            else:
                val = ["Down", "Left", "Up", "Right"][arr[r][c]]
            res += " " + val[:5].ljust(5) + " |" # format
        res += "\n"
    print(res)

# Get the utility of the state reached by performing the given action from the given state
def getU(U, r, c, action):
    dr, dc = ACTIONS[action]
    newR, newC = r+dr, c+dc
    if newR < 0 or newC < 0 or newR >= NUM_ROW or newC >= NUM_COL or (newR == newC == 1): # collide with the boundary or the wall
        return U[r][c]
    else:
        return U[newR][newC]

# Calculate the utility of a state given an action
def calculateU(U, r, c, action):
    u = REWARD
    u += 0.1 * DISCOUNT * getU(U, r, c, (action-1)%4)
    u += 0.8 * DISCOUNT * getU(U, r, c, action)
    u += 0.1 * DISCOUNT * getU(U, r, c, (action+1)%4)
    return u

# Perform some simplified value iteration steps to get an approximation of the utilities
def policyEvaluation(policy, U):
    while True:
        nextU = [[0, 0, 0, -1], [0, 0, 0, 1], [0, 0, 0, 0], [0, 0, 0, 0]]
        error = 0
        for r in range(NUM_ROW):
            for c in range(NUM_COL):
                if (r <= 1 and c == 3) or (r == c == 1):
                    continue
                nextU[r][c] = calculateU(U, r, c, policy[r][c]) # simplified Bellman update
                error = max(error, abs(nextU[r][c]-U[r][c]))
        U = nextU
        if error < MAX_ERROR * (1-DISCOUNT) / DISCOUNT:
            break
    return U

def policyIteration(policy, U):
    print("During the policy iteration:\n")
    while True:
        U = policyEvaluation(policy, U)
        unchanged = True
        for r in range(NUM_ROW):
            for c in range(NUM_COL):
                if (r <= 1 and c == 3) or (r == c == 1):
                    continue
                maxAction, maxU = None, -float("inf")
                for action in range(NUM_ACTIONS):
                    u = calculateU(U, r, c, action)
                    if u > maxU:
                        maxAction, maxU = action, u
                if maxU > calculateU(U, r, c, policy[r][c]):
                    policy[r][c] = maxAction # the action that maximizes the utility
                    unchanged = False
        if unchanged:
            break
        printEnvironment(policy)
    return policy

# Print the initial environment
print("The initial random policy is:\n")
printEnvironment(policy)

# Policy iteration
policy = policyIteration(policy, U)

# Print the optimal policy
print("The optimal policy is:\n")
printEnvironment(policy)

The initial random policy is:

| Right | Left  | Up    | +1    |
| Down  | WALL  | Up    | -1    |
| Down  | Down  | Down  | Down  |

During the policy iteration:

| Right | Left  | Left  | +1    |
| Down  | WALL  | Right | -1    |
| Down  | Down  | Down  | Up    |

| Down  | Right | Down  | +1    |
| Down  | WALL  | Right | -1    |
| Right | Right | Right | Up    |

| Right | Right | Down  | +1    |
| Down  | WALL  | Right | -1    |
| Right | Right | Right | Up    |

The optimal policy is:

| Right | Right | Down  | +1    |
| Down  | WALL  | Right | -1    |
| Right | Right | Right | Up    |



In [3]:
import random

# Arguments
REWARD = -0.01 # constant reward for non-terminal states
DISCOUNT = 0.8
MAX_ERROR = 10**(-3)

# Set up the initial environment
NUM_ACTIONS = 4
ACTIONS = [(1, 0), (0, -1), (-1, 0), (0, 1)] # Down, Left, Up, Right
NUM_ROW = 3
NUM_COL = 4
U = [[0, 0, 0, 1], [0, 0, 0, -1], [0, 0, 0, 0], [0, 0, 0, 0]]
policy = [[random.randint(0, 3) for j in range(NUM_COL)] for i in range(NUM_ROW)] # construct a random policy

# Visualization
def printEnvironment(arr, policy=False):
    res = ""
    for r in range(NUM_ROW):
        res += "|"
        for c in range(NUM_COL):
            if r == c == 1:
                val = "WALL"
            elif r <= 1 and c == 3:
                val = "+1" if r == 0 else "-1"
            else:
                val = ["Down", "Left", "Up", "Right"][arr[r][c]]
            res += " " + val[:5].ljust(5) + " |" # format
        res += "\n"
    print(res)

# Get the utility of the state reached by performing the given action from the given state
def getU(U, r, c, action):
    dr, dc = ACTIONS[action]
    newR, newC = r+dr, c+dc
    if newR < 0 or newC < 0 or newR >= NUM_ROW or newC >= NUM_COL or (newR == newC == 1): # collide with the boundary or the wall
        return U[r][c]
    else:
        return U[newR][newC]

# Calculate the utility of a state given an action
def calculateU(U, r, c, action):
    u = REWARD
    u += 0.1 * DISCOUNT * getU(U, r, c, (action-1)%4)
    u += 0.8 * DISCOUNT * getU(U, r, c, action)
    u += 0.1 * DISCOUNT * getU(U, r, c, (action+1)%4)
    return u

# Perform some simplified value iteration steps to get an approximation of the utilities
def policyEvaluation(policy, U):
    while True:
        nextU = [[0, 0, 0, 1], [0, 0, 0, -1], [0, 0, 0, 0], [0, 0, 0, 0]]
        error = 0
        for r in range(NUM_ROW):
            for c in range(NUM_COL):
                if (r <= 1 and c == 3) or (r == c == 1):
                    continue
                nextU[r][c] = calculateU(U, r, c, policy[r][c]) # simplified Bellman update
                error = max(error, abs(nextU[r][c]-U[r][c]))
        U = nextU
        if error < MAX_ERROR * (1-DISCOUNT) / DISCOUNT:
            break
        print("Iteration U:")
        for row in U:
            print(row)
    return U

def policyIteration(policy, U):
    print("During the policy iteration:\n")
    iteration_count = 0
    while True:
        U = policyEvaluation(policy, U)
        unchanged = True
        for r in range(NUM_ROW):
            for c in range(NUM_COL):
                if (r <= 1 and c == 3) or (r == c == 1):
                    continue
                maxAction, maxU = None, -float("inf")
                for action in range(NUM_ACTIONS):
                    u = calculateU(U, r, c, action)
                    if u > maxU:
                        maxAction, maxU = action, u
                if maxU > calculateU(U, r, c, policy[r][c]):
                    policy[r][c] = maxAction # the action that maximizes the utility
                    unchanged = False
        if unchanged:
            break
        print("Policy after iteration", iteration_count, ":")
        for row in policy:
            print(row)
        iteration_count += 1
    return policy

# Print the initial environment
print("The initial random policy is:\n")
printEnvironment(policy)

# Policy iteration
policy = policyIteration(policy, U)

# Print the optimal policy
print("The optimal policy is:\n")
printEnvironment(policy)

The initial random policy is:

| Left  | Left  | Right | +1    |
| Left  | WALL  | Right | -1    |
| Right | Left  | Up    | Right |

During the policy iteration:

Iteration U:
[-0.01, -0.01, 0.6300000000000001, 1]
[-0.01, 0, -0.6500000000000001, -1]
[-0.01, -0.01, -0.01, -0.09000000000000001]
[0, 0, 0, 0]
Iteration U:
[-0.018, -0.018, 0.6284000000000001, 1]
[-0.018, 0, -0.6004000000000002, -1]
[-0.018, -0.018, -0.4340000000000001, -0.15480000000000005]
[0, 0, 0, 0]
Iteration U:
[-0.0244, -0.0244, 0.6322400000000001, 1]
[-0.0244, 0, -0.6344480000000001, -1]
[-0.0244, -0.0244, -0.40808000000000016, -0.20145600000000008]
[0, 0, 0, 0]
Iteration U:
[-0.029520000000000005, -0.029520000000000005, 0.6298233600000002, 1]
[-0.029520000000000005, 0, -0.6320672, -1]
[-0.029520000000000005, -0.029520000000000005, -0.43411520000000015, -0.23504832000000012]
[0, 0, 0, 0]
Iteration U:
[-0.03361600000000001, -0.03361600000000001, 0.6298204928000002, 1]
[-0.03361600000000001, 0, -0.6343433472000001, -1

In [4]:
import numpy as np

def policy_iteration(env, gamma=0.9, max_iterations=1000, tolerance=1e-6):
    num_states = env.observation_space.n
    num_actions = env.action_space.n

    # Initialize a random policy and value function
    policy = np.ones((num_states, num_actions)) / num_actions
    value_function = np.zeros(num_states)

    for _ in range(max_iterations):
        # Policy Evaluation
        while True:
            delta = 0
            for state in range(num_states):
                v = value_function[state]
                new_v = 0
                for action in range(num_actions):
                    action_prob = policy[state][action]
                    for prob, next_state, reward, _ in env.P[state][action]:
                        new_v += action_prob * prob * (reward + gamma * value_function[next_state])
                value_function[state] = new_v
                delta = max(delta, abs(v - value_function[state]))
            if delta < tolerance:
                break

        policy_stable = True

        # Policy Improvement
        for state in range(num_states):
            old_action = np.argmax(policy[state])
            action_values = np.zeros(num_actions)
            for action in range(num_actions):
                for prob, next_state, reward, _ in env.P[state][action]:
                    action_values[action] += prob * (reward + gamma * value_function[next_state])

            best_action = np.argmax(action_values)
            policy[state] = np.eye(num_actions)[best_action]

            if old_action != best_action:
                policy_stable = False

        if policy_stable:
            break

    return policy

# Example usage with OpenAI Gym's FrozenLake environment
import gym

env = gym.make('FrozenLake-v1')
optimal_policy = policy_iteration(env)

# Display the optimal policy
print("Optimal Policy:")
print(optimal_policy)

Optimal Policy:
[[1. 0. 0. 0.]
 [0. 0. 0. 1.]
 [1. 0. 0. 0.]
 [0. 0. 0. 1.]
 [1. 0. 0. 0.]
 [1. 0. 0. 0.]
 [1. 0. 0. 0.]
 [1. 0. 0. 0.]
 [0. 0. 0. 1.]
 [0. 1. 0. 0.]
 [1. 0. 0. 0.]
 [1. 0. 0. 0.]
 [1. 0. 0. 0.]
 [0. 0. 1. 0.]
 [0. 1. 0. 0.]
 [1. 0. 0. 0.]]


  deprecation(
  deprecation(
