In [None]:
# Load packages for data analysis
import getpass, re, json, sys
import pandas as pd
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt
from datetime import datetime, timedelta
import pandas as pd
import pandas_gbq as pgbq
# Load packages for Big Query 
from google.cloud import bigquery
import os
from sqlalchemy import create_engine

In [None]:
# Define configurations for BigQuery.
# The credential / project environment variables are exported BEFORE the client is
# built: bigquery.Client resolves its credentials when it is constructed, so setting
# GOOGLE_APPLICATION_CREDENTIALS afterwards (as this cell originally did) leaves the
# shared `client` on whatever default credentials it found first, while later
# pandas_gbq calls would presumably pick up the key file — two different identities.
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = ''  # path to the credentials JSON file
os.environ['GCLOUD_PROJECT'] = ""  # specify environment
project_id = ''  # Location of stride datalake
client = bigquery.Client(project=project_id)  # Shared client used by save_table and get_table calls
db = "som-nero-phi-boussard"  # Define the database
stanford_ds = ""
yh_ds = ""

In [None]:
def save_table(project_id, yh_ds, new_table_name, query):
    """Run ``query`` and write its result to ``project_id.yh_ds.new_table_name``.

    The destination table is overwritten (WRITE_TRUNCATE). Uses the module-level
    BigQuery ``client``, blocks until the query job completes, then prints the
    destination table ID.
    """
    destination = f"{project_id}.{yh_ds}.{new_table_name}"
    # Job configuration: write into `destination`, replacing any existing contents.
    config = bigquery.QueryJobConfig(destination=destination, write_disposition="WRITE_TRUNCATE")
    # Submit the query (API request) and block until the job completes.
    client.query(query, job_config=config).result()
    print(f"Query results loaded to the table {destination}")
def load_pgbq(project_id, yh_ds, table_name):
    """Read a whole BigQuery table into a pandas DataFrame via pandas_gbq.

    Parameters
    ----------
    project_id : str
        Project that owns the dataset. It is also passed to ``read_gbq`` so the query
        job runs in the same project that ``upload_pgbq`` writes to. An empty string
        falls back to pandas_gbq's default project (``None``).
    yh_ds : str
        Dataset name.
    table_name : str
        Table to read.

    Returns
    -------
    pandas.DataFrame
        Every row and column of the table.
    """
    table_path = f"{project_id}.{yh_ds}.{table_name}"
    # Backtick-quote the full path so dashed project IDs and reserved words parse
    # unambiguously in standard SQL.
    sql_query = f"SELECT * FROM `{table_path}`"
    return_df = pgbq.read_gbq(sql_query, project_id=project_id or None, dialect="standard")
    print(f"{project_id}.{yh_ds}.{table_name}", "is loaded")
    return return_df
def upload_pgbq(project_id, yh_ds, table_name, df):
    """Upload ``df`` to BigQuery as ``project_id.yh_ds.table_name`` via pandas_gbq.

    Only the DataFrame's shape is printed, never its rows, so patient-level data
    does not end up in the notebook output.

    NOTE(review): ``if_exists`` is not passed, so pandas_gbq's default applies
    (per its docs, fail when the table already exists). This notebook calls
    remove_table before uploading.
    """
    # to_gbq takes "dataset.table" plus a separate project_id.
    table_id = f"{yh_ds}.{table_name}"
    pgbq.to_gbq(df, table_id, project_id=project_id)
    n_rows, n_cols = df.shape
    print("dataframe", f"({n_rows} rows x {n_cols} columns)", "is uploaded as", f"{project_id}.{yh_ds}.{table_name}")
def remove_table(project_id, yh_ds, table_name):
    """Delete ``project_id.yh_ds.table_name``; a missing table is not an error.

    Uses the shared module-level ``client`` (the same one save_table uses) instead
    of building a fresh, unconfigured ``bigquery.Client()`` on every call that
    shadows it.
    """
    table_id = f"{project_id}.{yh_ds}.{table_name}"
    client.delete_table(table_id, not_found_ok=True)  # Make an API request.
    print("Deleted table '{}'.".format(table_id))

