In [68]:
import numpy as np
import pandas as pd
import random
import tensorflow as tf

## Randomized hyper-parameter optimization

In [69]:
def setup_model(num_layers, k_constr, dropout_rate, input_size, output_size):
    """Build a randomly sized, funnel-shaped fully connected network.

    Every hidden block is a ReLU ``Dense`` layer followed by a ``Dropout``
    layer. Hidden widths are drawn with ``random.randint`` and never increase
    from one layer to the next: the first width is drawn from
    ``[output_size + 1, input_size]`` and each later width from
    ``[output_size + 1, previous width]``. The network ends with a linear
    ``Dense`` layer of ``output_size`` units.

    Parameters
    ----------
    num_layers : int
        Number of hidden Dense/Dropout pairs.
    k_constr : float
        Value passed to ``max_norm`` for the kernel constraint of every
        Dense layer (hidden and output).
    dropout_rate : float
        Rate passed to every Dropout layer.
    input_size : int
        Number of input features; declared on the first Dense layer only.
    output_size : int
        Number of units of the final linear layer.

    Returns
    -------
    tuple
        ``(topology, model)`` where ``topology`` is the list of hidden layer
        widths in order and ``model`` is the uncompiled ``tf.keras.Sequential``.
    """
    model = tf.keras.Sequential()
    # Bounds for the random hidden widths. The upper bound is clamped so that
    # input_size <= output_size no longer makes random.randint fail with an
    # "empty range" error; in that case every hidden layer gets `lb` units.
    lb = output_size + 1
    ub = max(lb, input_size)
    topology = []
    for i in range(num_layers):
        # Re-using the drawn width as the next upper bound yields the funnel shape.
        ub = random.randint(lb, ub)
        topology.append(ub)
        # Only the first layer declares the input dimension; hidden layers
        # get their input size from the layer before them.
        first_layer_kwargs = {"input_dim": input_size} if i == 0 else {}
        model.add(tf.keras.layers.Dense(ub,
                                        activation="relu",
                                        kernel_constraint=tf.keras.constraints.max_norm(k_constr),
                                        **first_layer_kwargs))
        model.add(tf.keras.layers.Dropout(dropout_rate))
    # Output layer: `output_size` units with linear activation.
    model.add(tf.keras.layers.Dense(output_size,
                                    kernel_constraint=tf.keras.constraints.max_norm(k_constr),
                                    activation="linear"))
    return topology, model

In [70]:
def cv_dnn(model, optimizer, X_train, y_train, epochs=400, batch_size=64, cv=5, val_size=0.3):
    """Estimate the validation MSE of ``model`` by repeated random subsampling.

    For each of the ``cv`` repetitions the rows are shuffled, the first
    ``int(n_rows * val_size)`` shuffled rows are held out for validation and
    the model is trained on the rest. The model is restored to the weights it
    had on entry before every repetition, so no repetition is evaluated on
    rows that an earlier repetition trained on.

    Parameters
    ----------
    model : object
        Keras-style model providing ``get_weights``, ``set_weights``,
        ``compile``, ``fit`` and ``evaluate``.
    optimizer : object
        Passed unchanged to ``model.compile`` on every repetition.
        NOTE(review): the same optimizer object is reused, so any internal
        optimizer state is not reset between repetitions.
    X_train, y_train :
        Features and targets; both are indexed with ``.iloc[rows, :]``, so
        presumably pandas DataFrames with the same number of rows.
    epochs, batch_size : int
        Forwarded to ``model.fit``.
    cv : int
        Number of random train/validation repetitions (must be >= 1).
    val_size : float
        Fraction of rows held out for validation in each repetition.

    Returns
    -------
    float
        Mean of the values returned by ``model.evaluate`` over all repetitions.

    Raises
    ------
    ValueError
        If ``cv < 1`` or the split would leave the training or validation
        part empty.
    """
    n_samples = X_train.shape[0]
    num_val = int(n_samples * val_size)
    if cv < 1:
        raise ValueError(f"cv must be >= 1, got {cv}")
    if not 0 < num_val < n_samples:
        raise ValueError(
            f"val_size={val_size} yields {num_val} validation rows out of {n_samples}; "
            "both the training and validation parts must be non-empty")
    # Snapshot the untrained weights: every repetition must start from the same state.
    initial_weights = model.get_weights()
    val_losses = []
    for _ in range(cv):
        model.set_weights(initial_weights)
        order = np.random.permutation(n_samples)
        val_idx, trn_idx = order[:num_val], order[num_val:]
        x_trn, y_trn = X_train.iloc[trn_idx, :], y_train.iloc[trn_idx, :]
        x_val, y_val = X_train.iloc[val_idx, :], y_train.iloc[val_idx, :]
        # compile and fit dnn model
        model.compile(optimizer=optimizer, loss="mse")  # metrics=["mse"] ideally should be r2_score
        model.fit(x_trn, y_trn, epochs=epochs, batch_size=batch_size, verbose=0)
        val_losses.append(model.evaluate(x_val, y_val, verbose=0))
    return np.mean(val_losses)

