In [1]:
from ple import PLE
import numpy as np
from ple.games.flappybird import FlappyBird
from collections import defaultdict
import random
import math
import json
import matplotlib.pyplot as plt
%load_ext autoreload
%autoreload 2
%matplotlib inline
from IPython.display import clear_output


pygame 2.2.0 (SDL 2.0.22, Python 3.10.10)
Hello from the pygame community. https://www.pygame.org/contribute.html
couldn't import doomish
Couldn't import doom


In [2]:
def convert_state(s):
    """Turn a raw game-state dict into a coarse string key for the Q-table.

    Reads ``next_pipe_dist_to_player``, ``player_y``, ``next_pipe_bottom_y``
    and ``player_vel`` from ``s``. Each quantity is truncated to an int; the
    horizontal distance is then snapped down to a multiple of 20 and the
    vertical offset (player minus pipe bottom) down to a multiple of 10.
    The velocity is kept at integer resolution.

    Returns:
        A string of the form ``"<dx>_<dy>_<vel>"``.
    """
    x_bucket, y_bucket = 20, 10

    gap_x = int(s['next_pipe_dist_to_player'])
    gap_y = int(s['player_y'] - s['next_pipe_bottom_y'])
    velocity = int(s['player_vel'])

    # Subtracting the (non-negative) remainder floors to the bucket's lower
    # edge, for negative offsets as well.
    gap_x -= gap_x % x_bucket
    gap_y -= gap_y % y_bucket

    return f"{gap_x}_{gap_y}_{velocity}"



In [3]:
# Reward constants consumed by QLearningAgent.episode_end:
#   rewards[0] -- penalty assigned to the last 4 moves before the bird died
#   rewards[1] -- small reward assigned to every earlier move of the episode
rewards = [-1000, 0.1]

In [4]:
import random
class ReplayBuffer(object):
    """Bounded store of ``(state, action, reward, next_state)`` transitions.

    Transitions are appended until ``size`` entries are held; after that each
    new transition overwrites the oldest one (ring-buffer order).
    """

    def __init__(self, size):
        """Create an empty buffer that holds at most ``size`` transitions."""
        self._storage = []
        self._maxsize = size
        # Index of the slot that the next add() overwrites once the buffer is full.
        self._replaceId = 0

    def __len__(self):
        """Return the number of transitions currently stored."""
        return len(self._storage)

    def add(self, obs_t, action, reward, obs_tp1):
        """Store one transition, evicting the oldest entry when the buffer is full."""
        data = (obs_t, action, reward, obs_tp1)
        if len(self._storage) == self._maxsize:
            self._storage[self._replaceId] = data
            self._replaceId = (self._replaceId + 1) % self._maxsize
        else:
            self._storage.append(data)

    def sample(self, batch_size):
        """Draw ``batch_size`` transitions uniformly at random, with replacement.

        Returns:
            A tuple ``(states, actions, rewards, next_states)`` of four
            parallel lists. All four lists are empty when the buffer holds no
            transitions or ``batch_size`` is not positive.
        """
        # random.choice raises IndexError on an empty sequence; an empty buffer
        # simply has nothing to offer, so report an empty batch instead.
        if not self._storage:
            return [], [], [], []

        states = []
        actions = []
        rewards = []
        next_states = []

        # One random.choice call per draw keeps the sequence of samples
        # reproducible under random.seed().
        for _ in range(batch_size):
            state, action, reward, next_state = random.choice(self._storage)
            states.append(state)
            actions.append(action)
            rewards.append(reward)
            next_states.append(next_state)

        return states, actions, rewards, next_states


