# Double Deep Q-Learning to play 2048

by Jennifer Bryson, Madina Abdrakhmanova, and Caleb Nelson

In this group project we used the code outline given from chapter 16 in the O'Reilly textbook Hands On Machine Learning with Scikit-Learn and TensorFlow by Aurélien Géron.  In the textbook, they followed the ideas of DeepMind and used reinforcement learning to play Ms. Pac-Man.  We modified the code to play 2048 instead.  We also added "double" deep Q-learning to improve performance.  Our overall outcome is that we found several different neural network configurations which can get to the 512 tile around 5% of the time (which never happens for random gameplay) with as little as 35 minutes worth of training time, but more work must be done to reach 2048.  Here's what we have so far!

First, we must make the 2048 game:

In [1]:
# coding: utf-8
from types import *
import copy
import math
import random

DirectionMap = { 0:'up',    'up':0,    'u':0,
                 1:'right', 'right':1, 'r':1, 
                 2:'down',  'down':2,  'd':2,
                 3:'left',  'left':3,  'l':3 }

class Board:
    board = []
    score = 0

    def __init__(self, grid = None, scr = None):
        self.board = []
        if (scr == None):
            scr = 0
        self.score = int(scr)
        if (grid == None):
            grid = "2,0,0,0, 0,0,0,0, 0,0,0,0, 0,0,0,0"
        if (type(grid) is list):
            self.board = copy.deepcopy(grid)
        else:
            grid = list(map(int, grid.replace(' ', '').split(',')))
            if len(grid) < 16:
                grid.extend([0 for x in range(16-len(grid))])
            for row in range(0, 4):
                self.board.append(grid[4*row:4*(row+1)])

    def __str__(self):
        rowStrs = []
        for row in self.board:
            rowStrs.append(','.join(map(str, row)))
        return (', '.join(rowStrs) + ", " + str(self.score))

    #returns true if board[r1][c1] can move onto board[r0][c0], false otherwise
    def moveOk(self, r0, c0, r1, c1):
        a = self.board[r0][c0]
        b = self.board[r1][c1]
        return (a == 0 and b != 0) or (a != 0 and a == b)

    #returns true if dir is a valid move for board, false otherwise
    def tryDir(self, dir):
        if(type(dir) == int):
            dir = DirectionMap[dir]
        if (dir == 'l' or dir == "left"):
            for row in range(0,4):
                for col in range(0, 3):
                    if (self.moveOk(row, col, row, col+1)):
                        return True
        elif (dir == 'u' or dir == "up"):
            for col in range(0, 4):
                for row in range(0, 3):
                    if (self.moveOk(row, col, row+1, col)):
                        return True
        elif (dir == 'r' or dir == "right"):
            for row in range(0,4):
                for col in range(1, 4):
                    if (self.moveOk(row, col, row, col-1)):
                        return True
        elif (dir == 'd' or dir == "down"):
            for col in range(0, 4):
                for row in range(1, 4):
                    if (self.moveOk(row, col, row-1, col)):
                        return True
        return False

    #returns true if the board has any valid moves, false otherwise
    def canMove(self):
        return(self.tryDir('l') or self.tryDir('u') or self.tryDir('r') or self.tryDir('d'))

    #returns a board that would be the result if the current board was moved in direction dir
    def move(self, dir):
        if(type(dir) == int):
            dir = DirectionMap[dir]
        newboard = Board(self.board, self.score)
        newscr = self.score
        if (not self.tryDir(dir)):
            return newboard
        else:
            if (dir == 'l' or dir == "left"):
                for row in range(0, 4):
                    vscore = makeMove(newboard.board[row])
                    newboard.board[row] = vscore[0]
                    newscr += vscore[1]
            elif (dir == 'u' or dir == "up"):
                for col in range(0, 4):
                    vector = []
                    for row in range(0, 4):
                        vector.append(newboard.board[row][col])
                    vscore = makeMove(vector)
                    newscr += vscore[1]
                    for row in range(0, 4):
                        newboard.board[row][col] = vscore[0][row]
            elif (dir == 'r' or dir == "right"):
                for row in range(0, 4):
                    vector = []
                    for col in range(0, 4):
                        vector.append(newboard.board[row][3-col])
                    vscore = makeMove(vector)
                    newscr += vscore[1]
                    for col in range(0, 4):
                        newboard.board[row][3-col] = vscore[0][col]
            elif (dir == 'd' or dir == "down"):
                for col in range(0, 4):
                    vector = []
                    for row in range(0, 4):
                        vector.append(newboard.board[3-row][col])
                    vscore = makeMove(vector)
                    newscr += vscore[1]
                    for row in range(0, 4):
                        newboard.board[3-row][col] = vscore[0][row]
            newboard.score = newscr
            return newboard

    #returns true if the board can move num times, false if it can't.
    def moveNum(self, num):
        if (num <= 1):
            return self.canMove()
        return(self.move('l').drop().moveNum(num-1) or
               self.move('u').drop().moveNum(num-1) or
               self.move('r').drop().moveNum(num-1) or
               self.move('d').drop().moveNum(num-1))


    #returns an array of tuples that represent empty board locations, namely those that can recieve a random drop
    def possibleDrops(self):
        drops = []
        for row in range(0, 4):
            for col in range(0, 4):
                if (self.board[row][col] == 0):
                    drops.append((row, col))
        return drops

    #returns a board that would be the result if the current board were to recieve a random drop from list drops, or from possibleDrops if drops is null
    def drop(self, drops = None):
        if (drops == None):
            drops = self.possibleDrops()
        newboard = Board(self.board, self.score)
        if (len(drops) == 0):
            return newboard
        choice = random.choice(drops)
        if (random.random() >= .9):
            newboard.board[choice[0]][choice[1]] = 4
        else:
            newboard.board[choice[0]][choice[1]] = 2
        return newboard

