This file collates the project into a single file

In [None]:
import wfdb
import os
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import scipy.io as sio
from scipy.io import loadmat
import scipy.signal as signal
from collections import Counter
from sklearn.model_selection import train_test_split
import pywt
import random
import sklearn.model_selection
import sklearn.ensemble
import scipy.stats
import tsfresh
import sys

The functions below are used to preprocess the signal.

In [None]:
def remove_baseline_wander(ecg_signal, fs, cutoff=0.5):
    """Suppress slow baseline drift with a zero-phase high-pass filter.

    Args:
        ecg_signal: Samples to filter; handed straight to ``signal.filtfilt``.
        fs: Sampling frequency, in the same unit as ``cutoff``.
        cutoff: Corner frequency of the high-pass filter (default 0.5).

    Returns:
        The filtered samples as returned by ``signal.filtfilt``.
    """
    nyquist = 0.5 * fs
    # First-order Butterworth; the corner is given as a fraction of Nyquist.
    numerator, denominator = signal.butter(1, cutoff / nyquist, btype='highpass')
    # filtfilt runs the filter forwards and backwards, cancelling phase shift.
    return signal.filtfilt(numerator, denominator, ecg_signal)

def low_pass_filter(ecg_signal, fs, cutoff=40):
    """Attenuate content above ``cutoff`` with a zero-phase low-pass filter.

    Args:
        ecg_signal: Samples to filter; handed straight to ``signal.filtfilt``.
        fs: Sampling frequency, in the same unit as ``cutoff``.
        cutoff: Corner frequency of the low-pass filter (default 40).

    Returns:
        The filtered samples as returned by ``signal.filtfilt``.
    """
    nyquist = 0.5 * fs
    # Fourth-order Butterworth; the corner is given as a fraction of Nyquist.
    numerator, denominator = signal.butter(4, cutoff / nyquist, btype='low')
    # Forward-backward application keeps the output phase-aligned with the input.
    return signal.filtfilt(numerator, denominator, ecg_signal)

def notch_filter(ecg_signal, fs, notch_freq=50, quality_factor=30):
    """Remove a narrow band around ``notch_freq`` with a zero-phase IIR notch.

    Args:
        ecg_signal: Samples to filter; handed straight to ``signal.filtfilt``.
        fs: Sampling frequency, in the same unit as ``notch_freq``.
        notch_freq: Centre frequency to reject (default 50).
        quality_factor: Q passed to ``signal.iirnotch`` (default 30).

    Returns:
        The filtered samples as returned by ``signal.filtfilt``.
    """
    # iirnotch takes the centre frequency as a fraction of Nyquist here.
    normalized_freq = notch_freq / (0.5 * fs)
    coeff_b, coeff_a = signal.iirnotch(normalized_freq, quality_factor)
    # Forward-backward application so the notch adds no phase distortion.
    return signal.filtfilt(coeff_b, coeff_a, ecg_signal)

def wavelet_denoising(ecg_signal, wavelet='db6', level=3):
    """Denoise a signal by soft-thresholding its wavelet coefficients.

    The signal is decomposed with ``pywt.wavedec``, every coefficient band is
    soft-thresholded with a single threshold, and the signal is rebuilt with
    ``pywt.waverec``.

    Args:
        ecg_signal: Samples to denoise.
        wavelet: Wavelet name passed to PyWavelets (default ``'db6'``).
        level: Decomposition level (default 3).

    Returns:
        The denoised signal, with the same number of samples as the input.
    """
    coeffs = pywt.wavedec(ecg_signal, wavelet, level=level)

    # Noise level estimated from the finest detail band: median absolute
    # coefficient divided by 0.6745.
    sigma = np.median(np.abs(coeffs[-1])) / 0.6745
    uthresh = sigma * np.sqrt(2 * np.log(len(ecg_signal)))

    # NOTE(review): the approximation band (coeffs[0]) is thresholded together
    # with the detail bands, exactly as before - confirm this is intended.
    denoised_coeffs = [pywt.threshold(band, uthresh, mode='soft') for band in coeffs]
    reconstructed = pywt.waverec(denoised_coeffs, wavelet)

    # waverec can return one sample more than the input when the input length
    # is odd; trim so the result can be written back into the source array.
    n_samples = np.shape(ecg_signal)[-1]
    return reconstructed[..., :n_samples]

The code below collects the paths of all the files to process, adding them to the lists `mat_files` and `hea_files`. It then loops through the paths in `mat_files`, reads each record in, filters every lead, and overwrites the file in place.

In [None]:

# Root folder that is searched recursively for .hea and .mat files.
ecg_data = os.path.join(os.getcwd(), 'processed_ecg_signals_2/WFDBRecords')

mat_files = []
hea_files = []

for root, dirs, files in os.walk(ecg_data):
    for file in files:
        if file.endswith('.hea'):
            hea_files.append(os.path.join(root, file))
        elif file.endswith('.mat'):
            mat_files.append(os.path.join(root, file))

# os.walk yields entries in arbitrary, filesystem-dependent order. Sorting
# makes every run see the same sequence, and keeps the two lists aligned by
# record name (when every record has both files) for code that pairs them
# by position.
hea_files.sort()
mat_files.sort()

