<a href="https://colab.research.google.com/github/popolome/Flappy-Bird-RL-DQN/blob/main/Flappy_Bird_RL_DQN.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# 1 - Install Dependencies

In [1]:
!pip install pygame
!pip install tensorflow
!pip install gymnasium



# 2 - Import libraries

In [2]:
import pygame
import random
import numpy as np
from collections import deque
import tensorflow as tf
from tensorflow.keras import models, layers, optimizers

pygame 2.6.1 (SDL 2.28.4, Python 3.12.12)
Hello from the pygame community. https://www.pygame.org/contribute.html


# 3 - Set up environment

In [None]:
class FlappyBirdEnv:
  """Minimal, render-free Flappy Bird simulation with a single recycled pipe.

  The environment exposes a gym-like interface: `reset()` returns the first
  observation and `step(action)` returns `(state, reward, done, info)`.

  Observation (float32 array of length 4):
    [bird_y, bird_vel, pipe_dist_x, pipe_dist_y]
  Actions:
    0 = do nothing, 1 = flap
  """

  def __init__(self):
    # Geometry and physics constants
    self.screen_width = 288
    self.screen_height = 512
    self.pipe_width = 52
    self.pipe_gap = 100     # Vertical size of the opening in the pipe
    self.gravity = 1        # Added to the bird's velocity on every step
    self.flap_power = -10   # Velocity assigned when the bird flaps (negative = upwards)

    # Episode state; populated by reset()
    self.bird_y = None
    self.bird_vel = None
    self.pipe_x = None
    self.pipe_y = None      # Top edge of the gap
    self.done = False

  def _random_pipe_y(self):
    """Draw the top edge of the gap, keeping it at least 50 px from the top and bottom."""
    return np.random.randint(50, self.screen_height - 50 - self.pipe_gap)

  def reset(self):
    """Start a new episode and return the initial state.

    The bird is placed at mid-height with zero velocity and the pipe at the
    right edge of the screen with a freshly drawn gap position.
    """
    self.bird_y = self.screen_height / 2
    self.bird_vel = 0
    self.pipe_x = self.screen_width
    self.pipe_y = self._random_pipe_y()
    self.done = False
    return self.get_state()

  def step(self, action):
    """Advance the simulation by one frame.

    Args:
      action: 0 to do nothing, 1 to flap.

    Returns:
      A tuple `(state, reward, done, info)` where `state` is the observation
      from `get_state()`, `reward` is -1 on a crash and 0.1 otherwise, `done`
      is the boolean episode-finished flag, and `info` is an empty dict.
    """
    if action == 1:     # Action = 0: do nothing, Action = 1: flap
      self.bird_vel = self.flap_power
    self.bird_vel += self.gravity
    self.bird_y += self.bird_vel

    self.pipe_x -= 3    # This is the pipe moving speed

    # NOTE: the pipe is only recycled once it is a full screen width past the
    # left edge, so there is a stretch of frames with no obstacle near the bird.
    if self.pipe_x < -self.screen_width:
      self.pipe_x = self.screen_width
      self.pipe_y = self._random_pipe_y()

    out_of_bounds = self.bird_y < 0 or self.bird_y > self.screen_height
    # The pipe can only hit the bird while its horizontal span overlaps x in (0, 50)
    pipe_in_range = self.pipe_x < 50 and self.pipe_x + self.pipe_width > 0
    outside_gap = self.bird_y < self.pipe_y or self.bird_y > self.pipe_y + self.pipe_gap

    if out_of_bounds or (pipe_in_range and outside_gap):
      self.done = True
      reward = -1       # Negative reward for leaving the screen or hitting the pipe
    else:
      reward = 0.1      # Bird gets rewarded for being alive

    # `self.done` is a plain bool attribute, so it must be returned, not called.
    return self.get_state(), reward, self.done, {}

  def get_state(self):
    """Return the observation [bird_y, bird_vel, pipe_dist_x, pipe_dist_y] as float32.

    `pipe_dist_x` is the pipe's x position and `pipe_dist_y` is the signed
    vertical offset from the bird to the centre of the gap.
    """
    pipe_dist_x = self.pipe_x
    pipe_dist_y = self.pipe_y + self.pipe_gap/2 - self.bird_y
    return np.array([self.bird_y, self.bird_vel, pipe_dist_x, pipe_dist_y], dtype=np.float32)

# 4 - DQN Network (Keras)

In [3]:
def build_model(state_dim, action_dim):
  """Build and compile the Q-network.

  Args:
    state_dim: Number of features in one state vector (size of the input layer).
    action_dim: Number of discrete actions (size of the output layer).

  Returns:
    A compiled Keras `Sequential` model mapping a batch of states to one
    Q-value per action, trained with Adam (learning rate 0.001) on MSE loss.
  """
  # The layers must be given as an ordered list: a set literal has no defined
  # order, so the network topology would be arbitrary (or rejected outright).
  model = models.Sequential([
      layers.Input(shape=(state_dim,)),
      layers.Dense(128, activation='relu'),
      layers.Dense(128, activation='relu'),
      layers.Dense(action_dim, activation='linear'),   # Linear output: raw Q-values
  ])
  model.compile(optimizer=optimizers.Adam(learning_rate=0.001), loss='mse')
  return model

# 5 - Replay Buffer

