In [1]:
import gym_2048 
import gym
from gym_2048.envs.game_2048 import Game2048

import numpy as np
from copy import deepcopy
import math

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import TensorDataset, DataLoader


## Intro to Game2048 OpenAI Gym environment

In [43]:
env = Game2048()  # the environment for the 2048 game
env._reset()      # initializes the 2048 game and places two numbers to start with
env._render()     # renders list into a board format using __str__ method in engine_2048.py
state = env._get_state()  # get board state in numpy 4x4 array
legal_moves = np.random.choice([i for indx,i in enumerate([0,1,2,3]) if env.moves_available()[indx]]) # sample a legal move
new_state = np.array([state, np.zeros(state.shape), np.zeros(state.shape), np.zeros(state.shape)])
#print(new_state)
print(new_state[0:3,0:4,0:4])
print(legal_moves)


4 0 0 0
0 0 2 0
0 0 0 0
0 0 0 0
[[[4. 0. 0. 0.]
  [0. 0. 2. 0.]
  [0. 0. 0. 0.]
  [0. 0. 0. 0.]]

 [[0. 0. 0. 0.]
  [0. 0. 0. 0.]
  [0. 0. 0. 0.]
  [0. 0. 0. 0.]]

 [[0. 0. 0. 0.]
  [0. 0. 0. 0.]
  [0. 0. 0. 0.]
  [0. 0. 0. 0.]]]
1


In [40]:
stacked_state = np.zeros((4,4,4))
current_state = state[None,]
stacked_state = torch.cat([torch.from_numpy(current_state),torch.from_numpy(stacked_state[0:3,:,:])]).numpy()
#np.array([current_state, stacked_state[0:3,:,:]], dtype=np.float32)
stacked_state = torch.cat([torch.from_numpy(current_state),torch.from_numpy(stacked_state[0:3,:,:])]).numpy()
stacked_state

array([[[0., 0., 0., 0.],
        [0., 2., 0., 0.],
        [0., 0., 0., 0.],
        [2., 0., 0., 0.]],

       [[0., 0., 0., 0.],
        [0., 2., 0., 0.],
        [0., 0., 0., 0.],
        [2., 0., 0., 0.]],

       [[0., 0., 0., 0.],
        [0., 0., 0., 0.],
        [0., 0., 0., 0.],
        [0., 0., 0., 0.]],

       [[0., 0., 0., 0.],
        [0., 0., 0., 0.],
        [0., 0., 0., 0.],
        [0., 0., 0., 0.]]])

In [6]:
recorded_env = deepcopy(env)
recorded_env

<gym_2048.envs.game_2048.Game2048 at 0x7fa023575820>

## DQN DNN Model

In [94]:
# input: the current state of the environment (board) -> output: best action to take in probability
class CNN(nn.Module):
    def __init__(self):
        super(CNN, self).__init__()
        self.conv1 = nn.Sequential(
            nn.Conv2d(in_channels=4,out_channels=8,kernel_size=2),
            nn.ReLU()
        )
        self.conv2 = nn.Sequential(
            nn.Conv2d(in_channels=8,out_channels=16,kernel_size=2),
            nn.ReLU()
        )
        self.conv3 = nn.Sequential(
            nn.Conv2d(in_channels=16,out_channels=32,kernel_size=2),
            nn.ReLU()
        )
        self.dense = nn.Sequential(
            nn.Linear(32,10,bias=True),
            nn.ReLU()
        )
        self.action = nn.Linear(10,4)
    def forward(self,x):
        output_conv = self.conv3(self.conv2(self.conv1(x)))
        output_flat = output_conv.view(output_conv.shape[0],-1)
        output_dense = self.dense(output_flat)
        output_action = self.action(output_dense)
        return output_action

## Test a single game

In [None]:
best = 0
best_env = 0
for i in range(100):
    env = Game2048()
    env._reset()
    env._render()
    print("\n")

    done = False
    moves = 0
    total = 0
    merge_count = 0
    while not done:
        action = np.random.choice([i for idx,i in enumerate([0,1,2,3]) if env.moves_available()[idx]])
        merge_count = env.merge_count()
        next_state, reward, done, info = env._step(action)
        moves += 1
        total += reward
        #print(moves,info,np.sum(merge_count))
        env._render()
        #print("\n")
        if total > best:
            best = total
            best_env = deepcopy(env)
    #print('Total Moves: {}'.format(moves))
    
