In [1]:
import os
import pandas as pd
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras.models import save_model
from tensorflow.keras.models import load_model
from keras_tuner import RandomSearch
from keras_tuner import HyperParameters

import warnings
# Suppress all warnings
warnings.filterwarnings("ignore")

In [2]:
# Load back X_train, X_test, y_train, and y_test
X_train = np.loadtxt('train_data/X_train.txt')
X_test = np.loadtxt('train_data/X_test.txt')
y_train = np.loadtxt('train_data/y_train.txt')
y_test = np.loadtxt('train_data/y_test.txt')

CNN for Feature Extraction:

- Start with a CNN to extract features from the power measurements at different frequencies. Since the frequency information is spatially related, we can treat it as an image where each frequency bin is a pixel. Design the CNN architecture to have convolutional layers to capture local patterns in the frequency domain. The output of the CNN will be a feature map capturing important frequency-domain features.

RNN for Temporal Modeling:
- Feed the output of the CNN into an RNN (such as LSTM or GRU) to model temporal dependencies in the time-series data.
- The RNN will capture the temporal dynamics and sequential patterns in the power measurements over time.

This architecture allows the model to learn both frequency-domain features and temporal dependencies simultaneously.

Hybrid CNN-RNN Architecture:
- We can design a hybrid architecture where the CNN and RNN parts are connected sequentially. The CNN part processes the input frequency data to extract features, and the output is then fed into the RNN for temporal modeling.
- This architecture allows for both spatial (frequency) and temporal information to be captured effectively.

In [None]:
print(X_train.shape)
print(X_test.shape)
print(y_train.shape)
print(y_test.shape)

(1637, 76802)
(410, 76802)
(1637,)
(410,)


In [None]:
# Reshape the data to 3D for CNN input (samples, time steps, features)
X_train_reshaped = X_train.reshape(X_train.shape[0], X_train.shape[1], 1)
X_test_reshaped = X_test.reshape(X_test.shape[0], X_test.shape[1], 1)
y_train_reshaped = y_train.reshape((-1, 1))
y_test_reshaped = y_test.reshape((-1, 1))

In [None]:
print(X_train_reshaped.shape)
print(X_test_reshaped.shape)
print(y_train_reshaped.shape)
print(y_test_reshaped.shape)

(1637, 76802, 1)
(410, 76802, 1)
(1637, 1)
(410, 1)


I am building a hybrid CNN and RNN model, taking power measurement of RF activity from 0 to 6.0 GHz to determine whether a camera is being turned on or off, based on its electromagnetic emanation. I have the raw data splitted accoring to these variables: X_train, X_test, y_trarin, y_test are numpy array of dimension (1637, 76802), (410, 76802), (1637,), (410,) respectively, where the row is capture data taken every seconds, and columns are power measurement in dBm (watts) from 0 to 6.0 GHz frequency taken by spectrum analyzer


In [1]:
# Define the CNN-RNN hybrid model
def build_model(hp):
    model = keras.Sequential()
    
    # CNN layers for feature extraction
    model.add(keras.layers.Conv1D(filters=hp.Int('conv_filters', min_value=4, max_value=16, step=4),
                                   kernel_size=hp.Int('conv_kernel', min_value=3, max_value=5, step=1),
                                   activation='relu',
                                   input_shape=(X_train_reshaped.shape[1], 1)))
    model.add(keras.layers.MaxPooling1D(pool_size=2))
    model.add(keras.layers.Dropout(rate=hp.Float('dropout_cnn', min_value=0.0, max_value=0.5, step=0.1)))

    # LSTM layers for temporal modeling
    model.add(keras.layers.LSTM(units=hp.Int('lstm_units', min_value=4, max_value=16, step=4),
                                return_sequences=True))
    model.add(keras.layers.Dropout(rate=hp.Float('dropout_rnn', min_value=0.0, max_value=0.5, step=0.1)))

    # Output layer
    model.add(keras.layers.Dense(1, activation='sigmoid'))
    
    # Compile the model
    model.compile(optimizer='adam',
                  loss='binary_crossentropy',
                  metrics=['accuracy'])
    
    return model

Define a keras tuner for random search

In [7]:
# Define the Keras Tuner RandomSearch
tuner = RandomSearch(
    build_model,
    objective='val_accuracy',
    max_trials=1,
    executions_per_trial=2,
    directory='keras_tuning',
    project_name='cnn_rnn_tuner')

# Perform hyperparameter search
tuner.search(X_train_reshaped, y_train_reshaped,
             epochs=3,
             validation_data=(X_test_reshaped, y_test_reshaped))

Reloading Tuner from keras_tuning\cnn_rnn_tuner\tuner0.json

Search: Running Trial #3

Value             |Best Value So Far |Hyperparameter
32                |128               |conv_filters
4                 |4                 |conv_kernel
0.3               |0                 |dropout_cnn
96                |32                |lstm_units
0.4               |0.4               |dropout_rnn

