## Imports

In [1]:
import pandas as pd
import numpy as np
import datetime, random

from multiprocessing import Pool
import time

## Reading in file; managing columns

In [60]:
%%time

#Load the inpatient creatinine extract (one row per creatinine measurement)

#df = pd.read_csv('~/Desktop/patr/StandardizingAKI/inpatient 2014-2018 creatinine.csv') #1441707, 4
df = pd.read_csv(r'H:\Data\Standardized AKI definition\dataset\inpatient 2014-2018 creatinine.csv')

#MRNs arrive as strings such as 'MR1000041'; integer keys make indexing much quicker
df['pat_mrn_id'] = df['pat_mrn_id'].str.strip('MR').astype('int')
#Parse the timestamps into pandas datetimes so they can be subtracted later on
df['time'] = pd.to_datetime(df['time'])
#Sanity check: show the dtypes (still under the raw column names at this point)
print(df.dtypes)

#Shorter column names for ease, then a hierarchical (mrn, enc) index
df.columns = ['mrn', 'enc', 'time', 'creat']
df = df.set_index(['mrn', 'enc'])

pat_mrn_id                 int32
pat_enc_csn_id             int64
time              datetime64[ns]
creatinine               float64
dtype: object
Wall time: 3.1 s


## Functions: add_rows(df), add_delta_cols(df), returnAKIpatients(df), parallelize_analysis(df, func, num_cores)

### add_rows()

    Input: Pandas data frame 

    Output: Pandas data frame (w/ extra dummy rows containing back-calculated values)

This function is the bulk of the computation - it adds a dummy row containing the back-calculated baseline value in front of the first row of every encounter. If one is only interested in those patients who have AKI according to the Rolling Window definition, the code should run in less than a minute and *add_rows()* isn't necessary. If you are interested in back-calculated AKI cases, then the analysis takes a bit longer (although still under an hour) and that's where *add_rows()* comes in. For each encounter it takes the time of the first recorded row, looks back between 7 and 365 days into the patient's past and checks whether there are measured creatinine values. If there are, the mean of those values is put into the dummy row. Otherwise, a value calculated from the estimated glomerular filtration rate (eGFR) equation in *A New Equation to Estimate Glomerular Filtration Rate* (Levey et al., 2009, https://pubmed.ncbi.nlm.nih.gov/19414839/) is meant to be used in the dummy row instead. Note that the code below does not implement this eGFR fallback yet: when no prior measurements fall inside the look-back window, the dummy row's creatinine is left as NaN, and the dummy row of a patient's first encounter simply keeps that encounter's first measured value.

### parallelize_analysis()

Parallelizes the analysis.

### returnAKIpatients()

Finds the patients with AKI according to the Rolling Window and/or the back-calculated definitions... runtime significantly depends on which definition you care for.

### add_delta_cols()

The way in which AKI is measured is based on the change in creatinine over a given period of time. As a proxy, $\Delta creat$ and $\Delta time$ columns are created to measure the change from one time point to another. This function calculates those values via the Split-Apply-Combine methodology on the original data frame: splitting by encounter, transforming via a backward shift, then taking the difference of values, then a forward shift, and then recombining the dataframe object.

In [91]:
def _split_by_patient(df, num_chunks):
    '''Split df into at most num_chunks frames without dividing any patient's rows.'''
    if 'mrn' in df.index.names:
        patient_keys = df.index.get_level_values('mrn')
    else:
        patient_keys = df['mrn']
    #codes[i] is the ordinal (in order of first appearance) of the patient owning row i
    codes, uniques = pd.factorize(patient_keys)
    code_blocks = np.array_split(np.arange(len(uniques)), max(num_chunks, 1))
    #Fewer patients than chunks leaves some blocks empty; those are dropped
    return [df[np.isin(codes, block)] for block in code_blocks if len(block)]


def _with_baseline_rows(df, num_cores):
    '''Run add_rows over df, in parallel when more than one worker is useful.'''
    chunks = _split_by_patient(df, num_cores)
    if num_cores <= 1 or len(chunks) <= 1:
        #Nothing to parallelise: skip the cost (and pitfalls) of spawning processes
        return add_rows(df)
    with Pool(len(chunks)) as p:
        return pd.concat(p.map(add_rows, chunks))


