# Double deep Q-learning in 2048

# Game imports
This is slightly more complicated than it needs to be, because the name of the target folder contains characters that are not allowed in Python package names.

In [None]:
import os 
dir_path = os.path.abspath('')
dir_path = os.path.join(dir_path, '2048-python-custom-player')

import sys
sys.path.insert(0, dir_path)

import constants as c
from tie_in import TieIn # Used to launch a game of 2048

## Set game constants

In [None]:
# Board dimensions; GAME_SHAPE (the model input shape) is derived from these.
c.GRID_LEN_X = 4
c.GRID_LEN_Y = 4
# Presumably the probability that a newly spawned tile is a 4 — confirm in the game's constants module.
c.PROBABILITY_4 = 0.1

# The model

### Imports

In [None]:
import tensorflow as tf
import tensorflow.keras as keras

from tensorflow.keras import Sequential
from tensorflow.keras.layers import Conv2D, Dense, InputLayer, Reshape

### Clear previous models from memory (if needed)

In [None]:
# Drop Keras' global state so models built by earlier runs of this notebook are released.
keras.backend.clear_session()

## Instantiate the model

In [None]:
# Shape of one board as fed to the model: grid width, grid height, one channel.
GAME_SHAPE = (c.GRID_LEN_X, c.GRID_LEN_Y, 1)

def create_model():
    """Build and compile the network that scores the four possible moves.

    The network takes one board of shape GAME_SHAPE, runs it through three
    2x2 convolutions and two dense layers, and ends in a softmax over four
    outputs. It is compiled with categorical cross-entropy and Adam.

    Returns:
        The compiled Keras Sequential model.
    """
    layer_stack = [
        InputLayer(input_shape=GAME_SHAPE),
        Conv2D(32, kernel_size=2, activation='relu'),
        Conv2D(32, kernel_size=2, activation='relu'),
        Conv2D(64, kernel_size=2, activation='relu'),
        Dense(64, activation='relu'),
        Dense(4, activation='softmax'),
        # Collapse the remaining spatial axes so each board yields one 4-vector.
        Reshape((4,)),
    ]
    network = Sequential(layer_stack)

    loss_fn = keras.losses.CategoricalCrossentropy(from_logits=False)
    network.compile(loss=loss_fn, optimizer=keras.optimizers.Adam())
    return network

# Network that is trained and used to pick moves.
model = create_model()
model.summary()

# Second network with the same architecture. ModelPlayer predicts the
# follow-up states with it when building training targets and refreshes its
# weights from `model`, so it is never trained directly.
model_copy = create_model()
model_copy.trainable = False

# Preprocessing
Tile values in 2048 grow exponentially, so we take log2 of every non-zero value to obtain a linear scale, which is easier for the network to handle. We then divide by log2(1024) = 10 to keep the values small. For instance, 2 maps to 0.1, 1024 to 1, 8192 to 1.3, and 0 stays 0.

In [None]:
import numpy as np

def reward_func(score_change):
    """Convert the score gained by a move into a log-scaled reward.

    Args:
        score_change: Score gained by the move.

    Returns:
        0 when nothing was scored, otherwise log2(score_change) / log2(1024),
        so that a gain of 1024 is worth exactly 1.
    """
    # log2 is undefined at 0, so a move that scored nothing is rewarded with a plain 0.
    if score_change != 0:
        return np.log2(score_change) / np.log2(1024)
    return 0

def preprocess(matrix):
    """Return a log-scaled float copy of a board (or batch of boards).

    Every non-zero entry v is replaced by log2(v) / log2(1024); zeros (empty
    cells) stay 0. The result is always a new float array: the input is not
    modified, and integer boards are converted to float first so the
    fractional scaled values are not truncated when they are written back.

    Args:
        matrix: Array-like of tile values, any shape.

    Returns:
        A float numpy array with the same shape as `matrix`.
    """
    scaled = np.array(matrix, dtype=float)  # Always a copy, always float
    # Only touch non-zero cells: log2(0) is -inf and empty cells must stay 0.
    non_zeros = np.nonzero(scaled)
    scaled[non_zeros] = np.log2(scaled[non_zeros]) / np.log2(1024)
    return scaled

