Reproducible conda environment used: file db_analysis-linux-64.lock 

In [1]:
import sys
from pathlib import Path
import os
sys.path.insert(0,str(Path(os.path.abspath('')).parent))
from proj_config import *
from sklearn import preprocessing as preproc
import scipy as sp
import pandas as pd
import numpy as np
import gc
import matplotlib as mpl
import matplotlib.pyplot as plt
import hot_encoding as oh
import concepts_toolbox
import dtypecheck as dc
import pandas_utils
import csv
import timeit
from importlib import reload

datadir = get_datadir() / "mimic-omop/pre-processed/"

# Diagnoses one hot encoding

In [2]:
diagnoses = pd.read_csv(datadir/"diagnoses.csv.gz",usecols=["visit_occurrence_id","icd9_code"])
#diagnoses = diagnoses.sample(n=100000)
icd9_concepts = pd.read_csv(datadir/"icd9_concepts.csv.gz")

In [20]:
icd9_concepts.head()

Unnamed: 0,concept_id,concept_name,domain_id,concept_class_id,standard_concept,concept_code
0,44829696,Cholera,Condition,3-dig nonbill code,,1.0
1,44835564,Cholera due to vibrio cholerae,Condition,4-dig billing code,,1.0
2,44823922,Cholera due to vibrio cholerae el tor,Condition,4-dig billing code,,1.1
3,44827441,"Cholera, unspecified",Condition,4-dig billing code,,1.9
4,44829697,Typhoid and paratyphoid fevers,Condition,3-dig nonbill code,,2.0


## Using Sklearn

In [24]:
enc = preproc.OneHotEncoder(categories=icd9_concepts.concept_code)
enc.fit(diagnoses.icd9_code.reshape(-1,1))

ValueError: The truth value of a Series is ambiguous. Use a.empty, a.bool(), a.item(), a.any() or a.all().

## Using pandas get_dummies and then aggregate

In [56]:
%%time
oh_icd_codes = pd.get_dummies(diagnoses.icd9_code,columns="icd9_code",sparse=True)

CPU times: user 73.6 ms, sys: 8.01 ms, total: 81.6 ms
Wall time: 79 ms


In [None]:
oh_icd_codes.info()
oh_icd_codes.head()

In [None]:
%time
oh_diagnoses_df = pd.merge(diagnoses['visit_occurrence_id'],oh_icd_codes,left_index=True,right_index=True)
test = oh_diagnoses_df.groupby(by="visit_occurrence_id",as_index=False).sum()

**This method is intractable even with small samples**

## Using pandas string aggregation and then get_dummies

In [4]:
# Monkey patch the _str_get_dummies to enforce uint8 return type

def _str_get_dummies_uint8(self, sep="|"):
    from pandas import Series
    import pandas._libs.lib as lib

    arr = Series(self).fillna("")
    try:
        arr = sep + arr + sep
    except TypeError:
        arr = sep + arr.astype(str) + sep

    tags: set[str] = set()
    for ts in Series(arr).str.split(sep):
        tags.update(ts)
    tags2 = sorted(tags - {""})

    dummies = np.empty((len(arr), len(tags2)), dtype=np.uint8)

    for i, t in enumerate(tags2):
        pat = sep + t + sep
        dummies[:, i] = lib.map_infer(arr.to_numpy(), lambda x: pat in x)
    return dummies, tags2

pd.core.strings.object_array.ObjectStringArrayMixin._str_get_dummies = _str_get_dummies_uint8

In [5]:
%%time
agg_diagnoses_df = diagnoses[['visit_occurrence_id','icd9_code']].groupby(by="visit_occurrence_id",as_index=False).agg("|".join)

CPU times: user 401 ms, sys: 19.4 ms, total: 420 ms
Wall time: 425 ms


In [None]:
agg_diagnoses_df.head()

In [7]:
%%time
oh_icd_codes = agg_diagnoses_df.icd9_code.str.get_dummies(sep='|')#.astype(dtype=np.uint8)#.astype(pd.SparseDtype(dtype='uint8',fill_value=0))

CPU times: user 1min 57s, sys: 4.16 s, total: 2min 1s
Wall time: 2min 5s


In [None]:
oh_icd_codes.info()
oh_icd_codes.head()

In [None]:
oh_diagnoses_df = pd.merge(agg_diagnoses_df['visit_occurrence_id'],oh_icd_codes,left_index=True,right_index=True)
oh_diagnoses_df.info()
oh_diagnoses_df.head()

