In [1]:
'''
Worked on by: Meena Hari and Tarini Singh.

We perform data preprocessing using KNearestNeighbors.
66 new features are generated using the KNN. 

The board is also one-hot encoded, and
x/y displacements to the solved configation and
Manhattan and Hamming distances are included as features.

In total, the input feature length is transformed from 16
to 356.

Trained a 2 layer ANN with the transformed dataset.
'''

import numpy as np
import pandas as pd
from sklearn.neighbors import NearestNeighbors
from tqdm import tqdm
import keras.backend as K
from keras.models import Sequential, Model
from keras.layers import Dense, Dropout, Conv2D, Flatten, Input
from sklearn.model_selection import KFold
from sklearn.model_selection import cross_val_score
from keras.models import load_model
import keras.losses

from constants import *
import heuristic as h
import io_help as io
import neural_net as nn
import solver as s

def load_data(file_name):
    """
    This function reads in training data from a file and returns 
    the boards in X and their labels in Y as a tuple. 
    """
    file = open(file_name, "r")
    X = []
    Y = []
    

    for string in file: 
        (board, dist) = io.string_to_board_and_dist(string)
        X_temp = np.concatenate((board.reshape(16)), axis=None)
        X.append(X_temp)
        Y.append(dist)
        
    file.close()
    X_train = np.asarray(X)
    Y_train = np.asarray(Y)
    return(X_train, Y_train)

Using TensorFlow backend.


In [2]:
# Load dataset. 
# X: board inputs, Y: true output.
(X_train,Y_train) = load_data('All_Data.txt')
print(X_train.shape)

(395715, 16)


In [3]:
# Generates additional features.
# X: the input data file.
# X_train: the original training data file (not transformed).
def gen_features (X, X_train, knn_model):
    data_arr = np.zeros([len(X), (16)*2*2 + 2])
    man_ham_2D = np.zeros([len(X), 2])
    one_hot_2D = np.zeros([len(X), 256+32])
    pred = knn_model.kneighbors(X)
    

    for i in range(len(X)):
        row = X[i]
        # Grabs the rows in X corresponding to 35 nearest neighbors of X[i].
        # pred[1][i] contains a list of the indices of the 35 nearest neighbors of X[i].
        data = X_train[pred[1][i]]
        # Divide X[i] by each of its neighbors. div should be a 
        # 35 x 16 matrix, i.e. div[j] = X[i] / X[j].
        div = (row / data)
        # Subtract X[i] by each of its neighbors. diff should be a 
        # 35 x 16 dimension matrix.
        diff = (row - data)
        # concat is a 35 x 32 matrix.
        concat = np.concatenate([div, diff], axis = 1)
        # means is a 35 x 32 matrix.
        # std is a 35 x 32 matrix.
        means, stds = np.nanmean(concat, axis = 0), np.nanstd(concat, axis = 0)
        # Populate data_arr with newly generated features.
        data_arr[i, :len(means)] = means
        data_arr[i, len(means):len(means) + len(stds)] = stds
        # Add average and standard deviation of distance away from 35 neighbors.
        data_arr[i, -1] = np.nanmean(pred[0][i])
        data_arr[i, -2] = np.nanstd(pred[0][i])
        
        # Calculate Manhattan/Hamming distances.
        man = h.manhattan(row.reshape(4,4), None)
        ham = h.hamming(row.reshape(4,4), None)
        man_ham_2D[i,0] = man
        man_ham_2D[i,1] = ham
        # Calculate one-hot encoding of the sample.
        one_hot_2D[i] = nn.get_rep_2(row.reshape(4,4))
        
    # Concatenate all features plus one-hot encoded dataset.
    return np.concatenate([data_arr, one_hot_2D, man_ham_2D], axis=1)

In [4]:
knn_model = NearestNeighbors(n_neighbors=35, n_jobs = -1).fit(X_train,Y_train)
X_train_2 = gen_features(X_train, X_train, knn_model)

In [5]:
X_train_2.shape

(395715, 356)

In [6]:
def shift_mse(y_true, y_pred):
    """custom loss functions"""
    loss = (1 + 6/(1 + K.exp(-(y_pred - y_true)))) * K.square(y_pred - y_true)
    loss = K.mean(loss, axis = 1)
    return loss


def exp_loss_2(y_true, y_pred):
    """
    Custom loss function. 
    """
    loss = K.exp((y_pred - y_true))
    loss = loss + K.square(y_pred - y_true)
    loss = K.mean(loss, axis = 1)

    return loss
    
keras.losses.shift_mse = shift_mse
keras.losses.exp_loss_2 = exp_loss_2

In [7]:
def rep_4_model (X, Y):
    # Build Model
    model = Sequential()

    # Input Layer
    i = Input(shape = (16*2*2+2+(256 + 32 + 2),))
    x_1 = Dense(16*2*2+2+(256 + 32 + 2), activation='relu')(i)
    x_2 = Dropout(0.1)(x_1)
    x_3 = Dense(356, activation='relu')(x_2)
    x_4 = Dropout(0.1)(x_3)
    x_5 = Dense(17, activation='relu')(x_4)
    o = Dense(1, activation='linear')(x_1)
    model = Model(i,o)

    # Define the optimizer and loss function
    model.compile(optimizer='adam', loss=shift_mse, metrics=['accuracy'])

    # Train 
    model.fit(X, Y, epochs=15)

    return model

In [9]:
# Load test dataset. X_test: board inputs, Y_test: true output.
(X_test,Y_test) = load_data('All_Data.txt')

# Transform X_test to higher dimension.
X_test_2 = gen_features (X_test, X_train, knn_model)

In [None]:
# Calculate in-sample performance metrics.

