# Training a Tetris Agent with Reinforcement Learning

This code is inspired by the following project: https://github.com/uvipen/Tetris-deep-Q-learning-pytorch

In [45]:
# import necessary libraries
import argparse
import os
import shutil
from collections import deque
from random import randint, random, sample
import numpy as np
import torch
import torch.nn as nn
from tensorboardX import SummaryWriter
from src.deep_q_network import DeepQNetwork
from src.tetris import Tetris

In [46]:
# define a class to hold options/configuration
class Options:
    """Container for the game settings, training hyperparameters and output paths."""

    def __init__(self):
        # --- Tetris board (forwarded to the Tetris environment) ---
        self.width, self.height = 10, 20
        self.block_size = 30

        # --- optimisation ---
        self.batch_size = 512   # upper bound on transitions sampled per update
        self.lr = 1e-3          # learning rate handed to the optimizer
        self.gamma = 0.99       # weight of the predicted next-state value in the target

        # --- epsilon-greedy schedule ---
        self.initial_epsilon = 1
        self.final_epsilon = 1e-3
        self.num_decay_epochs = 2000    # epochs over which epsilon is annealed

        # --- run length, checkpointing and replay memory ---
        self.num_epochs = 3000
        self.save_interval = 1000       # write a checkpoint every this many epochs
        self.replay_memory_size = 30000

        # --- output locations ---
        self.log_path = "tensorboard"
        self.saved_path = "trained_models"

In [47]:
# Create the shared configuration object; the cells below read every
# hyperparameter and output path from `opt`.
opt = Options()

In [48]:
# Running statistics over finished games; the training loop updates them.
total_episodes = 0   # number of finished games counted so far
total_reward = 0.0   # sum of the final scores of those games
max_score = 0.0      # best final score seen so far

In [49]:
# Seed PyTorch's random number generators.
# torch.manual_seed must run unconditionally: it seeds the CPU generator (and the
# CUDA generators as well). Seeding only CUDA leaves the CPU generator unseeded,
# and the model is constructed before it is moved to the GPU, so its initial
# weights presumably come from the CPU generator even on a CUDA machine.
# NOTE: Python's `random` module (used for exploration and replay sampling) is
# not seeded here, so runs are still not fully reproducible.
torch.manual_seed(123)
if torch.cuda.is_available():
    # explicit for clarity; covers every visible GPU
    torch.cuda.manual_seed_all(123)

In [50]:
# Prepare the TensorBoard log directory: if it already exists, delete it together
# with everything inside it (logs of earlier runs are lost), then create it empty.
if os.path.isdir(opt.log_path):
    shutil.rmtree(opt.log_path)
os.makedirs(opt.log_path)

In [51]:
# Create the tensorboardX SummaryWriter that the training loop uses to log
# scalars; it writes into `opt.log_path`.
writer = SummaryWriter(opt.log_path)

In [52]:
# Build the Tetris environment using the board width, height and block size
# configured in `opt`.
env = Tetris(width=opt.width, height=opt.height, block_size=opt.block_size)


In [53]:
# Instantiate the Q-network with its default constructor arguments.
# The training loop uses its output both to rank candidate next states and as
# the value estimate inside the learning target.
model = DeepQNetwork()

In [54]:
# Adam optimizer over all parameters of the model, using the learning rate
# configured in `opt`.
optimizer = torch.optim.Adam(model.parameters(), lr=opt.lr)

In [55]:
# Mean-squared-error loss; the training loop applies it to the predicted
# Q-values and their targets.
criterion = nn.MSELoss()

In [56]:
# Reset the game environment to start the first game; `state` holds the value
# returned by the reset (presumably the initial state as a tensor, since it is
# later moved with `.cuda()` and stacked with `torch.stack` -- confirm in Tetris).
state = env.reset()

In [57]:
# If CUDA is available, move the model and the initial state to the GPU.
# `model.cuda()` moves the module's parameters in place, whereas `state.cuda()`
# returns a new tensor, hence the reassignment.
if torch.cuda.is_available():
    model.cuda()
    state = state.cuda()

In [58]:
# Initialize the replay memory as a bounded deque: once it holds
# `opt.replay_memory_size` entries, appending a new one discards the oldest.
replay_memory = deque(maxlen=opt.replay_memory_size)

In [None]:
# start training epochs (this wasn't executed again prior to pushing, training took two days)
# The agent plays with an epsilon-greedy policy. Every game that finishes after the
# replay-memory warm-up counts as one epoch and triggers exactly one optimisation step.

# make sure the checkpoint directory exists before the first periodic save
os.makedirs(opt.saved_path, exist_ok=True)

