# Bot-player for the Game 2048

Key parts of the implementation:
- [x] Logic of the Game 2048
- [x] Interface for player
- [x] Neural Network Design
- [x] Genetic learning using stochastic sampling

## Logic of the Game

The game is played on a 4x4 *Board*. After every move that changes the board, a new *Tile* is generated. When a person plays the game, the child class *HumanBoard*, which inherits everything from *Board*, is used.

### Tile class

In [1]:
import tkinter as tk
from collections import defaultdict

class Tile(tk.Canvas):
    """Square canvas widget that shows one cell of the game board.

    The background colour is looked up from the cell value and the value is
    drawn as text in the middle of the canvas. A value of 0 (empty cell) is
    drawn without any text.
    """

    # Background colour for each tile value that has a dedicated shade.
    colors = {
        0: '#dddddd',
        2: '#f4f0ed',
        4: '#f7e2d2',
        8: '#f9c59d',
        16: '#efa56b',
        32: '#e86f47',
        64: '#d14314',
        128: '#f7f091',
        256: '#f9f06d',
        512: '#f9ee4a',
        1024: '#ede136',
        2048: '#e5d714'
    }
    # Values that are not listed above fall back to a single dark colour.
    color_map = defaultdict(lambda: "#23231f", colors)

    def __init__(self, master, number, size=50):
        """Create the tile.

        Args:
            master: parent tkinter widget.
            number: value initially shown by the tile (0 means empty).
            size: width and height of the canvas in pixels.
        """
        tk.Canvas.__init__(self, master, height=size, width=size,
                           background=Tile.color_map[number])
        # Centre the label for any tile size instead of assuming size == 50.
        center = size / 2
        self.text = self.create_text(center, center, anchor=tk.CENTER,
                                     text=Tile._label(number))

    @staticmethod
    def _label(number):
        """Return the text shown for `number`; empty cells show nothing."""
        return "" if number == 0 else str(int(number))

    def set_state(self, number):
        """Update the background colour and the label to show `number`."""
        self.configure(background=Tile.color_map[number])
        self.itemconfigure(self.text, text=Tile._label(number))

### Board class

In [2]:
import tkinter as tk

class Board(tk.Frame):
    """Frame that shows the state of a game as an n-by-n grid of Tile widgets."""

    def __init__(self, master, game):
        """Build one Tile per board cell.

        Args:
            master: parent tkinter widget.
            game: game object; its `n` attribute gives the board size and
                `get_tile(row, col)` gives the value of a cell.
        """
        tk.Frame.__init__(self, master)
        self.game = game
        self.n = game.n
        # Tiles are created row by row, left to right.
        self.tiles = [
            [self._make_tile(r, c) for c in range(self.n)]
            for r in range(self.n)
        ]

    def _make_tile(self, r, c):
        """Create the Tile for cell (r, c) and place it on the grid."""
        widget = Tile(self, self.game.get_tile(r, c))
        widget.grid(row=r, column=c, padx=1, pady=1)
        return widget

    def update_tiles(self):
        """Refresh every tile from the current game state."""
        for r in range(self.n):
            tile_row = self.tiles[r]
            for c in range(self.n):
                tile_row[c].set_state(self.game.get_tile(r, c))

### HumanBoard class

In [3]:
import tkinter as tk

class HumanBoard(Board):
    """Board used when a person plays: it applies a move and redraws itself."""

    def __init__(self, master, game):
        super().__init__(master, game)

    def perform_move(self, move_dir):
        """Swipe the game in `move_dir`, redraw, and print the end state if any.

        Args:
            move_dir: direction passed straight to the game's `swipe`.
        """
        self.game.swipe(move_dir)
        self.update_tiles()
        outcome = self.game.check_for_game_over()
        if not outcome:
            return
        print(outcome)

### Game2048 class (includes implementation of game functionality)

In [5]:
import numpy as np
import math


# Weights of the score returned by Game2048.get_score:
# score = MAX_SCORE_SCALE * largest tile + SUM_SCORE_SCALE * sum of all tiles.
MAX_SCORE_SCALE = 10
SUM_SCORE_SCALE = 1
# Probability that a newly generated tile is a 2; otherwise it is a 4.
RANDOMNESS = 0.75


