In [None]:
import numpy as np
import pandas as pd
import tensorflow as tf
import tensorflow.keras.layers as tfl  
from tensorflow.python.framework import ops 
import keras_tuner as kt

In [None]:
#Normalize an array
def fn_normalize_col(x):
    """Standardize a numeric column to zero mean and unit standard deviation.

    Parameters
    ----------
    x : array-like exposing ``dtype`` (e.g. a pandas Series or numpy array)
        Values to standardize.

    Returns
    -------
    Same kind of object as ``x``.
        ``x`` itself, untouched, when its dtype is ``object``; otherwise
        ``(x - mean) / std`` computed with ``np.mean`` and ``np.std``.
        A column whose standard deviation is exactly zero is returned
        centred (all zeros) instead of being divided by zero.
    """
    # Object columns (e.g. raw strings) cannot be standardized; pass through.
    if x.dtype == 'object' :
        return x

    centred = x - np.mean(x)
    spread = np.std(x)

    # A constant column has zero spread; dividing would turn every entry
    # into NaN (0/0) and poison anything trained on it.
    if spread == 0 :
        return centred

    return centred / spread

#Convert array to tensor and specify shape
def fn_target_to_tensor(data) :
    """Convert ``data`` to a TensorFlow tensor reshaped to ``[data.shape[0], 1]``.

    ``data`` must expose ``shape``; the row count is taken from its first
    dimension and the result is a single-column tensor with that many rows.
    """
    n_rows = data.shape[0]
    as_tensor = tf.convert_to_tensor(data)
    return tf.reshape(as_tensor, [n_rows, 1])
 
#Get and normalize data
def fn_get_fusion_coral_data(cols_X1, cols_X2, cols_X3, cols_Y, cols_norm, train_prob = 0.9, seed = 0,
                             data_path = "data//coral_total_cover.csv", cover_threshold = 8) :
    """Load the coral cover CSV, build three feature groups and a binary target,
    normalize the requested columns and split everything into train/test tensors.

    Parameters
    ----------
    cols_X1, cols_X2, cols_X3 : list of str
        Column names selected for each of the three feature groups.
    cols_Y : list of str
        Column name(s) of the target; values are binarized as
        ``value > cover_threshold``.
    cols_norm : list of str or None
        Columns to standardize with ``fn_normalize_col``. Columns that are not
        listed are left as they are (so a 0/1 indicator can stay 0/1).
        ``None`` standardizes every column of every feature group.
    train_prob : float
        Fraction of rows sampled into the training set.
    seed : int
        Random state used for the train/test sampling.
    data_path : str
        Location of the CSV file; defaults to the path used by this notebook.
    cover_threshold : number
        Threshold applied to the target columns; defaults to 8.

    Returns
    -------
    list
        ``[X1_train, X1_test, X2_train, X2_test, X3_train, X3_test,
        Y_train, Y_test]`` as produced by ``fn_fusion_split_data``.
    """
    # Rows with a missing value in ANY column of the file are discarded.
    data = pd.read_csv(data_path, header=0).dropna()

    feature_groups = [data.filter(items=cols) for cols in (cols_X1, cols_X2, cols_X3)]

    # Binary target: 1 where the cover exceeds the threshold, else 0.
    Y = (data.filter(items=cols_Y) > cover_threshold) * 1

    for group in feature_groups :
        # Encode the ocean label as a 0/1 indicator wherever it was selected,
        # instead of assuming it always lives in the first group.
        if "o_ocean" in group.columns :
            group[["o_ocean"]] = (group[["o_ocean"]] == "CARIB" ) * 1

        # Standardize only the columns the caller asked for.
        # NOTE(review): statistics are computed on the full dataset before the
        # train/test split, so test rows influence the scaling - confirm this
        # is acceptable for the reported test metrics.
        for col in group.columns :
            if cols_norm is None or col in cols_norm :
                group[col] = fn_normalize_col(group[col])

    X1, X2, X3 = feature_groups

    return fn_fusion_split_data(X1,X2,X3, Y, train_prob, seed)

#Split data into training and test sets using a defined probability
def fn_fusion_split_data(X1,X2,X3,Y, train_prob = 0.9, seed = 0) :
    """Split three aligned feature frames and a target frame into train/test tensors.

    A fraction ``train_prob`` of the rows of ``X1`` is sampled (with
    ``random_state=seed``) as the training set; the remaining rows form the
    test set. The same index labels are then used to split ``X2``, ``X3`` and
    ``Y``, so all four inputs must share ``X1``'s index.

    Returns the list ``[X1_train, X1_test, X2_train, X2_test, X3_train,
    X3_test, Y_train, Y_test]``; features are converted with
    ``tf.convert_to_tensor`` and targets with ``fn_target_to_tensor``.
    """
    first_train = X1.sample(frac=train_prob, random_state=seed)
    train_index = first_train.index

    # (train, test) pairs in the order they are returned
    feature_pairs = [(first_train, X1.drop(train_index))]
    for frame in (X2, X3) :
        feature_pairs.append((frame.loc[train_index], frame.drop(train_index)))

    result = []
    for train_part, test_part in feature_pairs :
        result.append(tf.convert_to_tensor(train_part))
        result.append(tf.convert_to_tensor(test_part))

    # Targets go last and are reshaped into a single column
    result.append(fn_target_to_tensor(Y.loc[train_index]))
    result.append(fn_target_to_tensor(Y.drop(train_index)))

    return result

