In [30]:
import joblib
import polars as pl
import pandas as pd
import numpy as np
from pandas.tseries.holiday import USFederalHolidayCalendar
import time

In [31]:
output_dir = "/work/InternalMedicine/s223850/ED-StaticDynamic/raw_data/ED_EVENTS_6624_clean.joblib"

In [32]:
with open(output_dir, 'rb') as f:
    df = joblib.load(f)


In [4]:
df.columns

['PAT_ENC_CSN_ID',
 'PAT_MRN_ID',
 'PAT_ID',
 'Ethnicity',
 'FirstRace',
 'MultiRacial',
 'Sex',
 'Arrived_Time',
 'ED_Disposition',
 'Admitted_YN',
 'Patient_Age',
 'Acuity_Level',
 'Means_Of_Arrival',
 'Chief_Complaint',
 'Chief_Complaint_All',
 'Count_of_Chief_Complaints',
 'Calculated_DateTime',
 'Coverage_Financial_Class_Grouper',
 'Dispo_Prov_Admission_Rate',
 'ProblemList_Sixty_Admission_YN',
 'ProblemList_Eighty_Admission_YN',
 'Procedure in the Last 4 Weeks',
 'Has Completed Appt in Last Seven Days',
 'Has Hospital Encounter in Last Seven Days',
 'Number of Inpatient Admissions in the last 30 Days',
 'Number of past appointments in last 60 days',
 'Number of past inpatient admissions over ED visits in last three years',
 'Type',
 'EVENT_NAME',
 'ED_Location_YN',
 'Order_Status',
 'Result_Flag',
 'MEAS_VALUE',
 'Primary_DX_Name',
 'Primary_DX_First_ICD10',
 'Primary_DX_ICD10',
 'has_admit_order',
 'n_admit_orders',
 'n_disch_orders',
 'elapsed_time_min',
 'event_idx',
 'tta',
 

In [8]:
static_cols =     [
         "Ethnicity", "FirstRace", "Sex", "Acuity_Level", "Means_Of_Arrival",
        "cc_list", "Coverage_Financial_Class_Grouper", "Procedure in the Last 4 Weeks",
        "Has Completed Appt in Last Seven Days", "Has Hospital Encounter in Last Seven Days", "MultiRacial",
        "Patient_Age", "Dispo_Prov_Admission_Rate", "Number of Inpatient Admissions in the last 30 Days",
        "Number of past appointments in last 60 days", "Number of past inpatient admissions over ED visits in last three years",
       "ProblemList_Sixty_Admission_YN", "ProblemList_Eighty_Admission_YN",
        'arr_year', 'arr_month','arr_day','arr_hour', 'holiday' ,"Arrived_Time", 
    ]

In [6]:
df_static = df.select(
    static_cols
)

# Create static dictionary

In [24]:
USCal = USFederalHolidayCalendar()
def get_holiday(x):
    # n = USCal.holidays(pd.to_datetime(x), pd.to_datetime(x), return_name=True)
    n = USCal.holidays(x, x, return_name=True)

    if len(n) == 0:
        return 'null'
    else:
        return n.iloc[0]

In [25]:
get_holiday("2024-01-01")

"New Year's Day"

In [26]:
df_static = df_static.with_columns(
    pl.col('Arrived_Time').map_elements(lambda x: get_holiday(x.date()), return_dtype=pl.String).alias('holiday')
)

In [22]:
# Add holiday optimized version (Already implemented in prepare_data.py)
df_static = df_static.with_columns(
    pl.col('Arrived_Time').dt.date().alias('arr_date')
)
start = df_static['Arrived_Time'].to_pandas().min()
end = df_static['Arrived_Time'].to_pandas().max()

df_holiday = USCal.holidays(start, end, return_name=True).to_frame().reset_index()
df_holiday.columns = ['arr_date', 'holiday']
df_holiday['arr_date'] = df_holiday['arr_date'].dt.date
df_holiday = pl.DataFrame(df_holiday)
df_static = df_static.join(df_holiday, on='arr_date', how='left')

DuplicateError: unable to hstack, column with name "holiday_right" already exists

In [29]:
(df_static['holiday_right']!=df_static['holiday']).sum()

0

In [14]:
pl.DataFrame(df_holiday)

arr_date,holiday
datetime[ns],str
2022-06-20 00:00:00,"""Juneteenth Nat…"
2022-07-04 00:00:00,"""Independence D…"
2022-09-05 00:00:00,"""Labor Day"""
2022-10-10 00:00:00,"""Columbus Day"""
2022-11-11 00:00:00,"""Veterans Day"""
…,…
2023-12-25 00:00:00,"""Christmas Day"""
2024-01-01 00:00:00,"""New Year's Day…"
2024-01-15 00:00:00,"""Birthday of Ma…"
2024-02-19 00:00:00,"""Washington’s B…"


In [90]:
df_static['holname'].value_counts()

holname,count
str,u32
"""Washington’s B…",54851
"""null""",17308018
"""Juneteenth Nat…",58921
"""Veterans Day""",47643
"""Thanksgiving D…",29213
…,…
"""New Year's Day…",48347
"""Independence D…",40398
"""Birthday of Ma…",42755
"""Labor Day""",48559


In [26]:
df_static['Ethnicity'].value_counts()

Ethnicity,count
str,u32
"""Black or Afric…",1955
"""Non-Hispanic/L…",14001138
"""Asian""",150
"""Hispanic or La…",3431154
"""American India…",2546
"""Unknown""",280707
"""White""",3610
"""Declined""",115746


In [35]:
null_vals = [None,
             'none',
             'null', 
             'unknown',
             'undefined',
             '*unspecified',
             'unspecified']

In [36]:
static_dict = {}
for col in df_static.columns:
    if df_static[col].dtype == pl.String:
        static_dict[col] = {'null': 0, 'unk':1}
        for v in df_static[col].value_counts().sort(by='count', descending=True)[col]:
            if v is not None and v.lower().strip() not in null_vals: # null list
                static_dict[col][v] = len(static_dict[col])

In [37]:
static_dict['holiday'].keys()

dict_keys(['null', 'unk', 'Juneteenth National Independence Day', 'Columbus Day', 'Washington’s Birthday', 'Memorial Day', 'Labor Day', 'Christmas Day', "New Year's Day", 'Veterans Day', 'Birthday of Martin Luther King, Jr.', 'Independence Day', 'Thanksgiving Day'])

In [38]:
thresh = 1.0
inc_cc_df = df_static['cc_list'].explode().value_counts().sort(by='count', descending=True).with_columns(
    pl.col('count').cum_sum().alias('cumsum')
).with_columns((pl.col('cumsum')/pl.col('cumsum').last()).alias('prob')).with_columns( (pl.col('prob')<=thresh).cast(pl.UInt32).alias('included') )

In [105]:
inc_cc_df

cc_list,count,cumsum,prob,included
str,u32,u32,f64,u32
"""abdominalpain""",2719692,2719692,0.12189,1
"""chestpain""",1642000,4361692,0.19548,1
"""breathingprobl…",1571989,5933681,0.265932,1
"""generalizedwea…",701351,6635032,0.297365,1
"""headache""",640195,7275227,0.326057,1
…,…,…,…,…
"""concussion""",29,22312648,0.999996,1
"""postsurgicalfo…",25,22312673,0.999997,1
"""eyesurgery""",22,22312695,0.999998,1
"""consultonly""",21,22312716,0.999999,1


In [39]:
cc_dict = {'null':0, 'unk':1}
for v in inc_cc_df.filter(pl.col('included')==1)['cc_list']:
    if v not in null_vals:
        cc_dict[v] = len(cc_dict)

In [112]:
cc_dict

{'null': 0,
 'unk': 1,
 'abdominalpain': 2,
 'chestpain': 3,
 'breathingproblem': 4,
 'generalizedweakness': 5,
 'headache': 6,
 'falls': 7,
 'fever': 8,
 'dizziness': 9,
 'other': 10,
 'abnormaltests': 11,
 'backpain': 12,
 'flulikesymptoms': 13,
 'nauseaandvomiting': 14,
 'cough': 15,
 'flankpain': 16,
 'vomiting': 17,
 'alteredmentalstatus': 18,
 'legpain': 19,
 'nausea': 20,
 'pain': 21,
 'diarrhea': 22,
 'eyeproblem': 23,
 'syncope': 24,
 'swelling': 25,
 'palpitations': 26,
 'vaginalbleeding': 27,
 'fatigue': 28,
 'legswelling': 29,
 'psychiatricevaluation': 30,
 'seizures': 31,
 'numbness': 32,
 'extremityweakness': 33,
 'bloodpressure': 34,
 'hematuria': 35,
 'suicidal': 36,
 'rectalbleeding': 37,
 'hypotension': 38,
 'sobatrest': 39,
 'armpain': 40,
 'hypertension': 41,
 'sicklecellcrisis': 42,
 'lowbackpain': 43,
 'motorvehiclecrash': 44,
 'throatproblem': 45,
 'footpain': 46,
 'neckpain': 47,
 'confusion': 48,
 'hippain': 49,
 'kneepain': 50,
 'postopcomplaints': 51,
 'psych

In [114]:
None in cc_dict

False

In [40]:
t = time.time()
df_static = df_static.with_columns(
    [
        pl.col(key).map_elements(lambda x: val_dict.get(x, 1) if x not in null_vals else 0, return_dtype=pl.UInt16).alias(f'{key}_index')
        for key, val_dict in static_dict.items()
    ]
)
print(f'Took {time.time()-t} seconds ...')

Took 38.14787769317627 seconds ...


In [41]:
import cProfile

In [43]:
def create_key_index(df_static):
    df_static = df_static.with_columns(    [
        pl.col(key).map_elements(lambda x: val_dict.get(x, 1) if x not in null_vals else 0, return_dtype=pl.UInt16).alias(f'{key}_index')
        for key, val_dict in static_dict.items()
    ]
    )
    return df_static

In [44]:
dd = cProfile.run('create_key_index(df_static)')

         9708 function calls (9697 primitive calls) in 36.319 seconds

   Ordered by: standard name

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
        1    0.000    0.000   36.314   36.314 2577217657.py:1(create_key_index)
        1    0.000    0.000    0.004    0.004 2577217657.py:2(<listcomp>)
      108    0.000    0.000    0.000    0.000 <string>:1(<lambda>)
        1    0.004    0.004   36.319   36.319 <string>:1(<module>)
        1    0.000    0.000    0.000    0.000 _collections_abc.py:821(get)
        1    0.000    0.000    0.000    0.000 abc.py:121(__subclasscheck__)
        9    0.000    0.000    0.000    0.000 col.py:145(__new__)
        9    0.000    0.000    0.000    0.000 col.py:20(_create_col)
        9    0.000    0.000    0.000    0.000 convert.py:142(is_polars_dtype)
        9    0.000    0.000    0.000    0.000 convert.py:382(py_type_to_dtype)
        9    0.000    0.000    0.000    0.000 dis.py:128(_get_code_object)
        9    0.000   

In [122]:
def list2idx(x, vocab):
    if x is None or len(x) == 0:
        return [0]
    o = []
    for v in x:
        if v in null_vals:
            o.append(0)
        else:
            o.append(vocab.get(v, 1))
    return o

In [125]:
vec = []
for row in df_static.iter_rows(named=True):
    vec.append(list2idx(row['cc_list'], cc_dict))

In [132]:
for v in vec:
    if v is None or len(v) == 0 or None in v:
        print("There is none")

In [134]:
t = time.time()
df_static = df_static.with_columns(
    pl.col('cc_list').map_elements(lambda x: list2idx(x, cc_dict), pl.List(pl.Int64)).alias('cc_list_index')
)
print(f'Took {time.time()-t} seconds ...')

Took 184.5332260131836 seconds ...


In [136]:
"""
OUTPUT of a fitted model
"""
# static_dict, cc_dict, null_vals

'\nOUTPUT of a fitted model\n'

In [138]:
### Split data into events

In [45]:
idx = 50

In [46]:
df_50 = df.filter(pl.col('event_idx')<50)

In [47]:
df_50_static = df_50.select(static_cols)

In [48]:
df_50_static.shape

(151425, 24)

In [50]:
df_50_static['Ethnicity'], static_dict['Ethnicity']

(shape: (151_425,)
 Series: 'Ethnicity' [str]
 [
 	"Non-Hispanic/L…
 	"Non-Hispanic/L…
 	"Non-Hispanic/L…
 	"Non-Hispanic/L…
 	"Non-Hispanic/L…
 	…
 	"Non-Hispanic/L…
 	"Non-Hispanic/L…
 	"Non-Hispanic/L…
 	"Non-Hispanic/L…
 	"Non-Hispanic/L…
 ],
 {'null': 0,
  'unk': 1,
  'Non-Hispanic/Latino': 2,
  'Hispanic or Latino': 3,
  'Declined': 4,
  'White': 5,
  'American Indian or Alaska Native': 6,
  'Black or African American': 7,
  'Asian': 8})

In [52]:
def onehotencode_series2mat(series, vocab):
    X = np.zeros((len(series), len(vocab)), dtype=np.uint16)
    for idx, v in enumerate(series):
        if v in null_vals:
            X[idx, 0] += 1
        else:
            X[idx, vocab.get(v, 1)] += 1
    return X

In [56]:
X =onehotencode_series2mat(df_50_static["Ethnicity"], static_dict["Ethnicity"])

In [66]:
df_50_static["Ethnicity"][124864]

'White'

In [75]:
colnames = [""]*len(static_dict["Ethnicity"])
for k, v in static_dict['Ethnicity'].items():
    colnames[v] = f"Ethnicity_{k}"

In [76]:
Xdf = pl.DataFrame(X)

In [77]:
Xdf.columns = colnames

In [80]:
pl.concat([df_50_static, Xdf], how='horizontal')

Ethnicity,FirstRace,Sex,Acuity_Level,Means_Of_Arrival,cc_list,Coverage_Financial_Class_Grouper,Procedure in the Last 4 Weeks,Has Completed Appt in Last Seven Days,Has Hospital Encounter in Last Seven Days,MultiRacial,Patient_Age,Dispo_Prov_Admission_Rate,Number of Inpatient Admissions in the last 30 Days,Number of past appointments in last 60 days,Number of past inpatient admissions over ED visits in last three years,ProblemList_Sixty_Admission_YN,ProblemList_Eighty_Admission_YN,arr_year,arr_month,arr_day,arr_hour,holiday,Arrived_Time,Ethnicity_null,Ethnicity_unk,Ethnicity_Non-Hispanic/Latino,Ethnicity_Hispanic or Latino,Ethnicity_Declined,Ethnicity_White,Ethnicity_American Indian or Alaska Native,Ethnicity_Black or African American,Ethnicity_Asian
str,str,str,str,str,list[str],str,str,u8,u8,i64,f64,f64,u16,u16,u16,i64,i64,i32,i8,i8,i8,str,datetime[ns],u16,u16,u16,u16,u16,u16,u16,u16,u16
"""Non-Hispanic/L…","""White""","""Male""",,"""Ambulance""","[""hypotension""]","""Medicaid""","""No""",0,0,0,67.882272,1.0,1,5,3,1,1,2022,6,20,0,"""Juneteenth Nat…",2022-06-20 00:15:55,0,0,1,0,0,0,0,0,0
"""Non-Hispanic/L…","""White""","""Male""",,"""Ambulance""","[""hypotension""]","""Medicaid""","""No""",0,0,0,67.882272,1.0,1,5,3,1,1,2022,6,20,0,"""Juneteenth Nat…",2022-06-20 00:15:55,0,0,1,0,0,0,0,0,0
"""Non-Hispanic/L…","""White""","""Male""",,"""Ambulance""","[""hypotension""]","""Medicaid""","""No""",0,0,0,67.882272,1.0,1,5,3,1,1,2022,6,20,0,"""Juneteenth Nat…",2022-06-20 00:15:55,0,0,1,0,0,0,0,0,0
"""Non-Hispanic/L…","""White""","""Male""",,"""Ambulance""","[""hypotension""]","""Medicaid""","""No""",0,0,0,67.882272,1.0,1,5,3,1,1,2022,6,20,0,"""Juneteenth Nat…",2022-06-20 00:15:55,0,0,1,0,0,0,0,0,0
"""Non-Hispanic/L…","""White""","""Male""",,"""Ambulance""","[""hypotension""]","""Medicaid""","""No""",0,0,0,67.882272,1.0,1,5,3,1,1,2022,6,20,0,"""Juneteenth Nat…",2022-06-20 00:15:55,0,0,1,0,0,0,0,0,0
…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…
"""Non-Hispanic/L…","""Black or Afric…","""Male""","""II""","""Ambulance""","[""chestpain""]","""Commercial""","""No""",1,0,0,41.823408,0.4,1,5,3,1,1,2024,5,27,23,"""Memorial Day""",2024-05-27 23:40:07,0,0,1,0,0,0,0,0,0
"""Non-Hispanic/L…","""Black or Afric…","""Male""","""II""","""Ambulance""","[""chestpain""]","""Commercial""","""No""",1,0,0,41.823408,0.4,1,5,3,1,1,2024,5,27,23,"""Memorial Day""",2024-05-27 23:40:07,0,0,1,0,0,0,0,0,0
"""Non-Hispanic/L…","""Black or Afric…","""Male""","""II""","""Ambulance""","[""chestpain""]","""Commercial""","""No""",1,0,0,41.823408,0.4,1,5,3,1,1,2024,5,27,23,"""Memorial Day""",2024-05-27 23:40:07,0,0,1,0,0,0,0,0,0
"""Non-Hispanic/L…","""Black or Afric…","""Male""","""II""","""Ambulance""","[""chestpain""]","""Commercial""","""No""",1,0,0,41.823408,0.4,1,5,3,1,1,2024,5,27,23,"""Memorial Day""",2024-05-27 23:40:07,0,0,1,0,0,0,0,0,0


### Implement transformers

In [None]:
'''
Take dataframe, dictionaries, and how do you want to preprocess it
'''
'''
Since the same function will be called on multiple columns, those calls are indepenent and thus can be parallelized
    - Send each vocabulary with only the series corresponding to that raw column data to the function
    - Map each element to the series to the corresponding row and column in X sparse matrix
    - return a dictionary with name of the column and X
'''
def onehotencoding(df, vocab_dict, null_vals):
    # Initialize the newly added matrix
    X = np.zeros((len(df), len(vocab_dict)), dtype=np.uint16)
    for idx, row in enumerate(df.iterrows(named=True)):
        X[idx, vocab_dict[row

In [None]:
class CustomOneHotEncoding:
    def __init__(self, single_val_cols, multi_val_cols, vocabthresh=100, cumprob_inc_thresh=0.99):
        '''
            multi_val_cols are expected to be passed in a list datastructure. null values in the data should be mapped to empty list
            for multi_val_cols
        '''
        self.single_val_cols = single_val_cols
        self.multi_val_cols = multi_val_cols
        self.vocab_thr = vocabthresh
        self.inc_thr = cumprob_inc_thresh
        self.fitted = False

    def _build_vocab_col(self, df, colname)
        dd = dd.with_columns(
            pl.col('count').cum_sum().alias('cumsum')
        ).with_columns(
            (pl.col('cumsum')/pl.col('cumsum').max()).alias('prob')
        ).with_columns(
            (pl.col('prob')<=inc_cum_thresh).alias('included')
        )
        inc_df = dd.filter(pl.col('included'))
        inc_vals = inc_df[colname].to_list()
        inc_dict = {}
        inc_dict[f'null_{colname}'] = 0
        if inc_cum_thresh < 1.0:
            inc_dict[f'rare_{colname}'] = 1
        for c in inc_vals:
            inc_dict[c] = len(inc_dict)
        return inc_dict

    def _build_vocab(self, df):
        multival_vocab_ = {}
        for colname in self.multi_val_cols:
            dd = df.explode(colname)[colname].value_counts().sort(by='count', descending=True)
            vocab = self._build_vocab_col(dd, colname)
            multival_vocab_[colname] = vocab
            
        singleval_vocab_ = {}
        for colname in self.single_val_cols:
            vocab = self._build_vocab_col(df, colname)
            singleval_vocab_[colname] = vocab
    
        return singleval_vocab_, multival_vocab_

    def fit(self, X: pl.DataFrame, y=None):
        for col in self.single_val_cols:
            assert col in X.columns, f'{col} is supposed to be processed as a single valued column using CustomOneHotEncoding. However it doesnt exist in the passed X'
        
        for col in self.multi_val_cols:
            assert col in X.columns, f'{col} is supposed to be processed as a multi valued column using CustomOneHotEncoding. However it doesnt exist in the passed X'
            
        # Create dictionaries
        self.singleval_vocab_, self.multi_val_cols = self._build_vocab(df)
        self.fitted  = True
        
        return self
    
    def _transform_single_val(series, vocab):
        X = np.zeros((len(series), len(vocab)), dtype=np.uint16)
        for idx, v in enumerate(series):
            if v in null_vals:
                X[idx, 0] += 1
            else:
                X[idx, vocab.get(v, 1)] += 1
        return X
    
    def transform(self, X, y=None):
        if not self.fitted:
            raise ValueError("You need to run .fit() method first")
        X_    
        
    def fit_transform(self, X, y=None):
        self.fit(X, y)
        return self.transform(X, y)