class Game2048:
    """State and rules of a 2048 game on an n-by-n board.

    The board is stored in `game_state`, an n-by-n float array where 0 marks an
    empty cell. Row 0 is the top row and column 0 is the left column.
    """

    def __init__(self, n, random_stream=None):
        """Create an empty board and place the two starting tiles.

        Args:
            n: board side length.
            random_stream: `np.random.RandomState` used for tile generation.
                When omitted, every game gets its own new stream (a default
                instance in the signature would be created once and silently
                shared by all games).
        """
        self.n = n
        self.game_state = np.zeros([n, n])
        if random_stream is None:
            random_stream = np.random.RandomState()
        self.random_stream = random_stream
        # Cells that already received a merge during the current swipe.
        self._merged = np.zeros([n, n], dtype=bool)
        self.generate_tile()
        self.generate_tile()

    def get_tile(self, row, col):
        """Return the value stored at (row, col)."""
        return self.game_state[row][col]

    def get_empty_idx(self):
        """Return the linear (row-major) indices of all empty cells."""
        idx = np.arange(self.n**2)
        return idx[np.reshape(self.game_state, -1) == 0]

    def generate_tile(self):
        """Put a 2 (probability RANDOMNESS) or a 4 on a random empty cell.

        Does nothing when the board has no empty cell.
        """
        empty = self.get_empty_idx()
        if empty.size == 0:
            return
        tile_idx = self.random_stream.choice(empty, 1)
        tile_val = 2 if self.random_stream.random_sample() < RANDOMNESS else 4
        self.place_tile(tile_idx, tile_val)

    def place_tile(self, idx, val):
        """Write `val` into a single cell.

        Args:
            idx: either a linear index (scalar or 1-element array) or a
                2-element (row, col) index.
            val: value to store.

        Raises:
            ValueError: if `idx` has more than two elements.
        """
        idx = np.atleast_1d(idx)
        if idx.shape[0] == 1:
            row, col = self.idx_lin2vec(idx[0])
        elif idx.shape[0] == 2:
            # Index one cell; indexing with the raw array would select two
            # whole rows instead.
            row, col = int(idx[0]), int(idx[1])
        else:
            raise ValueError('Invalid array size')
        self.game_state[row, col] = val

    def idx_lin2vec(self, idx_lin):
        """Convert a linear index (scalar or 1-element array) to [row, col]."""
        linear = int(np.asarray(idx_lin).reshape(-1)[0])
        row, col = divmod(linear, self.n)
        return [row, col]

    def idx_vec2lin(self, idx_vec):
        """Convert a (row, col) index to the linear row-major index."""
        return self.n * idx_vec[0] + idx_vec[1]

    def show_state(self):
        """Print the board."""
        print(self.game_state)

    def swipe(self, swipe_direction):
        """Slide all tiles in one direction and spawn a tile if anything moved.

        Lines are processed starting from the edge the tiles slide towards, so
        tiles closer to that edge settle first. A cell that received a merge
        cannot be merged into again during the same swipe.

        Args:
            swipe_direction: 'left', 'right', 'up' or 'down'.

        Raises:
            ValueError: for any other direction.
        """
        n = self.n
        if swipe_direction == 'left':
            mover, order, step = self.move_col, range(n), -1
        elif swipe_direction == 'right':
            mover, order, step = self.move_col, range(n - 1, -1, -1), 1
        elif swipe_direction == 'up':
            mover, order, step = self.move_row, range(n), -1
        elif swipe_direction == 'down':
            mover, order, step = self.move_row, range(n - 1, -1, -1), 1
        else:
            raise ValueError('invalid direction')

        old_state = np.copy(self.game_state)
        self._merged = np.zeros(self.game_state.shape, dtype=bool)
        for line in order:
            mover(line, step)
        if not np.array_equal(old_state, self.game_state):
            self.generate_tile()

    def check_for_game_over(self):
        """Return "WIN" if a 2048 tile exists, "LOSE" if stuck, else None."""
        if np.any(self.game_state == 2048):
            return "WIN"
        if self.is_board_full() and not self.check_for_valid_moves():
            return "LOSE"
        return None

    def check_for_valid_moves(self):
        """Return True if some swipe would change the board.

        A swipe changes the board exactly when two neighbouring cells hold
        either a tile next to an empty cell or two equal tiles. The check
        only reads the board: it neither moves tiles nor draws random numbers.
        """
        state = self.game_state
        neighbour_pairs = (
            (state[:, :-1], state[:, 1:]),   # horizontal neighbours
            (state[:-1, :], state[1:, :]),   # vertical neighbours
        )
        for first, second in neighbour_pairs:
            occupied = (first != 0) | (second != 0)
            can_change = (first == 0) | (second == 0) | (first == second)
            if np.any(occupied & can_change):
                return True
        return False

    def is_board_full(self):
        """Return True if no cell is empty."""
        return np.all(self.game_state != 0)

    def move_row(self, row, direction):
        """Slide every tile of `row` vertically by `direction` (-1 up, 1 down)."""
        for i in range(self.n):
            self.move_val(np.array([row, i]), np.array([direction, 0]))

    def move_col(self, col, direction):
        """Slide every tile of `col` horizontally by `direction` (-1 left, 1 right)."""
        for i in range(self.n):
            self.move_val(np.array([i, col]), np.array([0, direction]))

    def move_val(self, idx, vec):
        """Slide the tile at `idx` along `vec` until it is blocked or merges.

        Args:
            idx: (row, col) of the tile.
            vec: unit step (d_row, d_col).
        """
        idx = np.asarray(idx)
        vec = np.asarray(vec)
        val = self.game_state[idx[0], idx[1]]
        if val == 0:
            # Nothing to slide.
            return
        while True:
            move_type = self.check_move_type(idx, vec)
            if move_type == 0:
                return
            idx_new = idx + vec
            self.game_state[idx[0], idx[1]] = 0
            if move_type == 1:
                self.game_state[idx_new[0], idx_new[1]] = val
                idx = idx_new
            else:
                self.game_state[idx_new[0], idx_new[1]] += val
                # Lock the cell so it is merged only once per swipe.
                self._merged[idx_new[0], idx_new[1]] = True
                return

    def check_move_type(self, idx, vec):
        """Classify one step of the tile at `idx` along `vec`.

        Returns:
            0 if the step is blocked (board edge, a different tile, or a tile
            that already merged during this swipe), 1 if the next cell is
            empty, 2 if the next cell holds an equal tile to merge with.
        """
        moved_idx = np.asarray(idx) + np.asarray(vec)
        if np.any(np.logical_or(moved_idx < 0, moved_idx >= self.n)):
            return 0
        adjacent_val = self.game_state[moved_idx[0], moved_idx[1]]
        current_val = self.game_state[idx[0], idx[1]]
        if adjacent_val == 0:
            return 1
        if adjacent_val == current_val and not self._merged[moved_idx[0], moved_idx[1]]:
            return 2
        return 0

    def get_state(self, flat=False):
        """Return the board, reshaped to one dimension when `flat` is True.

        The returned array is not copied here; callers that need a snapshot
        should copy it themselves.
        """
        if flat:
            return self.game_state.reshape(-1)
        return self.game_state

    def get_score(self):
        """Return MAX_SCORE_SCALE * largest tile + SUM_SCORE_SCALE * tile sum."""
        max_val = np.max(self.game_state)
        board_sum = np.sum(self.game_state)
        return MAX_SCORE_SCALE * max_val + SUM_SCORE_SCALE * board_sum



