In [None]:
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.cm as cm
import matplotlib as mpl
import matplotlib.ticker as ticker
import pandas as pd
import seaborn as sns
import h5py
from pathlib import Path
import time
from tqdm import tqdm
import csv
import os

from sklearn.model_selection import train_test_split, GridSearchCV, RandomizedSearchCV, RepeatedKFold, cross_val_score
from sklearn.svm import SVR
from sklearn.multioutput import MultiOutputRegressor
from sklearn.metrics import classification_report, mean_squared_error, mean_absolute_error, r2_score
from sklearn.preprocessing import StandardScaler
from sklearn.feature_selection import RFECV
from sklearn.pipeline import make_pipeline
from sklearn.ensemble import RandomForestRegressor
from sklearn.neighbors import KNeighborsRegressor
from sklearn.neural_network import MLPRegressor
from sklearn.inspection import permutation_importance

import tensorflow as tf
from tensorflow import keras

from tensorflow.keras.wrappers.scikit_learn import KerasRegressor
from tensorflow.keras import backend as K
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Conv1D, LSTM, Bidirectional
from tensorflow.keras.layers import MaxPooling1D, GlobalAveragePooling1D, AveragePooling1D
from tensorflow.keras.layers import Flatten, Dropout, BatchNormalization
from tensorflow.keras import activations

%matplotlib inline
plt.rcParams['font.size'] = 16
plt.rcParams['savefig.facecolor'] = 'white'

In [None]:
# Report how this TensorFlow build was compiled and which accelerators it sees.
cuda_built = tf.test.is_built_with_cuda()
print(f'tf.test.is_built_with_cuda(): {cuda_built}')
# print(f'tf.test.is_gpu_available(): {tf.test.is_gpu_available()}')

# The device-listing call differs between TensorFlow 1.x and later releases.
running_tf1 = tf.__version__[0] == '1'
if running_tf1:
    visible_devices = tf.config.experimental_list_devices()
    print(f'tf.config.experimental_list_devices(): {visible_devices}')
else:
    visible_gpus = tf.config.list_physical_devices("GPU")
    print(f'tf.config.list_physical_devices("GPU"): {visible_gpus}')

gpu_name = tf.test.gpu_device_name()
print(f'tf.test.gpu_device_name(): {gpu_name}')

In [None]:
# Fixed seed for the whole notebook so a fresh "Restart & Run All" repeats.
# randst = np.random.randint(0,100)
randst = 94
print(randst)

# RepeatedKFold receives `randst` explicitly further down. Estimators created
# without a random_state (RandomForestRegressor, Keras weight initialisers)
# draw from the global NumPy / TensorFlow generators, so seed those as well.
np.random.seed(randst)
tf.random.set_seed(randst)

In [None]:
# Value passed as `verbose=` to model.fit inside the MLP/CNN/FCN cross-validation helpers.
verbosity = 0

# Define general functions

In [None]:
def print_my_results(results_mae, results_mae_per, results_r2, results_r2_per):
    """Print mean (std) of MAE and R2 over all splits, overall and per target.

    Parameters
    ----------
    results_mae, results_r2 : sequence of float
        One overall score per cross-validation split.
    results_mae_per, results_r2_per : sequence of 1-D arrays
        One per-target score vector per split; statistics are taken over
        axis 0 (the splits).

    Notes
    -----
    Target names are looked up through the notebook-level `idx_params` and
    `labels`, so both must be defined before this is called.
    """
    sections = (
        ('MAE:', results_mae, results_mae_per),
        ('R2:', results_r2, results_r2_per),
    )
    for heading, overall, per_target in sections:
        print(heading)
        print(f'\t{np.mean(overall):.3f} ({np.std(overall):.3f}) overall')
        column_means = np.mean(per_target, axis=0)
        column_stds = np.std(per_target, axis=0)
        for avg, spread, param_idx in zip(column_means, column_stds, idx_params):
            print(f'\t{avg:.3f} ({spread:.3f}) {labels[param_idx]}')
        

# Import and partition data

## Import data and test case indices

In [None]:
name = "n4130"
here = Path.cwd()
data_path = here.joinpath(f"data/data_control_{name}.h5")

with h5py.File(data_path, 'r') as f:
    # Materialise the dataset names while the file is still open.
    Datasetnames = list(f.keys())
    print(*Datasetnames, sep="\n")
    trace = f['trace'][:, :200, :]  # select time 0-200
    t = f['time'][...]
    adj_factors = f['adjustment_factors'][...]
    cost_terms = f['cost_terms'][...]

if trace.shape[0] != adj_factors.shape[0]:
    print('Number of samples do not match for trace and adj_factors!')

print("Number of traces:", trace.shape[0])
labels = ["g_Kr","g_CaL","lambda_B","g_NaCa","g_K1","J_SERCA_bar","lambda_diff","lambda_RyR","g_bCa","g_Na","g_NaL"]

idx_all = list(np.arange(0, trace.shape[0]))
# atleast_1d keeps this a sequence even if the key file holds a single index.
idx_test = list(np.atleast_1d(np.loadtxt(here.joinpath("data/idx_key_p11_s100_n5000_ns50.txt"), dtype=int)))
# Sort explicitly: the order of a set difference is an implementation detail,
# and the row order of the training arrays (hence every CV split) depends on it.
idx_train = sorted(set(idx_all) - set(idx_test))

## Partition data
Test cases are pre-selected for out-of-sample testing

In [None]:
# Columns of `adj_factors` (and entries of `labels`) used as regression targets.
idx_params = [0,1,4,9,10]

# Row-select the traces for the training and the held-out test samples.
trace_train = np.take(trace, idx_train, axis=0)
trace_test = np.take(trace, idx_test, axis=0)

# Same row selection for the targets, then keep only the chosen columns.
af_train = np.take(np.take(adj_factors, idx_train, axis=0), idx_params, axis=1)
af_test = np.take(np.take(adj_factors, idx_test, axis=0), idx_params, axis=1)

# Parameter tuning for kNN, RF, SVM

