
# Music NN
This model requires a .zip file containing the song batches to be added to a folder.

In [1]:
# Marker printed on both sides of the final accuracy in the training log, so
# that the driver can locate that value among the other log lines.
sep = ",;" * 3
def wrapper_music(filters = 128, filters_end=128,window=5,window_end=5, learning_rate = 0.01, dropout = 0.7, epochs = 10, batch_size = 100):
    '''
    Train and evaluate the music classifier for one hyperparameter combination.

    execute_all hands this function to ``tflauncher.launch``, which runs it on
    an executor; that is presumably why every import lives inside it. Keep the
    parameter names in sync with the keys produced by
    DifferentialEvolution._parse_to_dict (assumed to be how the launcher maps
    values onto parameters -- TODO confirm against the hops documentation).

    The final test accuracy is printed between two ``sep`` markers (the
    module-level constant) so that the driver can extract it from the log.
    '''
    from keras.models import Sequential, Model
    from keras.layers import Dense, Conv1D, MaxPooling1D, Flatten, Input, GlobalMaxPooling1D, Dropout
    from keras.utils import normalize
    from keras import optimizers

    import numpy as np
    import os
    import keras
    import random
    from hops import hdfs
    import time

    # Fix random seeds for reproducibility. Python's `random` drives the
    # train/test split and the shuffling, so it is seeded as well; otherwise
    # each hyperparameter combination would be scored on a different test set.
    np.random.seed(7)
    random.seed(7)
    start = time.time()
    print("Start time: " + str(start))


    def createModel(filters,filters_end,window,window_end, learning_rate, dropout):
        '''
        Build and compile the binary song classifier.

        Input: sequences of shape (132300, 1); output: a single sigmoid unit,
        trained with binary cross-entropy and Nesterov SGD.

        @:param learning_rate: learning rate of the SGD optimizer.
        NOTE(review): filters, filters_end, window, window_end and dropout are
        accepted but not used by any layer below, so optimising them has no
        effect on the model. TODO wire them into the architecture.
        '''
        model = Sequential()

        model.add(MaxPooling1D(1,input_shape = (132300,1)))

        model.add(Conv1D(100, 100, strides=10 ,activation='sigmoid'))
        model.add(Conv1D(50, 100, strides=5 ,activation='sigmoid'))
        model.add(Conv1D(20, 100, strides=5 ,activation='sigmoid'))

        model.add(Conv1D(20, 20, activation='relu'))
        model.add(MaxPooling1D(5))

        model.add(Conv1D(10, 2, activation='relu'))
        model.add(MaxPooling1D(2))
        model.add(Flatten())
        model.add(Dense(1,activation='sigmoid'))

        # summary() does its own printing and returns None; wrapping it in
        # print() only added a stray "None" line.
        model.summary()

        sgd = optimizers.SGD(lr=learning_rate, decay=1e-6, momentum=0.9, nesterov=True)
        model.compile(optimizer=sgd, loss='binary_crossentropy', metrics=['accuracy'])

        return model


    def label_path(song_path):
        '''
        Return the labels file paired with a songs_* batch file.

        Only the file name is rewritten ("songs_X" -> "labels_X"), so folder
        names that happen to contain "songs" are left untouched.
        '''
        folder, name = os.path.split(song_path)
        return os.path.join(folder, name.replace("songs", "labels", 1))


    def binary_targets(labels):
        '''Map raw labels to [1] where the label equals 10, otherwise [0].'''
        return [[1] if d == 10 else [0] for d in labels]


    def train_test_split(all_files, test_size=0.3):
        '''
        Randomly split files into (train, test) lists; the test list gets
        int(len(all_files) * test_size) files.
        '''
        lentest = int(len(all_files) * test_size)
        test = random.sample(all_files, lentest)
        # Build train from all_files rather than a set difference: set order
        # depends on string hashing, which would defeat the seeded shuffle.
        test_set = set(test)
        train = [f for f in all_files if f not in test_set]
        random.shuffle(test)
        random.shuffle(train)
        return train, test


    def main(filters = 128, filters_end=128,window=5,window_end=5, learning_rate = 0.01, dropout = 0.7, epochs = 10, batch_size = 100):
        '''
        Create the model, train it on the songs_* batch files found under
        input_folder, periodically evaluate it on a held-out split, and print
        the final test accuracy wrapped in ``sep`` markers.

        :raises ValueError: if epochs < 1, or if there are too few files to
            build both a training and a test split (the final accuracy would
            otherwise be undefined).
        '''
        print("Parameters " + str(filters) + " "+ str(filters_end) + " "+ str(window) + " "+ str(window_end) + " "+ str(learning_rate) + " "+ str(dropout) + " " + str(epochs) + " " + str(batch_size))
        if epochs < 1:
            raise ValueError("epochs must be at least 1, got " + str(epochs))

        # Folder containing the subfolders A, B, C, ... with the batch files.
        input_folder = "small_output/small_output"  #In hops

        m = createModel(filters,filters_end,window,window_end, learning_rate, dropout)
        metrics = m.metrics_names
        print(str(metrics))

        # Collect all songs_* batch files. Listings are sorted because
        # os.listdir order is arbitrary and the seeded split depends on it.
        all_files = []
        for folder in sorted(os.listdir(input_folder)):
            print(str(folder))
            for file_name in sorted(os.listdir(input_folder + "/" + folder)):
                if file_name.startswith("songs_"):
                    all_files.append(input_folder + "/" + folder + "/" + file_name)

        training_files, test_files = train_test_split(all_files, test_size=0.3)
        print(str(len(all_files)))
        print(str(len(training_files)))
        print(str(len(test_files)))
        if not training_files or not test_files:
            raise ValueError("Need at least one training and one test file, found "
                             + str(len(all_files)) + " songs_ files in " + input_folder)

        train_results_loss = []
        train_results_acc = []
        test_results_loss = []
        test_results_acc = []

        # Following the notes in this file, each batch file holds 10 songs, so
        # one training step concatenates batch_size / 10 files (at least one).
        files_per_step = max(1, int(batch_size / 10))
        for epoch in range(epochs):
            print("------- Executing epoch " + str(epoch))
            random.shuffle(training_files)
            # Walk the files in disjoint chunks. The chunking must come from
            # range's step: reassigning the loop variable inside a Python
            # for-loop does not advance it.
            for chunk_start in range(0, len(training_files), files_per_step):
                batch_parts = []
                listY = []
                for song_file in training_files[chunk_start:chunk_start + files_per_step]:
                    print("training " + song_file)
                    data = np.load(str(song_file))
                    # NOTE(review): axis=0 normalises each column across the
                    # songs of a file rather than each song on its own;
                    # confirm this is intended.
                    normalizedData = normalize(data, axis=0, order=2)
                    print(str(normalizedData.shape))
                    batch_parts.append(normalizedData)
                    listY.extend(binary_targets(np.load(str(label_path(song_file)))))

                training_batch = np.concatenate(batch_parts, axis=0)
                count_labels = sum(target[0] for target in listY)
                print("training_batch shape" + str(training_batch.shape))
                print("training batch labels size " + str(len(listY)))
                print(str(count_labels))

                # Add the trailing channel axis expected by Conv1D.
                training_batch = np.expand_dims(training_batch, axis=2)

                x = m.train_on_batch(training_batch, np.array(listY))
                print("New batch: Train Loss " + str(x[0]) + " Train accuracy " + str(x[1]))
                train_results_loss.append(x[0])
                train_results_acc.append(x[1])

            # Evaluate every third epoch and always on the last one; the last
            # evaluation is the score that gets reported. test_on_batch gives
            # per-file metrics, so they are summed and averaged over files.
            # TODO: only test on last epoch on hops
            if epoch % 3 == 0 or epoch == epochs - 1:
                total_loss = 0
                total_accuracy = 0
                total_test_data = 0

                for test_file in test_files:
                    print("test " + test_file)
                    x_test = np.load(test_file)
                    x_test_normalized = normalize(x_test, axis=0, order=2)
                    y_test = np.array(binary_targets(np.load(label_path(test_file))))

                    x_test_normalized = np.expand_dims(x_test_normalized, axis=2)
                    x = m.test_on_batch(x_test_normalized, y_test)

                    print("New batch: Test Loss " + str(x[0]) + " Test accuracy " + str(x[1]))
                    total_test_data += 1
                    total_loss += x[0]
                    total_accuracy += x[1]

                test_results_loss.append(total_loss / total_test_data)
                test_results_acc.append(total_accuracy / total_test_data)
                print("[Total test files] " + str(total_test_data))
                print("[total_loss] " + str(total_loss) + " [total_loss] " + str(total_loss / total_test_data))
                print("[total_accuracy] " + str(total_accuracy) + " [total_accuracy] " + str(
                    total_accuracy / total_test_data))

        print("Final training loss: ")
        print(str(train_results_loss))
        print("Final training accuracy: ")
        print(str(train_results_acc))
        print("Final test loss: ")
        print(str(test_results_loss))
        print("Final test accuracy: ")
        print(str(test_results_acc))

        # The driver parses this marker-delimited value out of the log.
        print(sep+str(total_accuracy/total_test_data)+sep)


    main(filters, filters_end,window,window_end, learning_rate, dropout, epochs, batch_size)
    print("End time: " + str(time.time()))
    print("Elapsed time: " + str(time.time()-start))


Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
7303,application_1513605045578_4842,pyspark,idle,Link,Link,✔