#the following are helper functions to make move work
def squishZeros(vector):
    index = 0
    for i in range(0, len(vector)):
        if (vector[i] != 0):
            vector[index] = vector[i]
            if (i != index):
                vector[i] = 0
            index += 1
    return vector

def makeFusions(vector):
    newscr = 0
    for i in range(0, len(vector)-1):
        if (vector[i] == vector[i+1]):
            vector[i] += vector[i+1]
            newscr += vector[i]
            vector[i+1] = 0
    return [vector, newscr]

def makeMove(vector):
    vector = squishZeros(vector)
    vscore = makeFusions(vector)
    vscore[0] = squishZeros(vscore[0])
    return vscore

Load in the packages we will use:

In [2]:
import numpy as np
import tensorflow as tf
import os
import sys
import matplotlib.pyplot as plt

  from ._conv import register_converters as _register_converters


**IMPORTANT NOTE:** the next block of code is only for running on Colab.  Do not run it if you're running this on Jupyter.

In [216]:
device_name = tf.test.gpu_device_name()
if device_name != '/device:GPU:0':
  raise SystemError('GPU device not found')
print('Found GPU at: {}'.format(device_name))

Found GPU at: /device:GPU:0


Now is where the fun begins!

## Q Learning


In [3]:
tf.reset_default_graph()

Below we specify the convolutional neural network that we will use.  We've played around with several different configurations.  This one appears to be good, although we haven't done an exhuastive search, so more work could be done here.

In [4]:
input_height = 4
input_width = 4
input_channels = 1
conv_n_maps = [128, 128]
conv_kernel_sizes = [(2,2), (2,2)]
conv_strides = [1, 1]
conv_paddings = ["VALID"] * 2 
conv_activation = [tf.nn.relu] * 2
n_hidden_in = 128 * 2 * 2  # conv3 has 128 maps of 2x2 each
n_hidden = 256
hidden_activation = tf.nn.relu
n_outputs = 4  # 4 discrete actions are available
initializer = tf.contrib.layers.variance_scaling_initializer()

def q_network(X_state, name):
    prev_layer = X_state
    with tf.variable_scope(name) as scope:
        for n_maps, kernel_size, strides, padding, activation in zip(
                conv_n_maps, conv_kernel_sizes, conv_strides,
                conv_paddings, conv_activation):
            prev_layer = tf.layers.conv2d(
                prev_layer, filters=n_maps, kernel_size=kernel_size,
                strides=strides, padding=padding, activation=activation,
                kernel_initializer=initializer)
        last_conv_layer_flat = tf.reshape(prev_layer, shape=[-1, n_hidden_in])
        hidden = tf.layers.dense(last_conv_layer_flat, n_hidden,
                                 activation=hidden_activation,
                                 kernel_initializer=initializer)
        outputs = tf.layers.dense(hidden, n_outputs,
                                  kernel_initializer=initializer)
    trainable_vars = tf.get_collection(tf.GraphKeys.TRAINABLE_VARIABLES,
                                       scope=scope.name)
    trainable_vars_by_name = {var.name[len(scope.name):]: var
                              for var in trainable_vars}
    return outputs, trainable_vars_by_name

We will use two networks: online and target.  This is one of the "tricks" by DeepMind which increased stability and success.

In [5]:
X_state = tf.placeholder(tf.float32, shape=[None, input_height, input_width,
                                            input_channels])
