In [6]:
import duckdb
import os
import tqdm
import pandas as pd

# Store MIMIC III data as Parquet files

In [7]:
#per chatgpt, used for progress bar estimations below
mimic_table_row_counts = { 
    'ADMISSIONS': 58976,
    'CALLOUT': 34499,
    'CAREGIVERS': 7567,
    'CHARTEVENTS': 330712483,
    'CPTEVENTS': 573146,
    'D_CPT': 134,
    'D_ICD_DIAGNOSES': 14710,
    'D_ICD_PROCEDURES': 3898,
    'D_ITEMS': 12487,
    'D_LABITEMS': 753,
    'DATETIMEEVENTS': 4485937,
    'DIAGNOSES_ICD': 651047,
    'DRGCODES': 125557,
    'ICUSTAYS': 61532,
    'INPUTEVENTS_CV': 17527935,
    'INPUTEVENTS_MV': 3618991,
    'LABEVENTS': 27854055,
    'MICROBIOLOGYEVENTS': 631726,
    'NOTEEVENTS': 2083180,
    'OUTPUTEVENTS': 4349218,
    'PATIENTS': 46520,
    'PRESCRIPTIONS': 4157756,
    'PROCEDUREEVENTS_MV': 258066,
    'PROCEDURES_ICD': 240095,
    'SERVICES': 733241,
    'TRANSFERS': 261897
}


In [8]:
# build column list for preprocessing into parquet file
datetime_columns_translation = [
    'ADMITTIME',  # ADMISSIONS
    'DISCHTIME',  # ADMISSIONS
    'DEATHTIME',  # ADMISSIONS
    'EDREGTIME',  # ADMISSIONS
    'EDOUTTIME',  # ADMISSIONS
    'CHARTTIME',  # CHARTEVENTS, DATETIMEEVENTS, LABEVENTS, OUTPUTEVENTS, NOTEEVENTS, MICROBIOLOGYEVENTS
    'STORETIME',  # CHARTEVENTS, DATETIMEEVENTS, OUTPUTEVENTS
    'STARTTIME',  # INPUTEVENTS_MV, PROCEDUREEVENTS_MV
    'ENDTIME',  # INPUTEVENTS_MV, PROCEDUREEVENTS_MV
    'INTIME',  # ICUSTAYS
    'OUTTIME',  # ICUSTAYS
    'DOB',  # PATIENTS 
    'DOD',  # PATIENTS 
    'DOD_HOSP',  # PATIENTS 
    'DOD_SSN',  # PATIENTS 
    'CHARTDATE',  # NOTEEVENTS, MICROBIOLOGYEVENTS
    'STARTDATE',  # PRESCRIPTIONS
    'ENDDATE',  # PRESCRIPTIONS
]

numeric_columns_translation = [
    'VALUENUM',  # Common in multiple tables
    'VALUE',  # CHARTEVENTS, LABEVENTS, DATETIMEEVENTS, etc.
    'AMOUNT',  # INPUTEVENTS_CV, INPUTEVENTS_MV
    'RATE',  # INPUTEVENTS_CV, INPUTEVENTS_MV
    'ORIGINALAMOUNT',  # INPUTEVENTS_CV, INPUTEVENTS_MV
    'ORIGINALRATE',  # INPUTEVENTS_CV, INPUTEVENTS_MV
    'DILUTION_TEXT',  # MICROBIOLOGYEVENTS
    'DILUTION_COMMENTS',  # MICROBIOLOGYEVENTS
    'DOSE_VAL_RX',  # PRESCRIPTIONS
    'FORM_VAL_DISP',  # PRESCRIPTIONS
    'CPT_CD' 
]


In [10]:
# process the extracted files and turn them into parquet files for dask or duckDB
location = "./mimicIII/mimic-iii-clinical-database-1.4"
files_to_process = os.listdir("./mimicIII/mimic-iii-clinical-database-1.4")
#files_to_process = ["/".join([location,file]) for file in files_to_process]