## Eye-candy Graphical User Interface

A GUI makes it much easier to understand which decisions an agent from a particular generation of the population takes. Besides, it is very interesting to watch the agents play.
<br />
For this purpose a *tkinter* python library was used.

## Neural Network Design

The network has ***three layers***. **The input layer** has 16 neurons, each of which receives the value of a single tile. The second layer, **the hidden layer**, has 8 neurons and is fully connected to the first one. Finally, the third layer, **the output layer**, has 4 neurons and is fully connected to the second one; there is one output neuron for each possible move (up, down, left, right), and the neuron with the largest output value determines the selected action.

<br />

The activation function is represented by the hyperbolic tangent function for each neuron.

### Neuron class

In [9]:
def sigmoid_fn(x):
    """Return the logistic sigmoid 1 / (1 + e^-x) of the scalar `x`.

    For very negative `x` the term e^-x exceeds the float range; the sigmoid
    is then indistinguishable from 0, so 0.0 is returned instead of letting
    the OverflowError escape.
    """
    try:
        return 1.0 / (1.0 + math.exp(-x))
    except OverflowError:
        return 0.0


def tanh_fn(x):
    """Return the hyperbolic tangent of the scalar `x`.

    Uses the standard-library implementation, which is defined for every
    finite float, instead of deriving the value from the sigmoid (that
    formulation overflows for large negative arguments).
    """
    return math.tanh(x)

# The weighted input sum of a neuron is clipped to [-CLIP_LIMIT, CLIP_LIMIT]
# before the activation function is applied.
CLIP_LIMIT = 300


class Neuron:
    """Single neuron: weighted sum of the inputs plus bias, then tanh."""

    def __init__(self, weights, bias):
        """Store the parameters.

        Args:
            weights: array multiplied element-wise with the inputs.
            bias: scalar added to the weighted sum.
        """
        self.weights = weights
        self.bias = bias
        # Clipped pre-activation value of the most recent forward pass.
        self.fwd_sum = 0

    def forward(self, inputs):
        """Return the activation of the neuron for `inputs`."""
        pre_activation = np.sum(self.weights * inputs) + self.bias
        self.fwd_sum = np.clip(pre_activation, -CLIP_LIMIT, CLIP_LIMIT)
        return self.activation_fn()

    def activation_fn(self):
        """Apply tanh to the stored pre-activation value.

        Raises:
            ValueError: if the activation cannot be computed numerically.
        """
        try:
            return tanh_fn(self.fwd_sum)
        except (ArithmeticError, ValueError) as err:
            # Only numeric failures are translated; anything else (for
            # example KeyboardInterrupt) propagates unchanged.
            print(self.fwd_sum)
            raise ValueError("Math Error") from err

### Layer class

