# 強化学習の実験に便利なコード

タイトルの通りです。よく使う関数をまとめます。

## 無限ホライゾン

### マルコフ決定過程の生成

[強化学習の青本](https://amzn.asia/d/2epmlxT)に従ったMDPの定義用のコードです。
MDPを次で定義します。

1. 有限状態集合: $S=\{1, \dots, |S|\}$
2. 有限行動集合: $A=\{1, \dots, |A|\}$
3. 遷移確率行列: $P\in \mathbb{R}^{SA\times S}$
4. 報酬行列: $r\in \mathbb{R}^{S\times A}$
5. 割引率: $\gamma \in [0, 1)$
6. 初期状態: $\mu \in \mathbb{R}^{S}$

In [4]:
import numpy as np
from typing import NamedTuple, Optional


S = 10  # 状態集合のサイズ
A = 3  # 行動集合のサイズ
S_set = np.arange(S)  # 状態集合
A_set = np.arange(A)  # 行動集合
gamma = 0.8  # 割引率

# 報酬行列を適当に作ります
rew = np.random.rand(S, A)
assert rew.shape == (S, A)

# 遷移確率行列を適当に作ります
P = np.random.rand(S*A, S)
P = P / np.sum(P, axis=-1, keepdims=True)  # 正規化して確率にします
P = P.reshape(S, A, S)
np.testing.assert_almost_equal(P.sum(axis=-1), 1)  # ちゃんと確率行列になっているか確認します

# 初期状態分布を適当に作ります
mu = np.random.rand(S)
mu = mu / np.sum(mu)

# 状態集合, 行動集合, 割引率, 報酬行列, 遷移確率行列が準備できたのでMDPのクラスを作ります

class MDP(NamedTuple):
    S_set: np.array  # 状態集合
    A_set: np.array  # 行動集合
    gamma: float  # 割引率
    H: int  # エフェクティブホライゾン
    rew: np.array  # 報酬行列
    P: np.array  # 遷移確率行列
    mu: np.array  # 初期分布
    optimal_Q: Optional[np.ndarray] = None  # 最適Q値

    @property
    def S(self) -> int:  # 状態空間のサイズ
        return len(self.S_set)

    @property
    def A(self) -> int:  # 行動空間のサイズ
        return len(self.A_set)


H = int(1 / (1 - gamma))
mdp = MDP(S_set, A_set, gamma, H, rew, P, mu)

print("状態数：", mdp.S)
print("行動数：", mdp.A)
print("割引率：", mdp.gamma)
print("ホライゾン：", mdp.H)

状態数： 10
行動数： 3
割引率： 0.8
ホライゾン： 5


### 動的計画法

参考

* [Safe Policy Iteration](http://proceedings.mlr.press/v28/pirotta13.pdf)の２ページ目

**表記**

* 内積の記法: $f_1, f_2 \in \mathbb{R}^{S\times A}$に対して、$\langle f_1, f_2 \rangle = (\sum_{a\in A} f_1(s, a)f_2(s, a))_s \in \mathbb{R}^S$とします。これは方策についての和を省略するときなどに便利です。例えば$\langle \pi, q_\pi\rangle = v_\pi$です。
* 方策行列（$\Pi^\pi \in \mathbb{R}^{S\times SA}$）：$\langle \pi, q\rangle$を行列で書きたいときに便利。
    * $\Pi^\pi(s,(s, a))=\pi(a \mid s)$ 
    * $\Pi^\pi q^\pi = \langle \pi, q^\pi \rangle = v^\pi$が成立。
* 遷移確率行列１（$P^\pi \in \mathbb{R}^{SA\times SA}$）: 次の状態についての方策の情報を追加したやつ。
    * $P^\pi = P \Pi^\pi$
    * Q値を使ったベルマン期待作用素とかで便利。$q^\pi = r + \gamma P^\pi q^\pi$が成立。
    * $(I - \gamma P^\pi)^{-1}r = q^\pi$が成立する。
* 遷移確率行列２（$\bar{P}^\pi \in \mathbb{R}^{S\times S}$）: 方策$\pi$のもとでの状態遷移の行列。
    * $\bar{P}^\pi = \Pi^\pi P$
    * V値を使ったベルマン期待作用素とかで便利。$v^\pi = \Pi^\pi r + \gamma \bar{P}^\pi v^\pi$。
    * $(I - \gamma \bar{P}^\pi)^{-1}\Pi^\pi r = v^\pi$が成立する。
* 割引訪問頻度１（$d^\pi_\mu \in \mathbb{R}^{SA}$）：S, Aについての割引累積訪問頻度
    * $d^\pi_\mu = \mu \Pi^\pi (I - \gamma P^\pi)^{-1} = \mu (I - \gamma \bar{P}^\pi)^{-1} \Pi^\pi$
    * ${d}^\pi_\mu (s, a) = \pi(a|s) \sum_{s_0} \mu(s_0) \sum_{t=0}^\infty \mathrm{Pr}\left(S_t=s|S_0=s_0, M(\pi)\right)$が成立する。
* 割引訪問頻度２（$\bar{d}^\pi_\mu \in \mathbb{R}^{S}$）：Sについての割引累積訪問頻度
    * $\bar{d}^\pi_\mu = \mu (I - \gamma \bar{P}^\pi)^{-1}$
    * $\bar{d}^\pi_\mu (s) = \sum_{s_0} \mu(s_0) \sum_{t=0}^\infty \mathrm{Pr}\left(S_t=s|S_0=s_0, M(\pi)\right)$が成立する。


**実装した関数**

* ``compute_greedy_policy``: Q関数 ($S \times A \to \mathcal{R}$) の貪欲方策を返します
* ``compute_optimal_Q``: MDPの最適Q関数 $q_* : S \times A \to \mathcal{R}$ を返します。
* ``compute_policy_Q``: 方策 $\pi$ のQ関数 $q_\pi : S \times A \to \mathcal{R}$ を返します。
* ``compute_policy_matrix``: 方策$\pi$の行列${\Pi}^{\pi}$を返します。
* ``compute_policy_visit``: 方策 $\pi$ の割引訪問頻度２$\bar{d}^\pi_\mu \in \mathbb{R}^{S}$ を返します。

In [5]:
import jax
import jax.numpy as jnp
import numpy as np
from functools import partial


@jax.jit
def compute_greedy_policy(Q: np.ndarray):
    """Q関数の貪欲方策を返します

    Args:
        Q (np.ndarray): (SxA)の行列

    Returns:
        greedy_policy (np.ndarray): (SxA)の行列
    """
    greedy_policy = jnp.zeros_like(Q)
    S, A = Q.shape
    greedy_policy = greedy_policy.at[jnp.arange(S), Q.argmax(axis=1)].set(1)
    assert greedy_policy.shape == (S, A)
    return greedy_policy


@partial(jax.jit, static_argnames=("S", "A"))
def _compute_optimal_Q(mdp: MDP, S: int, A: int):
    """MDPについて、ベルマン最適作用素を複数回走らせて最適価値関数を動的計画法で計算します。
    Args:
        mdp (MDP)

    Returns:
        optimal_Q (np.ndarray): (SxA)の行列
    """

    def backup(optimal_Q):
        next_v = mdp.P @ optimal_Q.max(axis=1)
        assert next_v.shape == (S, A)
        return mdp.rew + mdp.gamma * next_v
    
    optimal_Q = jnp.zeros((S, A))
    body_fn = lambda i, Q: backup(Q)
    return jax.lax.fori_loop(0, mdp.H + 100, body_fn, optimal_Q)

compute_optimal_Q = lambda mdp: _compute_optimal_Q(mdp, mdp.S, mdp.A)


@jax.jit
def compute_policy_Q(mdp: MDP, policy: np.ndarray):
    """MDPと方策について、ベルマン期待作用素を複数回走らせて価値関数を動的計画法で計算します。
    Args:
        mdp (MDP)
        policy (np.ndarray): (SxA)の行列

    Returns:
        optimal_Q (np.ndarray): (SxA)の行列
    """
    S, A = policy.shape

    def backup(policy_Q):
        max_Q = (policy * policy_Q).sum(axis=1)
        next_v = mdp.P @ max_Q
        assert next_v.shape == (S, A)
        return mdp.rew + mdp.gamma * next_v
    
    policy_Q = jnp.zeros((S, A))
    body_fn = lambda i, Q: backup(Q)
    return jax.lax.fori_loop(0, mdp.H + 100, body_fn, policy_Q)


@jax.jit
def compute_policy_matrix(policy: np.ndarray):
    """
    上で定義した方策行列を計算します。方策についての内積が取りたいときに便利です。
    Args:
        policy (np.ndarray): (SxA)の行列

    Returns:
        policy_matrix (np.ndarray): (SxSA)の行列
    """
    S, A = policy.shape
    PI = policy.reshape(1, S, A)
    PI = jnp.tile(PI, (S, 1, 1))
    eyes = jnp.eye(S).reshape(S, S, 1)
    PI = (eyes * PI).reshape(S, S*A)
    return PI


@jax.jit
def compute_policy_visit(mdp: MDP, policy: np.ndarray, init_dist: np.ndarray):
    """MDPと方策について、割引訪問頻度を動的計画法で計算します。
    Args:
        mdp (MDP)
        policy (np.ndarray): (SxA)の行列
        init_dist (np.ndarray): (S) 初期状態の分布

    Returns:
        visit (np.ndarray): (S)のベクトル
    """
    S, A = policy.shape
    Pi = compute_policy_matrix(policy)
    PiP = Pi @ mdp.P.reshape(S*A, S) 

    def backup(visit):
        next_visit = mdp.gamma * visit @ PiP
        return init_dist + next_visit
    
    body_fn = lambda i, visit: backup(visit)
    visit = jax.lax.fori_loop(0, mdp.H + 100, body_fn, init_dist)
    return visit


# 動的計画法による最適価値関数
optimal_Q_DP = compute_optimal_Q(mdp)
optimal_V_DP = optimal_Q_DP.max(axis=1)
optimal_policy = compute_greedy_policy(optimal_Q_DP)


# 逆行列による解法 Q
Pi = compute_policy_matrix(optimal_policy)
PPi = mdp.P.reshape(S*A, S) @ Pi
optimal_Q_inv = np.linalg.inv(np.eye(S*A) - mdp.gamma * PPi) @ mdp.rew.reshape(S*A)
print("Qベースの動的計画法と逆行列の解の差：", np.abs(optimal_Q_inv - optimal_Q_DP.reshape(-1)).max())


# 逆行列による解法 V
Pi = compute_policy_matrix(optimal_policy)
PiP = Pi @ mdp.P.reshape(S*A, S) 
Pirew = Pi @ mdp.rew.reshape(S*A)
optimal_V_inv = np.linalg.inv(np.eye(S) - mdp.gamma * PiP) @ Pirew
print("Vベースの動的計画法と逆行列の解の差：", np.abs(optimal_V_inv - optimal_V_DP.reshape(-1)).max())


# 割引訪問頻度の計算
d_pi_DP = compute_policy_visit(mdp, optimal_policy, mdp.mu)
d_pi_inv = mdp.mu @ np.linalg.inv(np.eye(S) - mdp.gamma * PiP)
print("動的計画法で計算した割引訪問頻度と逆行列の解の差", np.abs(d_pi_DP - d_pi_inv).max())
optimal_return_DP = mdp.mu @ optimal_V_DP
optimal_return_visit = d_pi_inv @ Pirew
print("割引訪問頻度で計算した期待リターンと動的計画法の解の差", np.abs(optimal_return_DP - optimal_return_visit).max())


d_pi_inv_SA1 = mdp.mu @ np.linalg.inv(np.eye(S) - mdp.gamma * PiP) @ Pi
d_pi_inv_SA2 = mdp.mu @ Pi @ np.linalg.inv(np.eye(S*A) - mdp.gamma * PPi)

print("SAについての割引訪問頻度の求め方２つの差：", np.abs(d_pi_inv_SA1 - d_pi_inv_SA2).max())

No GPU/TPU found, falling back to CPU. (Set TF_CPP_MIN_LOG_LEVEL=0 and rerun for more info.)


Qベースの動的計画法と逆行列の解の差： 4.7683716e-07
Vベースの動的計画法と逆行列の解の差： 4.7683716e-07
動的計画法で計算した割引訪問頻度と逆行列の解の差 1.1920929e-07
割引訪問頻度で計算した期待リターンと動的計画法の解の差 2.3841858e-07
SAについての割引訪問頻度の求め方２つの差： 5.9604645e-08


## 有限ホライゾン

### マルコフ決定過程の生成

**参考**

有限MDPの定義については[Reinforcement Learning: Theory and Algorithms](https://rltheorybook.github.io/)の1.2章を参照しています。
有限ホライゾンの場合、遷移行列や報酬関数が各ステップで変わる設定を考えます。

1. 有限状態集合: $S=\{1, \dots, |S|\}$
2. 有限行動集合: $A=\{1, \dots, |A|\}$
3. $h$ステップ目の遷移確率行列: $P_h\in \mathbb{R}^{SA\times S}$
4. $h$ステップ目の報酬行列: $r_h\in \mathbb{R}^{S\times A}$
5. ホライゾン: $H$
6. 初期状態: $\mu \in \mathbb{R}^{S}$

In [6]:
import numpy as np
from typing import NamedTuple, Optional


S = 10  # 状態集合のサイズ
A = 3  # 行動集合のサイズ
S_set = np.arange(S)  # 状態集合
A_set = np.arange(A)  # 行動集合
H = 5  # ホライゾン

# 報酬行列を適当に作ります
rew = np.random.rand(H, S, A)
assert rew.shape == (H, S, A)

# 遷移確率行列を適当に作ります
P = np.random.rand(H, S*A, S)
P = P / np.sum(P, axis=-1, keepdims=True)  # 正規化して確率にします
P = P.reshape(H, S, A, S)
np.testing.assert_almost_equal(P.sum(axis=-1), 1)  # ちゃんと確率行列になっているか確認します

# 初期状態分布を適当に作ります
mu = np.random.rand(S)
mu = mu / np.sum(mu)

# 状態集合, 行動集合, 割引率, 報酬行列, 遷移確率行列が準備できたのでMDPのクラスを作ります

class MDP(NamedTuple):
    S_set: np.array  # 状態集合
    A_set: np.array  # 行動集合
    H: int  # ホライゾン
    rew: np.array  # 報酬行列
    P: np.array  # 遷移確率行列
    mu: np.array  # 初期分布
    optimal_Q: Optional[np.ndarray] = None  # 最適Q値

    @property
    def S(self) -> int:  # 状態空間のサイズ
        return len(self.S_set)

    @property
    def A(self) -> int:  # 行動空間のサイズ
        return len(self.A_set)


mdp = MDP(S_set, A_set, H, rew, P, mu)

print("状態数：", mdp.S)
print("行動数：", mdp.A)
print("ホライゾン：", mdp.H)

状態数： 10
行動数： 3
ホライゾン： 5


### 動的計画法

**表記**

* ステップ$h$の方策行列（$\Pi_h^\pi \in \mathbb{R}^{S\times SA}$）：$\langle \pi_h, q\rangle$を行列で書きたいときに便利。
    * $\Pi_h^\pi(s,(s, a))=\pi_h(a \mid s)$ 
    * $\Pi_h^\pi q_h^\pi = \langle \pi, q_h^\pi \rangle = v_h^\pi$が成立。
* ステップ$h$の遷移確率行列１（$P_h^\pi \in \mathbb{R}^{SA\times SA}$）: 次の状態についての方策の情報を追加したやつ。
    * $P_h^\pi = P_h \Pi_h^\pi$
    * Q値を使ったベルマン期待作用素とかで便利。$q_h^\pi = r_h + P_h^\pi q^\pi$が成立。
* ステップ$h$の遷移確率行列２（$\bar{P}_h^\pi \in \mathbb{R}^{S\times S}$）: 方策$\pi$のもとでの状態遷移の行列。
    * $\bar{P}_h^\pi = \Pi_h^\pi P_h$
    * V値を使ったベルマン期待作用素とかで便利。$v_h^\pi = \Pi^\pi r_h + \gamma \bar{P}_h^\pi v^\pi$。

TODO: 有限ホライゾンのときは訪問分布の扱いがよくわかんないな。時間非定常のときはリターンの計算には使えない？


**実装した関数**

* ``compute_greedy_policy``: Q関数 ($H\times S \times A \to \mathcal{R}$) の貪欲方策を返します
* ``compute_optimal_Q``: MDPの最適Q関数 $q_* : H\times S \times A \to \mathcal{R}$ を返します。
* ``compute_policy_Q``: 方策 $\pi$ のQ関数 $q_\pi : H\times S \times A \to \mathcal{R}$ を返します。
* ``compute_policy_matrix``: 方策$\pi$の行列${\Pi}^{\pi} : H \times S \times SA$を返します。

In [60]:
from functools import partial
import jax


@jax.jit
def compute_greedy_policy(Q: np.ndarray):
    """Q関数の貪欲方策を返します

    Args:
        Q (np.ndarray): (HxSxA)の行列

    Returns:
        greedy_policy (np.ndarray): (HxSxA)の行列
    """
    greedy_policy = jnp.zeros_like(Q)
    H, S, A = Q.shape
    
    def body_fn(i, greedy_policy):
        greedy_policy = greedy_policy.at[i, jnp.arange(S), Q[i].argmax(axis=-1)].set(1)
        return greedy_policy

    greedy_policy = jax.lax.fori_loop(0, H, body_fn, greedy_policy)
    assert greedy_policy.shape == (H, S, A)
    return greedy_policy


@partial(jax.jit, static_argnames=("H", "S", "A"))
def _compute_optimal_Q(mdp: MDP, H: int, S: int, A: int):
    """ベルマン最適作用素をホライゾン回走らせて最適価値関数を動的計画法で計算します。
    Args:
        mdp (MDP)

    Returns:
        optimal_Q (np.ndarray): (HxSxA)の行列
    """

    def backup(i, optimal_Q):
        h = H - i - 1
        max_Q = optimal_Q[h].max(axis=1)
        next_v = mdp.P[h] @ max_Q
        assert next_v.shape == (S, A)
        optimal_Q = optimal_Q.at[h-1].set(mdp.rew[h] + next_v)
        return optimal_Q
    
    optimal_Q = jnp.zeros((H, S, A))
    return jax.lax.fori_loop(0, mdp.H, backup, optimal_Q)

compute_optimal_Q = lambda mdp: _compute_optimal_Q(mdp, mdp.H, mdp.S, mdp.A)


@jax.jit
def compute_policy_Q(mdp: MDP, policy: np.ndarray):
    """ベルマン期待作用素をホライゾン回走らせて価値関数を動的計画法で計算します。
    Args:
        mdp (MDP)
        policy (np.ndarray): (HxSxA)の行列

    Returns:
        optimal_Q (np.ndarray): (HxSxA)の行列
    """
    H, S, A = policy.shape

    def backup(i, policy_Q):
        h = H - i - 1
        max_Q = (policy[h] * policy_Q[h]).sum(axis=1)
        next_v = mdp.P[h] @ max_Q
        assert next_v.shape == (S, A)
        policy_Q = policy_Q.at[h-1].set(mdp.rew[h] + next_v)
        return policy_Q
    
    policy_Q = jnp.zeros((H, S, A))
    return jax.lax.fori_loop(0, mdp.H, backup, policy_Q)


@jax.jit
def compute_policy_matrix(policy: np.ndarray):
    """
    上で定義した方策行列を計算します。方策についての内積が取りたいときに便利です。
    Args:
        policy (np.ndarray): (HxSxA)の行列

    Returns:
        policy_matrix (np.ndarray): (HxSxSA)の行列
    """
    H, S, A = policy.shape
    PI = policy.reshape(H, 1, S, A)
    PI = jnp.tile(PI, (1, S, 1, 1))
    eyes = jnp.tile(jnp.eye(S).reshape(1, S, S, 1), (H, 1, 1, 1))
    PI = (eyes * PI).reshape(H, S, S*A)
    return PI


# 動的計画法による最適価値関数
optimal_Q_DP = compute_optimal_Q(mdp)
optimal_V_DP = optimal_Q_DP.max(axis=-1)
optimal_policy = compute_greedy_policy(optimal_Q_DP)
optimal_policy_Q_DP = compute_policy_Q(mdp, optimal_policy)
print("最適価値関数と最適方策の価値関数の差", np.abs(optimal_Q_DP - optimal_policy_Q_DP).max())

最適価値関数と最適方策の価値関数の差 0.0