## k-Nearest Neighbors
https://scikit-learn.org/stable/modules/generated/sklearn.neighbors.KNeighborsRegressor.html

In [None]:
# Put the two trace channels side by side: one flat feature row per sample.
X = np.hstack([trace_train[:, :, ch] for ch in (0, 1)])
y = af_train

# Standardise every feature column over the full training set.
scaler = StandardScaler()
scaler.fit(X)
X = scaler.transform(X)

print('X shape:',X.shape)
print('Feature shape:',y.shape)

In [None]:
# Candidate settings: k from 1 to 19, both weighting schemes, p = 1 and p = 2.
knn_grid = dict(
    n_neighbors=np.arange(1, 20),
    weights=['uniform', 'distance'],
    p=[1, 2],
)

# Exhaustive 5-fold search over the grid, parallelised over all cores.
knn_base = KNeighborsRegressor()
knn_gscv = GridSearchCV(knn_base, knn_grid, cv=5, verbose=2, n_jobs=-1)
knn_gscv.fit(X, y)

print(knn_gscv.best_params_)

## Random Forest
https://scikit-learn.org/stable/modules/generated/sklearn.ensemble.RandomForestRegressor.html

https://scikit-learn.org/stable/modules/generated/sklearn.model_selection.RandomizedSearchCV.html

In [None]:
# Flatten both trace channels into a single 2-D feature matrix.
first_channel, second_channel = trace_train[:, :, 0], trace_train[:, :, 1]
X = np.hstack((first_channel, second_channel))
y = af_train

# Column-wise standardisation fitted on all training rows.
scaler = StandardScaler()
scaler.fit(X)
X = scaler.transform(X)

print('X shape:',X.shape)
print('Feature shape:',y.shape)

In [None]:
# Number of trees in Random Forest
rf_n_estimators = [int(x) for x in np.linspace(200, 1000, 5)]
rf_n_estimators.append(1500)
rf_n_estimators.append(2000)

# Maximum number of levels in tree
rf_max_depth = [int(x) for x in np.linspace(5, 55, 6)]
rf_max_depth.append(None)

# Number of features to consider at every split
# rf_max_features = ['auto', 'sqrt', 'log2']
rf_max_features = ['log2']

# Criterion to split on
rf_criterion = ['absolute_error']

# Minimum number of samples required to split a node
rf_min_samples_split = [int(x) for x in np.linspace(2, 10, 9)]

# Minimum decrease in impurity required for split to happen
rf_min_impurity_decrease = [0.0, 0.05, 0.1]

# Method of selecting samples for training each tree
rf_bootstrap = [True, False]

# Create the grid
rf_grid = {'n_estimators': rf_n_estimators,
               'max_depth': rf_max_depth,
               'max_features': rf_max_features,
               'criterion': rf_criterion,
               'min_samples_split': rf_min_samples_split,
               'min_impurity_decrease': rf_min_impurity_decrease,
               'bootstrap': rf_bootstrap}

rf_base = RandomForestRegressor()

# Create the random search Random Forest
rf_gscv = RandomizedSearchCV(estimator = rf_base, param_distributions = rf_grid,
                               n_iter=2, cv = 5, verbose = 2,
                               n_jobs = -1)

# Fit the random search model
rf_gscv.fit(X, y)

# View the best parameters from the random search
print(rf_gscv.best_params_)
with open("rfr_cv_output.txt","w") as f:
    f.write(rf_gscv.best_params_)


## Support Vector
https://scikit-learn.org/stable/modules/generated/sklearn.svm.SVR.html

In [None]:
# One row per sample: channel 0 followed by channel 1.
X = np.hstack(tuple(trace_train[:, :, ch] for ch in range(2)))
y = af_train

# Zero-mean / unit-variance scaling of each feature column.
scaler = StandardScaler()
scaler.fit(X)
X = scaler.transform(X)

print('X shape:',X.shape)
print('Feature shape:',y.shape)

In [None]:
# Candidate gamma values; note that `svr_grid` below does not reference this list.
gamma_range = [*np.logspace(-6, 3, 9), "scale", "auto"]

# Keys carry the `estimator__` prefix because SVR is wrapped in MultiOutputRegressor.
svr_grid = dict([
    # ("estimator__kernel", ["linear", "poly", "rbf", "sigmoid"]),
    ("estimator__kernel", ["rbf"]),
    ("estimator__gamma", ["scale", "auto"]),
    ("estimator__C", [1, 10, 100, 1000]),
    ("estimator__epsilon", [0.001, 0.01, 0.1, 1, 10]),
    # ("estimator__shrinking", [True, False]),
])

# Time the exhaustive 3-fold search (wall-clock seconds).
start = time.time()
svr_base = MultiOutputRegressor(SVR())
svr_gscv = GridSearchCV(svr_base, svr_grid, cv=3, verbose=2, n_jobs=-1)
svr_gscv.fit(X, y)
stop = time.time()
print(svr_gscv.best_params_)
print(stop-start)

## Define best parameters for kNN, SVR

In [None]:
# Hyper-parameters forwarded to KNeighborsRegressor by get_model_knn.
knn_best = dict(n_neighbors=7, p=2, weights='distance')

# Hyper-parameters forwarded to each per-target SVR by get_model_svr.
svr_best = dict(C=1, epsilon=0.01, gamma='auto', kernel='rbf')

# Evaluate models using k-fold cross-validation

https://scikit-learn.org/stable/modules/learning_curve.html


## k-Nearest Neighbor

### Prep data

In [None]:
# Unscaled features (both channels side by side); scaling happens inside each CV split.
X = np.hstack([trace_train[:, :, ch] for ch in (0, 1)])
y = af_train

print('X shape:',X.shape)
print('Feature shape:',y.shape)

### Define model

In [None]:
def get_model_knn(**kwargs):
    """Return a KNeighborsRegressor that uses all CPU cores.

    Every keyword argument is forwarded unchanged to KNeighborsRegressor.
    """
    return KNeighborsRegressor(n_jobs=-1, **kwargs)

