### ロバスト動的計画法の実装
* pythonで書いていきます〜



In [3]:
import numpy as np
import matplotlib.pyplot as plt


### ロバストVIアルゴリズム

**入力:**  
$V \in \mathcal{V}, \epsilon > 0$

**出力:**  
$\tilde{V}$ ただし、$\|\tilde{V} - V^{*}\| \leq \frac{\epsilon}{2}$

- 各 $s \in \mathcal{S}$ に対して、$\tilde{V}(s)$ を次のように設定する：
  $$
  \tilde{V}(s) = \sup_{a \in \mathcal{A}(s)} \left\{ \inf_{p \in \mathcal{P}(s, a)} \mathbf{E}^{p} \left[ r(r, a, s') + \lambda V(s') \right] \right\}
  $$

- while $\left( \|\tilde{V} - V\| \geq \frac{(1 - \lambda)}{\lambda} \cdot \epsilon \right)$ do:

  - $V = \tilde{V}$

  - $\forall s \in \mathcal{S}$, 次のように $\tilde{V}(s)$ を設定する：
    $$
    \tilde{V}(s) = \sup_{a \in \mathcal{A}(s)} \left\{ \inf_{p \in \mathcal{P}(s, a)} \mathbf{E}^{p} \left[ r(r, a, s') + \lambda V(s') \right] \right\}
    $$

- end while

**戻り値:**  
$\tilde{V}$


In [15]:

import numpy as np

def robust_value_iteration(initial_value_function, tolerance, states, actions, transition_probabilities, reward_function, discount_factor):
  """
  Args:
    initial_value_function: 初期価値関数 
    tolerance: 許容誤差 
    states: 状態空間 
    actions: 行動空間 
    transition_probabilities: 状態遷移確率の集合 
    reward_function: 報酬関数 
    discount_factor: 割引率
  Returns:
    V
  """

  value_function = initial_value_function.copy()
  updated_value_function = np.zeros_like(value_function)

  while True:
    for state in states:
      state_values = []
      for action in actions:
        # 各行動に対する最悪ケースの期待報酬を計算
        worst_case_expected_reward = min([
            np.dot(transition_probability, reward_function(state, action, states) + discount_factor * value_function)
            for transition_probability in transition_probabilities[(state, action)]
        ])
        state_values.append(worst_case_expected_reward)
      # 最適な行動を選択
      updated_value_function[state] = max(state_values)

    # 価値関数の更新量が許容誤差以下になったら終了
    if np.linalg.norm(updated_value_function - value_function) < tolerance:
      break

    # 価値関数を更新
    value_function = updated_value_function.copy()

  return value_function

# 状態空間と行動空間
states = [0, 1, 2]
actions = [0, 1]

# 遷移確率の集合 (例)
transition_probabilities = {
    (0, 0): [np.array([0.7, 0.3, 0.0]), np.array([0.8, 0.2, 0.0])],
    (0, 1): [np.array([0.2, 0.6, 0.2]), np.array([0.3, 0.5, 0.2])],
    (1, 0): [np.array([0.1, 0.8, 0.1]), np.array([0.0, 0.9, 0.1])],
    (1, 1): [np.array([0.0, 0.3, 0.7]), np.array([0.0, 0.2, 0.8])],
    (2, 0): [np.array([0.6, 0.0, 0.4]), np.array([0.5, 0.1, 0.4])],
    (2, 1): [np.array([0.3, 0.2, 0.5]), np.array([0.4, 0.1, 0.5])]
}

# 報酬関数
def reward_function(state, action, next_state):
  return 1.0 if next_state == 2 else -0.01

# 割引率
discount_factor = 0.9

# 初期価値関数と許容誤差
initial_value_function = np.zeros(len(states))
tolerance = 0.01

# ロバストな価値反復の実行
optimal_value_function = robust_value_iteration(
    initial_value_function, tolerance, states, actions, transition_probabilities, reward_function, discount_factor
)

print("VI:", optimal_value_function)

VI: [-0.0468559 -0.0468559 -0.0468559]


### ロバスト方策反復アルゴリズム

**入力:**  
決定規則 $d_{0}$、$\epsilon > 0$

**出力:**  
$\epsilon$-最適な決定規則 $d^{*}$

