In [4]:
import os
import ast
import pickle
import joblib
import shutil

from tqdm.auto import tqdm
import pandas as pd
import numpy as np
import wfdb

from sklearn.preprocessing import MultiLabelBinarizer

PATH = '/users/mac/Downloads/ECG/PTB_XL/'
OUTPUT_DIR = '/users/mac/Downloads/ECG/PTB_XL/ptbxl_data_signals100/'
if not os.path.exists(OUTPUT_DIR):
    os.makedirs(OUTPUT_DIR)

In [5]:
# DATA PROCESSING STUFF

def select_data(XX,YY, outputfolder):
    # convert multilabel to multi-hot
    mlb = MultiLabelBinarizer()

    X = XX[YY.superdiagnostic_len > 0]
    Y = YY[YY.superdiagnostic_len > 0]
    mlb.fit(Y.superdiagnostic.values)
    y = mlb.transform(Y.superdiagnostic.values)
    

    # save LabelBinarizer
    with open(outputfolder+'mlb.pkl', 'wb') as tokenizer:
        pickle.dump(mlb, tokenizer)

    return X, Y, y, mlb


def load_raw_data_ptbxl(df, sampling_rate, path):
    if sampling_rate == 100:
        if os.path.exists(path + 'raw100.npy'):
            data = np.load(path+'raw100.npy', allow_pickle=True)
        else:
            data = [wfdb.rdsamp(path+f) for f in tqdm(df.filename_lr)]
            data = np.array([signal for signal, meta in data])
    elif sampling_rate == 500:
        if os.path.exists(path + 'raw500.npy'):
            data = np.load(path+"Serialized"+'raw500.npy', allow_pickle=True)
        else:
            data = [wfdb.rdsamp(path+f) for f in tqdm(df.filename_hr)]
            data = np.array([signal for signal, meta in data])
            pickle.dump(data, open(path+"Serialized"+'raw500.npy', 'wb'), protocol=4)
    return data



def load_dataset(path, sampling_rate):
    
    # load and convert annotation data
    Y = pd.read_csv(path+'ptbxl_database.csv', index_col='ecg_id')
    Y.scp_codes = Y.scp_codes.apply(lambda x: ast.literal_eval(x))
    # Load raw signal data
    X = load_raw_data_ptbxl(Y, sampling_rate, path)

    return X, Y


def compute_label_aggregations(df, folder, ctype):

    df['scp_codes_len'] = df.scp_codes.apply(lambda x: len(x))

    aggregation_df = pd.read_csv(folder+'scp_statements.csv', index_col=0)

    if ctype in ['diagnostic', 'subdiagnostic', 'superdiagnostic']:

        def aggregate_all_diagnostic(y_dic):
            tmp = []
            for key in y_dic.keys():
                if key in diag_agg_df.index:
                    tmp.append(key)
            return list(set(tmp))

        def aggregate_subdiagnostic(y_dic):
            tmp = []
            for key in y_dic.keys():
                if key in diag_agg_df.index:
                    c = diag_agg_df.loc[key].diagnostic_subclass
                    if str(c) != 'nan':
                        tmp.append(c)
            return list(set(tmp))

        def aggregate_diagnostic(y_dic):
            tmp = []
            for key in y_dic.keys():
                if key in diag_agg_df.index:
                    c = diag_agg_df.loc[key].diagnostic_class
                    if str(c) != 'nan':
                        tmp.append(c)
            return list(set(tmp))

        diag_agg_df = aggregation_df[aggregation_df.diagnostic == 1.0]
        if ctype == 'diagnostic':
            df['diagnostic'] = df.scp_codes.apply(aggregate_all_diagnostic)
            df['diagnostic_len'] = df.diagnostic.apply(lambda x: len(x))
        elif ctype == 'subdiagnostic':
            df['subdiagnostic'] = df.scp_codes.apply(aggregate_subdiagnostic)
            df['subdiagnostic_len'] = df.subdiagnostic.apply(lambda x: len(x))
        elif ctype == 'superdiagnostic':
            df['superdiagnostic'] = df.scp_codes.apply(aggregate_diagnostic)
            df['superdiagnostic_len'] = df.superdiagnostic.apply(lambda x: len(x))
    elif ctype == 'form':
        form_agg_df = aggregation_df[aggregation_df.form == 1.0]

        def aggregate_form(y_dic):
            tmp = []
            for key in y_dic.keys():
                if key in form_agg_df.index:
                    c = key
                    if str(c) != 'nan':
                        tmp.append(c)
            return list(set(tmp))

        df['form'] = df.scp_codes.apply(aggregate_form)
        df['form_len'] = df.form.apply(lambda x: len(x))
    elif ctype == 'rhythm':
        rhythm_agg_df = aggregation_df[aggregation_df.rhythm == 1.0]

        def aggregate_rhythm(y_dic):
            tmp = []
            for key in y_dic.keys():
                if key in rhythm_agg_df.index:
                    c = key
                    if str(c) != 'nan':
                        tmp.append(c)
            return list(set(tmp))

        df['rhythm'] = df.scp_codes.apply(aggregate_rhythm)
        df['rhythm_len'] = df.rhythm.apply(lambda x: len(x))
    elif ctype == 'all':
        df['all_scp'] = df.scp_codes.apply(lambda x: list(set(x.keys())))

    return df.reset_index(drop=True)