In [71]:
k_opt = tf.keras.optimizers
# Optimizer templates keyed by the names accepted in param_grid["algorithm"].
# dnn_randomized_opt looks an entry up by name and overrides its learning rate
# (and, for "sgd_momentum", its momentum).
OPTIMIZERS = {"adam": k_opt.Adam(),
              "rmsprop": k_opt.RMSprop(),
              "sgd": k_opt.SGD(),
              # Built with a non-zero momentum so this entry really differs from
              # plain "sgd"; 0.9 matches the default momentum grid used by
              # dnn_randomized_opt.
              "sgd_momentum": k_opt.SGD(momentum=0.9)}

In [72]:
def dnn_randomized_opt(param_grid, X_train, y_train, n_iter=10, verbose=True):
    """Randomized hyper-parameter search for the DNN built by ``setup_model``.

    Each iteration randomly picks a number of layers, kernel constraint and
    dropout rate, builds a model with ``setup_model``, randomly picks an
    optimizer, learning rate (and momentum for ``"sgd_momentum"``), and scores
    the configuration with ``cv_dnn``. The configuration with the lowest
    validation loss is returned.

    Parameters
    ----------
    param_grid : dict
        Candidate values per hyper-parameter. Recognised keys and the
        defaults used when a key is missing:
        ``"num_layers"`` ([4]), ``"kernel_constraint"`` ([np.inf]),
        ``"dropout"`` ([0.0]), ``"learning_rate"`` ([0.001]),
        ``"algorithm"`` (["adam"]; names must be keys of ``OPTIMIZERS``),
        ``"momentum"`` ([0.9]).
    X_train, y_train :
        Training features and targets; ``shape[1]`` of each gives the input
        and output size, and both are forwarded to ``cv_dnn``.
    n_iter : int
        Number of random configurations to evaluate.
    verbose : bool
        If true, print a summary of every iteration.

    Returns
    -------
    tuple
        ``(best_loss, best_config)`` where ``best_config`` is a dict with keys
        ``"topology"``, ``"kernel_constraint"``, ``"dropout"``,
        ``"algorithm"``, ``"learning_rate"`` and ``"momentum"`` (``"NA"``
        unless the algorithm is ``"sgd_momentum"``).

    Raises
    ------
    ValueError
        If no configuration achieved a finite validation loss (for example
        ``n_iter < 1`` or every loss was NaN).
    """
    # get input and output sizes
    input_size, output_size = X_train.shape[1], y_train.shape[1]
    # set up search space for hyper-parameter optimization
    num_layers = param_grid.get("num_layers", [4])
    k_constrs = param_grid.get("kernel_constraint", [np.inf])
    dropouts = param_grid.get("dropout", [0.0])
    lrs = param_grid.get("learning_rate", [0.001])
    algs = param_grid.get("algorithm", ["adam"])
    mms = param_grid.get("momentum", [0.9])
    # initialize best result
    best_loss, best_config = np.inf, None
    # main loop -> optimize dnn hyper-parameters
    for i in range(n_iter):
        # randomly choose the number of dense layers, kernel_constraint and dropout rate -> to setup dnn model
        n_layers = random.choice(num_layers)
        k_constr = random.choice(k_constrs)
        dropout_rate = random.choice(dropouts)
        topo, dnn_init = setup_model(n_layers, k_constr, dropout_rate, input_size, output_size)
        # randomly pick learning rate, optimizer and momentum (if applicable)
        alg, lr = random.choice(algs), random.choice(lrs)
        # momentum is always defined, so it can never be unbound or stale
        # from a previous iteration regardless of `verbose`
        mm = random.choice(mms) if alg == "sgd_momentum" else "NA"
        # Build a fresh optimizer from the template's config instead of mutating
        # the shared instance: each model gets an optimizer with no state left
        # over from earlier iterations and with the chosen hyper-parameters
        # applied at construction time.
        template = OPTIMIZERS[alg]
        opt_config = dict(template.get_config())
        opt_config["learning_rate"] = lr
        if alg == "sgd_momentum":
            opt_config["momentum"] = mm
        optimizer = type(template).from_config(opt_config)
        # cross-validate dnn model
        val_loss = cv_dnn(dnn_init, optimizer, X_train, y_train)
        # print iter information if verbose
        if verbose:
            print(f"ITERATION {i+1}\nTopology: {topo}\nKernel constraint: {k_constr}\nDropout: {dropout_rate}\n"
                  f"Algorithm: {alg}\nLearning rate: {lr}\nMomentum: {mm}\nValidation loss: {val_loss}\n")
        # update best_loss and best_config if the attained result is the best so far
        if val_loss < best_loss:
            best_loss = val_loss
            best_config = {"topology": topo, "kernel_constraint": k_constr, "dropout": dropout_rate,
                           "algorithm": alg, "learning_rate": lr, "momentum": mm}
    if best_config is None:
        raise ValueError(
            f"no configuration achieved a finite validation loss (n_iter={n_iter}); "
            "n_iter must be >= 1 and at least one loss must be below infinity")
    return best_loss, best_config