def evaluate_model_knn(X, y):
    """Cross-validate the tuned kNN regressor (5 folds, repeated 3 times).

    Parameters
    ----------
    X : ndarray
        Unscaled feature matrix, one row per sample. A StandardScaler is
        fitted on the training rows of every split and applied to both parts.
    y : ndarray
        Target matrix, one row per sample.

    Returns
    -------
    results_mae, results_mae_per, results_r2, results_r2_per, knn_train_error
        Five lists with one entry per split: overall validation MAE,
        per-target validation MAE, overall validation R2, per-target
        validation R2, and overall MAE on the training rows.
    """
    results_mae = list()
    results_mae_per = list()
    results_r2 = list()
    results_r2_per = list()
    knn_train_error = list()
    # define evaluation procedure
    cv = RepeatedKFold(n_splits=5, n_repeats=3, random_state=randst)
    # enumerate folds
    for train_ix, test_ix in cv.split(X):
        # prepare data (scaler statistics come from the training rows only)
        X_train, X_test = X[train_ix], X[test_ix]
        y_train, y_test = y[train_ix], y[test_ix]
        scaler = StandardScaler()
        X_train = scaler.fit_transform(X_train)
        X_test = scaler.transform(X_test)
        # define model
        model = get_model_knn(**knn_best)
        # fit model
        model.fit(X_train, y_train)
        # evaluate model on the held-out rows
        y_hat = model.predict(X_test)
        r2_per = r2_score(y_test, y_hat, multioutput='raw_values')
        r2 = r2_score(y_test, y_hat)
        mae = mean_absolute_error(y_test, y_hat)
        mae_per = mean_absolute_error(y_test, y_hat, multioutput='raw_values')
        knn_train_error.append(mean_absolute_error(y_train, model.predict(X_train)))
        # store result
        print('>%.3f' % mae)
        results_mae.append(mae)
        results_mae_per.append(mae_per)
        results_r2.append(r2)
        results_r2_per.append(r2_per)
    # Return the list that was actually filled above (the previous name,
    # knn_train_score, was never defined and raised NameError).
    return results_mae, results_mae_per, results_r2, results_r2_per, knn_train_error

### Run model with k-fold cross-validation

In [None]:
# Time the whole repeated k-fold run; the elapsed value is printed in seconds.
start = time.time()
(results_mae, results_mae_per,
 results_r2, results_r2_per,
 knn_trained_error) = evaluate_model_knn(X, y)
stop = time.time()
elapsed = stop - start
print('Time of execution: %f' % elapsed)

print_my_results(results_mae, results_mae_per, results_r2, results_r2_per)

### Evaluate ratio of validation to training error

In [None]:
# Validation MAE divided by training MAE, one entry per cross-validation split.
# `results_mae` and `knn_trained_error` are the lists unpacked from
# evaluate_model_knn; the training errors are plain floats, not Keras History
# objects, so they are used directly.
# NOTE(review): knn_best uses weights='distance', for which the training MAE
# may be (near) zero; such splits produce inf here - confirm this is acceptable.
ratio_knn = np.divide(np.asarray(results_mae, dtype=float),
                      np.asarray(knn_trained_error, dtype=float))

## Random Forest

### Prep data

In [None]:
# Channel 0 and channel 1 concatenated along the feature axis; left unscaled here.
first_channel, second_channel = trace_train[:, :, 0], trace_train[:, :, 1]
X = np.hstack((first_channel, second_channel))
y = af_train

print('X shape:',X.shape)
print('Feature shape:',y.shape)

### Define model using best_params

In [None]:
def get_model_rfr(**kwargs):
    """Return a RandomForestRegressor built from the given keyword arguments."""
    return RandomForestRegressor(**kwargs)

def evaluate_model_rfr(X, y):
    """Cross-validate the random forest (5 folds, repeated 3 times).

    The forest is configured from `rf_gscv.best_params_`, so the random
    search cell must have been run first. Features are standardised inside
    every split using statistics of that split's training rows.

    Returns four lists with one entry per split: overall MAE, per-target MAE,
    overall R2 and per-target R2 on the held-out rows.
    """
    mae_all, mae_by_target, r2_all, r2_by_target = [], [], [], []
    splitter = RepeatedKFold(n_splits=5, n_repeats=3, random_state=randst)
    for fit_rows, held_rows in splitter.split(X):
        # Scale with the fitting rows only, then apply to both parts.
        standardiser = StandardScaler().fit(X[fit_rows])
        X_fit = standardiser.transform(X[fit_rows])
        X_held = standardiser.transform(X[held_rows])
        y_fit, y_held = y[fit_rows], y[held_rows]

        forest = get_model_rfr(**rf_gscv.best_params_)
        forest.fit(X_fit, y_fit)
        predicted = forest.predict(X_held)

        fold_mae = mean_absolute_error(y_held, predicted)
        print('>%.3f' % fold_mae)
        mae_all.append(fold_mae)
        mae_by_target.append(mean_absolute_error(y_held, predicted, multioutput='raw_values'))
        r2_all.append(r2_score(y_held, predicted))
        r2_by_target.append(r2_score(y_held, predicted, multioutput='raw_values'))
    return mae_all, mae_by_target, r2_all, r2_by_target

### Run model with k-fold cross-validation

In [None]:
# Time the repeated k-fold run of the random forest.
start = time.time()
results_mae, results_mae_per, results_r2, results_r2_per = evaluate_model_rfr(X, y)
stop = time.time()
# Elapsed wall-clock time converted from seconds to minutes.
print('Time of execution: %f' % (np.divide(stop-start, 60)))

print_my_results(results_mae, results_mae_per, results_r2, results_r2_per)

## Support Vector Regression

### Prep data

In [None]:
# Raw (unscaled) two-channel features; each CV split fits its own scaler.
X = np.hstack(tuple(trace_train[:, :, ch] for ch in range(2)))
y = af_train

print('X shape:',X.shape)
print('Feature shape:',y.shape)

### Define model

In [None]:
def get_model_svr(**kwargs):
    """Return a multi-target SVR.

    An SVR built from `kwargs` is wrapped in MultiOutputRegressor.
    """
    return MultiOutputRegressor(SVR(**kwargs))