epoch = 0
while epoch < opt.num_epochs:
    # get the next possible states, keyed by the action that leads to each of them
    next_steps = env.get_next_states()

    # determine if the agent should explore or exploit: epsilon decays linearly from
    # initial_epsilon to final_epsilon over num_decay_epochs epochs, then stays constant
    epsilon = opt.final_epsilon + (max(opt.num_decay_epochs - epoch, 0) * (
            opt.initial_epsilon - opt.final_epsilon) / opt.num_decay_epochs)
    u = random()
    random_action = u <= epsilon

    # extract next possible actions and their states
    next_actions, next_states = zip(*next_steps.items())
    next_states = torch.stack(next_states)

    # if cuda is available, move the next_states to GPU
    if torch.cuda.is_available():
        next_states = next_states.cuda()

    # choose action
    if random_action:
        index = randint(0, len(next_steps) - 1)
    else:
        # the network is only consulted when exploiting, so the forward pass is
        # skipped on exploration steps; eval mode + no_grad for pure prediction
        model.eval()
        with torch.no_grad():
            predictions = model(next_states)[:, 0]
        model.train()
        index = torch.argmax(predictions).item()

    # a slice of next_states, so it already lives on the same device
    next_state = next_states[index, :]
    action = next_actions[index]

    # make the chosen action, get reward and check if game is done
    # NOTE(review): render=True presumably draws every step and slows training down;
    # confirm against Tetris.step before switching it off for headless runs
    reward, done = env.step(action, render=True)

    # append the experience to the replay memory
    replay_memory.append([state, reward, next_state, done])

    if done:
        # if the game is done, save the final statistics and reset the environment
        final_score = env.score
        final_tetrominoes = env.tetrominoes
        final_cleared_lines = env.cleared_lines
        state = env.reset()
        if torch.cuda.is_available():
            state = state.cuda()
    else:
        # game still running: keep playing, no learning step yet
        state = next_state
        continue

    # wait until replay memory is sufficiently full (10% of its capacity)
    if len(replay_memory) < opt.replay_memory_size / 10:
        continue

    # increment the epoch first, so that every scalar of this epoch is logged at the
    # same TensorBoard step (epoch - 1, starting at 0)
    epoch += 1

    # calculate and print average reward
    total_reward += final_score
    total_episodes += 1
    average_reward = total_reward / total_episodes
    print("Average reward: {}".format(average_reward))

    # log the rewards
    writer.add_scalar('Train/Average Reward', average_reward, epoch - 1)

    # calculate and print max score
    max_score = max(max_score, final_score)
    print("Maximal achieved score: {}".format(max_score))
    writer.add_scalar('Train/Max Score', max_score, epoch - 1)

    # sample a batch from the replay memory
    batch = sample(replay_memory, min(len(replay_memory), opt.batch_size))
    state_batch, reward_batch, next_state_batch, done_batch = zip(*batch)
    state_batch = torch.stack(state_batch)
    reward_batch = torch.from_numpy(np.array(reward_batch, dtype=np.float32)[:, None])
    next_state_batch = torch.stack(next_state_batch)

    if torch.cuda.is_available():
        state_batch = state_batch.cuda()
        reward_batch = reward_batch.cuda()
        next_state_batch = next_state_batch.cuda()

    # compute the q values (with gradients) for the sampled states
    q_values = model(state_batch)

    # switch to eval mode for prediction of the next-state values (no gradients)
    model.eval()
    with torch.no_grad():
        next_prediction_batch = model(next_state_batch)
    model.train()

    # compute the target q values: the reward alone for terminal transitions,
    # otherwise reward + gamma * predicted value of the next state
    y_batch = torch.cat(
        tuple(reward if done else reward + opt.gamma * prediction for reward, done, prediction in
                zip(reward_batch, done_batch, next_prediction_batch)))[:, None]

    # compute the loss, perform backpropagation, and update the weights
    optimizer.zero_grad()
    loss = criterion(q_values, y_batch)
    loss.backward()
    optimizer.step()

    # print the training information
    print("Epoch: {}/{}, Action: {}, Score: {}, Tetrominoes {}, Cleared lines: {}".format(
        epoch,
        opt.num_epochs,
        action,
        final_score,
        final_tetrominoes,
        final_cleared_lines))
    writer.add_scalar('Train/Score', final_score, epoch - 1)
    writer.add_scalar('Train/Tetrominoes', final_tetrominoes, epoch - 1)
    writer.add_scalar('Train/Cleared lines', final_cleared_lines, epoch - 1)
    writer.add_scalar('Train/Loss', loss.item(), epoch - 1)
    writer.add_scalar('Train/Epsilon', epsilon, epoch - 1)

    # save the model at certain intervals (epoch is at least 1 here)
    if epoch % opt.save_interval == 0:
        torch.save(model, "{}/tetris_{}".format(opt.saved_path, epoch))

In [None]:
# save the final model (the whole module object is serialized, not only its weights);
# create the output directory first, because torch.save fails when the parent
# directory of the target file does not exist
os.makedirs(opt.saved_path, exist_ok=True)
torch.save(model, "{}/tetris".format(opt.saved_path))