def one_hot(moves):
    """One-hot encode an array of move indices.

    Args:
        moves: 1-D integer array of move indices in the range 0..3.

    Returns:
        Float array of shape (len(moves), 4) with a single 1 per row, in the
        column given by the corresponding move.
    """
    n_rows = moves.shape[0]
    encoded = np.zeros((n_rows, 4))
    row_ids = np.arange(n_rows)
    encoded[row_ids, moves] = 1
    return encoded

# Index of every move; the dot product of a one-hot row with it recovers the move index.
ACTION_SPACE = np.arange(4)


def sparse(moves):
    """Turn one-hot encoded moves back into integer move indices.

    Args:
        moves: Array whose last axis has length 4 (one-hot rows).

    Returns:
        Integer array (or scalar for a single row) of move indices.
    """
    move_indices = np.dot(moves, ACTION_SPACE)
    return move_indices.astype(int)

## Data augmentation
Because of rotational symmetry, every game of 2048 on a square board corresponds to three other games, obtained by rotating the board by 90, 180 and 270 degrees. If we accept a move in one of these games as valid, then the correspondingly rotated state and action must be equally valid. We use this argument to create additional artificial experience. Hopefully, this will also prevent a collapse in strategy where the model favors one corner.

In [None]:
move_rotation_lookup = { 0: 2, 1: 3, 2: 1, 3: 0} # UP->LEFT, DOWN->RIGHT, LEFT->DOWN, RIGHT->UP
# The same mapping as an array, so a whole batch of moves is rotated with one indexing operation.
_MOVE_ROTATION_TABLE = np.array([move_rotation_lookup[move] for move in range(4)])


def rotate_90_moves(moves):
    """Map each move to the move it becomes after the board is rotated by 90 degrees.

    The result is looked up from the input moves themselves; the lookup must
    not be done on the freshly allocated output buffer, which would map every
    move to the rotation of move 0.

    Args:
        moves: Integer array of move indices in the range 0..3, any shape.

    Returns:
        Integer array with the same shape as `moves`, holding the rotated moves.
    """
    move_indices = np.asarray(moves, dtype=int)
    return _MOVE_ROTATION_TABLE[move_indices]

def rotate_90_board(boards):
    """Rotate every board in a batch by one quarter turn.

    Args:
        boards: Array with at least three axes; axes 1 and 2 are the board
            plane that is rotated, axis 0 is left in place.

    Returns:
        The rotated array, as produced by np.rot90 over axes (1, 2).
    """
    board_plane = (1, 2)
    return np.rot90(boards, k=1, axes=board_plane)

# Other utility

In [None]:
def matrix_to_model_input(matrix):
    """Wrap a single board in the batch and channel axes the model expects.

    Args:
        matrix: Array-like board (e.g. nested lists) of shape (X, Y).

    Returns:
        A new numpy array of shape (1, X, Y, 1): a batch of one board with
        one channel.
    """
    board = np.array(matrix)
    with_channel = np.expand_dims(board, axis=-1)  # Trailing channel axis (1 channel)
    return np.expand_dims(with_channel, axis=0)  # Leading batch axis

# Experience replay

