This approach avoids converting the data to a Dask bag and performs all of the computation on a Dask DataFrame.

In [1]:
import os
import json

In [2]:
from dask_jobqueue import SLURMCluster
import dask.dataframe as dd
from dask.distributed import Client

In [3]:
import numpy as np
from glob import glob

In [4]:
# Paths to the symptom and condition JSON databases (both live in the same directory).
symptom_db_json, condition_db_json = (
    os.path.join("/home/oagba/bulk/data/kk/json", fname)
    for fname in ("symptom_db.json", "condition_db.json")
)

In [5]:
# Load both lookup tables. JSON text is UTF-8 by specification, so the encoding is given
# explicitly rather than depending on the platform's locale default.
with open(symptom_db_json, encoding="utf-8") as fp:
    symptom_db = json.load(fp)
with open(condition_db_json, encoding="utf-8") as fp:
    condition_db = json.load(fp)

In [6]:
# Fix a deterministic (sorted) order for symptom and condition codes so that positions derived
# from them (symptom bit positions, condition label indices) are stable between runs.
# sorted() iterates a dict's keys directly; the list()/keys() wrapping was redundant.
symptom_vector = sorted(symptom_db)
condition_codes = sorted(condition_db)
# Condition code -> integer class label (its index in the sorted order).
condition_labels = {code: idx for idx, code in enumerate(condition_codes)}

In [3]:
# Resources requested for each SLURM job started by the cluster, submitted to the 'general' queue.
cluster = SLURMCluster(
    cores=16,
    memory='60 GB',
    walltime='03:00:00',
    queue='general',
    # project='medvice_parse',
)

In [8]:
# Connect a distributed client to the SLURM cluster, then ask for 10 workers.
client = Client(address=cluster)
cluster.scale(n=10)

In [9]:
csv_dir = "/shares/bulk/oagba/data/kk/csv"
# One input CSV per table, all inside csv_dir.
patients_csv, conditions_csv, symptoms_csv = (
    os.path.join(csv_dir, fname)
    for fname in ("patients.csv", "conditions.csv", "symptoms.csv")
)

In [10]:
patient_sel_columns = ['Id', 'BIRTHDATE', 'RACE', 'GENDER']
# Load only the patient columns needed downstream; BIRTHDATE is parsed into datetimes.
patients = dd.read_csv(
    patients_csv,
    parse_dates=['BIRTHDATE'],
    infer_datetime_format=True,
    usecols=patient_sel_columns,
)



In [11]:
# Full header of conditions.csv, kept for reference (passing it via `names` is disabled below).
condition_columns = ['START', 'STOP', 'PATIENT', 'ENCOUNTER', 'CODE', 'DESCRIPTION']
condition_sel_columns = ['ENCOUNTER', 'PATIENT', 'CODE', 'START']
# Load only the selected condition columns; START is parsed into datetimes.
conditions = dd.read_csv(
    conditions_csv,
    parse_dates=['START'],
    infer_datetime_format=True,
    usecols=condition_sel_columns,
    # names=condition_columns,
)

In [12]:
def _race_txform(val):
    race_code = {'white': 0, 'black':1, 'asian':2, 'native':3, 'other':4}
    return race_code.get(val)
def _label_txform(val, labels):
    return labels.get(val)

In [13]:
# Encode the demographic columns as small integers.
# NOTE(review): _race_txform returns None for any RACE value outside its table, which cannot be
# stored as the declared uint8 — confirm the column only holds those values.
patients['RACE'] = patients['RACE'].apply(_race_txform, meta=('RACE', np.uint8))
# GENDER becomes 0 for 'F' and 1 for anything else. The meta used np.bool (an alias removed in
# NumPy 1.24) and did not describe the 0/1 integers the lambda returns; uint8 does, and matches
# the GENDER entry of the later dtypes table.
patients['GENDER'] = patients['GENDER'].apply(
    lambda gender: 0 if gender == 'F' else 1, meta=('GENDER', np.uint8)
)

In [14]:
# Map each condition CODE to its integer class label (None for codes absent from
# condition_labels). The meta series is named after the column it is stored in, like the
# other apply() calls in this notebook.
conditions['LABEL'] = conditions['CODE'].apply(
    _label_txform, labels=condition_labels, meta=('LABEL', np.uint16)
)