SparkSession available as 'spark'.


# Executes in parallel and returns the list of accuracies

In [3]:
def get_accuracy(v, marker=None):
    '''
    Extract the value printed between two markers on one log line.

    :param v: a row (anything indexable by "_c0") holding one log line.
    :param marker: delimiter surrounding the value; defaults to the
        module-level ``sep``.
    :return: ``[value]`` when the line contains a value enclosed by two
        markers, otherwise ``[]`` (list-shaped for use with ``flatMap``).
    '''
    if marker is None:
        marker = sep
    line = v["_c0"]
    # A null cell cannot carry a result.
    if line is None:
        return []
    _, opened, rest = line.partition(marker)
    if not opened:
        return []
    value, closed, _ = rest.partition(marker)
    # Without the closing marker the line is incomplete; ignore it instead of
    # returning a clipped value.
    if not closed:
        return []
    return [value]

def get_all_accuracies(tensorboard_hdfs_logdir, args_dict, number_params, param_order=None):
    '''
    Retrieve the accuracy of every parallel execution, in population order.

    Each execution writes its own log file, named after its parameter values:
    ``<logdir>/<name>=<value>.<name>=<value>. ... .log``. The accuracy is the
    value printed between two ``sep`` markers in that file.

    :param tensorboard_hdfs_logdir: log directory returned by the launcher.
    :param args_dict: population dict mapping each parameter name to a list
        with one value per execution.
    :param number_params: number of executions (length of each list).
    :param param_order: parameter names in the order they appear in the log
        file names. Defaults to the keys of DifferentialEvolution._parse_to_dict
        in that function's order.
    :return: list of float accuracies, one per execution.
    :raises RuntimeError: if a log contains no accuracy; skipping it would
        shift every later score onto the wrong individual.
    '''
    from hops import hdfs
    print(tensorboard_hdfs_logdir)
    hdfs.log(tensorboard_hdfs_logdir)

    if param_order is None:
        # Important: must be ordered exactly like _parse_to_dict, and every
        # name must be a key of args_dict.
        param_order = ['filters', 'filters_end', 'window', 'window_end',
                       'learning_rate', 'dropout', 'epochs', 'batch_size']

    results = []
    for i in range(number_params):
        path_to_log = tensorboard_hdfs_logdir + "/"
        for k in param_order:
            path_to_log += k + "=" + str(args_dict[k][i]) + "."
        path_to_log += "log"
        print("Path to log: ")
        hdfs.log("Path to log: ")
        print(path_to_log)
        hdfs.log(path_to_log)
        raw = spark.read.csv(path_to_log, sep="\n")

        r = raw.rdd.flatMap(lambda v: get_accuracy(v)).collect()
        if not r:
            raise RuntimeError("No accuracy found in " + path_to_log)
        # Exactly one value per execution keeps scores aligned with the
        # population; take the last marker line if several were logged.
        results.append(r[-1])

    return [float(res) for res in results]

