In [None]:
# Allows imports from modules in the project directory
import os
import sys
from pathlib import Path

CURRENT_DIRECTORY = Path(os.path.abspath('')).resolve()
MODULE_DIRECTORY = CURRENT_DIRECTORY.parent
PROJECT_DIRECTORY = MODULE_DIRECTORY.parents[1]
sys.path.extend([str(MODULE_DIRECTORY)])

print(f'Python {sys.version} on {sys.platform}')

In [None]:
import keras_tuner as kt
import numpy as np
import tensorflow as tf
from tensorflow import keras

import utilities

In [None]:
# Training parameters
LEARNING_RATE = 1e-3       # learning rate handed to the Adam optimizer in create_model
BATCH_SIZE = 32            # batch size used by both data generators
SHUFFLE_BUFFER_SIZE = 100  # NOTE(review): not referenced in the visible cells
EPOCHS = 5                 # NOTE(review): not referenced in the visible cells (the search passes epochs=10)

# Wavelet parameters; together they name the data folder `<wavelet_name>_<scales>`
wavelet_name, scales = 'gaus5', 20

In [None]:
# Locate the wavelet archives inside the `<wavelet_name>_<scales>` folder
# that sits next to this notebook
train_path = CURRENT_DIRECTORY.joinpath(f'{wavelet_name}_{scales}/train.npz')
test_path = CURRENT_DIRECTORY.joinpath(f'{wavelet_name}_{scales}/test.npz')

# Open both archives; the arrays are read out of them by key afterwards
train, test = np.load(train_path), np.load(test_path)

In [None]:
# Pull the 'data' and 'labels' arrays out of each loaded archive
train_data, train_labels = train['data'], train['labels']
test_data, test_labels = test['data'], test['labels']

In [None]:
# A data generator makes it possible to transform each batch during training
# and to evaluate the test set after each epoch
class DataGenerator(utilities.BaseDataGenerator):
    """Data generator that passes the wavelet data through unchanged.

    No transformation is needed for this data. A generator is still used
    because, according to the original author, a tensorflow dataset would
    need more memory than was available on their machine.

    NOTE(review): when and how `transform` is invoked is decided by
    `utilities.BaseDataGenerator`, which is not visible in this notebook.
    """

    def transform(self, X: np.ndarray) -> np.ndarray:
        """Identity transform: hand back ``X`` exactly as it was received."""
        return X

In [None]:
# Wrap the train and test splits in generators that share one batch size
train_generator, test_generator = (
    DataGenerator(train_data, train_labels, batch_size=BATCH_SIZE),
    DataGenerator(test_data, test_labels, batch_size=BATCH_SIZE),
)

In [None]:
# Model factory that the tuner calls once per trial
def create_model(hp):
    """Build and compile a CNN whose architecture is sampled from ``hp``.

    The network consists of four Conv2D -> BatchNormalization -> MaxPooling2D
    stages, followed by Flatten, one ReLU Dense layer, an optional Dropout
    layer and a 4-unit softmax output layer.

    Hyperparameters requested from ``hp``, in this order:
        layer_{idx}_filters: Conv2D filter count, one of 8/16/32/64/128.
        layer_{idx}_kernel:  Conv2D kernel size, one of 3/4/5/6.
        layer_{idx}_pool:    second pool dimension, one of 2/3/4/5
                             (the first pool dimension is fixed at 1).
        dense_units:         32 to 512 in steps of 32.
        dropout:             whether a Dropout layer is inserted.
        drop_rate:           0.25 to 0.75 in steps of 0.25; only requested
                             when ``dropout`` is true.

    Args:
        hp: Hyperparameter object supplied by the tuner. Only its ``Choice``,
            ``Int``, ``Boolean`` and ``Float`` methods are used.

    Returns:
        The compiled ``keras.Sequential`` model. It uses Adam with the
        module-level ``LEARNING_RATE``, sparse categorical cross-entropy loss
        and the accuracy metric.

    NOTE(review): Conv2D is created with its default padding. Depending on the
    input shape, large kernel/pool choices across four stages may shrink the
    feature map too far for some trials -- confirm against the data shape.
    """
    model = keras.Sequential(name='cnn_wavelet_model')

    for idx in range(4):
        # Hyperparameters are requested in a fixed order: filters, kernel, pool.
        n_filters = hp.Choice(f'layer_{idx}_filters', values=[8, 16, 32, 64, 128])
        kernel = hp.Choice(f'layer_{idx}_kernel', values=[3, 4, 5, 6])
        conv = keras.layers.Conv2D(filters=n_filters, kernel_size=kernel, activation='relu')
        model.add(conv)

        model.add(keras.layers.BatchNormalization())

        # Pooling only acts along the second dimension; the first stays at 1.
        pool_width = hp.Choice(f'layer_{idx}_pool', values=[2, 3, 4, 5])
        model.add(keras.layers.MaxPooling2D(pool_size=(1, pool_width)))

    model.add(keras.layers.Flatten())

    dense_units = hp.Int('dense_units', min_value=32, max_value=512, step=32)
    model.add(keras.layers.Dense(units=dense_units, activation='relu'))

    # The drop rate is only sampled when the tuner switches dropout on.
    use_dropout = hp.Boolean("dropout")
    if use_dropout:
        drop_rate = hp.Float('drop_rate', min_value=0.25, max_value=0.75, step=0.25)
        model.add(keras.layers.Dropout(rate=drop_rate))

    # Output layer: 4 units with softmax activation.
    model.add(keras.layers.Dense(4, activation='softmax'))

    adam = keras.optimizers.Adam(learning_rate=LEARNING_RATE)
    model.compile(optimizer=adam, loss='sparse_categorical_crossentropy', metrics=['accuracy'])

    return model

In [None]:
# Hyperband is used to search the hyperparameter space efficiently for the best result.
# NOTE(review): no seed is passed, so repeated searches may not be reproducible.
tuner = kt.Hyperband(
    hypermodel=create_model,                # called once per trial to build a model
    objective='val_accuracy',               # metric used to rank the trials
    max_epochs=10,
    factor=3,
    directory='hyper-search',               # relative path
    project_name='my_cnn_wavelet_search',
)

In [None]:
# Stop training early when the validation loss has not improved for 5 epochs
stop_early = tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=5)

# Run the search (this may take a very long time).
# NOTE(review): the test generator is used as validation data, so the chosen
# hyperparameters are tuned against the test split -- confirm this is intended.
tuner.search(
    train_generator,
    validation_data=test_generator,
    epochs=10,
    callbacks=[stop_early],
)

In [None]:
# Fetch the single best hyperparameter set found by the search and print its values
top_trials = tuner.get_best_hyperparameters(num_trials=1)
best_hps = top_trials[0]
print(best_hps.values)

In [None]:
# Example of how the best model from the search can be saved.
# NOTE(review): the target path has no file extension; depending on the
# installed Keras version `save` may require one (e.g. `.keras`) -- confirm.
best_model = tuner.get_best_models()[0]
model_path = CURRENT_DIRECTORY / 'models/my_wavelet_model_name'
best_model.save(model_path)