In [3]:
import wfdb
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
from keras import layers, models
import tensorflow as tf

import os
import sklearn

2025-05-08 08:12:32.470406: I tensorflow/core/util/port.cc:153] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.
2025-05-08 08:12:32.549657: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:485] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
2025-05-08 08:12:32.567130: E external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:8454] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
2025-05-08 08:12:32.571558: E external/local_xla/xla/stream_executor/cuda/cuda_blas.cc:1452] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
2025-05-08 08:12:32.626493: I tensorflow/core/platform/cpu_feature_guar

# Helper functions

### Metrics

In [4]:
import tensorflow.keras.backend as K

def sensitivity(y_true, y_pred):
  y_pred_classes = K.argmax(y_pred, axis=-1)
  true_positives = K.sum(K.cast(y_pred_classes[y_true == 1] == 1, 'float32'))
  possible_positives = K.sum(K.cast(y_true == 1, 'float32'))
  return true_positives / (possible_positives + K.epsilon())

def specificity(y_true, y_pred):
  y_pred_classes = K.argmax(y_pred, axis=-1)
  true_negatives = K.sum(K.cast(y_pred_classes[y_true == 0] == 0, 'float32'))
  possible_negatives = K.sum(K.cast(y_true == 0, 'float32'))
  return true_negatives / (possible_negatives + K.epsilon())

from tensorflow.keras.metrics import F1Score

f1_metric = F1Score()

from tensorflow.keras.metrics import SparseCategoricalAccuracy

accuracy = SparseCategoricalAccuracy(name='accuracy')

2025-05-08 08:12:35.203363: E external/local_xla/xla/stream_executor/cuda/cuda_driver.cc:266] failed call to cuInit: CUDA_ERROR_UNKNOWN: unknown error
2025-05-08 08:12:35.203657: I external/local_xla/xla/stream_executor/cuda/cuda_diagnostics.cc:135] retrieving CUDA diagnostic information for host: nihad-IdeaPad-3-15ITL6
2025-05-08 08:12:35.203660: I external/local_xla/xla/stream_executor/cuda/cuda_diagnostics.cc:142] hostname: nihad-IdeaPad-3-15ITL6
2025-05-08 08:12:35.203802: I external/local_xla/xla/stream_executor/cuda/cuda_diagnostics.cc:166] libcuda reported version is: 550.144.3
2025-05-08 08:12:35.203812: I external/local_xla/xla/stream_executor/cuda/cuda_diagnostics.cc:170] kernel reported version is: 550.120.0
2025-05-08 08:12:35.203815: E external/local_xla/xla/stream_executor/cuda/cuda_diagnostics.cc:252] kernel version 550.120.0 does not match DSO version 550.144.3 -- cannot find working devices in this configuration


### Data loading

In [5]:
def load_signals():

    path = "../data/training2017/"

    records = os.listdir(path)

    records = [record for record in records if record.endswith('.mat')]

    records = [os.path.splitext(record)[0] for record in records]

    records.sort()

    signals = [wfdb.rdrecord(record_name=path+record).p_signal for record in records]

    fs = wfdb.rdrecord(record_name=path+records[0]).fs

    signals_flat = []
    for signal in signals:
        signal_flat = [sample[0] for sample in signal]
        signal_flat = np.array(signal_flat)
        signals_flat.append(signal_flat)

    signals_flat = np.asarray(signals_flat, dtype=np.ndarray)
    
    return signals_flat

signals_flat = load_signals()

### EDA

In [25]:
import numpy as np
import matplotlib.pyplot as plt
from scipy.signal import periodogram

def plot_psd_heatmap(signals, fs=300):
    """
    Plots a power spectral density (PSD) heatmap using periodogram.
    
    Parameters:
        signals (array-like): List or array of 1D signals (same length).
        fs (float): Sampling frequency in Hz.
    """
    signals = np.array(signals.tolist(), dtype=np.float32)  # convert object array to float
    num_signals = len(signals)
    
    # Compute PSD for each signal
    freqs, psd = periodogram(signals, fs=fs, axis=1)

    # Convert to dB scale (optional but common)
    psd_db = 10 * np.log10(psd + 1e-12)  # avoid log(0)
    #psd_db = psd
    
    # Plot
    plt.figure(figsize=(12, 6))
    plt.imshow(psd_db, aspect='auto', origin='lower',
               extent=[freqs[0], freqs[-1], 0, num_signals],
               cmap='viridis')
    plt.colorbar(label='Power Spectral Density (dB/Hz)')
    plt.xlabel('Frequency (Hz)')
    plt.ylabel('Signal Index')
    plt.title('PSD Heatmap using Periodogram')
    plt.tight_layout()
    plt.show()

