### Deep Learning Final Project
#### Playing Flappy Bird Using Reinforcement Learning
Leo Li(), Zhangnan Jiang(), Zichen Yang()
In this project, we would like to build our own reinforcement learning neural network to play the
Flappy Bird game.

#### Importing necessary libraries:

In [1]:
import os
import cv2
import shutil
import numpy as np
import torch
import torch.nn as nn
import torchsummary
from random import random, randint, sample
from tensorboardX import SummaryWriter

#### Defining function used to preprocess game frame data (converting to grayscale):

In [2]:
def pre_processing(image, width, height):
    image = cv2.cvtColor(cv2.resize(image, (width, height)), cv2.COLOR_BGR2GRAY)
    _, image = cv2.threshold(image, 1, 255, cv2.THRESH_BINARY)
    return image[None, :, :].astype(np.float32)


#### Defining Model Structure:

In [3]:
class DeepQNetwork(nn.Module):
    def __init__(self):
        super(DeepQNetwork, self).__init__()

        self.conv1 = nn.Sequential(nn.Conv2d(4, 32, kernel_size=8, stride=4), nn.ReLU(inplace=True))
        self.conv2 = nn.Sequential(nn.Conv2d(32, 64, kernel_size=4, stride=2), nn.ReLU(inplace=True))
        self.conv3 = nn.Sequential(nn.Conv2d(64, 64, kernel_size=3, stride=1), nn.ReLU(inplace=True))

        self.fc1 = nn.Sequential(nn.Linear(7 * 7 * 64, 512), nn.ReLU(inplace=True))
        self.fc2 = nn.Linear(512, 2)
        self.init_weights()

    def init_weights(self):
        for module in self.modules():
            if isinstance(module, nn.Conv2d) or isinstance(module, nn.Linear):
                nn.init.uniform_(module.weight, -0.01, 0.01)
                nn.init.constant_(module.bias, 0)

    def forward(self, x):
        x = self.conv1(x)
        x = self.conv2(x)
        x = self.conv3(x)
        x = output.view(x.size(0), -1)
        x = self.fc1(x)
        x = self.fc2(x)

        return output


In [4]:
torch.cuda.is_available()

True

#### The training procedure:

In [5]:
saved_path="trained_models/flappy_bird"
log_path="tensorboard"
image_size=84
lr=1e-6
num_iters=2000000
initial_epsilon=0.1
final_epsilon=1e-4
replay_memory_size=50000
gamma=0.99
batch_size=32

In [6]:
torch.cuda.manual_seed(123)
model = DeepQNetwork()
model = torch.load("trained_models/flappy_bird")
# torchsummary.summary(model, (4, 8, 8))



In [7]:
if os.path.isdir(log_path):
        shutil.rmtree(log_path)
os.makedirs(log_path)
writer = SummaryWriter(log_path)

In [8]:
%load_ext tensorboard
%tensorboard --logdir tensorboard

In [9]:
optimizer = torch.optim.Adam(model.parameters(), lr=lr)
criterion = nn.MSELoss()

In [10]:
from src.flappy_bird import FlappyBird
game_state = FlappyBird()
image, reward, terminal = game_state.next_frame(0)
image = pre_processing(image[:game_state.screen_width, :int(game_state.base_y)], image_size, image_size)
image = torch.from_numpy(image)

pygame 2.1.2 (SDL 2.0.16, Python 3.6.15)
Hello from the pygame community. https://www.pygame.org/contribute.html


In [11]:
model.cuda()
image = image.cuda()
state = torch.cat(tuple(image for _ in range(4)))[None, :, :, :]

In [12]:
replay_memory = []
iter = 0

