In [1]:
import os
import pickle
import numpy as np
import tensorflow as tf
import matplotlib.pyplot as plt
import seaborn as sns
from tensorflow.keras.models import Sequential, load_model
from tensorflow.keras.layers import LSTM, Dense, Dropout
from tensorflow.keras.optimizers import Adam
from sklearn.metrics import classification_report, confusion_matrix
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder

# Load the dataset
with open("RML2016.10a_dict.pkl", "rb") as f:
    data = pickle.load(f, encoding="latin1")

# Prepare data and return SNR values along with samples and labels
def prepare_data_with_snr(data):
    X, y, snr_values = [], [], []
    for (mod_type, snr), signals in data.items():
        for signal in signals:
            iq_signal = np.vstack([signal[0], signal[1]]).T
            # Create a column with the SNR value (same value repeated for each time step)
            snr_signal = np.full((128, 1), snr)
            combined_signal = np.hstack([iq_signal, snr_signal])
            X.append(combined_signal)
            y.append(mod_type)
            snr_values.append(snr)
    X = np.array(X)
    y = np.array(y)
    snr_values = np.array(snr_values)
    label_encoder = LabelEncoder()
    y_encoded = label_encoder.fit_transform(y)
    return X, y_encoded, snr_values, label_encoder

# Get full dataset with SNR values
X_all, y_all, snr_all, label_encoder = prepare_data_with_snr(data)

# Split dataset into high SNR (>= -6dB) and low SNR (< -6dB)
mask_high = snr_all >= 12
mask_low = snr_all < 12

X_high = X_all[mask_high]
y_high = y_all[mask_high]
X_low = X_all[mask_low]
y_low = y_all[mask_low]

print("High SNR samples:", X_high.shape[0])
print("Low SNR samples:", X_low.shape[0])

# Split the high SNR dataset into training and testing sets
X_train_high, X_test_high, y_train_high, y_test_high = train_test_split(
    X_high, y_high, test_size=0.2, random_state=42
)

# Ensure proper shape (each sample is already of shape (128, 3))
X_train_high = X_train_high.reshape(-1, X_train_high.shape[1], X_train_high.shape[2])
X_test_high = X_test_high.reshape(-1, X_test_high.shape[1], X_test_high.shape[2])

2025-04-23 21:16:17.937618: I tensorflow/core/util/port.cc:153] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.
2025-04-23 21:16:17.949653: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:477] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
E0000 00:00:1745457377.962869  865759 cuda_dnn.cc:8310] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
E0000 00:00:1745457377.966772  865759 cuda_blas.cc:1418] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
2025-04-23 21:16:17.981115: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instr

High SNR samples: 44000
Low SNR samples: 176000


In [2]:


# Define the model
def build_model(input_shape, num_classes, learning_rate=1e-5):
    model = Sequential([
        LSTM(128, input_shape=input_shape, return_sequences=True),
        Dropout(0.5),
        LSTM(128, return_sequences=False),
        Dropout(0.2),
        Dense(128, activation="relu"),
        Dropout(0.1),
        Dense(num_classes, activation="softmax")
    ])
    optimizer = Adam(learning_rate=learning_rate)
    model.compile(loss="sparse_categorical_crossentropy", optimizer=optimizer, metrics=["accuracy"])
    return model

# Model setup for high SNR training
model_path = "lstm_rnn_2024.keras"
if os.path.exists(model_path):
    print(f"Loading existing model from {model_path}")
    model = load_model(model_path)
else:
    print("Building new model")
    model = build_model(input_shape=X_train_high.shape[1:], num_classes=len(label_encoder.classes_))

# Train the model using only the high SNR (>= -6 dB) data
history = model.fit(X_train_high, y_train_high, validation_data=(X_test_high, y_test_high),
                    epochs=1, batch_size=64, verbose=1)

# Save the trained model
model.save(model_path)
print(f"Model saved to {model_path}")


Loading existing model from lstm_rnn_2024.keras


I0000 00:00:1745457424.249501  865759 gpu_device.cc:2022] Created device /job:localhost/replica:0/task:0/device:GPU:0 with 1041 MB memory:  -> device: 0, name: NVIDIA A30, pci bus id: 0000:ca:00.0, compute capability: 8.0
I0000 00:00:1745457425.995286  866060 cuda_dnn.cc:529] Loaded cuDNN version 90300