In [11]:
class Layer:
    """One network layer made of Neuron objects.

    In an input-type layer neuron i only sees element i of the input; in any
    other layer every neuron sees the complete input vector.
    """

    def __init__(self, weights, bias, type_input = False):
        """Create one neuron per row of `weights`.

        Args:
            weights: 2-D array with one row of weights per neuron.
            bias: 1-D array with one bias per neuron.
            type_input: True for the input layer.

        Raises:
            ValueError: if `weights` and `bias` disagree on the neuron count.
        """
        neuron_count = weights.shape[0]
        if neuron_count != bias.shape[0]:
            raise ValueError('Number of nodes must match length of bias vector')
        self.n_neurons = neuron_count
        self.type_input = type_input
        # An input layer takes exactly one input element per neuron.
        self.n_inputs = neuron_count if type_input else weights.shape[1]
        # Parameters are limited to [-10, 10] before the neurons are built.
        clipped_w = np.clip(weights, -10, 10)
        clipped_b = np.clip(bias, -10, 10)
        self.neurons = [
            Neuron(clipped_w[k, :], clipped_b[k]) for k in range(neuron_count)
        ]

    def evaluate(self, inputs):
        """Return the array of neuron activations for `inputs`.

        Raises:
            ValueError: if the length of `inputs` is not the layer input size.
        """
        if inputs.shape[0] != self.n_inputs:
            raise ValueError('Input size must match layer size')
        outputs = []
        for k in range(self.n_neurons):
            neuron_input = inputs[k] if self.type_input else inputs
            outputs.append(self.neurons[k].forward(neuron_input))
        return np.array(outputs)

    def get_layer_size(self):
        """Return the number of neurons."""
        return self.n_neurons

    def get_input_size(self):
        """Return the expected input length."""
        return self.n_inputs

### Network class

In [13]:
class Network:
    """Feed-forward network: a list of Layer objects evaluated in order."""

    def __init__(self, weights, bias):
        """Build the layers.

        Args:
            weights: sequence with one weight array per layer.
            bias: sequence with one bias array per layer.
        """
        self.n_layers = len(weights)
        # Only the first layer is created as an input-type layer.
        stack = [Layer(weights[0], bias[0], True)]
        for k in range(1, self.n_layers):
            stack.append(Layer(weights[k], bias[k]))
        self.layers = stack

    def evaluate(self, inputs):
        """Propagate `inputs` through all layers and return the last output.

        Raises:
            ValueError: if `inputs` does not match the first layer's input size.
        """
        if inputs.shape[0] != self.layers[0].get_input_size():
            raise ValueError('Incorrectly sized input')
        signal = inputs
        for k in range(self.n_layers):
            signal = self.layers[k].evaluate(signal)
        return signal

### Class that describes Structure of the Neural Network

In [15]:
import copy

class Structure:
    """Shape of a network, given as the number of neurons in each layer."""

    def __init__(self, neurons_per_layer):
        """Store the layer sizes; the first entry is the input layer."""
        self.neurons_per_layer = neurons_per_layer
        self.n_layers = len(neurons_per_layer)
        self.n_outputs = neurons_per_layer[-1]
        self.n_inputs = neurons_per_layer[0]

    def copy(self):
        """Return a new Structure holding a deep copy of the layer sizes."""
        return Structure(copy.deepcopy(self.neurons_per_layer))

    def get_number_of_weights_per_layer(self, n):
        """Return how many parameters (weights plus biases) layer `n` has.

        Layer 0 has one weight and one bias per input neuron; every other
        layer has one weight per neuron of the previous layer plus one bias,
        for each of its neurons.
        """
        if n != 0:
            return self.neurons_per_layer[n] * (self.neurons_per_layer[n - 1] + 1)
        return 2 * self.n_inputs

    def get_number_of_inputs_per_layer(self, n):
        """Return how many inputs each neuron of layer `n` receives."""
        if n != 0:
            return self.neurons_per_layer[n - 1]
        return 1

    def get_number_of_neurons_per_layer(self, n):
        """Return the number of neurons in layer `n`."""
        return self.neurons_per_layer[n]

    def get_number_of_outputs(self):
        """Return the size of the last layer."""
        return self.n_outputs

    def get_number_of_layers(self):
        """Return the number of layers."""
        return self.n_layers

    def get_number_of_inputs(self):
        """Return the size of the first layer."""
        return self.n_inputs

## Genetic Learning using stochastic sampling

This algorithm reflects the process of natural selection where the fittest individuals are selected for reproduction in order to produce offspring of the next generation.
<br />
The process of natural selection starts with the selection of fittest individuals from a population. They produce offspring which inherit the characteristics of the parents and will be added to the next generation.
<br />
If the parents have high fitness, their offspring tend to be fitter than the parents and have a better chance of surviving. This process keeps iterating until, in the end, a generation with the fittest individuals is found. A genetic algorithm has five phases:
- Initial population.
- Selection.
- Crossover.
- Mutation.
- Fitness function.


### Genome class