In [8]:
%%time
_, agg_df = load_dataset(PATH, 100)

def get_file_paths(file_name):
    return f"/users/mac/Downloads/ECG/PTB_XL/{file_name}"

agg_df["file_paths"] = agg_df["filename_lr"].apply(get_file_paths)

100%|██████████| 21837/21837 [01:30<00:00, 240.66it/s]


CPU times: user 1min 10s, sys: 11.3 s, total: 1min 21s
Wall time: 1min 37s


In [9]:
%%time
agg_df = agg_df.reset_index(drop=True)
labels = compute_label_aggregations(agg_df, PATH, 'superdiagnostic')

CPU times: user 5.3 s, sys: 101 ms, total: 5.4 s
Wall time: 5.66 s


In [10]:
labels.superdiagnostic_len.value_counts()

1    16272
2     4079
3      920
0      407
4      159
Name: superdiagnostic_len, dtype: int64

In [11]:
labels['file_paths'][0]

'/users/mac/Downloads/ECG/PTB_XL/records100/00000/00001_lr'

In [12]:
labels = labels.loc[labels['superdiagnostic_len'] != 0]
filtration = labels["superdiagnostic"].apply(lambda x: 1 if "NORM" in x and len(x) > 1 else 0)
print(f"Patient that have 'NORM' in their diagnostic is: {sum(filtration == 1)}")
labels = labels[filtration < 1]

Patient that have 'NORM' in their diagnostic is: 445


In [13]:
labels.superdiagnostic_len.value_counts()

1    16272
2     3642
3      913
4      158
Name: superdiagnostic_len, dtype: int64

In [14]:
def generate_signal(file_path, out_dir):
    
    # Extract the name_file from path_name
    file_name = file_path.split('/')[-1].split('_')[0]
    # Read patient record
    records = wfdb.rdsamp(file_path)
    records_signal, _ = records
    records_signal = records_signal.astype(np.float32)
    np.save(out_dir + file_name, records_signal)
        
    return file_name

In [15]:
file_name = joblib.Parallel(n_jobs=-1, verbose=1)(
    joblib.delayed(generate_signal)(file_path, OUTPUT_DIR) for file_path in tqdm(labels['file_paths'].values)
)
labels["signal_filename"] = file_name
shutil.make_archive(OUTPUT_DIR, 'zip', OUTPUT_DIR)
shutil.rmtree(OUTPUT_DIR)

  0%|          | 0/20985 [00:00<?, ?it/s][Parallel(n_jobs=-1)]: Using backend LokyBackend with 8 concurrent workers.
  0%|          | 16/20985 [00:02<57:46,  6.05it/s] 

FileNotFoundError: [Errno 2] No such file or directory: '/users/mac/Downloads/ECG/PTB_XL/ptbxl_data_signals100/00001.npy'

In [None]:
labels["signal_filename"][0]

'00001'

In [None]:
labels[labels['strat_fold'] != 10].to_csv('ptbxl_ecg_train.csv', sep=',', index=False)
labels[labels['strat_fold'] == 10].to_csv('ptbxl_ecg_test.csv', sep=',', index=False)