In [None]:
#Create hyperband tunable model
def tunable_model(hp) :
    """Build and compile the three-branch late-fusion classifier for Keras Tuner.

    ``hp`` is the hyperparameter container handed in by the tuner. Layer
    widths (``units_*``) and the Adam learning rate are drawn from it.

    The model takes three inputs of width 3 (geography), 6 (environment) and
    2 (human), runs each through its own dense branch, concatenates the branch
    outputs, and finishes with two dense layers and a single sigmoid unit.
    It is compiled with binary cross-entropy and the metrics ``accuracy``,
    ``precision`` and ``recall``.
    """

    def _width(name, upper) :
        # Every width shares the same lower bound and step; only the cap differs.
        return hp.Int(name, min_value=40, max_value=upper, step=32)

    def _relu_dense(units) :
        # ReLU dense layer with its own He-normal initializer.
        return tfl.Dense(units, activation='relu',
                         kernel_initializer=tf.keras.initializers.HeNormal())

    # Hyperparameters (registered in this fixed order)
    g1_width = _width('units_g1', 256)
    g2_width = _width('units_g2', 256)
    e1_width = _width('units_e1', 256)
    e2_width = _width('units_e2', 256)
    h1_width = _width('units_h1', 256)
    h2_width = _width('units_h2', 256)
    fusion1_width = _width('units_1', 128)
    fusion2_width = _width('units_2', 128)
    learning_rate = hp.Choice('learning_rate', values=[1e-2, 1e-3, 1e-4])

    # Geography branch
    geo_input = tf.keras.Input(shape=(3,))
    geo_branch = _relu_dense(g1_width)(geo_input)
    geo_branch = _relu_dense(g2_width)(geo_branch)
    geo_branch = tfl.Dropout(rate=0.5)(geo_branch)
    geo_branch = tfl.Dense(1, activation='sigmoid')(geo_branch)

    # Environment branch
    env_input = tf.keras.Input(shape=(6,))
    env_branch = _relu_dense(e1_width)(env_input)
    env_branch = _relu_dense(e2_width)(env_branch)
    env_branch = tfl.Dropout(rate=0.5)(env_branch)
    env_branch = tfl.Dense(1, activation='sigmoid')(env_branch)

    # Human branch
    # NOTE(review): unlike the two branches above, this one has no dropout and
    # ends in a sigmoid layer of width units_2 rather than 1 - confirm intended.
    human_input = tf.keras.Input(shape=(2,))
    human_branch = _relu_dense(h1_width)(human_input)
    human_branch = _relu_dense(h2_width)(human_branch)
    human_branch = tfl.Dense(fusion2_width, activation='sigmoid')(human_branch)

    # Late fusion head
    fused = tfl.concatenate([geo_branch, env_branch, human_branch])
    fused = _relu_dense(fusion1_width)(fused)
    fused = _relu_dense(fusion2_width)(fused)
    prediction = tfl.Dense(1, activation='sigmoid')(fused)

    model = tf.keras.Model(
        inputs=[geo_input, env_input, human_input],
        outputs=prediction,
        name="deepcoral_late_fusion",
    )

    tracked_metrics = [
        tf.keras.metrics.BinaryAccuracy(name='accuracy'),
        tf.keras.metrics.Precision(name='precision'),
        tf.keras.metrics.Recall(name='recall'),
    ]
    model.compile(
        optimizer=tf.keras.optimizers.Adam(learning_rate=learning_rate),
        loss=tf.keras.losses.BinaryCrossentropy(),
        metrics=tracked_metrics,
    )

    return model

In [None]:
#Get training and test data

# Target column; it is binarized inside fn_get_fusion_coral_data
col_Y = ["o_coral_cover"]

# Full feature list
cols_X = [
    "o_ocean", "o_lat", "o_long", "o_depth",
    "o_human_pop50", "o_land_area50", "o_storm_30yr",
    "o_l_att_mean", "o_npp_mean", "o_sst_mean", "o_wave_mean",
]

# One feature group per model branch
cols_geo = ["o_ocean", "o_lat", "o_long"]
cols_env = [
    "o_depth", "o_storm_30yr", "o_l_att_mean",
    "o_npp_mean", "o_sst_mean", "o_wave_mean",
]
cols_human = ["o_human_pop50", "o_land_area50"]

# Columns listed for normalization (every feature except "o_ocean")
cols_norm = [
    "o_lat", "o_long", "o_depth",
    "o_human_pop50", "o_land_area50", "o_storm_30yr",
    "o_l_att_mean", "o_npp_mean", "o_sst_mean", "o_wave_mean",
]