[1m550/550[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m8s[0m 11ms/step - accuracy: 0.9265 - loss: 0.1594 - val_accuracy: 0.9153 - val_loss: 0.1699
Model saved to lstm_rnn_2024.keras


In [None]:
import matplotlib.pyplot as plt
import seaborn as sns
from tensorflow.keras.models import Model
from sklearn.metrics import confusion_matrix, classification_report
import numpy as np
import tensorflow as tf

# --- Ensure the model is built ---
# If the model hasn't been called yet, perform a dummy prediction.
dummy_input = np.zeros((1, X_train_high.shape[1], X_train_high.shape[2]))
_ = model.predict(dummy_input)

# --- 1. Plot Training and Validation Accuracy ---
plt.figure(figsize=(10, 4))
plt.plot(history.history['accuracy'], label='Training Accuracy')
plt.plot(history.history['val_accuracy'], label='Validation Accuracy')
plt.title("Training and Validation Accuracy")
plt.xlabel("Epoch")
plt.ylabel("Accuracy")
plt.legend()
plt.show()

# --- 2. Visualize Activations ("Feature Maps") of Selected Layers ---
def visualize_feature_maps(model, sample_index=0):
    # Select a sample from the high-SNR test set
    sample = X_test_high[sample_index:sample_index+1]  # shape: (1, 128, 3)
    
    # Loop over layers of interest (here, LSTM and Dense layers)
    for layer in model.layers:
        if isinstance(layer, (tf.keras.layers.LSTM, tf.keras.layers.Dense)):
            try:
                # Create an intermediate model to output this layer's activation
                intermediate_model = Model(inputs=model.input, outputs=layer.output)
                activation = intermediate_model.predict(sample)
                
                plt.figure(figsize=(10, 4))
                if activation.ndim == 3:
                    # For LSTM layers with return_sequences=True, visualize as a heatmap
                    # (features x time steps)
                    sns.heatmap(activation[0].T, cmap='viridis')
                    plt.title(f"Activation from {layer.name} (Heatmap)")
                    plt.xlabel("Time steps")
                    plt.ylabel("Features")
                elif activation.ndim == 2:
                    # For Dense layers (or LSTM with single output), plot as a line plot
                    plt.plot(activation[0])
                    plt.title(f"Activation from {layer.name}")
                    plt.xlabel("Neuron index")
                    plt.ylabel("Activation")
                plt.tight_layout()
                plt.show()
            except Exception as e:
                print(f"Could not visualize layer {layer.name}: {e}")

# Visualize activations for a sample from X_test_high
visualize_feature_maps(model, sample_index=0)

# --- 3. Plot the Confusion Matrix for High SNR Test Data ---
# Predict on the high SNR test set
y_pred_prob = model.predict(X_test_high)
y_pred = np.argmax(y_pred_prob, axis=1)

# Compute the confusion matrix
cm = confusion_matrix(y_test_high, y_pred)

plt.figure(figsize=(10, 8))
sns.heatmap(cm, annot=True, fmt="d", cmap="Blues",
            xticklabels=label_encoder.classes_, yticklabels=label_encoder.classes_)
plt.xlabel("Predicted Label")
plt.ylabel("True Label")
plt.title("Confusion Matrix for High SNR (>= -6 dB)")
plt.show()

# Print a detailed classification report
print(classification_report(y_test_high, y_pred, target_names=label_encoder.classes_))


In [None]:
import os
import numpy as np
import tensorflow as tf
from tensorflow.keras.models import load_model, Model
from tensorflow.keras.layers import (
    Input, LSTM, Dense, Dropout, Add, Concatenate, Lambda
)
from tensorflow.keras.optimizers import Adam

# -------------------------------
# Helper: FFT-Based Filtering Lambda Layer
# -------------------------------
def fft_filter(x, keep_bins=20):
    """
    Perform FFT on the IQ channels (first two features), zero out all bins 
    except for the center 'keep_bins' (i.e. the narrow band around DC), then 
    perform the inverse FFT to get back to the time domain.
    """
    # x shape: (batch, time, features); we use only I and Q.
    I = x[..., 0]  # shape: (batch, time)
    Q = x[..., 1]  # shape: (batch, time)
    # Combine into a complex signal.
    complex_signal = tf.complex(I, Q)  # shape: (batch, time)
    
    # Compute FFT along time axis.
    fft_signal = tf.signal.fft(complex_signal)  # shape: (batch, time)
    # Shift the zero frequency to the center.
    fft_shifted = tf.signal.fftshift(fft_signal, axes=[-1])
    
    # Create a mask that is 1 for the middle 'keep_bins' indices and 0 elsewhere.
    time_dim = tf.shape(fft_shifted)[-1]
    center = time_dim // 2
    half_keep = keep_bins // 2
    lower = center - half_keep
    upper = center + half_keep
    # Create a mask over the last dimension.
    freq_indices = tf.range(time_dim)
    mask = tf.cast((freq_indices >= lower) & (freq_indices < upper), fft_shifted.dtype)
    mask = tf.reshape(mask, (1, -1))  # shape: (1, time)
    # Broadcast the mask to each sample in the batch.
    fft_masked = fft_shifted * mask
    
    # Inverse shift and inverse FFT.
    fft_unshifted = tf.signal.ifftshift(fft_masked, axes=[-1])
    filtered_complex = tf.signal.ifft(fft_unshifted)
    # Take the real part as the filtered time-domain signal.
    filtered = tf.math.real(filtered_complex)
    # Expand dims to have a channel dimension (so shape becomes (batch, time, 1)).
    filtered = tf.expand_dims(filtered, axis=-1)
    return filtered

# -------------------------------
# Model Definition
# -------------------------------

# Load the original model (frozen left branch)
orig_model_path = "lstm_rnn_2024.keras"
if os.path.exists(orig_model_path):
    print(f"Loading original model from {orig_model_path}")
    orig_model = load_model(orig_model_path)
else:
    raise FileNotFoundError(f"{orig_model_path} not found.")

# Use the input from the first layer
first_input = orig_model.layers[0].input

# Create the left branch by extracting the output from the Dense(128, "relu") layer (assumed index=4).
left_branch = Model(inputs=first_input, outputs=orig_model.layers[4].output)
left_branch.trainable = True  # you mentioned making it trainable for now

# Define the overall input (shape: (128, 3) for [I, Q, SNR])
input_shape = (128, 3)
inputs = Input(shape=input_shape)

# Left branch output (processing the raw IQ/SNR input)
left_output = left_branch(inputs)

# ---------------------------
# Right Branch: Apply FFT Filtering, then compute amplitude and phase differences
# ---------------------------
# Apply FFT filtering to the IQ channels (this should zero out AM-DSB if its energy lies outside the kept bins)
filtered_input = Lambda(lambda x: fft_filter(x, keep_bins=20), name="fft_filter")(inputs)
# Now compute amplitude and phase differences from the filtered signal.

def amplitude_diff_fn(x):
    iq = x[..., :1]  # Note: filtered_input has shape (batch, time, 1)
    # Since we already combined I and Q in fft_filter, here we just use the filtered result.
    # Alternatively, you could compute amplitude from the raw signal if needed.
    # For demonstration, we'll treat the filtered output as the "amplitude"
    amp_diff = x[:, 1:, :] - x[:, :-1, :]
    amp_diff = tf.pad(amp_diff, [[0, 0], [1, 0], [0, 0]])
    return amp_diff

def phase_diff_fn(x):
    # Here, since we've applied an FFT filter on IQ, you might choose not to compute phase difference,
    # or you could compute it on the raw input. For now, we'll compute it on the raw input.
    iq = x[..., :2]  # use the raw I and Q channels
    phase = tf.atan2(iq[..., 1], iq[..., 0])
    phase_diff = phase[:, 1:] - phase[:, :-1]
    phase_diff = tf.pad(phase_diff, [[0, 0], [1, 0]])
    phase_diff = tf.expand_dims(phase_diff, axis=-1)
    return phase_diff

# For this branch, let's use the filtered input to compute amplitude differences,
# and use the raw input to compute phase differences.
amp_input   = Lambda(amplitude_diff_fn, name="amp_diff")(filtered_input)   # shape: (batch, time, 1)
phase_input = Lambda(phase_diff_fn,    name="phase_diff")(inputs)          # shape: (batch, time, 1)

# Concatenate amplitude diff and phase diff along the channel axis => shape: (batch, time, 2)
combined_input = Concatenate(axis=-1, name="combined_amp_phase")([amp_input, phase_input])

# Build a deeper ResLSTM branch with skip connections (using Xavier initialization)
r1 = LSTM(128, return_sequences=True, kernel_initializer='glorot_uniform', name="res_lstm_1")(combined_input)
r1 = Dropout(0.5, name="res_dropout_1")(r1)

r2 = LSTM(128, return_sequences=True, kernel_initializer='glorot_uniform', name="res_lstm_2")(r1)
skip1 = Add(name="res_skip1")([r1, r2])
skip1 = Dropout(0.5, name="res_dropout_2")(skip1)

r3 = LSTM(128, return_sequences=True, kernel_initializer='glorot_uniform', name="res_lstm_3")(skip1)
r3 = Dropout(0.5, name="res_dropout_3")(r3)

r4 = LSTM(128, return_sequences=True, kernel_initializer='glorot_uniform', name="res_lstm_4")(r3)
skip2 = Add(name="res_skip2")([skip1, r4])
skip2 = Dropout(0.5, name="res_dropout_4")(skip2)

# Final block: produce a single vector from the sequence.
r5 = LSTM(128, return_sequences=False, kernel_initializer='glorot_uniform', name="res_lstm_5")(skip2)
r5 = Dropout(0.2, name="res_dropout_5")(r5)
r5 = Dense(128, activation="relu", kernel_initializer='glorot_uniform', name="res_dense")(r5)
r5 = Dropout(0.1, name="res_dropout_6")(r5)

# ---------------------------
# Combine Branches and Final Classification
# ---------------------------
combined = Concatenate(name="combined_features")([left_output, r5])
num_classes = 11  # Adjust as needed
outputs = Dense(num_classes, activation="softmax", name="final_output")(combined)

# Build the final Y-network model.
y_model = Model(inputs=inputs, outputs=outputs)

# Compile the model.
optimizer = Adam(learning_rate=1e-5)
y_model.compile(loss="sparse_categorical_crossentropy", optimizer=optimizer, metrics=["accuracy"])

# Display the architecture.
y_model.summary()


In [None]:
am_dsb_label = label_encoder.transform(['AM-DSB'])[0]  # e.g., 1
wb_fm_label  = label_encoder.transform(['WBFM'])[0]    # e.g., 10

# Create a dictionary mapping label indices to weights.
class_weight = {
    am_dsb_label: 4.0,  # Double weight for AM-DSB
    wb_fm_label:  4.0   # Double weight for WBFM
}

# Now train your Y-network model with class_weight:
history = y_model.fit(
    X_train_high, y_train_high,
    validation_data=(X_test_high, y_test_high),
    epochs=10, 
    batch_size=64,
    verbose=1,
    class_weight=class_weight
)
# --- Plot Training and Validation Accuracy ---
import matplotlib.pyplot as plt
plt.figure(figsize=(10, 5))
plt.plot(history_y.history['accuracy'], label='Training Accuracy')
plt.plot(history_y.history['val_accuracy'], label='Validation Accuracy')
plt.xlabel("Epoch")
plt.ylabel("Accuracy")
plt.title("Training and Validation Accuracy (Y-Network)")
plt.legend()
plt.show()

# --- Plot the Confusion Matrix for High SNR Test Data ---
import seaborn as sns
import numpy as np
from sklearn.metrics import confusion_matrix, classification_report

# Predict on the high SNR test set
y_pred_prob = y_model.predict(X_test_high)
y_pred = np.argmax(y_pred_prob, axis=1)

# Compute the confusion matrix
cm = confusion_matrix(y_test_high, y_pred)

plt.figure(figsize=(10, 8))
sns.heatmap(cm, annot=True, fmt="d", cmap="Blues",
            xticklabels=label_encoder.classes_,
            yticklabels=label_encoder.classes_)
plt.xlabel("Predicted Label")
plt.ylabel("True Label")
plt.title("Confusion Matrix for High SNR (>= -6 dB) Test Data")
plt.show()

# Print a detailed classification report
print(classification_report(y_test_high, y_pred, target_names=label_encoder.classes_))


In [None]:
from tensorflow.keras.optimizers import Adam
import matplotlib.pyplot as plt

# Fine tune settings: use a lower learning rate for fine tuning.
fine_tune_lr = 1e-7
optimizer_finetune = Adam(learning_rate=fine_tune_lr)

# Recompile the model so that only the new (trainable) branch and final layers are updated.
y_model.compile(loss="sparse_categorical_crossentropy", optimizer=optimizer_finetune, metrics=["accuracy"])

# Fine tune the model on high SNR (>= -6 dB) data
history_finetune = y_model.fit(
    X_train_high, y_train_high,
    validation_data=(X_test_high, y_test_high),
    epochs=5,          # Adjust the number of fine-tuning epochs as needed
    batch_size=64,
    verbose=1
)

# Plot fine-tuning training and validation accuracy
plt.figure(figsize=(10, 5))
plt.plot(history_finetune.history['accuracy'], label='Fine Tune Training Accuracy')
plt.plot(history_finetune.history['val_accuracy'], label='Fine Tune Validation Accuracy')
plt.xlabel("Epoch")
plt.ylabel("Accuracy")
plt.title("Fine Tuning: Training and Validation Accuracy (Y-Network)")
plt.legend()
plt.show()


In [3]:
import itertools
import numpy as np
import tensorflow as tf
from tensorflow.keras import Sequential
from tensorflow.keras.layers import (
    Dense, Flatten, Conv2D, MaxPooling2D, Dropout, Activation, 
    LSTM, TimeDistributed, BatchNormalization, GlobalAveragePooling2D
)
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import EarlyStopping
from tensorflow.keras.regularizers import l2
from tensorflow.keras.models import Model
from sklearn.metrics import confusion_matrix, classification_report
import matplotlib.pyplot as plt
import seaborn as sns

###############################################################################
# 1. Data: We'll assume you have X_all, y_all, label_encoder (for WBFM vs. AM-DSB).
#    We'll do a 2-class scenario with binary labels: 0=AM-DSB, 1=WBFM.
###############################################################################
# e.g.:
# am_dsb_label = label_encoder.transform(['AM-DSB'])[0]
# wbfm_label   = label_encoder.transform(['WBFM'])[0]
# mask = (y_all == am_dsb_label) | (y_all == wbfm_label)
# X_2c = X_all[mask]  # shape: (num_samples, 128, 3) or (num_samples, 128, 2) etc.
# y_2c = np.where(y_all[mask] == wbfm_label, 1, 0)
# Then train_test_split -> X_train, X_test, y_train, y_test
#
# We'll assume you already have:
#   X_train, X_test, y_train, y_test
###############################################################################
y_test = y_test_high
y_train = y_train_high
X_test = X_test_high
X_train = X_train_high
###############################################################################
# 2. Define Feature Transformation Functions
###############################################################################
# Each function takes raw IQ data (shape: (128, 2) or (128, 3)) and returns
# a new feature array (and possibly shape info).

def transform_raw_iq(iq_sample):
    """
    Return the raw IQ data (flattened).
    E.g. shape -> (128*2,) if ignoring SNR or (128*3,) if you have it.
    """
    # If you have 3 columns [I, Q, SNR], slice if needed. 
    # For simplicity, assume shape (128,2).
    return iq_sample.reshape(-1)  # shape (256,)

def transform_fft_mag(iq_sample, n_fft=128):
    """
    Compute FFT magnitude of the IQ signal.
    Return flattened log-magnitude, shape ~ (n_fft,).
    """
    # shape (128,2) -> complex
    complex_signal = iq_sample[:,0] + 1j*iq_sample[:,1]
    fft_result = np.fft.fft(complex_signal, n=n_fft)
    mag = np.abs(fft_result)
    log_mag = np.log(mag + 1e-6)
    return log_mag.astype(np.float32)  # shape (n_fft,)

def transform_phase_diff(iq_sample):
    """
    Compute phase difference between successive samples.
    Returns shape (128,) for example.
    """
    # shape (128,2)
    I = iq_sample[:,0]
    Q = iq_sample[:,1]
    phase = np.arctan2(Q, I)
    diff = np.diff(phase, prepend=phase[0])  # shape (128,)
    return diff.astype(np.float32)

def transform_amp_diff(iq_sample):
    """
    Compute amplitude difference between successive samples.
    """
    amp = np.sqrt(iq_sample[:,0]**2 + iq_sample[:,1]**2)
    diff = np.diff(amp, prepend=amp[0])  # shape (128,)
    return diff.astype(np.float32)

def transform_square(iq_sample):
    """
    Square each IQ sample: [I^2, Q^2].
    Flatten the result.
    """
    squared = iq_sample**2  # shape (128,2)
    return squared.reshape(-1).astype(np.float32)

def transform_cube(iq_sample):
    """
    Cube each IQ sample: [I^3, Q^3].
    Flatten the result.
    """
    cubed = iq_sample**3  # shape (128,2)
    return cubed.reshape(-1).astype(np.float32)

###############################################################################
# 2B. If we want 2D shapes for CNN or ResNet, we can do a small 2D transform:
###############################################################################
def transform_fft_2d(iq_sample, n_fft=128, height=16, width=16):
    """
    Compute FFT magnitude, reshape to (height, width, 1).
    """
    complex_signal = iq_sample[:,0] + 1j*iq_sample[:,1]
    fft_result = np.fft.fft(complex_signal, n=n_fft)
    mag = np.abs(fft_result)
    log_mag = np.log(mag + 1e-6)
    # Reshape to 2D, then expand channel
    # e.g. 16x8 or 16x16 etc. We'll do 16x8 if n_fft=128 -> 128 elements
    # but let's just do 16x8 as an example. 
    # We'll do 16x8=128. If we want 16x16, we can zero-pad or something.
    # For demonstration, let's do 16x8 => height=16, width=8
    # but user said 16x16 => we can zero-pad or just do 256 point FFT.
    # We'll keep it flexible. We'll assume n_fft=256 => 256 magnitude points.
    # Then 16x16 => shape (16,16).
    
    reshaped = log_mag.reshape(height, width)
    reshaped = reshaped[..., np.newaxis]  # add channel
    return reshaped.astype(np.float32)  # shape (16,16,1) if height=16,width=16

###############################################################################
# 3. Collect Transformations in a Dictionary
###############################################################################
# Key = name of transform, Value = function that does it (plus info if 1D or 2D)
# We'll define which ones produce 1D vs. 2D shapes.

transformations = {
    'raw_iq':       (transform_raw_iq,  '1d'),
    'fft_mag':      (transform_fft_mag, '1d'),
    'phase_diff':   (transform_phase_diff, '1d'),
    'amp_diff':     (transform_amp_diff,   '1d'),
    'square':       (transform_square,  '1d'),
    'cube':         (transform_cube,    '1d'),
    'fft_2d':       (transform_fft_2d,  '2d'),  # for CNN / ResNet
}

###############################################################################
# 4. Define Some Model-Building Functions
###############################################################################
def build_mlp(input_dim, lr=1e-4, wd=1e-4):
    model = Sequential()
    model.add(Dense(256, activation='relu', kernel_regularizer=l2(wd), input_shape=(input_dim,)))
    model.add(Dropout(0.3))
    model.add(Dense(64, activation='relu', kernel_regularizer=l2(wd)))
    model.add(Dropout(0.3))
    model.add(Dense(1, activation='sigmoid', kernel_regularizer=l2(wd)))
    model.compile(optimizer=Adam(lr), loss='binary_crossentropy', metrics=['accuracy'])
    return model

def build_small_cnn_1d(input_dim, lr=1e-4, wd=1e-4):
    """
    A 1D CNN for 1D sequences. 
    We'll reshape (input_dim,) -> (input_dim,1) for conv1D.
    """
    from tensorflow.keras.layers import Conv1D, GlobalMaxPooling1D
    
    model = Sequential()
    model.add(tf.keras.layers.Reshape((input_dim, 1), input_shape=(input_dim,)))
    model.add(Conv1D(32, kernel_size=3, padding='same', kernel_regularizer=l2(wd), activation='relu'))
    model.add(GlobalMaxPooling1D())
    model.add(Dense(64, activation='relu', kernel_regularizer=l2(wd)))
    model.add(Dropout(0.3))
    model.add(Dense(1, activation='sigmoid', kernel_regularizer=l2(wd)))
    model.compile(optimizer=Adam(lr), loss='binary_crossentropy', metrics=['accuracy'])
    return model

def build_small_cnn_2d(input_shape, lr=1e-4, wd=1e-4):
    """
    A small 2D CNN for shape e.g. (16,16,1).
    """
    model = Sequential()
    model.add(Conv2D(32, (3,3), padding='same', kernel_regularizer=l2(wd), activation='relu', input_shape=input_shape))
    model.add(MaxPooling2D(pool_size=(2,2)))
    model.add(Dropout(0.3))
    model.add(Conv2D(64, (3,3), padding='same', kernel_regularizer=l2(wd), activation='relu'))
    model.add(GlobalAveragePooling2D())
    model.add(Dense(64, activation='relu', kernel_regularizer=l2(wd)))
    model.add(Dropout(0.3))
    model.add(Dense(1, activation='sigmoid', kernel_regularizer=l2(wd)))
    model.compile(optimizer=Adam(lr), loss='binary_crossentropy', metrics=['accuracy'])
    return model

def build_resnet20_2d(input_shape, lr=1e-4, wd=1e-4):
    """
    A ResNet20 for 2D inputs (like your existing code).
    """
    from tensorflow.keras.layers import Input, Conv2D, BatchNormalization, Activation, Add, AveragePooling2D, Flatten
    from tensorflow.keras.models import Model
    
    def resnet_layer(inputs, num_filters=16, kernel_size=3, strides=1,
                     activation='relu', batch_normalization=True, conv_first=True):
        conv = Conv2D(num_filters, kernel_size=kernel_size, strides=strides, padding='same',
                      kernel_initializer='he_normal', kernel_regularizer=l2(wd))
        x = inputs
        if conv_first:
            x = conv(x)
            if batch_normalization:
                x = BatchNormalization()(x)
            if activation is not None:
                x = Activation(activation)(x)
        else:
            if batch_normalization:
                x = BatchNormalization()(x)
            if activation is not None:
                x = Activation(activation)(x)
            x = conv(x)
        return x

    def resnet_block(inputs, num_filters, downsample=False):
        strides = 2 if downsample else 1
        x = resnet_layer(inputs, num_filters=num_filters, strides=strides)
        x = resnet_layer(x, num_filters=num_filters, activation=None)
        # shortcut
        if downsample or inputs.shape[-1] != num_filters:
            shortcut = Conv2D(num_filters, kernel_size=1, strides=strides, padding='same',
                              kernel_initializer='he_normal', kernel_regularizer=l2(wd))(inputs)
            shortcut = BatchNormalization()(shortcut)
        else:
            shortcut = inputs
        x = Add()([x, shortcut])
        x = Activation('relu')(x)
        return x

    inputs = Input(shape=input_shape)
    x = resnet_layer(inputs, num_filters=16, conv_first=True)
    
    num_filters = 16
    for stack in range(3):
        for block in range(3):
            downsample = True if (stack > 0 and block == 0) else False
            x = resnet_block(x, num_filters, downsample=downsample)
        num_filters *= 2

    x = AveragePooling2D(pool_size=2)(x)  # if input is small, adjust
    x = Flatten()(x)
    outputs = Dense(1, activation='sigmoid', kernel_regularizer=l2(wd))(x)
    
    model = Model(inputs=inputs, outputs=outputs)
    model.compile(optimizer=Adam(lr), loss='binary_crossentropy', metrics=['accuracy'])
    return model

def build_lstm_1d(input_dim, lr=1e-4, wd=1e-4):
    """
    A simple LSTM model for 1D sequences. We'll reshape (input_dim,) -> (input_dim,1).
    """
    from tensorflow.keras.layers import Reshape
    model = Sequential()
    model.add(Reshape((input_dim,1), input_shape=(input_dim,)))
    model.add(LSTM(64, return_sequences=False))
    model.add(Dropout(0.3))
    model.add(Dense(64, activation='relu', kernel_regularizer=l2(wd)))
    model.add(Dropout(0.3))
    model.add(Dense(1, activation='sigmoid', kernel_regularizer=l2(wd)))
    model.compile(optimizer=Adam(lr), loss='binary_crossentropy', metrics=['accuracy'])
    return model

###############################################################################
# 5. Create a Dictionary of Models (by name) that Expect 1D or 2D Input
###############################################################################
model_registry_1d = {
    'mlp':        build_mlp,
    'cnn_1d':     build_small_cnn_1d,
    'lstm_1d':    build_lstm_1d
}
model_registry_2d = {
    'cnn_2d':     build_small_cnn_2d,
    'resnet20_2d': build_resnet20_2d
}

###############################################################################
# 6. Grid Search
###############################################################################
transform_choices = ['raw_iq', 'fft_mag', 'phase_diff', 'amp_diff', 'square', 'cube', 'fft_2d']
model_choices_1d = ['mlp', 'cnn_1d', 'lstm_1d']
model_choices_2d = ['cnn_2d', 'resnet20_2d']
learning_rates = [1e-4, 1e-5]
weight_decays = [1e-4]
batch_sizes = [32]

results = []
early_stopping = EarlyStopping(patience=3, restore_best_weights=True, monitor='val_loss')

for transform_name in transform_choices:
    transform_func, transform_type = transformations[transform_name]
    
    # Determine which model registry to use based on transform_type
    if transform_type == '1d':
        possible_models = model_choices_1d
    else:
        possible_models = model_choices_2d
    
    # Pre-transform the data once so we don't redo for every hyperparam
    print(f"\n=== Applying transform: {transform_name} ===")
    X_train_transformed = []
    for i in range(len(X_train)):
        X_train_transformed.append(transform_func(X_train[i]))
    X_train_transformed = np.array(X_train_transformed)
    
    X_test_transformed = []
    for i in range(len(X_test)):
        X_test_transformed.append(transform_func(X_test[i]))
    X_test_transformed = np.array(X_test_transformed)
    
    # If transform_type='2d', figure out input shape for the model
    # e.g. if it's (16,16,1) or something
    if transform_type == '2d':
        shape_2d = X_train_transformed[0].shape  # e.g. (16,16,1)
    else:
        shape_1d = X_train_transformed[0].shape  # e.g. (256,) or (128,)

    for model_name in possible_models:
        for lr in learning_rates:
            for wd in weight_decays:
                for bs in batch_sizes:
                    print(f"--> transform={transform_name}, model={model_name}, lr={lr}, wd={wd}, bs={bs}")
                    
                    # Build the model
                    if transform_type == '1d':
                        input_dim = shape_1d[0] if len(shape_1d) == 1 else np.prod(shape_1d)
                        # In case shape_1d is (128,2), flatten
                        # But we've typically already flattened in transform...
                        # We'll assume it's 1D now. 
                        model_fn = model_registry_1d[model_name]
                        model = model_fn(input_dim=input_dim, lr=lr, wd=wd)
                    else:
                        # 2D
                        model_fn = model_registry_2d[model_name]
                        model = model_fn(input_shape=shape_2d, lr=lr, wd=wd)
                    
                    # Train
                    history = model.fit(
                        X_train_transformed, y_train,
                        validation_data=(X_test_transformed, y_test),
                        epochs=10,
                        batch_size=bs,
                        callbacks=[early_stopping],
                        verbose=0
                    )
                    
                    # Evaluate
                    val_loss, val_acc = model.evaluate(X_test_transformed, y_test, verbose=0)
                    y_pred_prob = model.predict(X_test_transformed, verbose=0)
                    y_pred = (y_pred_prob > 0.5).astype(int).flatten()
                    
                    results.append({
                        'transform': transform_name,
                        'model': model_name,
                        'lr': lr,
                        'weight_decay': wd,
                        'batch_size': bs,
                        'val_acc': val_acc,
                        'val_loss': val_loss
                    })

###############################################################################
# 7. Print a Table of Accuracies
###############################################################################
# Sort results by val_acc descending
results_sorted = sorted(results, key=lambda x: x['val_acc'], reverse=True)

print("\n================ Grid Search Results ================")
print(f"{'Transform':<12} {'Model':<10} {'LR':<8} {'WD':<8} {'BS':<5} {'Val_Acc':<8} {'Val_Loss'}")
print("-"*70)
for r in results_sorted:
    print(f"{r['transform']:<12} {r['model']:<10} {r['lr']:<8.0e} {r['weight_decay']:<8.0e} {r['batch_size']:<5} {r['val_acc']:<8.3f} {r['val_loss']:.3f}")

# You could also print only top 10, etc.



=== Applying transform: raw_iq ===
--> transform=raw_iq, model=mlp, lr=0.0001, wd=0.0001, bs=32


  super().__init__(activity_regularizer=activity_regularizer, **kwargs)
I0000 00:00:1743191602.762889 1269837 gpu_device.cc:2022] Created device /job:localhost/replica:0/task:0/device:GPU:0 with 22280 MB memory:  -> device: 0, name: NVIDIA A30, pci bus id: 0000:4a:00.0, compute capability: 8.0
I0000 00:00:1743191603.891976 1270112 service.cc:148] XLA service 0x15533801d4a0 initialized for platform CUDA (this does not guarantee that XLA will be used). Devices:
I0000 00:00:1743191603.891997 1270112 service.cc:156]   StreamExecutor device (0): NVIDIA A30, Compute Capability 8.0
2025-03-28 15:53:23.911922: I tensorflow/compiler/mlir/tensorflow/utils/dump_mlir_util.cc:268] disabling MLIR crash reproducer, set env var `MLIR_CRASH_REPRODUCER_DIRECTORY` to enable.
I0000 00:00:1743191604.008709 1270112 cuda_dnn.cc:529] Loaded cuDNN version 90300
2025-03-28 15:53:24.270355: W external/local_xla/xla/service/gpu/nvptx_compiler.cc:930] The NVIDIA driver's CUDA version is 12.4 which is older than th