def returnAKIpatients(df, aki_calc_type = 'both', keep_extra_cols = True, num_cores=4):
    '''
    Input: Pandas data frame keyed by 'mrn' and 'enc' with 'time' and 'creat' columns
    Output: new Pandas data frame with 'delta_creat', 'delta_time', 'first_enc' and a boolean 'aki' column

    aki_calc_type selects the definition:
        'rolling_window' - flags a row when creatinine rose by >= 0.3 within 48 hours, or by
                           >= 50% of the previous value within 7 days, relative to the
                           preceding row (deltas come from add_delta_cols).
        'back_calc'      - add_rows() first inserts a baseline dummy row ahead of every
                           encounter; only the row that directly follows a dummy row
                           (delta_time == 0) can be flagged, and only by the 50% criterion.
        'both'           - dummy rows are inserted and both criteria above are applied to
                           every row.

    keep_extra_cols is accepted for interface compatibility but is not used here.
    num_cores is the number of worker processes used for add_rows(); values <= 1 run serially.

    Raises ValueError for an unrecognised aki_calc_type.

    The input frame is never modified: the rolling-window path works on a copy, the
    other paths on the frame built by add_rows(). Work is distributed per patient, so
    a patient's measurement history is never divided between two workers.
    Both time limits are inclusive ("within 48 hours" / "within 7 days") for every mode.

    NOTE(review): the deltas appear to compare each row only with the row immediately
    before it, not with every value inside the preceding 48 h / 7 d window - confirm
    this is the intended "rolling window".
    '''
    valid_types = ('rolling_window', 'back_calc', 'both')
    if aki_calc_type not in valid_types:
        raise ValueError("aki_calc_type must be one of %s, got %r" % (valid_types, aki_calc_type))

    if aki_calc_type == 'rolling_window':
        #Copy so that add_delta_cols (which writes columns in place) leaves the caller's frame alone
        d = add_delta_cols(df.copy())
    else:
        #Now, we gotta go through the process of creating the dummy rows
        d = add_delta_cols(_with_baseline_rows(df, num_cores))

    within_48h = d.delta_time <= datetime.timedelta(hours=48)
    within_7d = d.delta_time <= datetime.timedelta(days=7)
    #Rows that open an encounter have NaN/NaT deltas, so every comparison is False for them
    abs_rise = (d.delta_creat >= 0.3) & within_48h
    rel_rise = (d.delta_creat >= d.creat.shift(1)*0.5) & within_7d

    if aki_calc_type == 'back_calc':
        #Remember, back-calc doesn't include the 0.3 increase criterion... has to be a 50% increase
        d['aki'] = rel_rise & (d.delta_time == datetime.timedelta(0))
    else:
        d['aki'] = abs_rise | rel_rise

    return d

def add_rows(df):
    '''
    Input: Pandas data frame (rows identified by 'mrn' and 'enc', with 'time' and 'creat' columns)
    Output: Pandas data frame (a new object; one dummy row inserted ahead of every encounter)

    For each patient, the first row of every encounter is duplicated and placed directly
    in front of that encounter, keeping that row's timestamp. The creatinine of the
    duplicate is the back-calculated baseline:

      * first encounter seen for the patient: the encounter's own first value is kept;
      * every later encounter: the mean of the patient's creatinine values measured
        strictly more than 7 and strictly less than 365 days before the encounter's
        first row, or NaN when there are none.

    Patients are emitted in order of first appearance. An empty frame is returned as an
    empty copy.

    NOTE(review): the eGFR-based fallback (Levey et al., 2009,
    https://pubmed.ncbi.nlm.nih.gov/19414839/) mentioned in the notebook text is not
    implemented here; encounters without prior measurements get NaN.
    '''
    if len(df) == 0:
        #pd.concat refuses an empty list, so handle the degenerate case up front
        return df.copy()

    min_lookback = datetime.timedelta(days=7)
    max_lookback = datetime.timedelta(days=365)
    creat_col = df.columns.get_loc('creat')
    pieces = []

    for _, pat_df in df.groupby('mrn', sort=False):
        #Positions (within this patient) of the first row of each encounter
        is_repeat = pat_df.reset_index().duplicated(['mrn', 'enc']).to_numpy()
        first_pos = np.flatnonzero(~is_repeat)
        times = pat_df['time']

        new_rows = pat_df.iloc[first_pos].copy()
        #Baselines are means (possibly NaN), so make sure the column can hold floats
        new_rows['creat'] = new_rows['creat'].astype('float64')
        #Index 0 is skipped on purpose: the first encounter keeps its own first value
        for i in range(1, len(first_pos)):
            elapsed = times.iloc[first_pos[i]] - times
            in_window = ((elapsed > min_lookback) & (elapsed < max_lookback)).to_numpy()
            new_rows.iloc[i, creat_col] = pat_df.loc[in_window, 'creat'].mean()

        #Interleave: dummy row, then the rows of the encounter it belongs to
        bounds = list(first_pos) + [len(pat_df)]
        for i in range(len(first_pos)):
            pieces.append(new_rows.iloc[[i]])
            pieces.append(pat_df.iloc[bounds[i]:bounds[i + 1]])

    return pd.concat(pieces)