In [None]:
# Disabled step, kept for provenance. It builds the `medication_med_id` lookup table
# (SHC med_id / medication_name joined via rxcui to ingredient, therapeutic_category,
# chemical_type and type). The next cell joins against that table.
# Uncomment and run only to rebuild it.
# #get medications medication id 
# stanford_ds = "stanfordmed_datalake"
# yh_ds = "YH_dementia"
# shc_mapping = "shc_medication_rxnorm_map"
# yh_med_map = "medication_rxnorm"
# sql_query = f"""
# SELECT 
#     map.med_id, 
#     map.medication_name,
#     map.str,
#     map.rxcui,
#     med_rxnorm.ingredient, 
#     med_rxnorm.therapeutic_category,
#     med_rxnorm.chemical_type,
#     med_rxnorm.type
# FROM {db}.{stanford_ds}.{shc_mapping} map 
# JOIN {db}.{yh_ds}.{yh_med_map} med_rxnorm ON map.rxcui = med_rxnorm.rxnorm
# """

# table_name = "medication_med_id"
# save_table(project_id, yh_ds, table_name, sql_query)

In [None]:
# Load the medication orders of the dementia cohort and save them as a table.
# Each output row is one medication order (matched to ingredient / therapeutic
# category through medication_med_id) for a cohort patient. It carries that
# patient's cohort columns plus med_from_diagnosis = DATE_DIFF(ordering date,
# diagnosis date) in days, which is negative for orders before diagnosis.
# Only orders from one year before diagnosis onward are kept (WHERE clause).
#
# NOTE(review): the WHERE clause filters on the right side of the LEFT JOIN, so
# patients with no matching order (NULL ordering_date) are dropped and the join
# behaves like an INNER JOIN. Later cells re-attach every patient from the cohort
# table (keep_df merge, final cohort LEFT JOINs), so this looks harmless. Move the
# condition into ON if this intermediate table should keep all patients.
# NOTE(review): this window is INTERVAL 1 YEAR (366 days across a leap day), while
# the exposure flags in the next cell use med_from_diagnosis >= -365.
stanford_ds = "stanfordmed_datalake"
yh_ds = "YH_dementia"
med_med_id = "medication_med_id" 
dem_patients = "dementia_pat_exposure_group"
med_records = "shc_medication"
sql_query = f"""
SELECT 
    dm_pat.*,
    DATE_DIFF(DATE(med_med.ordering_date), DATE(dm_pat.diagnosis_date), DAY) AS med_from_diagnosis,
    med_med.order_deid,
    med_med.pat_enc_csn_deid,
    med_med.med_id,
    med_med.ingredient, 
    med_med.therapeutic_category,
    med_med.chemical_type,
    med_med.type AS med_category,
    med_med.ordering_date, 
    med_med.med_route, 
    med_med.ordering_mode, 
    med_med.order_class,
    med_med.is_administered
FROM {db}.{yh_ds}.{dem_patients} dm_pat 
LEFT JOIN (
    SELECT 
        med.pat_deid,
        med_id.med_id, 
        med_id.ingredient,
        med_id.therapeutic_category,
        med_id.chemical_type, 
        med_id.type,
        med.order_deid, 
        med.pat_enc_csn_deid, 
        med.ordering_date, 
        med.med_route, 
        med.ordering_mode, 
        med.order_class, 
        med.is_administered
    FROM {db}.{yh_ds}.{med_med_id} med_id
    INNER JOIN {db}.{stanford_ds}.{med_records} med ON med_id.med_id = med.medication_id
) med_med 
ON dm_pat.pat_deid = med_med.pat_deid
WHERE med_med.ordering_date >= DATE_SUB(DATE(dm_pat.diagnosis_date), INTERVAL 1 YEAR)
"""


# Written under project_id by save_table (WRITE_TRUNCATE, so re-runs overwrite it).
new_table_name = "dementia_med_med_07312024"
save_table(project_id, yh_ds, new_table_name, sql_query )

