In [63]:
import gymnasium as gym
import copy
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.font_manager
import random

In [64]:

class StochasticMazeEnv(gym.Env):
    """Stochastic 3x4 grid-world maze with a wall, a goal and a hole.

    States are numbered row by row (0-3 top row, 4-7 middle row, 8-11 bottom
    row).  State 5 has no entry in the transition table (it is the wall),
    every transition into state 3 yields reward +1 and ends the episode, and
    every transition into state 7 yields reward -1 and ends the episode.  All
    other transitions give a step reward of -0.01.

    Actions are encoded as ``{"L": 0, "U": 1, "R": 2, "D": 3}``.  The intended
    move happens with probability 0.8; with probability 0.1 each the agent
    slips to one of the two perpendicular directions.  Moves that would leave
    the grid or enter the wall keep the agent in place.
    """

    metadata = {'render.modes': ['human']}

    def __init__(self, initial_state=0, no_states=12, no_actions=4):
        """Create the environment and place the agent on ``initial_state``.

        Args:
            initial_state: state the agent starts in after every ``reset``.
            no_states: number of states, exposed as ``nS``.
            no_actions: number of actions, exposed as ``nA``.
        """
        self.initial_state = initial_state
        self.state = self.initial_state
        self.nA = no_actions
        self.nS = no_states
        self.actions_dict = {"L": 0, "U": 1, "R": 2, "D": 3}
        self.prob_dynamics = self._build_dynamics()
        self.reset()

    @staticmethod
    def _build_dynamics():
        """Return a fresh copy of the MDP transition table.

        The table is indexed as ``table[state][action]`` and each entry is a
        list of ``(probability, next_state, reward, done)`` tuples whose
        probabilities are 0.8 (intended move), 0.1 and 0.1 (perpendicular
        slips).  State 5 (the wall) is intentionally absent.
        """
        return {
            0: {
                0: [(0.8, 0, -0.01, False), (0.1, 0, -0.01, False), (0.1, 4, -0.01, False)],
                1: [(0.8, 0, -0.01, False), (0.1, 0, -0.01, False), (0.1, 1, -0.01, False)],
                2: [(0.8, 1, -0.01, False), (0.1, 0, -0.01, False), (0.1, 4, -0.01, False)],
                3: [(0.8, 4, -0.01, False), (0.1, 0, -0.01, False), (0.1, 1, -0.01, False)],
            },
            1: {
                0: [(0.8, 0, -0.01, False), (0.1, 1, -0.01, False), (0.1, 1, -0.01, False)],
                1: [(0.8, 1, -0.01, False), (0.1, 0, -0.01, False), (0.1, 2, -0.01, False)],
                2: [(0.8, 2, -0.01, False), (0.1, 1, -0.01, False), (0.1, 1, -0.01, False)],
                3: [(0.8, 1, -0.01, False), (0.1, 0, -0.01, False), (0.1, 2, -0.01, False)],
            },
            2: {
                0: [(0.8, 1, -0.01, False), (0.1, 2, -0.01, False), (0.1, 6, -0.01, False)],
                1: [(0.8, 2, -0.01, False), (0.1, 1, -0.01, False), (0.1, 3, +1, True)],
                2: [(0.8, 3, +1, True), (0.1, 2, -0.01, False), (0.1, 6, -0.01, False)],
                3: [(0.8, 6, -0.01, False), (0.1, 1, -0.01, False), (0.1, 3, +1, True)],
            },
            # Goal: absorbing, zero reward once reached.
            3: {
                0: [(0.8, 3, 0, True), (0.1, 3, 0, True), (0.1, 3, 0, True)],
                1: [(0.8, 3, 0, True), (0.1, 3, 0, True), (0.1, 3, 0, True)],
                2: [(0.8, 3, 0, True), (0.1, 3, 0, True), (0.1, 3, 0, True)],
                3: [(0.8, 3, 0, True), (0.1, 3, 0, True), (0.1, 3, 0, True)],
            },
            4: {
                0: [(0.8, 4, -0.01, False), (0.1, 0, -0.01, False), (0.1, 8, -0.01, False)],
                1: [(0.8, 0, -0.01, False), (0.1, 4, -0.01, False), (0.1, 4, -0.01, False)],
                2: [(0.8, 4, -0.01, False), (0.1, 0, -0.01, False), (0.1, 8, -0.01, False)],
                3: [(0.8, 8, -0.01, False), (0.1, 4, -0.01, False), (0.1, 4, -0.01, False)],
            },
            6: {
                0: [(0.8, 6, -0.01, False), (0.1, 2, -0.01, False), (0.1, 10, -0.01, False)],
                1: [(0.8, 2, -0.01, False), (0.1, 6, -0.01, False), (0.1, 7, -1, True)],
                2: [(0.8, 7, -1, True), (0.1, 2, -0.01, False), (0.1, 10, -0.01, False)],
                3: [(0.8, 10, -0.01, False), (0.1, 6, -0.01, False), (0.1, 7, -1, True)],
            },
            # Hole: absorbing, zero reward once reached.
            7: {
                0: [(0.8, 7, 0, True), (0.1, 7, 0, True), (0.1, 7, 0, True)],
                1: [(0.8, 7, 0, True), (0.1, 7, 0, True), (0.1, 7, 0, True)],
                2: [(0.8, 7, 0, True), (0.1, 7, 0, True), (0.1, 7, 0, True)],
                3: [(0.8, 7, 0, True), (0.1, 7, 0, True), (0.1, 7, 0, True)],
            },
            8: {
                0: [(0.8, 8, -0.01, False), (0.1, 8, -0.01, False), (0.1, 4, -0.01, False)],
                1: [(0.8, 4, -0.01, False), (0.1, 8, -0.01, False), (0.1, 9, -0.01, False)],
                2: [(0.8, 9, -0.01, False), (0.1, 8, -0.01, False), (0.1, 4, -0.01, False)],
                3: [(0.8, 8, -0.01, False), (0.1, 8, -0.01, False), (0.1, 9, -0.01, False)],
            },
            9: {
                0: [(0.8, 8, -0.01, False), (0.1, 9, -0.01, False), (0.1, 9, -0.01, False)],
                1: [(0.8, 9, -0.01, False), (0.1, 8, -0.01, False), (0.1, 10, -0.01, False)],
                2: [(0.8, 10, -0.01, False), (0.1, 9, -0.01, False), (0.1, 9, -0.01, False)],
                3: [(0.8, 9, -0.01, False), (0.1, 8, -0.01, False), (0.1, 10, -0.01, False)],
            },
            10: {
                0: [(0.8, 9, -0.01, False), (0.1, 6, -0.01, False), (0.1, 10, -0.01, False)],
                1: [(0.8, 6, -0.01, False), (0.1, 9, -0.01, False), (0.1, 11, -0.01, False)],
                2: [(0.8, 11, -0.01, False), (0.1, 6, -0.01, False), (0.1, 10, -0.01, False)],
                3: [(0.8, 10, -0.01, False), (0.1, 9, -0.01, False), (0.1, 11, -0.01, False)],
            },
            11: {
                0: [(0.8, 10, -0.01, False), (0.1, 7, -1, True), (0.1, 11, -0.01, False)],
                1: [(0.8, 7, -1, True), (0.1, 10, -0.01, False), (0.1, 11, -0.01, False)],
                2: [(0.8, 11, -0.01, False), (0.1, 7, -1, True), (0.1, 11, -0.01, False)],
                3: [(0.8, 11, -0.01, False), (0.1, 11, -0.01, False), (0.1, 10, -0.01, False)],
            },
        }

    def reset(self):
        """Move the agent back to ``initial_state`` and return the observation."""
        self.state = self.initial_state
        return self.get_obs()

    def get_obs(self):
        """Return the current state index."""
        return self.state

    def render(self, mode='human'):
        """Print the current state to stdout."""
        print("Current state: {}".format(self.state))

    def sample_action(self):
        """Return a uniformly random valid action index in ``[0, nA)``."""
        # random.randint has an inclusive upper bound and could return nA,
        # which is not a valid action; randrange excludes the upper bound.
        return random.randrange(self.nA)

    def P(self):
        """Reset ``prob_dynamics`` to a fresh transition table and return it."""
        self.prob_dynamics = self._build_dynamics()
        return self.prob_dynamics

    def step(self, action):
        '''
        Performs the given action
        Args:
            action : action from the action_space to be taken in the environment;
                     values >= nA are clamped to the last action
        Returns:
            observation - the state reached after the transition
            reward - reward obtained after taking the given action
            done - True if the episode is complete else False
        '''
        if action >= self.nA:
            action = self.nA - 1

        # Every table entry lists exactly three outcomes with probabilities
        # 0.8 / 0.1 / 0.1, so sampling an index with those weights picks one.
        index = np.random.choice(3, 1, p=[0.8, 0.1, 0.1])[0]

        _, next_state, reward, done = self.prob_dynamics[self.state][action][index]
        self.state = next_state

        return self.state, reward, done