def execute_all(population_dict):
    '''
    Run wrapper_music once per individual of population_dict, in parallel.

    :param population_dict: maps each parameter name to a list holding one
        value per individual (all lists have the same length).
    :return: list of accuracies (the metric printed by the wrapper), in the
        same order as the individuals in population_dict.
    :raises ValueError: if population_dict is empty.
    '''
    from hops import tflauncher
    if not population_dict:
        raise ValueError("population_dict is empty")
    # Every entry holds one value per individual, so any entry's length is the
    # number of executions.
    number_params = len(next(iter(population_dict.values())))
    # The launched function must be the music wrapper defined in this notebook.
    tensorboard_hdfs_logdir = tflauncher.launch(spark, wrapper_music, population_dict)
    return get_all_accuracies(tensorboard_hdfs_logdir, population_dict, number_params)

# Evolutionary algorithm for hyperparameter optimization
To run the code, just adapt the last function (`_parse_to_dict`) to include the parameters you want to optimize.
Also adapt the bounds and types in the main section to reflect the parameters you want to optimize.

In [4]:
'''
Differential evolution algorithm extended to allow categorical and integer values, for optimizing the hyperparameter
space of neural networks, including an option for parallelization.

Unlike typical differential evolution, where each individual is compared and selected sequentially, this algorithm
creates a full population to be evaluated at once. This allows the user to send a whole population of parameters
to a cluster and run the computations in parallel, after which each target vector is compared with its respective
trial vector.

The user has to define:
- The objective function to be optimized
- The bounds of each parameter (all possible values)
- The type of each parameter, so that categorical, integer or floating-point values can be evaluated
- The direction of the optimization, i.e. maximization or minimization
- The number of iterations, i.e. the number of generations the algorithm will run
- The population size; a rule of thumb is 5-10 times the number of parameters to optimize
  (it must be at least 4, because mutation draws three individuals other than the target)
- The mutation factor, in [0, 2)
- The crossover probability, in [0, 1]; the higher the value, the more mutated values will cross over
'''