# Store values due to truncation of the model's prediction.
dist_over_i = []
misclass_i = 0
dist_under_i = []
dist_over_man_i = []
dist_under_man_i = []

# Store values due to rounding of the model's prediction.
dist_over_r = []
misclass_r = 0
dist_under_r = []

model = rep_4_model(X_test_2, Y_test)

for i in tqdm(range(len(X_test))):
    nn_heur_i = int(model.predict(X_test_2[i:(i+1),:]))
    nn_heur_r = np.around(model.predict(X_test_2[i:(i+1),:]))
    man_heur = h.manhattan(X_test[i].reshape(4,4), model)
    y = Y_test[i]
    
    ### TRUNCATE ###
    if (nn_heur_i > y):
        dist_over_i.append(nn_heur_i - y)
    
    if (nn_heur_i <= y):
        dist_under_i.append(y - nn_heur_i)
    
    if (nn_heur_i != y):
        misclass_i += 1
    
    if (nn_heur_i > man_heur):
        dist_over_man_i.append(nn_heur_i - man_heur)
        
    if (nn_heur_i < man_heur):
        dist_under_man_i.append(man_heur - nn_heur_i)
    
        
    ##### ROUND ##### 
    if (nn_heur_r > y):
        dist_over_r.append(nn_heur_r - y)
    
    if (nn_heur_r <= y):
        dist_under_r.append(y - nn_heur_r)
    
    if (nn_heur_r != y):
        misclass_r += 1
    
avg_dist_over_i = np.mean(np.asarray(dist_over_i))
avg_dist_under_i = np.mean(np.asarray(dist_under_i))
out_sample_error_i = misclass_i / len(X_test)
avg_dist_over_man_i = np.mean(np.asarray(dist_over_man_i))
avg_dist_under_man_i = np.mean(np.asarray(dist_under_man_i))

avg_dist_over_r = np.mean(np.asarray(dist_over_r))
avg_dist_under_r = np.mean(np.asarray(dist_under_r))
out_sample_error_r = misclass_r / len(X_test)
 
print("------ TRUCATION: ------")
print("Avg distance overestimated: ", avg_dist_over_i)
print("Avg distance underestimated: ", avg_dist_under_i)
print("E_admiss: ", len(dist_over_i)/len(X_test))
print("E_out: ", out_sample_error_i)
print("Avg distance over Manhattan: ", avg_dist_over_man_i)
print("Avg distance under Manhattan: ", avg_dist_under_man_i)
print("Percent over Manhattan: ", len(dist_over_man_i)/len(X_test) * 100)
print("Percent under Manhattan: ", len(dist_under_man_i)/len(X_test) * 100)
print("\n")
print("------ ROUNDED: ------")
print("Avg distance overestimated: ", avg_dist_over_r)
print("Avg distance underestimated: ", avg_dist_under_r)
print("E_admiss: ", len(dist_over_r)/len(X_test))
print("E_out: ", out_sample_error_r)

In [19]:
def string_to_test_info(string):
    """
    given a string containing the standard form of test info, returns tuple of 
    board, number of states to solution, time, and lenght of solution
    (Function copied from run_testing.py).
    """
    split = string.split("!")
    board = io.string_to_board(split[0])
    n_states = int(split[1])
    time = float(split[2])
    sol_len = int(split[3])
    return (board, n_states, time, sol_len)


def load_boards(filename):
    """
    given name of file containing test boards, loads all test boards
    (Function copied from run_testing.py).
    """
    file = open(filename, "r")

    boards = []
    n_states = []
    times = []
    dists = []

    for line in file:
        (board, c_states, c_time, sol_len) = string_to_test_info(line)
        boards.append(board)
        n_states.append(c_states)
        times.append(c_time)
        dists.append(sol_len)

    return (boards, n_states, times, dists)

def run_testing(data_file, model, h_func):
    """
    given a data_file containing testing data, a model, and heuristic function
    for said model, computes average number of states to solution, number to 
    times solution length is non-optimal, and average estimates of solution 
    lengths
    (Function copied from run_testing.py).
    """
    (boards, n_states, times, dists) = load_boards(data_file)

    cust_states = []
    cust_wrong = 0
    cust_distance = []

    for i in tqdm(range(len(boards))):
        (c_states, c_time, sol_path) = s.solve(boards[i], h_func, model)
        cust_states.append(c_states)
        sol_len = len(sol_path) - 1
        if not (sol_len == dists[i]):
            cust_wrong += 1
        cust_distance.append(sol_len)

    print("average number of states explored to find solution:")
    print("\tfor learned model: " + str(np.mean(cust_states)))
    print("\tfor manhattan distance: " + str(np.mean(n_states)))
    print("----------------------------------------------------")
    print("solution was non-optimal " + str(cust_wrong / NUM_TEST_BOARDS * 100) + "% of the time")
    print("----------------------------------------------------")
    print("average length of solution path was:")
    print("\tfor learned model: " + str(np.mean(cust_distance)))
    print("\tfor manhattan distance: " + str(np.mean(dists)))


In [21]:
dic = {}
def heur_rep_4(board, model):
    b1 = board.reshape(16)
    
    # Calculate Manhattan/Hamming distances.
    man = np.array(h.manhattan(b1.reshape(4,4), None))
    ham = np.array(h.hamming(b1.reshape(4,4), None))
    
    b1_str = np.array_str(b1)
    
    if b1_str in dic:
        return dic.get(b1_str)
    else:
        # Transform board.
        X_2 = gen_features(np.asarray([b1]), X_train, knn_model)
        [[pred]] = model.predict(X_2[0].reshape(1,-1))
        # Memoize prediction.
        dic[b1_str] = int(pred)
        return dic[b1_str]

In [23]:
run_testing('Test_boards.txt', model, heur_rep_4)