Epoch 1/3


Traceback (most recent call last):
  File "c:\Users\theor\OneDrive\Desktop\Master\Project\rf_anomaly_detection\venv\lib\site-packages\keras_tuner\src\engine\base_tuner.py", line 274, in _try_run_and_update_trial
    self._run_and_update_trial(trial, *fit_args, **fit_kwargs)
  File "c:\Users\theor\OneDrive\Desktop\Master\Project\rf_anomaly_detection\venv\lib\site-packages\keras_tuner\src\engine\base_tuner.py", line 239, in _run_and_update_trial
    results = self.run_trial(trial, *fit_args, **fit_kwargs)
  File "c:\Users\theor\OneDrive\Desktop\Master\Project\rf_anomaly_detection\venv\lib\site-packages\keras_tuner\src\engine\tuner.py", line 314, in run_trial
    obj_value = self._build_and_fit_model(trial, *args, **copied_kwargs)
  File "c:\Users\theor\OneDrive\Desktop\Master\Project\rf_anomaly_detection\venv\lib\site-packages\keras_tuner\src\engine\tuner.py", line 233, in _build_and_fit_model
    results = self.hypermodel.fit(hp, model, *args, **kwargs)
  File "c:\Users\theor\OneDrive\D

RuntimeError: Number of consecutive failures exceeded the limit of 3.
Traceback (most recent call last):
  File "c:\Users\theor\OneDrive\Desktop\Master\Project\rf_anomaly_detection\venv\lib\site-packages\keras_tuner\src\engine\base_tuner.py", line 274, in _try_run_and_update_trial
    self._run_and_update_trial(trial, *fit_args, **fit_kwargs)
  File "c:\Users\theor\OneDrive\Desktop\Master\Project\rf_anomaly_detection\venv\lib\site-packages\keras_tuner\src\engine\base_tuner.py", line 239, in _run_and_update_trial
    results = self.run_trial(trial, *fit_args, **fit_kwargs)
  File "c:\Users\theor\OneDrive\Desktop\Master\Project\rf_anomaly_detection\venv\lib\site-packages\keras_tuner\src\engine\tuner.py", line 314, in run_trial
    obj_value = self._build_and_fit_model(trial, *args, **copied_kwargs)
  File "c:\Users\theor\OneDrive\Desktop\Master\Project\rf_anomaly_detection\venv\lib\site-packages\keras_tuner\src\engine\tuner.py", line 233, in _build_and_fit_model
    results = self.hypermodel.fit(hp, model, *args, **kwargs)
  File "c:\Users\theor\OneDrive\Desktop\Master\Project\rf_anomaly_detection\venv\lib\site-packages\keras_tuner\src\engine\hypermodel.py", line 149, in fit
    return model.fit(*args, **kwargs)
  File "c:\Users\theor\OneDrive\Desktop\Master\Project\rf_anomaly_detection\venv\lib\site-packages\keras\utils\traceback_utils.py", line 70, in error_handler
    raise e.with_traceback(filtered_tb) from None
  File "c:\Users\theor\OneDrive\Desktop\Master\Project\rf_anomaly_detection\venv\lib\site-packages\tensorflow\python\eager\execute.py", line 54, in quick_execute
    tensors = pywrap_tfe.TFE_Py_Execute(ctx._handle, device_name, op_name,
tensorflow.python.framework.errors_impl.InternalError: Graph execution error:

Failed to call ThenRnnBackward with model config: [rnn_mode, rnn_input_mode, rnn_direction_mode]: 2, 0, 0 , [num_layers, input_size, num_units, dir_count, max_seq_length, batch_size, cell_num_units]: [1, 32, 96, 1, 38399, 32, 96] 
	 [[{{node gradients/CudnnRNN_grad/CudnnRNNBackprop}}]]
	 [[Adam/gradients/PartitionedCall]] [Op:__inference_train_function_3393]


Get the best model

In [None]:
# Get the best model
best_model = tuner.get_best_models(num_models=1)[0]
# Save the Keras model to an HDF5 file
best_model.save("hybrid_cnn_rnn.h5")

In [None]:
# Get best hyperparameters
best_hyperparameters = tuner.get_best_hyperparameters(num_trials=1)[0]

In [None]:
# Evaluate the best model
evaluation = best_model.evaluate(X_test, y_test)
# Print evaluation results
print(evaluation)

In [None]:
# Load the saved model
model = load_model("hybrid_cnn_rnn.h5")

# Convert the Keras model to TensorFlow Lite format
converter = tf.lite.TFLiteConverter.from_keras_model(model)
tflite_model = converter.convert()

# Save the TensorFlow Lite model to a file
with open("hybrid_cnn_rnn.tflite", "wb") as f:
    f.write(tflite_model)