def evaluate_model_svr(X, y):
    """Cross-validate the tuned SVR (5 folds, repeated 3 times).

    Features are standardised inside every split with statistics of that
    split's training rows; the model is built from `svr_best`.

    Returns five lists with one entry per split: overall validation MAE,
    per-target validation MAE, overall validation R2, per-target validation
    R2, and overall MAE on the training rows.
    """
    mae_all, mae_by_target = [], []
    r2_all, r2_by_target = [], []
    train_mae_all = []
    splitter = RepeatedKFold(n_splits=5, n_repeats=3, random_state=randst)
    for fit_rows, held_rows in splitter.split(X):
        # Fit the scaler on the training part only.
        standardiser = StandardScaler().fit(X[fit_rows])
        X_fit = standardiser.transform(X[fit_rows])
        X_held = standardiser.transform(X[held_rows])
        y_fit, y_held = y[fit_rows], y[held_rows]

        regressor = get_model_svr(**svr_best)
        regressor.fit(X_fit, y_fit)

        predicted = regressor.predict(X_held)
        fold_mae = mean_absolute_error(y_held, predicted)
        # Error on the rows the model was fitted on, for the over-fit ratio.
        train_mae_all.append(mean_absolute_error(y_fit, regressor.predict(X_fit)))

        print('>%.3f' % fold_mae)
        mae_all.append(fold_mae)
        mae_by_target.append(mean_absolute_error(y_held, predicted, multioutput='raw_values'))
        r2_all.append(r2_score(y_held, predicted))
        r2_by_target.append(r2_score(y_held, predicted, multioutput='raw_values'))
    return mae_all, mae_by_target, r2_all, r2_by_target, train_mae_all

### Run model with k-fold cross-validation

In [None]:
# Time the repeated k-fold run of the SVR; elapsed time is reported in minutes.
start = time.time()
(results_mae, results_mae_per,
 results_r2, results_r2_per,
 svr_train_error) = evaluate_model_svr(X, y)
stop = time.time()
elapsed_minutes = np.divide(stop-start, 60)
print('Time of execution: %f' % elapsed_minutes)

print_my_results(results_mae, results_mae_per, results_r2, results_r2_per)

### Evaluate ratio of validation to training error

In [None]:
# One row per CV split: [training MAE, validation MAE, validation / training].
svr_validate = np.copy(results_mae)
results_svr = np.zeros((15,3))
for fold in range(15):
    train_mae, valid_mae = svr_train_error[fold], svr_validate[fold]
    results_svr[fold] = (train_mae, valid_mae, valid_mae/train_mae)

np.savetxt("svr_train_test_loss.csv", results_svr, delimiter=',')

## Multi-Layer Perceptron

### Prep data

In [None]:
# Flat, unscaled feature rows for the MLP (channel 0 then channel 1).
X = np.hstack([trace_train[:, :, ch] for ch in (0, 1)])
y = af_train

print('X shape:',X.shape)
print('Feature shape:',y.shape)

### Define model

In [None]:
def get_model_mlp1(n_inputs, n_outputs):
    """Build and compile an MLP with one hidden layer.

    Parameters
    ----------
    n_inputs : int
        Number of input features.
    n_outputs : int
        Number of regression targets.

    Returns
    -------
    A compiled Sequential model: Dense(500, swish, he_uniform) followed by a
    linear Dense(n_outputs); MAE loss, Adam optimizer, MAE metric.
    """
    model = Sequential()
    model.add(Dense(500, input_dim=n_inputs, kernel_initializer='he_uniform', activation=activations.swish))
    model.add(Dense(n_outputs))
    # Metrics given as a list, the same form get_model_fcn uses.
    model.compile(loss='mae', optimizer='adam', metrics=['mae'])
    return model

def get_model_mlp3(n_inputs, n_outputs):
    """Build and compile an MLP with three hidden layers.

    Parameters
    ----------
    n_inputs : int
        Number of input features.
    n_outputs : int
        Number of regression targets.

    Returns
    -------
    A compiled Sequential model: three Dense(500, swish) layers (the first
    with he_uniform initialisation) followed by a linear Dense(n_outputs);
    MAE loss, Adam optimizer, MAE metric.
    """
    model = Sequential()
    model.add(Dense(500, input_dim=n_inputs, kernel_initializer='he_uniform', activation=activations.swish))
    model.add(Dense(500, activation=activations.swish))
    model.add(Dense(500, activation=activations.swish))
    model.add(Dense(n_outputs))
    # Metrics given as a list, the same form get_model_fcn uses.
    model.compile(loss='mae', optimizer='adam', metrics=['mae'])
    return model

def evaluate_model_mlp(X, y):
    """Cross-validate the MLP (5 folds, repeated 3 times, 200 epochs each).

    Features are standardised inside every split with statistics of that
    split's training rows. The network is trained with a 10 % validation
    split and without shuffling.

    Returns five lists with one entry per split: overall MAE, per-target MAE,
    overall R2, per-target R2 on the held-out rows, and the object returned
    by model.fit.
    """
    mae_all, mae_by_target = [], []
    r2_all, r2_by_target = [], []
    fit_histories = []
    n_inputs = X.shape[1]
    n_outputs = y.shape[1]
    splitter = RepeatedKFold(n_splits=5, n_repeats=3, random_state=randst)
    for fit_rows, held_rows in splitter.split(X):
        standardiser = StandardScaler().fit(X[fit_rows])
        X_fit = standardiser.transform(X[fit_rows])
        X_held = standardiser.transform(X[held_rows])
        y_fit, y_held = y[fit_rows], y[held_rows]

        network = get_model_mlp3(n_inputs, n_outputs) # <<<<< SPECIFY WHICH MLP MODEL

        history = network.fit(X_fit, y_fit,
                              validation_split = 0.1,
                              shuffle = False,
                              epochs=200,
                              verbose=verbosity)
        fit_histories.append(history)

        predicted = network.predict(X_held)
        fold_mae = mean_absolute_error(y_held, predicted)
        print('>%.3f' % fold_mae)
        mae_all.append(fold_mae)
        mae_by_target.append(mean_absolute_error(y_held, predicted, multioutput='raw_values'))
        r2_all.append(r2_score(y_held, predicted))
        r2_by_target.append(r2_score(y_held, predicted, multioutput='raw_values'))
    return mae_all, mae_by_target, r2_all, r2_by_target, fit_histories