In [None]:
# Read back the order table saved by the previous cell and add four 0/1 flags per order:
#   exposure_within_1_year                 - med_from_diagnosis in [-365, 0), i.e. ordered
#                                            in the 365 days before diagnosis
#   exposure_after                         - ordered on or after the diagnosis date
#   exposure_within_1_year_before_first_op - ordered no earlier than 365 days before
#                                            diagnosis and before post_onset_exposure_start_time
#   exposure_after_first_op                - ordered at/after post_onset_exposure_start_time
# A NULL post_onset_exposure_start_time makes both *_first_op comparisons NULL, so
# those flags fall through to ELSE 0.
# NOTE(review): the table is read via `db` but was written under `project_id`; this
# assumes both name the same project.
stanford_ds = "stanfordmed_datalake"
yh_ds = "YH_dementia"
sql_table = "dementia_med_med_07312024"  # table written by the previous cell
sql_query = f"""
SELECT *,
    CASE 
        WHEN med_from_diagnosis >= -365 AND med_from_diagnosis < 0 THEN 1 
        ELSE 0 
    END AS exposure_within_1_year, 
    CASE 
        WHEN med_from_diagnosis >= 0 THEN 1 
        ELSE 0 
    END AS exposure_after,      
    CASE 
        WHEN med_from_diagnosis >= -365 AND DATETIME(ordering_date) < DATETIME(post_onset_exposure_start_time) THEN 1 
        ELSE 0 
    END AS exposure_within_1_year_before_first_op,    
    CASE 
        WHEN DATETIME(ordering_date) >= DATETIME(post_onset_exposure_start_time) THEN 1 
        ELSE 0 
    END AS exposure_after_first_op 
FROM {db}.{yh_ds}.{sql_table};
"""
med_med = pgbq.read_gbq(sql_query, dialect="standard")

In [None]:
# Load the full dementia cohort coverage table; keep_df below draws its patient columns from it.
sql_table = "dementia_pat_coverage_07312024"
sql_query = "SELECT * FROM {}.{}.{}".format(db, yh_ds, sql_table)
dementia_cohort = pgbq.read_gbq(sql_query, dialect="standard")

In [None]:
# Key of one aggregated row: a patient x a medication ingredient x its therapeutic category.
id_columns = ['pat_deid', 'ingredient', 'therapeutic_category']

# Per-patient columns carried through unchanged (taken from the cohort table, not aggregated).
keep_columns = [
    'sex', 'ethnic_group', 'death_date', 'deceased', 'race',
    'diagnosis_date', 'death_from_diagnosis', 'AD', 'FTD', 'VD', 'LBD', 'other_D',
]

# 0/1 exposure flags; 'max' turns many orders into "flag set by any order".
max_columns = [
    'exposure_within_1_year', 'exposure_after',
    'exposure_within_1_year_before_first_op', 'exposure_after_first_op',
]

# Aggregation spec for groupby().agg(): each exposure flag -> 'max', in max_columns order.
agg_dict = dict.fromkeys(max_columns, 'max')

agg_dict

In [None]:
# Collapse the order-level rows to one row per (pat_deid, ingredient, therapeutic_category),
# reducing each exposure flag with 'max' so it is 1 if any order in that group set it.
# The keys stay ordinary columns (as_index=False).
# NOTE(review): groupby drops rows whose key values are NaN (pandas default dropna=True),
# e.g. an order with a missing ingredient — confirm that is intended.
med_med_agg = (
    med_med
    .groupby(id_columns, as_index=False)
    .agg(agg_dict)
)

In [None]:
# Per-patient attributes: pat_deid plus every column listed in keep_columns, with
# exact duplicate rows removed.
# NOTE(review): a patient whose cohort rows differ in any keep column stays as several
# rows here, and the later merge would then duplicate that patient — consider asserting
# keep_df['pat_deid'].is_unique.
keep_df = dementia_cohort.loc[:, ['pat_deid', *keep_columns]].drop_duplicates()