--> transform=raw_iq, model=mlp, lr=1e-05, wd=0.0001, bs=32
--> transform=raw_iq, model=cnn_1d, lr=0.0001, wd=0.0001, bs=32


  super().__init__(**kwargs)


--> transform=raw_iq, model=cnn_1d, lr=1e-05, wd=0.0001, bs=32
--> transform=raw_iq, model=lstm_1d, lr=0.0001, wd=0.0001, bs=32
--> transform=raw_iq, model=lstm_1d, lr=1e-05, wd=0.0001, bs=32

=== Applying transform: fft_mag ===
--> transform=fft_mag, model=mlp, lr=0.0001, wd=0.0001, bs=32
--> transform=fft_mag, model=mlp, lr=1e-05, wd=0.0001, bs=32
--> transform=fft_mag, model=cnn_1d, lr=0.0001, wd=0.0001, bs=32
--> transform=fft_mag, model=cnn_1d, lr=1e-05, wd=0.0001, bs=32
--> transform=fft_mag, model=lstm_1d, lr=0.0001, wd=0.0001, bs=32
--> transform=fft_mag, model=lstm_1d, lr=1e-05, wd=0.0001, bs=32

=== Applying transform: phase_diff ===
--> transform=phase_diff, model=mlp, lr=0.0001, wd=0.0001, bs=32
--> transform=phase_diff, model=mlp, lr=1e-05, wd=0.0001, bs=32
--> transform=phase_diff, model=cnn_1d, lr=0.0001, wd=0.0001, bs=32
--> transform=phase_diff, model=cnn_1d, lr=1e-05, wd=0.0001, bs=32
--> transform=phase_diff, model=lstm_1d, lr=0.0001, wd=0.0001, bs=32
--> transform=p