### Run model with k-fold cross-validation

In [None]:
# Time the repeated k-fold run of the MLP; elapsed time is reported in minutes.
start = time.time()
(results_mae, results_mae_per,
 results_r2, results_r2_per,
 mlp_trained) = evaluate_model_mlp(X, y)
stop = time.time()
elapsed_minutes = np.divide(stop-start, 60)
print('Time of execution: %f' % elapsed_minutes)

print_my_results(results_mae, results_mae_per, results_r2, results_r2_per)

### Evaluate ratio of training and validation error (last epoch)

In [None]:
# One row per CV split: [last-epoch training MAE, held-out MAE, held-out / training].
mlp_validate = np.copy(results_mae)
results_mlp = np.zeros((15,3))
for fold in range(15):
    train_mae = mlp_trained[fold].history["mae"][-1]
    valid_mae = mlp_validate[fold]
    results_mlp[fold] = (train_mae, valid_mae, valid_mae/train_mae)

np.savetxt("mlp_train_test_loss.csv", results_mlp, delimiter=',')
    

### Plot learning curves

In [None]:
fig, axs = plt.subplots(3,5,figsize=(14,8), sharey = True, sharex = True)
# RepeatedKFold yields the 5 folds of repeat 1, then of repeat 2, and so on,
# so history i belongs to repeat i // 5 and fold i % 5. Row-major flattening
# of the 3x5 grid therefore puts each repeat on a row and each fold in a column.
n_folds = axs.shape[1]
axs = axs.flatten()
for i, ax in enumerate(axs):
    ax.plot(mlp_trained[i].history["loss"])
    ax.plot(mlp_trained[i].history["val_loss"])
    ax.set_xticks([0,100,200])
    # Title only the top row; the column position identifies the fold.
    if i < n_folds:
        ax.set_title(f"Fold {i+1:d}")

axs[-1].legend(["Train","Validation"], prop={'size': 12})
fig.supxlabel('Epoch')
fig.supylabel('MAE')

plt.savefig("learning_curve_mlp.png", dpi=300, bbox_inches="tight")

## Convolutional Neural Network

### Prep data

In [None]:
# Work on an independent copy of the training traces; the array is not flattened for the CNN.
X = np.array(trace_train, copy=True)
y = af_train

print('X shape:',X.shape)
print('Feature shape:',y.shape)

### Define model

In [None]:
def get_model_cnn(n_inputs, n_outputs):
    """Build and compile a small 1-D convolutional regressor.

    Parameters
    ----------
    n_inputs : tuple
        Shape of one sample, passed as `input_shape` to the first Conv1D layer.
    n_outputs : int
        Number of regression targets.

    Returns
    -------
    A compiled Sequential model: Conv1D(64 filters, kernel 3) -> MaxPooling1D(2)
    -> Flatten -> Dense(100, swish) -> linear Dense(n_outputs); MAE loss,
    Adam optimizer, MAE metric.
    """
    model = Sequential()
    model.add(Conv1D(filters=64, kernel_size=3, input_shape=n_inputs))
    model.add(MaxPooling1D(pool_size=2))
    model.add(Flatten())
    model.add(Dense(100, activation=activations.swish))
    model.add(Dense(n_outputs))
    # Metrics given as a list, the same form get_model_fcn uses.
    model.compile(loss='mae', optimizer='adam', metrics=['mae'])
    return model

def evaluate_model_cnn(X, y):
    """Cross-validate the CNN (5 folds, repeated 3 times, 200 epochs each).

    X is indexed as (sample, step, channel). Inside every split each channel
    is standardised with its own StandardScaler, fitted on the training rows.
    The network is trained with a 10 % validation split and without shuffling.

    Returns five lists with one entry per split: overall MAE, per-target MAE,
    overall R2, per-target R2 on the held-out rows, and the object returned
    by model.fit.
    """
    mae_all, mae_by_target = [], []
    r2_all, r2_by_target = [], []
    fit_histories = []
    n_inputs = X.shape[1:]
    n_outputs = y.shape[1]
    splitter = RepeatedKFold(n_splits=5, n_repeats=3, random_state=randst)
    for fit_rows, held_rows in splitter.split(X):
        # Indexing with the row-index arrays gives new arrays, so the in-place
        # scaling below does not touch X.
        X_fit, X_held = X[fit_rows], X[held_rows]
        y_fit, y_held = y[fit_rows], y[held_rows]
        for ch in range(X_fit.shape[2]):
            channel_scaler = StandardScaler()
            X_fit[:, :, ch] = channel_scaler.fit_transform(X_fit[:, :, ch])
            X_held[:, :, ch] = channel_scaler.transform(X_held[:, :, ch])

        network = get_model_cnn(n_inputs, n_outputs)

        history = network.fit(X_fit, y_fit,
                              validation_split = 0.1,
                              shuffle = False,
                              epochs=200,
                              verbose=verbosity)
        fit_histories.append(history)

        predicted = network.predict(X_held)
        fold_mae = mean_absolute_error(y_held, predicted)
        print('>%.3f' % fold_mae)
        mae_all.append(fold_mae)
        mae_by_target.append(mean_absolute_error(y_held, predicted, multioutput='raw_values'))
        r2_all.append(r2_score(y_held, predicted))
        r2_by_target.append(r2_score(y_held, predicted, multioutput='raw_values'))
    return mae_all, mae_by_target, r2_all, r2_by_target, fit_histories

### Run model with k-fold cross-validation

In [None]:
# Time the repeated k-fold run of the CNN; elapsed time is reported in minutes.
start = time.time()
(results_mae, results_mae_per,
 results_r2, results_r2_per,
 cnn_trained) = evaluate_model_cnn(X, y)
stop = time.time()
elapsed_minutes = np.divide(stop-start, 60)
print('Time of execution: %f' % elapsed_minutes)