### Modifying signal length

In [7]:
def cut_signals_shortest(signals_flat):
     # Get the minimum length across all signals
    lengths = [len(signal) for signal in signals_flat]
    min_length = min(lengths)
    
    # Create the middle slices for each signal
    signals_middle = []
    for signal in signals_flat:
        signal_length = len(signal)
        
        # Calculate the start and end indices for the middle of each signal
        start = (signal_length // 2 - min_length // 2)
        end = start + min_length
        
        # Slice the signal to keep the middle part
        signals_middle.append(signal[start:end])
    
    # Convert the list of signals to a NumPy array
    signals_middle = np.array(signals_middle)
    
    return signals_middle

In [62]:
def extract_center_5s(signals, target_length=600):
    """
    Extract central 5 seconds (600 samples at 120Hz) from each signal.
    If signal is shorter than 600, pad with zeros (centered).
    
    Parameters:
        signals: array-like of 1D numpy arrays (downsampled ECG signals)
        target_length: number of samples to extract (default is 600 for 5s at 120Hz)
        
    Returns:
        np.ndarray of shape (num_signals, target_length)
    """
    processed = []

    for signal in signals:
        sig_len = len(signal)

        if sig_len >= target_length:
            start = (sig_len - target_length) // 2
            segment = signal[start:start + target_length]
        else:
            pad_len = target_length - sig_len
            pad_left = pad_len // 2
            pad_right = pad_len - pad_left
            segment = np.pad(signal, (pad_left, pad_right), mode='constant')

        processed.append(segment)

    return np.array(processed, dtype=np.float32)

In [8]:
# Per-signal z-score normalization (recommended for ECG)
# signals_short = (signals_short - signals_short.mean(axis=1, keepdims=True)) / (signals_short.std(axis=1, keepdims=True) + 1e-8)

### Loading labels

In [9]:
def load_labels():

    data = pd.read_csv(filepath_or_buffer="../data/training2017/REFERENCE-v3.csv", names=["signal_name", "label"])

    data["label"] = data["label"].astype("category").cat.codes

    data = data.drop(["signal_name"], axis=1)

    y = np.array(data["label"])
    
    return y

### Removing extra classes

In [10]:
def remove_extra_classes(X, y):

    classes_to_remove = [2, 3]

    classes, counts = np.unique(y, return_counts=True)

    # Display results
    print("Before removing \n")
    for c, count in zip(classes, counts):
        print(f"Class {c}: {count} samples")


    mask = ~np.isin(y, classes_to_remove)

    X = X[mask]
    y = y[mask]

    classes, counts = np.unique(y, return_counts=True)

    print("\nAfter removing \n")
    for c, count in zip(classes, counts):
        print(f"Class {c}: {count} samples")
        
    return X, y

### Normalisation (-1,1)

In [11]:
def normalize_to_minus1_1(signals):
    norm_signals = []
    for x in signals:
        x_min = np.min(x)
        x_max = np.max(x)
        if x_max != x_min:
            x_norm = 2 * (x - x_min) / (x_max - x_min) - 1
        else:
            x_norm = np.zeros_like(x)  # Avoid division by zero
        norm_signals.append(x_norm)
    return np.array(norm_signals, dtype=object)


### Oversampling

In [12]:
from imblearn.over_sampling import RandomOverSampler

In [13]:
from sklearn.utils.class_weight import compute_class_weight

def get_class_weights(y):
    # Convert one-hot labels to integers
    y_train_labels = np.argmax(y_train, axis=1)

    # Compute class weights
    class_weights = compute_class_weight('balanced', classes=np.unique(y_train_labels), y=y_train_labels)
    class_weight_dict = dict(enumerate(class_weights))
    
    return class_weight_dict

### Models and validation

In [14]:
from tensorflow.keras.utils import to_categorical

In [15]:
def make_model_conv1D(input_shape):
    input_layer = layers.Input(shape=(input_shape, 1))

    # Block 1
    x = layers.Conv1D(32, kernel_size=7, padding="same")(input_layer)
    x = layers.BatchNormalization()(x)
    x = layers.ReLU()(x)
    x = layers.MaxPooling1D(pool_size=2)(x)

    # Block 2
    x = layers.Conv1D(64, kernel_size=5, padding="same")(x)
    x = layers.BatchNormalization()(x)
    x = layers.ReLU()(x)
    x = layers.MaxPooling1D(pool_size=2)(x)

    # Block 3
    x = layers.Conv1D(128, kernel_size=3, padding="same")(x)
    x = layers.BatchNormalization()(x)
    x = layers.ReLU()(x)
    x = layers.GlobalAveragePooling1D()(x)

    # Optional: Dropout for regularization
    x = layers.Dropout(0.5)(x)

    output_layer = layers.Dense(4, activation="softmax")(x)

    return models.Model(inputs=input_layer, outputs=output_layer)

In [16]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv1D, MaxPooling1D, Flatten, Dense, ReLU, Input, Softmax

def build_custom_vgg_1d(input_length=3000, num_classes=4):
    model = Sequential()
    model.add(Input(shape=(input_length, 1)))  # 1D signal input

    # Layer 1
    model.add(Conv1D(64, kernel_size=3, padding='same'))
    model.add(ReLU())
    model.add(Conv1D(64, kernel_size=3, padding='same'))
    model.add(ReLU())
    model.add(MaxPooling1D(pool_size=3, strides=3))

    # Layer 2
    model.add(Conv1D(128, kernel_size=3, padding='same'))
    model.add(ReLU())
    model.add(Conv1D(128, kernel_size=3, padding='same'))
    model.add(ReLU())
    model.add(MaxPooling1D(pool_size=3, strides=3))

    # Layer 3
    model.add(Conv1D(256, kernel_size=3, padding='same'))
    model.add(ReLU())
    model.add(Conv1D(256, kernel_size=3, padding='same'))
    model.add(ReLU())
    model.add(Conv1D(256, kernel_size=3, padding='same'))
    model.add(ReLU())
    model.add(MaxPooling1D(pool_size=2, strides=2))

    # Layer 4
    model.add(Conv1D(512, kernel_size=3, padding='same'))
    model.add(ReLU())
    model.add(Conv1D(512, kernel_size=3, padding='same'))
    model.add(ReLU())
    model.add(Conv1D(512, kernel_size=3, padding='same'))
    model.add(ReLU())
    model.add(MaxPooling1D(pool_size=2, strides=2))

    # Layer 5
    model.add(Conv1D(512, kernel_size=3, padding='same'))
    model.add(ReLU())
    model.add(Conv1D(512, kernel_size=3, padding='same'))
    model.add(ReLU())
    model.add(Conv1D(512, kernel_size=3, padding='same'))
    model.add(ReLU())
    model.add(MaxPooling1D(pool_size=2, strides=2))

    model.add(Flatten())

    # Layer 6, 7, 8: Fully connected layers
    model.add(Dense(1024, activation='relu'))  # Layer 6
    model.add(Dense(1024, activation='relu'))  # Layer 7
    model.add(Dense(256, activation='relu'))   # Layer 8

    # Layer 9: Output softmax
    model.add(Dense(num_classes, activation='softmax'))

    return model

In [57]:
from tensorflow.keras.layers import Concatenate, BatchNormalization
from tensorflow.keras.models import Model

def conv_block(x, filters, kernel_size, blocks, pool_stride):
    for _ in range(blocks):
        x = Conv1D(filters, kernel_size, padding='same')(x)
        x = BatchNormalization()(x)
        x = ReLU()(x)
    x = MaxPooling1D(pool_size=2, strides=pool_stride, padding='same')(x)
    return x

def build_dual_stream_model(input_length=3000, num_classes=4):
    input_layer = Input(shape=(input_length, 1))

    # Stream 1 (kernel size = 3)
    s1 = conv_block(input_layer, 64, 3, 2, 3)    # Layer 1
    s1 = conv_block(s1, 128, 3, 2, 3)            # Layer 2
    s1 = conv_block(s1, 256, 3, 3, 2)            # Layer 3
    s1 = conv_block(s1, 512, 3, 3, 2)            # Layer 4
    s1 = conv_block(s1, 512, 3, 3, 2)            # Layer 5
    s1 = Flatten()(s1)

    # Stream 2 (kernel size = 5 for first two layers, then same as stream 1)
    s2 = conv_block(input_layer, 64, 5, 2, 3)    # Layer 1
    s2 = conv_block(s2, 128, 5, 2, 3)            # Layer 2
    s2 = conv_block(s2, 256, 3, 3, 2)            # Layer 3
    s2 = conv_block(s2, 512, 3, 3, 2)            # Layer 4
    s2 = conv_block(s2, 512, 3, 3, 2)            # Layer 5
    s2 = Flatten()(s2)

    # Merge
    merged = Concatenate()([s1, s2])

    # Fully connected layers
    x = Dense(1024, activation='relu')(merged)
    x = Dense(1024, activation='relu')(x)
    x = Dense(256, activation='relu')(x)
    output = Dense(num_classes, activation='softmax')(x)

    model = Model(inputs=input_layer, outputs=output)
    return model

In [17]:
from sklearn.model_selection import KFold

def run_kfold_cross_validation(X, y, build_model_fn, num_folds=10, epochs=10, batch_size=32):
    """
    Run k-fold cross-validation for a Keras model.

    Parameters:
        X (np.ndarray): Input features of shape (samples, length).
        y (np.ndarray): Labels of shape (samples,) or one-hot encoded.
        build_model_fn (function): A function that returns a compiled Keras model.
        num_folds (int): Number of folds for cross-validation.
        epochs (int): Training epochs per fold.
        batch_size (int): Training batch size.
        one_hot (bool): If True, converts labels to one-hot encoding.

    Returns:
        List of validation accuracies per fold.
    """
    kf = KFold(n_splits=num_folds, shuffle=True, random_state=42)
    val_accuracies = []

    for fold, (train_idx, val_idx) in enumerate(kf.split(X), 1):
        print(f"\n📘 Fold {fold}/{num_folds}")

        X_train, X_val = X[train_idx], X[val_idx]
        y_train, y_val = y[train_idx], y[val_idx]

        model = build_model_fn()

        model.fit(X_train, y_train,
                  validation_data=(X_val, y_val),
                  epochs=epochs,
                  batch_size=batch_size,
                  verbose=0)

        val_acc = model.evaluate(X_val, y_val, verbose=0)[1]
        val_accuracies.append(val_acc)
        print(f"✅ Fold {fold} Accuracy: {val_acc:.4f}")

    avg_accuracy = np.mean(val_accuracies)
    print(f"\n🎯 Average Validation Accuracy: {avg_accuracy:.4f}")
    return val_accuracies

### Evaluation

In [18]:
import numpy as np
from sklearn.metrics import confusion_matrix, accuracy_score

def evaluate_model(model, X_test, y_test, one_hot_labels=True):
    """
    Evaluates a Keras model on a test set and prints:
    - Confusion matrix
    - Per-class sensitivity and specificity
    - Overall accuracy

    Parameters:
    - model: Trained Keras model
    - X_test: Test input data (NumPy array)
    - y_test: True labels (one-hot encoded or integer)
    - one_hot_labels: Set to True if y_test is one-hot encoded
    """
    
    # Convert predictions and labels to class indices
    y_pred_probs = model.predict(X_test)
    y_pred = np.argmax(y_pred_probs, axis=1)
    
    if one_hot_labels:
        y_true = np.argmax(y_test, axis=1)
    else:
        y_true = y_test

    # Confusion matrix
    cm = confusion_matrix(y_true, y_pred)
    print("Confusion Matrix:\n", cm)

    # Accuracy
    accuracy = accuracy_score(y_true, y_pred)
    print(f"\nOverall Accuracy: {accuracy:.4f}")

    # Sensitivity & Specificity
    num_classes = cm.shape[0]
    sensitivity = []
    specificity = []

    for i in range(num_classes):
        TP = cm[i, i]
        FN = np.sum(cm[i, :]) - TP
        FP = np.sum(cm[:, i]) - TP
        TN = np.sum(cm) - (TP + FN + FP)

        sens = TP / (TP + FN) if (TP + FN) != 0 else 0
        spec = TN / (TN + FP) if (TN + FP) != 0 else 0

        sensitivity.append(sens)
        specificity.append(spec)

        print(f"Class {i}: Sensitivity = {sens:.4f}, Specificity = {spec:.4f}")

## Preprocessing

In [19]:
from scipy.signal import butter, filtfilt

def low_pass_filter(X, fs_original=300, cutoff_freq=60):
    # Design the low-pass filter (Butterworth filter)
    nyquist = 0.5 * fs_original  # Nyquist frequency
    normal_cutoff = cutoff_freq / nyquist  # Normalize the cutoff frequency
    
    # Create the low-pass filter
    b, a = butter(4, normal_cutoff, btype='low', analog=False)
    
    # Apply the low-pass filter to each signal (row) in X
    filtered_X = []
    for signal in X:
        # Apply the filter to each signal
        filtered_signal = filtfilt(b, a, signal)
        
        # Trim any extra edge artifacts (in case of filter edge effects)
        filtered_signal = filtered_signal[:len(signal)]
        
        filtered_X.append(filtered_signal)
    
    # Return as a list of arrays (each signal with its own length)
    return filtered_X

In [20]:
def downsample_signal(X, fs_original=300, fs_new=120):
    downsampled_X = []
    
    # Down-sample each signal (row) in X individually
    for signal in X:
        # Calculate the down-sample factor
        downsample_factor = fs_original // fs_new
        
        # Down-sample the signal based on the calculated factor
        downsampled_signal = signal[::downsample_factor]
        downsampled_X.append(downsampled_signal)
    
    # Return as a list of downsampled signals (each signal with its own length)
    return downsampled_X

In [21]:
def plot_fft(signal, fs=300):
    """
    Calculate and plot the FFT of a signal.
    
    Parameters:
    - signal: The input signal (1D array)
    - fs: The sampling frequency (default 300 Hz)
    """
    # Number of samples in the signal
    n = len(signal)
    
    # Compute the FFT
    fft_signal = np.fft.fft(signal)
    
    # Compute the frequencies corresponding to the FFT bins
    freqs = np.fft.fftfreq(n, 1/fs)
    
    # Take the absolute value of the FFT (magnitude)
    fft_magnitude = np.abs(fft_signal)
    
    # Only keep the positive half of the spectrum (real frequencies)
    positive_freqs = freqs[:n//2]
    positive_fft_magnitude = fft_magnitude[:n//2]
    
    # Plot the FFT magnitude vs frequency
    plt.figure(figsize=(10, 6))
    plt.plot(positive_freqs, positive_fft_magnitude)
    plt.title('FFT of Signal')
    plt.xlabel('Frequency (Hz)')
    plt.ylabel('Magnitude')
    plt.grid(True)
    plt.show()

# Execution

### Preprocessing

In [63]:
X = signals_flat
y = load_labels()

X, y = remove_extra_classes(X, y)

Before removing 

Class 0: 758 samples
Class 1: 5076 samples
Class 2: 2415 samples
Class 3: 279 samples

After removing 

Class 0: 758 samples
Class 1: 5076 samples


### Shortest

In [64]:
X_filtered = low_pass_filter(X=X)

X_downsampled = downsample_signal(X=X_filtered)

X_normalised = normalize_to_minus1_1(X_downsampled)

X_5s = extract_center_5s(signals=X_normalised, target_length=5*120)

X_train, X_test, y_train, y_test = sklearn.model_selection.train_test_split(
    X_5s, y, test_size=0.2, random_state=42)

X_train, y_train = RandomOverSampler().fit_resample(X_train, y_train)

y_train = to_categorical(y_train, num_classes=2)
y_test = to_categorical(y_test, num_classes=2)

X_train = np.array(X_train.tolist(), dtype=np.float32)  
X_train = np.expand_dims(X_train, axis=-1)

X_test = np.array(X_test.tolist(), dtype=np.float32)  
X_test = np.expand_dims(X_test, axis=-1)

In [66]:
input_size = len(X_train[0])
epochs = 10
batch_size = 32

# model = make_model_conv1D(input_shape=input_size)
# model = build_custom_vgg_1d(input_length=input_size, num_classes=2)
model = build_dual_stream_model(input_length=input_size, num_classes=2)

model.compile(
    optimizer='adam',
    loss='categorical_crossentropy',
    metrics=['accuracy'] # f1_metric error
)

history = model.fit(
    X_train,
    y_train,
    batch_size=batch_size,
    epochs=epochs,
    validation_split=0.2,
    verbose=1,
)

Epoch 1/10
[1m204/204[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m85s[0m 373ms/step - accuracy: 0.5681 - loss: 1.5517 - val_accuracy: 0.0000e+00 - val_loss: 0.8343
Epoch 2/10
[1m204/204[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m77s[0m 379ms/step - accuracy: 0.6520 - loss: 0.6434 - val_accuracy: 0.6374 - val_loss: 0.7148
Epoch 3/10
[1m204/204[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m76s[0m 372ms/step - accuracy: 0.7652 - loss: 0.5007 - val_accuracy: 0.6883 - val_loss: 0.7487
Epoch 4/10
[1m204/204[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m76s[0m 371ms/step - accuracy: 0.8527 - loss: 0.3303 - val_accuracy: 0.6589 - val_loss: 0.7961
Epoch 5/10
[1m204/204[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m76s[0m 371ms/step - accuracy: 0.8871 - loss: 0.2563 - val_accuracy: 0.8798 - val_loss: 0.2876
Epoch 6/10
[1m204/204[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m78s[0m 380ms/step - accuracy: 0.9229 - loss: 0.1985 - val_accuracy: 0.9454 - val_loss: 0.1448
Epoch 

In [None]:
val_accs = run_kfold_cross_validation(X=X, y=y, build_model_fn=build_custom_vgg_1d, num_folds=10, epochs=10, batch_size=32)

In [67]:
evaluate_model(model=model, X_test=X_test, y_test=y_test)

[1m37/37[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m4s[0m 89ms/step
Confusion Matrix:
 [[120  45]
 [ 20 982]]

Overall Accuracy: 0.9443
Class 0: Sensitivity = 0.7273, Specificity = 0.9800
Class 1: Sensitivity = 0.9800, Specificity = 0.7273


### Shortest + class weights

In [24]:
class_weight_dict = get_class_weights(y)

input_size = 2714
epochs = 20
batch_size = 16

model = make_model_conv1D(input_shape=input_size)

model.compile(
    optimizer='adam',
    loss='binary_crossentropy',
    metrics=['accuracy'] # f1_metric error
)

history = model.fit(
    X_train,
    y_train,
    class_weight=class_weight_dict,
    batch_size=batch_size,
    epochs=epochs,
    validation_split=0.2,
    verbose=1,
)

Epoch 1/20
[1m196/196[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m21s[0m 50ms/step - accuracy: 0.4405 - loss: 0.4824 - val_accuracy: 0.8798 - val_loss: 0.3072
Epoch 2/20
[1m196/196[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 13ms/step - accuracy: 0.4848 - loss: 0.3759 - val_accuracy: 0.8798 - val_loss: 0.2770
Epoch 3/20
[1m196/196[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 13ms/step - accuracy: 0.5662 - loss: 0.3505 - val_accuracy: 0.8350 - val_loss: 0.3030
Epoch 4/20
[1m196/196[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 13ms/step - accuracy: 0.5530 - loss: 0.3487 - val_accuracy: 0.4974 - val_loss: 0.3510
Epoch 5/20
[1m196/196[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 13ms/step - accuracy: 0.5147 - loss: 0.3535 - val_accuracy: 0.3478 - val_loss: 0.4039
Epoch 6/20
[1m196/196[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 13ms/step - accuracy: 0.5785 - loss: 0.3354 - val_accuracy: 0.3632 - val_loss: 0.4045
Epoch 7/20
[1m196/19

KeyboardInterrupt: 

### 1D conv + zero padding

In [49]:
signals_flat_norm = normalize_to_minus1_1(signals_flat)

padded_signals_flat = tf.keras.utils.pad_sequences(signals_flat_norm, padding="post", dtype="float32")

X = padded_signals_flat
y = load_labels()

X, y = remove_extra_classes(X, y)
# X, y = RandomOverSampler().fit_resample(X, y)

X_train, X_test, y_train, y_test = sklearn.model_selection.train_test_split(
    X, y, test_size=0.33, random_state=42)

y_train = to_categorical(y_train, num_classes=4)
y_test = to_categorical(y_test, num_classes=4)

Before removing 

Class 0: 758 samples
Class 1: 5076 samples
Class 2: 2415 samples
Class 3: 279 samples

After removing 

Class 0: 758 samples
Class 1: 5076 samples


In [50]:
input_size = len(padded_signals_flat[0])
epochs = 10
batch_size = 32

model = make_model_conv1D(input_shape=input_size)

model.compile(
    optimizer='adam',
    loss='binary_crossentropy',
    metrics=["accuracy"] # f1_metric error
)

history = model.fit(
    X_train,
    y_train,
    batch_size=batch_size,
    epochs=epochs,
    validation_split=0.2,
    verbose=1,
)

Epoch 1/10
[1m98/98[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m70s[0m 699ms/step - accuracy: 0.7725 - loss: 0.4102 - val_accuracy: 0.8798 - val_loss: 0.3163
Epoch 2/10
[1m98/98[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m68s[0m 692ms/step - accuracy: 0.8670 - loss: 0.2207 - val_accuracy: 0.8798 - val_loss: 0.2035
Epoch 3/10
[1m98/98[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m70s[0m 716ms/step - accuracy: 0.8672 - loss: 0.2061 - val_accuracy: 0.8798 - val_loss: 0.1942
Epoch 4/10
[1m98/98[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m68s[0m 692ms/step - accuracy: 0.8633 - loss: 0.2094 - val_accuracy: 0.8798 - val_loss: 0.1963
Epoch 5/10
[1m98/98[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m68s[0m 695ms/step - accuracy: 0.8639 - loss: 0.2049 - val_accuracy: 0.8798 - val_loss: 0.1901
Epoch 6/10
[1m98/98[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m67s[0m 686ms/step - accuracy: 0.8679 - loss: 0.2002 - val_accuracy: 0.8798 - val_loss: 0.1858
Epoch 7/10
[1m98/98[

In [51]:
evaluate_model(model=model, X_test=X_test, y_test=y_test)

[1m61/61[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m7s[0m 109ms/step
Confusion Matrix:
 [[   3  263]
 [   7 1653]]

Overall Accuracy: 0.8598
Class 0: Sensitivity = 0.0113, Specificity = 0.9958
Class 1: Sensitivity = 0.9958, Specificity = 0.0113


## Masking with 1D conv

In [109]:
import keras

In [110]:
class MaskedConv1D(keras.layers.Conv1D):
    def __init__(self, *args, **kwargs):
        super(MaskedConv1D, self).__init__(*args, **kwargs)
        self.supports_masking = True

    def call(self, inputs, mask=None):
        if mask is not None:
            mask = tf.expand_dims(mask, axis=-1)
            inputs = inputs * tf.cast(mask, tf.float32)
        return super(MaskedConv1D, self).call(inputs)

In [111]:
def make_model_masked(input_shape):
    input_layer = keras.layers.Input(shape=(input_shape, 1))

    # Apply masking to identify padded values (assuming padding value is 0)
    masked_input = keras.layers.Masking(mask_value=0.0)(input_layer)

    # Use the custom MaskedConv1D layer
    conv1 = MaskedConv1D(filters=16, kernel_size=13, padding="same")(masked_input)
    conv1 = keras.layers.BatchNormalization()(conv1)
    conv1 = keras.layers.ReLU()(conv1)

    gap = keras.layers.GlobalAveragePooling1D()(conv1)

    output_layer = keras.layers.Dense(4, activation="softmax")(gap)

    return keras.models.Model(inputs=input_layer, outputs=output_layer)

In [112]:
padded_signals_flat = tf.keras.utils.pad_sequences(signals_flat, padding="post")

In [113]:
max_length = max(len(arr) for arr in signals_flat)

In [115]:
X_train, X_test, y_train, y_test = sklearn.model_selection.train_test_split(
    padded_signals_flat, y, test_size=0.33, random_state=42)

y_train = to_categorical(y_train, num_classes=4)
y_test = to_categorical(y_test, num_classes=4)

In [119]:
input_size = max_length
model = make_model_masked(input_shape=input_size)

model.compile(
    optimizer="adam",
    loss="categorical_crossentropy",
    metrics=["accuracy"],
)

epochs = 20
batch_size = 16

history = model.fit(
    X_train,
    y_train,
    batch_size=batch_size,
    epochs=epochs,
    validation_split=0.2,
    verbose=1,
)

Epoch 1/20
[1m286/286[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m31s[0m 101ms/step - accuracy: 0.0948 - loss: nan - val_accuracy: 0.0892 - val_loss: nan
Epoch 2/20
[1m137/286[0m [32m━━━━━━━━━[0m[37m━━━━━━━━━━━[0m [1m14s[0m 95ms/step - accuracy: 0.0909 - loss: nan

KeyboardInterrupt: 

## LSTM

### with masking

In [120]:
input_shape = (max_length, 1)

In [121]:
def create_model_with_masking(input_shape):
    inputs = layers.Input(shape=input_shape)
    
    # Masking layer to handle padded values (assuming padding value is 0)
    masked_inputs = layers.Masking(mask_value=0.0)(inputs)
    
    # LSTM layer
    lstm_out = layers.LSTM(64, return_sequences=False)(masked_inputs)
    
    # Dense output layer
    outputs = layers.Dense(4, activation='softmax')(lstm_out)
    
    model = models.Model(inputs, outputs)
    return model

In [122]:
model_with_masking = create_model_with_masking(input_shape)


model_with_masking.compile(
    optimizer="adam",
    loss="categorical_crossentropy",
    metrics=["accuracy"],
)

# Train the model
model_with_masking.fit(X_train, y_train, epochs=10, batch_size=32) 

Epoch 1/10


2025-05-04 18:17:06.803660: E tensorflow/core/util/util.cc:131] oneDNN supports DT_BOOL only on platforms with AVX-512. Falling back to the default Eigen-based implementation if present.


[1m 15/179[0m [32m━[0m[37m━━━━━━━━━━━━━━━━━━━[0m [1m25:59[0m 10s/step - accuracy: 0.3955 - loss: 1.3753

KeyboardInterrupt: 

### without masking

In [123]:
def create_model_without_masking(input_shape):
    inputs = layers.Input(shape=input_shape)
    
    # LSTM layer
    lstm_out = layers.LSTM(64, return_sequences=False)(inputs)
    
    # Dense output layer
    outputs = layers.Dense(4, activation='softmax')(lstm_out)
    
    model = models.Model(inputs, outputs)
    return model

In [124]:
model_without_masking = create_model_without_masking(input_shape)


model_without_masking.compile(
    optimizer="adam",
    loss="categorical_crossentropy",
    metrics=["accuracy"],
)

# Train the model
model_without_masking.fit(X_train, y_train, epochs=10, batch_size=32) 

Epoch 1/10
[1m179/179[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1386s[0m 8s/step - accuracy: 0.5612 - loss: 1.1012
Epoch 2/10
[1m179/179[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m42623s[0m 239s/step - accuracy: 0.5951 - loss: 0.9894
Epoch 3/10
[1m175/179[0m [32m━━━━━━━━━━━━━━━━━━━[0m[37m━[0m [1m31s[0m 8s/step - accuracy: 0.5929 - loss: 0.9877

KeyboardInterrupt: 

In [72]:
import numpy as np
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.losses import SparseCategoricalCrossentropy
from tensorflow.keras.metrics import SparseCategoricalAccuracy

# Assuming 'create_model_with_masking' is defined as in the previous message

# Generate synthetic training data
num_samples = 1000
max_sequence_length = 50
num_features = 1  # Single feature per timestep
num_classes = 4  # Number of output classes

# Random sequences with varying lengths
X_train = [np.random.rand(np.random.randint(1, max_sequence_length + 1), num_features) for _ in range(num_samples)]
y_train = np.random.randint(0, num_classes, size=num_samples)

# Pad sequences to ensure uniform input size
X_train_padded = np.array([np.pad(seq, ((0, max_sequence_length - len(seq)), (0, 0)), mode='constant') if len(seq) < max_sequence_length else seq[:max_sequence_length] for seq in X_train])

# Define the model
input_shape = (max_sequence_length, num_features)
model_with_masking = create_model_with_masking(input_shape)

# Compile the model
model_with_masking.compile(
    optimizer=Adam(),
    loss=SparseCategoricalCrossentropy(),
    metrics=[SparseCategoricalAccuracy()]
)

# Train the model
model_with_masking.fit(X_train_padded, y_train, epochs=10, batch_size=32)

Epoch 1/10
[1m32/32[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 10ms/step - loss: 1.3877 - sparse_categorical_accuracy: 0.2661
Epoch 2/10
[1m32/32[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 10ms/step - loss: 1.3864 - sparse_categorical_accuracy: 0.2737
Epoch 3/10
[1m32/32[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 10ms/step - loss: 1.3860 - sparse_categorical_accuracy: 0.2548
Epoch 4/10
[1m32/32[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 10ms/step - loss: 1.3854 - sparse_categorical_accuracy: 0.2684
Epoch 5/10
[1m32/32[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 11ms/step - loss: 1.3848 - sparse_categorical_accuracy: 0.2725
Epoch 6/10
[1m32/32[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 10ms/step - loss: 1.3859 - sparse_categorical_accuracy: 0.2677
Epoch 7/10
[1m32/32[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 11ms/step - loss: 1.3842 - sparse_categorical_accuracy: 0.2750
Epoch 8/10
[1m32/32[0m [32m━━━━

<keras.src.callbacks.history.History at 0x7c36fc0b3500>