import random
from hops import hdfs

class DifferentialEvolution:
    '''
    Differential evolution (rand/1/bin) over float, int and categorical
    parameters, evaluating a whole population per objective-function call.

    :param objective_function: callable taking a population dict (see
        _parse_to_dict) and returning one score per individual, in order.
    :param parbounds: per parameter, a (low, high) pair for 'float'/'int'
        parameters, or a tuple of allowed values for 'cat' parameters.
    :param types: per parameter, one of 'float', 'int', 'cat'.
    :param direction: 'max' to maximise the score, anything else minimises.
    :param maxiter: number of generations.
    :param popsize: population size; must be at least 4 because mutation
        draws three individuals distinct from the target.
    :param mutationfactor: F, scales the difference vector.
    :param crossover: CR, probability of taking each parameter from the donor.
    :param param_names: keys used by _parse_to_dict, in parameter order.
        Defaults to the eight music-model hyperparameters.
    '''
    _types = ['float', 'int', 'cat']
    _default_param_names = ('filters', 'filters_end', 'window', 'window_end',
                            'learning_rate', 'dropout', 'epochs', 'batch_size')

    def __init__(self, objective_function, parbounds, types, direction = 'max', maxiter=10, popsize=10, mutationfactor=0.5, crossover=0.7, param_names=None):
        # Fail early instead of inside random.sample during the first mutation.
        if popsize < 4:
            raise ValueError("popsize must be at least 4, got " + str(popsize))
        self.objective_function = objective_function
        self.parbounds = parbounds
        self.direction = direction
        self.types = types
        self.maxiter = maxiter
        self.n = popsize
        self.F = mutationfactor
        self.CR = crossover
        if param_names is None:
            param_names = self._default_param_names
        self.param_names = list(param_names)
        # Per-run state, kept on the instance (not shared via the class) and
        # reset by solve().
        self._generation = 0
        self._scores = []

    # run differential evolution algorithms
    def solve(self):
        '''
        Run the optimisation for maxiter generations.

        :return: (population, scores) -- the final population with categorical
            parameters mapped back to their original values, and each
            individual's score in the same order.
        '''
        # Start each run at generation 0 so the initial population is scored,
        # instead of being compared against scores left from a previous run.
        self._generation = 0
        self._scores = []

        population, bounds = self._population_initialisation()
        hdfs.log(str(population))
        print(str(population))
        for _ in range(self.maxiter):
            donor_population = self._mutation(population, bounds)
            trial_population = self._recombination(population, donor_population)
            population = self._selection(population, trial_population)

            new_gen_avg = sum(self._scores)/self.n

            if self.direction == 'max':
                new_gen_best = max(self._scores)
            else:
                new_gen_best = min(self._scores)
            new_gen_best_param = self._parse_back(population[self._scores.index(new_gen_best)])

            message = ("Generation: " + str(self._generation) + " || " + "Average score: " + str(new_gen_avg) +
                       ", best score: " + str(new_gen_best) + " best param: " + str(new_gen_best_param))
            hdfs.log(message)
            print(message)

        return [self._parse_back(indiv) for indiv in population], self._scores

    # define bounds of each individual depending on type
    def _individual_representation(self):
        '''Return per-parameter (low, high) bounds; categorical parameters use index bounds.'''
        bounds = []
        for index, item in enumerate(self.types):
            # if categorical then take bounds from 0 to number of items - 1
            if item == self._types[2]:
                bounds.append((0, int(len(self.parbounds[index]) - 1)))
            # if float/int then take given bounds
            else:
                bounds.append(self.parbounds[index])
        return bounds

    # initialise population
    def _population_initialisation(self):
        '''Draw self.n random individuals within bounds; return (population, bounds).'''
        bounds = self._individual_representation()
        population = []
        for _ in range(self.n):
            indiv = [random.uniform(b[0], b[1]) for b in bounds]
            population.append(self._ensure_bounds(indiv, bounds))
        return population, bounds

    # ensure that any mutated individual is within bounds
    def _ensure_bounds(self, indiv, bounds):
        '''Clip each parameter to its bounds and round non-float parameters to int.'''
        indiv_correct = []
        for i, par in enumerate(indiv):
            lowerbound = bounds[i][0]
            upperbound = bounds[i][1]
            if par < lowerbound:
                par = lowerbound
            elif par > upperbound:
                par = upperbound

            # int and categorical (index) parameters must be integers
            if self.types[i] != 'float':
                par = int(round(par))
            indiv_correct.append(par)
        return indiv_correct

    # create donor population based on mutation of three vectors
    def _mutation(self, population, bounds):
        '''Build donor vectors x_1 + F * (x_2 - x_3) from three other individuals.'''
        donor_population = []
        for i in range(self.n):
            indiv_indices = list(range(0, self.n))
            indiv_indices.remove(i)

            candidates = random.sample(indiv_indices, 3)
            x_1 = population[candidates[0]]
            x_2 = population[candidates[1]]
            x_3 = population[candidates[2]]

            # subtracting the third from the second candidate
            x_diff = [x_2_i - x_3_i for x_2_i, x_3_i in zip(x_2, x_3)]
            donor_vec = [x_1_i + self.F*x_diff_i for x_1_i, x_diff_i in zip(x_1, x_diff)]
            donor_population.append(self._ensure_bounds(donor_vec, bounds))

        return donor_population

    # recombine donor vectors according to crossover probability
    def _recombination(self, population, donor_population):
        '''
        Binomial crossover: each parameter comes from the donor with
        probability CR, and one randomly chosen parameter always does, so no
        evaluation is wasted on an unchanged copy of the target.
        '''
        num_parameters = len(self.parbounds)
        trial_population = []
        for k in range(self.n):
            target_vec = population[k]
            donor_vec = donor_population[k]
            forced = random.randrange(num_parameters) if num_parameters else None
            trial_vec = []
            for p in range(num_parameters):
                if random.random() <= self.CR or p == forced:
                    trial_vec.append(donor_vec[p])
                else:
                    trial_vec.append(target_vec[p])
            trial_population.append(trial_vec)
        return trial_population

    def _evaluate(self, population):
        '''Score a population (internal representation) with the objective function.'''
        parsed = self._parse_to_dict([self._parse_back(indiv) for indiv in population])
        scores = list(self.objective_function(parsed))
        # Scores are matched to individuals by position; a length mismatch
        # would silently attribute them to the wrong individuals.
        if len(scores) != len(population):
            raise ValueError("objective_function returned " + str(len(scores)) +
                             " scores for " + str(len(population)) + " individuals")
        return scores

    # select the best individuals from each generation
    def _selection(self, population, trial_population):
        '''Keep, per index, whichever of target and trial scores better.'''
        # The initial (target) population has not been scored yet.
        if self._generation == 0:
            self._scores = self._evaluate(population)

        trial_population_scores = self._evaluate(trial_population)

        hdfs.log('Pop scores: ' + str(self._scores))
        print('Pop scores: ' + str(self._scores))
        hdfs.log('Trial scores: ' + str(trial_population_scores))
        print('Trial scores: ' + str(trial_population_scores))

        maximise = self.direction == 'max'
        for i in range(self.n):
            trial_vec_score_i = trial_population_scores[i]
            target_vec_score_i = self._scores[i]
            if maximise:
                better = trial_vec_score_i > target_vec_score_i
            else:
                better = trial_vec_score_i < target_vec_score_i
            if better:
                self._scores[i] = trial_vec_score_i
                population[i] = trial_population[i]

        self._generation += 1

        return population

    # parse the converted values back to original
    def _parse_back(self, individual):
        '''Map categorical indices back to their values; other parameters pass through.'''
        return [self.parbounds[index][parameter] if self.types[index] == self._types[2] else parameter
                for index, parameter in enumerate(individual)]

    # for parallelization purposes one can parse the population from a list to a dictionary format
    def _parse_to_dict(self, population):
        '''
        Convert a list of individuals into {param_name: [value per individual]},
        using self.param_names for the keys in parameter order.
        '''
        return {name: [indiv[index] for indiv in population]
                for index, name in enumerate(self.param_names)}

In [None]:
# NOTE: some parameter combinations may make the network exceed the default
# RAM allocation on Hops.
#
# Each parameter is 'float', 'int' or 'cat' (categorical: a tuple of allowed
# values). Bounds and types must follow the parameter order used by
# DifferentialEvolution._parse_to_dict.
search_bounds = [
    (50, 120),                 # filters
    (30, 80),                  # filters_end
    (5, 20),                   # window
    (2, 10),                   # window_end
    (0.001, 0.02),             # learning_rate
    (0.5, 0.9),                # dropout
    (3, 20),                   # epochs
    (40, 60, 80, 90, 100),     # batch_size
]
search_types = ['int', 'int', 'int', 'int', 'float', 'float', 'int', 'cat']

diff_evo = DifferentialEvolution(execute_all, search_bounds, search_types,
                                 direction='max', maxiter=10, popsize=8)

results = diff_evo.solve()
final_population, final_scores = results

print("Population: ", final_population)
print("Scores: ", final_scores)