## LAB 9: Markov Decision Process

Seepja Payasi



A Markov Decision Process (MDP) model contains: 

A set of possible world states S.

A set of Models.

A set of possible actions A.

A real-valued reward function R(s,a).

A policy the solution of Markov Decision Process.

In [13]:
import numpy as np
import gym

def main():
    # Starting new environment
    env = gym.make('FrozenLake-v1')

    # Run policy Iteration
    optimal_policy, optimal_value_function = policy_iteration(env, gamma=0.9)

    # Print the optimal policy and optimal value function
    print_output(optimal_policy, optimal_value_function)

    # Test the optimal policy
    run_policy(optimal_policy)

def policy_iteration(env,gamma = 1.0):
    # Initialize a random policy. Hint - Check numpy function np.zeros.
    random_policy = np.zeros(env.observation_space.n)
    no_of_iterations = 2000

    for i in range(no_of_iterations):
        new_value_function = policy_evaluation(env, random_policy, gamma)
        new_policy = policy_improvement(env, new_value_function, gamma)
        if (np.all(random_policy == new_policy)):
            print ('Policy-Iteration converged at step %d.' %(i+1))
            break
        random_policy = new_policy
    return new_policy, new_value_function


def policy_evaluation(env, policy, gamma=1.0):
    # Initialize a value function with all zeros. Hint - Check numpy function np.zeros.
    value_table = np.zeros(env.nS)

    # Delta threshold as termination condition
    threshold = 1e-10

    while True:
        # Make a copy of current value function to calculate delta change. Hint - numpy function for copy
        updated_value_table = np.copy(value_table)

        # Iterate over all the states
        for state in range(env.nS):
            # Select action for the state from policy being evaluated
            action = policy[state]

            # Calculate value of this state. Refer to slide 15 V(s) update. Transition probabilities can be accessed by env.P
            value_table[state] = sum(
                [trans_prob * (reward_prob + gamma * updated_value_table[next_state])
                 for trans_prob, next_state, reward_prob, _ in env.P[state][action]])

            s = 0
            for trans_prob, next_state, reward_prob, _ in env.P[state][action]:
                s += trans_prob * (reward_prob + gamma * updated_value_table[next_state])
            value_table[state] = s
        if (np.sum((np.fabs(updated_value_table - value_table))) <= threshold):
            break
    return value_table


def policy_improvement(env, value_table, gamma=1.0):
    # Initialize array for policy with all zeros. Hint - Check numpy function np.zeros.
    policy = np.zeros(env.observation_space.n)

    # Iterate over all the states.
    for state in range(env.observation_space.n):
        Q_table = np.zeros(env.action_space.n)
        for action in range(env.action_space.n):
            for next_sr in env.P[state][action]:
                trans_prob, next_state, reward_prob, _ = next_sr

                # Implement (transition probability + gamma * value function of a next state)
                Q_table[action] += (trans_prob * (reward_prob + gamma * value_table[next_state]))

        # find out for which index (action) Q_table has max  value and assign it to state in policy array. Hint numpy argmax function.
        policy[state] = np.argmax(Q_table)

    return policy


def print_output(policy=None, value_fn = None):
    print("\n\n")
    print(np.reshape(value_fn, (-1, 4)))
    print("\n")
    print(np.reshape(policy, (-1, 4)))

def run_policy(optimal_policy):
    env = gym.make('FrozenLake-v1')

    # Implement this using lab 1 code to test optimal policy

    # From lab 1 code, instead of random action, now choose action from optimal policy

    # Calculate successful episodes %
    num_episodes = 100
    success = 0

    for i in range(num_episodes):
        new_state = env.reset()
        steps = 0
        while True:
            steps += 1

            #  Select optimal action
            action = optimal_policy[new_state]
            # action = env.action_space.sample()
            new_state, reward, done, info = env.step(action)
            # env.render()
            if done:
                if reward:
                    success += 1
                break

    #  Close the environment
    env.close()
    env.env.close()

    #  Calculate average number of steps we survived for all the episodes
    print("Success Rate: {}%".format((success / num_episodes) * 100))

if __name__ == "__main__":
    main()

Policy-Iteration converged at step 6.



[[0.0688909  0.06141457 0.07440976 0.05580732]
 [0.09185454 0.         0.11220821 0.        ]
 [0.14543635 0.24749695 0.29961759 0.        ]
 [0.         0.3799359  0.63902015 0.        ]]


[[0. 3. 0. 3.]
 [0. 0. 0. 0.]
 [3. 1. 0. 0.]
 [0. 2. 1. 0.]]
Success Rate: 79.0%
