# **1D CNN**

In [2]:
from google.colab import drive
drive.mount('/content/drive')
zip_path = '/content/drive/MyDrive/semester_project_2.3/mit-bih-atrial-fibrillation-database-1.0.0.zip'
!cp "{zip_path}" /content/afdb.zip
!mkdir -p afdb_data
!unzip -q /content/afdb.zip -d afdb_data
!pip install wfdb

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
replace afdb_data/files/08455.hea? [y]es, [n]o, [A]ll, [N]one, [r]ename: N


In [3]:
import os
import wfdb
import numpy as np
import matplotlib.pyplot as plt
from collections import Counter
from scipy.signal import butter, filtfilt
from tqdm import tqdm
from sklearn.utils.class_weight import compute_class_weight
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv1D, BatchNormalization, MaxPooling1D, SpatialDropout1D, GlobalAveragePooling1D, Dropout, Dense, Flatten
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau
from sklearn.metrics import confusion_matrix, classification_report

## Find usable patients
Not all of our patients are usable for classification. Some are missing their
.dat files and others include AFL or J labels

Let's clear them up

In [4]:
# locate all patients with ecg signals (.dat files)
data_dir = "/content/afdb_data/files"
all_files = os.listdir(data_dir)
dat_files = [f for f in all_files if f.endswith('.dat')]

# patients with .dat files
dat_patients = sorted([f.replace('.dat', '') for f in dat_files])

# all unique patient IDs in the folder
all_patients = sorted(set(f.split('.')[0] for f in all_files))
# exclude patients with no .dat file
excluded_patients = sorted([p for p in all_patients if p not in dat_patients])
# remove non-patient entries
non_patient_entries = ['ANNOTATORS', 'RECORDS', 'SHA256SUMS', 'notes', 'old']
excluded_patients = [p for p in excluded_patients if p not in non_patient_entries]

# print results
print(f"Patients with .dat file: {len(dat_patients)}")
print(dat_patients)
print(f"\nPatients without .dat file: {len(excluded_patients)}")
print(excluded_patients)

Patients with .dat file: 23
['04015', '04043', '04048', '04126', '04746', '04908', '04936', '05091', '05121', '05261', '06426', '06453', '06995', '07162', '07859', '07879', '07910', '08215', '08219', '08378', '08405', '08434', '08455']

Patients without .dat file: 2
['00735', '03665']


In [5]:
# make a list of patients strictly with AFib/N signals
included_patients = []

for patient in dat_patients:
    record_path = os.path.join(data_dir, patient)
    try:
        ann = wfdb.rdann(record_path, 'atr')
        aux_notes = set(ann.aux_note)

        if aux_notes.issubset({'(AFIB', '(N'}):
            included_patients.append(patient)
        else:
            excluded_patients.append(patient)

    except Exception as e:
        excluded_patients.append(patient)

# upon inspection found very noisy signal
included_patients.remove('07859')
excluded_patients.append('07859')
included_patients.remove('08405')
excluded_patients.append('08405')
included_patients.remove('08434')
excluded_patients.append('08434')

# print results
print(f"Included patients (AFIB/N only): {len(included_patients)}")
print(included_patients)
print(f"\nExcluded patients: {len(excluded_patients)}")
print(excluded_patients)

Included patients (AFIB/N only): 10
['04015', '04048', '04126', '04746', '05091', '05261', '06453', '07162', '08219', '08455']

Excluded patients: 15
['00735', '03665', '04043', '04908', '04936', '05121', '06426', '06995', '07879', '07910', '08215', '08378', '07859', '08405', '08434']


### We have a list of all usable patients in "included_patients". Let's check if they compose a balanced dataset

In [7]:
# count AFib vs N samples across all included patients
afib_count = 0
normal_count = 0