ValueError: cannot reshape array of size 128 into shape (16,16)

In [None]:
import os
import numpy as np
import tensorflow as tf
from tensorflow.keras.models import Model, Sequential
from tensorflow.keras.layers import Input, Conv2D, MaxPooling2D, Flatten, Dense, Dropout
from tensorflow.keras.optimizers import Adam
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix, classification_report
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd

############################################
# 1. Prepare the Binary Dataset (WBFM vs. AM-DSB)
############################################
# Assume X_all, y_all, and label_encoder are already defined.
# For example, label_encoder.classes_ might be:
# ['8PSK','AM-DSB','AM-SSB','BPSK','CPFSK','GFSK','PAM4','QAM16','QAM64','QPSK','WBFM']
am_dsb_label = label_encoder.transform(['AM-DSB'])[0]
wbfm_label   = label_encoder.transform(['WBFM'])[0]

# Filter to only these two classes.
mask = (y_all == am_dsb_label) | (y_all == wbfm_label)
X_2c = X_all[mask]
y_2c = y_all[mask]
# Convert labels to binary: 0 for AM-DSB, 1 for WBFM.
y_2c_binary = np.where(y_2c == wbfm_label, 1, 0)

# Split into train and test sets.
X_train, X_test, y_train, y_test = train_test_split(
    X_2c, y_2c_binary, test_size=0.2, random_state=42, shuffle=True
)

