In [2]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import os
import pyarrow as pa
import pyarrow.parquet as pq

In [74]:
# Directory that holds the raw CSV exports, and the CSV file names found in it.
filepath = (r"D:\iti\big_data\Data_Needed")
# os.listdir returns entries in arbitrary order, so sort them to make the
# order of `filenames` reproducible between runs and machines. Entries that
# are not CSV files (temp files, checkpoint folders, ...) are skipped so they
# are never treated as input tables.
filenames = sorted(
    name for name in os.listdir(filepath)
    if name.lower().endswith(".csv")
)

In [75]:
# Build the full path of every input file by prefixing it with the directory.
files = []
for name in filenames:
    files.append(os.path.join(filepath, name))
files

['D:\\iti\\big_data\\Data_Needed\\ADMISSIONS.csv',
 'D:\\iti\\big_data\\Data_Needed\\DIAGNOSES_ICD.csv',
 'D:\\iti\\big_data\\Data_Needed\\D_ICD_DIAGNOSES.csv',
 'D:\\iti\\big_data\\Data_Needed\\ICUSTAYS.csv',
 'D:\\iti\\big_data\\Data_Needed\\PATIENTS.csv']

In [76]:
# Load every source table, drop rows that are entirely empty and exact
# duplicate rows, then bind each table to its variable by FILE NAME.
# Binding by position in the directory listing is fragile: the listing order
# is not guaranteed, and one extra file in the folder would shift or break
# the unpacking.
table_names = ('admissions', 'diagnoses_icd', 'd_icd_diagnoses', 'icustays', 'patients')

# Map lower-cased file stem -> full path (e.g. 'admissions' -> '...\ADMISSIONS.csv').
paths_by_table = {
    os.path.splitext(os.path.basename(path))[0].lower(): path
    for path in files
}
missing = [name for name in table_names if name not in paths_by_table]
if missing:
    raise FileNotFoundError(f"Missing input CSV file(s) for: {missing}")

dfs = []
for name in table_names:
    df = (
        pd.read_csv(paths_by_table[name], header=0)
        # how='all' removes only rows in which EVERY column is null;
        # rows with some missing values are kept.
        .dropna(how='all')
        .drop_duplicates()
        # NOTE: without drop=True the previous row labels are kept as an
        # extra 'index' column; later cells and the saved files carry it.
        .reset_index()
    )
    dfs.append(df)
admissions, diagnoses_icd, d_icd_diagnoses, icustays, patients = dfs

In [77]:
# Pre-join each diagnosis row with its ICD-9 description to simplify the
# diagnosis queries in Hive later on.
# how='left' keeps every diagnosis row even when its code has no description.
diagnosis_full = pd.merge(
    diagnoses_icd,
    d_icd_diagnoses,
    how='left',
    on='icd9_code',
)

In [78]:
# Preview the first five merged rows.
diagnosis_full.head(n=5)

Unnamed: 0,index_x,row_id_x,subject_id,hadm_id,seq_num,icd9_code,index_y,row_id_y,short_title,long_title
0,0,112344,10006,142345,1,99591,11402.0,11403.0,Sepsis,Sepsis
1,1,112345,10006,142345,2,99662,11437.0,11438.0,React-oth vasc dev/graft,Infection and inflammatory reaction due to oth...
2,2,112346,10006,142345,3,5672,,,,
3,3,112347,10006,142345,4,40391,4315.0,4316.0,Hyp kid NOS w cr kid V,"Hypertensive chronic kidney disease, unspecifi..."
4,4,112348,10006,142345,5,42731,4461.0,4462.0,Atrial fibrillation,Atrial fibrillation


In [79]:
# Remove the bookkeeping columns that came from the lookup-table side of the
# merge; errors='ignore' makes this a no-op if they are already gone.
lookup_only_cols = ['row_id_y', 'index_y']
diagnosis_full.drop(lookup_only_cols, axis='columns', errors='ignore', inplace=True)

### Checking each dataset to see if it needs any cleaning

### admissions Cleaning

