In [1]:
import numpy as np  # ベクトル・行列演算ライブラリ
import pygame       # 画像作成・画像表示・キー操作用のライブラリ
import sys

class CorridorEnv():
    """ コリドータスクの環境クラス """
    # 内部表現のID
    ID_blank = 0  # 空白
    ID_robot = 1  # ロボット
    ID_crystal = 2  # クリスタル

    def __init__(self,
            field_length=4,         # int: フィールドの長さ
            crystal_candidate=(2, 3),  # tuple of int: ゴールの位置
            rwd_fail=-1.0,            # 失敗した時の報酬（ペナルティ）
            rwd_move=-1.0,            # 進んだ時の報酬（コスト）
            rwd_crystal=5.0,          # クリスタルを得た時の報酬
            ):
        """ 初期処理 """
        # 行動の数
        self.n_act = 2
        # 最終状態判定
        self.done = False
        
        """ インスタンス生成時の処理 """
        # タスクパラメータ
        self.field_length = field_length
        self.crystal_candidate = crystal_candidate
        self.rwd_fail = rwd_fail
        self.rwd_move = rwd_move
        self.rwd_crystal = rwd_crystal

        # 内部状態の変数
        self.robot_pos = None       # ロボットの位置
        self.crystal_pos = None     # クリスタルの位置
        self.robot_state = None     # render 用

    def reset(self):
        """ 状態を初期化 """
        self.done = False
        
        # ロボットを通常状態に戻す
        self.robot_state = 'normal'

        # ロボットの位置を開始位置へ戻す
        self.robot_pos = 0

        # クリスタルの位置をランダムに決める
        idx = np.random.randint(len(self.crystal_candidate))
        self.crystal_pos = self.crystal_candidate[idx]

        # ロボットとクリスタルの位置から観測を作る
        obs = self._make_obs()
        
        return obs

    def _make_obs(self):
        """ 状態から観測を作成 """
        # 最終状態判定がTrueだったら 9999 を出力
        if self.done is True:
            obs = np.array([9] * self.field_length)
            return obs

        # ロボットとクリスタルの位置から観測を作成
        obs = np.ones(self.field_length, dtype=int) * CorridorEnv.ID_blank # 1 * 0
        obs[self.crystal_pos] = CorridorEnv.ID_crystal
        obs[self.robot_pos] = CorridorEnv.ID_robot

        return obs

    def step(self, act):
        """ 状態を更新 """
        # 最終状態の次の状態はリセット
        if self.done is True:
            obs = self.reset()
            return None, None, obs

        # 行動act に対して状態を更新する
        if act == 0:  # 拾う
            if self.robot_pos == self.crystal_pos:
                # クリスタルの場所で「拾う」を選んだ
                rwd = self.rwd_crystal
                done = True
                self.robot_state = 'success'
            else:
                # クリスタル以外の場所で「拾う」を選んだ
                rwd = self.rwd_fail
                done = True
                self.robot_state = 'fail'
        else:  # act==1 進む
            next_pos = self.robot_pos + 1
            if next_pos >= self.field_length:
                # 右端で「進む」を選んだ
                rwd = self.rwd_fail
                done = True
                self.robot_state = 'fail'
            else:
                # 右端より前で「進む」を選んだ
                self.robot_pos = next_pos
                rwd = self.rwd_move
                done = False
                self.robot_state = 'normal'

        self.done = done 
        # obsを作成
        obs = self._make_obs()
        
        return rwd, done, obs

# 強化学習情報表示の関数定義
def show_info(t, act, rwd, done, obs, isFirst=False):
    """ 強化学習情報の表示 """
    if rwd is None:
        if isFirst:
            tt = t
        else:
            tt = t + 1
        print('')
        print(f'x({tt:d})={str(obs):s}')
    else:
        msg = (
            f'a({t:d})={act:d}, ' +
            f'r({t:d})={rwd: .2f}, ' +
            f'done({t:d})={done:}, ' +
            f'x({t + 1:d})={str(obs):s}'
        )
        print(msg)