In [None]:
# Reshape to one row per patient. For every (exposure flag, therapeutic category) pair,
# take the max over that patient's ingredients in the category, i.e. 1 if any of them
# was flagged. Categories a patient never had come out as NaN and are set to 0.
pivot_table_df = (
    med_med_agg
    .pivot_table(
        index='pat_deid',
        columns='therapeutic_category',
        values=max_columns,  # the same four exposure flags aggregated with 'max' above
        aggfunc='max',
    )
    .fillna(0)
)

# Columns are a (flag, category) MultiIndex; flatten each pair to "flag_category".
pivot_table_df.columns = ['_'.join(col).strip() for col in pivot_table_df.columns.values]

# Move pat_deid from the index back into an ordinary column for the merge below.
pivot_table_df = pivot_table_df.reset_index()



In [None]:
def _normalize_column_name(name):
    """Normalize one column name to a lower-case, underscore-separated identifier.

    A name whose first character is a digit is only prefixed with 'col_'; nothing
    else is applied to it. Every other name goes through these steps in order:
      1. '-' and ' ' become '_'
      2. '_' is inserted before every upper-case letter except the first character
      3. runs of '_' collapse to a single '_'
      4. characters outside [a-zA-Z0-9_] are removed
      5. strip, then lower-case (the strip is a no-op after step 4)

    NOTE(review): because step 4 runs after step 3, a removed character between two
    underscores leaves '__' behind ('A & B' -> 'a__b'). Acronyms also split letter by
    letter ('HIV' -> 'h_i_v'). Kept as-is so existing column names do not change.
    """
    if name[0].isdigit():
        return 'col_' + name
    snake = name.replace('-', '_').replace(' ', '_')
    snake = re.sub(r'(?<!^)(?=[A-Z])', '_', snake)
    snake = re.sub('_+', '_', snake)
    snake = re.sub(r'[^a-zA-Z0-9_]', '', snake)
    return snake.strip().lower()


pivot_table_df.rename(columns=_normalize_column_name, inplace=True)

In [None]:
# Left join: every patient row in keep_df is kept. Patients with no row in
# pivot_table_df get NaN in all exposure columns.
merged_df = pd.merge(keep_df, pivot_table_df, how='left', on='pat_deid')

In [None]:
# Drop any previous version of the medication-category table before it is re-uploaded in the next cell.
remove_table(project_id=project_id, yh_ds=yh_ds, table_name="dementia_medication_categories_07312024")

In [None]:
# Upload the per-patient medication-exposure table (merged_df) to BigQuery.
upload_pgbq(project_id=project_id, yh_ds=yh_ds, table_name="dementia_medication_categories_07312024", df=merged_df)

In [None]:
# Names of the three per-patient feature tables that the final cohort query joins.
medication_table_name = 'dementia_medication_categories_07312024'
comorbidity_table_name = 'dementia_comorbidity_categories_aggregated_07312024'
comorbidity_before_exposure_table_name = 'dementia_comorbidity_before_op_categories_aggregated_07312024'

# Fully qualified db.dataset.table IDs.
# NOTE(review): these use `db`, while the medication table was uploaded under
# `project_id`; this assumes both name the same project.
medication_table_id = f"{db}.{yh_ds}.{medication_table_name}"
comorbidity_table_id = f"{db}.{yh_ds}.{comorbidity_table_name}"
comorbidity_before_exposure_table_id = f"{db}.{yh_ds}.{comorbidity_before_exposure_table_name}"

# Fetch table metadata (one API call each, same order as before).
medication_table = client.get_table(medication_table_id)
comorbidity_table = client.get_table(comorbidity_table_id)
comorbidity_before_exposure_table = client.get_table(comorbidity_before_exposure_table_id)


def _schema_column_names(table):
    """Return the top-level column names of a BigQuery table, in schema order."""
    return [field.name for field in table.schema]


comorbidity_column_names = _schema_column_names(comorbidity_table)
comorbidity_before_exposure_table_column_names = _schema_column_names(comorbidity_before_exposure_table)
medication_column_names = _schema_column_names(medication_table)

# Show the comorbidity columns so the hard-coded column lists can be checked against them.
print(comorbidity_column_names)

In [None]:
# Bare last expression: Jupyter renders the list with its rich display instead of print().
comorbidity_before_exposure_table_column_names