online_q_values, online_vars = q_network(X_state, name="q_networks/online")
target_q_values, target_vars = q_network(X_state, name="q_networks/target")

copy_ops = [target_var.assign(online_vars[var_name])
            for var_name, target_var in target_vars.items()]
copy_online_to_target = tf.group(*copy_ops)

In [6]:
online_vars

{'/conv2d/bias:0': <tf.Variable 'q_networks/online/conv2d/bias:0' shape=(128,) dtype=float32_ref>,
 '/conv2d/kernel:0': <tf.Variable 'q_networks/online/conv2d/kernel:0' shape=(2, 2, 1, 128) dtype=float32_ref>,
 '/conv2d_1/bias:0': <tf.Variable 'q_networks/online/conv2d_1/bias:0' shape=(128,) dtype=float32_ref>,
 '/conv2d_1/kernel:0': <tf.Variable 'q_networks/online/conv2d_1/kernel:0' shape=(2, 2, 128, 128) dtype=float32_ref>,
 '/dense/bias:0': <tf.Variable 'q_networks/online/dense/bias:0' shape=(256,) dtype=float32_ref>,
 '/dense/kernel:0': <tf.Variable 'q_networks/online/dense/kernel:0' shape=(512, 256) dtype=float32_ref>,
 '/dense_1/bias:0': <tf.Variable 'q_networks/online/dense_1/bias:0' shape=(4,) dtype=float32_ref>,
 '/dense_1/kernel:0': <tf.Variable 'q_networks/online/dense_1/kernel:0' shape=(256, 4) dtype=float32_ref>}

In [7]:
learning_rate = 0.001
momentum = 0.95

with tf.variable_scope("train"):
    X_action = tf.placeholder(tf.int32, shape=[None])
    y = tf.placeholder(tf.float32, shape=[None, 1])
    q_value = tf.reduce_sum(online_q_values * tf.one_hot(X_action, n_outputs),
                            axis=1, keep_dims=True)
    error = tf.abs(y - q_value)
    clipped_error = tf.clip_by_value(error, 0.0, 1.0)
    linear_error = 2 * (error - clipped_error)
    loss = tf.reduce_mean(tf.square(clipped_error) + linear_error)

    global_step = tf.Variable(0, trainable=False, name='global_step')
    optimizer = tf.train.MomentumOptimizer(learning_rate, momentum, use_nesterov=True)
    training_op = optimizer.minimize(loss, global_step=global_step)

init = tf.global_variables_initializer()
saver = tf.train.Saver()

Instructions for updating:
keep_dims is deprecated, use keepdims instead


Another crucial trick is replay memory.  We don't want to be training on subsequent gameboard states because those are highly correlated and this could cause us to find a local minimum instead of a global minimum.  Instead, we store 25,000 game states (roughly 1/8 of the total number of training iterations) and we will randomly choose batch_size=50 of these states for training.

In [19]:
from collections import deque

replay_memory_size = 25000
replay_memory = deque([], maxlen=replay_memory_size)

def sample_memories(batch_size):
    indices = np.random.permutation(len(replay_memory))[:batch_size]
    cols = [[], [], [], [], []] # state, action, reward, next_state, continue
    for idx in indices:
        memory = replay_memory[idx]
        for col, value in zip(cols, memory):
            col.append(value)
    cols = [np.array(col) for col in cols]
    return cols[0], cols[1], cols[2].reshape(-1, 1), cols[3], cols[4].reshape(-1, 1)

For the exploration/exploitation tradeoff, we will start with exploring 100% of the time and exponentially decrease the exploration to 10% of the time over the course of 100,000 steps.  Note: the decay step length should be related to the total number of training steps.  From other works, it seems the decay step length should be roughly half of the total number of training steps.

In [20]:
eps_min = 0.1
eps_max = 1.0
eps_decay_steps = 100000

def epsilon_greedy(q_values, step, state):
    epsilon = max(eps_min, eps_max - (eps_max-eps_min) * step/eps_decay_steps)
    if np.random.rand() < epsilon:
        return np.random.randint(n_outputs) # random action
    
    else:
        L = np.argsort(-q_values)
        if state.tryDir(int(L[0][0])): #if the action with the highest q_value is a valid move,
            return int(L[0][0])  #return the action with the highest q_value
        elif state.tryDir(int(L[0][1])):
            return int(L[0][1])  #return the action with the second highest q_value
        elif state.tryDir(int(L[0][2])):
            return int(L[0][2])  #return the action with the third highest q_value
        else:
            return int(L[0][3])  #return the action with the lowest q_value

We choose the total number of training steps, the batch_size for replay memory, the discount_rate, etc.