In [5]:
class QLearningAgent():
  """Tabular Q-learning agent with epsilon-greedy exploration.

  The Q-table ``qValues`` maps a discretised state key (see ``convert_state``)
  to a three-element list ``[Q(do nothing), Q(flap), visit_count]``.

  Learning happens at the end of every episode: the moves recorded through
  ``add_move`` are replayed from last to first and rewarded from the
  module-level ``rewards`` list.

  Experience replay is optional. ``self.replay`` starts as ``None``; assign an
  object with ``__len__``, ``add(state, action, reward, next_state)`` and
  ``sample(batch_size)`` to enable ``update``.
  """

  def __init__(self, alpha, epsilon, discount, need_train, load_from=None, save_to=None):
    """Create the agent and, if ``load_from`` is given, restore a saved state.

    Args:
      alpha: learning rate of the Q-value update.
      epsilon: initial exploration probability (replaced by the saved value
        when a save file is loaded successfully).
      discount: discount factor applied to the best next-state value.
      need_train: truthy to learn from recorded moves in ``episode_end``.
      load_from: path of a JSON save file to restore from, or ``None``.
      save_to: path ``save_qvalues`` writes to, or ``None`` to disable saving.
    """
    self.is_training = need_train
    self.load_from = load_from
    self.save_to = save_to

    self.qValues = {}
    self.alpha = alpha
    self.epsilon = epsilon
    self.discount = discount

    self.moves = []
    # Optional replay buffer; None means replay is disabled and update() is a no-op.
    self.replay = None
    self.played = 0

    self.best_score = -1
    self.records = []
    self.mean_rewards = []

    self.load_qvalues()

  def get_qValue(self, state, action):
    """Return the table entry ``action`` for ``state``.

    An unseen state is inserted as ``[0, 0, 0]`` as a side effect and ``0.0``
    is returned for it.
    """
    entry = self.qValues.get(state)
    if entry is None:
      self.qValues[state] = [0, 0, 0] # do nothing, flap, had_before
      return 0.0

    return entry[action]

  def set_qValue(self, state, action, value):
    """Store ``value`` in slot ``action`` of ``state``, creating the entry if needed."""
    self.qValues.setdefault(state, [0, 0, 0])[action] = value

  def _td_update(self, state, action, reward, next_state):
    """Apply one Q-learning update to ``(state, action)`` for already-discretised states."""
    current = self.get_qValue(state, action)
    best_next = max(self.get_qValue(next_state, 0), self.get_qValue(next_state, 1))
    learning_rate = self.alpha
    self.set_qValue(state, action,
                    (1 - learning_rate) * current +
                    learning_rate * (reward + self.discount * best_next))

# -----------

  def get_action(self, state):
    """Choose an action for the raw game ``state``.

    Returns:
      ``0`` (do nothing) or ``1`` (flap).

    With probability ``epsilon``, or whenever the state has never been
    learned from (visit count 0), the agent explores and flaps with
    probability 0.1. Otherwise it flaps only if flapping has the strictly
    larger Q-value.
    """
    s = convert_state(state)

    if random.random() < self.epsilon or self.get_qValue(s, 2) == 0:
      return int(random.random() >= 0.9)

    return int(self.get_qValue(s, 1) > self.get_qValue(s, 0))

  def episode_end(self):
    """Decay epsilon and, when training, learn from the finished episode.

    Epsilon is multiplied by 0.9993 (floored at 0.01) on every call, whether
    or not the agent is training. The recorded moves are always cleared.
    """
    self.epsilon = max(0.01, self.epsilon * 0.9993)

    if self.is_training:
      time_from_die = 0
      # Walk backwards from the final move so the moves closest to the end of
      # the episode are processed first.
      for (past_state, action, next_state) in reversed(self.moves):
        time_from_die += 1

        # The last 4 moves of the episode receive the death penalty.
        reward = rewards[0] if time_from_die <= 4 else rewards[1]

        past_s = convert_state(past_state)
        next_s = convert_state(next_state)

        self._td_update(past_s, action, reward, next_s)

        if self.replay is not None:
          self.replay.add(past_s, action, reward, next_s)

        # Slot 2 counts how many times this state has been learned from.
        self.set_qValue(past_s, 2, self.get_qValue(past_s, 2) + 1)

    self.moves = []

  def update(self, batch_size=10):
    """Replay ``batch_size`` stored transitions through the Q-learning update.

    Does nothing when no replay buffer is attached or the buffer is empty.
    """
    if self.replay is None or len(self.replay) == 0:
      return

    states, actions, batch_rewards, next_states = self.replay.sample(batch_size)
    for state, action, reward, next_state in zip(states, actions, batch_rewards, next_states):
      self._td_update(state, action, reward, next_state)

  def add_move(self, past_state, action, next_state):
    """Record one raw transition of the current episode for ``episode_end``."""
    self.moves.append((past_state, action, next_state))

  def load_qvalues(self):
    """Restore the agent from the JSON file at ``self.load_from``.

    A missing or unreadable file is reported and ignored. A file that is not
    valid JSON or lacks an expected key raises, leaving the agent unchanged.
    """
    if self.load_from is None:
      return

    print(f"Loading saved Q-table states from {self.load_from}...")
    try:
      with open(self.load_from, "r") as f:
        saves = json.load(f)
    except IOError:
      print("No saves found")
      return

    # Read every field before assigning any, so a save file with a missing
    # key cannot leave the agent half-loaded.
    played = saves['played']
    epsilon = saves['epsilon']
    q_values = saves['qValues']
    mean_rewards = saves['mean_rewards']
    best_score = saves['best_score']
    records = saves['records']

    self.played = played
    self.epsilon = epsilon
    self.qValues = q_values
    self.mean_rewards = mean_rewards
    self.best_score = best_score
    self.records = records
    print("Loaded successfully!")

  def save_qvalues(self):
    """Write the agent's state as JSON to ``self.save_to`` (no-op when it is ``None``)."""
    if self.save_to is not None:
      print(f"Saving Q-table with {len(self.qValues.keys())} states to file...")
      json_data = {
        'played': self.played,
        'epsilon': self.epsilon,
        'qValues': self.qValues,
        'mean_rewards': self.mean_rewards,
        'best_score': self.best_score,
        'records': self.records,
      }

      # Serialise before opening the file: opening with "w" truncates it, so
      # a serialisation error must not be able to destroy an existing save.
      text = json.dumps(json_data, sort_keys=True, indent=4)
      with open(self.save_to, "w") as f:
        f.write(text)
    

