In [None]:
# if any of the modules imported below are not already installed, install them from this notebook with the command below
# %pip install NameOfYourModule (e.g. %pip install pandas) -- %pip installs into the environment of the running kernel

import os
from urllib.parse import quote_plus

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
from sqlalchemy import create_engine
from sqlalchemy import text as sqlalchemy_text
from sqlalchemy.orm import Session


In [None]:
# replace the empty default strings below with the correct server/database information,
# or set the matching environment variables so credentials are not stored in the notebook
server = os.environ.get("DB_SERVER", "")
database = os.environ.get("DB_NAME", "")
username = os.environ.get("DB_USER", "")
password = os.environ.get("DB_PASSWORD", "")
port = os.environ.get("DB_PORT", "")

# SQLAlchemy 1.4+ only recognises the "postgresql" dialect name ("postgres" was removed).
# username/password are URL-escaped so characters such as "@", ":" or "/" cannot break the URL.
# the ":port" part is omitted when no port is given so the URL stays well-formed.
server_address = f"{server}:{port}" if port else server
database_string = (
    f"postgresql+psycopg2://{quote_plus(username)}:{quote_plus(password)}"
    f"@{server_address}/{database}"
)

database_engine = create_engine(database_string)


In [None]:
# main folder of the query (an empty string means the current working directory)
main_path = ""

# where the raw data will be saved
# os.path.join avoids producing "/source data" (filesystem root) when main_path is empty
source_data_path = os.path.join(main_path, "source data")

# where the results will be saved
result_path = os.path.join(main_path, "result")

# create the folders if they do not exist yet; exist_ok makes this a no-op when they already do
os.makedirs(source_data_path, exist_ok=True)
os.makedirs(result_path, exist_ok=True)

# where all external data needed for analysis is already saved (e.g. PASC definition spreadsheet)
external_source_path = ""


In [None]:
# Sites included in the analysis.
# Each entry must match, character for character, the schema name of that site in the database.
site_names = [
    "mshs",
    "wcm",
    "nyu",
    "montefiore",
    "columbia",
]

# Boundaries of the study period, written as YYYY-MM-DD strings
study_start_date, study_end_date = "2020-03-01", "2022-07-30"


In [None]:
def _print_table_summary(label: str, df: pd.DataFrame) -> None:
    '''Print the shape of df and its number of distinct syn_pt_id values, prefixed by label.'''
    print(f"{label} table shape: {df.shape}")
    # dropna=False counts a missing id as its own value, matching len(Series.unique())
    print(f"{label} table has: {df['syn_pt_id'].nunique(dropna=False)} unique patients")


def extract_raw_data(query: str, site_names: list, source_data_path: str, data_name: str, database_engine):
    '''extract_raw_data queries the live data for all sites in the analysis, concatenates them together, and saves them as a parquet file.

    Every "<SCHEMA>" occurrence in ``query`` is replaced with the site name before the query
    is run. A ``site`` column is added to each site's rows. A site whose query (or per-site
    summary) raises is skipped and the error is printed, so one failing site does not abort
    the extraction of the others.

    Args:
        query (str): SQL query to be executed against the live data in the database. Every "<SCHEMA>"
            placeholder is replaced with the site's schema name. The result must contain a
            ``syn_pt_id`` column, because it is used for the patient counts.
        site_names (list): a list of all site names (schemas as they appear in the database) used in the analysis.
        source_data_path (str): the source data folder path where the final data frame will be saved.
        data_name (str): name used for the output file (``{data_name}.parquet``) and in progress messages.
        database_engine: the SQLAlchemy engine (or any connectable accepted by ``pd.read_sql``).

    Returns:
        final_df (DataFrame): a pandas dataframe containing all successfully queried sites' data.
        It is empty when no site could be processed, and in that case no parquet file is written,
        so an earlier extract is not overwritten with an empty table.
    '''

    site_frames = []

    # enumerate keeps the query number tied to the site's position, even after a failed site
    for counter, site in enumerate(site_names, start=1):
        try:
            print(f"Query {counter}. {site}'s {data_name} started")

            # replace where input query indicates "<SCHEMA>" with the site's real schema
            modified_query = query.replace("<SCHEMA>", f"{site}")

            df = pd.read_sql(modified_query, database_engine)
            # creating an additional column to indicate the site
            df['site'] = site
            print(f"{site}'s {data_name} query is finished")

            # optional lines to generate information about each site's data
            _print_table_summary(site, df)

            # collect per-site frames; they are concatenated once after the loop (avoids quadratic copying)
            site_frames.append(df)
            print("*"*50)

        # leave out the failing site but report why it failed
        # best way to investigate the issue is to run the SQL query in PgAdmin
        # possible issues could be data type mismatch for certain columns
        # Exception (not a bare except) so KeyboardInterrupt/SystemExit still stop the run
        except Exception as exc:
            error_msg = f"# {counter}. {site}'s {data_name} WAS NOT PROCESSED #"
            print("#"*len(error_msg))
            print(error_msg)
            print(f"# reason: {type(exc).__name__}: {exc}")
            print("#"*len(error_msg))

    if not site_frames:
        print(f"No site's {data_name} data could be extracted; nothing was saved.")
        return pd.DataFrame()

    final_df = pd.concat(site_frames, ignore_index=True)

    # saving the table with all sites data concatenated as parquet format in source data folder
    output_file = os.path.join(source_data_path, f"{data_name}.parquet")
    pq.write_table(pa.Table.from_pandas(final_df), output_file, compression="BROTLI")
    print(f"All sites {data_name} data have been saved as a parquet file in:")
    print(output_file)

    # optional lines to generate information about all sites data
    _print_table_summary("All sites", final_df)
    print(f"All sites table has: {final_df['site'].nunique()} unique sites")

    return final_df


In [None]:
# Write "<SCHEMA>" wherever the site's schema name belongs in the SQL query below.
# extract_raw_data substitutes every "<SCHEMA>" instance with the site's name before running the query.
# Always keep the syn_pt_id expression as the first selected column: prefixing patid with the
# schema name yields a synthetic patient identifier that cannot collide across sites.
# Select only the columns the analysis needs instead of "*".
test_query = "\n".join([
    "",
    "SELECT",
    "CONCAT('<SCHEMA>', '_', patid) AS syn_pt_id",
    ", t1.* ",
    "FROM <SCHEMA>.demographic t1",
    "LIMIT 100;",
    "",
])

In [None]:
demographic = extract_raw_data(
    data_name="demographic_test",
    query=test_query,
    database_engine=database_engine,
    site_names=site_names,
    source_data_path=source_data_path,
)