In [21]:
n_steps = 200000  # total number of training steps
training_start = 10000  # start training after 10,000 game iterations
training_interval = 4  # run a training step every 4 game iterations
save_steps = 1000  # save the model every 1,000 training steps
copy_steps = 10000  # copy online DQN to target DQN every 10,000 training steps
discount_rate = 0.99
skip_start = 90  # Skip the start of every game (it's just waiting time).
batch_size = 50

iteration = 0  # game iterations
checkpoint_path = "./my_dqn.ckpt"
done = True # env needs to be reset

In [11]:
loss_val = np.infty
game_length = 0
total_max_q = 0
mean_max_q = 0.0

This next block of code actually runs the training.  Warning: it will take around 35 minutes!

For claification, below "done" takes on the value 0 or 1 via done = 1 if the game is over, and done = 0 if it's not.
This is not to be confused with "continue" used in the replay memory.  In the replay memory, we store (state, action, reward, next_state, continue) where continue is the opposite of done.

In [26]:
with tf.Session() as sess:
    if os.path.isfile(checkpoint_path + ".index"):
        saver.restore(sess, checkpoint_path)
    else:
        init.run()
        copy_online_to_target.run()
    while True:
        step = global_step.eval()  
        if step >= n_steps:
            break
        iteration += 1
        print("\rIteration {}\tTraining step {}/{} ({:.1f})%\tLoss {:5f}\tMean Max-Q {:5f}   ".format(
            iteration, step, n_steps, step * 100 / n_steps, loss_val, mean_max_q), end="")
        if done: # game over, start again
            obs = Board()
            #for skip in range(skip_start): # skip the start of each game
            #    obs, reward, done, info = env.step(0)
            #state = preprocess_observation(obs)
            state = obs

        # Online DQN evaluates what to do
        temp = np.array(state.board).reshape(4,4,1)
        q_values = online_q_values.eval(feed_dict={X_state: [temp]})
        #print('online q values', q_values)
        action = epsilon_greedy(q_values, step, state)

        # Online DQN plays
        #next_state, reward, done, info= env.step(action)
        if (state.tryDir(action)):
            next_state = state.move(int(action)).drop()
            reward = next_state.score - state.score
        else:
            next_state = state
            reward = -100 #penalty for wasting our time
        done = (0 if next_state.canMove() else 1)
        
        # Let's memorize what happened
        replay_memory.append((state, action, reward, next_state, 1.0 - done))
        state = next_state

        # Compute statistics for tracking progress (not shown in the book)
        total_max_q += q_values.max()
        game_length += 1
        if done:
            mean_max_q = total_max_q / game_length
            total_max_q = 0.0
            game_length = 0

        if iteration < training_start or iteration % training_interval != 0:
            continue # only train after warmup period and at regular intervals
        
        # Sample memories and use the target DQN to produce the target Q-Value
        X_state_val, X_action_val, rewards, X_next_state_val, continues = (
            sample_memories(batch_size))
        
        X_next_state_val_board = []
        for state in X_next_state_val:
            X_next_state_val_board.append(np.array(state.board).reshape(4,4,1))
        
        #Standart Q learning
        #next_q_values = target_q_values.eval(
        #    feed_dict={X_state: X_next_state_val_board})

        #max_next_q_values = np.max(next_q_values, axis=1, keepdims=True)
        #y_val = rewards + continues * discount_rate * max_next_q_values
        
        #Double Q Learning
        target_next_q_values = target_q_values.eval(
            feed_dict={X_state: X_next_state_val_board})
        online_next_q_values = online_q_values.eval(
            feed_dict={X_state: X_next_state_val_board})
        next_actions = np.argmax(online_next_q_values, axis=1)
        chosen_next_q_values = target_next_q_values[np.arange(len(next_actions)), next_actions].reshape(batch_size,1)
        y_val = rewards + continues * discount_rate * chosen_next_q_values

        
        # Train the online DQN
        #print('X_state_val shape', X_state_val.shape)
        X_state_val_board = []
        for state in X_state_val:
            X_state_val_board.append(np.array(state.board).reshape(4,4,1))
        
        _, loss_val = sess.run([training_op, loss], feed_dict={
            X_state: X_state_val_board, X_action: X_action_val, y: y_val})

        # Regularly copy the online DQN to the target DQN
        if step % copy_steps == 0:
            copy_online_to_target.run()

        # And save regularly
        if step % save_steps == 0:
            saver.save(sess, checkpoint_path)

INFO:tensorflow:Restoring parameters from ./my_dqn.ckpt
Iteration 813992	Training step 199999/200000 (100.0)%	Loss 20.804014	Mean Max-Q 139.137575   