def add_delta_cols(df):
    '''
    Input: Pandas data frame keyed by 'mrn' and 'enc' with 'time' and 'creat' columns
    Output: the same data frame object, with three columns added (or overwritten) in place

        delta_creat - creatinine minus the previous creatinine of the same encounter,
                      rounded to one decimal so float noise cannot push a value just under
                      a threshold such as 0.3; NaN on the first row of an encounter
        delta_time  - time minus the previous time of the same encounter; NaT on the
                      first row of an encounter
        first_enc   - True on the first row belonging to each patient (mrn)

    Only the 'creat' and 'time' columns are differenced, so any other columns the frame
    carries (text, flags, ...) are left untouched and cannot break the computation.
    '''
    by_encounter = df.groupby(['mrn', 'enc'])
    #Compute both deltas before writing anything back to df
    creat_change = np.round(by_encounter['creat'].diff(), decimals = 1).to_numpy()
    time_change = by_encounter['time'].diff().to_numpy()

    #Positional assignment: the (mrn, enc) index is full of duplicates, so avoid label alignment
    df['delta_creat'] = creat_change
    df['delta_time'] = time_change
    #reset_index exposes 'mrn' as a column whether it started as an index level or not
    df['first_enc'] = ~df.reset_index().duplicated('mrn').to_numpy()
    return df

## Validate the rolling-window code

In [None]:
%%time
# Flag AKI with aki_calc_type='both' (rolling-window criteria plus back-calculated baseline rows)
df_both = returnAKIpatients(df, aki_calc_type = 'both')

In [92]:
%%time
# Flag AKI using only the rolling-window criteria (no dummy baseline rows are added)
df_rw = returnAKIpatients(df, aki_calc_type = 'rolling_window')
# Turn the (mrn, enc) index back into ordinary columns.
# NOTE(review): this also alters `df` if returnAKIpatients hands back its input object rather than a copy
df_rw.reset_index(inplace=True)
patient=100 # Should work for any value of patient
# Disabled spot check: rows i-2..i+2 around the patient-th AKI-flagged row
#df_rw.iloc[np.array([[i-2, i-1, i, i+1, i+2] for i in np.where(df_rw['aki'])[0]]).flatten()][5*patient:5*patient+5]

Wall time: 2.82 s


In [93]:
# Load the file the rolling-window result is validated against (presumably the output of a reference AKI flagger)
df_yu = pd.read_csv(r'H:\Data\Standardized AKI definition\dataset\aki flagger inpatient 2014-2018.csv')
df_yu['aki'] = df_yu['aki'].astype('bool') # Boolean dtype so the flag compares directly with df_rw.aki
print(df_yu.shape, df_rw.shape) # Row counts need to match for the row-by-row comparison below
df_yu.head()

(1441707, 6) (1441707, 9)


Unnamed: 0,pat_mrn_id,pat_enc_csn_id,time,creatinine,aki,aki_stage
0,MR1000041,115884935,2015-06-14T20:59:00,1.6,False,0
1,MR1000041,115884935,2015-06-15T07:54:00,0.9,False,0
2,MR1000041,115884935,2015-06-16T07:02:00,0.9,False,0
3,MR1000041,115884935,2015-06-17T07:11:00,0.9,False,0
4,MR1000041,117378943,2015-07-18T08:39:00,1.1,False,0


In [101]:
# Number of AKI-flagged rows: reference file vs. this notebook's rolling-window result
df_yu.aki.sum(), df_rw.aki.sum()

(173632, 74190)

In [95]:
print(np.all(df_rw.reset_index().enc == df_yu.pat_enc_csn_id)) #Check whether the encounters are the same
# True only when the AKI flag agrees on every single row of the two frames
np.all(df_yu.aki.astype('bool') == df_rw.reset_index().aki)