for patient in included_patients:
    record_path = os.path.join(data_dir, patient)
    record = wfdb.rdrecord(record_path)
    signal = record.p_signal[:, 0]
    ann = wfdb.rdann(record_path, 'atr')

    # create a mask of the same length as signal
    mask = np.zeros(len(signal), dtype=int)  # 0 = N, 1 = AFib

    current_label = 0
    for i in range(len(ann.sample) - 1):
        note = ann.aux_note[i]
        if note == '(AFIB':
            current_label = 1
        elif note == '(N':
            current_label = 0
        mask[ann.sample[i]:ann.sample[i+1]] = current_label
    # fill the remainder
    mask[ann.sample[-1]:] = current_label

    afib_count += np.sum(mask == 1)
    normal_count += np.sum(mask == 0)

# print results
total = afib_count + normal_count
print(f"AFib samples: {afib_count} ({afib_count/total*100:.2f}%)")
print(f"N samples: {normal_count} ({normal_count/total*100:.2f}%)")
print(f"Total annotated samples: {total}")

AFib samples: 30087575 (33.00%)
N samples: 61089265 (67.00%)
Total annotated samples: 91176840


##### Dataset is imbalanced, we'll need to account for this later

## Splitting patients into train/val/test sets
We create a function to label windows of desired size, using a predetermined frequency

In [8]:
# --------------------------------------------------------------
# function that extracts labeled windows from a list of patients
# --------------------------------------------------------------

def extract_labeled_windows(patients, data_dir, window_sec, fs=250):
    window_samples = window_sec * fs
    X = []
    y = []

    for patient in patients:
        try:
            record_path = os.path.join(data_dir, patient)
            record = wfdb.rdrecord(record_path)
            signal = record.p_signal[:, 0] # use first channel
            ann = wfdb.rdann(record_path, 'atr')

            # create mask of the same length as the signal
            mask = np.zeros(len(signal), dtype=int) # 0 = N, 1 = AFib
            current_label = 0
            for i in range(len(ann.sample) - 1):
                note = ann.aux_note[i]
                if note == '(AFIB':
                    current_label = 1
                elif note == '(N':
                    current_label = 0
                mask[ann.sample[i]:ann.sample[i+1]] = current_label
            mask[ann.sample[-1]:] = current_label

            # slide windows across signal
            step = window_samples  # no overlap
            for start in range(0, len(signal) - window_samples + 1, step):
                window_signal = signal[start:start+window_samples]
                window_mask = mask[start:start+window_samples]

                # label window if majority AFib or N
                afib_ratio = np.sum(window_mask == 1) / window_samples
                normal_ratio = np.sum(window_mask == 0) / window_samples

                # only keep windows with 90% majority
                if afib_ratio > 0.9:
                    X.append(window_signal)
                    y.append(1)
                elif normal_ratio > 0.9:
                    X.append(window_signal)
                    y.append(0)

        except Exception as e:
            print(f"Error processing patient {patient}: {e}")

    X = np.array(X)
    y = np.array(y)
    return X, y

#### Split into train/val/test

In [9]:
# ---------------------
# define patient splits
# ---------------------
included_patients_sorted = sorted(included_patients)

# 60-20-20 offers best balance between val/test
n_patients = len(included_patients_sorted)
n_train = int(0.6 * n_patients)
n_val   = int(0.2 * n_patients)

train_patients = included_patients_sorted[:n_train]
val_patients   = included_patients_sorted[n_train:n_train+n_val]
test_patients  = included_patients_sorted[n_train+n_val:]

# -----------------
# generate datasets
# -----------------
X_train, y_train = extract_labeled_windows(train_patients, data_dir, 10)
X_val, y_val     = extract_labeled_windows(val_patients, data_dir, 10)
X_test, y_test   = extract_labeled_windows(test_patients, data_dir, 10)

# print results
print("Shapes:")
print("X_train:", X_train.shape, "y_train:", y_train.shape)
print("X_val:  ", X_val.shape, "y_val:  ", y_val.shape)
print("X_test: ", X_test.shape, "y_test: ", y_test.shape)

Shapes:
X_train: (22027, 2500) y_train: (22027,)
X_val:   (7003, 2500) y_val:   (7003,)
X_test:  (7297, 2500) y_test:  (7297,)


## Band Pass Filter

