In [11]:
pip install gymnasium gym-walk



In [12]:
import warnings ; warnings.filterwarnings('ignore')

import gymnasium as gym # Updated import
import gym_walk
import numpy as np

import random
import warnings

warnings.filterwarnings('ignore', category=DeprecationWarning)
np.set_printoptions(suppress=True)
random.seed(123); np.random.seed(123)

In [3]:


def print_policy(pi, P, action_symbols=('<', 'v', '>', '^'), n_cols=4, title='Policy:'):
    print(title)
    arrs = {k:v for k,v in enumerate(action_symbols)}
    for s in range(len(P)):
        a = pi(s)
        print("| ", end="")
        if np.all([done for action in P[s].values() for _, _, _, done in action]):
            print("".rjust(9), end=" ")
        else:
            print(str(s).zfill(2), arrs[a].rjust(6), end=" ")
        if (s + 1) % n_cols == 0: print("|")

In [4]:
def print_state_value_function(V, P, n_cols=4, prec=3, title='State-value function:'):
    print(title)
    for s in range(len(P)):
        v = V[s]
        print("| ", end="")
        if np.all([done for action in P[s].values() for _, _, _, done in action]):
            print("".rjust(9), end=" ")
        else:
            print(str(s).zfill(2), '{}'.format(np.round(v, prec)).rjust(6), end=" ")
        if (s + 1) % n_cols == 0: print("|")


In [31]:
def probability_success(env, pi, goal_state, n_episodes=100, max_steps=200):
    random.seed(123); np.random.seed(123) ; # env.seed(123) # Seeding is now done in reset()
    results = []
    for _ in range(n_episodes):
        state, info = env.reset(seed=123) # env.reset() returns a tuple (state, info)
        done, steps = False, 0
        while not done and steps < max_steps:
            action = pi(state) # Pass only the state to the policy function
            state, reward, terminated, truncated, h = env.step(action) # env.step() returns a tuple (state, reward, terminated, truncated, info)
            done = terminated or truncated
            steps += 1
        results.append(state == goal_state)
    return np.sum(results)/len(results)

In [32]:
def mean_return(env, pi, n_episodes=100, max_steps=200):
    random.seed(123); np.random.seed(123) ; # env.seed(123) # Seeding is now done in reset()
    results = []
    for _ in range(n_episodes):
        state, info = env.reset(seed=123) # env.reset() returns a tuple (state, info)
        done, steps = False, 0
        results.append(0.0)
        while not done and steps < max_steps:
            action = pi(state) # Pass only the state to the policy function
            state, reward, terminated, truncated, _ = env.step(action) # env.step() returns a tuple (state, reward, terminated, truncated, info)
            done = terminated or truncated
            results[-1] += reward
            steps += 1
    return np.mean(results)

In [18]:
env = gym.make('FrozenLake-v1')
P = env.unwrapped.P # Access P from the unwrapped environment
init_state = env.reset()
goal_state = 15
LEFT, DOWN, RIGHT, UP = range(4)

In [19]:


P