In [None]:
# Bare last expression: Jupyter renders the list with its rich display instead of print().
medication_column_names

In [None]:
# Comorbidity flag columns pulled into the final cohort. They are used verbatim as column
# names in the cohort query, so spellings such as 'Intellecutual_disability' and
# '..._isorder_...' must stay as written — presumably they match the upstream table
# schema. Change them only together with that table.
comorbidity_columns_to_add = [
    'myocardial_infarction', 'congestive_heart_failure', 'peripheral_vascular_disease',
    'cerebrovascular_disease_', 'chronic_pulmonary_disease', 'rheumatologic_disease',
    'peptic_ulcer', 'hemiplegia_paraplegia', 'diabetes_without_complications',
    'diabetes_with_chronic_complications_', 'mild_liver_diseases',
    'moderate_severe_liver_disease', 'renal_disease',
    'any_malignancy__tumor__leukemia__lymphoma_', 'metastatic_solid_tumor', 'HIV_AIDS',
    'cognitive_decline', 'hypertension', 'hyperlipidemia', 'excessive_alcohol',
    'atherosclerosis', 'hypercholesterolemia', 'atrial_fibrillation',
    'traumatic_brain_injury', 'hearing_loss', 'sleep_apnea', 'prediabetes', 'delirium',
    'depression', 'schizophrenia_non_mood_psychotic_disorder', 'mood_disorder',
    'anxiety_and_non_psychotic_mental_disorder',
    'behavioral_syndromes_associated_with_physiological_disturbances_and_physical_factors',
    'disorders_of_adult_personality_and_behavior', 'Intellecutual_disability',
    'developmental_disorder', 'behavioral_and_emotional_isorder_with_onset_in_early_ages',
]

# Every exposure flag column of the uploaded medication table (names start with 'exposure').
medication_columns_to_add = [name for name in medication_column_names if name.startswith('exposure')]

# The before-exposure columns are the comorbidity names above with a
# 'before_exposure_' prefix, in the same order.
comorbidity_before_exposure_columns_to_add = [f"before_exposure_{name}" for name in comorbidity_columns_to_add]

In [None]:
# Assemble the final cohort query. It selects every column of the coverage table plus
# the comorbidity, before-exposure comorbidity and medication-exposure flags. Each
# feature table is LEFT JOINed on pat_deid, and its flags are COALESCEd to 0 for
# patients missing from that table.
pat_df = 'dementia_pat_coverage_07312024'
cbd_df = comorbidity_table_name
cbd_b4_op_df = comorbidity_before_exposure_table_name
med_df = medication_table_name

# Collect the SELECT items and join them with commas. The previous version appended a
# comma per item and special-cased the last medication column, which left a dangling
# comma (invalid SQL) whenever medication_columns_to_add was empty.
select_items = ["pat.*"]
select_items += [f"COALESCE(cbd.{c},0) AS {c}" for c in comorbidity_columns_to_add]
select_items += [f"COALESCE(cbd_b4_op.{c},0) AS {c}" for c in comorbidity_before_exposure_columns_to_add]
select_items += [f"COALESCE(med.{m},0) AS {m}" for m in medication_columns_to_add]

sql_query = "\nSELECT\n    " + ",\n    ".join(select_items)
sql_query += f"""
FROM 
  `{project_id}.{yh_ds}.{pat_df}` AS pat
LEFT JOIN 
  `{project_id}.{yh_ds}.{cbd_df}` AS cbd
ON 
  pat.pat_deid = cbd.pat_deid
LEFT JOIN 
  `{project_id}.{yh_ds}.{cbd_b4_op_df}` AS cbd_b4_op
ON 
  pat.pat_deid = cbd_b4_op.pat_deid
LEFT JOIN 
  `{project_id}.{yh_ds}.{med_df}` AS med
ON 
  pat.pat_deid = med.pat_deid"""
print(sql_query)

In [None]:
# Materialize the assembled cohort query as a table (save_table overwrites it on every run).
save_table(project_id, yh_ds, new_table_name="dementia_pat_complete_cohort_07312024", query=sql_query)