True


False

In [6]:
%%time
# Flag AKI using only the back-calculated baseline definition (runs add_rows in worker processes)
df_bc = returnAKIpatients(df, aki_calc_type = 'back_calc')
patient = 0 # Selects which AKI-flagged row the disabled spot check below would display
#df_bc.iloc[np.array([[i-2, i-1, i, i+1, i+2] for i in np.where(df_bc['aki'])[0]]).flatten()][5*patient:5*patient+5]

Process ForkPoolWorker-6:
Process ForkPoolWorker-5:
Process ForkPoolWorker-8:
Process ForkPoolWorker-7:
Traceback (most recent call last):
  File "/Users/saranmedical-smile/opt/anaconda3/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
  File "/Users/saranmedical-smile/opt/anaconda3/lib/python3.7/multiprocessing/process.py", line 99, in run
    self._target(*self._args, **self._kwargs)


KeyboardInterrupt: 

###### might get rid of this in a minute

In [None]:
def add_rows(df):
    '''
    Input: Pandas data frame (rows identified by 'mrn' and 'enc', with 'time' and 'creat' columns)
    Output: Pandas data frame (a new object; one dummy row inserted ahead of every encounter)

    For each patient, the first row of every encounter is duplicated and placed directly
    in front of that encounter, keeping that row's timestamp. The creatinine of the
    duplicate is the back-calculated baseline:

      * first encounter seen for the patient: the encounter's own first value is kept;
      * every later encounter: the mean of the patient's creatinine values measured
        strictly more than 7 and strictly less than 365 days before the encounter's
        first row, or NaN when there are none.

    Patients are emitted in order of first appearance. An empty frame is returned as an
    empty copy.

    NOTE(review): the eGFR-based fallback (Levey et al., 2009,
    https://pubmed.ncbi.nlm.nih.gov/19414839/) mentioned in the notebook text is not
    implemented here; encounters without prior measurements get NaN.
    '''
    if len(df) == 0:
        #pd.concat refuses an empty list, so handle the degenerate case up front
        return df.copy()

    min_lookback = datetime.timedelta(days=7)
    max_lookback = datetime.timedelta(days=365)
    creat_col = df.columns.get_loc('creat')
    pieces = []

    for _, pat_df in df.groupby('mrn', sort=False):
        #Positions (within this patient) of the first row of each encounter
        is_repeat = pat_df.reset_index().duplicated(['mrn', 'enc']).to_numpy()
        first_pos = np.flatnonzero(~is_repeat)
        times = pat_df['time']

        new_rows = pat_df.iloc[first_pos].copy()
        #Baselines are means (possibly NaN), so make sure the column can hold floats
        new_rows['creat'] = new_rows['creat'].astype('float64')
        #Index 0 is skipped on purpose: the first encounter keeps its own first value
        for i in range(1, len(first_pos)):
            elapsed = times.iloc[first_pos[i]] - times
            in_window = ((elapsed > min_lookback) & (elapsed < max_lookback)).to_numpy()
            new_rows.iloc[i, creat_col] = pat_df.loc[in_window, 'creat'].mean()

        #Interleave: dummy row, then the rows of the encounter it belongs to
        bounds = list(first_pos) + [len(pat_df)]
        for i in range(len(first_pos)):
            pieces.append(new_rows.iloc[[i]])
            pieces.append(pat_df.iloc[bounds[i]:bounds[i + 1]])

    return pd.concat(pieces)

def parallelize_analysis(df, func=add_rows, num_cores = 4):
    '''
    Input: dataframe, generic function, number of cpu processors to dedicate to parallelization
    Output: function applied to dataframe

    Generic parallelizing function: splits the dataframe into chunks, applies func to
    every chunk in a separate worker process and concatenates the results in chunk order.

    When the frame has an 'mrn' index level or column, chunks are built from whole
    patients so that a function needing a patient's full history (such as add_rows)
    never sees a patient divided between two workers. Without 'mrn' the frame is cut
    into contiguous row ranges of near-equal size.

    With num_cores <= 1, or an empty frame, func is simply called on df in this process.
    func must be picklable (a module-level function) to be sent to the workers.
    '''
    if num_cores <= 1 or len(df) == 0:
        return func(df)

    if 'mrn' in df.index.names:
        patient_keys = df.index.get_level_values('mrn')
    elif 'mrn' in df.columns:
        patient_keys = df['mrn']
    else:
        patient_keys = None

    if patient_keys is None:
        row_blocks = np.array_split(np.arange(len(df)), num_cores)
        split_dfs = [df.iloc[block] for block in row_blocks if len(block)]
    else:
        #codes[i] is the ordinal (in order of first appearance) of the patient owning row i
        codes, uniques = pd.factorize(patient_keys)
        code_blocks = np.array_split(np.arange(len(uniques)), num_cores)
        split_dfs = [df[np.isin(codes, block)] for block in code_blocks if len(block)]

    #Never start more workers than there are chunks to hand out
    with Pool(len(split_dfs)) as p:
        result = p.map(func, split_dfs)
    return pd.concat(result)

