In [None]:
import pandas as pd

# Load the annotations CSV to check column names
annotations = pd.read_csv('ConditionNames_SNOMED-CT.csv')
print(annotations.columns)

In [2]:
import os
import wfdb
import numpy as np
import pandas as pd
import tensorflow as tf
from biosppy.signals import ecg as biosppy_ecg
from tensorflow.keras.layers import Conv1D, MaxPooling1D, Flatten, Dense
from sklearn.model_selection import train_test_split


In [3]:
# Function to parse .hea files and extract diagnosis codes
def parse_header(file_path):
    with open(file_path, 'r') as file:
        lines = file.readlines()
        for line in lines:
            if line.startswith('#Dx'):
                return line.split(': ')[1].strip().split(',')

# Function to load and preprocess ECG data
def load_ecg(file_path):
    try:
        record = wfdb.rdrecord(file_path)
        ecg_signal = record.p_signal.flatten()
        ecg_signal = (ecg_signal - np.mean(ecg_signal)) / np.std(ecg_signal)
        return ecg_signal
    except ValueError as e:
        print(f"ValueError reading {file_path}: {e}")
        return None
    except IndexError as e:
        print(f"IndexError reading {file_path}: {e}")
        return None


# Function to recursively search for .hea files
def find_files(directory, extension):
    for dirpath, dirnames, files in os.walk(directory):
        for name in files:
            if name.lower().endswith(extension):
                yield os.path.join(dirpath, name)


In [4]:
# Path to the ECG records and the CSV file
ecg_records_path = 'WFDBRecords'
annotations_csv_path = 'ConditionNames_SNOMED-CT.csv'

# Load the annotations CSV to create a mapping from SNOMED CT codes to condition names
annotations = pd.read_csv(annotations_csv_path)
code_to_condition_map = dict(zip(annotations['Snomed_CT'].astype(str), annotations['Full Name']))

# Print out the mapping to verify
print(code_to_condition_map)


