# 03 Algorithms

## 3.3 Truncated Policy Iteration

### Comparing Value Iteration and Policy Iteration
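- Policy iteration starts from an arbitrary initial policy $\pi_0$; step $k$ consists of two sub-steps:
  - Policy Evaluation (PE): given the current policy $\pi_k$, compute its value function $v_{\pi_k}$ by solving the Bellman equation, in practice with the fixed-point iteration on the right, $$v_{\pi_k} = r_{\pi_k} + \gamma P_{\pi_k} v_{\pi_k} \ \rightarrow \ v_{\pi_k}^{j+1} = r_{\pi_k} + \gamma P_{\pi_k} v_{\pi_k}^{j}$$
  - Policy Improvement (PI): given the value function $v_{\pi_k}$, update the policy to $\pi_{k+1}$, $$\pi_{k+1} = \arg\max_{\pi} (r_{\pi} + \gamma P_{\pi} v_{\pi_k})$$
- Value iteration starts from an arbitrary initial value function $v_0$; step $k$ consists of two sub-steps:
  - Policy Update (PU): given the current value function $v_k$, update the policy to $\pi_{k+1}$, $$\pi_{k+1} = \arg\max_{\pi} (r_{\pi} + \gamma P_{\pi} v_k)$$
  - Value Update (VU): given the policy $\pi_{k+1}$, update the value function to $v_{k+1}$, $$v_{k+1} = r_{\pi_{k+1}} + \gamma P_{\pi_{k+1}} v_{k}$$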

- Placing Policy Iteration (PI) and Value Iteration (VI) side by side shows that the two algorithms share the same structure:
  - Policy Iteration: $\pi_0 \xrightarrow{PE} v_{\pi_0} \xrightarrow{PI} \pi_1 \xrightarrow{PE} v_{\pi_1} \xrightarrow{PI} \pi_2 \xrightarrow{PE} v_{\pi_2} \dots \xrightarrow{PI} \pi_{opt}$
  - Value Iteration: $\ \ \ \ \ \ \ \ \ \ \ \ \ \ v_0 \xrightarrow{PU} \pi'_1 \xrightarrow{VU} v_1 \xrightarrow{PU} \pi'_2 \xrightarrow{VU} v_2 \dots \xrightarrow{VU} v_{opt} \xrightarrow{PU} \pi_{opt}$
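
The difference between the two chains is easiest to see in one step of each. The following is a minimal NumPy sketch, assuming a hypothetical 2-state, 2-action MDP stored as arrays `P[a, s, s']` and `R[s, a]`; the array layout, the numbers, and the helper names `greedy`, `model_of`, `policy_iteration_step`, and `value_iteration_step` are illustrative assumptions, not part of the notes above.

```python
import numpy as np

# Hypothetical toy MDP (illustrative numbers): P[a, s, s'] and R[s, a].
P = np.array([[[0.9, 0.1], [0.2, 0.8]],
              [[0.1, 0.9], [0.7, 0.3]]])
R = np.array([[1.0, 0.0],
              [0.0, 2.0]])
gamma = 0.9
n_states = R.shape[0]
states = np.arange(n_states)


def greedy(v):
    """PU / PI: greedy action per state w.r.t. q(s, a) = R(s, a) + gamma * sum_s' P v."""
    q = R + gamma * np.einsum("ast,t->sa", P, v)
    return q.argmax(axis=1)


def model_of(pi):
    """Return (r_pi, P_pi) for a deterministic policy given as pi[s] = action."""
    return R[states, pi], P[pi, states]


def policy_iteration_step(pi):
    """PE (solved exactly as a linear system) followed by PI."""
    r_pi, P_pi = model_of(pi)
    v_pi = np.linalg.solve(np.eye(n_states) - gamma * P_pi, r_pi)
    return greedy(v_pi), v_pi


def value_iteration_step(v):
    """PU followed by VU (a single backup under the new policy)."""
    pi = greedy(v)
    r_pi, P_pi = model_of(pi)
    return pi, r_pi + gamma * P_pi @ v


pi_next, v_pi = policy_iteration_step(np.array([0, 0]))
pi_vi, v_next = value_iteration_step(np.zeros(n_states))
print("PI step:", pi_next, v_pi)
print("VI step:", pi_vi, v_next)
```

Both steps use the same greedy update; they differ only in how much work goes into the value estimate before it.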


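- Going one step further, unroll the iterative evaluation of $v_{\pi_1}$, starting from the initial guess $v_0$. Where the iteration stops determines which algorithm results: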
  $$
  \begin{align*}
  v_{\pi_1}^0 &= v_0 \\
  \text{Value Iteration} \leftarrow \ v_1 \leftarrow v_{\pi_1}^1 &= r_{\pi_1} + \gamma P_{\pi_1}v_{\pi_1}^{0} \\
  v_{\pi_1}^2 &= r_{\pi_1} + \gamma P_{\pi_1}v_{\pi_1}^{1} \\
  \vdots \\
  \text{Truncated Policy Iteration} \leftarrow \ \hat v_1 \leftarrow v_{\pi_1}^j &= r_{\pi_1} + \gamma P_{\pi_1}v_{\pi_1}^{j-1} \\
  \vdots \\
  \text{Policy Iteration} \leftarrow \ v_{\pi_1} \leftarrow v_{\pi_1}^{\infty} &= r_{\pi_1} + \gamma P_{\pi_1}v_{\pi_1}^{\infty} \\
  \end{align*}
  $$
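
To make the unrolled iteration concrete, here is a small sketch that applies the backup $v \leftarrow r_{\pi} + \gamma P_{\pi} v$ exactly $j$ times and measures the distance to the exact solution of the Bellman equation. The function name `truncated_evaluation` and the numbers in `r_pi` and `P_pi` are assumptions made for illustration.

```python
import numpy as np


def truncated_evaluation(r_pi, P_pi, v0, gamma, j):
    """Apply v <- r_pi + gamma * P_pi @ v exactly j times, starting from v0."""
    v = np.asarray(v0, dtype=float).copy()
    for _ in range(j):
        v = r_pi + gamma * P_pi @ v
    return v


# Hypothetical reward vector and transition matrix induced by some fixed policy.
r_pi = np.array([1.0, 2.0])
P_pi = np.array([[0.9, 0.1],
                 [0.7, 0.3]])
gamma = 0.9
v0 = np.zeros(2)

# Exact policy evaluation: solve (I - gamma * P_pi) v = r_pi.
v_exact = np.linalg.solve(np.eye(2) - gamma * P_pi, r_pi)

# j = 1 is the value-iteration update; larger j approaches exact evaluation.
errors = {}
for j in (1, 5, 50, 500):
    v_j = truncated_evaluation(r_pi, P_pi, v0, gamma, j)
    errors[j] = np.max(np.abs(v_j - v_exact))
    print(f"j = {j:3d}  max error = {errors[j]:.3e}")
```

Because the backup is a $\gamma$-contraction in the max norm, the error shrinks by at least a factor of $\gamma$ per sweep, so a moderate $j$ already gives a usable estimate at a fraction of the cost of full evaluation.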

### Truncated Policy Iteration Algorithm
* Given: the environment model $p(r|s, a)$ and $p(s'|s, a)$ is known for every state-action pair $(s, a)$. Initialize $\pi_0$ randomly and set $k \leftarrow 0$.
* $while \ \| v_{\pi_k} - v_{\pi_{k-1}} \|_\infty > \epsilon \ do:$
* $\qquad$ $Policy \ Evaluation:$
* $\qquad$ Choose the initial guess $v_{\pi_k}^{0}=v_{\pi_{k-1}}$ (any vector when $k=0$), fix a maximum number of sweeps $j_{truncated}$, and set $j \leftarrow 0$
* $\qquad$ $while \ j < j_{truncated} \ do:$
* $\qquad\qquad$ $for \ s \in \mathcal{S} \ do:$
* $\qquad\qquad\qquad$ $v_{\pi_k}^{j+1}(s) \leftarrow \sum_{a \in \mathcal{A}(s)} \pi_k (a|s) \left (\sum_{r \in \mathcal{R}(s, a)} p(r|s, a) r + \gamma \sum_{s' \in \mathcal{S}} p(s'|s, a) v_{\pi_k}^{j}(s') \right )$
* $\qquad\qquad$ $end \ for$
* $\qquad\qquad$ $j \leftarrow j + 1$
* $\qquad$ $end \ while$
* $\qquad$ $v_{\pi_k} \leftarrow v_{\pi_k}^{j_{truncated}}$
* $\qquad$ $Policy \ Improvement:$
* $\qquad$ $for \ s \in \mathcal{S} \ do:$
* $\qquad\qquad$ $for \ a \in \mathcal{A}(s) \ do:$
* $\qquad\qquad\qquad$ $q_{\pi_k}(s, a) \leftarrow \sum_{r \in \mathcal{R}(s, a)} p(r|s, a) r + \gamma \sum_{s' \in \mathcal{S}} p(s'|s, a) v_{\pi_k}(s')$
* $\qquad\qquad$ $end \ for$
* $\qquad\qquad$ $a_k^*(s) \leftarrow \arg\max_a q_{\pi_k}(s, a)$
* $\qquad\qquad$ $\pi_{k+1}(a|s) = 1 \ if \ a=a_k^*(s), \ else \ 0$
* $\qquad$ $end \ for$
* $\qquad$ $k \leftarrow k + 1$
* $end \ while$
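
Before turning to a Gymnasium environment, the pseudocode above can be sketched on plain arrays. This is a minimal version under stated assumptions: the model is stored as `P[a, s, s']` and `R[s, a]` (with `R[s, a]` standing for $\sum_r p(r|s,a)\,r$), the initial policy picks action 0 everywhere instead of being random, and `max_outer` is an added safety cap. The function name, the array layout, and the toy numbers are all hypothetical.

```python
import numpy as np


def truncated_policy_iteration(P, R, gamma=0.9, j_truncated=5, eps=1e-8, max_outer=10_000):
    """Tabular truncated policy iteration on an explicit model.

    P[a, s, s'] holds p(s'|s, a); R[s, a] holds the expected reward sum_r p(r|s, a) r.
    Returns a deterministic policy (one action index per state) and its value estimate.
    """
    n_states = R.shape[0]
    states = np.arange(n_states)
    pi = np.zeros(n_states, dtype=int)  # assumption: start with action 0 in every state
    v = np.zeros(n_states)              # initial guess for the first evaluation

    for k in range(max_outer):
        v_prev = v.copy()

        # Policy evaluation, truncated after j_truncated synchronous sweeps.
        r_pi, P_pi = R[states, pi], P[pi, states]
        for j in range(j_truncated):
            v = r_pi + gamma * P_pi @ v

        # Policy improvement: act greedily with respect to q(s, a).
        q = R + gamma * np.einsum("ast,t->sa", P, v)
        pi = q.argmax(axis=1)

        # Stop when the value estimate has stopped moving.
        if np.max(np.abs(v - v_prev)) < eps:
            break
    return pi, v


# Hypothetical toy MDP with non-negative rewards (illustrative numbers only).
P = np.array([[[0.9, 0.1], [0.2, 0.8]],
              [[0.1, 0.9], [0.7, 0.3]]])
R = np.array([[1.0, 0.0],
              [0.0, 2.0]])

pi_star, v_star = truncated_policy_iteration(P, R, gamma=0.9, j_truncated=5)
print("policy:", pi_star)
print("values:", v_star)
```

Setting `j_truncated=1` reduces the loop to value iteration, while a very large `j_truncated` approximates full policy iteration, so the parameter trades cost per outer step against the number of outer steps.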

In [1]:
import time

import numpy as np
import gymnasium as gym

In [None]:
class TruncatedPolicyIteration:
    """ Truncated Policy Iteration Algorithm for the FrozenLake environment """

    def __init__(self, env, gamma=0.95, delta=1e-6, max_iterations=None):
        self.env = env
        self.gamma = gamma
        self.delta = delta
        self.max_iterations = max_iterations

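        self.values = np.zeros(self.env.observation_space.n)  # Value function initialization
        # Policy initialization: all zeros rather than a valid distribution, so the first
        # evaluation leaves the values at 0 and the first improvement is greedy w.r.t. v = 0.
        # Use np.full(shape, 1 / n_actions) instead to start from a uniform random policy.
        self.policy = np.zeros([self.env.observation_space.n, self.env.action_space.n])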

    def policy_evaluation(self):
        """ Policy Evaluation Step (truncated once max_iterations is reached, if set) """
        steps = 0
        while True:
            delta = 0  # Largest value change seen in this sweep
            for state in range(self.env.observation_space.n):
                value = 0  # New value estimate for this state
                for action, action_prob in enumerate(self.policy[state]):
                    for prob, next_state, reward, done in self.env.unwrapped.P[state][action]:
                        # prob covers both p(r|s,a) and p(s'|s,a); terminal transitions are not bootstrapped
                        value += action_prob * prob * (reward + self.gamma * self.values[next_state] * (1 - done))
                delta = max(delta, np.abs(self.values[state] - value))
                self.values[state] = value  # In-place update: later states in this sweep already see the new value
            # The cap is checked before steps is incremented, so up to max_iterations + 1 sweeps run
            if self.max_iterations is not None and steps >= self.max_iterations:
                break
            if delta < self.delta:
                break
            steps += 1
        print("Policy Evaluation Steps:", steps)  # Sweeps completed before the final one

    def policy_improvement(self):
        """ Policy Improvement Step """
        policy = np.zeros([self.env.observation_space.n, self.env.action_space.n])
        for state in range(self.env.observation_space.n):
            q_values = np.zeros(self.env.action_space.n)  # Action values q(s, a) for this state
            for action in range(self.env.action_space.n):
                for prob, next_state, reward, done in self.env.unwrapped.P[state][action]:  # prob covers p(s'|s,a) and p(r|s,a)
                    q_values[action] += prob * (reward + self.gamma * self.values[next_state] * (1 - done))
            best_action = np.argmax(q_values)  # Greedy action under the current value estimate (ties go to the lowest index)
            policy[state][best_action] = 1.0  # Set policy to take best action in this state

        self.policy = policy
        print("Policy after improvement Done.")


    def run(self):
        """ Run the Truncated Policy Iteration Algorithm """
        while True:
            self.policy_evaluation()
            old_policy = np.copy(self.policy)
            self.policy_improvement()

            # Stop once the improvement step no longer changes the (0/1-valued) policy
            if np.array_equal(old_policy, self.policy):
                break

    def visualize_policy(self, delay=0.5):
        """ Visualize the policy in the environment """
        state, info = self.env.reset()
        done = False

        while not done:
            self.env.render()
            action = np.argmax(self.policy[state])
            state, reward, terminated, truncated, info = self.env.step(action)
            done = terminated or truncated
            time.sleep(delay)

        self.env.render()
        self.env.close()  # Close the environment after visualization is complete

In [3]:
env = gym.make('FrozenLake-v1', desc=None, map_name='8x8', is_slippery=True, render_mode='human')
env.reset()

agent = TruncatedPolicyIteration(env, gamma=0.99, delta=1e-6, max_iterations=20)
agent.run()
print(f"Optimal Policy: {agent.policy}")
print(f"Optimal Value Function: {agent.values}")

Policy Evaluation Steps: 0
Policy after improvement Done.
Policy Evaluation Steps: 17
Policy after improvement Done.
Policy Evaluation Steps: 20
Policy after improvement Done.
Policy Evaluation Steps: 20
Policy after improvement Done.
Policy Evaluation Steps: 20
Policy after improvement Done.
Policy Evaluation Steps: 20
Policy after improvement Done.
Policy Evaluation Steps: 20
Policy after improvement Done.
Policy Evaluation Steps: 20
Policy after improvement Done.
Policy Evaluation Steps: 20
Policy after improvement Done.
Policy Evaluation Steps: 20
Policy after improvement Done.
Policy Evaluation Steps: 20
Policy after improvement Done.
Optimal Policy: [[0. 0. 0. 1.]
 [0. 0. 1. 0.]
 [0. 0. 1. 0.]
 [0. 0. 1. 0.]
 [0. 0. 1. 0.]
 [0. 0. 1. 0.]
 [0. 0. 1. 0.]
 [0. 0. 1. 0.]
 [0. 0. 0. 1.]
 [0. 0. 0. 1.]
 [0. 0. 0. 1.]
 [0. 0. 0. 1.]
 [0. 0. 0. 1.]
 [0. 0. 1. 0.]
 [0. 0. 1. 0.]
 [0. 1. 0. 0.]
 [0. 0. 0. 1.]
 [0. 0. 0. 1.]
 [1. 0. 0. 0.]
 [1. 0. 0. 0.]
 [0. 0. 1. 0.]
 [0. 0. 0. 1.]
 [0. 0

In [4]:
agent.visualize_policy(delay=0.005)