In [4]:
# パッケージのimport# パッケージ 
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline

In [5]:
# 定数の設定
NUM_DIZITIZED = 6  # 各状態の離散値への分割数
GAMMA = 0.99  # 時間割引率
ETA = 0.5  # 学習係数
MAX_STEPS = 200  # 1試行のstep数
NUM_EPISODES = 1000  # 最大試行回数

In [6]:
class Agent:
    '''CartPoleのエージェントクラスです、棒付き台車そのものになります'''

    def __init__(self, num_states, num_actions):
        self.brain = Brain(num_states, num_actions)  # エージェントが行動を決定するための頭脳を生成

    def update_Q_function(self, observation, action, reward, observation_next):
        '''Q関数の更新'''
        self.brain.update_Q_table(
            observation, action, reward, observation_next)

    def get_action(self, observation, step):
        '''行動の決定'''
        action = self.brain.decide_action(observation, step)
        return action

In [7]:
class Brain:
    '''エージェントが持つ脳となるクラスです、Q学習を実行します'''

    def __init__(self, num_states, num_actions):
        self.num_actions = num_actions  # CartPoleの行動（右に左に押す）の2を取得

        # Qテーブルを作成。行数は状態を分割数^（4変数）にデジタル変換した値、列数は行動数を示す
        self.q_table = np.random.uniform(low=0, high=1, size=(
            NUM_DIZITIZED**num_states, num_actions))


    def bins(self, clip_min, clip_max, num):
        '''観測した状態（連続値）を離散値にデジタル変換する閾値を求める'''
        return np.linspace(clip_min, clip_max, num + 1)[1:-1]

    def digitize_state(self, observation):
        '''観測したobservation状態を、離散値に変換する'''
        cart_pos, cart_v, pole_angle, pole_v = observation
        digitized = [
            np.digitize(cart_pos, bins=self.bins(-2.4, 2.4, NUM_DIZITIZED)),
            np.digitize(cart_v, bins=self.bins(-3.0, 3.0, NUM_DIZITIZED)),
            np.digitize(pole_angle, bins=self.bins(-0.5, 0.5, NUM_DIZITIZED)),
            np.digitize(pole_v, bins=self.bins(-2.0, 2.0, NUM_DIZITIZED))
        ]
        return sum([x * (NUM_DIZITIZED**i) for i, x in enumerate(digitized)])

    def update_Q_table(self, observation, action, reward, observation_next):
        '''QテーブルをQ学習により更新'''
        state = self.digitize_state(observation)  # 状態を離散化
        state_next = self.digitize_state(observation_next)  # 次の状態を離散化
        Max_Q_next = max(self.q_table[state_next][:])
        self.q_table[state, action] = self.q_table[state, action] + \
            ETA * (reward + GAMMA * Max_Q_next - self.q_table[state, action])

    def decide_action(self, observation, episode):
        '''ε-greedy法で徐々に最適行動のみを採用する'''
        state = self.digitize_state(observation)
        epsilon = 0.5 * (1 / (episode + 1))

        if epsilon <= np.random.uniform(0, 1):
            action = np.argmax(self.q_table[state][:])
        else:
            action = np.random.choice(self.num_actions)  # 0,1の行動をランダムに返す
        return action

In [9]:
class  EnvironmentEnviron :

    #####################  ロボットハンドのためのメソッド  #############################

    def __init__(self):
        num_states = 4  # 課題の状態の数(面積と重心と、それぞれの変化量)
        num_actions = 4  # CartPoleの行動（上下左右に動く）
        self.agent = Agent(num_states, num_actions)  # 環境内で行動するAgentを生成
        self.video = cv2.VideoCapture(0) # カメラ起動
        
        init_frame = self.video.read()
        total_area =  np.prod(init_frame.shape[:1])
        self.area_threth =total_area*0.3 # 画面の3割が赤色になれば終了にする
        
    def reset(self):
        '''環境を初期化する'''
        # TODO: ロボットハンドがランダムに進路をとる
        frame = self.vidoe.read()
        ''' TODO: 赤色の面積, 重心の位置を取得する
        area_sum = 
        area_v = 0
        center_pos = 
        center_v = 0
        '''
        observation = (area_sum, area_v, center_pos, center_v )
        return observation, frame
    
    def get_env(self, area_sum_before, center_pos_before):
        '''環境を認識する'''
        # カメラで写真をとりOpeCVで面積と重心を取得する
        frame = self.video.read()
        '''TODO: 赤色の面積とその変化量, 重心の位置とその変化量を取得する
        area_sum = 
        area_v = area_sum - area_sum_before
        center_pos = 
        center_v = center_pos - center_pos_before
        '''
        observation = (area_sum, area_v, center_pos, center_v )
        return observation, frame
    
    def act_env(self, observation, action):
        '''TODO: 決定したactionに従って、ロボットハンドを動かす'''
        if action == 0:
            # 前
        elif action == 1:
            # 後
        elif action == 2:
            # 右
        elif action == 3:
            # 左
        area_sum, _, center_pos, _ = observation
        observation_next, _ = self.get_env(area_sum, center_pos)
        done = self.is_done()
        return observation_next, done
        
    def is_done(self, observation):
        '''observationによって終了判定をする'''
        # TODO: 終了判定は面積以外にある？
        done = False
        area_sum, area_v, center_pos, center_v = observation
        if area_sum > self.area_threth:
            done = True
        return done
    
    #########################################################

    def run(self):
        '''実行'''
        complete_episodes = 0  # 195step以上連続で立ち続けた試行数
        is_episode_final = False  # 最終試行フラグ
        frames = []  # 動画用に画像を格納する変数

        for episode in range(NUM_EPISODES):  # 試行数分繰り返す
            observation, frame = self.reset()  # 環境の初期化

            for step in range(MAX_STEPS):  # 1エピソードのループ

                if is_episode_final is True:  # 最終試行ではframesに各時刻の画像を追加していく
                    frames.append(frame)

                # 行動を求める
                action = self.agent.get_action(observation, episode)

                # 行動a_tの実行により、s_{t+1}, r_{t+1}を求める
                observation_next, done = self.act_env(observation, action)

                # 報酬を与える
                if done:  # ステップ数が200経過するか、一定角度以上傾くとdoneはtrueになる
                    if step < 195:
                        reward = -1  # 途中でこけたら罰則として報酬-1を与える
                        complete_episodes = 0  # 195step以上連続で立ち続けた試行数をリセット
                    else:
                        reward = 1  # 立ったまま終了時は報酬1を与える
                        complete_episodes += 1  # 連続記録を更新
                else:
                    reward = 0  # 途中の報酬は0

                # step+1の状態observation_nextを用いて,Q関数を更新する
                self.agent.update_Q_function(
                    observation, action, reward, observation_next)

                # 観測の更新
                observation = observation_next

                # 終了時の処理
                if done:
                    print('{0} Episode: Finished after {1} time steps'.format(
                        episode, step + 1))
                    break

            if is_episode_final is True:  # 最終試行では動画を保存と描画
                display_frames_as_gif(frames)
                break

            if complete_episodes >= 10:  # 10連続成功なら
                print('10回連続成功')
                is_episode_final = True  # 次の試行を描画を行う最終試行とする