In [None]:
import dask.dataframe as dd
import multiprocessing as mp
import psycopg2
from psycopg2 import sql
import time
# pip list --format=freeze > requirements.txt

### <b> Topics </b>
* #### PostGreSQL vs BigQuery vs Dask : Most efficient?
* #### Dash - interactive app framework

In [85]:
def mimic_user_creation(host_user: str, db_name: str) -> dict[str, str]:
    '''
    Connects to PostGreSQL default database through super user.
    Then, creates a user with CREATEDB priviliges, which will be used to create and store the MIMICIII database.

    Args:
        -host_user: string (super user created at the time of installation)
        -db_name: string (postgres by deafult)

    Output:
        -No output
    '''
    # Admin connection (connect as the default PostgreSQL superuser)
    conn = psycopg2.connect(dbname="postgres", user="rodrigocastro")  # No password needed if peer authentication is enabled
    conn.autocommit = True
    cur = conn.cursor()

    # Fetch the current user
    cur.execute("SELECT current_user;")
    current_user = cur.fetchone()[0]

    print(f"Connected to PostgreSQL successfully as user: {current_user}\n")

    # New user credentials and new database name
    print(f"[Creating new user for ICU LoS database]\n")
    new_db_user = input("Enter new username:")

    # Query to check if new user exists
    cur.execute("SELECT 1 FROM pg_roles WHERE rolname = %s;\n", (new_db_user,))
    user_exists = cur.fetchone() is not None
    
    if user_exists:
        print(f"User '{new_db_user}' has already been created. Try a new username.\n")
        new_db_user = input("Enter new username:")
        new_db_password = input("Enter password for new user:")

        cur.execute(
        sql.SQL("CREATE ROLE {} WITH LOGIN PASSWORD %s CREATEDB;").format(sql.Identifier(new_db_user)),
        [new_db_password])

        print(f"User '{new_db_user}' created successfuly\n")
    
    else:
        new_db_password = input("Enter password for new user:")
        print(f"User '{new_db_user}' will be created.\n")
    
        cur.execute(
        sql.SQL("CREATE ROLE {} WITH LOGIN PASSWORD %s CREATEDB;").format(sql.Identifier(new_db_user)),
        [new_db_password])

        print(f"User '{new_db_user}' created successfuly\n")


    print(f"Closing connection to database")
    cur.close()
    conn.close()

    return {"new_user":new_db_user,"new_password":new_db_password}

In [79]:
'''
# Mimic User and Mimic Database
new_db_name = "mimic_db"
new_db_user = "mimic_user"
new_db_password = "secure_password"
new_db_host = "localhost"
db_port = "5432" # default port for PostGreSQL'
'''

'\n# Mimic User and Mimic Database\nnew_db_name = "mimic_db"\nnew_db_user = "mimic_user"\nnew_db_password = "secure_password"\nnew_db_host = "localhost"\ndb_port = "5432" # default port for PostGreSQL\'\n'

In [86]:
user_credentials = mimic_user_creation("rodrigocastro", "postgres")

Connected to PostgreSQL successfully as user: rodrigocastro

[Creating new user for ICU LoS database]

User 'mimic_user' will be created.

User 'mimic_user' created successfuly

Closing connection to database


In [87]:
new_user = user_credentials['new_user']
new_password = user_credentials['new_password']
new_user, new_password

('mimic_user', 'lengthofstay')

In [None]:
def create_mimic_iii_db(new_user:str, new_password:str) -> None:
    
    # MIMIC III database
    mimic_db = "mimic_iii"

    conn = psycopg2.connect(dbname="postgres", user=new_user, password=new_password, host="localhost", port="5432")
    conn.autocommit = True  # Database cannot be inside a transaction
    cur = conn.cursor()


    cur.execute(f"CREATE DATABASE {mimic_db};")
    print(f"Database '{mimic_db}' created successfully by '{new_user}'.")

    # Cleanup
    cur.close()
    conn.close()

In [None]:
#psql -U $(whoami) -d postgres -c "SHOW data_directory;" 
# Command to check postgresql installation directory

#pg_hba.conf File to configure postgres authentication, useful for connecting to database


In [4]:
chart_events = dd.read_csv('../data/raw/CHARTEVENTS.csv', 
                           dtype={'CGID': 'float64','ICUSTAY_ID': 'float64','ERROR': 'float64',
                                'RESULTSTATUS': 'object', 'STOPPED': 'object', 'VALUE': 'object', 
                                'WARNING': 'float64'}) #type specification is required due to dask's dtype inference failing

In [5]:
chart_events.head()

Unnamed: 0,ROW_ID,SUBJECT_ID,HADM_ID,ICUSTAY_ID,ITEMID,CHARTTIME,STORETIME,CGID,VALUE,VALUENUM,VALUEUOM,WARNING,ERROR,RESULTSTATUS,STOPPED
0,788,36,165660,241249.0,223834,2134-05-12 12:00:00,2134-05-12 13:56:00,17525.0,15.0,15.0,L/min,0.0,0.0,,
1,789,36,165660,241249.0,223835,2134-05-12 12:00:00,2134-05-12 13:56:00,17525.0,100.0,100.0,,0.0,0.0,,
2,790,36,165660,241249.0,224328,2134-05-12 12:00:00,2134-05-12 12:18:00,20823.0,0.37,0.37,,0.0,0.0,,
3,791,36,165660,241249.0,224329,2134-05-12 12:00:00,2134-05-12 12:19:00,20823.0,6.0,6.0,min,0.0,0.0,,
4,792,36,165660,241249.0,224330,2134-05-12 12:00:00,2134-05-12 12:19:00,20823.0,2.5,2.5,,0.0,0.0,,


In [6]:
chart_events.dtypes

ROW_ID                    int64
SUBJECT_ID                int64
HADM_ID                   int64
ICUSTAY_ID              float64
ITEMID                    int64
CHARTTIME       string[pyarrow]
STORETIME       string[pyarrow]
CGID                    float64
VALUE           string[pyarrow]
VALUENUM                float64
VALUEUOM        string[pyarrow]
ERROR                   float64
RESULTSTATUS    string[pyarrow]
STOPPED         string[pyarrow]
dtype: object

In [7]:
chart_events.columns

Index(['ROW_ID', 'SUBJECT_ID', 'HADM_ID', 'ICUSTAY_ID', 'ITEMID', 'CHARTTIME',
       'ERROR', 'RESULTSTATUS', 'STOPPED'],
      dtype='object')

In [8]:
num_entries = chart_events['ROW_ID'].count().compute()

  df = reader(bio, **kwargs)


In [None]:
num_entries

330712483