In [15]:
# Inner-join conditions to patient demographics (conditions.PATIENT == patients.Id);
# overlapping patient-side column names get the '_pat' suffix.
df = dd.merge(
    conditions,
    patients,
    left_on='PATIENT',
    right_on='Id',
    suffixes=('', '_pat'),
)

In [16]:
# Rebalance the merged conditions+patients frame into 100 partitions before the symptom join.
df = df.repartition(npartitions=100)

In [17]:
# df = client.persist(df)

In [18]:
# symptoms.csv is read with explicit column names supplied via `names`
# (NOTE(review): presumably the file has no header row — confirm); three columns are kept.
symptom_columns = ['SYMPTOM_CODE', 'SYMPTOM_DISPLAY', 'ENCOUNTER', 'PATIENT']
symptom_sel_colums = ['ENCOUNTER', 'PATIENT', 'SYMPTOM_CODE']  # (sic) name kept unchanged
symptoms = dd.read_csv(
    symptoms_csv,
    usecols=symptom_sel_colums,
    names=symptom_columns,
)

In [19]:
# Spread the symptoms table over 200 partitions ahead of its join with df.
symptoms = symptoms.repartition(npartitions=200)

In [20]:
from collections import OrderedDict

In [21]:
# Give each symptom (in sorted order) its own power-of-two bit so that a set of symptoms
# can be packed into a single integer bitmask.
label_map = OrderedDict(
    (code, 1 << bit) for bit, code in enumerate(symptom_vector)
)

In [22]:
def transform_symptom_codes(item, label_map):
    """Return the bitmask value for symptom code *item*, or None if it is not in *label_map*."""
    if item in label_map:
        return label_map[item]
    return None

In [23]:
# Replace each symptom code with its single-bit mask from label_map (None if unknown).
# Those masks go up to 2 ** (len(label_map) - 1), which overflows uint16 once there are more
# than 16 symptoms (and int64 past 63), so declare the column as object — the same dtype the
# later dtypes table gives SYMPTOM_CODE.
symptoms['SYMPTOM_CODE'] = symptoms['SYMPTOM_CODE'].apply(
    transform_symptom_codes, label_map=label_map, meta=('SYMPTOM_CODE', object)
)

In [24]:
# Copy the encounter id into a dedicated 'grp' column, used later as the groupby key.
symptoms['grp'] = symptoms['ENCOUNTER']

In [25]:
# symptoms = symptoms.set_index('ENCOUNTER')

In [26]:
# continue after this

In [27]:
# Attach each symptom row to the condition/patient rows of the same encounter.
# Join explicitly on the ENCOUNTER column: neither frame is indexed by ENCOUNTER (the
# set_index call is commented out), so an index-on-index join would pair unrelated rows by
# their default positional index. The shared PATIENT column is disambiguated by the
# suffixes (PATIENT_symp from symptoms, PATIENT from df).
df = symptoms.merge(df, on='ENCOUNTER', suffixes=('_symp', ''))

In [28]:
# persist df and del symptoms, patients and conditions (see if we can free up memory before continuing)

In [29]:
# Cap the partition count at 200 after the join; frames with 200 or fewer partitions are left as-is.
if df.npartitions > 200:
    df = df.repartition(npartitions=200)

In [30]:
# Age in whole years at the condition's START date.
# astype('timedelta64[M]') is rejected by pandas >= 2.0. NumPy's average month is 1/12 of
# 365.2425 days, so (elapsed days / 365.2425) floored gives the same whole-year result as
# floor(months) / 12 truncated, for non-negative ages.
df['AGE'] = (
    (df['START'] - df['BIRTHDATE']) / np.timedelta64(1, 'D') / 365.2425
).astype(np.uint16)

In [31]:
# Columns kept for aggregation, in output order: the group key first, then the per-row values.
ordered_keys = [
    'grp',
    'LABEL',
    'RACE',
    'GENDER',
    'AGE',
    'SYMPTOM_CODE',
]

In [32]:
# Keep only the aggregation columns, in ordered_keys order.
df = df.loc[:, ordered_keys]

In [33]:
# Constant 1 per row; after the groupby sum it becomes the number of rows in each group.
df = df.assign(counter=1)
df['counter'] = df['counter'].astype(np.uint8)

In [34]:
# continue below this line

In [35]:
# clear the data that's no longer needed - does this help free up memory on the cluster?
# del symptoms
# del conditions
# del patients

In [36]:
# Group all rows by encounter id.
grouped = df.groupby(by='grp')