In [23]:
%%time
visit_occurrence_s = pd.read_csv(datadir/"../visit_occurrence.csv.gz",usecols=['visit_occurrence_id']).squeeze()
tmp = visit_occurrence_s[np.logical_not(visit_occurrence_s.isin(oh_diagnoses_df.visit_occurrence_id))]
colvals_dict = {colname:0 for colname in oh_diagnoses_df.columns}
colvals_dict['visit_occurrence_id']=tmp
oh_diagnoses_df = pd.concat([oh_diagnoses_df,pd.DataFrame(colvals_dict).astype(oh_diagnoses_df.dtypes.to_dict())])
oh_diagnoses_df.fillna(0,inplace=True)

CPU times: user 1.93 s, sys: 2.37 s, total: 4.3 s
Wall time: 4.44 s


In [24]:
%%time
oh_diagnoses_df.to_csv(datadir/"oh_diagnoses.csv.gz",index=False)

CPU times: user 1min 58s, sys: 140 ms, total: 1min 58s
Wall time: 1min 58s


Caveat: this is not really portable because there's no way to incorporate unseen codes

# Diagnoses hierarchy unrolling and hot encode

In [7]:
diagnoses = pd.read_csv(datadir/"diagnoses.csv.gz",usecols=["visit_occurrence_id","icd9_concept_id"])
icd9_concepts = pd.read_csv(datadir/"icd9_concepts.csv.gz")
diagnoses.head()

Unnamed: 0,visit_occurrence_id,icd9_concept_id
0,30549,44825520
1,33647,44833580
2,10854,44826039
3,17475,44824251
4,30550,44835139


In [8]:
icd9_relationships = pd.read_csv(datadir/"icd9_concept_relationships.csv.gz")
diagnoses_rollup = concepts_toolbox.unroll_relationship(diagnoses,
                                     concept_colname="icd9_concept_id",
                                     relationship="Is a",
                                     concept_relationship=icd9_relationships,max_depth=-1)
diagnoses_rollup.drop_duplicates(inplace=True)

~~**I have to drop duplicates since the concept relationship table contains parents relationship to several levels**~~
This is no longer true since I have filtered the DAG to make a tree, but taking unique will not hurt.

In [9]:
diagnoses_rollup = pd.merge(diagnoses_rollup,
         icd9_concepts[['concept_id',"concept_code"]],
         left_on="icd9_concept_id",
         right_on="concept_id").drop(columns=['icd9_concept_id','concept_id']).sort_values('visit_occurrence_id')
diagnoses_rollup.rename(columns={'concept_code': "icd9_code"}, inplace=True)

In [10]:
oh_diagnoses_rollup = oh.hot_encode_df_col(diagnoses_rollup,colname="icd9_code",by="visit_occurrence_id",sep="|")

In [11]:
visit_occurrence_s = pd.read_csv(datadir/"../visit_occurrence.csv.gz",usecols=['visit_occurrence_id']).squeeze()
tmp = visit_occurrence_s[np.logical_not(visit_occurrence_s.isin(oh_diagnoses_rollup.visit_occurrence_id))]
colvals_dict = {colname:0 for colname in oh_diagnoses_rollup.columns}
colvals_dict['visit_occurrence_id']=tmp
oh_diagnoses_rollup = pd.concat([oh_diagnoses_rollup,pd.DataFrame(colvals_dict).astype(oh_diagnoses_rollup.dtypes.to_dict())])
oh_diagnoses_rollup.fillna(0,inplace=True)

In [12]:
oh_diagnoses_rollup.to_csv(datadir/"oh_diagnoses_rollup.csv.gz",index=False) # drop a csv file for archiving
oh_diagnoses_rollup.to_parquet(datadir/"oh_diagnoses_rollup.parquet",index=False) # parquet file as working file

In [68]:
oh_diagnoses_rollup.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 58929 entries, 0 to 58928
Columns: 9337 entries, visit_occurrence_id to V91.03
dtypes: int64(1), uint8(9336)
memory usage: 525.1 MB


# Prescriptions hot encode

In [None]:
ingredients = pd.read_csv(datadir/"unique_ingredient_prescription.csv.gz",usecols=['visit_occurrence_id','rxnorm_ingredient_name'])
ingredients.fillna(value={'rxnorm_ingredient_name':"NA"})

In [None]:
%%time
oh_ingredients_df = oh.hot_encode_df_col(ingredients.dropna(),colname="rxnorm_ingredient_name",by="visit_occurrence_id",sep="|")
oh_ingredients_df.head()

In [68]:
oh_ingredients_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 50211 entries, 0 to 50210
Columns: 1165 entries, visit_occurrence_id to zonisamide
dtypes: int64(1), uint8(1164)
memory usage: 56.1 MB