# For FFT processing we use only the IQ channels (first 2 columns).
X_train_iq = X_train[..., :2]  # shape: (num_samples, 128, 2)
X_test_iq  = X_test[..., :2]

############################################
# 2. Group 20 Successive Signals
############################################
group_size = 20

def group_signals(X, y, group_size):
    """
    Group every group_size signals into one sample.
    Returns new X with shape (num_groups, group_size, 128, channels)
    and new y with shape (num_groups,) (using the label of the first signal).
    """
    n_groups = X.shape[0] // group_size
    X_grouped = X[:n_groups*group_size].reshape(n_groups, group_size, X.shape[1], X.shape[2])
    y_grouped = y[:n_groups*group_size].reshape(n_groups, group_size)[:,0]
    return X_grouped, y_grouped

X_train_grouped, y_train_grouped = group_signals(X_train_iq, y_train, group_size)
X_test_grouped, y_test_grouped   = group_signals(X_test_iq, y_test, group_size)

############################################
# 3. Compute FFT on Cubed IQ Data
############################################
def compute_fft_for_signal_cubed(signal, n_fft=128):
    """
    1) Cube the IQ data: [I^3, Q^3]
    2) Form a complex signal
    3) Compute FFT (length n_fft)
    4) Return log-magnitude as 1D array (n_fft,)
    """
    # shape: (128,2)
    cubed = signal**3
    # Create a complex signal from the cubed IQ
    complex_signal = cubed[:,0] + 1j * cubed[:,1]
    fft_result = np.fft.fft(complex_signal, n=n_fft)
    mag = np.abs(fft_result)
    log_mag = np.log(mag + 1e-6)
    return log_mag  # shape: (n_fft,)

