In [48]:
import pandas as pd
import itertools
import numpy as np
import gc
from statsmodels.nonparametric.smoothers_lowess import lowess
import dask.dataframe as dd


In [16]:
csv= '/home/mei/nas/docker/thesis/data/csv/'
hdf= '/home/mei/nas/docker/thesis/data/hdf/'

In [17]:
def round_up(x, base=5):
    return base * round(x/base)

In [18]:
print('==> Loading data from timeseries files...')
timeseries_lab = pd.read_csv(csv + 'timeserieslab.csv', low_memory=False)
timeseries_periodic = pd.read_csv(csv+ 'timeseriesperiodic.csv')

==> Loading data from timeseries files...


In [19]:
print("there are {} patients in the  and {} records in lab table.".format(len(list(timeseries_lab ['patientunitstayid'].unique())),len(timeseries_lab)))
print("there are {} patients in the  and {} records in vital periodic table.".format(len(list(timeseries_periodic ['patientunitstayid'].unique())),len(timeseries_periodic)))

there are 12260 patients in the  and 2337787 records in lab table.
there are 12260 patients in the  and 10671165 records in vital periodic table.


In [20]:
timeseries_lab.set_index(['patientunitstayid','labresultoffset'], inplace=True)
timeseries_periodic.set_index(['patientunitstayid','observationoffset'], inplace=True)
timeseries_lab.rename(round_up, level = 'labresultoffset', inplace = True)
timeseries_periodic.rename(round_up, level = 'observationoffset', inplace = True)
timeseries_lab.sort_index(inplace=True)
timeseries_periodic.sort_index(inplace=True)

In [21]:
def reconfigure_timeseries(timeseries, offset_column, feature_column=None, test=False):
    """
    Reconfigure timeseries data by setting multi-index and pivoting if necessary.
    """
    timeseries.reset_index(inplace=True)
    if test:
        timeseries = timeseries.iloc[:5000]  # Limit for testing
    timeseries.set_index(['patientunitstayid', pd.to_timedelta(timeseries[offset_column], unit='min')], inplace=True)
    timeseries.drop(columns=offset_column, inplace=True)
    if feature_column:
        timeseries = timeseries.pivot_table(columns=feature_column, index=timeseries.index)
    timeseries.index = pd.MultiIndex.from_tuples(timeseries.index, names=['patient', 'time'])
    return timeseries

In [22]:
test=False
print('==> Reconfiguring lab timeseries...')
lab = reconfigure_timeseries(timeseries_lab, 'labresultoffset', 'labname', test)
lab.columns=lab.columns.droplevel()

print('==> Reconfiguring periodic timeseries...')
periodic = reconfigure_timeseries(timeseries_periodic, 'observationoffset', test)

==> Reconfiguring lab timeseries...
==> Reconfiguring periodic timeseries...


In [23]:
flat=pd.read_csv(csv + 'preprocessed_flat_drug.csv')
labels = pd.read_csv(csv + 'preprocessed_labels.csv')
diagnoses= pd.read_csv(csv + 'preprocessed_diagnoses.csv')
common_id = list(set(flat['patient']).intersection(set(labels['patient']).intersection(set(diagnoses['patient']))))
len(common_id)

11698

In [24]:
lab = lab.reset_index()
lab = lab[lab['patient'].isin(common_id)]
# lab['time'] = pd.to_timedelta(lab['time'])
lab = lab[lab['time'] <= pd.to_timedelta('14 days')]

lab = lab.set_index(['patient', 'time'])

periodic = periodic.reset_index()
periodic = periodic[periodic['patient'].isin(common_id)]
periodic = periodic.set_index(['patient', 'time'])

In [25]:
print('==> Combining data together...')
merged = pd.concat([lab, periodic], axis=0, sort=False)

==> Combining data together...


In [26]:
possible_value_ranges = {
    "sao2": (40, 100),             # Peripheral oxygen saturation
    "heartrate": (30, 400),        # Heart rate
    "respiration": (0, 60),        # Resp. rate
    "cvp": (0, 20),                # Central venous pressure
    "systemicsystolic": (40, 300), # Invasive systolic blood pressure
    "systemicdiastolic": (20, 150),# Invasive diastolic blood pressure
    "systemicmean": (30, 200),     # Invasive mean blood pressure
}

In [27]:
def filter_vital_signs(data, ranges):
    for column, (min_val, max_val) in ranges.items():
        if column in data.columns:
            data = data[(data[column].isna()) | ((data[column] >= min_val) & (data[column] <= max_val))]
    return data

def min_max_normalize(df):
    normalized_df = df.copy()
    for col in df.columns:
        min_val = df[col].min()
        max_val = df[col].max()
        normalized_df[col] = (df[col] - min_val) / (max_val - min_val)
    return normalized_df

In [33]:
merged = filter_vital_signs(merged, possible_value_ranges)