print(best)

## Train 2048 agent

In [95]:
total_episodes = 20000
J = []
score = []
gamma = 0.7
memory_capacity = 5000
global epsilon
global replay_labels
global replay_memory
epsilon = 0.90
replay_memory = list()
replay_labels = list()
model = CNN()

# for episode with max score
maximum = -1
episode_num = -1
total_iters = 1
back = 0
model = CNN()


for episode in range(total_episodes):
    global env
    env = Game2048()
    env._reset()
    
    done = False        # whether episode finished or not
    total_reward = 0     # total_score of this episode
    local_iters = 1     # iters per episode
    stacked_state = np.zeros((4,4,4))
    while not done:
        model.eval()
        prev_env = deepcopy(env)
        prev_state = prev_env._get_state()
        current_state = env._get_state()[None,]
        #print(env._get_state())
        stacked_state = torch.cat([torch.from_numpy(current_state).float(), torch.from_numpy(stacked_state[0:3,:,:]).float()]).numpy()
        control_scores = model(torch.from_numpy(stacked_state)[None,])
        control_buttons = torch.argmax(control_scores)  # find the move with max Q value
        labels = control_scores.detach().clone()[0]              # copy the Q-values as labels
        num = np.random.uniform(0,1)                    # generate random number for epsilon greedy approach
        prev_max = np.max(prev_state)                   # store prev max
        
        # num is less epsilon generate random move
        if (num < epsilon):
            temp_env = deepcopy(prev_env)               # store a deep copy for a move
            temp_stacked_state = deepcopy(stacked_state)
            random_action = np.random.choice([i for indx,i in enumerate([0,1,2,3]) if env.moves_available()[indx]])        # generate a random move
            temp_state, reward, done, info = temp_env._step(random_action) # apply the move
            temp_stacked_state = torch.cat([torch.from_numpy(temp_state[None,]).float(), torch.from_numpy(temp_stacked_state[0:3,:,:]).float()]).numpy()
            total_reward += reward
            finish = temp_state
            temp_merge_count = temp_env.merge_count() # get number of merges
            env = deepcopy(temp_env)
            next_max = np.max(temp_state)               # get next max after applying the move
            
            # reward math.log(next_max,2)*0.1 if next_max is higher than prev max
            if (next_max==prev_max):
                labels[random_action] = 0
            else:
                labels[random_action] = reward # (math.log(next_max,2))*0.1 
            #labels[random_action] += prev_env.empty_cells() - temp_env.empty_cells()  # np.sum(temp_merge_count) # reward is also the number of merges
            
            # get the next state max Q-value
            temp_scores = model(torch.from_numpy(temp_stacked_state)[None,])
            max_qvalue = torch.max(temp_scores)
            
            # final labels add gamma*max_qvalue
            #labels[random_action] += gamma * max_qvalue

            if done:
                labels[random_action] = 0
        
        # generate the the max predicted move
        else:
            greedy_action = control_buttons.item()
            temp_env = deepcopy(prev_env)
            if (temp_env.moves_available()[greedy_action] == False):
                greedy_action = np.random.choice([i for indx,i in enumerate([0,1,2,3]) if temp_env.moves_available()[indx]])
                labels[greedy_action] = 0
            else:
                temp_state, reward, done, info = temp_env._step(greedy_action)
            temp_stacked_state = deepcopy(stacked_state)
            temp_stacked_state = torch.cat([torch.from_numpy(temp_state[None,]).float(), torch.from_numpy(temp_stacked_state[0:3,:,:]).float()]).numpy()

                
            temp_merge_count = temp_env.merge_count()
            env = deepcopy(temp_env)
            next_max = np.max(temp_state)               # get next max after applying the move
            
            # reward math.log(next_max,2)*0.1 if next_max is higher than prev max
            if (next_max==prev_max):
                labels[greedy_action] = 0
            else:
                labels[greedy_action] = reward #(math.log(next_max,2))*0.1 
            #labels[greedy_action] += prev_env.empty_cells() - temp_env.empty_cells() # np.sum(temp_merge_count) # reward is also the number of merges
            
            # get the next state max Q-value
            temp_scores = model(torch.from_numpy(temp_stacked_state)[None,])
            max_qvalue = torch.max(temp_scores)
            
            # final labels add gamma*max_qvalue
            #labels[greedy_action] += gamma * max_qvalue

            if done:
                labels[random_action] = 0

        # decrease the epsilon value
        if (epsilon > 0.1 and total_iters % 30000 == 0):
            epsilon /= 1.005
            
        
        # store them in memory
        prev_state = prev_env._get_state()
        replay_labels.append(labels.detach().numpy())
        replay_memory.append(prev_state)
        
        batch_size = 4

        # backpropagation
        if (len(replay_memory) >= memory_capacity):
            back_loss = 0
            batch_num = 0
            z = list(zip(replay_memory,replay_labels))
            #np.random.shuffle(z)
            #np.random.shuffle(z)
            replay_memory,replay_labels = zip(*z)
            
            for i in range(0,len(replay_memory),batch_size):
                if (i + batch_size > len(replay_memory)):
                    break
                    
                batch_data = deepcopy(replay_memory[i:i+batch_size])
                batch_labels = deepcopy(replay_labels[i:i+batch_size])
                batch_data = torch.from_numpy(np.array(batch_data,dtype=np.float32).reshape(batch_size,4,4))
                batch_labels = torch.from_numpy(np.array(batch_labels,dtype=np.float32).reshape(batch_size,4))
                dataset = TensorDataset(batch_data, batch_labels)
                dataloader = DataLoader(dataset, batch_size, shuffle=True)
                model.train()

                prediction = model(batch_data[None,])
                loss = torch.square(batch_labels - prediction)
                loss = torch.sum(loss,dim=1)
                loss = torch.mean(loss) / 2
                optimizer = optim.Adam(model.parameters(),lr=0.0003)
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()
                back_loss += loss

                # for batch_idx, samples in enumerate(dataloader):
                #     x_train, y_train = samples
                #     prediction = model(x_train[None,])
                #     loss = torch.square(y_train - prediction)
                #     loss = torch.sum(loss,dim=1)
                #     loss = torch.mean(loss) / 2
                #     optimizer = optim.RMSprop(model.parameters())
                #     optimizer.zero_grad()
                #     loss.backward()
                #     optimizer.step()
                #     back_loss += loss
                
                batch_num += 1
            #print("Mini-Batch - {} Back-Prop : {}, Loss : {}".format(batch_num,back,loss))
            back_loss /= batch_num
            J.append(back_loss)

            #number of back-props
            back += 1
            
            #make new memory 
            replay_memory = list()
            replay_labels = list()
            
        
        if (local_iters % 1000 == 0):
            print("Episode : {}, Score : {}, Iters : {}, Finish : {}".format(episode,total_reward,local_iters,finish))
        
        local_iters += 1
        total_iters += 1
        
    score.append(total_reward)
    #print("Episode {} finished with score {} \n board : \n {} \n epsilon : {}".format(episode,total_reward,env._get_state(),epsilon))

    
    if ((episode+1) % 1000 == 0):
        print("Maximum Score : {} ,Episode : {}".format(maximum,episode))    
        print("Loss : {}".format(J[len(J)-1]))
        print()
        
    if (maximum < total_reward):
        maximum = total_reward
        episode_num = episode
        print("Episode {} finished with score {} and epsilon {} \n {}".format(episode,total_reward,epsilon, env._get_state()))
