# Hyperparameter Tuning via Optuna

## Hyper Band + Random Search

### Loading Libraries

In [None]:
# Numerical Computing
import numpy as np

# Data Manipulation
import pandas as pd

# Warnings
import warnings

# Time
import time

# Notebook Optimizer
from tqdm.notebook import tqdm

# Scikit-Learn
from sklearn.metrics import f1_score
from sklearn.compose import ColumnTransformer
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, OneHotEncoder

# TensorFlow
import tensorflow as tf
from tensorflow.keras import backend as K
from tensorflow.keras.models import Sequential  
from tensorflow.keras.optimizers import Adam, SGD
from tensorflow.keras.layers import Dense, Dropout 

# Optuna
import optuna

In [None]:
warnings.filterwarnings("ignore")

In [None]:
from tqdm.notebook import tqdm
import time

for i in tqdm(range(10), desc="Loading"):
    time.sleep(0.1)

In [None]:
def create_model(trial, input_size): 
    model = Sequential()
    model.add(Dense(input_size, input_shape=(input_size,), activation='relu')) 

    num_layers = trial.suggest_int('num_layers', 0, 3)
    for layer_i in range(num_layers):
        n_units = trial.suggest_int(f'n_units_layer_{layer_i}', 10, 50, step=5)
        dropout_rate = trial.suggest_float(f'dropout_rate_layer_{layer_i}', 0.0, 0.5)
        actv_func = trial.suggest_categorical(f'actv_func_layer_{layer_i}', ['relu', 'tanh', 'elu'])

        model.add(Dropout(dropout_rate))
        model.add(Dense(n_units, activation=actv_func))

    model.add(Dense(1, activation='sigmoid'))
    return model

In [None]:
def create_optimizer(trial):
    opt_name = trial.suggest_categorical('optimizer', ['Adam', 'SGD'])
    if opt_name == 'SGD':
        return SGD(
            learning_rate=trial.suggest_float('sgd_lr', 1e-5, 1e-1, log=True),
            momentum=trial.suggest_float('sgd_momentum', 1e-5, 1e-1, log=True)
        )
    else:
        return Adam(learning_rate=trial.suggest_float('adam_lr', 1e-5, 1e-1, log=True))

In [None]:
@tf.function
def recall_m(y_true, y_pred):
    true_positives = K.sum(K.round(K.clip(y_true * y_pred, 0, 1)))
    possible_positives = K.sum(K.round(K.clip(y_true, 0, 1)))
    recall = true_positives / (possible_positives + K.epsilon())
    return recall

@tf.function
def precision_m(y_true, y_pred):
    true_positives = K.sum(K.round(K.clip(y_true * y_pred, 0, 1)))
    predicted_positives = K.sum(K.round(K.clip(y_pred, 0, 1)))
    precision = true_positives / (predicted_positives + K.epsilon())
    return precision

@tf.function
def f1_m(y_true, y_pred):
    precision = precision_m(y_true, y_pred)
    recall = recall_m(y_true, y_pred)
    return 2 * ((precision * recall) / (precision + recall + K.epsilon()))

In [None]:
def preprocessing(X, numeric_preprocessor, categorical_preprocessor, is_train=True):
    X = X.copy()
    if is_train:
        X[numerical_feats] = numeric_preprocessor.fit_transform(X[numerical_feats])
        X_cat = categorical_preprocessor.fit_transform(X[categorical_feats]).toarray()
    else:
        X[numerical_feats] = numeric_preprocessor.transform(X[numerical_feats])
        X_cat = categorical_preprocessor.transform(X[categorical_feats]).toarray()

    X_cat = pd.DataFrame(X_cat, columns=categorical_preprocessor.get_feature_names_out())
    X = X.drop(columns=categorical_feats).reset_index(drop=True)
    X = pd.concat([X, X_cat], axis=1)
    return X, numeric_preprocessor, categorical_preprocessor

In [None]:
def train(trial, df_train, df_val=None, use_pruner=False):
    X_train, y_train = df_train.drop(columns=['y']), df_train['y']
    X_val, y_val = None, None

    if df_val is not None:
        X_val, y_val = df_val.drop(columns=['y']), df_val['y']

    # Preprocessing
    numeric_preprocessor = StandardScaler()
    categorical_preprocessor = OneHotEncoder(handle_unknown="ignore")
    X_train, numeric_preprocessor, categorical_preprocessor = preprocessing(X_train, numeric_preprocessor, categorical_preprocessor, is_train=True)
    
    if df_val is not None:
        X_val, _, _ = preprocessing(X_val, numeric_preprocessor, categorical_preprocessor, is_train=False)

    model = create_model(trial, X_train.shape[1])
    optimizer = create_optimizer(trial)

    callbacks = []
    if use_pruner and df_val is not None:
        callbacks.append(optuna.integration.TFKerasPruningCallback(trial, 'val_f1_m'))

    model.compile(loss='binary_crossentropy', optimizer=optimizer, metrics=[f1_m])
    history = model.fit(
        X_train, y_train,
        epochs=trial.suggest_int('epoch', 15, 50),
        batch_size=64,
        validation_data=(X_val, y_val) if df_val is not None else None,
        callbacks=callbacks,
        verbose=0
    )

    if df_val is not None:
        return np.mean(history.history['val_f1_m'])
    return model