In [6]:
# Build the Flappy Bird game and wrap it in a PLE environment shared by the
# training loop below (play_and_train reads the globals `p` and `possible_actions`).
game = FlappyBird()

# NOTE(review): display_screen=False / force_fps=True presumably run the game
# headless and without real-time frame pacing so training is fast -- confirm
# against the PLE documentation.
p = PLE(game, fps=30, display_screen=False, force_fps=True)
p.init()

# Actions accepted by p.act(). play_and_train indexes this list with
# `action ^ 1`, so agent action 1 (flap) selects index 0 and agent action 0
# (do nothing) selects index 1.
# NOTE(review): this relies on the order returned by getActionSet() -- confirm it is stable.
possible_actions = p.getActionSet()

In [7]:
import time 

def play_and_train(agent, play_time=15, save_qvalues=False, plot_every=1):
    """Let ``agent`` play for a fixed amount of wall-clock time, learning as it goes.

    Uses the module-level environment ``p`` and its ``possible_actions``.
    After every finished game the agent's ``episode_end`` is called, its
    bookkeeping (``played``, ``best_score``, ``records``, ``mean_rewards``)
    is updated and, every ``plot_every`` games, the score curve is redrawn.

    Args:
        agent: object providing ``get_action``, ``add_move``, ``episode_end``,
            ``save_qvalues`` and the bookkeeping attributes listed above.
        play_time: training duration in minutes.
        save_qvalues: when true, call ``agent.save_qvalues()`` at the end.
        plot_every: redraw the progress plot every this many games
            (``agent.played`` is the counter). ``None`` or a value below 1
            disables plotting. The default of 1 redraws after every game.

    Note:
        ``agent.mean_rewards`` is reset at the start of every call, so the
        plot only covers the current session. Moves of a game still in
        progress when time runs out are left in the agent and not learned from.
    """
    # A monotonic clock keeps the training duration immune to system clock changes.
    deadline = time.monotonic() + 60 * play_time
    agent.mean_rewards = []

    # The curve shows the mean score of each consecutive window of k games.
    k = 1
    past_k_results = []

    # Best score since the progress line was last printed.
    past_mx = 0.0
    current_score = 0.0

    while time.monotonic() < deadline:
        if p.game_over():
            p.reset_game()

            agent.episode_end()
            agent.played += 1

            if current_score > agent.best_score:
                agent.best_score = current_score
                agent.records.append((current_score, agent.played))

            past_k_results.append(current_score)
            if len(past_k_results) == k:
                agent.mean_rewards.append(np.mean(past_k_results))
                past_k_results = []

            past_mx = max(past_mx, current_score)

            if plot_every is not None and plot_every >= 1 and agent.played % plot_every == 0:
                clear_output(True)
                print(f"N: {agent.played}, best: {agent.best_score}, past {k}: {past_mx}, eps: {agent.epsilon}")

                # x-coordinate of each point is the session game index at
                # which its window of k games was completed.
                games = k * np.arange(1, len(agent.mean_rewards) + 1)
                plt.plot(games, agent.mean_rewards, label="mean reward")
                plt.xlabel("game")
                plt.ylabel("score")
                plt.legend()
                plt.show()

                past_mx = 0.0

            current_score = 0.0

        state = p.getGameState()
        action = agent.get_action(state)

        # Agent action 1 (flap) maps to possible_actions[0], action 0 to possible_actions[1].
        score_change = p.act(possible_actions[action ^ 1])
        # Any positive reward from the environment counts as one point.
        current_score += score_change > 0

        next_state = p.getGameState()
        agent.add_move(state, action, next_state)

    if save_qvalues:
        agent.save_qvalues()
    

In [None]:
# Run ten 15-minute training sessions back to back.
# NOTE(review): every iteration builds a fresh agent from data/q_values.json
# with save_to=None and save_qvalues=False, so nothing learned in a session is
# written to disk or carried into the next one -- confirm this is intended.
# When the save file loads successfully, its stored epsilon replaces the
# epsilon=1 passed here (see QLearningAgent.load_qvalues).
for i in range(10):
    agent = QLearningAgent(alpha=0.2, epsilon=1, discount=0.97, need_train=1, load_from="data/q_values.json", save_to=None)
    play_and_train(agent=agent, play_time=15, save_qvalues=False)