In [10]:
def band_pass_filter(signal, fs, lowcut=0.1, highcut=40.0, order=2):
    nyquist = 0.5 * fs
    low = lowcut / nyquist
    high = highcut / nyquist

    b, a = butter(order, [low, high], btype='band')
    filtered = filtfilt(b, a, signal)

    return filtered

## Normalization

In [11]:
def normalize_signal(signal):
    mu = np.mean(signal)
    std = np.std(signal)
    if std < 1e-8:
        return signal
    return (signal - mu) / std

#### Apply preprocessing

In [12]:
# --------------------------------------------------------------------
# function that applies BPF and normalization functions to all windows
# --------------------------------------------------------------------

def preprocess_windows(X, fs):
    X_preprocessed = np.zeros_like(X, dtype=np.float32)

    for i in tqdm(range(len(X)), desc="Preprocessing splits"):
        window = X[i]
        filtered = band_pass_filter(window, fs)
        normalized = normalize_signal(filtered)
        X_preprocessed[i] = normalized

    return X_preprocessed

In [13]:
# apply preprocessing to all splits
X_train = preprocess_windows(X_train, 250)
X_val   = preprocess_windows(X_val, 250)
X_test  = preprocess_windows(X_test, 250)

# print results
print("\n\nShapes after preprocessing:")
print("X_train:", X_train.shape)
print("X_val:  ", X_val.shape)
print("X_test: ", X_test.shape)

Preprocessing splits: 100%|██████████| 22027/22027 [00:22<00:00, 999.73it/s] 
Preprocessing splits: 100%|██████████| 7003/7003 [00:06<00:00, 1093.84it/s]
Preprocessing splits: 100%|██████████| 7297/7297 [00:07<00:00, 1005.94it/s]



Shapes after preprocessing:
X_train: (22027, 2500)
X_val:   (7003, 2500)
X_test:  (7297, 2500)





## Train CNN

#### Prepare sets for CNN

In [None]:
# -----------------------------
# add channel dimension for CNN
# -----------------------------
X_train_cnn = X_train[..., np.newaxis]
X_val_cnn   = X_val[..., np.newaxis]
X_test_cnn  = X_test[..., np.newaxis]

# print results
print("Shapes after adding channel dimension:")
print("X_train:", X_train_cnn.shape)
print("X_val:  ", X_val_cnn.shape)
print("X_test: ", X_test_cnn.shape)

# -------------------------------------------------------------
# dataset is imbalanced, compute class weights for training set
# -------------------------------------------------------------
class_weights_values = compute_class_weight(
    class_weight='balanced',
    classes=np.unique(y_train),
    y=y_train
)

# print results
class_weights = {i: w for i, w in enumerate(class_weights_values)}
print("\nClass weights:", class_weights)

Shapes after adding channel dimension:
X_train: (22027, 2500, 1)
X_val:   (7003, 2500, 1)
X_test:  (7297, 2500, 1)

Class weights: {0: np.float64(0.8847606041131105), 1: np.float64(1.1497546716776281)}


### Train and evaluate different models

#### With computed class weights

In [None]:
# adjusted class weights
class_weights = {0: 0.88, 1: 1.15}

# ----------
# define CNN
# ----------
model = Sequential([
    Conv1D(filters=64, kernel_size=7, activation='relu', input_shape=(2500, 1)),
    MaxPooling1D(pool_size=4),

    Conv1D(filters=128, kernel_size=5, activation='relu'),
    MaxPooling1D(pool_size=4),

    Conv1D(filters=256, kernel_size=3, activation='relu'),
    MaxPooling1D(pool_size=4),

    Flatten(),
    Dense(128, activation='relu'),
    Dropout(0.5),
    Dense(1, activation='sigmoid')
])

model.compile(optimizer=Adam(learning_rate=1e-4),
              loss='binary_crossentropy',
              metrics=['accuracy'])