In [65]:
# Policy Iteration 
class PolicyIterationAgent:
    """Policy-iteration solver for a tabular MDP.

    The environment must expose ``nS`` (number of states), ``nA`` (number of
    actions) and ``prob_dynamics``, a mapping
    ``state -> action -> [(probability, next_state, reward, done), ...]``.
    States that have no entry in ``prob_dynamics`` are treated as walls: their
    value is fixed at 0 and their policy row is filled with -1.
    """

    def __init__(self, env):
        self.env = env
        self.nS = env.nS
        self.nA = env.nA
        # Policy matrix: one row per state, one column per action.  Start from
        # the uniform random policy so the first evaluation sweep evaluates a
        # valid probability distribution instead of an all-zero row.
        self.policy = np.full((self.nS, self.nA), 1.0 / self.nA)

    def _is_wall(self, s):
        """Return True when state ``s`` has no transitions (unreachable wall)."""
        return s not in self.env.prob_dynamics

    def _action_value(self, s, a, V, gamma):
        """Return the expected return of taking action ``a`` in state ``s`` under ``V``."""
        return sum(
            p * (r + gamma * V[s_prime])
            for p, s_prime, r, _ in self.env.prob_dynamics[s][a]
        )

    def policy_evaluation(self, policy, gamma=0.9, theta=0.001):
        """Iteratively evaluate ``policy`` with the Bellman expectation equation.

        Args:
            policy: array of shape (nS, nA) with action probabilities per state.
            gamma: discount factor.
            theta: convergence threshold on the largest per-sweep value change.

        Returns:
            Array of length nS holding the estimated state values.
        """
        V = np.zeros(self.nS)

        while True:
            delta = 0
            for s in range(self.nS):
                v = V[s]
                if self._is_wall(s):
                    # The wall can never be occupied, so its value stays 0.
                    V[s] = 0
                else:
                    # Bellman expectation backup: average the action values
                    # weighted by the policy's action probabilities.
                    V[s] = sum(
                        policy[s, a] * self._action_value(s, a, V, gamma)
                        for a in range(self.nA)
                    )
                # Track the largest change to detect convergence.
                delta = max(delta, abs(v - V[s]))

            # Values have converged for this policy once no state moved by theta.
            if delta < theta:
                break
        return V

    def policy_improvement(self, V, gamma=0.9):
        """Make ``self.policy`` greedy with respect to ``V``.

        Args:
            V: array of length nS with the current state-value estimates.
            gamma: discount factor.

        Returns:
            True when no non-wall row of the policy changed, False otherwise.
        """
        policy_stable = True
        for s in range(self.nS):
            if self._is_wall(s):
                # No action is ever taken from the wall; mark the row with -1.
                self.policy[s] = -1
                continue

            previous_row = self.policy[s].copy()

            # Evaluate every action and pick the one with the highest value.
            action_values = np.array(
                [self._action_value(s, a, V, gamma) for a in range(self.nA)]
            )
            best_action = np.argmax(action_values)

            # Deterministic greedy policy: probability 1 on the best action.
            self.policy[s] = np.eye(self.nA)[best_action]

            # Compare whole rows rather than argmax: the argmax of a
            # non-deterministic row is 0, which would wrongly count as
            # "unchanged" whenever the greedy action happens to be 0.
            if not np.array_equal(previous_row, self.policy[s]):
                policy_stable = False

        return policy_stable

    def policy_iteration(self, gamma=0.9, max_iterations=1000):
        """Alternate evaluation and improvement until the policy is stable.

        Args:
            gamma: discount factor.
            max_iterations: upper bound on evaluation/improvement rounds.

        Returns:
            The (nS, nA) policy matrix; wall rows contain -1.
        """
        for _ in range(max_iterations):
            V = self.policy_evaluation(self.policy, gamma)
            policy_stable = self.policy_improvement(V, gamma)
            # A stable policy is greedy w.r.t. its own values, hence optimal.
            if policy_stable:
                break

        return self.policy