Great, it's been trained!  Now to see how it does!  This next block of code plays the game with the learned network 100 different times and saves the final score and the final gameboard frame.

In [27]:
num_trials = 100
final_frames = []
final_scores = np.zeros(num_trials)
for i in range (num_trials):
    print('trial: ', i)
    #frames = []
    with tf.Session() as sess:
        saver.restore(sess, checkpoint_path)

        step_count = 0

        obs = Board()

        while(True):
            state = obs
            state_board = np.array(state.board).reshape(4,4,1)
            # Online DQN evaluates what to do
            q_values = online_q_values.eval(feed_dict={X_state: [state_board]})
            #print(q_values)

            L = np.argsort(-q_values)
            #print(L)
            if state.tryDir(int(L[0][0])): #if the action with the highest q_value is a valid move,
                action = L[0][0]  #return the action with the highest q_value
            elif state.tryDir(int(L[0][1])):
                action = L[0][1]  #return the action with the second highest q_value
            elif state.tryDir(int(L[0][2])):
                action = L[0][2]  #return the action with the third highest q_value
            else:
                action = L[0][3]  #return the action with the lowest q_value


            #action = np.argmax(q_values)
            #print(action)
            # Online DQN plays
            next_state = state.move(int(action)).drop()

            #print(next_state.board)
            reward = next_state.score - state.score
            done = (0 if next_state.canMove() else 1)

            obs = next_state

            img = obs.board
            #frames.append(img)

            step_count = step_count+1

            if done:
                #print('Game Over, you lasted', step_count, 'steps.')
                #print('Final Score: ', obs.score)
                final_scores[i] = obs.score
                final_frames.append(img)
                break


trial:  0
INFO:tensorflow:Restoring parameters from ./my_dqn.ckpt
trial:  1
INFO:tensorflow:Restoring parameters from ./my_dqn.ckpt
trial:  2
INFO:tensorflow:Restoring parameters from ./my_dqn.ckpt
trial:  3
INFO:tensorflow:Restoring parameters from ./my_dqn.ckpt
trial:  4
INFO:tensorflow:Restoring parameters from ./my_dqn.ckpt
trial:  5
INFO:tensorflow:Restoring parameters from ./my_dqn.ckpt
trial:  6
INFO:tensorflow:Restoring parameters from ./my_dqn.ckpt
trial:  7
INFO:tensorflow:Restoring parameters from ./my_dqn.ckpt
trial:  8
INFO:tensorflow:Restoring parameters from ./my_dqn.ckpt
trial:  9
INFO:tensorflow:Restoring parameters from ./my_dqn.ckpt
trial:  10
INFO:tensorflow:Restoring parameters from ./my_dqn.ckpt
trial:  11
INFO:tensorflow:Restoring parameters from ./my_dqn.ckpt
trial:  12
INFO:tensorflow:Restoring parameters from ./my_dqn.ckpt
trial:  13
INFO:tensorflow:Restoring parameters from ./my_dqn.ckpt
trial:  14
INFO:tensorflow:Restoring parameters from ./my_dqn.ckpt
trial

We can print the average final score to see roughly how well it does.

In [28]:
print(np.mean(final_scores))
print(np.std(final_scores, ddof = 1))


1784.6
913.6733496709805


Also we can print out the log base 2 of the largest tile seen for each game (meaning if it got to 512 it would print 9 since $2^9$=512).

In [29]:
max_tiles = []
curr_max_tiles= []
print(len(final_frames))
for curr_final_frame in final_frames:
    max_tile = np.amax(curr_final_frame)
    log2_max_tile = int(np.log2(max_tile))
    curr_max_tiles.append(log2_max_tile)
print (curr_max_tiles)
max_tiles.append(curr_max_tiles)

100
[7, 6, 6, 7, 8, 7, 6, 7, 8, 7, 8, 7, 7, 8, 8, 8, 8, 5, 7, 8, 8, 8, 8, 6, 8, 7, 8, 6, 6, 7, 6, 6, 7, 7, 6, 7, 6, 8, 8, 8, 7, 6, 7, 6, 7, 8, 7, 7, 8, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 6, 5, 9, 7, 5, 6, 7, 6, 7, 7, 7, 6, 7, 7, 6, 5, 7, 7, 7, 6, 6, 7, 6, 7, 7, 7, 7, 5, 7, 8, 6, 7, 8, 7, 6, 8, 6]


Our highest tile from these 100 gameplays was 512, which is better than random gameplay so we've definitely learned something good!  Training for many more iterations will definitely improve this further, as will tinkering with parameters and neural network configurations some more.