# Pythonで学ぶ強化学習　ハンズオン

## Day1 （強化学習の位置付けを知る）

### 1. 強化学習は学習方法の一種（抜粋）
- 教師有り学習
    - データと正解ラベルをセットで与える。　データが与えられたらラベルが出力されるようパラメータを調整。
- 教師なし学習
    - データのみを与えてデータの特徴（構造や表現）を抽出できるようパラメータを調整。
- 強化学種
    - 行動により報酬が得られる環境（タスク）を与えて、各状態で報酬に繋がる行動が出力される行動が出力されるようモデルのパラメータを調整。

### 2. 強化学習における問題設定: Markov Deciosion Process

- 強化学習における「環境」は以下の要素からなる**マルコフ決定過程（Markov Deciosion Process）**からなる。
    - $s \in \mathcal{S}$ : **状態 （state）**
    - $a \in \mathcal{A}$ : **行動 （action）**
    - $T(s'\mid s, a): \mathcal{S} \times \mathcal{A} \times \mathcal{S} \rightarrow \mathbb{R}^{+}$ : 状態遷移の確率（**遷移関数 / Transition function**）。状態$s$と行動$a$を引数に、遷移先$s^{\prime}$（次の状態）と遷移確率を出力する関数。
    - $R(s, s^{\prime}): \mathcal{S} \times \mathcal{A} \rightarrow \mathbb{R}$ : **報酬関数 （Reward function）**。状態$s$と遷移先$s^{\prime}$を引数に、報酬を出力する関数（行動$a$を引数に取ることもある）。    
    
        - メモ: $\mathcal{S}, \mathcal{A}$ は離散集合、連続集合どちらでもOK
          - 離散か連続かで、適用できる強化学習アルゴリズムは変わってくる.
          - 離散でも, 要素数が多い場合は連続と思って扱うことが多い.
          

- また、強化学習における用語を以下のように定める。
    - **エピソード （Episode）**: 環境の開始から終了までの期間。
    
    - **エージェント（Agent）**: 戦略$\pi$に従って行動する主体。（強化学習では戦略がモデルとなり、パラメーターを調節して適切な行動を出力する）
    
    - $\pi: \mathcal{S} \rightarrow \mathcal{A}$: **戦略（policy）**。状態$s$を受け取り、行動$a$を決める関数。
    
    - $r:= R(s, s^{\prime}) \in \mathbb{R}$ : **即時報酬（Immediate reward）**。 MDPにおける報酬。
    
    - $G_{t}\in \mathbb{R}$ : **期待報酬（Expected reward）** / **価値（Value）**。 報酬の総和を表す。
    
    - $\gamma \in (0, 1]$ :  **割引率 （Discount factor）**。 将来の報酬の影響をコントロールする変数。
    
    - **価値評価（Value apploximation）**: 価値を算出すること。

- MDPにおける時刻$t$ から時刻$T$ までの「報酬の総和」は割引率 $\gamma$を用いて以下のように書ける。

$$G_{t} := r_{t+1} + \gamma r_{t+2} + \gamma^{2}r_{t+2} + \cdots + \gamma^{T-t-1}r_{tT} = \sum_{k=0}^{T-t+1}\gamma^{k}r_{t+k+1}$$

- 価値$G_{t}$は時刻$t+1$での価値を用いて、再帰的に以下のように書ける。

$$G_{t} = r_{t+1} + \gamma G_{t+1}$$

- 方策$\pi$ と状態の初期値$s_0$ が与えられると、以下に示す状態と行動、報酬の確率過程が得られる。

$$
S_0, A_0, R_0, S_1, A_1, R_1, \cdots
$$


- エージェントの目的は、（割引）累積価値を最大化すること。すなわち、以下の値を最大化する$\pi$を見つけることである。
$$\mathbb{E}^\pi[G_{0}] = \mathbb{E}^\pi\left[\sum_{t=0}^{T-1} \gamma^t r_t \right] = \mathbb{E}^\pi\left[\sum_{t=0}^{T-1} \gamma^t  R(s_{t}, s_{t+1}) \right]$$

### 3. Example(迷路)

簡単な迷路における、MDPの構成要素は以下である。

- $s$ 状態 : セルの位置（行/列）
- $a$ 行動 : 上下左右への移動
- $T$ 遷移関数: 状態と行動を受け取り、移動可能なセルとそこへの遷移確率を返す関数
- $R$ 報酬関数: 状態を受け取り、緑(ゴール)のセルなら1、赤(ペナルティ)のセルなら-1を返す関数
    - メモ: 迷路における報酬関数の設定は複数ある
        - ゴール地点で+1, 進んではいけない地点で-1, それ以外は0
        - 常に-1, 進んではいけない地点で-M （M >> 0）
        - etc.

    

1. 位置クラス(状態$s$)、上下移動クラス（行動$a$）の実装

In [1]:
from enum import Enum

# セル位置表現クラス
class State:
    def __init__(self, row: int=-1, column: int=-1):
        self.row = row
        self.column = column
    
    def __repr__(self):
        return f"State: [{self.row}, {self.column}]"
    
    def clone(self):
        return State(self.row, self.column)
    
    def __hash__(self):
        return hash((self.row, self.column))
    
    def __eq__(self, other):
        return self.row == other.row and self.column == other.column
    
# アクション定義クラス
class Action(Enum):
    UP = 1
    DOWN = -1
    LEFT = 2
    RIGHT = -2

In [2]:
State(3, 5)

State: [3, 5]

In [3]:
Action.UP

<Action.UP: 1>

2. 環境実態クラス(雛形)を実装

In [4]:
from typing import List, Dict
import numpy as np

# 迷路環境実態クラス
class Environment:
    def __init__(self, grid: List[List[int]], move_prob: float=0):
        """
        エージェントがゴールを早く目指すように、デフォルトの報酬を負値に定義。
        エージェントは選択した向きにmove_probの確率で移動し、(1 - move_prob)の確率で他の方向に進む。
        """
        # grid is 2d-Array. Its values are treated as an attribute.
        # kinds of attributes is following
        # 0: ordinary cell
        # -1: damage cell (game end)
        # 1: reward cepp (game end)
        # 9: block cell (can't locate agent)
        self.grid = grid
        self.agent_state = State()
        self.default_reward = -0.04
        self.move_prob = move_prob
        self.reset()
        
    @property
    def row_length(self) -> int:
        return len(self.grid)
    
    @property
    def column_length(self) -> int:
        return len(self.grid[0])
    
    @property
    def actions(self) -> List[Action]:
        return [Action.UP, Action.DOWN, Action.LEFT, Action.RIGHT]
    
    @property
    def states(self) -> List[State]:
        """
        迷路内の移動可能なセルを返す。(ブロックセルを除外)
        """
        states = []
        for row in range(self.row_length):
            for col in range(self.column_lengthh):
                if self.grid[row][col] != 9:
                    states.append(State(row, col))
        return states

3. 遷移関数$T$と報酬関数$R$の実装

- 遷移関数$T$

    - 今回のtransition_funcの実装では、actionを受け取った場合、「**非決定的**」に遷移先が決定される。
    - 迷路の外への移動が提案された場合、その確率でその場にとどまるよう実装されている。

In [5]:
class Environment(Environment):
    def transition_func(self, state: State, action: Action) -> Dict[State, float]:
        """
        状態とアクションを受けとり、次の状態への遷移確率を返す。
        今回の迷路では、move_probの確率で選択した方向に、
        (1-move_prob)の確率で、選択した方向との反対以外の方向に等確率で遷移する。
        """
        transition_probs = {}
        if not self.can_action_at(state):
            # Already on the terminal cell.
            return transition_probs
        
        opposite_direction = Action(action.value * -1)
        
        for suggest_action in self.actions:
            next_state = self._move(state, suggest_action)
            
            if suggest_action == action:
                prob = self.move_prob
            elif suggest_action != opposite_direction :
                prob = (1 - self.move_prob) / 2
            else:
                prob = 0
                
            if next_state not in transition_probs:
                transition_probs[next_state] = prob
            else:
                transition_probs[next_state] += prob

        return transition_probs
    
    def can_action_at(self, state: State) -> bool:
        """
        stateがアクション可能なセルかどうか判定
        """
        # Indexエラー（迷路外の座標を参照）をキャッチするように実装
        try:
            if self.grid[state.row][state.column] == 0:
                return True
            else:
                return False
        except IndexError as e:
            raise ValueError(f"This state is out of mase! {state}")
    
    def _move(self, state: State, aciton) -> State:
        """
        位置とアクションを受け取り、アクション可能な位置であれば、
        受け取ったアクション方向に移動した位置に移動する。
        移動先位置が迷路の外であれば、そのままの位置を返す。
        """
        if not self.can_action_at:
            raise ValueError("Can't move from here!")
        
        next_state = state.clone()

        # Execute an action (move).
        if action == Action.UP:
            next_state.row -= 1
        elif action == Action.DOWN:
            next_state.row += 1
        elif action == Action.LEFT:
            next_state.column -= 1
        elif action == Action.RIGHT:
            next_state.column += 1

        # Check whether a state is out of the grid.
        if not (0 <= next_state.row < self.row_length):
            next_state = state
        if not (0 <= next_state.column < self.column_length):
            next_state = state

        # Check whether the agent bumped a block cell.
        if self.grid[next_state.row][next_state.column] == 9:
            next_state = state

        return next_state

- 報酬関数$R$

    - 状態を受け取り、迷路の構造に応じた報酬関数(reward_func)を返す。
    - 今回の設定では「緑(ゴール)のセルなら1、赤(ペナルティ)のセルなら-1」
    
    
- Environment.step
    - このメソッドでエージェントから受け取ったアクションに応じた状態遷移、即時報酬算出をする。

In [6]:
class Environment(Environment):
    def reward_func(self, state: State) -> (float, bool):
        """
        受け取ったstateの報酬と、ゲームが終了したか否かを返す
        デフォルトでは負の値（-0.04）を設定しているが、
        歩き回るだけでは報酬が減る（早くゴールするよう促す）影響を与える。
        """
        reward = self.default_reward
        done = False

        # Check an attribute of next state.
        attribute = self.grid[state.row][state.column]
        if attribute == 1:
            # Get reward! and the game ends.
            reward = 1
            done = True
        elif attribute == -1:
            # Get damage! and the game ends.
            reward = -1
            done = True

        return reward, done

    def reset(self) -> State:
        # Locate the agent at lower left corner.
        self.agent_state = State(self.row_length - 1, 0)
        return self.agent_state

    def step(self, action) -> (State, float, bool):
        """
        現在のエージェントの状態にアクションを適用し、遷移先状態, 即時報酬, 終了判定を返す
        """
        next_state, reward, done = self.transit(self.agent_state, action)
        if next_state is not None:
            self.agent_state = next_state

        return next_state, reward, done

    def transit(self, state: State, action: Action) -> (State, float, bool):
        """
        状態とアクションを受け取り、遷移関数によって遷移確率を算出、。
        確率に応じた繊維を実行し、遷移先状態、即時報酬、終了判定を返す。
        """
        transition_probs = self.transition_func(state, action)
        if len(transition_probs) == 0:
            return None, None, True

        next_states = []
        probs = []
        for s in transition_probs:
            next_states.append(s)
            probs.append(transition_probs[s])

        next_state = np.random.choice(next_states, p=probs)
        reward, done = self.reward_func(next_state)
        return next_state, reward, done

### 4. Demonstrate (エージェントを動かしてみる)

- 方策（policy）に従って移動するエージェントを実装する。
    - 今回はランダム移動
    
    
- Environment(環境)を作成し、ループで10回迷路を探索する。
- agent.policyによりactionが選択され、env.stepからアクションactionに応じた遷移先(next_state)と即時報酬(reward)を得る。



In [7]:

import random
class Agent():
    
    def __init__(self, env: Environment):
        """
        エージェントの初期化
        """
        self.actions = env.actions
        
    def policy(self, state: State) -> Action:
        """
        状態を受け取り、アクションを決定する。
        """
        return random.choice(self.actions)

In [8]:
## Demonstrate
grid = [
    [0, 0, 0, 1],
    [0, 9, 0, -1],
    [0, 0, 0, 0]
]

env = Environment(grid)
agent = Agent(env)

In [9]:
# 環境はagentをattributeとしてはも持っていない
# テキストの実装、next_stateだったりをメソッドの戻り値として定義する必要性はある？
# doneもenvの@propertyで env.is_terminalとかで定義すればよくない？ （agent_stateの位置のgridの値を参照すれば良い）　
# -> そうすればEpisode内でnext_state保存する必要もdoneを置く必要もないので簡潔になる。
for i in range(10):
    # Initialize position of agent.
    state = env.reset()
    total_reward = 0
    done = False
    
    while not done:
        action = agent.policy(env.agent_state)
        _, reward, done =  env.step(action)
        if reward:
            total_reward += reward
        
    print(f"Episode {i}, Agent gets {total_reward} reward.")
    

Episode 0, Agent gets 0.4800000000000001 reward.
Episode 1, Agent gets -1.7200000000000002 reward.
Episode 2, Agent gets -1.8400000000000003 reward.
Episode 3, Agent gets -3.280000000000001 reward.
Episode 4, Agent gets -1.56 reward.
Episode 5, Agent gets -3.120000000000001 reward.
Episode 6, Agent gets 0.6000000000000001 reward.
Episode 7, Agent gets -1.32 reward.
Episode 8, Agent gets 0.0399999999999997 reward.
Episode 9, Agent gets 0.19999999999999984 reward.


### 5. 色々いじって遊ぼう！
- transit_funcやreward_funcを変えてみる
- agentの足跡を辿れるようにする
- etc.