(
    X_geo_train, X_geo_test,
    X_env_train, X_env_test,
    X_human_train, X_human_test,
    Y_train, Y_test,
) = fn_get_fusion_coral_data(
    cols_geo, cols_env, cols_human, col_Y, cols_norm,
    train_prob=0.9, seed=230,
)

# Model inputs, in the order the model's three Input layers expect them
X_train = [X_geo_train, X_env_train, X_human_train]
X_test = [X_geo_test, X_env_test, X_human_test]

In [None]:
#Configure Tuner

# Hyperband search over tunable_model, ranked by validation accuracy
tuner = kt.Hyperband(
    hypermodel=tunable_model,
    objective='val_accuracy',
    max_epochs=1000,
    factor=3,
    project_name='deepreef_XL',
)

# Abort a trial once validation loss has not improved for 5 epochs
stop_early = tf.keras.callbacks.EarlyStopping(patience=5, monitor='val_loss')


In [None]:
# Get the optimal hyperparameters
tuner.search(
    [X_geo_train, X_env_train, X_human_train],
    Y_train,
    epochs=300,
    validation_split=0.1,
    callbacks=[stop_early],
)

# Take the first entry of the returned list as the winning configuration
best_hps = tuner.get_best_hyperparameters(num_trials=10)[0]

# Report the chosen layer widths and learning rate
print(
    "optimal number of units are \n"
    f"{best_hps.get('units_g1')}, {best_hps.get('units_g2')}, \n"
    f"{best_hps.get('units_e1')},{best_hps.get('units_e2')}, \n"
    f"{best_hps.get('units_h1')}, {best_hps.get('units_h2')},\n"
    f"{best_hps.get('units_1')},and {best_hps.get('units_2')} and the optimal learning rate is {best_hps.get('learning_rate')}.\n"
)
 

In [None]:
# Find Optimal Epochs
# Train a fresh model with the best hyperparameters and pick the epoch whose
# validation F1 (derived from validation precision and recall) is highest.
model = tuner.hypermodel.build(best_hps)
history = model.fit(X_train, Y_train, epochs=300, validation_split=0.1)

val_prec_per_epoch = np.array(history.history['val_precision'])
val_recall_per_epoch = np.array(history.history['val_recall'])

# F1 is undefined (0/0) for epochs where precision + recall is 0, e.g. when
# the model predicts no positives yet. Score those epochs as 0 instead of NaN:
# a NaN in the first position would otherwise win max() and force best_epoch
# to 1 regardless of later, genuinely better epochs.
val_pr_sum_per_epoch = val_prec_per_epoch + val_recall_per_epoch
val_f1_per_epoch = (2 * np.divide(
    val_prec_per_epoch * val_recall_per_epoch,
    val_pr_sum_per_epoch,
    out=np.zeros_like(val_pr_sum_per_epoch, dtype=float),
    where=val_pr_sum_per_epoch > 0,
)).tolist()

# argmax returns the first maximum; epochs are 1-based, hence the + 1
best_epoch = int(np.argmax(val_f1_per_epoch)) + 1
print('Best epoch: %d' % (best_epoch,))

In [None]:
#Get hyperband tuned model
# A fresh, untrained instance built from the best hyperparameters
hypermodel = tuner.hypermodel.build(best_hps)

# Train on the full training set (no validation split) for best_epoch epochs
hypermodel.fit(x=X_train, y=Y_train, epochs=best_epoch)

In [None]:
#Evaluate Test Set

# evaluate() yields the loss followed by the metrics the model was compiled
# with; positions 1-3 are read here as accuracy, precision and recall.
eval_result = hypermodel.evaluate(X_test, Y_test)
test_loss, test_accuracy, test_precision, test_recall = eval_result[:4]

# F1 is undefined when precision + recall is 0 (no true positives);
# report 0 in that case instead of raising ZeroDivisionError.
test_pr_sum = test_precision + test_recall
if test_pr_sum > 0:
    test_f1 = 2*((test_precision*test_recall)/test_pr_sum)
else:
    test_f1 = 0.0

print("[test loss, test accuracy, test f1]:", test_loss, test_accuracy, test_f1)

In [None]:
#Compute absolute mean of geography input weights
# First weight array of layer 2; the mean is taken along axis 1, then its
# absolute value is displayed.
geo_kernel = hypermodel.layers[2].get_weights()[0]
np.abs(np.apply_along_axis(np.mean, axis=1, arr=geo_kernel))

In [None]:
#Compute absolute mean of environment input weights
# First weight array of layer 3; the mean is taken along axis 1, then its
# absolute value is displayed.
env_kernel = hypermodel.layers[3].get_weights()[0]
np.abs(np.apply_along_axis(np.mean, axis=1, arr=env_kernel))

In [None]:
#Compute absolute mean of human input weights
# First weight array of layer 7; the mean is taken along axis 1, then its
# absolute value is displayed.
human_kernel = hypermodel.layers[7].get_weights()[0]
np.abs(np.apply_along_axis(np.mean, axis=1, arr=human_kernel))