In [23]:
# control random processes
import torch
torch.manual_seed(1)
import numpy as np
np.random.seed(1)

from torch import nn
from torch import optim
import time

def transform(layout):
    return np.log2(np.where(layout==0, 1, layout))/11-.25

class NeuralNetwork():
    def __init__(self, inputSize=16, outputSize=4, neuronCountJ=200, neuronCountK=100):
        # initialize network
        self.model = nn.Sequential(nn.Linear(inputSize, neuronCountJ),
                       nn.ReLU(), 
                       nn.Linear(neuronCountJ, neuronCountK),
                       nn.ReLU(),
                       nn.Linear(neuronCountK, outputSize),
                       nn.Softmax(dim=1),
                     )
        self.model.double()
        
    def random_init(self, max_tile=None):
        # generates a random initial layout, possibly given a max tile for the layout to build off of
        ## ex: max tile=64 will generate a 4x4 layout where the largest tile is a 64, all other tile values are smaller
        if max_tile:
            max_tile = int(np.log2(max_tile))
            assert max_tile in range(0,10) # maximum tile cannot be larger than 512
        else:
            max_tile = np.random.choice(range(3,10))
        goal_tile = 2**(max_tile+1)

        # insert the max tile into the matrix
        init_layout = np.zeros((4,4), dtype=np.int)
        init_layout[np.random.choice(range(4)), np.random.choice(range(4))] = 2**max_tile

        for move in range(np.random.choice(range(4,12))):
            row_idx = np.random.choice(np.where((init_layout==0).sum(axis=1)>0)[0])
            init_layout[row_idx, np.random.choice(np.where(init_layout[row_idx]==0)[0])] = 2**np.random.choice(range(1,max_tile))

        return init_layout, goal_tile

    def get_game_idx(self, data, game_metric=None):
        if game_metric=='tile_sum': # micro-game with largest ending tile sum
            return np.argmax(data.tile_sums)
        elif game_metric=='moves': # micro-game with lowest # of moves
            return np.argmin(data.num_moves)
        else: # default to the micro-game with highest score
            return np.argmax(data.final_scores)

    def best_init(self, data, game_idx, max_tile=None):
        possible_max = int(np.log2(data.layouts[game_idx].max()))
        if max_tile:
            max_tile = int(np.log2(max_tile))
            assert max_tile<possible_max # maximum tile must be feasible given the micro-game we've chosen
        else: # default to the second largest tile the micro-game ever reaches
            max_tile = possible_max-1

        goal_tile = 2**max_tile
        print(max_tile)
        # select the first time the max_tile appears in the game corresponding to game_idx
        init_layout = data.layouts[game_idx][np.where(data.layouts[game_idx].max(axis=1)==int(2**max_tile))[0][0]].reshape((4,4))
        goal_tile = 2**int((np.log2(init_layout.max())+1))

        return init_layout, goal_tile

    def compute_game_penalties(self, data, penalty_type):
        # all weights range [-1,1], where weight<0 indicates a "bad" game and weight>=0 indicates a "good" game
        if penalty_type=='scores':
            # computed by overall game score
            rank_values = data.final_scores
        elif penalty_type=='max':
            # computed by max tile on board at the end of game
            rank_values = data.max_tile
        elif penalty_type=='log2_max':
            # computed by the base 2 log of max tile on board at the end of game        
            rank_values = np.log2(data.max_tile)
        elif penalty_type=='tile_sums':
            # computed by sum of tile values at the end of game
            rank_values = data.tile_sums
        else: # use binary as default (-1:final score was below median, 1: final score was above median)
            penalties = np.ones(data.final_scores.shape)
            penalties[data.final_scores<=np.median(data.final_scores)] = -1  
        
        if penalty_type is not None: # runs for all except default (binary)
            # using distance to median by whatever metric was chosen using penalty_type
            maxes = np.repeat(rank_values.max(), rank_values.shape)
            maxes[rank_values<=np.median(rank_values)] = rank_values.min()
            maxes = np.absolute(maxes-np.median(rank_values))
            penalties = (rank_values-np.median(rank_values))/maxes
    
        return penalties

    def compute_move_penalties(self, data, penalty_type):
        # all weights range [0,1]
        if penalty_type=='nonzero':
            # weights by fraction of tiles that are nonzero
            weights = np.count_nonzero(np.concatenate(data.layouts), axis=1)/16
        elif penalty_type=='linear_move_num':
            # weights linearly by move number (move #/total # of moves)
            weights = np.concatenate([np.linspace(0,1,num_moves) for num_moves in data.num_moves])
        elif penalty_type=='exponential_move_num':
            # weights exponentially (1-e^(-3x)) by move number where x=(move #/total # of moves)
            weights = np.concatenate([1-np.exp(-3*np.linspace(0, 1, num=num_moves)) for num_moves in data.num_moves])
        else:
            # weight all moves equally
            weights = np.ones(data.num_moves.sum())
        return weights

    def train(self, lr=0.001, duration=1/600, random_games=None, random_frac=None, batch_size=10,
             move_penalty_type=None, game_penalty_type=None, game_type='full',
             test=False):
        # save parameters in the model object for later serialization
        self.model.lr = lr
        self.model.duration = duration
        self.model.random_games = random_games
        self.model.random_frac = random_frac
        self.model.batch_size = batch_size
        self.model.move_penalty_type = move_penalty_type
        self.model.game_penalty_type = game_penalty_type
        self.model.game_type = game_type
        
        # initialize optimizer and loss function
        opt = optim.Adam(self.model.parameters(), lr=lr)
        loss = nn.L1Loss()
        
        # actual game class (to run games and get data)
        from helper import GameDriver

        # define method for training (possibly including random moves with neural network-selected moves)
        if random_frac is not None:
            method = lambda layout: self.model(torch.from_numpy(transform(layout)).double().reshape(1,-1)).detach().numpy().flatten() if np.random.random()>random_frac else np.repeat(.25, 4)
        else:
            method = lambda layout: self.model(torch.from_numpy(transform(layout)).double().reshape(1,-1)).detach().numpy().flatten()
        
        #
        if game_type=='mini_iterative':
            max_idx = 3
        
        # initialize variables to hold data during training
        end_time = time.time()+60*60*duration
        scores = []
        while time.time()<end_time: # run loop for a certain duration (in hours)
            # initialize games class
            data = GameDriver()
            
            # run single-goal micro-games, with random initialization
            if game_type=='mini_random':
                init_layout, goal_tile = self.random_init()
                # run neural-network-run games
                data.run_games(batch_size, method=method, init_layout=init_layout,
                              early_stop=goal_tile) 
                if random_games: # run some number of completely random games, if applicable
                    data.run_games(int(batch_size*random_frac), init_layout=init_layout,
                                  early_stop=goal_tile) 
                    
            # run single-goal micro-games, with initialization based on successful games
            elif game_type=='mini_iterative':
                # make sure the maximum tile is always between 8 (2**3) and 1024 (2**10)
                max_idx = max(3,max_idx%11)
                print(max_idx)
                
                if max_idx==3: # need random start
                    init_layout, goal_tile = self.random_init()
                else: # use "best" end point from last run as initialization
                    init_layout, goal_tile = self.best_init(data, self.get_game_idx(data, game_metric=None), max_tile=2**max_idx)
                
                # run neural-network-run games
                data.run_games(batch_size, method=method, init_layout=init_layout,
                              early_stop=goal_tile)
                if random_games:
                    data.run_games(int(batch_size*random_frac), init_layout=init_layout,
                              early_stop=goal_tile) # run some number of completely random games, if applicable


                max_idx += 1
            
            # default option is to run entire games
            else:
                data.run_games(batch_size, method=method) # run neural-network-run games
                if random_games:
                    data.run_games(int(batch_size*random_frac)) # run some number of completely random games, if applicable
            
                
            # find weights for good/bad game performance and weights for move importance
            game_penalties = self.compute_game_penalties(data, game_penalty_type)
            move_penalties = self.compute_move_penalties(data, move_penalty_type)
            expanded_game_penalties = np.repeat(game_penalties, data.num_moves)
            
            # set up data to train
            L = torch.from_numpy(transform(np.concatenate(data.layouts))).double()
            M = torch.from_numpy(np.concatenate(data.moves)).double()
            y_hat = self.model(L)
            
            # make "true" labels - M where the game was "good", (1-M)/3 where the game was "bad"
            y = M.clone()
            mask = expanded_game_penalties<0
            mask = torch.from_numpy(mask).nonzero().flatten()
            y[mask,:] = (1-y[mask,:])/3        
        
            # align penalties for torch compatibility
            ## note absolute value for game penalties, since directionality was taken care of when generating true labels
            expanded_game_penalties = torch.abs(torch.from_numpy(expanded_game_penalties)[:,None].double())
            move_penalties = torch.from_numpy(move_penalties)[:,None].double()
                  
            if test:
                return
        
            # update model weights
            output = loss(expanded_game_penalties*move_penalties*y, expanded_game_penalties*move_penalties*y_hat)
            output.backward()
            opt.step()
            opt.zero_grad()
            
        # save scores in the model object for later serialization
        self.model.scores = scores
        
        # call log_results to save the trained model
        self.log_results()
            
    def log_results(self):
        import os
        directory='.\\model_results'
        
        # generate nested folder structure: folder for all models, then subfolder for this model
        if not os.path.exists(directory):
            os.makedirs(directory)
            model_num = 0
        else:
            model_num = int(max(os.listdir(directory)))+1
        subdirectory = os.path.join(directory, str(model_num))
        os.makedirs(subdirectory)
        
        # save model, scores (including a graph), parameters
        import pickle
        pickle.dump(self.model, open(os.path.join(subdirectory, 'model.pickle'), 'wb'))
        
        # save graph of scores
        import matplotlib.pyplot as plt

        def moving_average(a, n=3) :
            ret = np.cumsum(a, dtype=float)
            ret[n:] = ret[n:] - ret[:-n]
            return ret[n - 1:] / n

        a = moving_average(np.array(self.model.scores), 30)
        plt.plot(a)
        plt.title("Average game score during training")
        plt.ylabel('Game score')
        plt.xlabel('Epoch')

        plt.savefig(open(os.path.join(subdirectory, 'results.png'), 'wb'), dpi=1200)

In [24]:
network = NeuralNetwork()

In [25]:
network.train(game_type='mini_iterative')

3
4


AttributeError: 'GameDriver' object has no attribute 'final_scores'