In [None]:
def objective(trial, df_train, use_pruner=False):
    df_train_hp, df_val = train_test_split(df_train, test_size=0.1, random_state=0)
    val_f1_score = train(trial, df_train_hp, df_val, use_pruner)
    return val_f1_score

In [None]:
def train_and_evaluate_final(df_train, df_test, **kwargs):
    X_train, y_train = df_train.drop(columns=['y']), df_train['y']
    X_test, y_test = df_test.drop(columns=['y']), df_test['y']

    numeric_preprocessor = StandardScaler()
    categorical_preprocessor = OneHotEncoder(handle_unknown="ignore")

    X_train, numeric_preprocessor, categorical_preprocessor = preprocessing(X_train, numeric_preprocessor, categorical_preprocessor, is_train=True)
    X_test, _, _ = preprocessing(X_test, numeric_preprocessor, categorical_preprocessor, is_train=False)

    model = Sequential()
    model.add(Dense(X_train.shape[1], input_shape=(X_train.shape[1],), activation='relu'))

    for i in range(kwargs.get('num_layers', 0)):
        model.add(Dropout(kwargs.get(f'dropout_rate_layer_{i}', 0)))
        model.add(Dense(kwargs.get(f'n_units_layer_{i}', 10), activation=kwargs.get(f'actv_func_layer_{i}', 'relu')))

    model.add(Dense(1, activation='sigmoid'))

    opt_name = kwargs.get('optimizer', 'Adam')
    if opt_name == 'SGD':
        optimizer = SGD(
            learning_rate=kwargs.get('sgd_lr', 1e-5),
            momentum=kwargs.get('sgd_momentum', 1e-5)
        )
    else:
        optimizer = Adam(learning_rate=kwargs.get('adam_lr', 1e-5))

    model.compile(loss='binary_crossentropy', optimizer=optimizer, metrics=[f1_m])
    model.fit(X_train, y_train, epochs=kwargs.get('epoch', 15), batch_size=64, verbose=1)

    y_test_pred_proba = model.predict(X_test)
    y_test_pred = [1 if x > 0.5 else 0 for x in y_test_pred_proba]

    print("=" * 100)
    print("F1-Score on Test Data:", f1_score(y_test, y_test_pred))

## Hyper Band + Random Search

In [None]:
df = pd.read_csv("/work/train.csv", sep=";")

In [None]:
df['y'] = df['y'].map({'yes':1,'no':0})

In [None]:
df_train, df_test = train_test_split(df, test_size=0.1, random_state=0)

In [None]:
numerical_feats = list(df_train.drop(columns='y').select_dtypes(include=np.number).columns)

In [None]:
categorical_feats = list(df_train.drop(columns='y').select_dtypes(exclude=np.number).columns)

### Performing Hyperparameter Tuning with 

In [None]:
study = optuna.create_study(direction='maximize',
                            sampler=optuna.samplers.RandomSampler(seed=0),
                            pruner=optuna.pruners.HyperbandPruner(reduction_factor=3,
                                                                  min_resource=5
                                                                 )
                           )
study.optimize(lambda trial: objective(trial, df_train, use_pruner=True),
               n_trials=100, n_jobs=-1,
              )

In [None]:
pruned_trials = study.get_trials(deepcopy=False, states=[optuna.trial.TrialState.PRUNED])
complete_trials = study.get_trials(deepcopy=False, states=[optuna.trial.TrialState.COMPLETE])
print("Study statistics: ")
print("  Number of finished trials: ", len(study.trials))
print("  Number of pruned trials: ", len(pruned_trials))
print("  Number of complete trials: ", len(complete_trials))

In [None]:
print("Best Trial:")
best_trial = study.best_trial

print("    Value: ", best_trial.value)

print("    Hyperparameters: ")
for key, value in best_trial.params.items():
    print(f"        {key}: {value}")

In [None]:
best_trial.params

In [None]:
train_and_evaluate_final(df_train, df_test, **best_trial.params)

<a style='text-decoration:none;line-height:16px;display:flex;color:#5B5B62;padding:10px;justify-content:end;' href='https://deepnote.com?utm_source=created-in-deepnote-cell&projectId=8d9d968b-072e-450a-baa6-a22fc49031e6' target="_blank">
 </img>
Created in <span style='font-weight:600;margin-left:4px;'>Deepnote</span></a>