In [80]:
# Summary of admissions: column names, non-null counts and dtypes.
admissions.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 129 entries, 0 to 128
Data columns (total 20 columns):
 #   Column                Non-Null Count  Dtype 
---  ------                --------------  ----- 
 0   index                 129 non-null    int64 
 1   row_id                129 non-null    int64 
 2   subject_id            129 non-null    int64 
 3   hadm_id               129 non-null    int64 
 4   admittime             129 non-null    object
 5   dischtime             129 non-null    object
 6   deathtime             40 non-null     object
 7   admission_type        129 non-null    object
 8   admission_location    129 non-null    object
 9   discharge_location    129 non-null    object
 10  insurance             129 non-null    object
 11  language              81 non-null     object
 12  religion              128 non-null    object
 13  marital_status        113 non-null    object
 14  ethnicity             129 non-null    object
 15  edregtime             92 non-null     ob

In [81]:
# The info() output shows timestamp columns stored as object (string) dtype; convert them to datetime.

In [82]:
# Parse the timestamp columns of admissions into datetime values.
admission_time_cols = ['deathtime', 'admittime', 'dischtime', 'edregtime', 'edouttime']
for time_col in admission_time_cols:
    admissions[time_col] = pd.to_datetime(admissions[time_col])

In [83]:
# Re-check the column dtypes of admissions after the datetime conversion.
admissions.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 129 entries, 0 to 128
Data columns (total 20 columns):
 #   Column                Non-Null Count  Dtype         
---  ------                --------------  -----         
 0   index                 129 non-null    int64         
 1   row_id                129 non-null    int64         
 2   subject_id            129 non-null    int64         
 3   hadm_id               129 non-null    int64         
 4   admittime             129 non-null    datetime64[ns]
 5   dischtime             129 non-null    datetime64[ns]
 6   deathtime             40 non-null     datetime64[ns]
 7   admission_type        129 non-null    object        
 8   admission_location    129 non-null    object        
 9   discharge_location    129 non-null    object        
 10  insurance             129 non-null    object        
 11  language              81 non-null     object        
 12  religion              128 non-null    object        
 13  marital_status      

In [84]:
# Collapse equivalent 'ethnicity' labels: both 'HISPANIC/LATINO' spellings
# become 'HISPANIC OR LATINO', and 'UNABLE TO OBTAIN' is folded into
# 'UNKNOWN/NOT SPECIFIED'.
ethnicity_aliases = {
    'HISPANIC/LATINO - PUERTO RICAN': 'HISPANIC OR LATINO',
    'HISPANIC/LATINO': 'HISPANIC OR LATINO',
    'UNABLE TO OBTAIN': 'UNKNOWN/NOT SPECIFIED',
}
admissions['ethnicity'] = admissions['ethnicity'].replace(ethnicity_aliases)

# Same idea for 'religion': 'UNOBTAINABLE' is treated as 'NOT SPECIFIED'.
religion_aliases = {'UNOBTAINABLE': 'NOT SPECIFIED'}
admissions['religion'] = admissions['religion'].replace(religion_aliases)

### in diagnosis_full there was nothing to be cleaned

In [85]:
# Strip the '_x' merge suffix from the two suffixed columns that remain.
merge_suffix_renames = {'index_x': 'index', 'row_id_x': 'row_id'}
diagnosis_full.rename(columns=merge_suffix_renames, inplace=True)

### Cleaning icustays Data

In [86]:
# Preview the first five ICU stay rows.
icustays.head(n=5)

Unnamed: 0,index,row_id,subject_id,hadm_id,icustay_id,dbsource,first_careunit,last_careunit,first_wardid,last_wardid,intime,outtime,los
0,0,12742,10006,142345,206504,carevue,MICU,MICU,52,52,2164-10-23 21:10:15,2164-10-25 12:21:07,1.6325
1,1,12747,10011,105331,232110,carevue,MICU,MICU,15,15,2126-08-14 22:34:00,2126-08-28 18:59:00,13.8507
2,2,12749,10013,165520,264446,carevue,MICU,MICU,15,15,2125-10-04 23:38:00,2125-10-07 15:13:52,2.6499
3,3,12754,10017,199207,204881,carevue,CCU,CCU,7,7,2149-05-29 18:52:29,2149-05-31 22:19:17,2.1436
4,4,12755,10019,177759,228977,carevue,MICU,MICU,15,15,2163-05-14 20:43:56,2163-05-16 03:47:04,1.2938