In [19]:
class Genome:
    """Flat parameter vector ("DNA") of a network with a given Structure.

    The DNA is the concatenation of one gene per layer; each gene holds the
    layer's weights followed by its biases.
    """

    def __init__(self, structure):
        """Create a genome with random DNA for `structure`.

        NOTE(review): SCALE is not defined in the visible part of this file and
        must be provided elsewhere. The initial DNA is drawn from
        [-SCALE, SCALE] and then multiplied by SCALE again, while `mutate`
        draws from [-SCALE, SCALE] only - confirm the extra factor is intended.
        """
        self.structure = structure
        layer_count = structure.get_number_of_layers()
        self.gene_sizes = [structure.get_number_of_weights_per_layer(i)
                           for i in range(layer_count)]
        # Offset of each layer's gene inside the DNA vector.
        self.gene_start_index = np.cumsum(self.gene_sizes) - self.gene_sizes
        self.genome_length = sum(self.gene_sizes)
        self.dna = SCALE * (np.random.uniform(-SCALE, SCALE, self.genome_length))

    def copy(self):
        """Return a new Genome with copies of this genome's structure and DNA."""
        # The constructor first draws random DNA, which is then overwritten.
        new_genome = Genome(self.structure.copy())
        new_genome.set_dna(self.get_dna())
        return new_genome

    def set_dna(self, dna):
        """Replace the DNA.

        Raises:
            ValueError: if `dna` does not have `genome_length` entries.
        """
        if len(dna) != self.genome_length:
            raise ValueError('DNA sequence is of incorrect length!')
        self.dna = dna

    def get_structured_genome(self):
        """Split the DNA into per-layer weight matrices and bias vectors.

        Returns:
            (weights, bias): two lists with one entry per layer. Each weight
            matrix has one row per neuron and one column per neuron input.
        """
        weights = []
        bias = []
        for i in range(self.structure.get_number_of_layers()):
            n_rows = self.structure.get_number_of_neurons_per_layer(i)
            n_cols = self.structure.get_number_of_inputs_per_layer(i)
            gene_start = self.gene_start_index[i]
            gene_end = gene_start + self.gene_sizes[i]
            # The last n_rows entries of a gene are the biases.
            weight_end = gene_end - n_rows
            weights.append(self.dna[gene_start:weight_end].reshape(n_rows, n_cols))
            bias.append(self.dna[weight_end:gene_end])
        return weights, bias

    def mutate(self, rate):
        """Replace each gene, independently with probability `rate`, by a new
        value drawn uniformly from [-SCALE, SCALE].
        """
        mutation_dna = np.random.uniform(-SCALE, SCALE, self.genome_length)
        # One uniform draw per gene decides whether the gene is kept.
        keep = np.random.uniform(size=self.genome_length) > rate
        self.dna = np.where(keep, self.dna, mutation_dna)

    def generate_network(self):
        """Build a Network from the current DNA."""
        (weights, bias) = self.get_structured_genome()
        return Network(weights, bias)

    def get_dna(self):
        """Return a copy of the DNA."""
        return np.copy(self.dna)

    def get_length(self):
        """Return the number of entries in the DNA."""
        return self.genome_length

    def get_structure(self):
        """Return the Structure this genome was created for."""
        return self.structure

### Agent class

In [20]:
import math
import time
import tkinter as tk