In [None]:
class ExperienceReplay():
    """Fixed-size replay memory of (state, next state, action, reward, terminal) records.

    Records are written at position `counter % max_length`, so once the
    memory is full the oldest entries are overwritten. Every stored
    transition is written four times, once per quarter-turn rotation of the
    board and the move.
    """

    def __init__(self, max_length):
        """Allocate zero-filled buffers for `max_length` records."""
        self.max_length = max_length
        board_buffer_shape = (self.max_length, *GAME_SHAPE)
        self.state_memory_before = np.zeros(board_buffer_shape)
        self.state_memory_after = np.zeros(board_buffer_shape)
        self.action_memory = np.zeros((self.max_length, 4))  # One-hot moves
        self.reward_memory = np.zeros((self.max_length))
        self.terminal_memory = np.zeros((self.max_length))  # Holds 1 - done
        self.counter = 0  # Total number of records written so far

    def store(self, state_before, state_after, action, score_diff, tile_count_diff, done):
        """Store one transition together with its three rotated variants.

        Args:
            state_before: Board before the move.
            state_after: Board after the move.
            action: Index of the move that was played.
            score_diff: Score gained by the move; converted with reward_func.
            tile_count_diff: Accepted but not used.
            done: 1 if the move ended the game, 0 otherwise.
        """
        boards_before = matrix_to_model_input(state_before)
        boards_after = matrix_to_model_input(state_after)
        moves = np.array([action])
        reward = reward_func(score_diff)

        for rotation in range(4):
            # The first record is the transition as played; each later one is
            # the previous record turned by another quarter turn.
            if rotation > 0:
                boards_before = rotate_90_board(boards_before)
                boards_after = rotate_90_board(boards_after)
                moves = rotate_90_moves(moves)

            slot = self.counter % self.max_length

            self.state_memory_before[slot] = boards_before
            self.state_memory_after[slot] = boards_after
            self.action_memory[slot, :] = one_hot(moves)[0, :]
            self.reward_memory[slot] = reward
            self.terminal_memory[slot] = 1 - done

            self.counter += 1

    def sample(self, batch_size):
        """Draw `batch_size` random records from the filled part of the memory.

        Indices are drawn with np.random.choice, i.e. with replacement.

        Returns:
            Tuple (states_before, states_after, actions, rewards, terminal),
            where `terminal` holds 1 - done for each record.
        """
        filled = np.minimum(self.max_length, self.counter)
        picks = np.random.choice(filled, batch_size)

        return (
            self.state_memory_before[picks],
            self.state_memory_after[picks],
            self.action_memory[picks],
            self.reward_memory[picks],
            self.terminal_memory[picks],
        )

# Create a custom player

In [None]:
GAMMA = 0.9995 # Discount factor applied to future rewards
EPSILON = 1.0 # Initial probability of playing a random (exploring) move
EPSILON_DEC = 0.9995 # Factor epsilon is multiplied by after each training step
EPSILON_END = 0.01 # Lower bound for epsilon
BATCH_SIZE = 32 # Number of experiences sampled per training step