for i, src_file in enumerate(files_to_process[25:]):
    print(f"Processing {src_file} / {1+i} of {len(files_to_process)}")
    if not src_file.endswith(".csv.gz"):
        print(f"\t Skipping since not proper type")
        continue
    if src_file.startswith("CHARTEVENTS"):
        print(f"\t Already loaded CHARTEVENTS")
        continue
    
    base_fn = src_file.replace(".csv.gz", "")

    #print(f"{base_fn} = {mimic_table_row_counts.get(base_fn)}")

    chunksize = 10e6
    reader = pd.read_csv("/".join([location,src_file]), chunksize=chunksize, compression='gzip')  

    
    for i, chunk in enumerate(tqdm.tqdm(reader, total=1+mimic_table_row_counts.get(base_fn)//chunksize, desc="Processing Chunks")):
        # convert numerical values
        num_columns = list(set(numeric_columns_translation).intersection(chunk.columns))
        dt_columns = list(set(datetime_columns_translation).intersection(chunk.columns))
        for c, col in enumerate(num_columns): 
            #print(f"\t\tChanging numeric in {c+1} of {len(num_columns)}")               
            chunk[col] = pd.to_numeric(chunk[col], errors="coerce")
        
        for c, col in enumerate(dt_columns): 
            #print(f"\t\tChanging dt in {c+1} of {len(dt_columns)}")               
            chunk[col] = pd.to_datetime(chunk[col], errors="coerce")
        
        if "GSN" in chunk.columns:
            chunk["GSN"] = chunk["GSN"].fillna("").astype(str)

        # Save each chunk as a separate Parquet file
        chunk.to_parquet(f"parquet/{base_fn}_{i}.parquet", engine="pyarrow", index=False)

Processing PRESCRIPTIONS.csv.gz / 1 of 32


  for obj in iterable:
Processing Chunks: 100%|██████████| 1/1.0 [00:13<00:00, 13.09s/it]


Processing PROCEDUREEVENTS_MV.csv.gz / 2 of 32


Processing Chunks: 100%|██████████| 1/1.0 [00:01<00:00,  1.08s/it]


Processing PROCEDURES_ICD.csv.gz / 3 of 32


Processing Chunks: 100%|██████████| 1/1.0 [00:00<00:00,  8.93it/s]


Processing README.md / 4 of 32
	 Skipping since not proper type
Processing SERVICES.csv.gz / 5 of 32


Processing Chunks: 100%|██████████| 1/1.0 [00:00<00:00, 10.98it/s]


Processing SHA256SUMS.txt / 6 of 32
	 Skipping since not proper type
Processing TRANSFERS.csv.gz / 7 of 32


Processing Chunks: 100%|██████████| 1/1.0 [00:00<00:00,  1.73it/s]


# Set up DuckDB helpers

In [125]:
# column look ups for convenience when wanting to see what columns are available in what tables
mimic_columns = dict()
for tbl in mimic_table_row_counts.keys():
    df = pd.read_parquet(f"{tbl}_0.parquet")
    mimic_columns[tbl] = df.columns

del df
# reformat index objects to lists 
mimic_columns = {col: list(mimic_columns[col]) for col in mimic_columns}
# Setup DuckDB helper functions

In [15]:
# change working directory to where parquet files are
os.chdir("./parquet")

In [144]:
def table_check():
    def decorator(fnc):
        def wrapper(*args, **kwargs):
            for arg in args:
                if arg not in mimic_columns.keys():
                    raise ValueError(f"'{arg}' is not in the list of MIMIC III tables")
            return fnc(*args, **kwargs)
        return wrapper
    return decorator

In [112]:
# decorator to change anything I am passing a query to be updated with the parquet files
def duckify():
    def decorator(fnc):
        def wrapper(*args, **kwargs):
            #print(*args)
            new_args = []
            for arg in args:
                if type(arg)==str:
                # print(arg)
                    found=False
                    for tbl in mimic_columns.keys():
                        if arg.find(tbl)>-1:
                            #print(f"before {arg=}")
                            arg = arg.replace(tbl, f"'{tbl}*.parquet'")
                            #print(f"after {arg=}")
                            found=True
                    if not found:
                        raise ValueError("Found no table name in the query string")
                new_args.append(arg)
                #print(args)
            return fnc(*new_args, **kwargs)
        return wrapper
    return decorator


In [113]:
@duckify()
def run_query(qry, print_qry=False):
    """
    Run query and return dataframe, or simply print resulting query
    """
    if print_qry:
        print(qry)
        return None
    else:
        return duckdb.query(qry).to_df()

# Assignment

## 1.  Who are the most recorded patients in the database? 
Find the patients that have the most records across all databases but also the highest average rank

In [168]:
@table_check()
def recon_query(tbl):
    qry = f""" 
    select 
    '{tbl.capitalize()}' as Table,
    subject_id,
    count(*) Count_,
    row_number() over (partition by null order by Count_ desc) Record_Rank
    from {tbl}
    group by 2
    """
    return qry

In [169]:
run_query(recon_query("ADMISSIONS")).head(20)

Unnamed: 0,Table,SUBJECT_ID,Count_,Record_Rank
0,Admissions,13033,42,1
1,Admissions,109,34,2
2,Admissions,11861,34,3
3,Admissions,5060,31,4
4,Admissions,20643,24,5
5,Admissions,19213,23,6
6,Admissions,7809,22,7
7,Admissions,5727,21,8
8,Admissions,23657,20,9
9,Admissions,11318,19,10


In [170]:
# build the recon query for every table in MIMIC iii and get the statistics
qry = "union all".join([recon_query(table) for table, columns in mimic_columns.items() if 'SUBJECT_ID' in columns])
recon_df = run_query(f"select subject_id, avg(record_rank), min(record_rank), max(record_rank), sum(Count_) from ({qry}) src group by 1 order by 2")


Unnamed: 0,SUBJECT_ID,avg(record_rank),min(record_rank),max(record_rank),sum(Count_)
0,73713,343.894737,6,4774,86271.0
1,357,463.5,24,3255,169927.0
2,55337,470.578947,12,2108,131359.0
3,11861,568.2,2,2591,79938.0
4,20643,586.65,5,3210,73335.0


Unnamed: 0,SUBJECT_ID,avg(record_rank),min(record_rank),max(record_rank),sum(Count_)
46515,26654,40915.583333,30756,46207,65.0
46516,16448,40939.0,33804,46303,35.0
46517,26235,40987.727273,33299,46447,30.0
46518,19645,41046.888889,33496,46233,155.0
46519,1490,41656.833333,33367,46517,100.0


## 2.  What are diseases that they have in common

Write intersection query for diseases 

## 3. What medicines do they have in common?

Write intersection query for medicines

## 4.  What types of ICU visits have they had?

Have subject ID for columns and number of ICU stays by ICU type as columns

## 5.  In their stays in ICU, what kind of chart events have they had?

## 6.  Let's compare their vitals

## 7. How many other people use the same diseases? 

## 7. How many other people use the same meds? 

## 8. How many ICU visits do patients have who had the same use of meds and with same diseases?

## 9.  What are the rarest diagnoses and what meds are associated?

## 10.  What are the rarest meds and what diseases are they associated with? 