In [87]:
# Parse the ICU stay in/out timestamps into datetime values.
for time_col in ('intime', 'outtime'):
    icustays[time_col] = pd.to_datetime(icustays[time_col])

### Cleaning patients Data

In [88]:
# Preview the first five patient rows.
patients.head(n=5)

Unnamed: 0,index,row_id,subject_id,gender,dob,dod,dod_hosp,dod_ssn,expire_flag
0,0,9467,10006,F,2094-03-05 00:00:00,2165-08-12 00:00:00,2165-08-12 00:00:00,2165-08-12 00:00:00,1
1,1,9472,10011,F,2090-06-05 00:00:00,2126-08-28 00:00:00,2126-08-28 00:00:00,,1
2,2,9474,10013,F,2038-09-03 00:00:00,2125-10-07 00:00:00,2125-10-07 00:00:00,2125-10-07 00:00:00,1
3,3,9478,10017,F,2075-09-21 00:00:00,2152-09-12 00:00:00,,2152-09-12 00:00:00,1
4,4,9479,10019,M,2114-06-20 00:00:00,2163-05-15 00:00:00,2163-05-15 00:00:00,2163-05-15 00:00:00,1


In [89]:
# Parse the birth/death date columns of patients into datetime values.
patient_date_cols = ['dob', 'dod', 'dod_hosp', 'dod_ssn']
for date_col in patient_date_cols:
    patients[date_col] = pd.to_datetime(patients[date_col])

### Optimizing the datatypes so they won't cause errors in Hive later on:

In [90]:
# Collect the four cleaned frames so they can be processed together.
dfs = [
    admissions,
    diagnosis_full,
    icustays,
    patients,
]

In [91]:
# Downcast integer columns to int32 whenever every value fits in the int32
# range, so they take less space in Hive.
INT32 = np.iinfo(np.int32)
for frame in dfs:
    for name in frame.columns:
        column = frame[name]
        if not pd.api.types.is_integer_dtype(column):
            continue  # leave non-integer columns untouched
        if INT32.min <= column.min() and column.max() <= INT32.max:
            frame[name] = column.astype("int32")

### Saving files to parquet

In [97]:
# Write admissions to Parquet without the DataFrame index.
# use_deprecated_int96_timestamps is handed on to the pyarrow engine;
# presumably needed for Hive to read the timestamp columns - TODO confirm.
admissions.to_parquet(
    r"D:\iti\big_data\data_cleaned\admissions.parquet",
    index=False,
    engine='pyarrow',
    use_deprecated_int96_timestamps=True,
)

In [98]:
# Write the merged diagnoses table to Parquet without the DataFrame index.
# use_deprecated_int96_timestamps is handed on to the pyarrow engine.
diagnosis_full.to_parquet(
    r"D:\iti\big_data\data_cleaned\diagnosis_full.parquet",
    index=False,
    engine='pyarrow',
    use_deprecated_int96_timestamps=True,
)

In [99]:
# Write icustays to Parquet without the DataFrame index.
# use_deprecated_int96_timestamps is handed on to the pyarrow engine;
# presumably needed for Hive to read the timestamp columns - TODO confirm.
icustays.to_parquet(
    r"D:\iti\big_data\data_cleaned\icustays.parquet",
    index=False,
    engine='pyarrow',
    use_deprecated_int96_timestamps=True,
)

In [100]:
# Write patients to Parquet without the DataFrame index.
# use_deprecated_int96_timestamps is handed on to the pyarrow engine;
# presumably needed for Hive to read the timestamp columns - TODO confirm.
patients.to_parquet(
    r"D:\iti\big_data\data_cleaned\patients.parquet",
    index=False,
    engine='pyarrow',
    use_deprecated_int96_timestamps=True,
)