1. $n = 0$ および $\pi_{n} = (d_{n}, d_{n}, \ldots)$ を設定する。式 (31) を解いて $V^{\pi_{n}}$ を計算する。$\tilde{V} \leftarrow L_{\mathcal{D}} V^{\pi_{n}}$ とし、$\mathcal{D} = \prod_{s \in \mathcal{S}} \mathcal{A}(s)$ を設定する。

2. 各 $s \in \mathcal{S}$ に対して、次の条件を満たす $d_{n+1}(s)$ を選択する：
   $$
   d_{n+1}(s) \in \left\{ a \in \mathcal{A}(s) : \inf_{p \in \mathcal{P}(s, a)} \mathbf{E}^{p} \left[ r(s, a, s') + \lambda V(s') \right] \geq \tilde{V}(s) - \epsilon \right\}
   $$
   可能であれば、$d_{n+1}(s) = d_{n}(s)$ を設定する。

3. while $d_{n+1} \neq d_{n}$ do:
   - $n = n + 1$ とする。
   - 式 (31) を解いて $V^{\pi_{n}}$ を計算する。$\tilde{V} \leftarrow L_{\mathcal{D}} V^{\pi_{n}}$ とし、$\mathcal{D} = \prod_{s \in \mathcal{S}} \mathcal{A}(s)$ を設定する。
   - 各 $s \in \mathcal{S}$ に対して、次の条件を満たす $d_{n+1}(s)$ を選択する：
     $$
     d_{n+1}(s) \in \left\{ a \in \mathcal{A}(s) : \inf_{p \in \mathcal{P}(s, a)} \mathbf{E}^{p} \left[ r(s, a, s') + \lambda V(s') \right] \geq \tilde{V}(s) - \epsilon \right\}
     $$
     可能であれば、$d_{n+1}(s) = d_{n}(s)$ を設定する。

4. end while

**戻り値:**  
$d_{n+1}$


In [25]:
import numpy as np

def robust_policy_iteration(initial_decision_rule, tolerance, states, actions, transition_probabilities, reward_function, discount_factor):
  """


  Args:
    initial_decision_rule: 初期決定規則 (function: (状態) -> 行動)
        各状態に対して行動を決定する関数です。
    tolerance: 許容誤差 (float)
        決定規則の更新量がこの値以下になったらアルゴリズムを終了します。
    states: 状態空間 (list)
        問題におけるすべての状態のリストです。
    actions: 行動空間 (list)
        問題におけるすべての行動のリストです。
    transition_probabilities: 状態遷移確率の集合 (dictionary: (状態, 行動) -> list of numpy arrays)
        各状態-行動ペア (s, a) に対して、可能な遷移確率を表すnumpy配列のリストを値として持ちます。
        各numpy配列は、状態sで行動aを取ったときに、各次の状態に遷移する確率を表します。
    reward_function: 報酬関数 (function: (状態, 行動, 次の状態) -> float)
        現在の状態、行動、次の状態を引数に取り、その遷移で得られる報酬を返す関数です。
    discount_factor: 割引率 (float, 0 < discount_factor < 1)
        将来の報酬を現在価値に割り引くための係数です。

  Returns:
    ε-最適な決定規則 (function: (状態) -> 行動)
  """

  # 決定規則を初期化
  decision_rule = initial_decision_rule

  # 反復回数
  iteration_count = 0

  while True:
    # 反復回数をインクリメント
    iteration_count += 1

    # 現在の決定規則に基づいて、定常方策πnを作成
    def policy(state):
      return decision_rule(state)

    # 方策評価: 式(31)を解いて、現在のポリシーの価値関数を計算
    value_function = evaluate_policy(policy, tolerance, states, actions, transition_probabilities, reward_function, discount_factor)

    # 方策改善: 各状態において、ε-greedyに新たな決定規則を計算
    updated_decision_rule = improve_policy(value_function, tolerance, states, actions, transition_probabilities, reward_function, discount_factor)

    # 決定規則が更新されなくなったら終了
    if is_decision_rule_converged(updated_decision_rule, decision_rule, states):
      break

    # 決定規則を更新
    decision_rule = updated_decision_rule

    # デバッグ: 反復回数と決定規則を出力
    print(f"Iteration: {iteration_count}, Decision Rule: {decision_rule}")

  # 収束した決定規則を返す
  return decision_rule