class Agent:
    """Player that picks its moves with a neural network built from a genome."""

    # Move belonging to each network output, in output order.
    _ACTIONS = ('left', 'right', 'up', 'down')

    def __init__(self, nn_structure, genome = None):
        """Create the agent and the network it plays with.

        Args:
            nn_structure: Structure of the network; the size of its input
                layer must be a square number (one input per board cell).
            genome: Genome to use; when it is missing or not a Genome, a new
                random genome is created.

        Raises:
            ValueError: if `nn_structure` is not a Structure or its input
                layer size is not a square number.
        """
        if not isinstance(nn_structure, Structure):
            raise ValueError('Input structure must be of Structure type')
        self.nn_structure = nn_structure
        if isinstance(genome, Genome):
            self.genome = genome
        else:
            self.genome = Genome(self.nn_structure)
        self.network = self.genome.generate_network()
        self.game_size = math.sqrt(self.nn_structure.get_number_of_inputs())
        # Every agent owns a random stream for the games it plays.
        self.game_random_state = np.random.RandomState(np.random.randint(10000))
        # State of the stream at the start of the last game; -1 = no game yet.
        self.previous_game_random_seed = -1
        if self.game_size % 1 != 0:
            raise ValueError('Input layer must be a square number!')
        self.game_size = int(self.game_size)

    def get_genome(self):
        """Return the agent's Genome."""
        return self.genome

    def mutate_genome(self, rate):
        """Mutate the genome with the given rate and rebuild the network.

        The network holds the parameters it was built from, so without the
        rebuild the agent would keep playing with its old parameters.
        """
        self.genome.mutate(rate)
        self.network = self.genome.generate_network()

    def copy(self):
        """Return a new Agent built from a copy of this agent's genome."""
        new_genome = self.genome.copy()
        return Agent(new_genome.get_structure(), new_genome)

    def evaluate_network(self, input_state):
        """Return the network output for a flat board state.

        Args:
            input_state: list or array with one value per board cell.

        Raises:
            ValueError: if the state length differs from the input layer size.
        """
        if type(input_state) is list:
            input_state = np.array(input_state)
        if input_state.shape[0] != self.nn_structure.get_number_of_inputs():
            raise ValueError('Input is incorrect size for this Agent')
        return self.network.evaluate(input_state)

    def choose_action(self, input_state):
        """Return the move whose network output is the largest.

        Raises:
            ValueError: if the largest output has no move assigned to it.
        """
        weighted_actions = self.evaluate_network(input_state)
        max_idx = int(np.argmax(weighted_actions))
        if max_idx >= len(Agent._ACTIONS):
            raise ValueError('Invalid action')
        return Agent._ACTIONS[max_idx]

    def check_for_agent_stuck(self, state):
        """Count consecutive calls with an unchanged state.

        Uses `self.old_state` and `self.stuck_counter`, which `play_game`
        initialises. Returns True once the counter exceeds 2.
        """
        if np.all(state == self.old_state):
            self.stuck_counter = self.stuck_counter + 1
        else:
            self.stuck_counter = 0
            self.old_state = state
        return self.stuck_counter > 2

    def check_for_game_over(self, game, flat_state):
        """Return (game_over, win) for the current game.

        The game is over when it is won or lost, or when the agent is stuck.
        """
        end_state = game.check_for_game_over()
        if end_state == 'WIN':
            return True, True
        if end_state == 'LOSE':
            return True, False
        if self.check_for_agent_stuck(flat_state):
            return True, False
        return False, False

    def play_game(self, gui_root = None):
        """Play one game until it is over and return (score, win).

        Args:
            gui_root: optional tkinter root; when given, a Board is shown and
                refreshed after each move, pausing MOVE_PAUSE_TIME seconds.
                NOTE(review): MOVE_PAUSE_TIME is not defined in the visible
                part of this file.
        """
        # Remember the stream state so this game can be replayed later.
        self.previous_game_random_seed = self.game_random_state.get_state()
        game = Game2048(self.game_size, random_stream = self.game_random_state)
        game_over = False
        win = False
        self.stuck_counter = 0
        # NOTE(review): get_state(flat=True) is stored without copying, so it
        # may alias the live board - confirm this is intended.
        self.old_state = game.get_state(flat = True)

        if gui_root:
            board = Board(gui_root, game)
            board.pack(side = tk.LEFT, padx = 1, pady = 1)
        while not game_over:
            flat_state = np.copy(game.get_state(flat = True))
            action = self.choose_action(flat_state)
            game.swipe(action)
            (game_over, win) = self.check_for_game_over(game, flat_state)
            if gui_root:
                board.update_tiles()
                gui_root.update()
                time.sleep(MOVE_PAUSE_TIME)
        return game.get_score(), win

    def replay_previous_game(self, gui_root):
        """Play the previous game again with the same random numbers.

        The state of the random stream is restored afterwards, even if the
        replay fails.

        Raises:
            ValueError: if no game has been played yet.
        """
        if not (self.previous_game_random_seed != -1):
            raise ValueError("A game must be played before attempting to replay it!")
        current_state = self.game_random_state.get_state()
        self.game_random_state.set_state(self.previous_game_random_seed)
        try:
            return self.play_game(gui_root)
        finally:
            self.game_random_state.set_state(current_state)




### Stochastic sampling: roulette wheel selection, stochastic sampling without replacement

In [21]:
def roulette_wheel_selection(scores, selection_points):
    """Map points on the cumulative score axis to individual indices.

    Each point selects the first individual whose cumulative score is not
    smaller than the point. The scan continues from the index chosen for the
    previous point, so points are expected in ascending order.

    Args:
        scores: non-negative score of each individual.
        selection_points: points in [0, sum(scores)].

    Returns:
        Array with the selected index for every point. A point beyond the
        total score (for example through rounding) selects the last
        individual instead of running past the end of the array.

    Raises:
        ValueError: if a point is given but `scores` is empty.
    """
    cumulative_scores = np.cumsum(scores)
    last = len(cumulative_scores) - 1
    idx_to_keep = []
    count = 0
    for point in selection_points:
        if last < 0:
            raise ValueError('Cannot select from an empty score list')
        while count < last and cumulative_scores[count] < point:
            count += 1
        idx_to_keep.append(count)
    return np.array(idx_to_keep)




def sampling_without_replacement(population_scores_in,number_to_keep):
    """Draw distinct individuals with probability proportional to their score.

    After an individual is drawn its score is set to zero in a private copy of
    the scores, so it cannot be drawn by the roulette wheel again. If only
    zero-score individuals are left, the draw is uniform among the
    individuals that were not selected yet.

    Args:
        population_scores_in: non-negative score of each individual; the
            input is not modified.
        number_to_keep: number of individuals to draw.

    Returns:
        Integer array with the selected indices in drawing order.

    Raises:
        ValueError: if more individuals are requested than exist.
    """
    population_scores = np.copy(population_scores_in)
    available = np.ones(len(population_scores), dtype=bool)
    chosen = []
    for _ in range(number_to_keep):
        total = sum(population_scores)
        if total > 0:
            selection_point = np.random.uniform(0, total, 1)
            idx = int(roulette_wheel_selection(population_scores, selection_point)[0])
            if population_scores[idx] <= 0:
                # A selection point of exactly 0 can land on a zero-score
                # entry; use the first entry that still has a score.
                idx = int(np.flatnonzero(population_scores > 0)[0])
        else:
            candidates = np.flatnonzero(available)
            if candidates.size == 0:
                raise ValueError('Cannot keep more individuals than the population holds')
            idx = int(np.random.choice(candidates))
        chosen.append(idx)
        available[idx] = False
        population_scores[idx] = 0
    return np.array(chosen, dtype=int)