{'270492004': '1 degree atrioventricular block', '195042002': '2 degree atrioventricular block', '54016002': '2 degree atrioventricular block(Type one)', '28189009': '2 degree atrioventricular block(Type two)', '27885002': '3 degree atrioventricular block', '251173003': 'atrial bigeminy', '39732003': 'Axis left shift', '284470004': 'atrial\xa0premature\xa0beats', '164917005': 'abnormal Q wave', '47665007': 'Axis right shift', '233917008': 'atrioventricular block', '251199005': 'countercolockwise rotation', '251198002': 'colockwise rotation', '428417006': 'Early repolarization of the ventricles', '164942001': 'fQRS Wave', '698252002': 'Intraventricular block', '426995002': 'junctional escape beat', '251164006': 'junctional premature beat', '164909002': 'left front bundle branch block', '164873001': 'left ventricle hypertrophy', '251146004': 'lower voltage QRS in all lead', '251148003': 'lower voltage QRS in chest lead', '251147008': 'lower voltage QRS in limb lead', '164865005': 'Myocar

In [5]:
# Initialize data and labels lists
ecg_data = []
labels = []

# Load each ECG record and assign labels
for file_path in find_files(ecg_records_path, '.hea'):
    base_path = file_path.replace('.hea', '')
    mat_file_path = base_path + '.mat'
    hea_file_path = base_path + '.hea'

    if not os.path.isfile(mat_file_path) or not os.path.isfile(hea_file_path):
        print(f"Missing file for {base_path}")
        continue
    
    diagnosis_codes = parse_header(file_path)
    label = 1 if any(code_to_condition_map.get(code, '') != '' for code in diagnosis_codes) else 0
    ecg_signal_prefiltered = load_ecg(base_path)
    if ecg_signal_prefiltered is not None:
        try:
            # Attempt to denoise ECG signal using BioSPPy
            output = biosppy_ecg.ecg(signal=ecg_signal_prefiltered, sampling_rate=500., show=False)
            ecg_signal = output['filtered']  # Accessing the filtered ECG signal
            ecg_data.append(ecg_signal)
            labels.append(label)
        except ValueError as e:
            print(f"Skipping file due to error in heart rate computation: {base_path}, Error: {e}")
    else:
        print(f"Skipping file due to loading error: {base_path}")

# Check if any data was loaded
if not ecg_data:
    print("No ECG data was loaded.")
else:
    print(f"Loaded {len(ecg_data)} ECG records.")
    # Count the number of normal (0) and abnormal (1) labels
    abnormal_count = sum(labels)
    normal_count = len(labels) - abnormal_count
    print(f"Number of normal ECG records: {normal_count}")
    print(f"Number of abnormal ECG records: {abnormal_count}")


Skipping file due to error in heart rate computation: WFDBRecords/35/354/JS34868, Error: Not enough beats to compute heart rate.
Skipping file due to error in heart rate computation: WFDBRecords/35/354/JS34879, Error: Not enough beats to compute heart rate.
Skipping file due to error in heart rate computation: WFDBRecords/35/353/JS34788, Error: Not enough beats to compute heart rate.
Skipping file due to error in heart rate computation: WFDBRecords/35/350/JS34479, Error: Not enough beats to compute heart rate.
Skipping file due to error in heart rate computation: WFDBRecords/35/357/JS35192, Error: Not enough beats to compute heart rate.
Skipping file due to error in heart rate computation: WFDBRecords/35/356/JS35050, Error: Not enough beats to compute heart rate.
Skipping file due to error in heart rate computation: WFDBRecords/35/356/JS35065, Error: Not enough beats to compute heart rate.
Skipping file due to error in heart rate computation: WFDBRecords/35/351/JS34509, Error: Not enou

Skipping file due to error in heart rate computation: WFDBRecords/38/388/JS38231, Error: Not enough beats to compute heart rate.
Skipping file due to error in heart rate computation: WFDBRecords/38/388/JS38252, Error: Not enough beats to compute heart rate.
Skipping file due to error in heart rate computation: WFDBRecords/36/362/JS35654, Error: Not enough beats to compute heart rate.
Skipping file due to error in heart rate computation: WFDBRecords/36/363/JS35727, Error: Not enough beats to compute heart rate.
Skipping file due to error in heart rate computation: WFDBRecords/36/364/JS35857, Error: Not enough beats to compute heart rate.
Skipping file due to error in heart rate computation: WFDBRecords/36/366/JS36015, Error: Not enough beats to compute heart rate.
Skipping file due to error in heart rate computation: WFDBRecords/36/366/JS36018, Error: Not enough beats to compute heart rate.
Skipping file due to error in heart rate computation: WFDBRecords/36/368/JS36244, Error: Not enou

In [6]:
# Convert to NumPy arrays and add channel dimension for Conv1D input
if ecg_data:
    ecg_data = np.array(ecg_data)[..., np.newaxis]
    labels = np.array(labels)
    print(f"ECG data shape: {ecg_data.shape}")
    print(f"Labels: {labels}")
else:
    # Handle the case where no data is loaded
    print("No data to preprocess.")


ECG data shape: (45048, 60000, 1)
Labels: [1 1 1 ... 1 1 1]


In [7]:
def create_model(input_shape):
    # Define the model architecture
    model = tf.keras.Sequential([
        Conv1D(16, kernel_size=3, activation='relu', input_shape=input_shape),
        MaxPooling1D(2),
        Flatten(),
        Dense(50, activation='relu'),
        Dense(1, activation='sigmoid')
    ])
    # Create the optimizer with gradient clipping
    optimizer = tf.keras.optimizers.Adam(learning_rate=0.001, clipvalue=1.0)
    # Compile the model with the optimizer
    model.compile(optimizer=optimizer, loss='binary_crossentropy', metrics=['accuracy'])
    return model

In [8]:
if len(ecg_data) > 0:
    # Split data into train and test sets
    X_train, X_test, y_train, y_test = train_test_split(ecg_data, labels, test_size=0.2, random_state=42)
    
    # Check if there is enough data
    if X_train.shape[0] > 0:
        # Create and compile the model
        model = create_model((X_train.shape[1], 1))
        # Train the model
        history = model.fit(X_train, y_train, epochs=10, validation_split=0.2)
    else:
        print("Not enough data to create training and test sets.")
else:
    print("No data to train the model.")


  super().__init__(


Epoch 1/10
[1m901/901[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m89s[0m 97ms/step - accuracy: 0.9739 - loss: 0.3809 - val_accuracy: 0.9868 - val_loss: 0.0734
Epoch 2/10
[1m901/901[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m73s[0m 80ms/step - accuracy: 0.9851 - loss: 0.0570 - val_accuracy: 0.9865 - val_loss: 0.0620
Epoch 3/10
[1m901/901[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m74s[0m 82ms/step - accuracy: 0.9879 - loss: 0.0372 - val_accuracy: 0.9849 - val_loss: 0.0666
Epoch 4/10
[1m901/901[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m73s[0m 81ms/step - accuracy: 0.9887 - loss: 0.0298 - val_accuracy: 0.9846 - val_loss: 0.0931
Epoch 5/10
[1m901/901[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m73s[0m 81ms/step - accuracy: 0.9929 - loss: 0.0162 - val_accuracy: 0.9849 - val_loss: 0.0880
Epoch 6/10
[1m901/901[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m73s[0m 81ms/step - accuracy: 0.9940 - loss: 0.0137 - val_accuracy: 0.9858 - val_loss: 0.1315
Epoch 7/10
[1m9

In [9]:
# Evaluate the model only if it was trained
if 'history' in locals():
    loss, accuracy = model.evaluate(X_test, y_test)
    print(f'Test Accuracy: {accuracy * 100:.2f}%')
else:
    print("Model has not been trained.")


[1m282/282[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m6s[0m 20ms/step - accuracy: 0.9793 - loss: 0.2483
Test Accuracy: 97.97%


In [13]:
# Save the entire model as a SavedModel with a `.keras` extension
model.save('ECG_Trained.keras')

In [None]:
# Load the model back
loaded_model = load_model('ECG_Trained.keras')

In [None]:
import sys
from PyQt5.QtWidgets import QApplication, QWidget, QPushButton, QVBoxLayout, QLabel, QFileDialog
from tensorflow.keras.models import load_model
import numpy as np
import wfdb

class ECGApp(QWidget):
    def __init__(self):
        super().__init__()
        self.initUI()

    def initUI(self):
        self.setWindowTitle('ECG Signal Classification')
        self.setGeometry(100, 100, 800, 600)

        # Layout
        layout = QVBoxLayout()
        
        # Label
        self.label = QLabel('Upload an ECG .mat file to classify')
        layout.addWidget(self.label)
        
        # Button
        self.button = QPushButton('Load ECG File', self)
        self.button.clicked.connect(self.loadECG)
        layout.addWidget(self.button)
        
        # Set the layout
        self.setLayout(layout)

    def loadECG(self):
        options = QFileDialog.Options()
        fileName, _ = QFileDialog.getOpenFileName(self, "Load ECG", "", "MAT Files (*.mat);;All Files (*)", options=options)
        if fileName:
            self.classifyECG(fileName)

    def classifyECG(self, file_path):
        # Load the signal
        record = wfdb.rdrecord(file_path[:-4])
        signal = record.p_signal.flatten()
        signal = (signal - np.mean(signal)) / np.std(signal)
        
        # Preprocess and reshape the signal for the model
        signal = signal.reshape((1, -1, 1))  # Adjust based on your model's expected input
        
        # Load model and predict
        model = load_model('my_model.h5')
        prediction = model.predict(signal)
        label = 'Abnormal' if prediction > 0.5 else 'Normal'
        
        # Update UI
        self.label.setText(f'The ECG is classified as: {label}')

# Start the application
def main():
    app = QApplication(sys.argv)
    ex = ECGApp()
    ex.show()
    sys.exit(app.exec_())

if __name__ == '__main__':
    main()
