<a href="https://colab.research.google.com/github/mtscott321/Reinforcement2048/blob/main/reinforcement_2048.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [7]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.distributions import Categorical
from torch.autograd import Variable
import matplotlib.pyplot as plt
import numpy as np
import sys
import random
from google.colab import files
import pandas as pd

In [2]:
# Widths (number of units) of the four hidden layers of the policy network,
# in order from the layer nearest the input to the one nearest the output.
LAYER_1, LAYER_2, LAYER_3, LAYER_4 = 12, 12, 12, 12

In [3]:
class PolicyNet(nn.Module):
    """Fully connected policy network: flattened board in, move probabilities out.

    The input's last dimension must have 16 entries (one per board cell).
    The output's last dimension has 4 entries that sum to 1; ``update``
    interprets index 0 as up, 1 as left, 2 as down and 3 as right.
    """

    def __init__(self):
        super().__init__()

        # The attribute names fc1..fc5 determine the state_dict keys, so they
        # must stay as they are for previously saved checkpoints to load.
        self.fc1 = nn.Linear(16, LAYER_1)  # one input per board cell
        self.fc2 = nn.Linear(LAYER_1, LAYER_2)
        self.fc3 = nn.Linear(LAYER_2, LAYER_3)
        self.fc4 = nn.Linear(LAYER_3, LAYER_4)
        self.fc5 = nn.Linear(LAYER_4, 4)  # one logit per move

    def forward(self, x):
        """Return the probability of each of the four moves for board(s) ``x``."""
        for hidden in (self.fc1, self.fc2, self.fc3, self.fc4):
            x = F.relu(hidden(x))
        # Softmax over the last dimension so single boards and batches both work.
        return torch.softmax(self.fc5(x), dim=-1)

In [4]:
# 2048 game logic. Every move is reduced to merging 4-element vectors as if
# they were swiped to the left (<--); see eval_vector.
def init():
    """Create a fresh 4x4 board containing two starting tiles.

    Returns:
        An integer torch tensor of shape (4, 4) in which two cells have been
        filled by ``new_number`` and every other cell is 0.
    """
    board = np.zeros((4, 4), dtype=int)

    # new_number writes into the array in place, so its return value is not needed.
    for _ in range(2):
        new_number(board)

    return torch.tensor(board)

def eval_vector(v):
    """Resolve one row/column of four cells as a swipe towards index 0.

    Non-zero tiles are packed to the front, then neighbouring equal tiles are
    merged from the front. A tile produced by a merge is not merged again in
    the same swipe.

    Args:
        v: iterable of at most four tile values (0 means empty).

    Returns:
        A new float NumPy array of length 4 holding the resolved line.
    """
    def pack_left(cells):
        # Slide every non-zero tile to the front, padding the tail with zeros.
        tiles = [c for c in cells if c != 0]
        packed = np.zeros(4)
        packed[:len(tiles)] = tiles
        return packed

    line = pack_left(v)

    # Only three adjacent pairs exist in a line of four cells.
    for left, right in ((0, 1), (1, 2), (2, 3)):
        if line[left] == line[right]:
            line[left] *= 2
            line[right] = 0
            # Close the gap the merge just opened before checking the next pair.
            line = pack_left(line)

    return line

def update(updated, move):
    """Apply one swipe to the board in place and report the resulting totals.

    Args:
        updated: 4x4 board, either a torch tensor or a NumPy array. A NumPy
            array is wrapped with ``torch.from_numpy``, which shares memory,
            so the caller's array is modified too.
        move: 0 = up, 1 = left, 2 = down, anything else = right. May be a
            Python int or a single-element tensor.

    Returns:
        ``(board, rew, score)`` where ``board`` is the swiped board as a torch
        tensor, ``rew`` is the sum of all tiles on it and ``score`` is its
        largest tile (both 0-d tensors).
    """
    if isinstance(updated, np.ndarray):
        updated = torch.from_numpy(updated)

    # Decode the move once instead of re-testing it for every line.
    is_up, is_left, is_down = (bool(move == k) for k in range(3))
    vertical = is_up or is_down          # up/down work on columns
    reverse = not (is_up or is_left)     # down/right swipe towards the far end

    for i in range(4):
        line = updated[:, i] if vertical else updated[i, :]
        cells = line.numpy()
        if reverse:
            # eval_vector always merges towards index 0, so mirror the line first.
            cells = np.flip(cells)
        merged = eval_vector(cells)
        if reverse:
            # Mirror back; copy() because from_numpy rejects negative strides.
            merged = np.flip(merged).copy()
        # `line` is a view into the board, so this writes the result in place.
        line.copy_(torch.from_numpy(merged))

    rew = updated.sum()
    score = updated.max()

    return updated, rew, score
            