# NOTE(review): each .mat file is overwritten below with a scipy `savemat`
# file keyed 'my_matrix'. Running this cell a second time would feed those
# already-processed files back into wfdb.rdrecord - confirm it only ever runs
# once per copy of the data.
for file in mat_files:
    try:
        base_name = os.path.splitext(os.path.basename(file))[0]

        # wfdb.rdrecord is given the record path without its extension.
        mat_path = os.path.join(os.path.dirname(file), base_name)

        x = wfdb.rdrecord(mat_path)
        fs = x.fs

        # Filter the first 12 signal columns one at a time, writing each
        # cleaned column back into the record.
        for i in range(0, 12):
            raw_lead = x.p_signal[:, i]
            detrended = remove_baseline_wander(raw_lead, fs)
            smoothed = low_pass_filter(detrended, fs)
            notched = notch_filter(smoothed, fs)
            x.p_signal[:, i] = wavelet_denoising(notched)

        var = x.p_signal.T  # transpose
        output_path = os.path.join(os.path.dirname(file), base_name + '.mat')  # Save in original directory
        sio.savemat(output_path, {'my_matrix': var})  # Save the modified file
    except FileNotFoundError:
        print(f"File not found: {file}")
    except ValueError as err:
        # Name the file and the cause; a bare "value error" cannot be traced
        # back to the record that was skipped.
        print(f"Value error processing file {file}: {err}")
    except IndexError:
        print(f"Index error processing file {file}. Check the signal dimensions.")



The code below collects each record (header plus signal matrix) into a list, with its corresponding conditions in a parallel list.

In [None]:
# Parallel lists: records_list[i] is a (header, signal matrix) tuple and
# conditions_list[i] holds the condition names for that same record.
records_list = []
conditions_list = []

# Lookup table from SNOMED-CT code to the condition's full name.
mapping_file = os.path.join(os.getcwd(), 'processed_ecg_signals_2/ConditionNames_SNOMED-CT.csv')
mapping_df = pd.read_csv(mapping_file)
mapping_dict = dict(zip(mapping_df['Snomed_CT'], mapping_df['Full Name']))

for hea_file in hea_files:
    # Derive the signal path from the header path instead of pairing two
    # independently built lists by position: a single missing or reordered
    # file would otherwise attach labels to another record's signal.
    header_path = os.path.splitext(hea_file)[0]
    mat_path = header_path  # loadmat appends '.mat' when it is missing

    try:
        record = wfdb.rdheader(header_path)

        mat_data = loadmat(mat_path)
        data_matrix = mat_data['my_matrix']

        # Extract the diagnosis codes from the header comments.
        # Assumes the third comment looks like '<label>: code1,code2,...'
        # - TODO confirm against the dataset's header layout.
        dx_field = record.comments[2].split(':')[1]
        diagnosis_code = [int(code) for code in dx_field.split(",")]

        # NOTE(review): codes missing from the mapping are labelled 'Normal'
        # - confirm that is the intended fallback.
        diagnosis_descriptions = [mapping_dict.get(code, 'Normal') for code in diagnosis_code]

        records_list.append((record, data_matrix))
        conditions_list.append(diagnosis_descriptions)

    except FileNotFoundError:
        print(f'File not found: {hea_file}')
    except ValueError as e:
        # Report the file path: `record` is unbound (or left over from the
        # previous iteration) when rdheader itself is what failed.
        print(f"Value error processing {hea_file}: {e}")
    except KeyError as k:
        print(f"Key error: {hea_file}: {k}")
    except IndexError as e:
        # Fewer than three header comments, or no ':' in the third one.
        print(f"Index error processing {hea_file}: {e}")

The code below reduces each record to a single condition for stratification, drops conditions that occur only once, and then performs a stratified split into training and testing sets.

In [None]:
# Stratification needs exactly one label per record. When a record carries
# several conditions, one of them is picked at random. A dedicated, seeded
# generator is used so the picks - and therefore the split below, which is
# already seeded with random_state=42 - are identical on every run.
label_rng = random.Random(42)
filtered_conditions = [label_rng.choice(conditions) for conditions in conditions_list]

condition_counts = Counter(filtered_conditions)
print(condition_counts)

# A condition that occurs only once cannot appear in both the training and
# the testing set, so those records are dropped before splitting.
conditions_to_remove = {condition for condition, count in condition_counts.items() if count == 1}

kept_pairs = [
    (record, condition)
    for record, condition in zip(records_list, filtered_conditions)
    if condition not in conditions_to_remove
]
filtered_records = [record for record, _ in kept_pairs]
filtered_conditions_final = [condition for _, condition in kept_pairs]

condition_counts_2 = Counter(filtered_conditions_final)

print(condition_counts_2)

# 80/20 split that keeps the proportion of each condition the same in both
# sets.
train_records, test_records, train_conditions, test_conditions = train_test_split(
    filtered_records,
    filtered_conditions_final,
    test_size=0.2,
    stratify=filtered_conditions_final,
    random_state=42
)


The code below saves the test and training records in two separate folders, `test_data` and `train_data`.

In [None]:
# Write each split's signal matrices to its own folder, one .mat file per
# record, named after the record and keyed 'my_matrix'.
for output_dir, split_records in (('test_data', test_records), ('train_data', train_records)):
    # savemat does not create missing folders, so make sure the target exists.
    os.makedirs(output_dir, exist_ok=True)

    for hea, mat in split_records:
        base_name = hea.record_name
        output_path = os.path.join(output_dir, base_name + '.mat')
        sio.savemat(output_path, {'my_matrix': mat})
