## ライブラリのインポート

In [73]:
import math
import copy
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline
import os
os.environ["SDL_VIDEODRIVER"] = "dummy"  # this line make pop-out window not appear
from ple.games.flappybird import FlappyBird
from ple import PLE
from collections import defaultdict

## アニメーションの作成

In [74]:
def make_anim(images, fps=60, true_image=False):
    """Build a moviepy ``VideoClip`` that plays ``images`` at ``fps`` frames per second.

    Args:
        images: Non-empty sequence of numpy arrays, one per frame.
        fps: Playback rate; the clip lasts ``len(images) / fps`` seconds.
        true_image: If True, frames are only cast to ``uint8``. If False,
            each frame is mapped through ``(x + 1) / 2 * 255`` before the
            cast (i.e. -1 -> 0 and 1 -> 255).

    Returns:
        The ``moviepy.editor.VideoClip`` with its ``fps`` attribute set.

    Raises:
        ValueError: If ``images`` is empty.
    """
    # Imported lazily so the rest of the notebook works without moviepy installed.
    import moviepy.editor as mpy

    if len(images) == 0:
        raise ValueError("make_anim() needs at least one frame")

    duration = len(images) / fps

    def make_frame(t):
        # A timestamp at (or just past) the end of the clip maps one index
        # beyond the last frame; show the last frame in that case. Only
        # IndexError is handled so that genuine errors are not hidden.
        try:
            x = images[int(len(images) / duration * t)]
        except IndexError:
            x = images[-1]

        if true_image:
            return x.astype(np.uint8)
        return ((x + 1) / 2 * 255).astype(np.uint8)

    clip = mpy.VideoClip(make_frame, duration=duration)
    clip.fps = fps
    return clip

## グラフの作成

In [75]:
def make_graph(reward_per_epoch, lifetime_per_epoch):
    """Plot the lifetime and reward histories side by side.

    Args:
        reward_per_epoch: Sequence of cumulative rewards, drawn on the right panel.
        lifetime_per_epoch: Sequence of episode lengths, drawn on the left panel.
    """
    fig, (axL, axR) = plt.subplots(ncols=2, figsize=(10, 4))

    axL.set_title('lifetime')
    axL.grid(True)
    axL.plot(lifetime_per_epoch)

    axR.set_title('reward')
    axR.grid(True)
    axR.plot(reward_per_epoch)

    # Figure.show() only works with GUI backends and merely warns under the
    # inline notebook backend; pyplot.show() renders on every backend.
    plt.show()

## 定数の宣言

In [76]:
# Q-learning hyperparameters, read by Brain.update_policy:
# ETA scales each temporal-difference correction (the learning rate).
ETA = 0.5
# GAMMA weights the best next-state Q-value in the update target (the discount factor).
GAMMA = 0.99
# Environment.run stops training once a recorded episode has more frames than this.
GOAL_FRAME = 1200 # target number of frames

## Agentクラス

In [77]:
class Agent:
    """Facade over a ``Brain``: every call is forwarded to it unchanged."""

    def __init__(self, num_actions):
        # The Brain holds all learning state for the given number of actions.
        self.brain = Brain(num_actions)

    def get_action(self, state, episode):
        """Return whatever action the brain picks for ``state`` in this ``episode``."""
        return self.brain.decide_action(state, episode)

    def update_Q_function(self, state, action, reward, observation_prime):
        """Pass one observed transition on to the brain's policy update."""
        brain = self.brain
        brain.update_policy(state, action, reward, observation_prime)

## Brainクラス

In [78]:
# Width of one discretisation bucket for each game-state feature, in the same
# units as the raw feature value. Smaller widths give a finer state space.
# NOTE(review): 512 for 'next_next_pipe_dist_to_player' presumably collapses
# that feature into very few buckets on purpose - confirm.
bucket_range_per_feature = {
  'next_next_pipe_bottom_y': 40,
  'next_next_pipe_dist_to_player': 512,
  'next_next_pipe_top_y': 40,
  'next_pipe_bottom_y': 20,
  'next_pipe_dist_to_player': 20,
  'next_pipe_top_y': 20,
  'player_vel': 4,
  'player_y': 16
}