print("Maximum Score : {} ,Episode : {}".format(maximum,episode))    

Episode 0 finished with score 763 and epsilon 0.9 
 [[ 4.  8. 16.  2.]
 [64.  4.  2.  4.]
 [16. 64. 16.  8.]
 [ 2.  4.  8.  4.]]
Episode 1 finished with score 2031 and epsilon 0.9 
 [[  4.   8.   4.   2.]
 [  8.  32.   2.   8.]
 [  4.  16. 256.  32.]
 [  2.   4.   8.   2.]]
Episode 47 finished with score 2139 and epsilon 0.9 
 [[  2.  64.   4.   8.]
 [ 32.   2.  16.   2.]
 [  4.   8. 256.   8.]
 [  2.  16.   4.   2.]]
Episode 55 finished with score 2935 and epsilon 0.9 
 [[  4.   8.   2.  64.]
 [ 16. 256.  32.  16.]
 [  2.  16. 128.   4.]
 [  4.   2.  16.   2.]]
Maximum Score : 2935 ,Episode : 999
Loss : 101.21589660644531

Episode 1242 finished with score 3567 and epsilon 0.8822227695311732 
 [[  2.  16.   4.   2.]
 [  4.  32. 512.   8.]
 [ 16.  64.   8.   4.]
 [  4.   8.   4.   2.]]
