In [1]:
# import modules

import os
import datetime
from tqdm import tqdm
import numpy as np
import pandas as pd
from concurrent.futures import ThreadPoolExecutor
from config import BATCH_SIZE

In [2]:
# Useful variables & data loading

raw_data_root = '/home/ubuntu/ecg_mount'
proc_data_root = '/home/ubuntu/ecg_hfpef/data/processed_data'
file_root = './fname_score_pair.csv'
crit_file_root = './fname_crit_pair.csv'
keys = ['als', 'fname', 'score']
crit_keys = ['als', 'fname', 'BMI', 'hypertensive', 'afib', 'pulm hyper', 'age', 'pressure']

# Echo table with H2FPEF scores (cp949 is a Korean code page) and the ECG master table.
score_data = pd.read_csv(f'{raw_data_root}/H2FPEF-echo_.csv', encoding='cp949', low_memory=False)
fname_data = pd.read_csv(f'{raw_data_root}/230111_SevMUSE_EKG_MasterTable.csv', low_memory=False)

# Batching for the first matching pass over the echo rows.
num_datas = len(score_data)
batch_size = 20000
# Ceiling division, so an exact multiple of batch_size does not produce
# an extra, empty trailing batch.
num_batches = -(-num_datas // batch_size)

In [3]:
# Start the score/fname pair CSV from scratch.
# CAUTION: running this cell deletes any results already saved at `file_root`.

empty = pd.DataFrame(columns=keys)
if os.path.isfile(file_root):
    os.remove(file_root)
empty.to_csv(file_root)

In [4]:
# Process a single batch: for each echo row in mini_score_df, look up the
# patient's ECG files in fname_df and append the best match to main_df.

def dataBatchProcessing(main_df, mini_score_df, fname_df):
    """Match each echo record in a batch to its closest-in-time 12-lead ECG.

    For every row of ``mini_score_df`` the rows of ``fname_df`` sharing its
    ``AlsUnitNo`` are looked up. Among those whose ``waveform_shape`` equals
    ``'(5000, 12)'``, the one whose ``AcqDate`` is nearest to the echo
    ``STUDYDATE`` is chosen (first one wins on ties), and an
    ``(als, fname, score)`` row is appended.

    Patients already present in ``main_df['als']``, or matched earlier in this
    batch, are skipped. So are patients without an ECG of the required shape.

    Args:
        main_df: Accumulated results; must have an ``'als'`` column.
        mini_score_df: Batch of echo rows with ``AlsUnitNo``, ``STUDYDATE``
            (``'%Y-%m-%d'``) and ``score_H2FPEF`` columns.
        fname_df: ECG table with ``AlsUnitNo``, ``waveform_shape``,
            ``AcqDate`` (``'%Y-%m-%d'``) and ``fname`` columns.

    Returns:
        ``main_df`` with the newly matched rows appended (``main_df`` itself
        when nothing new was matched).
    """
    # A set gives O(1) membership instead of rebuilding a list for every row.
    seen_als = set(main_df['als'].tolist())
    # Collect rows and concatenate once; growing a DataFrame per row is quadratic.
    new_rows = []

    for i in tqdm(range(len(mini_score_df))):
        row = mini_score_df.iloc[i]
        als = row['AlsUnitNo']

        # Cheap check first, so the query is skipped for known patients.
        if als in seen_als:
            continue
        ecg_fnames = fname_df.query(f'AlsUnitNo=={als}')
        if len(ecg_fnames) == 0:
            continue

        # Find the ECG file with the minimum date difference to the echo study.
        echo_date = datetime.datetime.strptime(row['STUDYDATE'], '%Y-%m-%d')
        best_fname = None
        min_date_diff = None  # None = no ECG with the required shape yet
        for j in range(len(ecg_fnames)):
            ecg = ecg_fnames.iloc[j]
            # Only 10 s x 12-lead recordings are usable.
            if ecg['waveform_shape'] != '(5000, 12)':
                continue
            ecg_date = datetime.datetime.strptime(ecg['AcqDate'], '%Y-%m-%d')
            date_diff = abs((echo_date - ecg_date).days)
            # Strict '<' keeps the first of equally close candidates.
            if min_date_diff is None or date_diff < min_date_diff:
                min_date_diff = date_diff
                best_fname = ecg['fname']

        # Skip if no ECG matched the waveform shape (5000, 12).
        if min_date_diff is None:
            continue

        new_rows.append({'als': als, 'fname': best_fname, 'score': row['score_H2FPEF']})
        seen_als.add(als)

    if not new_rows:
        return main_df
    return pd.concat([main_df, pd.DataFrame(new_rows)], ignore_index=True)

In [5]:
# Main processing function.
# Divides the score data into batches and runs dataBatchProcessing on each.

def main(score, fname, num_data, bs, end_batch, file_root, start_batch=0):
    """Run the echo/ECG matching batch by batch, checkpointing to CSV.

    The accumulated result is read from and written back to ``file_root`` for
    every batch, which lets a run be restarted from ``start_batch``.

    Args:
        score: Echo/score DataFrame to be split into batches.
        fname: ECG filename table passed through to ``dataBatchProcessing``.
        num_data: Number of leading rows of ``score`` to process.
        bs: Batch size.
        end_batch: Exclusive index of the last batch to run.
        file_root: CSV path holding the accumulated results.
        start_batch: First batch index to run.
    """
    for i in range(start_batch, end_batch):
        # Batch bounds, clipped to num_data so the final batch is simply shorter.
        start = i * bs
        stop = min(start + bs, num_data)
        mini_batch = score.iloc[start:stop]

        # Read mainstream from csv
        data = pd.read_csv(file_root)

        # Process and save to csv
        print(f'Processing batch {i}/{end_batch}')
        data = dataBatchProcessing(data, mini_batch, fname)
        data.to_csv(file_root)
        print(f'Done batch {i} processing\n')

In [6]:
# Run the matching over every batch, starting from the first one.

main(
    score=score_data,
    fname=fname_data,
    num_data=num_datas,
    bs=batch_size,
    end_batch=num_batches,
    file_root=file_root,
    start_batch=0,
)

Processing batch 0/17


100%|█████████████████████████████████████| 20000/20000 [08:43<00:00, 38.19it/s]


Done batch 0 processing

Processing batch 1/17


100%|█████████████████████████████████████| 20000/20000 [07:12<00:00, 46.25it/s]


Done batch 1 processing

Processing batch 2/17


100%|█████████████████████████████████████| 20000/20000 [07:23<00:00, 45.09it/s]


Done batch 2 processing

Processing batch 3/17


100%|█████████████████████████████████████| 20000/20000 [07:44<00:00, 43.03it/s]


Done batch 3 processing

Processing batch 4/17


100%|█████████████████████████████████████| 20000/20000 [07:59<00:00, 41.73it/s]


Done batch 4 processing

Processing batch 5/17


100%|█████████████████████████████████████| 20000/20000 [08:24<00:00, 39.66it/s]


Done batch 5 processing

Processing batch 6/17


100%|█████████████████████████████████████| 20000/20000 [08:42<00:00, 38.30it/s]


Done batch 6 processing

Processing batch 7/17


100%|█████████████████████████████████████| 20000/20000 [08:43<00:00, 38.19it/s]


Done batch 7 processing

Processing batch 8/17


100%|█████████████████████████████████████| 20000/20000 [08:48<00:00, 37.87it/s]


Done batch 8 processing

Processing batch 9/17


100%|█████████████████████████████████████| 20000/20000 [08:55<00:00, 37.33it/s]


Done batch 9 processing

Processing batch 10/17


100%|█████████████████████████████████████| 20000/20000 [09:38<00:00, 34.56it/s]


Done batch 10 processing

Processing batch 11/17


100%|█████████████████████████████████████| 20000/20000 [10:02<00:00, 33.20it/s]


Done batch 11 processing

Processing batch 12/17


100%|█████████████████████████████████████| 20000/20000 [10:19<00:00, 32.28it/s]


Done batch 12 processing

Processing batch 13/17


100%|█████████████████████████████████████| 20000/20000 [10:50<00:00, 30.74it/s]


Done batch 13 processing

Processing batch 14/17


100%|█████████████████████████████████████| 20000/20000 [11:03<00:00, 30.13it/s]


Done batch 14 processing

Processing batch 15/17


100%|█████████████████████████████████████| 20000/20000 [10:54<00:00, 30.54it/s]


Done batch 15 processing

Processing batch 16/17


100%|█████████████████████████████████████████| 670/670 [00:21<00:00, 31.50it/s]


Done batch 16 processing



In [3]:
# Remove the extra index columns left over from the batch processing.
# Every batch re-saved the CSV together with its index, so the file carries
# "Unnamed: 0", "Unnamed: 0.1", ... columns. Select them by name rather than
# predicting their count, so the cell also works after a resumed run or when
# executed twice.

data = pd.read_csv(file_root)
drop_columns = [col for col in data.columns if str(col).startswith('Unnamed: ')]
new_data = data.drop(columns=drop_columns)
# index=False keeps the cleaned file free of a fresh "Unnamed: 0" column.
new_data.to_csv(file_root, index=False)

In [4]:
# Sanity check: lowest and highest H2FPEF score stored in the pair file.

data = pd.read_csv(file_root)
scores = data['score']
print(min(scores), max(scores))

0 7


In [None]:
# Reset the criteria CSV file.
# CAUTION: running this cell deletes any results already saved at `crit_file_root`.

crit_file_root = './fname_crit_pair.csv'
# Explicit commas matter: adjacent string literals silently merge
# (e.g. 'score' 'BMI' -> 'scoreBMI').
crit_keys = ['idx', 'als', 'fname', 'score',
             'BMI', 'h-tense', 'afib', 'pulm h-tense', 'age', 'pressure']

if os.path.isfile(crit_file_root):
    os.remove(crit_file_root)

empty = pd.DataFrame(columns=crit_keys)
empty.to_csv(crit_file_root)

In [None]:
# Reprocessing Data
# 1. Slice data by batch size, and re-save as numpy file
# 2. Save H2FPEF scores together with each individual criterion
#    Heavy: 2, Hypertensive: 1, Atrial Fibrillation: 3,
#    Pulmonary Hypertension: 1, Elder: 1, Filling Pressure: 1

def processData(idx):
    """Build the criteria row and load the ECG waveform for one matched pair.

    Args:
        idx: Positional index into the module-level ``data_pairs`` frame
            (the ``als``/``fname``/``score`` pairs from the first pass).

    Returns:
        ``(ecg_data, new_line)``:
        - ``ecg_data`` is ``pd.read_csv(fname)`` converted to a NumPy array.
        - ``new_line`` is a DataFrame with columns ``crit_keys``. It holds one
          row, or no rows when no echo row of the patient has the same score.
    """
    df_line = data_pairs.iloc[idx]
    als, fname, score = df_line['als'], df_line['fname'], df_line['score']

    record = None
    score_lines = score_data.query(f"AlsUnitNo=={als}")
    for _, line in score_lines.iterrows():
        # The first pass stored `score_H2FPEF` as `score`.
        # NOTE(review): matching on score alone may select a different echo
        # study of the same patient than the one paired in the first pass.
        if line['score_H2FPEF'] == score:
            record = {
                # From previous file
                'idx': idx,
                'als': als,
                'fname': fname,
                'score': score,
                # Individual criteria.
                # TODO: map each one to its score_data column (not yet known).
                'BMI': None,
                'h-tense': None,
                'afib': None,
                'pulm h-tense': None,
                'age': None,
                'pressure': None,
            }
            break

    new_line = pd.DataFrame([record] if record is not None else [], columns=crit_keys)
    # NOTE(review): fname is read as given; confirm it is a path readable from the cwd.
    ecg_data = pd.read_csv(fname).to_numpy()
    return ecg_data, new_line


# Driver: run processData over data_pairs batch by batch and append the
# criteria rows to the criteria CSV after every batch.

data_pairs = pd.read_csv(file_root)
batch_size = BATCH_SIZE
# Ceiling division: the last batch holds the remainder, and there are no empty batches.
num_batches = -(-len(data_pairs) // batch_size)
# num_batches + 1 boundaries -> num_batches slices [batch_idx[i], batch_idx[i+1]).
batch_idx = [min(i * batch_size, len(data_pairs)) for i in range(num_batches + 1)]
num_threads = 16

for i in range(num_batches):
    # index_col=0 absorbs the index column written by to_csv, so index
    # columns do not pile up across batches.
    crit_df = pd.read_csv(crit_file_root, index_col=0)

    # processData indexes the full data_pairs frame, so pass global positions.
    idx = range(batch_idx[i], batch_idx[i + 1])
    with ThreadPoolExecutor(max_workers=num_threads) as executor:
        batch_results = list(tqdm(executor.map(processData, idx), total=len(idx)))

    ecg_data = [ecg_datum for ecg_datum, _ in batch_results]
    new_lines = [new_line for _, new_line in batch_results]
    crit_df = pd.concat([crit_df, *new_lines], ignore_index=True)
    crit_df.to_csv(crit_file_root)
    # TODO: persist ecg_data for this batch as a NumPy file (step 1 of the plan).