###  Crossover function to imitate chromosomal crossing over

In [22]:
def crossover(genome1, genome2):
    """Create two children by single-point crossover of two genomes.

    A cut position between 1 and length - 1 is drawn at random; the children
    take the DNA before the cut from one parent and the DNA from the cut
    onwards from the other parent. The parents are not modified.

    Args:
        genome1: first parent; its structure is used for both children.
        genome2: second parent, of the same length.

    Returns:
        (child1, child2) as new Genome objects.

    Raises:
        ValueError: if the genomes differ in length or hold fewer than two
            genes (no cut position exists).
    """
    length = genome1.get_length()
    if length != genome2.get_length():
        raise ValueError('Genomes are not the same length!')
    if length < 2:
        raise ValueError('Genomes need at least two genes for a crossover!')
    # Take the element explicitly: int() on a 1-element array is deprecated.
    n = int(np.random.randint(1, length, 1)[0])
    dna1 = genome1.get_dna()
    dna2 = genome2.get_dna()
    # get_dna returns copies, so swapping the tails leaves the parents intact.
    dna1[n:], dna2[n:] = dna2[n:].copy(), dna1[n:].copy()
    structure = genome1.get_structure()
    genome3 = Genome(structure)
    genome4 = Genome(structure)
    genome3.set_dna(dna1)
    genome4.set_dna(dna2)
    return genome3, genome4

### Implementation of Evolution :) 