Maximum Score : 3567 ,Episode : 1999
Loss : 110.9919662475586

Maximum Score : 3567 ,Episode : 2999
Loss : 99.55525970458984

Maximum Score : 3567 ,Episode : 3999
Loss : 90.50254821777344

Maximum Score :

KeyboardInterrupt: 

In [84]:
batch_data

()

## Let agent play 2048

In [96]:
model.eval()
best = 0
best_env = 0
sum = 0
for i in range(100):
    env = Game2048()
    env._reset()
    env._render()
    print("\n")

    done = False
    moves = 0
    total = 0
    merge_count = 0
    stacked_state = np.zeros((4,4,4))
    while not done:
        state = env._get_state()[None,]
        stacked_state = torch.cat([torch.from_numpy(state).float(),torch.from_numpy(stacked_state[0:3,:,:]).float()]).numpy()
        action = [0,1,2,3][torch.argmax(model(torch.from_numpy(stacked_state[None,])))]
        if (env.moves_available()[action] == False):
            action = np.random.choice([i for indx,i in enumerate([0,1,2,3]) if env.moves_available()[indx]])
        merge_count = env.merge_count()
        next_state, reward, done, info = env._step(action)
        #print(reward)
        moves += 1
        total += reward
        #print(moves,info,np.sum(merge_count))
        #env._render()
        #print("\n")
        if total > best:
            best = total
            best_env = deepcopy(env)
    #print('Total Moves: {}'.format(moves))
    sum += total
print(best_env._get_state())
print(best)
print(sum/100)









































































































































































































[[  2. 256.   4.   2.]
 [ 32. 128.  64.  32.]
 [  8.  16.  32.   4.]
 [  2.   2.   8.   2.]]
3240
1241.24


In [54]:
model.eval()
best = 0
best_env = 0
sum = 0
for i in range(100):
    env = Game2048()
    env._reset()
    env._render()
    print("\n")

    done = False
    moves = 0
    total = 0
    merge_count = 0
    stacked_state = np.zeros((4,4,4))
    while not done:
        random_action = np.random.choice([i for indx,i in enumerate([0,1,2,3]) if env.moves_available()[indx]]) 
        next_state, reward, done, info = env._step(random_action)
        moves += 1
        total += reward
        print(moves,info,np.sum(merge_count))
        #env._render()
        #print("\n")
        if total > best:
            best = total
            best_env = deepcopy(env)
    print('Total Moves: {}'.format(moves))
    sum += total
print(env._get_state())
print(best)
print(sum/100)




1 {'score': 0, 'action': 'up', 'won': False} 0
2 {'score': 4, 'action': 'down', 'won': False} 0
3 {'score': 4, 'action': 'left', 'won': False} 0
4 {'score': 8, 'action': 'left', 'won': False} 0
5 {'score': 8, 'action': 'down', 'won': False} 0
6 {'score': 28, 'action': 'left', 'won': False} 0
7 {'score': 28, 'action': 'up', 'won': False} 0
8 {'score': 28, 'action': 'left', 'won': False} 0
9 {'score': 28, 'action': 'down', 'won': False} 0
10 {'score': 32, 'action': 'up', 'won': False} 0
11 {'score': 32, 'action': 'right', 'won': False} 0
12 {'score': 36, 'action': 'right', 'won': False} 0
13 {'score': 36, 'action': 'left', 'won': False} 0
14 {'score': 48, 'action': 'down', 'won': False} 0
15 {'score': 56, 'action': 'down', 'won': False} 0
16 {'score': 72, 'action': 'right', 'won': False} 0
17 {'score': 76, 'action': 'up', 'won': False} 0
18 {'score': 84, 'action': 'down', 'won': False} 0
19 {'score': 84, 'action': 'down', 'won': False} 0
20 {'score': 84, 'action': 'up', 'won': False} 0