# compute the 0.1% and 99.9% quantiles for lat test
low_quantile = merged.quantile(0.001, numeric_only=True)  # 0.1% 
high_quantile= merged.quantile(0.999, numeric_only=True)  # 99.9% 

# only keep the row in  [0.1%, 99.9%] 
merged = merged[(merged>= low_quantile) & (merged <= high_quantile)]

print("select valid vlaue of vital signs")
print("There are {} patients and {} records in the vital periodic table.".format(
    merged.index.get_level_values('patient').nunique(),
    len(merged)
))

# noaralize the data
print('==> Normalizing data...')
merged = min_max_normalize(merged)

select valid vlaue of vital signs
There are 11698 patients and 10263500 records in the vital periodic table.
==> Normalizing data...


In [53]:
merged

Unnamed: 0_level_0,Unnamed: 1_level_0,-bands,-basos,-eos,-lymphs,-monos,-polys,24 h urine protein,24 h urine urea nitrogen,ALT (SGPT),ANF/ANA,...,sao2,heartrate,respiration,cvp,systemicsystolic,systemicdiastolic,systemicmean,st1,st2,st3
patient,time,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1,Unnamed: 22_level_1
252784,0 days 00:20:00,,,,,,,,,,,...,,,,,,,,,,
252784,0 days 00:40:00,,,,,,,,,,,...,,,,,,,,,,
252784,0 days 01:20:00,,,,,,,,,,,...,,,,,,,,,,
252784,0 days 01:40:00,,,,,,,,,,,...,,,,,,,,,,
252784,0 days 02:35:00,,,,,,,,,,,...,,,,,,,,,,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
3348105,2 days 19:25:00,,,,,,,,,,,...,,0.403509,,,,,,,,
3348105,2 days 19:30:00,,,,,,,,,,,...,,0.412281,,,,,,,,
3348105,2 days 19:35:00,,,,,,,,,,,...,,0.508772,,,,,,,,
3348105,2 days 19:40:00,,,,,,,,,,,...,,0.508772,,,,,,,,


In [47]:
merged.to_parquet(csv + 'merged.parquet')
print('==> Data saved to parquet.')

==> Data saved to parquet.


In [90]:
def gen_patient_chunk(patients, merged, size=500):
    """
    Generate patient data chunks for processing.
    """
    it = iter(patients)
    chunk = list(itertools.islice(it, size))
    while chunk:
        yield merged.loc[chunk]
        chunk = list(itertools.islice(it, size))
        
def moving_average_smoothing(df, window=5):

    smoothed_df = df.copy()
    for col in df.columns:
        if pd.api.types.is_numeric_dtype(df[col]):
            smoothed_df[col] = df[col].rolling(window=window, min_periods=1, center=True).mean()
    return smoothed_df

In [91]:
def resample(timeseries):
    resampled_data = []
    for patient, group in timeseries.groupby(level=0):
        group = group.droplevel(0)
        group.index = group.index.ceil(freq='5min')
        resampled = group.resample('5min', closed='right', label='right').mean()
        
        markers = resampled.notna().astype(int)

        # 1. linear interpolation for missing values
        resampled.interpolate(method='linear', limit_area='inside', inplace=True)
        # 2. forward fill for the rest
        resampled.ffill(inplace=True)
        # 3. backfill for the rest
        resampled.bfill(inplace=True)
        # 4. fill the rest with 0.5
        resampled.fillna(0.5, inplace=True)  
        # 5. smooth the data (lowess)
        resampled = moving_average_smoothing(resampled, window=5)
        
        n = len(resampled)
        resampled.reset_index(drop=True, inplace=True)
        markers.reset_index(drop=True, inplace=True)
        
        new_cols = pd.DataFrame({
            'patient': [patient] * n,
            'time': np.arange(1, n + 1)
        })
        resampled = pd.concat([new_cols, resampled], axis=1)
        markers = pd.concat([new_cols, markers], axis=1)
        
        resampled.set_index(['patient', 'time'], inplace=True)
        markers.set_index(['patient', 'time'], inplace=True)
        
        resampled = pd.concat([resampled, markers.add_suffix('_marker')], axis=1)
        resampled = resampled.copy() 
        resampled_data.append(resampled)
    final = pd.concat(resampled_data)
    
    
    return final


In [93]:
patients = merged.index.unique(level=0)
gen_chunks = gen_patient_chunk(patients, merged)
header = True
print('==> Initiating main processing loop...')

for i, patient_chunk in enumerate(gen_chunks, start=1):
    final = resample(patient_chunk)
    final.to_hdf(hdf + "final_timeseries_Marker.h5", key="df", mode="a", complevel=5, complib="zlib", format="table", append=True)

    print(f'==> Processed {i * 500} patients...')
    
    del patient_chunk # free up memory
    gc.collect()