def evaluate_policy(policy, tolerance, states, actions, transition_probabilities, reward_function, discount_factor):
  """

  Args:
    policy: 方策 (function: (状態) -> 行動)
    tolerance: 許容誤差 (float)
    states: 状態空間 (list)
    actions: 行動空間 (list)
    transition_probabilities: 状態遷移確率の集合 (dictionary: (状態, 行動) -> list of numpy arrays)
    reward_function: 報酬関数 (function: (状態, 行動, 次の状態) -> float)
    discount_factor: 割引率 (float)

  Returns:
    価値関数 (numpy array, shape=(状態数,))
  """

  value_function = np.zeros(len(states))
  while True:
    updated_value_function = np.zeros_like(value_function)
    for state in states:
      action = policy(state)
      # 期待報酬を計算 (最悪ケースの遷移確率を選択)
      expected_reward = min([
        np.dot(transition_probability, reward_function(state, action, states) + discount_factor * value_function)
        for transition_probability in transition_probabilities[(state, action)]
      ])
      updated_value_function[state] = expected_reward
    if np.linalg.norm(updated_value_function - value_function) < tolerance:
      break
    value_function = updated_value_function.copy()
  return value_function


def improve_policy(value_function, tolerance, states, actions, transition_probabilities, reward_function, discount_factor):
  """

  Args:
    value_function: 価値関数 (numpy array, shape=(状態数,))
    tolerance: 許容誤差 (float)
    states: 状態空間 (list)
    actions: 行動空間 (list)
    transition_probabilities: 状態遷移確率の集合 (dictionary: (状態, 行動) -> list of numpy arrays)
    reward_function: 報酬関数 (function: (状態, 行動, 次の状態) -> float)
    discount_factor: 割引率 (float)

  Returns:
    更新された決定規則 (function: (状態) -> 行動)
  """

  def updated_decision_rule(state):
    best_action = None
    best_value = float('-inf')
    for action in actions:
      # 各行動に対する最悪ケースの期待報酬を計算
      worst_case_expected_reward = min([
        np.dot(transition_probability, reward_function(state, action, states) + discount_factor * value_function)
        for transition_probability in transition_probabilities[(state, action)]
      ])
      if worst_case_expected_reward > best_value + tolerance:
        best_action = action
        best_value = worst_case_expected_reward
    return best_action
  return updated_decision_rule


def is_decision_rule_converged(decision_rule1, decision_rule2, states):
  """
  Args:
    decision_rule1: 決定規則1 (function: (状態) -> 行動)
    decision_rule2: 決定規則2 (function: (状態) -> 行動)
    states: 状態空間 (list)

  Returns:
    収束している場合はTrue、そうでない場合はFalse
  """
  for state in states:
    if decision_rule1(state) != decision_rule2(state):
      return False
  return True


  # 状態空間と行動空間
states = [0, 1, 2]
actions = [0, 1]

# 遷移確率の集合 (例)
transition_probabilities = {
    (0, 0): [np.array([0.7, 0.3, 0.0]), np.array([0.8, 0.2, 0.0])],
    (0, 1): [np.array([0.2, 0.6, 0.2]), np.array([0.3, 0.5, 0.2])],
    (1, 0): [np.array([0.1, 0.8, 0.1]), np.array([0.0, 0.9, 0.1])],
    (1, 1): [np.array([0.0, 0.3, 0.7]), np.array([0.0, 0.2, 0.8])],
    (2, 0): [np.array([0.6, 0.0, 0.4]), np.array([0.5, 0.1, 0.4])],
    (2, 1): [np.array([0.3, 0.2, 0.5]), np.array([0.4, 0.1, 0.5])]
}

# 報酬関数 
def reward_function(state, action, next_state):
  return 1.0 if next_state == 2 else 0.0

# 割引率
discount_factor = 0.9

# 初期決定規則と許容誤差
def initial_decision_rule(state):
  return 0  # すべての状態に対して行動0を選択

tolerance = 0.01

# ロバストな方策反復の実行
optimal_decision_rule = robust_policy_iteration(
    initial_decision_rule, tolerance, states, actions, transition_probabilities, reward_function, discount_factor
)

print("ε-最適な決定規則:", optimal_decision_rule(1))

ε-最適な決定規則: 0