class Brain:
    """Tabular Q-learning over a bucketed version of the game state.

    Q-values live in ``q_table``, a ``defaultdict`` keyed by the tuple that
    ``get_state_idx`` returns; states not seen before start at all zeros.
    """

    # Pipe heights that are expressed relative to the player's height before
    # bucketing, instead of as absolute screen positions.
    _RELATIVE_TO_PLAYER_Y = frozenset((
        'next_next_pipe_bottom_y',
        'next_next_pipe_top_y',
        'next_pipe_bottom_y',
        'next_pipe_top_y',
    ))

    def __init__(self, num_actions):
        """Create an empty Q-table with ``num_actions`` values per state."""
        self.num_actions = num_actions

        self.q_table = defaultdict(lambda: np.zeros(num_actions))

    def decide_action(self, state, episode):
        """Pick an action index epsilon-greedily.

        Epsilon is ``0.5 / (episode + 1)``, so exploration fades as the
        episode number grows.
        """
        state_idx = self.get_state_idx(state)
        epsilon = 0.5 * (1 / (episode + 1))
        if epsilon <= np.random.uniform(0, 1):
            # Exploit: take the action with the highest Q-value.
            action = np.argmax(self.q_table[state_idx])
        else:
            # Explore: take a uniformly random action.
            action = np.random.choice(self.num_actions)
        return action

    def update_policy(self, state, action, reward, state_prime):
        """Apply one Q-learning update for the observed transition.

        ``Q[s][a] += ETA * (reward + GAMMA * max(Q[s']) - Q[s][a])``
        """
        state_idx = self.get_state_idx(state)
        state_prime_idx = self.get_state_idx(state_prime)
        best_q = np.max(self.q_table[state_prime_idx])
        self.q_table[state_idx][action] += ETA * (
            reward + GAMMA * best_q - self.q_table[state_idx][action])

    def get_state_idx(self, state):
        """Convert a raw game-state dict into a hashable tuple of bucket indices.

        Pipe heights are first turned into offsets from ``player_y``. Every
        feature is then floor-divided by its width in
        ``bucket_range_per_feature``. Features are emitted in alphabetical
        key order, and ``state`` itself is left unmodified.

        Floor division (rather than truncation toward zero) keeps all buckets
        the same width: an offset of -10 with width 20 falls into bucket -1,
        not into the same bucket 0 as an offset of +10.
        """
        player_y = state['player_y']

        state_idx = []
        for key in sorted(state):
            value = state[key]
            if key in self._RELATIVE_TO_PLAYER_Y:
                value = value - player_y
            state_idx.append(int(value // bucket_range_per_feature[key]))
        return tuple(state_idx)
        

## Environmentクラス

In [None]:
class Environment:
    """Wires the Flappy Bird game, its PLE wrapper and the learning agent together.

    Attributes are created in ``__init__``: ``game`` (the ``FlappyBird``
    instance), ``env`` (the ``PLE`` interface to it), ``num_actions`` (size of
    the PLE action set), ``agent`` and ``graph`` (whether ``run`` plots the
    training curves).
    """

    def __init__(self, graph=True):
        self.game = FlappyBird()
        # display_screen=False: no game window is opened; frames are read back
        # with getScreenRGB() instead.
        self.env = PLE(self.game, fps=30, display_screen=False)
        self.num_actions = len(self.env.getActionSet())
        self.agent = Agent(self.num_actions)
        self.graph = graph

    def run(self, num_episodes=50000, print_every_episode=500,
            show_gif_every_episode=5000):
        """Train the agent, logging progress and showing periodic animations.

        Args:
            num_episodes: Maximum number of episodes to play.
            print_every_episode: Every this many episodes, print the episode's
                length and cumulative reward and store both for the graph.
            show_gif_every_episode: Every this many episodes, record the
                screen and display the episode as an animation. Training stops
                early when such a recorded episode has more than
                ``GOAL_FRAME`` frames.

        When ``self.graph`` is truthy the logged curves are plotted once
        training ends, whether it stopped early or ran out of episodes.
        """
        from IPython.display import display

        # The action set does not change during training, so look it up once.
        action_set = self.env.getActionSet()

        reward_per_epoch = []
        lifetime_per_epoch = []

        for episode in range(num_episodes):
            self.env.reset_game()

            # Screens are only needed for episodes that get animated; grabbing
            # them on every step of every episode just wastes time and memory.
            record = episode % show_gif_every_episode == 0
            frames = [self.env.getScreenRGB()] if record else None

            state = self.game.getGameState()
            cum_reward = 0  # sum of rewards collected in this episode
            t = 0           # number of steps survived in this episode

            while not self.env.game_over():
                action = self.agent.get_action(state, episode)

                # Author's note on the reward: +1 for passing a pipe, -5 on failure.
                reward = self.env.act(action_set[action])

                if record:
                    frames.append(self.env.getScreenRGB())

                cum_reward += reward

                state_prime = self.game.getGameState()
                self.agent.update_Q_function(state, action, reward, state_prime)

                state = state_prime
                t += 1

            if episode % print_every_episode == 0:
                print("Episode %d finished after %f time steps" % (episode, t))
                print("cumulated reward: %f" % cum_reward)
                reward_per_epoch.append(cum_reward)
                lifetime_per_epoch.append(t)

            if record:
                print("len frames:", len(frames))
                clip = make_anim(frames, fps=60, true_image=True).rotate(-90)
                display(clip.ipython_display(fps=60, autoplay=1, loop=1))
                if len(frames) > GOAL_FRAME:
                    # Goal reached: stop training.
                    break

        if self.graph:
            make_graph(reward_per_epoch, lifetime_per_epoch)

## Main関数

In [None]:
# Build the environment (game, PLE wrapper and agent) and start training.
# run() prints progress and displays recorded animations as it goes.
flappybird_env = Environment()
flappybird_env.run()

 25%|██▍       | 15/61 [00:00<00:00, 146.62it/s]

Episode 0 finished after 59.000000 time steps
cumulated reward: -5.000000
len frames: 60


 98%|█████████▊| 60/61 [00:00<00:00, 206.84it/s]


Episode 500 finished after 62.000000 time steps
cumulated reward: -5.000000
Episode 1000 finished after 62.000000 time steps
cumulated reward: -5.000000
Episode 1500 finished after 67.000000 time steps
cumulated reward: -4.000000
Episode 2000 finished after 59.000000 time steps
cumulated reward: -5.000000
Episode 2500 finished after 62.000000 time steps
cumulated reward: -5.000000