print_my_results(results_mae, results_mae_per, results_r2, results_r2_per)

### Evaluate ratio of training and validation error (last epoch)

In [None]:
# One row per CV split: [last-epoch training loss, held-out MAE, held-out / training].
cnn_validate = np.copy(results_mae)
results_cnn = np.zeros((15,3))
for fold in range(15):
    train_loss = cnn_trained[fold].history["loss"][-1]
    valid_mae = cnn_validate[fold]
    results_cnn[fold] = (train_loss, valid_mae, valid_mae/train_loss)

np.savetxt("cnn_train_test_loss.csv", results_cnn, delimiter=',')
    

### Plot learning curves

In [None]:
fig, axs = plt.subplots(3,5,figsize=(14,8), sharey = True, sharex = True)
# RepeatedKFold yields the 5 folds of repeat 1, then of repeat 2, and so on,
# so history i belongs to repeat i // 5 and fold i % 5. Row-major flattening
# of the 3x5 grid therefore puts each repeat on a row and each fold in a column.
n_folds = axs.shape[1]
axs = axs.flatten()
for i, ax in enumerate(axs):
    ax.plot(cnn_trained[i].history["loss"])
    ax.plot(cnn_trained[i].history["val_loss"])
    ax.set_xticks([0,100,200])
    # Title only the top row; the column position identifies the fold.
    if i < n_folds:
        ax.set_title(f"Fold {i+1:d}")

axs[-1].legend(["Train","Validation"], prop={'size': 12})
fig.supxlabel('Epoch')
fig.supylabel('MAE')

plt.savefig("learning_curve_cnn.png", dpi=300, bbox_inches="tight")

## Fully Convolutional Neural Networks (FCN)

### Prep data

In [None]:
# Independent copy of the training traces for the FCN; no flattening is applied.
X = np.array(trace_train, copy=True)
y = af_train

print('X shape:',X.shape)
print('Feature shape:',y.shape)

### Define model

In [None]:
def get_model_fcn(n_inputs, n_outputs):
    """Build and compile a three-block fully convolutional regressor.

    Each block is Conv1D (padding='same') -> BatchNormalization -> swish.
    The blocks use 64, 128 and 64 filters with kernel sizes 8, 5 and 3. The
    last block is reduced with GlobalAveragePooling1D and mapped to
    `n_outputs` by a linear Dense layer.

    Parameters
    ----------
    n_inputs : tuple
        Shape of one sample, used for the Input layer.
    n_outputs : int
        Number of regression targets.

    Returns
    -------
    A compiled keras Model (MAE loss, Adam optimizer, MAE metric).
    """
    x = keras.layers.Input(n_inputs)

    # No dropout is applied. Dropout(0.1) layers were previously instantiated
    # here but their outputs were never fed into the next layer, so they had
    # no effect on the model; they are omitted to make that explicit.
    # The input shape comes from the Input layer, so Conv1D does not repeat it.
    conv1 = keras.layers.Conv1D(filters=64, kernel_size=8, padding='same')(x) # default filter 128
    conv1 = keras.layers.BatchNormalization()(conv1)
    conv1 = keras.layers.Activation(activations.swish)(conv1)

    conv2 = keras.layers.Conv1D(filters=128, kernel_size=5, padding='same')(conv1) # default filter 256
    conv2 = keras.layers.BatchNormalization()(conv2)
    conv2 = keras.layers.Activation(activations.swish)(conv2)

    conv3 = keras.layers.Conv1D(filters=64, kernel_size=3, padding='same')(conv2) # default filter 128
    conv3 = keras.layers.BatchNormalization()(conv3)
    conv3 = keras.layers.Activation(activations.swish)(conv3)

    full = keras.layers.GlobalAveragePooling1D()(conv3)
#     full = keras.layers.GlobalMaxPooling1D()(conv3)
    out = keras.layers.Dense(n_outputs)(full)
    model = keras.models.Model(inputs=x, outputs=out)

    optimizer = keras.optimizers.Adam()
    model.compile(loss='mae',
                  optimizer=optimizer,
                  metrics=['mae'])
    return model

def get_model_fcn_2(n_inputs, n_outputs):
    """Build and compile a Sequential FCN variant with ReLU activations.

    Three Conv1D (padding='same') -> BatchNormalization -> ReLU blocks with
    128, 256 and 128 filters and kernel sizes 8, 5 and 3, followed by
    GlobalAveragePooling1D and a linear Dense(n_outputs).

    Parameters
    ----------
    n_inputs : tuple
        Shape of one sample, passed as `input_shape` to the first layer.
    n_outputs : int
        Number of regression targets.

    Returns
    -------
    A compiled Sequential model (MAE loss, Adam optimizer, MAE metric).
    """
    model = Sequential()
#     model.add(Dropout(0.1))
    # Only the first layer of a Sequential model needs the input shape.
    model.add(Conv1D(filters=128, kernel_size=8, input_shape=n_inputs, padding='same'))
    model.add(BatchNormalization())
    model.add(keras.layers.Activation('relu'))

#     model.add(Dropout(0.1))
    model.add(Conv1D(filters=256, kernel_size=5, padding='same'))
    model.add(BatchNormalization())
    model.add(keras.layers.Activation('relu'))

#     model.add(Dropout(0.1))
    model.add(Conv1D(filters=128, kernel_size=3, padding='same'))
    model.add(BatchNormalization())
    model.add(keras.layers.Activation('relu'))

    model.add(GlobalAveragePooling1D())
#     model.add(MaxPooling1D())
#     model.add(AveragePooling1D())
    model.add(Dense(n_outputs))
    # Track MAE like get_model_fcn does, so history["mae"] is available
    # whichever FCN builder is used.
    model.compile(loss='mae', optimizer='adam', metrics=['mae'])
    return model
    