In [26]:
visit_occurrence_s = pd.read_csv(datadir/"../visit_occurrence.csv.gz",usecols=['visit_occurrence_id']).squeeze()
tmp = visit_occurrence_s[np.logical_not(visit_occurrence_s.isin(oh_ingredients_df.visit_occurrence_id))]
colvals_dict = {colname:0 for colname in oh_ingredients_df.columns}
colvals_dict['visit_occurrence_id']=tmp
oh_ingredients_df = pd.concat([oh_ingredients_df,pd.DataFrame(colvals_dict).astype(oh_ingredients_df.dtypes.to_dict())])
oh_ingredients_df.fillna(0,inplace=True)

In [27]:
oh_ingredients_df.to_csv(datadir/"oh_ingredients.csv.gz",index=False) # drop a csv file for archiving
oh_ingredients_df.to_parquet(datadir/"oh_ingredients.parquet",index=False) # parquet file as working file

# Performance checks

## Some memory experiments: convert_dtypes

In [20]:
init_array=np.zeros(500000)
pd.DataFrame(init_array,dtype=bool).info()
pd.DataFrame(init_array,dtype=np.bool_).info()
pd.DataFrame(init_array,dtype='uint8').info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 500000 entries, 0 to 499999
Data columns (total 1 columns):
 #   Column  Non-Null Count   Dtype
---  ------  --------------   -----
 0   0       500000 non-null  bool 
dtypes: bool(1)
memory usage: 488.4 KB
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 500000 entries, 0 to 499999
Data columns (total 1 columns):
 #   Column  Non-Null Count   Dtype
---  ------  --------------   -----
 0   0       500000 non-null  bool 
dtypes: bool(1)
memory usage: 488.4 KB
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 500000 entries, 0 to 499999
Data columns (total 1 columns):
 #   Column  Non-Null Count   Dtype
---  ------  --------------   -----
 0   0       500000 non-null  uint8
dtypes: uint8(1)
memory usage: 488.4 KB


## OH file reading

In [22]:
%%time
oh_diagnoses_df = pandas_utils.read_csv_defaultdtype(datadir/"oh_diagnoses_rollup.csv.gz",
                                                     defaultdtype='uint8',dtypes_dict={'visit_occurrence_id':'uint64'},
                                                    engine='c')

CPU times: user 1min 38s, sys: 4.01 s, total: 1min 42s
Wall time: 1min 57s


In [2]:
%%time
oh_ingredients_df = pandas_utils.read_csv_defaultdtype(datadir/"oh_ingredients.csv.gz",
                                                       defaultdtype='uint8',dtypes_dict={'visit_occurrence_id':'uint64'},
                                                      engine='c')

CPU times: user 4.23 s, sys: 154 ms, total: 4.39 s
Wall time: 4.39 s


In [None]:
%%time
# Using the faster csv reader from pandas, introduced in 1.4.0
#del oh_diagnoses_df
# WILL EXHAUST MY COMPUTER RAM
oh_diagnoses_df = pandas_utils.read_csv_defaultdtype(datadir/"oh_diagnoses_rollup.csv.gz",
                                                     defaultdtype='uint8',dtypes_dict={'visit_occurrence_id':'uint64'},
                                                    engine='pyarrow')

In [17]:
import gc
from multiprocessing import Process, Event, Value
import os
import time
from timeit import default_timer
from collections import defaultdict
import matplotlib.pyplot as plt
import numpy as np
import psutil


class Timer:
    """Simple util to measure execution time.
    Examples
    --------
    >>> import time
    >>> with Timer() as timer:
    ...     time.sleep(1)
    >>> print(timer)
    00:00:01
    """
    def __init__(self):
        self.start = None
        self.elapsed = None

    def __enter__(self):
        self.start = default_timer()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.elapsed = default_timer() - self.start

    def __str__(self):
        return self.verbose()

    def __float__(self):
        return self.elapsed

    def verbose(self):
        if self.elapsed is None:
            return '<not-measured>'
        return self.format_elapsed_time(self.elapsed)

    @staticmethod
    def format_elapsed_time(value: float):
        return time.strftime('%H:%M:%S', time.gmtime(value))
    
    
class MemoryTrackingProcess(Process):
    """A process that periodically measures the amount of RAM consumed by another process.
    
    This process is stopped as soon as the event is set.
    """
    def __init__(self, pid, event, **kwargs):
        super().__init__()
        self.p = psutil.Process(pid)
        self.event = event
        self.max_mem = Value('f', 0.0)
        
    def run(self):
        mem_usage = []
        while not self.event.is_set():
            info = self.p.memory_info()
            mem_bytes = info.rss
            mem_usage.append(mem_bytes)
            time.sleep(0.05)
        self.max_mem.value = np.max(mem_usage)


