In [None]:
!pip install wfdb

In [1]:
import pandas as pd
import numpy as np
import wfdb
import ast

In [2]:
def load_raw_data(df, sampling_rate, path):
    if sampling_rate == 100:
        data = [wfdb.rdsamp(path+f) for f in df.filename_lr]
    else:
        data = [wfdb.rdsamp(path+f) for f in df.filename_hr]
    data = np.array([signal for signal, meta in data])
    return data



In [3]:
path = 'C:/Users/zafaryab.haider/Downloads/ptb-xl-electrocardiography-dataset-1.0.3/'
sampling_rate=100

# load and convert annotation data
Y = pd.read_csv(path+'ptbxl_database.csv', index_col='ecg_id')
Y.scp_codes = Y.scp_codes.apply(lambda x: ast.literal_eval(x))

# Load raw signal data
X = load_raw_data(Y, sampling_rate, path)



In [13]:
X[:1]
Y.index

Index([    1,     2,     3,     4,     5,     6,     7,     8,     9,    10,
       ...
       21828, 21829, 21830, 21831, 21832, 21833, 21834, 21835, 21836, 21837],
      dtype='int64', name='ecg_id', length=21799)

In [4]:
# Load scp_statements.csv for diagnostic aggregation
agg_df = pd.read_csv(path+'scp_statements.csv', index_col=0)
agg_df = agg_df[agg_df.diagnostic == 1]

def aggregate_diagnostic(y_dic):
    tmp = []
    for key in y_dic.keys():
        if key in agg_df.index:
            tmp.append(agg_df.loc[key].diagnostic_class)
    return list(set(tmp))

# Apply diagnostic superclass
Y['diagnostic_superclass'] = Y.scp_codes.apply(aggregate_diagnostic)



In [11]:
Y.diagnostic_superclass


ecg_id
1        [NORM]
2        [NORM]
3        [NORM]
4        [NORM]
5        [NORM]
          ...  
21833    [STTC]
21834    [NORM]
21835    [STTC]
21836    [NORM]
21837    [NORM]
Name: diagnostic_superclass, Length: 21799, dtype: object

In [16]:
Y['ecg_id'] = Y.index
columns_to_keep = ['ecg_id', 'patient_id', 'age', 'sex', 'height', 'weight', 'diagnostic_superclass']
Y_selected = Y[columns_to_keep]
Y_selected['bmi'] = Y_selected.apply(lambda row: (row['weight'] / ((row['height'] / 100) ** 2)) if (row['height'] > 0 and row['weight'] > 0) else None, axis=1)

def interpret_bmi(bmi):
    if bmi == None:
        return None
    elif bmi < 18.5:
        return 'Underweight'
    elif 18.5 <= bmi < 24.9:
        return 'Normal weight'
    elif 25 <= bmi < 29.9:
        return 'Overweight'
    else:
        return 'Obesity'

Y_selected['bmi_interpretation'] = Y_selected['bmi'].apply(interpret_bmi)
output_path = 'C:/Users/zafaryab.haider/Downloads/modified_csvfile.csv'
Y_selected.to_csv(output_path, index=False)



A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  Y_selected['bmi'] = Y_selected.apply(lambda row: (row['weight'] / ((row['height'] / 100) ** 2)) if (row['height'] > 0 and row['weight'] > 0) else None, axis=1)
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  Y_selected['bmi_interpretation'] = Y_selected['bmi'].apply(interpret_bmi)


In [5]:
# Split data into train and test
test_fold = 10
# Train
X_train = X[np.where(Y.strat_fold != test_fold)]
y_train = Y[(Y.strat_fold != test_fold)].diagnostic_superclass
# Test
X_test = X[np.where(Y.strat_fold == test_fold)]
y_test = Y[Y.strat_fold == test_fold].diagnostic_superclass