In [13]:
try:
    while iter < num_iters:
        prediction = model(state)[0]
        # Exploration or exploitation
        epsilon = final_epsilon + (
                (num_iters - iter) * (initial_epsilon - final_epsilon) / num_iters)
        u = random()
        random_action = u <= epsilon
        if random_action:
            # print("Perform a random action")
            action = randint(0, 1)
        else:

            action = torch.argmax(prediction)

        next_image, reward, terminal = game_state.next_frame(action)
        next_image = pre_processing(next_image[:game_state.screen_width, :int(game_state.base_y)], image_size,
                                    image_size)
        next_image = torch.from_numpy(next_image)
        if torch.cuda.is_available():
            next_image = next_image.cuda()
        next_state = torch.cat((state[0, 1:, :, :], next_image))[None, :, :, :]
        replay_memory.append([state, action, reward, next_state, terminal])
        if len(replay_memory) > replay_memory_size:
            del replay_memory[0]
        batch = sample(replay_memory, min(len(replay_memory), batch_size))
        state_batch, action_batch, reward_batch, next_state_batch, terminal_batch = zip(*batch)

        state_batch = torch.cat(tuple(state for state in state_batch))
        action_batch = torch.from_numpy(
            np.array([[1, 0] if action == 0 else [0, 1] for action in action_batch], dtype=np.float32))
        reward_batch = torch.from_numpy(np.array(reward_batch, dtype=np.float32)[:, None])
        next_state_batch = torch.cat(tuple(state for state in next_state_batch))

        if torch.cuda.is_available():
            state_batch = state_batch.cuda()
            action_batch = action_batch.cuda()
            reward_batch = reward_batch.cuda()
            next_state_batch = next_state_batch.cuda()
        current_prediction_batch = model(state_batch)
        next_prediction_batch = model(next_state_batch)

        y_batch = torch.cat(
            tuple(reward if terminal else reward + gamma * torch.max(prediction) for reward, terminal, prediction in
                zip(reward_batch, terminal_batch, next_prediction_batch)))

        q_value = torch.sum(current_prediction_batch * action_batch, dim=1)
        optimizer.zero_grad()
        # y_batch = y_batch.detach()
        loss = criterion(q_value, y_batch)
        loss.backward()
        optimizer.step()

        state = next_state
        iter += 1
        # only write on every 1000th iteration
        if iter % 1000 == 0:
            print("Iteration: {}/{}, Action: {}, Loss: {}, Epsilon {}, Reward: {}, Q-value: {}".format(
                iter + 1,
                num_iters,
                action,
                loss,
                epsilon, reward, torch.max(prediction)))
            writer.add_scalar('Train/Loss', loss, iter)
            writer.add_scalar('Train/Epsilon', epsilon, iter)
            writer.add_scalar('Train/Reward', reward, iter)
            writer.add_scalar('Train/Q-value', torch.max(prediction), iter)
        if (iter+1) % 1000000 == 0:
            torch.save(model, "{}/fireball_flappy_bird_init{}".format(saved_path, iter+1))
except KeyboardInterrupt:
    print("Saving model before quit")
    torch.save(model, "{}/fireball_flappy_bird_init{}".format(saved_path, iter+1))
torch.save(model, "{}/fireball_flappy_bird_init".format(saved_path))

Current FPS:  30.303030014038086
Current FPS:  30.21148109436035
Current FPS:  30.1204833984375
Current FPS:  30.21148109436035
Current FPS:  30.395137786865234
Current FPS:  30.21148109436035
Current FPS:  30.21148109436035
Current FPS:  30.303030014038086
Current FPS:  30.21148109436035
Current FPS:  30.303030014038086
Current FPS:  30.395137786865234
Current FPS:  30.395137786865234
Current FPS:  30.21148109436035
Current FPS:  30.1204833984375
Current FPS:  30.1204833984375
Current FPS:  30.21148109436035
Current FPS:  30.21148109436035
Iteration: 1001/2000000, Action: 0, Loss: 0.0043078092858195305, Epsilon 0.09995009995000001, Reward: 0.1, Q-value: 1.3297972679138184
Current FPS:  30.303030014038086
Current FPS:  30.21148109436035
Current FPS:  30.303030014038086
Current FPS:  30.303030014038086
Current FPS:  30.1204833984375
Current FPS:  30.303030014038086
Current FPS:  30.303030014038086
Current FPS:  30.21148109436035
Current FPS:  30.1204833984375
Current FPS:  30.2114810943

NotADirectoryError: [Errno 20] Not a directory: 'trained_models/flappy_bird/fireball_flappy_bird_init11235'

#### Test Procedure:

In [10]:
saved_path = "trained_models"
image_size = 84

In [11]:
torch.cuda.manual_seed(123)

In [13]:
model = torch.load("{}/fireball_flappy_bird".format(saved_path))

model.eval()
try:
    from src.flappy_bird import FlappyBird
    game_state = FlappyBird()
    image, reward, terminal = game_state.next_frame(0)
    image = pre_processing(image[:game_state.screen_width, :int(game_state.base_y)], image_size, image_size)
    image = torch.from_numpy(image)
    if torch.cuda.is_available():
        model.cuda()
        image = image.cuda()
    state = torch.cat(tuple(image for _ in range(4)))[None, :, :, :]

    while True:
        prediction = model(state)[0]
        action = torch.argmax(prediction)

        next_image, reward, terminal = game_state.next_frame(action)
        next_image = pre_processing(next_image[:game_state.screen_width, :int(game_state.base_y)], image_size,
                                    image_size)
        next_image = torch.from_numpy(next_image)
        if torch.cuda.is_available():
            next_image = next_image.cuda()
        next_state = torch.cat((state[0, 1:, :, :], next_image))[None, :, :, :]

        state = next_state
except KeyboardInterrupt:
    print("Quit")

Current FPS:  30.303030014038086
Current FPS:  30.1204833984375
Current FPS:  30.1204833984375
Current FPS:  30.1204833984375
Current FPS:  30.303030014038086
Current FPS:  30.303030014038086
Current FPS:  30.1204833984375
Quit