In [None]:
class ModelPlayer():
    """2048 player that picks moves with a Keras model and trains it from replayed experience.

    `game_grid_init`, `play`, `sleep` and `lost` are callbacks: nothing in
    this file calls them directly, so they are presumably invoked by the game
    that TieIn launches (confirm against the TieIn implementation).

    Args:
        model: Network that is trained and queried for moves.
        model_copy: Network used to evaluate the follow-up states when
            building training targets; refreshed through `update_model_copy`.
    """

    def __init__(self, model, model_copy):
        # Hyperparameters
        self.gamma = GAMMA # How much to discount future rewards
        self.epsilon = EPSILON # Controls the probability of exploit/explore
        self.epsilon_dec = EPSILON_DEC # How fast epsilon decreases
        self.epsilon_end = EPSILON_END # Minimum epsilon value
        self.batch_size = BATCH_SIZE

        # Model
        self.model = model
        self.model_copy = model_copy

        # Experience
        self.memory = ExperienceReplay(1000000)

        # Other
        self.game_final = None # Contains the last state of the game
        self.loss = -1 # Loss of the most recent training step (-1 until one has run)
        self.reset_game_specific_variables()
        self.training = True

    def reset_game_specific_variables(self):
        """Forget the per-game bookkeeping so the next game starts clean."""
        self.last_state = None
        self.last_score = -1
        self.last_action = -1
        self.last_turn = -1

    def game_grid_init(self, game_grid):
        """Reset per-game state and switch training off for the coming game.

        `lost` switches training back on when that game ends.
        """
        self.reset_game_specific_variables()
        self.training = False

    def play(self, game):
        """Record the previous transition and choose the next move.

        Args:
            game: Current game; this method reads `move_count`, `matrix`,
                `score`, `score_diff`, `tile_count_diff` and
                `possible_directions()`.

        Returns:
            Index of the chosen move, always one of `game.possible_directions()`.
        """
        ### Check that we picked a valid move. ###
        if game.move_count > self.last_turn:
            self.last_turn = game.move_count
        else:
            # The move counter did not advance since the previous call.
            print("ModelPlayer: stuck on turn {}".format(game.move_count))
            breakpoint()

        ### Store experience. ###
        if game.move_count > 0:
            state_before = self.last_state
            state_after = game.matrix
            action = self.last_action
            score_diff = game.score_diff
            tile_count_diff = game.tile_count_diff
            done = 0
            self.memory.store(state_before, state_after, action, score_diff, tile_count_diff, done)

        ### Pick a move ###
        possible_directions = np.array(game.possible_directions())

        explore_flag = (np.random.random() < self.epsilon) and self.training

        # Stays None when exploring, so the fallback below never reads an unbound name.
        response = None

        ### Let the model suggest a move ###
        if not explore_flag:
            model_input = matrix_to_model_input(game.matrix)
            model_input = preprocess(model_input)

            model_output = self.model.predict(model_input)

            model_mask = np.zeros(4)
            model_mask[possible_directions] = 1
            model_output *= model_mask # Only select from possible moves

            # Pick the move the model suggests most strongly
            response = np.argmax(model_output)

        ### Pick a random move in case of explore or calculation failure ###
        if (explore_flag
            or response is None
            or not np.any(possible_directions == response)):
            response = np.random.choice(possible_directions)

        ### Short-term state storage to be able to calculate rewards ###
        # Keep a copy: the stored "before" state must not change if the game
        # updates its matrix in place when the move is applied.
        self.last_state = np.array(game.matrix)
        self.last_score = game.score
        self.last_action = response

        return response

    def sleep(self, game, render):
        """Return the pause between moves: 0 when not rendering, longer as the max tile grows."""
        if not render:
            return 0 # Don't sleep when training

        return np.log2(game.max_tile) / 30 # Go slower when it gets interesting

    def lost(self, game):
        """Handle the end of a game: store the final transition and run one training step.

        When training is off (see `game_grid_init`) nothing is stored or
        learned; training is simply switched back on for the next game.
        """
        self.game_final = game

        ### Store experience. ###
        if self.training:
            state_before = self.last_state
            state_after = game.matrix
            action = self.last_action
            score_diff = game.score_diff
            tile_count_diff = game.tile_count_diff
            done = 1
            self.memory.store(state_before, state_after, action, score_diff, tile_count_diff, done)

            self.reset_game_specific_variables() # Clean up to prepare for next game
            self.learn()
        else:
            self.training = True

    def learn(self):
        """Run one training step on a random batch from the replay memory.

        The target for the action that was played is
        reward + gamma * max(model_copy(next state)), with the second term
        dropped for terminal transitions; the targets of the other actions
        are the model's own current predictions. Epsilon is decayed after
        the step. Does nothing until the memory holds at least one batch.
        """
        if self.memory.counter < self.batch_size:
            return

        states_before, states_after, actions, reward, not_terminal = self.memory.sample(self.batch_size)

        action_indices = sparse(actions)

        # Each batch is scaled into its own variable: the model is fitted on
        # the "before" states, model_copy evaluates the "after" states.
        states_before = preprocess(states_before)
        states_after = preprocess(states_after)

        pred_first = self.model.predict(states_before)
        pred_second = self.model_copy.predict(states_after)

        q_target = pred_first.copy()
        q_target = np.squeeze(q_target)
        batch_index = np.arange(self.batch_size, dtype=int)
        # not_terminal is 1 - done, so terminal transitions keep only the immediate reward.
        q_target[batch_index, action_indices] = reward + self.gamma * np.max(pred_second, axis=1)*not_terminal

        history = self.model.fit(states_before, q_target, verbose=0)
        self.loss = history.history['loss'][-1]

        if self.epsilon > self.epsilon_end:
            self.epsilon *= self.epsilon_dec
            self.epsilon = np.maximum(self.epsilon, self.epsilon_end)

    def update_model_copy(self):
        """Copy the trained model's current weights into model_copy."""
        self.model_copy.set_weights(self.model.get_weights())

