In [23]:
import sys
from google.colab import drive
drive.mount('/content/drive')
sys.path.append('/content/drive/MyDrive/DRL. policy iteration')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [24]:
from Frozen_Lake import FrozenLakeEnv

In [25]:
import numpy as np
import time

#How our environment works

[Frozen Lake](https://www.gymlibrary.dev/environments/toy_text/frozen_lake/)

In [26]:
env = FrozenLakeEnv()

In [27]:
env.get_all_states() #4x4 field

((0, 0),
 (0, 1),
 (0, 2),
 (0, 3),
 (1, 0),
 (1, 1),
 (1, 2),
 (1, 3),
 (2, 0),
 (2, 1),
 (2, 2),
 (2, 3),
 (3, 0),
 (3, 1),
 (3, 2),
 (3, 3))

In [28]:
state = (1, 2)
env.get_possible_actions(state)

('left', 'down', 'right', 'up')

In [29]:
env.render()

*FFF
FHFH
FFFH
HFFG



In [30]:
state = (3, 2)
action = 'right'
env.get_next_states(state, action)

#(1, 1) with prob 0.1
#(0, 2) with prob 0.8
#stay in the same with prob 0.1

{(3, 2): 0.1, (3, 3): 0.8, (2, 2): 0.1}

In [31]:
next_state = (3, 3)
env.get_transition_prob(state, action, next_state)

#prob of moving to the 'next_state' from 'state' with such action

0.8

In [32]:
env.get_reward(state, action, next_state)

#positive reward only if we reach finish point (3, 3) else 0

1.0

In [33]:
state = (3,3)

env.is_terminal(state)

#true if it is last state or hole

True

#Algorithm

If reward don not depend on s' - next state:
$$
q(s, a) = R(s, a) + \gamma \sum_{s'}P(s'|s, a) v(s')
$$
If reward depends on s': (our case)
$$
q(s, a) = \sum_{s'} P(s'|s, a) \Big( R(s, a, s') + \gamma  v(s')\Big)
$$

If reward do not depends on s' - it is special case of the dependece on s'

$$
q(s, a) = \sum_{s'} P(s'|s, a) \Big( R(s, a) + \gamma  v(s')\Big) = R(s, a) \sum_{s'} P(s'|s,a) + \gamma \sum_{s'}P(s'|s, a) v(s') = R(s, a) + \gamma \sum_{s'}P(s'|s, a) v(s')
$$

In [34]:
def get_q_values(v_values, gamma):
  q_values = {}
  for state in env.get_all_states():
    q_values[state] = {}
    for action in env.get_possible_actions(state):
      q_values[state][action] = 0
      for next_state in env.get_next_states(state, action):
        q_values[state][action] += env.get_transition_prob(state, action, next_state) * env.get_reward(state, action, next_state)
        q_values[state][action] += gamma * env.get_transition_prob(state, action, next_state) * v_values[next_state]

  return q_values

In [35]:
def init_policy():
  policy = {}
  for state in env.get_all_states():
    policy[state] = {}
    for action in env.get_possible_actions(state):
      policy[state][action] = 1 / len(env.get_possible_actions(state)) #uniform filling

  return policy

In [36]:
def init_v_values():
  v_values = {}
  for state in env.get_all_states():
    v_values[state] = 0
  return v_values

We can obtain $v_{\pi}(s)$ by defining $q_{\pi}(s, a)$ which depends on $v_{\pi}(s')$:


---



$q_{\pi}(s, a) = R(s,a) + \gamma \sum_{s'} P(s'|s, a)v_{\pi}(s')$

$v_{\pi}(s) = \sum_{a} \pi (a|s) q_{\pi}(s, a)$


In [37]:
def policy_evaluation_step(v_values, policy, gamma): #right side of bellman equation
    q_values = get_q_values(v_values, gamma)
    new_v_values = init_v_values()
    for state in env.get_all_states():
      new_v_values[state] = 0
      for action in env.get_possible_actions(state):
        new_v_values[state] += policy[state][action] * q_values[state][action]

    return new_v_values

Iterative Policy Evaluation:

---

$v^{l+1} = R_{\pi^k} + P_{\pi^k} v^l \text{ for } l \in \overline{0, L-1}$

Then we can define $q^L (s, a)$ by $v^L (s)$ (as before).



In [38]:
def policy_evaluation(policy, gamma, eval_iter_n):
  v_values = init_v_values()
  for _ in range(eval_iter_n):
    v_values = policy_evaluation_step(v_values, policy, gamma)
  q_values = get_q_values(v_values, gamma)
  return q_values

Greedy Policy Improvement:


---

$$\pi^{k+1}(a|s) = \begin{cases}
  1, & \text{if } a\in argmax_{a' \in A} q^L (s, a') \\    
  0, & \text{otherwise}   
\end{cases}
$$

In [88]:
def policy_improvement(q_values):
  policy = {}
  for state in env.get_all_states():
    policy[state] = {}
    argmax_action = None
    max_q_value = float('-inf')

    for action in env.get_possible_actions(state):
      policy[state][action] = 0

      if q_values[state][action] > max_q_value:
        argmax_action = action
        max_q_value = q_values[state][action]

    policy[state][argmax_action] = 1

  return policy

In [89]:
iter_n = 100
eval_iter_n = 100 #iteration of evaluation stage
gamma = 0.99

policy = init_policy()

for _ in range(iter_n):
  q_values = policy_evaluation(policy, gamma, eval_iter_n)
  policy = policy_improvement(q_values)

In [90]:
policy

{(0, 0): {'left': 0, 'down': 1, 'right': 0, 'up': 0},
 (0, 1): {'left': 0, 'down': 0, 'right': 0, 'up': 1},
 (0, 2): {'left': 0, 'down': 1, 'right': 0, 'up': 0},
 (0, 3): {'left': 0, 'down': 0, 'right': 0, 'up': 1},
 (1, 0): {'left': 1, 'down': 0, 'right': 0, 'up': 0},
 (1, 1): {None: 1},
 (1, 2): {'left': 0, 'down': 1, 'right': 0, 'up': 0},
 (1, 3): {None: 1},
 (2, 0): {'left': 0, 'down': 0, 'right': 1, 'up': 0},
 (2, 1): {'left': 0, 'down': 1, 'right': 0, 'up': 0},
 (2, 2): {'left': 1, 'down': 0, 'right': 0, 'up': 0},
 (2, 3): {None: 1},
 (3, 0): {None: 1},
 (3, 1): {'left': 0, 'down': 0, 'right': 1, 'up': 0},
 (3, 2): {'left': 0, 'down': 0, 'right': 1, 'up': 0},
 (3, 3): {None: 1}}

In [94]:
total_rewards = []

for _ in range(500):
  total_reward = 0
  state = env.reset()
  for _ in range(1000):
    action = np.random.choice(env.get_possible_actions(state), p=list(policy[state].values()))
    state, reward, done, _ = env.step(action)
    total_reward += reward

    #env.render()
    #time.sleep(0.2)

    if done:
      break

  total_rewards.append(total_reward)

np.mean(total_rewards) #percent of right moves

0.874

In [96]:
policy

{(0, 0): {'left': 0, 'down': 1, 'right': 0, 'up': 0},
 (0, 1): {'left': 0, 'down': 0, 'right': 0, 'up': 1},
 (0, 2): {'left': 0, 'down': 1, 'right': 0, 'up': 0},
 (0, 3): {'left': 0, 'down': 0, 'right': 0, 'up': 1},
 (1, 0): {'left': 1, 'down': 0, 'right': 0, 'up': 0},
 (1, 1): {None: 1},
 (1, 2): {'left': 0, 'down': 1, 'right': 0, 'up': 0},
 (1, 3): {None: 1},
 (2, 0): {'left': 0, 'down': 0, 'right': 1, 'up': 0},
 (2, 1): {'left': 0, 'down': 1, 'right': 0, 'up': 0},
 (2, 2): {'left': 1, 'down': 0, 'right': 0, 'up': 0},
 (2, 3): {None: 1},
 (3, 0): {None: 1},
 (3, 1): {'left': 0, 'down': 0, 'right': 1, 'up': 0},
 (3, 2): {'left': 0, 'down': 0, 'right': 1, 'up': 0},
 (3, 3): {None: 1}}