In [6]:
# Ensure KerasTuner is installed
!pip install -q keras-tuner


[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/129.1 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [91m━━━━━━━━━━━━━━━[0m[91m╸[0m[90m━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m51.2/129.1 kB[0m [31m1.3 MB/s[0m eta [36m0:00:01[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m129.1/129.1 kB[0m [31m1.7 MB/s[0m eta [36m0:00:00[0m
[?25h

In [9]:
import os
import numpy as np
import tensorflow as tf
from tensorflow.keras.layers import Conv2D, MaxPool2D, Flatten, Dense, TimeDistributed
from sklearn.model_selection import train_test_split
from scipy import stats
from ncps.tf import LTC
from ncps.wirings import AutoNCP
from tensorflow.keras.callbacks import ModelCheckpoint
import keras_tuner as kt
import matplotlib.pyplot as plt
from google.colab import drive

# Mount Google Drive
drive.mount('/content/drive')

# Function to scale input data
def scaled_in(matrix_spec):
    "Global scaling applied to noisy voice spectrograms (scale between -1 and 1)"
    return (matrix_spec + 46) / 50

# Function to scale output data
def scaled_ou(matrix_spec):
    "Global scaling applied to noise models spectrograms (scale between -1 and 1)"
    return (matrix_spec - 6) / 82

# Load data
path_save_spectrogram = '/content/drive/MyDrive/npy/'  # Spectrogram directory
print("Loading data...")
X_in = np.load(os.path.join(path_save_spectrogram, 'noisy_voice_amp_db.npy'))
X_ou = np.load(os.path.join(path_save_spectrogram, 'voice_amp_db.npy'))
print("Data loaded successfully.")

# Model of noise to predict
X_ou = X_in - X_ou

# Check distribution
print("Checking distribution of input data...")
print(stats.describe(X_in.reshape(-1, 1)))
print(stats.describe(X_ou.reshape(-1, 1)))
print("Distribution check completed.")

# Scale input and output data
print("Scaling input and output data...")
X_in = scaled_in(X_in)
X_ou = scaled_ou(X_ou)
print("Data scaled successfully.")

# Split data into train and validation sets
print("Splitting data into train and validation sets...")
x_train, x_val, y_train, y_val = train_test_split(X_in, X_ou, test_size=0.10, random_state=42)
print("Data split completed.")

# Reshape data to include the time dimension
x_train_reshaped = x_train.reshape((x_train.shape[0], 1, x_train.shape[1], x_train.shape[2], 1))
x_val_reshaped = x_val.reshape((x_val.shape[0], 1, x_val.shape[1], x_val.shape[2], 1))

y_train_reshaped = y_train.reshape((y_train.shape[0], 1, -1))
y_val_reshaped = y_val.reshape((y_val.shape[0], 1, -1))

# Define a model building function for Keras Tuner
def build_model(hp):
    ncp = LTC(AutoNCP(32, output_size=8), return_sequences=True)

    # Create a Keras Sequential model
    model = tf.keras.models.Sequential([
        tf.keras.layers.InputLayer(input_shape=(None, 128, 128, 1)),  # Adjusted to your input shape
        TimeDistributed(Conv2D(32, (5, 5), activation="relu")),
        TimeDistributed(MaxPool2D()),
        TimeDistributed(Conv2D(hp.Int("num_filters", 32, 64, step=16), (5, 5), activation="relu")),  # Tuning num_filters
        TimeDistributed(MaxPool2D()),
        TimeDistributed(Flatten()),
        TimeDistributed(Dense(hp.Int("dense_units", 32, 64, step=16), activation="relu")),  # Tuning dense_units
        ncp,
        TimeDistributed(Dense(1))  # Single output unit without activation
    ])

    # Configure the learning rate as a hyperparameter
    learning_rate = hp.Float("learning_rate", min_value=0.001, max_value=0.02, step=0.001)

    model.compile(
        optimizer=tf.keras.optimizers.Adam(learning_rate),
        loss='mse',  # Mean squared error for regression
        metrics=['mae']
    )

    return model

# Set up the tuner with random search
tuner = kt.RandomSearch(
    build_model,
    objective='val_loss',  # Optimize for validation loss
    max_trials=10,  # Number of different configurations to try
    executions_per_trial=3,  # Number of times to run each configuration
    directory='my_tuning_results',  # Directory to save tuning results
    project_name='rnn_ncp_tuning'  # Project name
)

# Define the ModelCheckpoint callback
model_checkpoint_path = 'best_model_rnn-ltc-f.keras'
checkpoint = ModelCheckpoint(
    model_checkpoint_path,
    verbose=1,
    monitor='val_loss',
    save_best_only=True,
    mode='auto'
)

# Search for the best hyperparameters
tuner.search(
    x_train_reshaped,
    y_train_reshaped,
    validation_data=(x_val_reshaped, y_val_reshaped),
    epochs=10,
    batch_size=10,
    callbacks=[checkpoint, tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=3)]
)

# Retrieve the best hyperparameters and model
best_hyperparameters = tuner.get_best_hyperparameters(1)[0]
best_model = tuner.get_best_models(1)[0]

print("Best Hyperparameters:", best_hyperparameters.values)

# Save the best model's architecture and weights
model_json = best_model.to_json()
with open('best_model_rnn-ltc-f.json', 'w') as json_file:
    json_file.write(model_json)

best_model.save_weights('best_model_rnn-ltc-f_weights.keras')

# Plot training and validation loss
history = tuner.oracle.get_trial(0).best_step
loss = history.metrics['loss']
val_loss = history.metrics['val_loss']

epochs = range(1, len(loss) + 1)

plt.plot(epochs, loss, label='Training loss')
plt.plot(epochs, val_loss, label='Validation loss')
plt.title('Training and Validation Loss')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.legend()
plt.show()

# Save trial results for later analysis
tuner.results_summary()


Trial 10 Complete [00h 07m 51s]
val_loss: 0.013882565312087536

Best val_loss So Far: 0.012614714913070202
Total elapsed time: 01h 01m 50s
Best Hyperparameters: {'num_filters': 32, 'dense_units': 32, 'learning_rate': 0.001}


KeyError: 0