# Training

### Constants

In [None]:
NUM_GAMES = 2000 # Number of training games to play
MODEL_COPY_UPDATE_FREQUENCY = 5 # Refresh model_copy from the trained model every this many games
MOVING_AVERAGE_N = 10 # Moving average setting
PRINT_INTERVALS = 2 # Number of seconds between progress update (minimum)

### The actual loop

In [None]:
import time
import multiprocessing
from functools import reduce

last_print_time = time.time()
history = [] # One dict per finished game; pickled together with the model after training
avg_num_moves = -1
avg_loss = -1
moving_average_factor = 1.0 / MOVING_AVERAGE_N # Weight of the newest game in the moving averages

model_player = ModelPlayer(model, model_copy)

print("Starting training...")
for game_number in range(1, NUM_GAMES + 1):
    # Play one game without rendering. game_final is read right after
    # start(), so start() is relied on to return only once the game is over.
    ti = TieIn(model_player, render=False, log_history=False)
    ti.start()

    game_moves = model_player.game_final.move_count
    game_loss = model_player.loss

    # Record this game's statistics so the saved training history is not empty.
    history.append({
        'game': game_number,
        'moves': game_moves,
        'loss': game_loss,
        'epsilon': model_player.epsilon,
    })

    if game_number == 1:
        # Seed the moving averages with the first game's values.
        avg_num_moves = game_moves
        avg_loss = game_loss
    else:
        avg_num_moves = avg_num_moves*(1-moving_average_factor) + game_moves*moving_average_factor
        avg_loss = avg_loss*(1-moving_average_factor) + game_loss*moving_average_factor

        if (game_number - 1) % MODEL_COPY_UPDATE_FREQUENCY == 0:
            model_player.update_model_copy()

    # Print progress at most once every PRINT_INTERVALS seconds.
    cur_time = time.time()
    if cur_time - last_print_time >= PRINT_INTERVALS:
        print("Status: Games {}/{}: {}-game moving average: {:.1f} steps, {:.5g} loss, {:.5g} epsilon".format(game_number, NUM_GAMES, MOVING_AVERAGE_N, avg_num_moves, avg_loss, model_player.epsilon))
        last_print_time = cur_time



# Save the model and training history

In [None]:
import pickle

# Save the trained network in HDF5 format.
model.save('q_learning.h5')

# Wrap the training history list and pickle it next to the model. The name
# bound here must be the one passed to pickle.dump, and the context manager
# closes the file even if dumping fails.
q_learning_history = { 'history': history }
with open('q_learning_history.pickle', 'wb') as history_file:
    pickle.dump(q_learning_history, history_file)

# Load the model

In [None]:
# Load the saved network from disk.
# NOTE: this only rebinds the name `model`; a ModelPlayer that already exists
# keeps the model object it was constructed with.
model = tf.keras.models.load_model('q_learning.h5')

# Test the model

In [None]:
# Play one game with rendering enabled, using the existing model_player.
ti = TieIn(model_player, render=True, log_history=False)
ti.start()