# Usage: solve the maze with policy iteration and print the greedy action
# chosen in every state.
env = StochasticMazeEnv()
agent = PolicyIterationAgent(env)
optimal_policy = agent.policy_iteration()
# Keys are the action indices used by the environment (L=0, U=1, R=2, D=3);
# "Down" must be keyed by 3, otherwise a state whose best action is Down
# raises KeyError.
my_dict = {-1: "Wall", 0: "Left", 1: "Up", 2: "Right", 3: "Down"}
print("My policy is this >>>>>")
print(optimal_policy)
print()
print("The best policy is :")
for s in range(env.nS):
    if s == 5:
        print("State: 5 => This is Wall")
    elif s == 3:
        # State 3 is the goal; report it under its own index.
        print("State: 3 => Episode Ends :)")
    else:
        print("State:", s, "Action:", my_dict[int(np.argmax(optimal_policy[s]))])


My policy is this >>>>>
[[ 0.  0.  1.  0.]
 [ 0.  0.  1.  0.]
 [ 0.  0.  1.  0.]
 [ 1.  0.  0.  0.]
 [ 0.  1.  0.  0.]
 [-1. -1. -1. -1.]
 [ 0.  1.  0.  0.]
 [ 1.  0.  0.  0.]
 [ 0.  1.  0.  0.]
 [ 0.  0.  1.  0.]
 [ 0.  1.  0.  0.]
 [ 1.  0.  0.  0.]]

