In [None]:
import os
import sys  
sys.path.insert(0, '..')
import time

from tensorflow import keras
import tensorflow as tf
import numpy as np
from util.plots import show_plot
import pandas as pd

feature = pd.read_csv('../data/feature_nirspec100.csv').values
label = pd.read_csv('../data/label_nirspec100.csv').values
print(feature.shape)

In [None]:
# random data
p = [4826, 31757, 25834, 26607, 37618, 16953, 1633, 38559, 16174, 6779, 28881, 14980, 38729, 13816, 32933, 7992, 39395, 3649, 9174, 39568, 18124, 21122, 7959, 15201, 31648, 39339, 29350, 37429, 13912, 24092, 10823, 32187, 12371, 25361, 5640, 25517, 38293, 18361, 2025, 24981, 34470, 2797, 5242, 33505, 25206, 23115, 11510, 28940, 16147, 14488, 6678, 18509, 16665, 30961, 31126, 29867, 39406, 1107, 19318, 29004, 15840, 36030, 34252, 16743, 17627, 8950, 25917, 31001, 299, 8428, 39252, 28270, 17296, 7729, 14925, 345, 23389, 23128, 14290, 37979, 21191, 21253, 13476, 9408, 16741, 26915, 34880, 16170, 38485, 38943, 13480, 20817, 23309, 19511, 9810, 34506, 2270, 37415, 206, 19076]

In [None]:
# best data
p = [7, 17, 21, 48, 54, 60, 66, 74, 88, 90, 95, 110, 111, 112, 118, 140, 160, 170, 171, 174, 197, 202, 206, 217, 220, 241, 248, 260, 274, 287, 307, 311, 314, 316, 322, 328, 341, 353, 355, 361, 365, 373, 376, 384, 385, 386, 389, 390, 393, 398, 403, 406, 418, 422, 423, 427, 433, 434, 451, 452, 484, 487, 499, 509, 511, 516, 531, 538, 543, 554, 578, 588, 594, 597, 610, 614, 615, 618, 629, 642, 652, 669, 677, 678, 688, 692, 700, 705, 709, 717, 722, 725, 731, 734, 740, 745, 757, 759, 760, 775]

In [None]:
train_feature = feature[p]
train_label = label[p]
test_feature = feature[50000:]
test_label = label[50000:]
features_and_labels = list(zip(train_feature, train_label))

In [None]:
loss = keras.losses.MeanSquaredError()
layers_available_for_evolution = (keras.layers.Dense,)

layers_format = [(keras.layers.InputLayer, [198]),
                 (keras.layers.Dense, 150, 'relu'),
                 (keras.layers.Dense, 44, 'relu'),
                 (keras.layers.Dense, 12, 'relu')]


class Individual:
    def __init__(self, layers, loss, optimizer='Adam', weights=None, biases=None):
        self.model = keras.Sequential()

        layer_instance = layers[0][0](input_shape=layers[0][1])
        self.model.add(layer_instance)

        i = 0
        for layer_data in layers[1:]:
            if layer_data[0] == keras.layers.Dense:
                layer_instance = layer_data[0](units=layer_data[1], activation=layer_data[2],
                                               kernel_initializer=tf.initializers.constant(weights[i].numpy())
                                               if weights else 'glorot_uniform',
                                               bias_initializer=tf.initializers.constant(biases[i].numpy())
                                               if biases else 'zeros')
                i += 1
            elif layer_data[0] == keras.layers.Dropout:
                layer_instance = layer_data[0](rate=layer_data[1])

            self.model.add(layer_instance)

        self.model.compile(loss=loss, optimizer=optimizer)

    @staticmethod
    def weights_and_biases(model):
        w = [None for i in range(len(model.layers))
             if type(model.layers[i]) in layers_available_for_evolution]
        b = [None for _ in range(len(w))]

        i = 0
        for layer in model.layers:
            if type(layer) not in layers_available_for_evolution:
                continue
            w[i], b[i] = layer.trainable_weights
            i += 1

        return w, b

    def assign(self, individual):
        w, b = Individual.weights_and_biases(individual.model)
        for i, layer in enumerate(self.model.layers):
            weights = layer.trainable_weights[0]
            indexes = np.argwhere(np.random.random(size=weights.shape) < 2)
            weights.scatter_nd_update(indexes, w[i].numpy().reshape((len(indexes),)))

            biases = layer.trainable_weights[1]
            indexes = np.argwhere(np.random.random(size=biases.shape) < 2)
            biases.scatter_nd_update(indexes, b[i].numpy().reshape((len(indexes),)))
        return self