pygame 2.5.2 (SDL 2.28.3, Python 3.11.4)
Hello from the pygame community. https://www.pygame.org/contribute.html


In [2]:
def main():
    # 操作方法の表示
    msg = (
        '\n' +
        '---- 操作方法 -------------------------------------\n'
        '[f] 右に進む\n' +
        '[d] 拾う\n' +
        '[q] 終了\n' +
        'クリスタルを拾うと成功\n' +
        '---------------------------------------------------'
    )
    print(msg)
    
    # 開始
    pygame.init()
    
    # 環境の準備
    env = CorridorEnv()
    
    # スクリーンの設定
    screen_size = (100 * env.field_length, 100)
    screen = pygame.display.set_mode(screen_size)
    pygame.display.set_caption("Corridor Task - Robot Collecting Gem")
    
    # マスの設定
    field_size = 100
    fields = [pygame.Rect(i * field_size, 0, field_size, field_size) for i in range(env.field_length)]
    
    # 色の定義
    BLACK = (0, 0, 0)
    WHITE = (255, 255, 255)
    RED = (255, 0, 0)
    GREEN = (0, 255, 0)
    BLUE = (0, 0, 255)
    GRAY = (128, 128, 128)
    
    # 強化学習情報の初期化
    t = 0
    obs = env.reset()
    act = None
    rwd = None
    done = None
    
    running = True
    clock = pygame.time.Clock()
    
    # メインループ
    while running:
        # 画面の更新
        screen.fill(WHITE)

        # フィールドの描画
        for field in fields:
            pygame.draw.rect(screen, BLACK, field, 1)
            
        # クリスタルの描画
        pygame.draw.circle(screen, BLUE, fields[env.crystal_pos].center, field_size // 4)    
        
        # ロボットの描画
        if env.robot_state == 'normal':
            robot_color = GRAY
        elif env.robot_state == 'success':
            robot_color = GREEN
        elif env.robot_state == 'fail':
            robot_color = RED
        pygame.draw.circle(screen, robot_color, fields[env.robot_pos].center, field_size // 4)
        
        for event in pygame.event.get():
            if event.type == pygame.QUIT:
                running = False
            elif event.type == pygame.KEYDOWN:
                if event.key == pygame.K_q:
                    running = False  # ゲームをリセット
                elif event.key == pygame.K_d:
                    act = 0 # 拾う
                elif event.key == pygame.K_f:
                    act = 1 # 進む
        
        # キーが押された場合のみ環境を更新
        if act is not None:
            # 強化学習情報表示
            show_info(t, act, rwd, done, obs)
            rwd, done, obs = env.step(act)
            act = None  # actをリセット
            t += 1
        
        pygame.display.flip()
        clock.tick(30)

    pygame.quit()

In [3]:
if __name__ == '__main__':
    main()


---- 操作方法 -------------------------------------
[f] 右に進む
[d] 拾う
[q] 終了
クリスタルを拾うと成功
---------------------------------------------------

x(1)=[1 0 0 2]
a(1)=1, r(1)=-1.00, done(1)=False, x(2)=[0 1 0 2]
a(2)=1, r(2)=-1.00, done(2)=False, x(3)=[0 0 1 2]
a(3)=0, r(3)=-1.00, done(3)=False, x(4)=[0 0 0 1]
a(4)=1, r(4)= 5.00, done(4)=True, x(5)=[9 9 9 9]

x(6)=[1 0 2 0]
a(6)=1, r(6)=-1.00, done(6)=False, x(7)=[0 1 2 0]
a(7)=0, r(7)=-1.00, done(7)=False, x(8)=[0 0 1 0]
a(8)=1, r(8)= 5.00, done(8)=True, x(9)=[9 9 9 9]

x(10)=[1 0 2 0]
a(10)=1, r(10)=-1.00, done(10)=True, x(11)=[9 9 9 9]

x(12)=[1 0 2 0]
a(12)=1, r(12)=-1.00, done(12)=False, x(13)=[0 1 2 0]
a(13)=0, r(13)=-1.00, done(13)=False, x(14)=[0 0 1 0]
a(14)=0, r(14)= 5.00, done(14)=True, x(15)=[9 9 9 9]

x(16)=[1 0 2 0]
a(16)=1, r(16)=-1.00, done(16)=False, x(17)=[0 1 2 0]
a(17)=0, r(17)=-1.00, done(17)=False, x(18)=[0 0 1 0]
a(18)=0, r(18)= 5.00, done(18)=True, x(19)=[9 9 9 9]

x(20)=[1 0 0 2]
a(20)=1, r(20)=-1.00, done(20)=False, x(2

In [None]:
import sys
import pickle
import numpy as np

class TableQAgt():
    """ Qテーブルを使ったQ学習エージェントクラス """
    def __init__(           # 引数とデフォルト値の設定
            self,
            n_act=2,            # int: 行動の種類数
            init_val_Q=0,       # float: Q値の初期値
            epsilon=0.1,        # float: 乱雑度
            alpha=0.1,          # float: 学習率
            gamma=0.9,          # float: 割引率
            max_memory=500,     # int: 記憶する最大の観測数
            filepath=None,      # str: 保存用ファイル名
            ):
        """ 初期処理 """
        self.epsilon = epsilon
        self.n_act = n_act
        # エージェントのハイパーパラメータ
        self.init_val_Q = init_val_Q
        self.gamma = gamma
        self.alpha = alpha

        # 保存ファイル名
        self.filepath = filepath

        # Qテーブル関連
        self.Q = {}     # Qテーブル
        self.len_Q = 0  # Qテーブルに登録した観測の数
        self.max_memory = max_memory

    def select_action(self, obs):
        """ 観測に対して行動を出力 """
        # obsを文字列に変換
        obs = str(obs)

        # obs が登録されていなかったら初期値を与えて登録
        self._check_and_add_observation(obs)

        # 確率的に処理を分岐
        if np.random.rand() < self.epsilon:
            # epsilon の確率
            act = np.random.randint(0, self.n_act)  # ランダム行動
        else:
            # 1-epsilon の確率
            act = np.argmax(self.Q[obs])  # Qを最大にする行動
        return act

    def _check_and_add_observation(self, obs):
        """ obs が登録されていなかったら初期値を与えて登録 """
        if obs not in self.Q:  # (A)
            self.Q[obs] = [self.init_val_Q] * self.n_act  # (B)
            self.len_Q += 1  # (C)
            if self.len_Q > self.max_memory:  # (D)
                print(f'観測の登録数が上限 ' +
                      f'{self.max_memory:d} に達しました。')
                sys.exit()
            if (self.len_Q < 100 and self.len_Q % 10 == 0) or \
                    (self.len_Q % 100 == 0):  # (E)
                print(f'the number of obs in Q-table' +
                      f' --- {self.len_Q:d}')

    def learn(self, obs, act, rwd, done, next_obs):
        """ 学習 """
        if rwd is None:  # rwdがNoneだったら戻る(A)
            return
        # ------------------------- 編集ここから
        # obs, next_obs を文字列に変換 (B)
        obs = str(obs)
        next_obs = str(next_obs)

        # next_obs が登録されていなかったら初期値を与えて登録 (C)
        self._check_and_add_observation(next_obs)

        # 学習のターゲットを作成 (D)
        if done is True:
            target = rwd
        else:
            target = rwd + self.gamma * max(self.Q[next_obs])

        # Qをターゲットに近づける (E)
        self.Q[obs][act] = \
            (1 - self.alpha) * self.Q[obs][act] + \
            self.alpha * target
        # ------------------------- ここまで

    def get_Q(self, obs):
        """ 観測に対するQ値を出力 """
        # ------------------------- 編集ここから
        obs = str(obs)
        if obs in self.Q:   # obsがQにある (A)
            val = self.Q[obs]
            Q = np.array(val)
        else:               # obsがQにない (B)
            Q = None
        # ------------------------- ここまで
        return Q

    def save_weights(self, filepath=None):
        """ 方策のパラメータの保存 """
        # ------------------------- 編集ここから
        # Qテーブルの保存
        if filepath is None:
            filepath = self.filepath + '.pkl'
        with open(filepath, mode='wb') as f:
            pickle.dump(self.Q, f)
        # ------------------------- ここまで

    def load_weights(self, filepath=None):
        """ 方策のパラメータの読み込み """
        # ------------------------- 編集ここから
        # Qテーブルの読み込み
        if filepath is None:
            filepath = self.filepath + '.pkl'
        with open(filepath, mode='rb') as f:
            self.Q = pickle.load(f)
        # ------------------------- ここまで


if __name__ == '__main__':
    # 学習のステップ数 (A)
    n_step = 5000

    # コマンドライン引数 (B)
    argvs = sys.argv
    if len(argvs) > 1:
        n_step = int(argvs[1])
    print(f'{n_step:d}ステップの学習シミュレーション開始')

    # 環境の準備 (C)
    from env_corridor import CorridorEnv
    env = CorridorEnv()

    # 環境のパラメータの与え方の例
    """
    env = CorridorEnv(
        field_length=6,
        crystal_candidate=(2, 3, 4, 5),
        rwd_fail=-1,
        rwd_move=0,
        rwd_crystal=5,
    )
    """

    # エージェントの準備 (D)
    agt = TableQAgt(
        alpha=0.2,
        gamma=1,
        epsilon=0.5,
        )

    # 学習シミュレーション (E)
    obs = env.reset()
    for t in range(n_step):
        # エージェントが行動を選ぶ (F)
        act = agt.select_action(obs)

        # 環境が報酬と次の観測を決める (G)
        rwd, done, next_obs = env.step(act)

        # エージェントが学習する (H)
        agt.learn(obs, act, rwd, done, next_obs)

        # next_obsを次の学習のために保持 (I)
        obs = next_obs

    # 学習後のQ値の表示のための入力観測 (J)
    obss = [
        '[1 0 2 0]',
        '[0 1 2 0]',
        '[0 0 1 0]',
        '[0 0 2 1]',
        '[1 0 0 2]',
        '[0 1 0 2]',
        '[0 0 1 2]',
        '[0 0 0 1]',
    ]

    # 学習後のQ値の表示 (K)
    print('')
    print('学習後のQ値')
    for obs in obss:
        q_vals = agt.get_Q(obs)
        if q_vals is not None:
            msg = (
                f'{obs}: ' +
                f'{agt.Q[obs][0]: .2f}, ' +
                f'{agt.Q[obs][1]: .2f}'
            )
            print(msg)
        else:
            print(f'{obs}:')

    # 学習結果を見せるためのシミュレーションの準備(L)
    import cv2
    agt.epsilon = 0

    # 強化学習情報の初期化 (M)
    t = 0
    obs = env.reset()
    act = None
    rwd = None
    done = None
    next_obs = None

    # 開始メッセージを表示 (N)
    print('')
    print('学習なしシミュレーション開始')

    # 強化学習情報表示関数の定義 (O)
    def show_info(t, act, rwd, done, obs, isFirst=None):
        if rwd is None:
            if isFirst:
                tt = t
            else:
                tt = t + 1
            print('')
            print(f'x({tt:d})={str(obs):s}')
        else:
            msg = (
                f'a({t:d})={act:d}, ' +
                f'r({t:d})={rwd: .2f}, ' +
                f'done({t:d})={done:}, ' +
                f'x({t + 1:d})={str(obs):s}'
            )
            print(msg)

    # 強化学習情報表示 (P)
    show_info(t, act, rwd, done, obs, isFirst=True)

    # 学習なしシミュレーション (Q)
    while True:
        # 画面表示 (R)
        image = env.render()
        cv2.imshow('agt', image)

        # キーの受付と終了処理 (S)
        key = cv2.waitKey(0)
        if key == ord('q'):
            break

        # エージェントの行動選択 (T)
        act = agt.select_action(obs)

        # 環境の更新 (U)
        rwd, done, obs = env.step(act)

        # 強化学習情報表示 (V)
        show_info(t, act, rwd, done, obs)
        t += 1