def transform_group_cubed(group, n_fft=128):
    """
    For a group of 20 signals (shape: (20, 128, 2)), compute cubed-FFT log-magnitude 
    for each signal and vertically stack them.
    Returns an array of shape (20, n_fft).
    """
    fft_list = []
    for signal in group:
        fft_list.append(compute_fft_for_signal_cubed(signal, n_fft))
    stacked = np.stack(fft_list, axis=0)  # shape: (20, n_fft)
    return stacked

def transform_dataset_cubed(X_grouped, n_fft=128):
    """
    Apply transform_group_cubed to each group, add a channel dim => shape: (20, n_fft, 1)
    """
    transformed = []
    for group in X_grouped:
        fft_image = transform_group_cubed(group, n_fft=n_fft)
        fft_image = np.expand_dims(fft_image, axis=-1)  # (20, n_fft, 1)
        transformed.append(fft_image)
    return np.array(transformed, dtype=np.float32)

print("Transforming training groups (cubed IQ) ...")
X_train_trans = transform_dataset_cubed(X_train_grouped, n_fft=128)
print("Transforming test groups (cubed IQ) ...")
X_test_trans  = transform_dataset_cubed(X_test_grouped, n_fft=128)

print("Transformed training shape:", X_train_trans.shape)  # e.g. (num_groups, 20, 128, 1)
print("Transformed test shape:", X_test_trans.shape)