def evaluate_model_fcn(X, y):
    """Cross-validate the FCN (5 folds, repeated 3 times, 300 epochs each).

    X is indexed as (sample, step, channel). Inside every split each channel
    is standardised with its own StandardScaler, fitted on the training rows.
    The network is trained with a 10 % validation split and with shuffling.

    Returns
    -------
    results_mae, results_mae_per, results_r2, results_r2_per, fcn_trained
        Five lists with one entry per split: overall MAE, per-target MAE,
        overall R2, per-target R2 on the held-out rows, and the object
        returned by model.fit.
    """
    results_mae = list()
    results_mae_per = list()
    results_r2 = list()
    results_r2_per = list()
    fcn_trained = list()
    n_inputs, n_outputs = X.shape[1:], y.shape[1]
    cv = RepeatedKFold(n_splits=5, n_repeats=3, random_state=randst)
    for train_ix, test_ix in cv.split(X):
        # Indexing with index arrays yields new arrays, so the in-place
        # scaling below leaves X itself untouched.
        X_train, X_test = X[train_ix,:,:], X[test_ix,:,:]
        y_train, y_test = y[train_ix], y[test_ix]
        scalers = {}
        for i in range(X_train.shape[2]):
            scalers[i] = StandardScaler()
            X_train[:, :, i] = scalers[i].fit_transform(X_train[:, :, i])
        for i in range(X_test.shape[2]):
            X_test[:, :, i] = scalers[i].transform(X_test[:, :, i])
        model = get_model_fcn(n_inputs, n_outputs)

        fcn_trained.append(model.fit(X_train, y_train,
                                     validation_split = 0.1,
                                     shuffle = True,
                                     epochs=300,
                                     verbose=verbosity
                                     )
                          )

        y_hat = model.predict(X_test)
        r2_per = r2_score(y_test,y_hat,multioutput='raw_values')
        r2 = r2_score(y_test,y_hat)
        mae = mean_absolute_error(y_test, y_hat)
        mae_per = mean_absolute_error(y_test, y_hat, multioutput='raw_values')
        print('>%.3f' % mae)
        results_mae.append(mae)
        results_mae_per.append(mae_per)
        results_r2.append(r2)
        results_r2_per.append(r2_per)
    return results_mae, results_mae_per, results_r2, results_r2_per, fcn_trained


### Run model with k-fold cross-validation

In [None]:
# Time the repeated k-fold run of the FCN, including the Keras session reset
# that follows it; elapsed time is reported in minutes.
start = time.time()
(results_mae, results_mae_per,
 results_r2, results_r2_per,
 fcn_trained) = evaluate_model_fcn(X, y)
keras.backend.clear_session()
stop = time.time()
elapsed_minutes = np.divide(stop-start, 60)
print('Time of execution: %f' % elapsed_minutes)

print_my_results(results_mae, results_mae_per, results_r2, results_r2_per)

### Evaluate ratio of training and validation error (last epoch)

In [None]:
# One row per CV split: [last-epoch training MAE, held-out MAE, held-out / training].
fcn_validate = np.copy(results_mae)
results_fcn = np.zeros((15,3))
for i in range(15):
    results_fcn[i,0] = fcn_trained[i].history["mae"][-1]
    results_fcn[i,1] = fcn_validate[i]
    results_fcn[i,2] = fcn_validate[i]/fcn_trained[i].history["mae"][-1]

# Comma-delimited like the SVR/MLP/CNN result files; np.savetxt otherwise
# separates columns with a space, which does not match the .csv extension.
np.savetxt("fcn_train_test_loss.csv", results_fcn, delimiter=',')

### Plot learning curves

In [None]:
fig, axs = plt.subplots(3,5,figsize=(14,8), sharey = True, sharex = True)
# RepeatedKFold yields the 5 folds of repeat 1, then of repeat 2, and so on,
# so history i belongs to repeat i // 5 and fold i % 5. Row-major flattening
# of the 3x5 grid therefore puts each repeat on a row and each fold in a column.
n_folds = axs.shape[1]
axs = axs.flatten()
for i, ax in enumerate(axs):
    ax.plot(fcn_trained[i].history["loss"])
    ax.plot(fcn_trained[i].history["val_loss"])
    # The FCN is trained for 300 epochs, so the ticks extend to 300.
    ax.set_xticks([0,100,200,300])
    # Title only the top row; the column position identifies the fold.
    if i < n_folds:
        ax.set_title(f"Fold {i+1:d}")

axs[-1].legend(["Train","Validation"], prop={'size': 12})
fig.supxlabel('Epoch')
fig.supylabel('MAE')

plt.savefig("learning_curve_fcn_300epoch.png", dpi=300, bbox_inches="tight")

# Final train and evaluation on target test cases

## kNN

In [None]:
# Flatten both channels into one feature row per sample, for train and test.
X_train = np.concatenate((trace_train[:,:,0],trace_train[:,:,1]),axis=1)
X_test = np.concatenate((trace_test[:,:,0],trace_test[:,:,1]),axis=1)
y_train = af_train
y_test = af_test

# Fit the scaler on the training rows only and reuse it for the test rows.
scaler = StandardScaler()
X_train = scaler.fit_transform(X_train)
X_test = scaler.transform(X_test)

In [None]:
# Use the tuned hyper-parameters, i.e. the same configuration that was
# cross-validated, instead of the library defaults.
model = get_model_knn(**knn_best)

t_train_start = time.time()
model.fit(X_train, y_train)
t_train_stop = time.time()

t_test_start = time.time()
y_hat = model.predict(X_test)
t_test_stop = time.time()

# Both durations are wall-clock seconds.
print("Time to train model: ", t_train_stop-t_train_start)
print("Time to test model: ", t_test_stop-t_test_start)

## Random Forest

In [None]:
# Flatten both channels into one feature row per sample, for each partition.
X_train, X_test = (
    np.hstack((part[:, :, 0], part[:, :, 1])) for part in (trace_train, trace_test)
)
y_train, y_test = af_train, af_test

# Scaler statistics come from the training rows and are reused for the test rows.
scaler = StandardScaler()
scaler.fit(X_train)
X_train = scaler.transform(X_train)
X_test = scaler.transform(X_test)

In [None]:
# Use the hyper-parameters selected by the random search, i.e. the same
# configuration that evaluate_model_rfr cross-validated, not the defaults.
model = get_model_rfr(**rf_gscv.best_params_)