In [37]:
# Sum every column within each encounter group ('grp' becomes the index), spreading the result
# over 200 output partitions. 'counter' turns into the row count per group, which map_agg
# below uses to turn the LABEL/RACE/GENDER/AGE sums back into averages.
# NOTE(review): summing SYMPTOM_CODE bitmasks only equals a bitwise OR if each symptom bit
# appears at most once per group; repeated rows for one encounter (e.g. several condition rows
# joined to the same symptom) would carry into other bits — confirm.
df = grouped.agg('sum', split_out=200)

In [38]:
def map_agg(df):
    """Convert per-group sums back into per-group averages.

    Args:
        df: pandas partition of grouped sums with LABEL, RACE, AGE, GENDER,
            SYMPTOM_CODE and counter columns, where ``counter`` is the number of
            rows summed into each group.

    Returns:
        A new DataFrame with columns LABEL, RACE, AGE, GENDER, SYMPTOM_CODE.
        LABEL (uint16) and RACE/AGE/GENDER (uint8) are the sums divided by
        ``counter``, truncated; SYMPTOM_CODE is passed through; ``counter`` is
        dropped. The input partition is not modified.
    """
    n = df.counter
    # assign() returns a copy, so the partition handed over by map_partitions is
    # never written to in place.
    averaged = df.assign(
        LABEL=(df.LABEL / n).astype(np.uint16),
        RACE=(df.RACE / n).astype(np.uint8),
        AGE=(df.AGE / n).astype(np.uint8),
        GENDER=(df.GENDER / n).astype(np.uint8),
    )
    return averaged[['LABEL', 'RACE', 'AGE', 'GENDER', 'SYMPTOM_CODE']]

In [39]:
def map_expand(df, label_map):
    """Expand the SYMPTOM_CODE bitmask into one 0/1 column per symptom.

    Args:
        df: pandas partition with a SYMPTOM_CODE column of integer bitmasks.
        label_map: ordered mapping of symptom code -> single-bit mask. Keys
            become column names; they are passed to ``DataFrame.assign`` and so
            must be strings (the symptom codes here come from JSON object keys).

    Returns:
        A new DataFrame: the columns of *df* followed by one uint8 column per
        ``label_map`` key (in ``label_map`` order) that is 1 when the key's bit is
        set in SYMPTOM_CODE and 0 otherwise. *df* itself is not modified.
    """
    def check_inner(val, comp):
        # 1 if any bit of `comp` is set in `val`, else 0.
        return 1 if (val & comp) > 0 else 0

    codes = df.SYMPTOM_CODE
    # Build every flag column first and attach them in a single assign(): this avoids
    # writing into the partition dask handed us and the one-column-at-a-time inserts that
    # fragment a frame with many symptoms. uint8 matches the meta declared for these columns.
    flags = {
        key: codes.apply(check_inner, comp=mask).astype(np.uint8)
        for key, mask in label_map.items()
    }
    return df.assign(**flags)

In [40]:
# Column dtypes of map_agg's output, passed as `meta` to map_partitions.
# np.object was only an alias of the builtin `object` and was removed in NumPy 1.24.
dtypes = {
    'LABEL': np.uint16,
    'RACE': np.uint8,
    'AGE': np.uint8,
    'GENDER': np.uint8,
    'SYMPTOM_CODE': object,  # power-of-two bitmasks; can exceed int64 with many symptoms
}

In [41]:
# Meta for map_expand: map_agg's columns plus one uint8 flag column per symptom code.
# dict() copies `dtypes`, so the update below leaves the original table untouched.
full_dtype = dict(dtypes)
full_dtype.update(dict.fromkeys(label_map, np.uint8))

In [42]:
# Turn the per-group sums into averages partition by partition; `dtypes` describes the result.
df = df.map_partitions(map_agg, meta=dtypes)

In [43]:
# Expand the SYMPTOM_CODE bitmask into one 0/1 column per symptom; full_dtype includes those columns.
df = df.map_partitions(map_expand, label_map=label_map, meta=full_dtype)

In [44]:
# Output file pattern for df.to_csv; the '*' is filled in per output partition.
csv_op = "/home/oagba/bulk/data/kk/data-*.csv"

In [None]:
# Write the expanded dataset, one CSV file per partition.
# NOTE(review): the index (the 'grp' encounter key from the groupby) is written too — confirm that is intended.
df.to_csv(csv_op)