def returnAKIpatients(df, aki_calc_type = 'both'):
    '''
    Input: Pandas data frame keyed by 'mrn' and 'enc' with 'time' and 'creat' columns
    Output: the same data frame object with 'delta_creat', 'delta_time' and a boolean
            'aki' column added (or overwritten) in place

    delta_creat / delta_time are the change from the previous row of the same encounter
    (NaN / NaT on the first row of an encounter). delta_creat is rounded to one decimal
    so float noise cannot push a value just under a threshold.

    aki_calc_type selects which rows may be flagged:
        'both'           - rise >= 0.3 within 48 hours, or rise >= 50% of the previous
                           value within 7 days, on any row
        'rolling_window' - the same criteria, but never on rows whose delta_time is zero
        'back_calc'      - only the 50% criterion, and only on rows whose delta_time is zero

    Rows with a zero delta_time are presumably the first real measurement following a
    dummy baseline row inserted by add_rows - confirm against the caller.

    Raises ValueError for an unrecognised aki_calc_type (before df is touched).
    '''
    valid_types = ('both', 'rolling_window', 'back_calc')
    if aki_calc_type not in valid_types:
        raise ValueError("aki_calc_type must be one of %s, got %r" % (valid_types, aki_calc_type))

    #Redefine delta creat and delta time cols
    by_encounter = df.groupby(['mrn', 'enc'])
    creat_change = np.round(by_encounter['creat'].diff(), decimals=1).to_numpy()
    time_change = by_encounter['time'].diff().to_numpy()
    #Positional assignment: the (mrn, enc) index contains duplicates, so avoid label alignment
    df['delta_creat'] = creat_change
    df['delta_time'] = time_change

    #NaN / NaT deltas make every comparison below False, so encounter-opening rows are never flagged
    abs_rise = (df.delta_creat >= 0.3) & (df.delta_time <= datetime.timedelta(hours=48))
    rel_rise = (df.delta_creat >= df.creat.shift(1)*0.5) & (df.delta_time <= datetime.timedelta(days=7))
    from_baseline = df.delta_time == datetime.timedelta(0)

    if aki_calc_type == 'both':
        df['aki'] = abs_rise | rel_rise
    elif aki_calc_type == 'rolling_window':
        df['aki'] = (abs_rise | rel_rise) & ~from_baseline
    else:
        #back_calc: only the 50% increase over the back-calculated baseline counts
        df['aki'] = rel_rise & from_baseline
    return df

def add_delta_cols(df):
    '''
    Input: Pandas data frame keyed by 'mrn' and 'enc' with 'time' and 'creat' columns
    Output: the same data frame object, with three columns added (or overwritten) in place

        delta_creat - creatinine minus the previous creatinine of the same encounter,
                      rounded to one decimal so float noise cannot push a value just under
                      a threshold such as 0.3; NaN on the first row of an encounter
        delta_time  - time minus the previous time of the same encounter; NaT on the
                      first row of an encounter
        first_enc   - True on the first row belonging to each patient (mrn)

    Only the 'creat' and 'time' columns are differenced, so any other columns the frame
    carries (text, flags, ...) are left untouched and cannot break the computation.
    '''
    by_encounter = df.groupby(['mrn', 'enc'])
    #Compute both deltas before writing anything back to df
    creat_change = np.round(by_encounter['creat'].diff(), decimals = 1).to_numpy()
    time_change = by_encounter['time'].diff().to_numpy()

    #Positional assignment: the (mrn, enc) index is full of duplicates, so avoid label alignment
    df['delta_creat'] = creat_change
    df['delta_time'] = time_change
    #reset_index exposes 'mrn' as a column whether it started as an index level or not
    df['first_enc'] = ~df.reset_index().duplicated('mrn').to_numpy()
    return df