t_train_start = time.time()
model.fit(X_train, y_train)
t_train_stop = time.time()

t_test_start = time.time()
y_hat = model.predict(X_test)
t_test_stop = time.time()

# Both durations are wall-clock seconds.
print("Time to train model: ", t_train_stop-t_train_start)
print("Time to test model: ", t_test_stop-t_test_start)

## Support Vector

In [None]:
# Channel 0 followed by channel 1, one row per sample, for each partition.
X_train = np.hstack([trace_train[:, :, ch] for ch in (0, 1)])
X_test = np.hstack([trace_test[:, :, ch] for ch in (0, 1)])
y_train, y_test = af_train, af_test

# Standardise with training statistics only.
scaler = StandardScaler()
scaler.fit(X_train)
X_train, X_test = scaler.transform(X_train), scaler.transform(X_test)

In [None]:
# Use the tuned hyper-parameters, i.e. the same configuration that was
# cross-validated, instead of the SVR defaults.
model = get_model_svr(**svr_best)

t_train_start = time.time()
model.fit(X_train, y_train)
t_train_stop = time.time()

t_test_start = time.time()
y_hat = model.predict(X_test)
t_test_stop = time.time()

# Both durations are wall-clock seconds.
print("Time to train model: ", t_train_stop-t_train_start)
print("Time to test model: ", t_test_stop-t_test_start)

## MLP

In [None]:
# Flat two-channel feature rows for the training and the test partition.
train_channels = (trace_train[:, :, 0], trace_train[:, :, 1])
test_channels = (trace_test[:, :, 0], trace_test[:, :, 1])
X_train = np.hstack(train_channels)
X_test = np.hstack(test_channels)
y_train, y_test = af_train, af_test

# Fit on the training rows, apply to both.
scaler = StandardScaler()
scaler.fit(X_train)
X_train = scaler.transform(X_train)
X_test = scaler.transform(X_test)

In [None]:
# Network dimensions follow the prepared arrays.
n_inputs = X_train.shape[1]
n_outputs = y_train.shape[1]

model = get_model_mlp1(n_inputs, n_outputs)

# Wall-clock time for training on the full training partition.
t_train_start = time.time()
model.fit(X_train, y_train, epochs=200, verbose=0)
t_train_stop = time.time()

# Wall-clock time for predicting the held-out test cases.
t_test_start = time.time()
y_hat = model.predict(X_test)
t_test_stop = time.time()

train_seconds = t_train_stop-t_train_start
test_seconds = t_test_stop-t_test_start
print("Time to train model: ", train_seconds)
print("Time to test model: ", test_seconds)

## CNN

In [None]:
# Copy before scaling: the loops below write into X_train / X_test in place,
# and without a copy they would overwrite trace_train / trace_test, which
# every other cell (and any re-run of this one) reads.
X_train = np.copy(trace_train)
X_test = np.copy(trace_test)
y_train = af_train
y_test = af_test

# One scaler per channel, fitted on the training rows and reused for the test rows.
scalers = {}
for i in range(X_train.shape[2]):
    scalers[i] = StandardScaler()
    X_train[:, :, i] = scalers[i].fit_transform(X_train[:, :, i])
for i in range(X_test.shape[2]):
    X_test[:, :, i] = scalers[i].transform(X_test[:, :, i])

In [None]:
# Per-sample input shape and number of targets, taken from the prepared arrays.
n_inputs = X_train.shape[1:]
n_outputs = y_train.shape[1]

model = get_model_cnn(n_inputs, n_outputs)

# Wall-clock time for training on the full training partition.
t_train_start = time.time()
model.fit(X_train, y_train, epochs=200, verbose=0)
t_train_stop = time.time()

# Wall-clock time for predicting the held-out test cases.
t_test_start = time.time()
y_hat = model.predict(X_test)
t_test_stop = time.time()

train_seconds = t_train_stop-t_train_start
test_seconds = t_test_stop-t_test_start
print("Time to train model: ", train_seconds)
print("Time to test model: ", test_seconds)

## FCN

In [None]:
# Copy before scaling: the loops below write into X_train / X_test in place,
# and without a copy they would overwrite trace_train / trace_test, which
# every other cell (and any re-run of this one) reads.
X_train = np.copy(trace_train)
X_test = np.copy(trace_test)
y_train = af_train
y_test = af_test

# One scaler per channel, fitted on the training rows and reused for the test rows.
scalers = {}
for i in range(X_train.shape[2]):
    scalers[i] = StandardScaler()
    X_train[:, :, i] = scalers[i].fit_transform(X_train[:, :, i])
for i in range(X_test.shape[2]):
    X_test[:, :, i] = scalers[i].transform(X_test[:, :, i])

In [None]:
# Per-sample input shape and number of targets, taken from the prepared arrays.
n_inputs = X_train.shape[1:]
n_outputs = y_train.shape[1]

# Train on the full training partition, then predict the held-out test cases.
model = get_model_fcn(n_inputs, n_outputs)
model.fit(X_train, y_train, epochs=200, verbose=0)
y_hat = model.predict(X_test)

# Learning curves

In [None]:
from sklearn.model_selection import learning_curve

# Build the inputs explicitly instead of relying on whichever X / y the last
# executed cell left behind (the FCN cells leave a 3-D X that scikit-learn
# estimators cannot consume).
X_lc = np.concatenate((trace_train[:,:,0],trace_train[:,:,1]),axis=1)
y_lc = af_train

# The targets are continuous and multi-column, so use the tuned multi-output
# SVR rather than a classifier. Scaling sits inside the pipeline so it is
# re-fitted on the training part of every split.
lc_model = make_pipeline(StandardScaler(), get_model_svr(**svr_best))

# Scores are negated MAE (higher is better), matching the metric used in the
# rest of the notebook; train_sizes are absolute sample counts.
train_sizes, train_scores, valid_scores = learning_curve(
    lc_model, X_lc, y_lc,
    train_sizes=[50, 80, 110],
    cv=5,
    scoring='neg_mean_absolute_error',
)