def new_number(vals):
    """Place a new tile (2 or 4, equally likely) on a random empty cell.

    Args:
        vals: 4x4 board (NumPy array or torch tensor); modified in place.

    Returns:
        The same ``vals`` object that was passed in.

    Raises:
        ValueError: from ``random.sample`` when the board has no empty cell.
    """
    flat = vals.flatten()
    open_cells = [idx for idx in range(len(flat)) if flat[idx] == 0]

    spot = random.sample(open_cells, 1)[0]  # flat index of the chosen empty cell
    tile = 2 * random.randint(1, 2)         # 2 or 4

    row, col = divmod(spot, 4)
    vals[row, col] = tile
    return vals
    
def check_valid_move(vals, i):
    """Return True if move ``i`` changes the board and leaves room for a new tile.

    The move is tried on a copy, so ``vals`` itself is never modified.

    Args:
        vals: current 4x4 board as a torch tensor.
        i: move code as understood by ``update``.

    Returns:
        bool: True when the swiped board differs from ``vals`` and has at
        least one empty cell; False otherwise.
    """
    before = vals.numpy()
    # Use update's return value rather than relying on its in-place side effect.
    after = update(before.copy(), i)[0].numpy()

    has_room = bool((after == 0).any())
    # Compare with array_equal: `ndarray != tensor` does not produce a single,
    # meaningful truth value, so it cannot detect a move that did nothing.
    changed = not np.array_equal(after, before)
    return has_room and changed
    
def check_alive(vals):
    """Return whether the game can continue from board ``vals``.

    The game is alive if the board has an empty cell, or if at least one of
    the four moves, tried on a copy, produces an empty cell.

    Args:
        vals: current 4x4 board as a torch tensor (not modified).

    Returns:
        bool: False only when the board is full and no move frees a cell.
    """
    if any(cell == 0 for cell in vals.flatten()):
        return True

    for move in range(4):
        # Work on a copy; update() writes into the array it is given.
        trial = vals.numpy().copy()
        update(trial, move)
        if any(cell == 0 for cell in trial.flatten()):
            return True

    return False


def display(vals):
    """Draw the board as a 4x4 coloured table in a new matplotlib figure.

    Args:
        vals: 4x4 board of tile values; a NumPy array or a CPU torch tensor.
    """
    color_dict = {
        0: '#ffffff',
        2: '#cfcfcf',
        4: '#b0b0b0',
        8: '#ffdf4f',
        16: '#ffbc4f',
        32: '#ff924f',
        64: '#ff6f4f',
        128: '#ffdf4f',
        256: '#ffbc4f',
        512: '#ff924f',
        1024: '#ff6f4f',
        2048: '#ffdf4f',
    }
    # Tiles beyond 2048 have no dedicated colour; reuse the 2048 one.
    fallback_color = color_dict[2048]

    # A plain array gives clean cell text even when a tensor is passed in.
    grid = np.asarray(vals)

    # Build the per-cell colour array. int() makes the lookup independent of
    # the element type (NumPy scalar, float, ...).
    colors = np.empty((4, 4), dtype=object)
    for i in range(4):
        for j in range(4):
            colors[i, j] = color_dict.get(int(grid[i, j]), fallback_color)

    fig, ax = plt.subplots(figsize=(4, 4))
    the_table = ax.table(grid, loc='center', cellColours=colors)
    the_table.auto_set_font_size(False)
    the_table.set_fontsize(15)
    ax.axes.xaxis.set_visible(False)
    ax.axes.yaxis.set_visible(False)

    # Four equally tall rows fill the axes vertically.
    cell_height = 1 / 4
    for cell in the_table.get_celld().values():
        cell.set_height(cell_height)

In [5]:
def play():
    """Play one complete game with the global policy ``net`` and record it.

    At every step a move is sampled from the policy. Invalid moves leave the
    board untouched but are still recorded. The game ends when
    ``check_alive`` reports that no further move is possible.

    Returns:
        ``(states, actions, rew, score)``:
        states  -- list of float tensors of shape (16,), the board each action
                   was sampled from;
        actions -- list of sampled move tensors, aligned with ``states``;
        rew     -- sum of all tiles after the last valid move;
        score   -- largest tile after the last valid move.
    """
    states = []
    actions = []
    vals = init()
    alive = True
    while alive:
        # Snapshot the board *before* acting. update() mutates `vals` in place,
        # and training needs the state the action was actually chosen in.
        state = vals.flatten().float().clone()

        # Rollouts only sample from the policy; gradients are recomputed from
        # the recorded states during training.
        with torch.no_grad():
            p = net(state)  # next-move distribution
        m = Categorical(p).sample()

        if check_valid_move(vals, m):
            vals, rew, score = update(vals, m)
            vals = new_number(vals)
            alive = check_alive(vals)

        states.append(state)
        actions.append(m)

    # The loop can only exit after a valid move, so rew and score are set here.
    return states, actions, rew, score

