In [5]:
import wfdb    
import os
import pandas as pd
from scipy.signal import butter, filtfilt
import numpy as np

In [6]:
# Read record '10' through wfdb, using 'ludb/1.0.1/data/' as the pn_dir argument.
# NOTE(review): `record` is reassigned for every record inside the processing
# loop of a later cell, so this value only serves ad-hoc inspection.
record = wfdb.rdrecord('10', pn_dir='ludb/1.0.1/data/')

In [7]:
def filter(ecg_signal, sampling_rate, lowcut=0.5, highcut=20.0, order=4):
    """Apply a zero-phase Butterworth band-pass filter to an ECG trace.

    NOTE: this name shadows Python's built-in ``filter``. It is kept because
    other cells call the function by this name.

    Parameters
    ----------
    ecg_signal : array-like
        Samples to filter (passed straight to ``scipy.signal.filtfilt``).
    sampling_rate : float
        Sampling frequency of ``ecg_signal`` in Hz.
    lowcut : float, optional
        Lower pass-band edge in Hz. Defaults to 0.5.
    highcut : float, optional
        Upper pass-band edge in Hz. Defaults to 20.
    order : int, optional
        Order handed to ``scipy.signal.butter``. Defaults to 4.

    Returns
    -------
    numpy.ndarray
        The band-pass filtered signal returned by ``filtfilt``.

    Raises
    ------
    ValueError
        If the cutoffs do not satisfy
        ``0 < lowcut < highcut < sampling_rate / 2``.
    """
    nyquist = 0.5 * sampling_rate
    if not 0 < lowcut < highcut < nyquist:
        raise ValueError(
            f"Band edges must satisfy 0 < lowcut < highcut < {nyquist} Hz "
            f"(Nyquist); got lowcut={lowcut}, highcut={highcut}."
        )

    # butter() expects the critical frequencies normalised by Nyquist.
    low = lowcut / nyquist
    high = highcut / nyquist
    b, a = butter(order, [low, high], btype='band')

    # filtfilt runs the filter forwards and backwards, cancelling phase shift.
    return filtfilt(b, a, ecg_signal)

In [8]:
# --- Configuration -----------------------------------------------------------
data_dir = 'ludb/1.0.1/data/'  # passed to wfdb as pn_dir when reading records/annotations
data_path = '../lobachevsky-university-electrocardiography-database-1.0.1/data/'  # local copy, used to list records
fs = 500  # sampling rate (Hz) handed to the band-pass filter
trim_samples = 1000  # number of samples dropped from each end of every record

# Integer written to the 'target' column for each annotation symbol of interest.
# Samples without one of these annotations keep the value 0.
symbol_to_label = {'p': 1, 'N': 3, 't': 5}

# --- Record list ---------------------------------------------------------------
# One record per local .dat file. Names are strings, so the sort is lexicographic
# ('1', '10', '100', ...), not numeric.
records = sorted(f.split('.')[0] for f in os.listdir(data_path) if f.endswith('.dat'))

# --- Rhythm lookup ---------------------------------------------------------------
# The CSV is read without a header, so columns are addressed by position:
# column 0 is used as the record key and column 3 as the rhythm label.
# NOTE(review): with header=None any header row in the file is read as data - confirm
# this is intended (it does keep column 0 as strings, matching the file-name keys).
ludb_df = pd.read_csv('../lobachevsky-university-electrocardiography-database-1.0.1/ludb.csv', header=None)

# Map record names to their rhythms
record_to_rhythm = dict(zip(ludb_df[0], ludb_df[3]))

# --- Per-record processing -----------------------------------------------------
dfs = []  # one labelled DataFrame per retained record

for record_name in records:
    # NOTE(review): records are listed from the local copy (data_path) but read
    # through pn_dir (data_dir); confirm that mixing the two sources is intended.
    record = wfdb.rdrecord(record_name, pn_dir=data_dir)
    n_samples = record.p_signal.shape[0]

    # Drop `trim_samples` samples from both ends. The end index is written
    # explicitly so that a trim of 0 would not turn into an empty `[0:-0]` slice.
    trimmed_signal = record.p_signal[trim_samples:n_samples - trim_samples]

    # Band-pass filter lead ii of the trimmed signal.
    lead_ii_idx = record.sig_name.index('ii')
    df_signals = pd.DataFrame({'ii': filter(trimmed_signal[:, lead_ii_idx], fs)})

    # Read the annotations for lead ii
    annotations = wfdb.rdann(record_name, 'ii', pn_dir=data_dir)

    # Keep only annotations inside the trimmed window and shift their sample
    # positions so they index into the trimmed signal.
    valid_annotations = [
        (sample - trim_samples, symbol)
        for sample, symbol in zip(annotations.sample, annotations.symbol)
        if trim_samples <= sample < n_samples - trim_samples
    ]

    # Label column: 0 everywhere, then the mapped label at each annotated sample.
    df_signals['target'] = 0
    seen_symbols = set()  # which of the labelled symbols occur in this record
    for index, symbol in valid_annotations:
        label = symbol_to_label.get(symbol)
        if label is not None:
            df_signals.at[index, 'target'] = label
            seen_symbols.add(symbol)

    rhythm = record_to_rhythm.get(record_name, 'Unknown')

    # Keep the record when all three labelled symbols are present, or when its
    # rhythm is atrial fibrillation regardless of which symbols were found.
    has_all_symbols = seen_symbols.issuperset(symbol_to_label)
    if has_all_symbols or rhythm == "Atrial fibrillation":
        # Identify the record in the first column and attach its rhythm label.
        df_signals.insert(0, 'record', record_name)
        df_signals['rhythm'] = rhythm
        dfs.append(df_signals)

# --- Combine -------------------------------------------------------------------
# pd.concat raises an uninformative "No objects to concatenate" on an empty
# list; fail with a message that points at the likely cause instead.
if not dfs:
    raise ValueError(
        f"No records were retained: found {len(records)} .dat file(s) in "
        f"{data_path!r} and none met the selection criteria."
    )

# Concatenate all DataFrames into one
df_filtered = pd.concat(dfs, ignore_index=True)


In [10]:
# Write the combined table to CSV with neither a header row nor the index.
# Because the header is omitted, consumers must rely on column order, which as
# built in the processing cell is: record, ii, target, rhythm.
df_filtered.to_csv('ludb_data_w_fibrillation.csv', header=False, index=False)