In [None]:
class ReplayBuffer:
  """Fixed-capacity store of (state, action, reward, next_state, done) transitions."""

  def __init__(self, max_size=10000):
    # A bounded deque drops its oldest entry automatically once `max_size` is reached.
    self.buffer = deque(maxlen=max_size)

  def push(self, state, action, reward, next_state, done):
    """Append one transition to the buffer."""
    transition = (state, action, reward, next_state, done)
    self.buffer.append(transition)

  def sample(self, batch_size):
    """Draw `batch_size` distinct transitions uniformly at random.

    Returns:
      A 5-tuple of arrays `(states, actions, rewards, next_states, dones)`,
      each with `batch_size` entries along the first axis.
    """
    # Sampling without replacement guarantees no transition appears twice in a batch.
    picks = np.random.choice(len(self.buffer), batch_size, replace=False)
    batch = [self.buffer[pick] for pick in picks]
    # Transpose the list of transitions into one sequence per field.
    states, actions, rewards, next_states, dones = zip(*batch)
    fields = (states, actions, rewards, next_states, dones)
    return tuple(np.array(field) for field in fields)

  def __len__(self):
    """Number of transitions currently stored."""
    return len(self.buffer)

# 6 - DQN Agent

In [None]:
class DQNAgent:
  """Epsilon-greedy DQN agent that owns one Q-network and one replay buffer."""

  def __init__(self, state_dim, action_dim, gamma=0.99, epsilon=1.0, min_epsilon=0.05, epsilon_decay=0.995):
    """Store the hyperparameters and create the network and replay memory.

    Args:
      state_dim: Length of a state vector.
      action_dim: Number of discrete actions.
      gamma: Discount factor applied to the bootstrapped next-state value.
      epsilon: Initial probability of taking a random action.
      min_epsilon: Lower bound that epsilon is never decayed below.
      epsilon_decay: Multiplicative factor applied to epsilon after each update.
    """
    # Problem dimensions
    self.state_dim = state_dim
    self.action_dim = action_dim

    # Discounting and exploration schedule
    self.gamma = gamma
    self.epsilon = epsilon
    self.min_epsilon = min_epsilon
    self.epsilon_decay = epsilon_decay

    self.model = build_model(state_dim, action_dim)
    self.memory = ReplayBuffer()

  def select_action(self, state):
    """Pick an action for `state` using the epsilon-greedy rule."""
    explore = np.random.rand() < self.epsilon
    if explore:
      # Exploration: uniformly random action index
      return np.random.randint(self.action_dim)

    # Exploitation: the network expects a batch axis, so wrap the single state
    single_state_batch = state[np.newaxis, :]
    q_values = self.model.predict(single_state_batch, verbose=0)
    return np.argmax(q_values[0])

  def train(self, batch_size=64):
    """Run one learning update from a random replay batch.

    Does nothing until the replay memory holds at least `batch_size`
    transitions. After an update, epsilon is decayed towards `min_epsilon`.
    """
    if len(self.memory) < batch_size:
      return

    batch = self.memory.sample(batch_size)
    states, actions, rewards, next_states, dones = batch

    q_next = self.model.predict(next_states, verbose=0)   # Q-values of the successor states
    q_target = self.model.predict(states, verbose=0)      # Current estimates, edited in place below

    # Bellman backup: only the entry for the action actually taken is replaced,
    # and the bootstrapped term is zeroed for terminal transitions.
    experience = zip(actions, rewards, q_next, dones)
    for row, (taken_action, reward, successor_q, terminal) in enumerate(experience):
      bootstrap = self.gamma * np.max(successor_q) * (1 - terminal)
      q_target[row, taken_action] = reward + bootstrap

    self.model.fit(states, q_target, epochs=1, verbose=0)

    # Shift gradually from exploration to exploitation, never below the floor
    decayed_epsilon = self.epsilon * self.epsilon_decay
    self.epsilon = max(self.min_epsilon, decayed_epsilon)

# 7 - Training Loop

In [None]:
# Environment and problem dimensions
env = FlappyBirdEnv()
state_dim = 4       # Length of the vector returned by env.get_state()
action_dim = 2      # 0 = do nothing, 1 = flap

num_episodes = 5000
agent = DQNAgent(state_dim=state_dim, action_dim=action_dim)

reward_history = []     # One entry per episode: the summed reward, used for monitoring

for episode in range(num_episodes):
  state, done, total_rewards = env.reset(), False, 0

  # Play one episode to termination, learning after every environment step
  while not done:
    action = agent.select_action(state)
    next_state, reward, done, info = env.step(action)

    agent.memory.push(state, action, reward, next_state, done)
    agent.train()

    total_rewards += reward
    state = next_state

  reward_history.append(total_rewards)

  # Progress report every 500 episodes
  if not (episode + 1) % 500:
    print(f"Episode {episode + 1}, Total Reward: {total_rewards}, Epsilon: {agent.epsilon:.3f}")

# 8 - Monitoring Running Average

In [1]:
import matplotlib.pyplot as plt

# Smooth the per-episode returns with a sliding mean so the trend is visible.
# The window is clamped to the number of recorded episodes so that short runs
# still produce a curve instead of an empty plot.
window = max(1, min(100, len(reward_history)))

# `+ 1` so the final full window (ending at the last episode) is included.
running_average = [
    np.mean(reward_history[start:start + window])
    for start in range(len(reward_history) - window + 1)
]

fig, ax = plt.subplots()
ax.plot(running_average)
ax.set_xlabel("Episodes")
ax.set_ylabel("Running Average Reward")
ax.set_title("Flappy Bird DQN Training Progress")
plt.show()

NameError: name 'reward_history' is not defined

# 9 - Save the Trained Model

In [None]:
# Write the Q-network held by `agent` to "flappy_dqn_model.h5". The path is
# relative, so the file lands in the kernel's current working directory.
# NOTE(review): the .h5 suffix presumably selects Keras' legacy HDF5 format —
# confirm this is preferred over the native .keras format for the installed version.
agent.model.save("flappy_dqn_model.h5")