In [23]:
class GeneticLearner:
    """Population of agents that is evolved generation by generation.

    `agent_scores[i]` is the (score, win) result of `agents[i]` in the most
    recent fitness test.
    """

    def __init__(self, n_agents, layer_sizes, seed=0):
        """Create a random population and test its fitness once.

        Args:
            n_agents: population size.
            layer_sizes: neurons per network layer, input layer first.
            seed: seed for NumPy's global random generator.
        """
        np.random.seed(seed)
        self.nn_struct = Structure(layer_sizes)
        self.n_agents = n_agents
        self.agents = [Agent(self.nn_struct) for _ in range(self.n_agents)]
        self.agent_scores = []
        self.win = []
        self.survival_rate = 0.5
        self.mutation_rate = 0.001
        self.crossover_rate = 0.7
        self.best_always_survives = True
        self.n_to_keep = int(self.n_agents * self.survival_rate)
        self.n_to_create = self.n_agents - self.n_to_keep
        # Offspring are created in pairs, so their number has to be even.
        if self.n_to_create % 2 != 0:
            self.n_to_keep = self.n_to_keep + 1
            self.n_to_create = self.n_to_create - 1
        self.population_fitness_test()

    def run_one_generation(self):
        """Run selection, reproduction, mutation and a new fitness test."""
        self.select_survivors()
        self.generate_offspring()
        self.mutate()
        self.population_fitness_test()

    def population_fitness_test(self):
        """Let every agent play one game and store the (score, win) results."""
        self.agent_scores = [agent.play_game() for agent in self.agents]

    def population_fitness_test_multithread(self):
        """Like `population_fitness_test`, with the games run in a process pool.

        NOTE(review): the games run on copies of the agents in worker
        processes, so state that `play_game` stores on the agent is presumably
        not visible on the agents of this process - confirm before relying on
        replays after using this method.
        """
        import concurrent.futures  # not imported at the top of the file

        with concurrent.futures.ProcessPoolExecutor() as executor:
            futures = [executor.submit(agent.play_game) for agent in self.agents]
            concurrent.futures.wait(futures)
            self.agent_scores = [f.result() for f in futures]

    def select_survivors(self):
        """Keep `n_to_keep` agents, chosen by score-proportional sampling.

        With `best_always_survives` the best agent is kept unconditionally
        and the remaining places are sampled from the others.
        """
        scores = self.get_score_array()
        if self.best_always_survives:
            idx_best = np.argmax(scores)
            # Exclude the best agent from the draw; it is added afterwards.
            scores[idx_best] = 0
            idx_survived = sampling_without_replacement(scores, self.n_to_keep - 1)
            idx_survived = np.append(idx_survived, idx_best)
        else:
            idx_survived = sampling_without_replacement(scores, self.n_to_keep)
        survivors = {int(i) for i in idx_survived}
        self.agents = [agent for i, agent in enumerate(self.agents) if i in survivors]
        self.agent_scores = [result for i, result in enumerate(self.agent_scores)
                             if i in survivors]

    def select_parents(self):
        """Return one (index, index) tuple per pair of offspring to create.

        Both parents of a pair are distinct and sampled proportionally to
        their score among the current agents.
        """
        pairs_of_parents_needed = self.n_to_create // 2
        scores = np.array([result[0] for result in self.agent_scores])
        return [tuple(sampling_without_replacement(scores, 2))
                for _ in range(pairs_of_parents_needed)]

    def generate_offspring(self):
        """Append two new agents to the population for every pair of parents.

        With probability `crossover_rate` the children come from a crossover
        of the parents' genomes; otherwise each child is a copy of one parent.
        """
        new_generation = []
        for p1idx, p2idx in self.select_parents():
            parent1 = self.agents[int(p1idx)]
            parent2 = self.agents[int(p2idx)]
            if np.random.uniform() < self.crossover_rate:
                genome1, genome2 = crossover(parent1.get_genome(), parent2.get_genome())
                new_generation += [Agent(self.nn_struct, genome1),
                                   Agent(self.nn_struct, genome2)]
            else:
                # One copy of each parent (previously parent 1 was copied twice).
                new_generation += [parent1.copy(), parent2.copy()]
        self.agents = self.agents + new_generation

    def mutate(self):
        """Mutate the genome of every agent with `mutation_rate`."""
        for i in range(self.n_agents):
            self.agents[i].mutate_genome(self.mutation_rate)

    def get_current_generation_statistics(self):
        """Return (maximum, median, minimum) of the current scores."""
        scores = self.get_score_array()
        return np.max(scores), np.median(scores), np.min(scores)

    def run_n_generations(self, n):
        """Run `n` generations and print the score statistics of each one."""
        for i in range(n):
            self.run_one_generation()
            maximum, med, minimum = self.get_current_generation_statistics()
            print("Generation " + str(i) + " - maximum: " + str(maximum) + "  median: " + str(med) + " minimum: " + str(minimum))

    def get_score_array(self):
        """Return a new array with the score of every agent."""
        return np.array([self.agent_scores[i][0] for i in range(self.n_agents)])

    def get_best_agent(self):
        """Return the agent with the highest score."""
        return self.agents[np.argmax(self.get_score_array())]

    def get_worst_agent(self):
        """Return the agent with the lowest score."""
        return self.agents[np.argmin(self.get_score_array())]

    def get_median_agent(self):
        """Return the agent in the middle of the score ranking.

        For an even population size the numeric median lies between two
        scores and matches no agent, so the lower of the two middle agents is
        returned.
        """
        order = np.argsort(self.get_score_array(), kind='stable')
        return self.agents[order[(len(order) - 1) // 2]]

### INITIATE TRAINING FROM THE VERY BEGINNING

In [None]:
# Train a new population from scratch and save the learner after every batch.
fname = './models/model_'
n_agents = 1000
generations_per_batch = 1
n_batches = 10
total_generations = generations_per_batch * n_batches
print('Preparing to run ' + str(total_generations) +' generations...')

G = GeneticLearner(n_agents,[16,8,4],seed = 4)
for i in range(n_batches):
    print("Starting batch " + str(i))
    G.run_n_generations(generations_per_batch)
    # Number of generations completed so far; this is the same numbering the
    # "continue training" cell uses when it resumes from a saved model.
    gen_num = (i + 1) * generations_per_batch
    save_model_state(G, fname + str(gen_num) + '.p')
    print("Model " + str(gen_num) + " Saved")


### CONTINUE TRAINING

In [None]:
# Load a saved learner and continue training, saving after every batch.
fname = 'models/model_'
gen_to_load = 193
generations_per_batch = 1
n_batches = 1000
total_generations = generations_per_batch * n_batches
full_filename = fname+str(gen_to_load)+'.p'
print('Loading ' + full_filename + ' and running ' + str(total_generations) + ' more generations')
try:
    G = load_model_state(full_filename)
except FileNotFoundError as err:
    # Only a missing file is reported as such; any other loading problem
    # keeps its own exception and traceback.
    # NOTE(review): assumes load_model_state lets FileNotFoundError propagate
    # for a missing file - confirm against its implementation.
    raise ValueError('File does not exist: ' + full_filename) from err
for i in range(n_batches):
    print("Starting batch "+str(i))
    G.run_n_generations(generations_per_batch)
    # Generations completed in total, counted from the loaded model.
    gen_num = (1+i)*generations_per_batch + gen_to_load
    save_model_state(G,fname + str(gen_num) + '.p')
    print("Model " + str(gen_num) + " Saved")

### TEST PRETRAINED POPULATION WITH GUI

In [None]:
# Show the best agent of a saved population playing in a window.
fname = 'models/model_488'
n_new_games = 2
G = load_model_state(fname + '.p')
A = G.get_best_agent()
root = tk.Tk()
root.title("2048 Game")
# First the agent's previous game is replayed, then it plays new games.
game_runs = [A.replay_previous_game] + [A.play_game] * n_new_games
for run_game in game_runs:
    score, win = run_game(root)
    print('Agent score: ' + str(score) + ' Win status: ' + str(win))
    time.sleep(0.5)

input("Press Enter to end...")