# -----------
# train model
# -----------
history = model.fit(
    X_train_cnn, y_train,
    validation_data=(X_val_cnn, y_val),
    epochs=20,
    batch_size=256,
    class_weight=class_weights,
    callbacks=[
        EarlyStopping(monitor='val_loss', patience=3, restore_best_weights=True),
        ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=2)
    ],
    verbose=1
)

# --------
# evaluate
# --------
print("\n----------------------------------------\n\n-------------- EVALUATION --------------\n")
print(f"Used class weights: {class_weights}\n")

test_loss, test_acc = model.evaluate(X_test_cnn, y_test, verbose=0)
print(f"Test Accuracy: {test_acc*100:.2f}%\nTest Loss: {test_loss:.4f}\n")

# predictions
y_pred_prob = model.predict(X_test_cnn, verbose=0)
# convert probabilities to class labels (0 or 1)
y_pred = (y_pred_prob > 0.5).astype(int)

# confusion matrix
cm = confusion_matrix(y_test, y_pred)
print("Confusion Matrix:")
print(cm)

# classification report (precision, recall, F1)
report = classification_report(y_test, y_pred, target_names=['Normal', 'AFib'])
print("\nClassification Report:")
print(report)

  super().__init__(activity_regularizer=activity_regularizer, **kwargs)