The best policy is :
State: 0 Action: Right
State: 1 Action: Right
State: 2 Action: Right
State: 4 => Episode Ends :)
State: 4 Action: Up
State: 5 => This is Wall
State: 6 Action: Up
State: 7 Action: Left
State: 8 Action: Up
State: 9 Action: Right
State: 10 Action: Up
State: 11 Action: Left


In [66]:
# Value iteration
class Valueiterationagent:
    """Value-iteration solver for a tabular MDP.

    The environment must expose ``nS``, ``nA`` and ``prob_dynamics``, a mapping
    ``state -> action -> [(probability, next_state, reward, done), ...]``.
    States without an entry in ``prob_dynamics`` are treated as walls.
    """

    # State whose policy entry is reported as -1 because the episode ends
    # there immediately and no action is ever taken.
    GOAL_STATE = 3

    def __init__(self, env):
        self.env = env
        self.nS = env.nS
        self.nA = env.nA
        # One action index per state; -1 marks the wall and the goal.
        self.policy = np.zeros(self.nS)

    def _action_values(self, s, V, gamma):
        """Return the expected return of every action in state ``s`` under ``V``."""
        return [
            sum(
                p * (r + gamma * V[s_prime])
                for p, s_prime, r, _ in self.env.prob_dynamics[s][a]
            )
            for a in range(self.nA)
        ]

    def value_evaluation(self, gamma=0.9, theta=0.001):
        """Compute state values with the Bellman optimality backup.

        Applies ``V[s] = max_a sum_s' p * (r + gamma * V[s'])`` in place until
        the largest change in a sweep drops below ``theta``.

        Args:
            gamma: discount factor.
            theta: convergence threshold.

        Returns:
            Array of length nS with the converged state values (0 for walls).
        """
        V = np.zeros(self.nS)
        while True:
            delta = 0.0
            for s in range(self.nS):
                if s not in self.env.prob_dynamics:
                    continue
                v = V[s]
                # Take the max over actions only.  Including the previous
                # V[s] in the max would stop a value from ever decreasing, so
                # negative optimal values could never be represented.
                V[s] = max(self._action_values(s, V, gamma))
                delta = max(delta, abs(v - V[s]))

            if delta < theta:
                break
        return V

    def value_iteration(self, gamma=0.9):
        """Run value iteration, store and print the greedy policy, and return it.

        Args:
            gamma: discount factor.

        Returns:
            Array of length nS with the greedy action index per state; wall
            states and ``GOAL_STATE`` hold -1.
        """
        V = self.value_evaluation(gamma)
        # Greedy policy: policy[s] = argmax_a sum_s' p * (r + gamma * V[s']).
        for s in range(self.nS):
            if s not in self.env.prob_dynamics:
                self.policy[s] = -1
                continue
            # argmax picks the first best action even when every action
            # value is zero or negative.
            self.policy[s] = int(np.argmax(self._action_values(s, V, gamma)))
        # If the agent starts on the goal the episode ends instantly, so no
        # action is reported for it.
        if 0 <= self.GOAL_STATE < self.nS:
            self.policy[self.GOAL_STATE] = -1
        print("The policy is", self.policy)
        return self.policy
                

# Solve the maze with value iteration; value_iteration() prints the policy.
env = StochasticMazeEnv()
agent = Valueiterationagent(env)
optimal_policy = agent.value_iteration()
# In the printed policy, -1 marks the wall (state 5) because that state can never be occupied.
# -1 also marks the goal (state 3): if the agent starts there the episode ends and no action is taken.

The policy is [ 2.  2.  2. -1.  1. -1.  1.  0.  1.  2.  1.  0.]