==> Initiating main processing loop...
==> Processed 500 patients...
==> Processed 1000 patients...
==> Processed 1500 patients...
==> Processed 2000 patients...
==> Processed 2500 patients...
==> Processed 3000 patients...
==> Processed 3500 patients...
==> Processed 4000 patients...
==> Processed 4500 patients...
==> Processed 5000 patients...
==> Processed 5500 patients...
==> Processed 6000 patients...
==> Processed 6500 patients...
==> Processed 7000 patients...
==> Processed 7500 patients...
==> Processed 8000 patients...
==> Processed 8500 patients...
==> Processed 9000 patients...
==> Processed 9500 patients...
==> Processed 10000 patients...
==> Processed 10500 patients...
==> Processed 11000 patients...
==> Processed 11500 patients...
==> Processed 12000 patients...


In [95]:
final_ts= pd.read_hdf(hdf+ "final_timeseries_Marker.h5", key="df") # key is the name of the key in the hdf file

In [96]:
final_ts

Unnamed: 0_level_0,Unnamed: 1_level_0,-bands,-basos,-eos,-lymphs,-monos,-polys,24 h urine protein,24 h urine urea nitrogen,ALT (SGPT),ANF/ANA,...,sao2_marker,heartrate_marker,respiration_marker,cvp_marker,systemicsystolic_marker,systemicdiastolic_marker,systemicmean_marker,st1_marker,st2_marker,st3_marker
patient,time,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1,Unnamed: 22_level_1
252784,1,0.5,0.066667,0.014286,0.135165,0.288095,0.5,0.5,0.5,0.004442,0.5,...,1,1,1,0,0,0,0,1,1,1
252784,2,0.5,0.066667,0.014286,0.135165,0.288095,0.5,0.5,0.5,0.004442,0.5,...,1,1,1,0,0,0,0,1,1,1
252784,3,0.5,0.066667,0.014286,0.135165,0.288095,0.5,0.5,0.5,0.004442,0.5,...,1,1,1,0,0,0,0,1,1,1
252784,4,0.5,0.066667,0.014286,0.135165,0.288095,0.5,0.5,0.5,0.004442,0.5,...,1,1,1,0,0,0,0,1,1,1
252784,5,0.5,0.066667,0.014286,0.135165,0.288095,0.5,0.5,0.5,0.004442,0.5,...,1,1,1,0,0,0,0,1,1,1
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
3348105,1500,0.5,0.333333,0.095238,0.131868,0.309524,0.5,0.5,0.5,0.003273,0.5,...,0,0,0,0,0,0,0,0,0,0
3348105,1501,0.5,0.333333,0.095238,0.131868,0.309524,0.5,0.5,0.5,0.003273,0.5,...,0,0,0,0,0,0,0,0,0,0
3348105,1502,0.5,0.333333,0.095238,0.131868,0.309524,0.5,0.5,0.5,0.003273,0.5,...,0,0,0,0,0,0,0,0,0,0
3348105,1503,0.5,0.333333,0.095238,0.131868,0.309524,0.5,0.5,0.5,0.003273,0.5,...,0,0,0,0,0,0,0,0,0,0


In [5]:
final_ts

Unnamed: 0_level_0,Unnamed: 1_level_0,-bands,-basos,-eos,-lymphs,-monos,-polys,24 h urine protein,24 h urine urea nitrogen,ALT (SGPT),ANF/ANA,...,sao2,heartrate,respiration,cvp,systemicsystolic,systemicdiastolic,systemicmean,st1,st2,st3
patient,time,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1,Unnamed: 22_level_1
252784,1,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,100.0,106.0,22.0,0.0,0.0,0.0,0.0,0.05,0.05,0.0
252784,2,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,100.0,107.0,24.0,0.0,0.0,0.0,0.0,0.10,0.00,-0.1
252784,3,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,100.0,109.0,21.0,0.0,0.0,0.0,0.0,0.00,0.10,0.0
252784,4,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,100.0,110.0,26.0,0.0,0.0,0.0,0.0,0.10,0.00,-0.1
252784,5,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,98.0,116.0,26.0,0.0,0.0,0.0,0.0,0.00,0.00,0.0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
3348105,1500,0.0,1.0,2.0,12.0,13.0,0.0,0.0,0.0,17.0,0.0,...,98.0,113.0,9.0,0.0,0.0,0.0,0.0,0.00,0.00,0.0
3348105,1501,0.0,1.0,2.0,12.0,13.0,0.0,0.0,0.0,17.0,0.0,...,98.0,113.0,9.0,0.0,0.0,0.0,0.0,0.00,0.00,0.0
3348105,1502,0.0,1.0,2.0,12.0,13.0,0.0,0.0,0.0,17.0,0.0,...,98.0,113.0,9.0,0.0,0.0,0.0,0.0,0.00,0.00,0.0
3348105,1503,0.0,1.0,2.0,12.0,13.0,0.0,0.0,0.0,17.0,0.0,...,98.0,113.0,9.0,0.0,0.0,0.0,0.0,0.00,0.00,0.0