class MemoryTracker:
    """A context manager that runs MemoryTrackingProcess in background and collects 
    the information about used memory when the context is exited.
    """
    def __init__(self, pid=None):
        pid = pid or os.getpid()
        self.start_mem = psutil.Process(pid).memory_info().rss
        self.event = Event()
        self.p = MemoryTrackingProcess(pid, self.event)
    
    @property
    def memory(self):
        return self.p.max_mem.value - self.start_mem
    
    def __enter__(self):
        self.p.start()
        return self
    
    def __exit__(self, exc_type, exc_val, exc_tb):
        self.event.set()
        self.p.join()

        
class GC:
    def __enter__(self):
        return self
    def __exit__(self, exc_type, exc_val, ext_tb):
        self.collected = gc.collect()
        
        
def size_of(filename, unit=1024**2):
    return round(os.stat(filename).st_size / unit, 2)


def get_save_load(df, fmt):
    def csv_reader(filename):
        return pandas_utils.read_csv_defaultdtype(filename, defaultdtype='uint8',dtypes_dict={'visit_occurrence_id':'uint64'},
                                                    engine='c')
    save = getattr(df, f'to_csv') if fmt=='csv.gz' else getattr(df, f'to_{fmt}')
    load = csv_reader if fmt in ['csv','csv.gz'] else getattr(pd, f'read_{fmt}')
    return save, load


def benchmark(dataset, list_of_formats, n_rounds=1):
    """Runs dataset saving/loading benchamrk using formts from the list_of_formats.
    
    Each round a new random dataset is generated with data_size observations. 
    The measurements for each of the rounds are concatenated together and returned
    as a single data frame.
    
    Parameters:
        list_of_formats: A list of tuples in the format (<format_name>, [<params_dict>]). 
            The <format_name> should be one of the pandas supported formats.
        data_size: A number of samples in the generated dataset.
        n_num: A number of numerical columns in the generated dataset.
        n_cat: A number of categorical columns in the generated dataset.
        n_rounds: A number of randomly generated datasets to test the formats.
        as_category: If True, then categorical columns will be converted into 
            pandas.Category type before saving.
            
    """
    runs = []
    
    for i in range(n_rounds):
        print(f'Benchmarking round #{i + 1:d}')
        
        benchmark = []
        
        for case in list_of_formats:
            fmt, params = case if len(case) == 2 else (case[0], {})
            
            with GC():
                print('\ttesting format:', fmt)
                filename = f'/home/quentin/Desktop/filebench.{fmt}'
                save, load = get_save_load(dataset, fmt)
                results = defaultdict(int)
                results['format'] = fmt
                
                with MemoryTracker() as tracker:
                    with Timer() as timer:
                        save(filename, **params)
                results['size_mb'] = size_of(filename)
                results['save_ram_delta_mb'] = tracker.memory / (1024 ** 2)
                results['save_time'] = float(timer)
                
                with MemoryTracker() as tracker:
                    with Timer() as timer:
                        _ = load(filename)
                results['load_ram_delta_mb'] = tracker.memory / (1024 ** 2)
                results['load_time'] = float(timer)
                
                benchmark.append(results)
                os.remove(filename)
                
            run = pd.DataFrame(benchmark)
            run['run_no'] = i
            runs.append(run)
            
    benchmark = pd.concat(runs, axis=0)
    benchmark.reset_index(inplace=True, drop=True)
    return benchmark



Try some binary formats:
- Feather
- Parquet
- msgpack
- hdf5
- pickle

Some existing benchmarks:
- https://towardsdatascience.com/the-best-format-to-save-pandas-data-414dca023e0d

In [18]:
formats = [
    #('csv',), # this simply takes way too much memory
    ('csv.gz',),
    #('hdf', {'key': 'data', 'format': 'table'}),
    ('pickle',),
    #('msgpack',),
    ('feather',),
    ('parquet',)
]
results = benchmark(oh_diagnoses_df,list_of_formats=formats,n_rounds=2)
results.drop_duplicates(inplace=True)

Benchmarking round #1
	testing format: csv.gz
	testing format: pickle
	testing format: feather
	testing format: parquet
Benchmarking round #2
	testing format: csv.gz
	testing format: pickle
	testing format: feather
	testing format: parquet


In [19]:
results

Unnamed: 0,format,size_mb,save_ram_delta_mb,save_time,load_ram_delta_mb,load_time,run_no
0,csv.gz,4.91,0.042969,148.332947,1942.574219,305.204614,0
2,pickle,525.62,525.292969,25.216944,522.023438,0.268633,0
5,feather,12.74,95.980469,0.640301,829.636719,5.407113,0
9,parquet,8.89,-125.617188,6.190886,1161.753906,1.181758,0
10,csv.gz,4.91,-158.871094,144.41955,1080.207031,293.95421,1
12,pickle,525.62,505.742188,16.025387,489.945312,0.362092,1
15,feather,12.74,94.089844,0.512138,804.367188,11.903225,1
19,parquet,8.89,-108.304688,6.180116,1176.902344,2.21296,1