############################################
# 4. Build a CNN Model for Group FFT Images
############################################
def build_group_cnn(input_shape=(20,128,1), lr=1e-4):
    model = Sequential([
        Conv2D(32, kernel_size=(3,3), activation='relu', padding='same', input_shape=input_shape),
        MaxPooling2D(pool_size=(2,2)),
        Dropout(0.3),
        Conv2D(64, kernel_size=(3,3), activation='relu', padding='same'),
        MaxPooling2D(pool_size=(2,2)),
        Dropout(0.3),
        Flatten(),
        Dense(64, activation='relu'),
        Dropout(0.3),
        Dense(1, activation='sigmoid')
    ])
    model.compile(optimizer=Adam(lr), loss='binary_crossentropy', metrics=['accuracy'])
    return model

group_cnn_cubed = build_group_cnn(input_shape=(20,128,1), lr=1e-4)
group_cnn_cubed.summary()

############################################
# 5. Train the CNN Model
############################################
history_cubed = group_cnn_cubed.fit(
    X_train_trans, y_train_grouped,
    validation_data=(X_test_trans, y_test_grouped),
    epochs=20,
    batch_size=32,
    verbose=1
)

############################################
# 6. Evaluate and Plot a Confusion Matrix
############################################
y_pred_prob = group_cnn_cubed.predict(X_test_trans)
y_pred = (y_pred_prob > 0.5).astype(int).flatten()

cm = confusion_matrix(y_test_grouped, y_pred)
plt.figure(figsize=(6,5))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
            xticklabels=['AM-DSB', 'WBFM'],
            yticklabels=['AM-DSB', 'WBFM'])
plt.xlabel("Predicted")
plt.ylabel("True")
plt.title("Confusion Matrix (Grouped FFT CNN - Cubed IQ)")
plt.show()

print(classification_report(y_test_grouped, y_pred, target_names=['AM-DSB', 'WBFM']))

############################################
# 7. Summarize the Result in a Table
############################################
val_loss, val_acc = group_cnn_cubed.evaluate(X_test_trans, y_test_grouped, verbose=0)
results_table = pd.DataFrame({
    'Model': ['Group_CNN_Cubed'],
    'Group Size': [20],
    'FFT Length': [128],
    'Val_Accuracy': [val_acc],
    'Val_Loss': [val_loss]
})

print("\n===== Summary of Results (Cubed IQ) =====")
print(results_table)


### 