{0: {0: [(0.33333333333333337, 0, 0, False),
   (0.3333333333333333, 0, 0, False),
   (0.33333333333333337, 4, 0, False)],
  1: [(0.33333333333333337, 0, 0, False),
   (0.3333333333333333, 4, 0, False),
   (0.33333333333333337, 1, 0, False)],
  2: [(0.33333333333333337, 4, 0, False),
   (0.3333333333333333, 1, 0, False),
   (0.33333333333333337, 0, 0, False)],
  3: [(0.33333333333333337, 1, 0, False),
   (0.3333333333333333, 0, 0, False),
   (0.33333333333333337, 0, 0, False)]},
 1: {0: [(0.33333333333333337, 1, 0, False),
   (0.3333333333333333, 0, 0, False),
   (0.33333333333333337, 5, 0, True)],
  1: [(0.33333333333333337, 0, 0, False),
   (0.3333333333333333, 5, 0, True),
   (0.33333333333333337, 2, 0, False)],
  2: [(0.33333333333333337, 5, 0, True),
   (0.3333333333333333, 2, 0, False),
   (0.33333333333333337, 1, 0, False)],
  3: [(0.33333333333333337, 2, 0, False),
   (0.3333333333333333, 1, 0, False),
   (0.33333333333333337, 0, 0, False)]},
 2: {0: [(0.33333333333333337, 2, 0

In [9]:


init_state

0

In [21]:
state, reward, terminated, truncated, info = env.step(RIGHT)
done = terminated or truncated
print("state:{0} - reward:{1} - done:{2} - info:{3}".format(state, reward, done, info))

state:0 - reward:0 - done:False - info:{'prob': 0.33333333333333337}


In [28]:
# Adversarial Policy
pi_frozenlake1 = lambda s: {
    0: RIGHT,
    1: RIGHT,
    2: RIGHT,
    3: RIGHT,
    4: RIGHT,
    5: RIGHT,
    6: RIGHT,
    7: RIGHT,
    8: RIGHT,
    9: RIGHT,
    10:RIGHT,
    11:RIGHT,
    12:RIGHT,
    13:RIGHT,
    14:RIGHT,
    15:RIGHT #Stop
}[s]
print("Name:  ROHIT G   ")
print("Register Number: 212222240083        ")
print_policy(pi_frozenlake1, P, action_symbols=('<', 'v', '>', '^'), n_cols=4)

Name:  ROHIT G   
Register Number: 212222240083        
Policy:
| 00      > | 01      > | 02      > | 03      > |
| 04      > |           | 06      > |           |
| 08      > | 09      > | 10      > |           |
|           | 13      > | 14      > |           |


In [33]:
print('Reaches goal {:.2f}%. Obtains an average undiscounted return of {:.4f}.'.format(
    probability_success(env, pi_frozenlake1, goal_state=goal_state)*100,
    mean_return(env, pi_frozenlake1)))

Reaches goal 0.00%. Obtains an average undiscounted return of 0.0000.


In [34]:


def policy_evaluation(pi, P, gamma=1.0, theta=1e-10):
    prev_V = np.zeros(len(P), dtype=np.float64)
    while True:
        V = np.zeros(len(P), dtype=np.float64)
        for s in range(len(P)):
            for prob, next_state, reward, done in P[s][pi(s)]:
                V[s] += prob * (reward + gamma * prev_V[next_state] * (not done))
        if np.max(np.abs(prev_V - V)) < theta:
            break
        prev_V = V.copy()
    return V


In [35]:

# Code to evaluate the adversarial policy
V1 = policy_evaluation(pi_frozenlake1, P)
print("Name: ROHIT G     ")
print("Register Number:   212222240083     ")
print_state_value_function(V1, P, n_cols=4, prec=5)

Name: ROHIT G     
Register Number:   212222240083     
State-value function:
| 00 0.0315 | 01 0.02381 | 02 0.04762 | 03    0.0 |
| 04 0.03919 |           | 06 0.09524 |           |
| 08 0.08608 | 09 0.21905 | 10 0.2381 |           |
|           | 13 0.41905 | 14 0.61905 |           |


In [36]:
def policy_improvement(V, P, gamma=1.0):
    Q = np.zeros((len(P), len(P[0])), dtype=np.float64)
    # Write your code here to improve the given policy
    for s in range(len(P)):
      for a in range(len(P[s])):
        for prob,next_state,reward,done in P[s][a]:
          Q[s][a]+=prob*(reward+gamma*V[next_state]*(not done))
          new_pi=lambda s:{s:a for s, a in enumerate(np.argmax(Q,axis=1))}[s]
    return new_pi

In [37]:
pi_2 = policy_improvement(V1, P)
print("Name: ROHIT G     ")
print("Register Number:  212222240083       ")
print_policy(pi_2, P, action_symbols=('<', 'v', '>', '^'), n_cols=4)

Name: ROHIT G     
Register Number:  212222240083       
Policy:
| 00      < | 01      ^ | 02      < | 03      v |
| 04      < |           | 06      < |           |
| 08      ^ | 09      v | 10      < |           |
|           | 13      > | 14      v |           |


In [38]:


print('Reaches goal {:.2f}%. Obtains an average undiscounted return of {:.4f}.'.format(
    probability_success(env, pi_2, goal_state=goal_state)*100,
    mean_return(env, pi_2)))


Reaches goal 100.00%. Obtains an average undiscounted return of 1.0000.


In [39]:

V2 = policy_evaluation(pi_2, P)
print("Name:  ROHIT G    ")
print("Register Number:   212222240083     ")
print_state_value_function(V2, P, n_cols=4, prec=5)


Name:  ROHIT G    
Register Number:   212222240083     
State-value function:
| 00 0.78049 | 01 0.65854 | 02 0.53659 | 03 0.26829 |
| 04 0.78049 |           | 06 0.41463 |           |
| 08 0.78049 | 09 0.78049 | 10 0.70732 |           |
|           | 13 0.85366 | 14 0.92683 |           |


In [40]:


# comparing the initial and the improved policy
if(np.sum(V1>=V2)==16):
  print("The Adversarial policy is the better policy")
elif(np.sum(V2>=V1)==16):
  print("The Improved policy is the better policy")
else:
  print("Both policies have their merits.")

The Improved policy is the better policy


In [41]:
def policy_iteration(P, gamma=1.0, theta=1e-10):
   random_actions=np.random.choice(tuple(P[0].keys()),len(P))
   pi = lambda s: {s:a for s, a in enumerate(random_actions)}[s]
   while True:
    old_pi = {s:pi(s) for s in range(len(P))}
    V = policy_evaluation(pi, P,gamma,theta)
    pi = policy_improvement(V,P,gamma)
    if old_pi == {s:pi(s) for s in range(len(P))}:
      break
   return V, pi

In [42]:

optimal_V, optimal_pi = policy_iteration(P)

In [43]:
print("Name:   ROHIT G   ")
print("Register Number:    212222240083    ")
print('Optimal policy and state-value function (PI):')
print_policy(optimal_pi, P, action_symbols=('<', 'v', '>', '^'), n_cols=4)

Name:   ROHIT G   
Register Number:    212222240083    
Optimal policy and state-value function (PI):
Policy:
| 00      < | 01      ^ | 02      ^ | 03      ^ |
| 04      < |           | 06      < |           |
| 08      ^ | 09      v | 10      < |           |
|           | 13      > | 14      v |           |


In [44]:

print('Reaches goal {:.2f}%. Obtains an average undiscounted return of {:.4f}.'.format(
    probability_success(env, optimal_pi, goal_state=goal_state)*100,
    mean_return(env, optimal_pi)))

Reaches goal 100.00%. Obtains an average undiscounted return of 1.0000.


In [45]:


print("Name: ROHIT G    ")
print("Register Number:    212222240083   ")
print_state_value_function(optimal_V, P, n_cols=7, prec=5)

Name: ROHIT G    
Register Number:    212222240083   
State-value function:
| 00 0.82353 | 01 0.82353 | 02 0.82353 | 03 0.82353 | 04 0.82353 |           | 06 0.52941 |
|           | 08 0.82353 | 09 0.82353 | 10 0.76471 |           |           | 13 0.88235 |
| 14 0.94118 |           