def replace(individual, interval_w=(-1, 1), interval_b=(-1, 1), uniform_w=False, uniform_b=False):
    for layer in individual.model.layers:
        if type(layer) not in layers_available_for_evolution:
            continue

        weights = layer.trainable_weights[0]
        indexes = np.argwhere(np.random.random(size=weights.shape) < 2)
        f = np.random.uniform if uniform_w else np.random.normal
        weights.scatter_nd_update(indexes, f(*interval_w, size=(len(indexes),)))

        biases = layer.trainable_weights[1]
        indexes = np.argwhere(np.random.random(size=biases.shape) < 2)
        f = np.random.uniform if uniform_b else np.random.normal
        biases.scatter_nd_update(indexes, f(*interval_b, size=(len(indexes),)))


def evaluate(individual, features, labels):
    return individual.model.evaluate(features, labels, batch_size=3000)


def pure_random_search(iterations, t=None):
    nn = Individual(layers_format, loss)
    best_nn = [evaluate(nn, train_feature, train_label), Individual(layers_format, loss).assign(nn)]
    i = 1
    start = time.time()
    while i < iterations or (t is not None and time.time() - start < t):
        replace(nn, (0.0, 0.15), (0, 0.15), uniform_w=False, uniform_b=False)
        loss_value = evaluate(nn, train_feature, train_label)
        if loss_value < best_nn[0]:
            best_nn[0] = loss_value
            best_nn[1].assign(nn)
        i += 1
    print(i)
    return best_nn


def get_random_samples_in_n_sphere(n, r, nr_val):
    x = np.random.normal(size=(nr_val, n))
    u = np.random.random((nr_val, 1))

    return r * u ** (1 / n) / np.sqrt(np.sum(x ** 2, 1, keepdims=True)) * x


def sphere_replace(individual, radius_w, radius_b):
    for layer in individual.model.layers:
        if type(layer) not in layers_available_for_evolution:
            continue

        weights = layer.trainable_weights[0]
        indexes = np.argwhere(np.random.random(size=weights.shape) < 2)
        values_to_change = weights.numpy()
        weights.scatter_nd_update(indexes, values_to_change.reshape(-1) +
                                  get_random_samples_in_n_sphere(np.prod(values_to_change.shape), radius_w, 1)[0])

        biases = layer.trainable_weights[1]
        indexes = np.argwhere(np.random.random(size=biases.shape) < 2)
        values_to_change = biases.numpy()
        biases.scatter_nd_update(indexes, values_to_change.reshape(-1) +
                                 get_random_samples_in_n_sphere(np.prod(values_to_change.shape), radius_b, 1)[0])


def local_random_search(iterations, t=None):
    nn = Individual(layers_format, loss)
    best_nn = [evaluate(nn, train_feature, train_label), Individual(layers_format, loss).assign(nn)]
    i = 1
    start = time.time()
    while i < iterations or (t is not None and time.time() - start < t):
        w, b = Individual.weights_and_biases(best_nn[1].model)
        new_nn = Individual(layers_format, loss, weights=w, biases=b)
        sphere_replace(new_nn, 0.15, 0.15)
        loss_value = evaluate(new_nn, train_feature, train_label)
        if loss_value <= best_nn[0]:
            best_nn[0] = loss_value
            best_nn[1].assign(new_nn)
        i += 1
    print(i)
    return best_nn

In [None]:
best_loss, best_nn = pure_random_search(100, 900)
print('Best train loss:', best_loss)
print('Best test loss:', evaluate(best_nn, test_feature, test_label))
best_nn.model.save('best_neural_network_pure_rs')

In [None]:
best_loss, best_nn = local_random_search(100, 900)
print('Best train loss:', best_loss)
print('Best test loss:', evaluate(best_nn, test_feature, test_label))
best_nn.model.save('best_neural_network_local_rs')

In [None]:
def stats_weights_biases(model):
    w, b = Individual.weights_and_biases(model)
    fcts = [np.min, np.max, np.sum]
    w_stats = [np.inf, 0, 0]
    b_stats = [np.inf, 0, 0]
    b_n = w_n = 0
    for w_val in w:
        w_val = w_val.numpy()
        w_n += len(w_val)
        for i, op in enumerate(fcts):
            w_stats[i] = op([w_stats[i], op(w_val)])
    for b_val in b:
        b_val = b_val.numpy()
        b_n += len(b_val)
        for i, op in enumerate(fcts):
            b_stats[i] = op([b_stats[i], op(b_val)])

    print(*w_stats[:-1], w_stats[-1]/w_n)
    print(*b_stats[:-1], b_stats[-1]/b_n)

In [None]:
model = tf.keras.models.load_model('best_neural_network_local_rs')
stats_weights_biases(model)

In [None]:
show_plot(model.predict(test_feature[:2500]), test_label[:2500],
          model.predict(train_feature[:5000]), train_label[:5000],
         (0.5, 0.5), 600)