Epoch 1/20
[1m87/87[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m18s[0m 169ms/step - accuracy: 0.6128 - loss: 0.6560 - val_accuracy: 0.7985 - val_loss: 0.4776 - learning_rate: 1.0000e-04
Epoch 2/20
[1m87/87[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m9s[0m 105ms/step - accuracy: 0.7540 - loss: 0.5280 - val_accuracy: 0.5773 - val_loss: 0.5678 - learning_rate: 1.0000e-04
Epoch 3/20
[1m87/87[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m10s[0m 103ms/step - accuracy: 0.7811 - loss: 0.4903 - val_accuracy: 0.5232 - val_loss: 0.7718 - learning_rate: 1.0000e-04
Epoch 4/20
[1m87/87[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m8s[0m 97ms/step - accuracy: 0.7984 - loss: 0.4610 - val_accuracy: 0.5438 - val_loss: 0.7061 - learning_rate: 5.0000e-05

----------------------------------------

-------------- EVALUATION --------------

Used class weights: {0: 0.88, 1: 1.15}

Test Accuracy: 77.29%
Test Loss: 0.5077

Confusion Matrix:
[[3697 1588]
 [  69 1943]]

Classification Report:
    

Output is expected.

EarlyStopping(...) rolls back to the weights used in the epoch with the lowest val_loss.

Test accuracy is similar to the val_accuracy of that epoch. Recall on AFib is excellent and recall on N is low.

#### Increasing patience on early stopping

In [None]:
# adjusted class weights
class_weights = {0: 0.88, 1: 1.15}

# ----------
# define CNN
# ----------
model = Sequential([
    Conv1D(filters=64, kernel_size=7, activation='relu', input_shape=(2500, 1)),
    MaxPooling1D(pool_size=4),

    Conv1D(filters=128, kernel_size=5, activation='relu'),
    MaxPooling1D(pool_size=4),

    Conv1D(filters=256, kernel_size=3, activation='relu'),
    MaxPooling1D(pool_size=4),

    Flatten(),
    Dense(128, activation='relu'),
    Dropout(0.5),
    Dense(1, activation='sigmoid')
])

model.compile(optimizer=Adam(learning_rate=1e-4),
              loss='binary_crossentropy',
              metrics=['accuracy'])

# -----------
# train model
# -----------
history = model.fit(
    X_train_cnn, y_train,
    validation_data=(X_val_cnn, y_val),
    epochs=20,
    batch_size=256,
    class_weight=class_weights,
    callbacks=[
        EarlyStopping(monitor='val_loss', patience=5, restore_best_weights=True),
        ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=2)
    ],
    verbose=1
)

# --------
# evaluate
# --------
print("\n----------------------------------------\n\n-------------- EVALUATION --------------\n")
print(f"Used class weights: {class_weights}\n")

test_loss, test_acc = model.evaluate(X_test_cnn, y_test, verbose=0)
print(f"Test Accuracy: {test_acc*100:.2f}%\nTest Loss: {test_loss:.4f}\n")

# predictions
y_pred_prob = model.predict(X_test_cnn, verbose=0)
# convert probabilities to class labels (0 or 1)
y_pred = (y_pred_prob > 0.5).astype(int)

# confusion matrix
cm = confusion_matrix(y_test, y_pred)
print("Confusion Matrix:")
print(cm)

# classification report (precision, recall, F1)
report = classification_report(y_test, y_pred, target_names=['Normal', 'AFib'])
print("\nClassification Report:")
print(report)

  super().__init__(activity_regularizer=activity_regularizer, **kwargs)


Epoch 1/20
[1m87/87[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m14s[0m 126ms/step - accuracy: 0.5704 - loss: 0.6715 - val_accuracy: 0.4391 - val_loss: 0.7466 - learning_rate: 1.0000e-04
Epoch 2/20
[1m87/87[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m8s[0m 92ms/step - accuracy: 0.7392 - loss: 0.5393 - val_accuracy: 0.6443 - val_loss: 0.5090 - learning_rate: 1.0000e-04
Epoch 3/20
[1m87/87[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m8s[0m 89ms/step - accuracy: 0.7680 - loss: 0.5114 - val_accuracy: 0.5483 - val_loss: 0.6893 - learning_rate: 1.0000e-04
Epoch 4/20
[1m87/87[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m8s[0m 90ms/step - accuracy: 0.7900 - loss: 0.4733 - val_accuracy: 0.5253 - val_loss: 0.8574 - learning_rate: 1.0000e-04
Epoch 5/20
[1m87/87[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m8s[0m 90ms/step - accuracy: 0.8013 - loss: 0.4520 - val_accuracy: 0.5114 - val_loss: 1.0000 - learning_rate: 5.0000e-05
Epoch 6/20
[1m87/87[0m [32m━━━━━━━━━━━━━━━━━━━━[0

Increasing patience on early stopping makes the model more likely to predict N, thus reducing precision for both classes

#### Trying different class weights

In [None]:
# adjusted class weights
class_weights = {0: 1.0, 1: 2.0}

# ----------
# define CNN
# ----------
model = Sequential([
    Conv1D(filters=64, kernel_size=7, activation='relu', input_shape=(2500, 1)),
    MaxPooling1D(pool_size=4),

    Conv1D(filters=128, kernel_size=5, activation='relu'),
    MaxPooling1D(pool_size=4),

    Conv1D(filters=256, kernel_size=3, activation='relu'),
    MaxPooling1D(pool_size=4),

    Flatten(),
    Dense(128, activation='relu'),
    Dropout(0.5),
    Dense(1, activation='sigmoid')
])

model.compile(optimizer=Adam(learning_rate=1e-4),
              loss='binary_crossentropy',
              metrics=['accuracy'])

# -----------
# train model
# -----------
history = model.fit(
    X_train_cnn, y_train,
    validation_data=(X_val_cnn, y_val),
    epochs=20,
    batch_size=256,
    class_weight=class_weights,
    callbacks=[
        EarlyStopping(monitor='val_loss', patience=3, restore_best_weights=True),
        ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=2)
    ],
    verbose=1
)

# --------
# evaluate
# --------
print("\n----------------------------------------\n\n-------------- EVALUATION --------------\n")
print(f"Used class weights: {class_weights}\n")

test_loss, test_acc = model.evaluate(X_test_cnn, y_test, verbose=0)
print(f"Test Accuracy: {test_acc*100:.2f}%\nTest Loss: {test_loss:.4f}\n")

# predictions
y_pred_prob = model.predict(X_test_cnn, verbose=0)
# convert probabilities to class labels (0 or 1)
y_pred = (y_pred_prob > 0.5).astype(int)

# confusion matrix
cm = confusion_matrix(y_test, y_pred)
print("Confusion Matrix:")
print(cm)

# classification report (precision, recall, F1)
report = classification_report(y_test, y_pred, target_names=['Normal', 'AFib'])
print("\nClassification Report:")
print(report)

  super().__init__(activity_regularizer=activity_regularizer, **kwargs)


Epoch 1/20
[1m87/87[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m18s[0m 133ms/step - accuracy: 0.4991 - loss: 0.9442 - val_accuracy: 0.4768 - val_loss: 0.6926 - learning_rate: 1.0000e-04
Epoch 2/20
[1m87/87[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m14s[0m 97ms/step - accuracy: 0.6891 - loss: 0.7846 - val_accuracy: 0.4762 - val_loss: 0.7438 - learning_rate: 1.0000e-04
Epoch 3/20
[1m87/87[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m8s[0m 93ms/step - accuracy: 0.7399 - loss: 0.7107 - val_accuracy: 0.4611 - val_loss: 0.8889 - learning_rate: 1.0000e-04
Epoch 4/20
[1m87/87[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m8s[0m 94ms/step - accuracy: 0.7693 - loss: 0.6619 - val_accuracy: 0.4707 - val_loss: 0.9467 - learning_rate: 5.0000e-05

----------------------------------------

-------------- EVALUATION --------------

Used class weights: {0: 1.0, 1: 2.0}

Test Accuracy: 72.70%
Test Loss: 0.5631

Confusion Matrix:
[[3397 1888]
 [ 104 1908]]

Classification Report:
        

In [None]:
# adjusted class weights
class_weights = {0: 1.0, 1: 1.5}

# ----------
# define CNN
# ----------
model = Sequential([
    Conv1D(filters=64, kernel_size=7, activation='relu', input_shape=(2500, 1)),
    MaxPooling1D(pool_size=4),

    Conv1D(filters=128, kernel_size=5, activation='relu'),
    MaxPooling1D(pool_size=4),

    Conv1D(filters=256, kernel_size=3, activation='relu'),
    MaxPooling1D(pool_size=4),

    Flatten(),
    Dense(128, activation='relu'),
    Dropout(0.5),
    Dense(1, activation='sigmoid')
])

model.compile(optimizer=Adam(learning_rate=1e-4),
              loss='binary_crossentropy',
              metrics=['accuracy'])

# -----------
# train model
# -----------
history = model.fit(
    X_train_cnn, y_train,
    validation_data=(X_val_cnn, y_val),
    epochs=20,
    batch_size=256,
    class_weight=class_weights,
    callbacks=[
        EarlyStopping(monitor='val_loss', patience=3, restore_best_weights=True),
        ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=2)
    ],
    verbose=1
)

# --------
# evaluate
# --------
print("\n----------------------------------------\n\n-------------- EVALUATION --------------\n")
print(f"Used class weights: {class_weights}\n")

test_loss, test_acc = model.evaluate(X_test_cnn, y_test, verbose=0)
print(f"Test Accuracy: {test_acc*100:.2f}%\nTest Loss: {test_loss:.4f}\n")

# predictions
y_pred_prob = model.predict(X_test_cnn, verbose=0)
# convert probabilities to class labels (0 or 1)
y_pred = (y_pred_prob > 0.5).astype(int)

# confusion matrix
cm = confusion_matrix(y_test, y_pred)
print("Confusion Matrix:")
print(cm)

# classification report (precision, recall, F1)
report = classification_report(y_test, y_pred, target_names=['Normal', 'AFib'])
print("\nClassification Report:")
print(report)

  super().__init__(activity_regularizer=activity_regularizer, **kwargs)


Epoch 1/20
[1m87/87[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m14s[0m 128ms/step - accuracy: 0.5502 - loss: 0.8138 - val_accuracy: 0.5173 - val_loss: 0.6725 - learning_rate: 1.0000e-04
Epoch 2/20
[1m87/87[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m8s[0m 92ms/step - accuracy: 0.7425 - loss: 0.6533 - val_accuracy: 0.5149 - val_loss: 0.6846 - learning_rate: 1.0000e-04
Epoch 3/20
[1m87/87[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m8s[0m 91ms/step - accuracy: 0.7712 - loss: 0.6083 - val_accuracy: 0.5203 - val_loss: 0.7631 - learning_rate: 1.0000e-04
Epoch 4/20
[1m87/87[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m8s[0m 92ms/step - accuracy: 0.7891 - loss: 0.5759 - val_accuracy: 0.5241 - val_loss: 0.7904 - learning_rate: 5.0000e-05

----------------------------------------

-------------- EVALUATION --------------

Used class weights: {0: 1.0, 1: 1.5}

Test Accuracy: 74.98%
Test Loss: 0.5249

Confusion Matrix:
[[3618 1667]
 [ 159 1853]]

Classification Report:
         

In [None]:
# adjusted class weights
class_weights = {0: 1.0, 1: 1.3}

# ----------
# define CNN
# ----------
model = Sequential([
    Conv1D(filters=64, kernel_size=7, activation='relu', input_shape=(2500, 1)),
    MaxPooling1D(pool_size=4),

    Conv1D(filters=128, kernel_size=5, activation='relu'),
    MaxPooling1D(pool_size=4),

    Conv1D(filters=256, kernel_size=3, activation='relu'),
    MaxPooling1D(pool_size=4),

    Flatten(),
    Dense(128, activation='relu'),
    Dropout(0.5),
    Dense(1, activation='sigmoid')
])

model.compile(optimizer=Adam(learning_rate=1e-4),
              loss='binary_crossentropy',
              metrics=['accuracy'])

# -----------
# train model
# -----------
history = model.fit(
    X_train_cnn, y_train,
    validation_data=(X_val_cnn, y_val),
    epochs=20,
    batch_size=256,
    class_weight=class_weights,
    callbacks=[
        EarlyStopping(monitor='val_loss', patience=3, restore_best_weights=True),
        ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=2)
    ],
    verbose=1
)

# --------
# evaluate
# --------
print("\n----------------------------------------\n\n-------------- EVALUATION --------------\n")
print(f"Used class weights: {class_weights}\n")

test_loss, test_acc = model.evaluate(X_test_cnn, y_test, verbose=0)
print(f"Test Accuracy: {test_acc*100:.2f}%\nTest Loss: {test_loss:.4f}\n")

# predictions
y_pred_prob = model.predict(X_test_cnn, verbose=0)
# convert probabilities to class labels (0 or 1)
y_pred = (y_pred_prob > 0.5).astype(int)

# confusion matrix
cm = confusion_matrix(y_test, y_pred)
print("Confusion Matrix:")
print(cm)

# classification report (precision, recall, F1)
report = classification_report(y_test, y_pred, target_names=['Normal', 'AFib'])
print("\nClassification Report:")
print(report)

  super().__init__(activity_regularizer=activity_regularizer, **kwargs)


Epoch 1/20
[1m87/87[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m17s[0m 129ms/step - accuracy: 0.5886 - loss: 0.7564 - val_accuracy: 0.1962 - val_loss: 0.9153 - learning_rate: 1.0000e-04
Epoch 2/20
[1m87/87[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m8s[0m 92ms/step - accuracy: 0.7274 - loss: 0.6227 - val_accuracy: 0.5411 - val_loss: 0.6113 - learning_rate: 1.0000e-04
Epoch 3/20
[1m87/87[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m8s[0m 92ms/step - accuracy: 0.7740 - loss: 0.5673 - val_accuracy: 0.5759 - val_loss: 0.5857 - learning_rate: 1.0000e-04
Epoch 4/20
[1m87/87[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m8s[0m 93ms/step - accuracy: 0.7932 - loss: 0.5365 - val_accuracy: 0.5192 - val_loss: 0.7896 - learning_rate: 1.0000e-04
Epoch 5/20
[1m87/87[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m8s[0m 96ms/step - accuracy: 0.7970 - loss: 0.5216 - val_accuracy: 0.5405 - val_loss: 0.7694 - learning_rate: 1.0000e-04
Epoch 6/20
[1m87/87[0m [32m━━━━━━━━━━━━━━━━━━━━[0