In [6]:
"""
Loading a saved state from previous training
"""
# Google Drive holds the checkpoint, so it has to be mounted before loading.
from google.colab import drive

drive.mount('/content/drive')

# Only the network weights are restored; the Adam optimizer starts fresh.
net = PolicyNet()
optimizer = torch.optim.Adam(net.parameters(), lr=0.0025)

path = F"/content/drive/My Drive/Colab Notebooks/2048/2048_state.pth"
net.load_state_dict(torch.load(path))

Mounted at /content/drive


<All keys matched successfully>

In [None]:
"""
Loading previous scores for graphing purposes (if we get disconnected, want to show all the training in the past, not just from this session)
"""
df = pd.read_excel("/content/drive/My Drive/Colab Notebooks/2048/score_sheet.xlsx")
# The training loop grows these histories with list-style .append(value),
# so convert the spreadsheet columns from pandas Series to plain lists.
highScores = df["HighScores"].tolist()
avgScores = df["AverageScores"].tolist()

In [8]:
"""
#This is to start a new one if you don't want to do it from existing state. Commented to avoid accidentally overwriting existing training
net = PolicyNet()
optimizer = torch.optim.Adam(net.parameters(), lr=0.0025)
highScores = []
avgScores = []
"""

"\n#This is to start a new one if you don't want to do it from existing state. Commented to avoid accidentally overwriting existing training\nnet = PolicyNet()\noptimizer = torch.optim.Adam(net.parameters(), lr=0.0025)\nhighScores = []\navgScores = []\n"

In [None]:
"""
Now, train the model!
"""

# Policy-gradient training: each epoch plays BATCH_SIZE games, weights the mean
# log-probability of each game's actions by that game's final reward, and takes
# one optimizer step. Scores, weights and the plot are saved after every epoch.
losses = []
EPOCHS = 1000
BATCH_SIZE = 300

# The histories may be pandas Series (when read from the score sheet); plain
# lists are needed for the in-place append() calls below.
highScores = list(highScores)
avgScores = list(avgScores)

for i in range(EPOCHS):

    # Create a batch of complete games played with the current policy.
    states_batch = []
    actions_batch = []
    rewards_batch = []
    scores_batch = []

    for _ in range(BATCH_SIZE):
        states, actions, rew, score = play()
        states_batch.append(torch.stack(states))
        actions_batch.append(torch.stack(actions).long())
        rewards_batch.append(float(rew))
        # Plain ints keep the plot and the spreadsheet free of tensor objects.
        # Scores are only used for reporting, not for training.
        scores_batch.append(int(score))
    rewards_batch = torch.tensor(rewards_batch, dtype=torch.float32)

    # Estimate the gradient: per game, the mean log-probability of the actions
    # that were actually taken, given the states they were taken in.
    outputs = [net(game_states) for game_states in states_batch]
    log_probs = [
        Categorical(probs).log_prob(game_actions)
        for probs, game_actions in zip(outputs, actions_batch)
    ]
    log_means = torch.stack([game_log_probs.mean() for game_log_probs in log_probs])
    loss = (-rewards_batch * log_means).mean()
    losses.append(loss.detach())

    # Backprop gradient and take an optimization step. The graph is rebuilt
    # every epoch, so it does not need to be retained after backward().
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

    highScores.append(max(scores_batch))
    avgScores.append(sum(scores_batch) / len(scores_batch))

    # Save the scores to the Excel file.
    df = pd.DataFrame({"HighScores": highScores, "AverageScores": avgScores})
    df.to_excel("/content/drive/My Drive/Colab Notebooks/2048/score_sheet.xlsx")

    # Save the network weights.
    path = "/content/drive/My Drive/Colab Notebooks/2048/2048_state.pth"
    torch.save(net.state_dict(), path)

    # Redraw the score plot and save it.
    plt.cla()
    plt.plot(highScores, label="High Score")
    plt.plot(avgScores, label="Average Score")
    plt.legend()
    plt.title("Scores over Time\n Highest Score Ever: %d" % max(highScores))
    plt.savefig("/content/drive/My Drive/Colab Notebooks/2048/highscores.jpg")
    sys.stdout.write("\rEpoch %d: Loss: %f      " % (i, loss.item()))

plt.show()
