**CREATE EHR TABLES**

**AUTHOR:** Anna Zink 

**DATE:** April 23, 2024

**DESCRIPTION:** Pull EHR data for people with survey data (ideally the access survey).
  Include some limits (e.g., don't include very old records). Note that the access survey 
  was taken between 2018 and 2023. We are going to use a 2-year lookback window at most, so we pull data from 2016-2023. This has a few advantages: 
  we pull in less data, we avoid EHR records that are far removed (in time) from the access survey answers, 
  and we largely avoid the coding shift from ICD-9 to ICD-10. 
  Note: some of these tables are kept fairly sparse (e.g., for medications we do not record whether a record comes from a prescription or a fill, only that a record exists). 

**EHR tables:**
- condition_occurrence{_ext}
- procedure_occurrence{_ext}
- drug_exposure{_ext}
- measurement{_ext}
- visit_occurrence{_ext}
- person
- (maybe) visit_detail
- (maybe) specimen
- (maybe) observation

**UPDATES**

August 21, 2025 - Add in drug classes

August 18, 2025 - add triglycerides 

January 29, 2025 - add in A1C values and additional concept IDs

Feb 5, 2025 - update for v8 

In [None]:
# Workspace settings supplied through environment variables: the billing
# project, the CDR dataset queried below, and the workspace bucket.
BILLING_PROJECT_ID <- Sys.getenv("GOOGLE_PROJECT")
CDR                <- Sys.getenv("WORKSPACE_CDR")
MY_BUCKET          <- Sys.getenv("WORKSPACE_BUCKET")

In [None]:
library(viridis)    # A nice color scheme for plots.
library(ggthemes)   # Common themes to change the look and feel of plots.
library(scales)     # Graphical scales map data to aesthetics in plots.
library(skimr)      # Better summaries of data.
library(lubridate)  # Date library from the tidyverse.
library(bigrquery)  # BigQuery R client.
library(tidyverse)  # Data wrangling packages.
library(data.table) # data.table is good for handling large datasets 

In [None]:
# Copy `file` from <workspace bucket><folder> into the working directory and
# read it with readr::read_csv().
#   file:   file name, e.g. "conditions_2023.csv"
#   folder: bucket sub-folder with leading and trailing slashes, e.g. "/ehr/"
#   Returns: the file's contents as a tibble.
# Stops if `gsutil cp` fails, so a stale local copy from an earlier run is
# never read by mistake.
load_data <- function(file, folder) {
  my_bucket <- Sys.getenv("WORKSPACE_BUCKET")
  # shQuote() keeps the shell from splitting or globbing the paths.
  out <- system(
    paste("gsutil cp", shQuote(paste0(my_bucket, folder, file)), "."),
    intern = TRUE
  )
  status <- attr(out, "status")
  if (!is.null(status) && status != 0) {
    stop("gsutil cp failed for ", my_bucket, folder, file, call. = FALSE)
  }
  read_csv(file, show_col_types = FALSE)
}

# Write `df` to a local CSV named `fn`, then copy it to <workspace bucket><folder>.
# NOTE: this definition masks readr::write_csv() for the rest of the session.
# The local file is written with readr::write_excel_csv().
#   df:     data frame to write
#   fn:     local and destination file name, e.g. "conditions_2023.csv"
#   folder: bucket sub-folder with leading and trailing slashes, e.g. "/ehr/"
#   Returns: the captured output of `gsutil cp`; stops if the upload fails.
write_csv <- function(df, fn, folder) {
  write_excel_csv(df, fn)
  my_bucket <- Sys.getenv("WORKSPACE_BUCKET")
  # shQuote() keeps the shell from splitting or globbing the paths.
  out <- system(
    paste("gsutil cp", shQuote(paste0("./", fn)), shQuote(paste0(my_bucket, folder))),
    intern = TRUE
  )
  status <- attr(out, "status")
  if (!is.null(status) && status != 0) {
    stop("gsutil cp failed uploading ", fn, " to ", my_bucket, folder, call. = FALSE)
  }
  out
}

# Demographics
Pull data on gender, age, race, and ethnicity from the "person" table:
gender_concept_id
year_of_birth
month_of_birth
day_of_birth
race_concept_id
ethnicity_concept_id
sex_at_birth_concept_id (listed here but not currently pulled by the query below)


In [None]:
# Demographics query.
# Selects gender, birth year/month/day, race and ethnicity from `person`.
# Restricted to the cohort used throughout this notebook:
#   - people with EHR data (cb_search_person.has_ehr_data = 1), and
#   - at least one event in cb_search_all_events whose concept is a
#     selectable, non-standard cb_criteria concept under path 3000000694.
# Concept IDs are translated to names via `concept`.
# The SQL is a plain string literal; the previous paste(..., sep = "") around
# a single string was a no-op.
get_demo_sql <- "
    SELECT
        a.PERSON_ID,
        c.CONCEPT_NAME AS GENDER,
        a.YEAR_OF_BIRTH,
        a.MONTH_OF_BIRTH,
        a.DAY_OF_BIRTH,
        d.CONCEPT_NAME AS RACE,
        e.CONCEPT_NAME AS ETHNICITY
    FROM
        `person` a
         JOIN (SELECT DISTINCT a.PERSON_ID FROM `cb_search_all_events` a 
                   join `cb_search_person` b on a.PERSON_ID = b.PERSON_ID
                WHERE b.has_ehr_data = 1  
                 AND  (concept_id IN (SELECT distinct concept_id FROM `cb_criteria` 
                 WHERE path LIKE '%3000000694%' and is_standard = 0 AND is_selectable = 1)) 
         ) b on a.person_id = b.person_id 
         LEFT JOIN `concept` c on a.GENDER_CONCEPT_ID = c.CONCEPT_ID
         LEFT JOIN `concept` d on a.RACE_CONCEPT_ID = d.CONCEPT_ID
         LEFT JOIN `concept` e on a.ETHNICITY_CONCEPT_ID = e.CONCEPT_ID
        "

# Destination for the demographics export. The `*` lets BigQuery split large
# results into several CSV shards. There is no date component, so every run
# overwrites the previous export.
ehr_demo_path <- file.path(
  Sys.getenv("WORKSPACE_BUCKET"), "bq_exports", Sys.getenv("OWNER_EMAIL"),
  # strftime(lubridate::now(), "%Y%m%d"),  # uncomment to keep one dated copy per day
  "ehr", "demographics_*.csv"
)

# Run the query against the CDR dataset and export the result as CSV.
bq_table_save(
  x = bq_dataset_query(
    Sys.getenv("WORKSPACE_CDR"),
    query = get_demo_sql,
    billing = Sys.getenv("GOOGLE_PROJECT")
  ),
  destination_uris = ehr_demo_path,
  destination_format = "CSV"
)

In [None]:
# Check that the demographics export landed in the bucket by listing its shards.
# The path is built from OWNER_EMAIL, the same components used for
# ehr_demo_path, rather than a hard-coded user email. This makes the check
# work for whoever runs the notebook.
my_bucket <- Sys.getenv("WORKSPACE_BUCKET")
demo_glob <- file.path(my_bucket, "bq_exports", Sys.getenv("OWNER_EMAIL"),
                       "ehr", "demographics_*.csv")
# Quote the pattern so gsutil, not the local shell, expands the wildcard.
system(paste("gsutil ls", shQuote(demo_glob)), intern = TRUE)

In [None]:
# Read a (possibly sharded) BigQuery CSV export from the workspace bucket
# into one tibble.
#   export_path: gs:// path of the export. It may contain a `*` wildcard,
#                because BigQuery writes large results as several numbered shards.
#   Returns: every matching shard, row-bound with bind_rows().
# Column types are guessed per shard. The snippet this replaces tried to
# reuse the first shard's spec, but that code was dead: `cols()` is never
# NULL, and the assignment only changed a copy local to the closure.
# gsutil's stderr is kept out of the file listing, so an error message is
# never treated as a path. Paths are shell-quoted.
read_bq_export_from_workspace_bucket <- function(export_path) {
  csv_files <- system2("gsutil", args = c("ls", shQuote(export_path)),
                       stdout = TRUE, stderr = FALSE)
  if (length(csv_files) == 0) {
    stop("No exported files match ", export_path, call. = FALSE)
  }
  shards <- map(csv_files, function(csv) {
    message(str_glue("Loading {csv}."))
    read_csv(pipe(paste("gsutil cat", shQuote(csv))),
             col_types = cols(), show_col_types = FALSE)
  })
  bind_rows(shards)
}
demo_ehr <- read_bq_export_from_workspace_bucket(ehr_demo_path)

In [None]:
# Upload demographics. NOTE: this goes to /survey/, unlike the EHR tables in /ehr/.
write_csv(df = demo_ehr, fn = "demographics.csv", folder = "/survey/")

# Mortality Data

In [None]:
# Mortality query.
# Selects death date, death type and cause of death (as concept names) from
# `death` for the same cohort as the other queries:
#   - people with EHR data, and
#   - an event on a selectable, non-standard cb_criteria concept under
#     path 3000000694.
# The previous comment here ("pull condition data") was a copy-paste leftover.
# The SQL is a plain string literal; paste(..., sep = "") was a no-op.
get_death_sql <- "
    SELECT
        a.PERSON_ID,
        a.DEATH_DATE, 
        c.concept_name as DEATH_TYPE, 
        d.concept_name as DEATH_CAUSE
    FROM
        `death` a
         JOIN (SELECT DISTINCT a.PERSON_ID FROM `cb_search_all_events` a 
                   join `cb_search_person` b on a.PERSON_ID = b.PERSON_ID
                WHERE b.has_ehr_data = 1  
                 AND  (concept_id IN (SELECT distinct concept_id FROM `cb_criteria` 
                 WHERE path LIKE '%3000000694%'  and is_standard = 0 AND is_selectable = 1)) 
         ) b on a.person_id = b.person_id 
         LEFT JOIN `concept` c on a.DEATH_TYPE_CONCEPT_ID = c.CONCEPT_ID
         LEFT JOIN `concept` d on a.CAUSE_CONCEPT_ID = d.CONCEPT_ID
        "

# Destination for the mortality export. The output is sharded CSV; there is
# no date component, so re-exporting overwrites.
ehr_death_path <- file.path(
  Sys.getenv("WORKSPACE_BUCKET"), "bq_exports", Sys.getenv("OWNER_EMAIL"),
  # strftime(lubridate::now(), "%Y%m%d"),  # uncomment to keep one dated copy per day
  "ehr", "death_*.csv"
)

# Run the mortality query and export it to Cloud Storage as CSV.
bq_table_save(
  x = bq_dataset_query(
    Sys.getenv("WORKSPACE_CDR"),
    query = get_death_sql,
    billing = Sys.getenv("GOOGLE_PROJECT")
  ),
  destination_uris = ehr_death_path,
  destination_format = "CSV"
)

In [None]:
# Read a (possibly sharded) BigQuery CSV export from the workspace bucket
# into one tibble.
#   export_path: gs:// path of the export. It may contain a `*` wildcard,
#                because BigQuery writes large results as several numbered shards.
#   Returns: every matching shard, row-bound with bind_rows().
# Column types are guessed per shard. The snippet this replaces tried to
# reuse the first shard's spec, but that code was dead: `cols()` is never
# NULL, and the assignment only changed a copy local to the closure.
# gsutil's stderr is kept out of the file listing, so an error message is
# never treated as a path. Paths are shell-quoted.
read_bq_export_from_workspace_bucket <- function(export_path) {
  csv_files <- system2("gsutil", args = c("ls", shQuote(export_path)),
                       stdout = TRUE, stderr = FALSE)
  if (length(csv_files) == 0) {
    stop("No exported files match ", export_path, call. = FALSE)
  }
  shards <- map(csv_files, function(csv) {
    message(str_glue("Loading {csv}."))
    read_csv(pipe(paste("gsutil cat", shQuote(csv))),
             col_types = cols(), show_col_types = FALSE)
  })
  bind_rows(shards)
}
death_ehr <- read_bq_export_from_workspace_bucket(ehr_death_path)

In [None]:
# Calendar year of death, tabulated with a count of missing years.
death_ehr[["DEATH_YEAR"]] <- year(death_ehr[["DEATH_DATE"]])
table(death_ehr[["DEATH_YEAR"]], useNA = "always")

In [None]:
# Upload the mortality table to <bucket>/ehr/.
write_csv(df = death_ehr, fn = "mortality.csv", folder = "/ehr/")

# Conditions
Pull from "condition_occurrence" fields:
- condition_occurrence_id (*link to ext for source id*)
- person_id
- condition_concept_id (*link to concept table for description*)
- condition_start_date
- condition_end_date
- condition_source_value
- condition_source_concept_id (*link to concept table for description*)

and from "condition_occurrence_ext" fields:
- condition_occurrence_id
- src_id 

Subset to people with EHR and survey data using "cb_search_person":
- has_ehr_data = 1
- has_ppi_survey_data = 1

(In the queries below, the survey restriction is implemented by requiring at least one event on a selectable, non-standard concept under cb_criteria path 3000000694, rather than by has_ppi_survey_data.)

Open question: either link to the visit_occurrence table here, so we know whether a record came from an ED visit, a hospitalization, etc. (all of these tables link via "visit_occurrence_id"), OR keep visit_occurrence_id and do the link after the data are transferred to the workspace.

In [None]:
# Conditions query.
# Selects condition_occurrence rows with a start date in 2016 or later for
# the cohort:
#   - people with EHR data, and
#   - an event on a selectable, non-standard cb_criteria concept under
#     path 3000000694.
# Adds src_id from the _ext table, and the names of the standard and source
# concepts.
# The SQL is a plain string literal; paste(..., sep = "") was a no-op.
get_conditions_sql <- "
    SELECT
        a.PERSON_ID,
        a.CONDITION_CONCEPT_ID,
        a.CONDITION_START_DATE,
        a.CONDITION_END_DATE,
        a.CONDITION_SOURCE_VALUE,
        a.CONDITION_SOURCE_CONCEPT_ID,
        a.VISIT_OCCURRENCE_ID,
        c.SRC_ID,
        d.CONCEPT_NAME AS CONDITION_NAME,
        e.CONCEPT_NAME AS SOURCE
    FROM
        `condition_occurrence` a
         JOIN (SELECT DISTINCT a.PERSON_ID FROM `cb_search_all_events` a 
                   join `cb_search_person` b on a.PERSON_ID = b.PERSON_ID
                WHERE b.has_ehr_data = 1  
                 AND  (concept_id IN (SELECT distinct concept_id FROM `cb_criteria` 
                 WHERE path LIKE '%3000000694%' and is_standard = 0 AND is_selectable = 1)) 
         ) b on a.person_id = b.person_id 
         LEFT JOIN `condition_occurrence_ext` c on a.condition_occurrence_id = c.condition_occurrence_id
         LEFT JOIN `concept` d on d.concept_id = a.condition_concept_id 
         LEFT JOIN `concept` e on e.concept_id = a.condition_source_concept_id
    WHERE
        EXTRACT(YEAR FROM a.CONDITION_START_DATE) >= 2016
        "

# Cloud Storage destination for the conditions export.
# The dated path component is commented out, so each export overwrites the
# previous copy at this path. Uncomment it to keep one copy per export day.
ehr_conditions_path <- file.path(
  Sys.getenv("WORKSPACE_BUCKET"),
  "bq_exports",
  Sys.getenv("OWNER_EMAIL"),
  # strftime(lubridate::now(), "%Y%m%d"),  # uncomment to keep one dated copy per day
  "ehr",
  "conditions_*.csv")

# Export disabled; uncomment to re-run the query. Re-running overwrites the
# shards at ehr_conditions_path.
#bq_table_save(
#  bq_dataset_query(Sys.getenv("WORKSPACE_CDR"), get_conditions_sql, billing = Sys.getenv("GOOGLE_PROJECT")),
#  ehr_conditions_path,
#  destination_format = "CSV")

In [None]:
# Read a (possibly sharded) BigQuery CSV export from the workspace bucket
# into one tibble.
#   export_path: gs:// path of the export. It may contain a `*` wildcard,
#                because BigQuery writes large results as several numbered shards.
#   Returns: every matching shard, row-bound with bind_rows().
# Column types are guessed per shard. The snippet this replaces tried to
# reuse the first shard's spec, but that code was dead: `cols()` is never
# NULL, and the assignment only changed a copy local to the closure.
# gsutil's stderr is kept out of the file listing, so an error message is
# never treated as a path. Paths are shell-quoted.
read_bq_export_from_workspace_bucket <- function(export_path) {
  csv_files <- system2("gsutil", args = c("ls", shQuote(export_path)),
                       stdout = TRUE, stderr = FALSE)
  if (length(csv_files) == 0) {
    stop("No exported files match ", export_path, call. = FALSE)
  }
  shards <- map(csv_files, function(csv) {
    message(str_glue("Loading {csv}."))
    read_csv(pipe(paste("gsutil cat", shQuote(csv))),
             col_types = cols(), show_col_types = FALSE)
  })
  bind_rows(shards)
}
conditions_ehr <- read_bq_export_from_workspace_bucket(ehr_conditions_path)

In [None]:
# Show the available columns, then fix the subset that gets exported.
names(conditions_ehr)
keepvars <- c(
  "PERSON_ID", "CONDITION_CONCEPT_ID", "CONDITION_NAME",
  "CONDITION_START_DATE", "VISIT_OCCURRENCE_ID", "SRC_ID"
)

In [None]:
conditions_ehr[, keepvars] %>% head()  # preview of the exported columns

In [None]:
# Tag each condition with its start year and count records per year.
conditions_ehr[["year"]] <- year(conditions_ehr[["CONDITION_START_DATE"]])
table(conditions_ehr[["year"]])

In [None]:
# Write one CSV per start year (kept columns only) to <bucket>/ehr/.
# Only the years in this range are written; widen it to export other years.
for (yr in 2023:2023) {
  print(yr)
  subset <- conditions_ehr[conditions_ehr$year == yr, keepvars]
  write_csv(subset, paste0("conditions_", yr, ".csv"), "/ehr/")
}

In [None]:
# List the per-year condition files now in <bucket>/ehr/.
my_bucket <- Sys.getenv("WORKSPACE_BUCKET")
system(paste0("gsutil ls ", my_bucket, "/ehr/conditions_*.csv"), intern = TRUE)

# Procedures

Adapt the conditions SQL for procedures, following the same format.

In [None]:
# Procedures query.
# Selects procedure_occurrence rows dated 2016 or later for the cohort:
#   - people with EHR data, and
#   - an event on a selectable, non-standard cb_criteria concept under
#     path 3000000694.
# Adds src_id and the standard/source concept names.
# NOTE(review): the procedure concept name is aliased CONDITION_NAME. This
# was carried over from the conditions query and is kept so existing
# procedures_*.csv consumers keep working; PROCEDURE_NAME would be clearer.
# The SQL is a plain string literal; paste(..., sep = "") was a no-op.
get_procedures_sql <- "
    SELECT
        a.PERSON_ID,
        a.PROCEDURE_CONCEPT_ID,
        a.PROCEDURE_DATE,
        a.VISIT_OCCURRENCE_ID,
        c.SRC_ID,
        d.CONCEPT_NAME AS CONDITION_NAME,
        e.CONCEPT_NAME AS SOURCE
    FROM
        `procedure_occurrence` a
         JOIN (SELECT DISTINCT a.PERSON_ID FROM `cb_search_all_events` a 
                   join `cb_search_person` b on a.PERSON_ID = b.PERSON_ID
                WHERE b.has_ehr_data = 1  
                 AND  (concept_id IN (SELECT distinct concept_id FROM `cb_criteria` 
                  WHERE path LIKE '%3000000694%' and is_standard = 0 AND is_selectable = 1)) 
         ) b on a.person_id = b.person_id 
         LEFT JOIN `procedure_occurrence_ext` c on a.procedure_occurrence_id = c.procedure_occurrence_id
         LEFT JOIN `concept` d on d.concept_id = a.procedure_concept_id 
         LEFT JOIN `concept` e on e.concept_id = a.procedure_source_concept_id
    WHERE
        EXTRACT(YEAR FROM a.PROCEDURE_DATE) >= 2016
        "

In [None]:
# Destination for the procedures export. The output is sharded CSV; there
# is no date component, so re-exporting overwrites.
ehr_procedures_path <- file.path(
  Sys.getenv("WORKSPACE_BUCKET"), "bq_exports", Sys.getenv("OWNER_EMAIL"),
  # strftime(lubridate::now(), "%Y%m%d"),  # uncomment to keep one dated copy per day
  "ehr", "procedures_*.csv"
)

# Run the procedures query and export it to Cloud Storage as CSV.
bq_table_save(
  x = bq_dataset_query(
    Sys.getenv("WORKSPACE_CDR"),
    query = get_procedures_sql,
    billing = Sys.getenv("GOOGLE_PROJECT")
  ),
  destination_uris = ehr_procedures_path,
  destination_format = "CSV"
)

In [None]:
# to delete files
# Disabled cleanup: uncommenting removes the exported procedures shards.
# NOTE(review): the path hard-codes one user's email instead of using
# Sys.getenv("OWNER_EMAIL") like the export paths do.
#my_bucket <- Sys.getenv('WORKSPACE_BUCKET')
#system(paste0("gsutil rm ", my_bucket, "/bq_exports/azink@researchallofus.org/ehr/procedures_*.csv"), intern=T)

In [None]:
# Read a (possibly sharded) BigQuery CSV export from the workspace bucket
# into one tibble.
#   export_path: gs:// path of the export. It may contain a `*` wildcard,
#                because BigQuery writes large results as several numbered shards.
#   Returns: every matching shard, row-bound with bind_rows().
# Column types are guessed per shard. The snippet this replaces tried to
# reuse the first shard's spec, but that code was dead: `cols()` is never
# NULL, and the assignment only changed a copy local to the closure.
# gsutil's stderr is kept out of the file listing, so an error message is
# never treated as a path. Paths are shell-quoted.
read_bq_export_from_workspace_bucket <- function(export_path) {
  csv_files <- system2("gsutil", args = c("ls", shQuote(export_path)),
                       stdout = TRUE, stderr = FALSE)
  if (length(csv_files) == 0) {
    stop("No exported files match ", export_path, call. = FALSE)
  }
  shards <- map(csv_files, function(csv) {
    message(str_glue("Loading {csv}."))
    read_csv(pipe(paste("gsutil cat", shQuote(csv))),
             col_types = cols(), show_col_types = FALSE)
  })
  bind_rows(shards)
}
procedures_ehr <- read_bq_export_from_workspace_bucket(ehr_procedures_path)

In [None]:
# Tag each procedure with its year and count records per year.
procedures_ehr[["year"]] <- year(procedures_ehr[["PROCEDURE_DATE"]])
table(procedures_ehr[["year"]])

In [None]:
# Write one CSV per procedure year (all columns, including `year`) to
# <bucket>/ehr/. Only the years in this range are written.
for (yr in 2022:2023) {
  print(yr)
  subset <- procedures_ehr[procedures_ehr$year == yr, ]
  write_csv(subset, paste0("procedures_", yr, ".csv"), "/ehr/")
}

# Drug_exposure

Adapt the conditions SQL for drug exposures, following the same format.

In [None]:
# Drug exposure query.
# Selects drug_exposure rows starting in 2016 or later for the cohort:
#   - people with EHR data, and
#   - an event on a selectable, non-standard cb_criteria concept under
#     path 3000000694.
# Adds src_id and the drug concept name. Whether a record is a prescription
# or a fill is deliberately not pulled.
# The SQL is a plain string literal; paste(..., sep = "") was a no-op.
get_drugs_sql <- "
    SELECT
        a.PERSON_ID,
        a.DRUG_CONCEPT_ID,
        a.DRUG_EXPOSURE_START_DATE,
        a.DRUG_EXPOSURE_END_DATE, 
        a.VISIT_OCCURRENCE_ID,
        c.SRC_ID,
        d.CONCEPT_NAME AS DRUG_NAME
    FROM
        `drug_exposure` a
         JOIN (SELECT DISTINCT a.PERSON_ID FROM `cb_search_all_events` a 
                   join `cb_search_person` b on a.PERSON_ID = b.PERSON_ID
                WHERE b.has_ehr_data = 1  
                 AND  (concept_id IN (SELECT distinct concept_id FROM `cb_criteria` 
                 WHERE path LIKE '%3000000694%' and is_standard = 0 AND is_selectable = 1)) 
         ) b on a.person_id = b.person_id 
         LEFT JOIN `drug_exposure_ext` c on a.drug_exposure_id = c.drug_exposure_id
         LEFT JOIN `concept` d on d.concept_id = a.drug_concept_id 
    WHERE
        EXTRACT(YEAR FROM a.DRUG_EXPOSURE_START_DATE) >= 2016 
        "

In [None]:
# Destination for the drug-exposure export. The output is sharded CSV;
# there is no date component.
ehr_drugs_path <- file.path(
  Sys.getenv("WORKSPACE_BUCKET"), "bq_exports", Sys.getenv("OWNER_EMAIL"),
  # strftime(lubridate::now(), "%Y%m%d"),  # uncomment to keep one dated copy per day
  "ehr", "drugs_*.csv"
)

# Export disabled ("only need to run once"); uncomment to re-run the query
# and overwrite the shards at ehr_drugs_path.
#bq_table_save(
#  bq_dataset_query(Sys.getenv("WORKSPACE_CDR"), get_drugs_sql, billing = Sys.getenv("GOOGLE_PROJECT")),
#  ehr_drugs_path,
#  destination_format = "CSV")

In [None]:
# Read a (possibly sharded) BigQuery CSV export from the workspace bucket
# into one tibble.
#   export_path: gs:// path of the export. It may contain a `*` wildcard,
#                because BigQuery writes large results as several numbered shards.
#   Returns: every matching shard, row-bound with bind_rows().
# Column types are guessed per shard. The snippet this replaces tried to
# reuse the first shard's spec, but that code was dead: `cols()` is never
# NULL, and the assignment only changed a copy local to the closure.
# gsutil's stderr is kept out of the file listing, so an error message is
# never treated as a path. Paths are shell-quoted.
read_bq_export_from_workspace_bucket <- function(export_path) {
  csv_files <- system2("gsutil", args = c("ls", shQuote(export_path)),
                       stdout = TRUE, stderr = FALSE)
  if (length(csv_files) == 0) {
    stop("No exported files match ", export_path, call. = FALSE)
  }
  shards <- map(csv_files, function(csv) {
    message(str_glue("Loading {csv}."))
    read_csv(pipe(paste("gsutil cat", shQuote(csv))),
             col_types = cols(), show_col_types = FALSE)
  })
  bind_rows(shards)
}
drugs_ehr <- read_bq_export_from_workspace_bucket(ehr_drugs_path)

In [None]:
# Tag each drug exposure with its start year and count records per year.
drugs_ehr[["year"]] <- year(drugs_ehr[["DRUG_EXPOSURE_START_DATE"]])
table(drugs_ehr[["year"]])

In [None]:
# Write one CSV per start year (all columns) to <bucket>/ehr/.
# Only the years in this range are written.
for (yr in 2023:2023) {
  print(yr)
  subset <- drugs_ehr[drugs_ehr$year == yr, ]
  write_csv(subset, paste0("drugs_", yr, ".csv"), "/ehr/")
}

## Create crosswalk of drugs to drug classes

In [None]:
# Drug -> drug-class crosswalk query.
# For every distinct drug concept in `drug_exposure`, this returns each
# ancestor (via concept_ancestor) that is a classification concept
# (standard_concept = 'C') in the Drug domain. The ancestor's name,
# vocabulary and concept class come along with it.
# The WHERE on `d` makes the LEFT JOIN behave as an inner join.
# No cohort restriction is applied here.
# The SQL is a plain string literal; paste(..., sep = "") was a no-op.
get_drug_classes_sql <- "
    SELECT
        a.DRUG_CONCEPT_ID,
        b.ANCESTOR_CONCEPT_ID, 
        d.CONCEPT_NAME AS DRUG_CLASS,
        d.VOCABULARY_ID, 
        d.CONCEPT_CLASS_ID
    FROM
        (SELECT DISTINCT DRUG_CONCEPT_ID FROM `drug_exposure`) a
         JOIN `concept_ancestor` b on a.drug_concept_id = b.descendant_concept_id
         LEFT JOIN `concept` d on d.concept_id = b.ancestor_concept_id 
    WHERE
        d.standard_concept = 'C' AND d.domain_id = 'Drug'
        "

In [None]:
# Destination for the drug-class crosswalk export. The output is sharded
# CSV; there is no date component.
drug_classes_path <- file.path(
  Sys.getenv("WORKSPACE_BUCKET"), "bq_exports", Sys.getenv("OWNER_EMAIL"),
  # strftime(lubridate::now(), "%Y%m%d"),  # uncomment to keep one dated copy per day
  "ehr", "drug_classes_*.csv"
)

# Only needs to run once, but this call is active: every run re-exports and
# overwrites the shards.
bq_table_save(
  x = bq_dataset_query(
    Sys.getenv("WORKSPACE_CDR"),
    query = get_drug_classes_sql,
    billing = Sys.getenv("GOOGLE_PROJECT")
  ),
  destination_uris = drug_classes_path,
  destination_format = "CSV"
)

In [None]:
# Read a (possibly sharded) BigQuery CSV export from the workspace bucket
# into one tibble.
#   export_path: gs:// path of the export. It may contain a `*` wildcard,
#                because BigQuery writes large results as several numbered shards.
#   Returns: every matching shard, row-bound with bind_rows().
# Column types are guessed per shard. The snippet this replaces tried to
# reuse the first shard's spec, but that code was dead: `cols()` is never
# NULL, and the assignment only changed a copy local to the closure.
# gsutil's stderr is kept out of the file listing, so an error message is
# never treated as a path. Paths are shell-quoted.
read_bq_export_from_workspace_bucket <- function(export_path) {
  csv_files <- system2("gsutil", args = c("ls", shQuote(export_path)),
                       stdout = TRUE, stderr = FALSE)
  if (length(csv_files) == 0) {
    stop("No exported files match ", export_path, call. = FALSE)
  }
  shards <- map(csv_files, function(csv) {
    message(str_glue("Loading {csv}."))
    read_csv(pipe(paste("gsutil cat", shQuote(csv))),
             col_types = cols(), show_col_types = FALSE)
  })
  bind_rows(shards)
}
drug_class <- read_bq_export_from_workspace_bucket(drug_classes_path)

In [None]:
# Keep only ATC classification levels (CONCEPT_CLASS_ID containing "ATC").
atc <- drug_class %>% filter(grepl("ATC", CONCEPT_CLASS_ID))

In [None]:
atc %>% head()  # first rows of the ATC crosswalk

In [None]:
# Collapse to one ATC class name per (ATC level, drug concept). A drug that
# maps to more than one class at the same level is labelled
# "multiple classes".
# `.groups = "drop"` returns an ungrouped tibble. The default would leave it
# grouped by CONCEPT_CLASS_ID (with a regrouping message), and the reshaping
# step below would then run on grouped data.
atc_updt <- atc %>%
  select(CONCEPT_CLASS_ID, DRUG_CLASS, DRUG_CONCEPT_ID) %>%
  group_by(CONCEPT_CLASS_ID, DRUG_CONCEPT_ID) %>%
  summarize(
    atc_class = if (n_distinct(DRUG_CLASS) > 1) "multiple classes" else first(DRUG_CLASS),
    .groups = "drop"
  )

In [None]:
# Reshape to one row per drug concept with one column per ATC level. Spaces
# in level names become underscores (e.g. the ATC_2nd column used below).
atc_wide <- atc_updt %>%
  select(CONCEPT_CLASS_ID, atc_class, DRUG_CONCEPT_ID) %>%
  mutate(CONCEPT_CLASS_ID = chartr(" ", "_", CONCEPT_CLASS_ID)) %>%
  pivot_wider(names_from = CONCEPT_CLASS_ID, values_from = atc_class)

In [None]:
atc_wide$ATC_2nd %>% unique() %>% length()  # distinct 2nd-level ATC labels (NA counts as one)

In [None]:
# Upload the drug -> ATC class crosswalk to <bucket>/ehr/.
write_csv(df = atc_wide, fn = "atc_classes.csv", folder = "/ehr/")

# Measurement

There are LOTS of measurements; with the new data there are too many to pull in at once. The large files are therefore split by measurement type:
- heart rate (hr_YYYY.csv)
- blood pressure (bp_YYYY.csv)
- respiratory rate (rr_YYYY.csv)
- other (msrs_YYYY.csv)

## Get heart rate

In [None]:
# Heart-rate query.
# Selects measurement rows dated 2016 or later with measurement concept
# 3027018 (the heart-rate concept used for this section) for the cohort:
#   - people with EHR data, and
#   - an event on a selectable, non-standard cb_criteria concept under
#     path 3000000694.
# Adds src_id and the measurement and unit names.
# The SQL is a plain string literal; paste(..., sep = "") was a no-op.
get_hr_sql <- "
    SELECT
        a.PERSON_ID,
        a.MEASUREMENT_CONCEPT_ID,
        a.MEASUREMENT_DATE,
        a.VALUE_AS_NUMBER,
        a.VISIT_OCCURRENCE_ID,
        c.SRC_ID,
        d.CONCEPT_NAME AS MEASUREMENT,
        e.CONCEPT_NAME AS UNIT
    FROM
        `measurement` a
         JOIN (SELECT DISTINCT a.PERSON_ID FROM `cb_search_all_events` a 
                   join `cb_search_person` b on a.PERSON_ID = b.PERSON_ID
                WHERE b.has_ehr_data = 1  
                 AND  (concept_id IN (SELECT distinct concept_id FROM `cb_criteria` 
                 WHERE path LIKE '%3000000694%' and is_standard = 0 AND is_selectable = 1)) 
         ) b on a.person_id = b.person_id 
         LEFT JOIN `measurement_ext` c on a.measurement_id = c.measurement_id
         LEFT JOIN `concept` d on d.concept_id = a.measurement_concept_id 
         LEFT JOIN `concept` e on e.concept_id = a.unit_concept_id
    WHERE
        EXTRACT(YEAR FROM a.MEASUREMENT_DATE) >= 2016
        AND A.MEASUREMENT_CONCEPT_ID in (3027018)
        "

# Destination for the heart-rate export. The output is sharded CSV; there
# is no date component, so re-exporting overwrites.
ehr_hr_path <- file.path(
  Sys.getenv("WORKSPACE_BUCKET"), "bq_exports", Sys.getenv("OWNER_EMAIL"),
  # strftime(lubridate::now(), "%Y%m%d"),  # uncomment to keep one dated copy per day
  "ehr", "hr_*.csv"
)

In [None]:
# ONLY NEED TO RUN ONCE
# Disabled: the heart-rate export below is commented out. Uncomment it to
# re-run the query; this overwrites the shards at ehr_hr_path.
# Perform the query and export the dataset to Cloud Storage as CSV files.
#bq_table_save(
#  bq_dataset_query(Sys.getenv("WORKSPACE_CDR"), get_hr_sql, billing = Sys.getenv("GOOGLE_PROJECT")),
#  ehr_hr_path,
#  destination_format = "CSV")

# Read a (possibly sharded) BigQuery CSV export from the workspace bucket
# into one tibble.
#   export_path: gs:// path of the export. It may contain a `*` wildcard,
#                because BigQuery writes large results as several numbered shards.
#   Returns: every matching shard, row-bound with bind_rows().
# Column types are guessed per shard. The snippet this replaces tried to
# reuse the first shard's spec, but that code was dead: `cols()` is never
# NULL, and the assignment only changed a copy local to the closure.
# gsutil's stderr is kept out of the file listing, so an error message is
# never treated as a path. Paths are shell-quoted.
read_bq_export_from_workspace_bucket <- function(export_path) {
  csv_files <- system2("gsutil", args = c("ls", shQuote(export_path)),
                       stdout = TRUE, stderr = FALSE)
  if (length(csv_files) == 0) {
    stop("No exported files match ", export_path, call. = FALSE)
  }
  shards <- map(csv_files, function(csv) {
    message(str_glue("Loading {csv}."))
    read_csv(pipe(paste("gsutil cat", shQuote(csv))),
             col_types = cols(), show_col_types = FALSE)
  })
  bind_rows(shards)
}
hr <- read_bq_export_from_workspace_bucket(ehr_hr_path)

In [None]:
# Tag each heart-rate reading with its year and count readings per year.
hr[["year"]] <- year(hr[["MEASUREMENT_DATE"]])
table(hr[["year"]])

In [None]:
# Columns kept in every measurement export, then one CSV per year to
# <bucket>/ehr/. Only the years in this range are written.
keepvars <- c("MEASUREMENT", "MEASUREMENT_CONCEPT_ID", "PERSON_ID",
              "VISIT_OCCURRENCE_ID", "MEASUREMENT_DATE", "VALUE_AS_NUMBER",
              "UNIT", "year")
for (yr in 2022:2023) {
  print(yr)
  subset <- hr[hr$year == yr, keepvars]
  write_csv(subset, paste0("hr_", yr, ".csv"), "/ehr/")
}

## Blood Pressure Readings

In [None]:
# Discovery query: the 20 most frequent measurement concepts since 2016
# whose name contains "systolic blood pressure" or "diastolic blood pressure",
# with their record counts.
# The two LIKE conditions are now parenthesised. Without the parentheses,
# AND binds tighter than OR, so diastolic concepts were counted over all
# years and the 2016 cutoff was ignored for them.
bp <- bq_table_download(bq_project_query(
    BILLING_PROJECT_ID, page_size = 25000,
    query = str_glue('
SELECT
  a.measurement_concept_id,
  b.concept_name,
  count(*) as n
FROM
 `{CDR}.measurement` a
 join `{CDR}.concept` b on a.measurement_concept_id = b.concept_id
WHERE
        EXTRACT(YEAR FROM MEASUREMENT_DATE) >= 2016 
        AND (upper(b.concept_name) like "%SYSTOLIC BLOOD PRESSURE%" or
             upper(b.concept_name) like "%DIASTOLIC BLOOD PRESSURE%")
GROUP BY 
 a.measurement_concept_id,
 b.concept_name
ORDER BY
 n desc
LIMIT 20
')))

In [None]:
# Flag blood-pressure concepts to exclude, matching the concept name
# case-insensitively. Excluded: invasive (arterial-line), anesthesia-record
# and artery-specific measurements.
# The negative lookbehinds stop "NON-INVASIVE" / "NONINVASIVE" /
# "NON INVASIVE" (cuff) readings from being flagged. A plain
# grepl("INVASIVE") matched them too.
bp$flag <- ifelse(
  grepl("(?<!NON)(?<!NON-)(?<!NON )INVASIVE", toupper(bp$concept_name), perl = TRUE) |
    grepl("ANESTHESIA|ARTERY", toupper(bp$concept_name)),
  1, 0
)
bp
# Concept IDs kept for the blood-pressure query, as a plain vector.
# bp[rows, "col"] on a tibble returned a one-column tibble instead.
bp_ids <- bp$measurement_concept_id[bp$flag == 0]

In [None]:
# Print the kept IDs as "id1, id2, ..." for the IN (...) list of the
# blood-pressure query below. The column is extracted if bp_ids is a
# one-column data frame, because paste(as.list(<tibble>)) deparses it as
# "c(...)".
paste(if (is.data.frame(bp_ids)) bp_ids[[1]] else bp_ids, collapse = ", ")

In [None]:
# Blood-pressure query.
# Selects measurement rows dated 2016 or later for the listed systolic and
# diastolic concepts (IDs copied from the discovery step above). The cohort
# is:
#   - people with EHR data, and
#   - an event on a selectable, non-standard cb_criteria concept under
#     path 3000000694.
# Adds src_id and the measurement and unit names.
# The SQL is a plain string literal; paste(..., sep = "") was a no-op.
get_bp_sql <- "
    SELECT
        a.PERSON_ID,
        a.MEASUREMENT_CONCEPT_ID,
        a.MEASUREMENT_DATE,
        a.VALUE_AS_NUMBER,
        a.VISIT_OCCURRENCE_ID,
        c.SRC_ID,
        d.CONCEPT_NAME AS MEASUREMENT,
        e.CONCEPT_NAME AS UNIT
    FROM
        `measurement` a
         JOIN (SELECT DISTINCT a.PERSON_ID FROM `cb_search_all_events` a 
                   join `cb_search_person` b on a.PERSON_ID = b.PERSON_ID
                WHERE b.has_ehr_data = 1  
                 AND  (concept_id IN (SELECT distinct concept_id FROM `cb_criteria` 
                 WHERE path LIKE '%3000000694%' and is_standard = 0 AND is_selectable = 1)) 
         ) b on a.person_id = b.person_id 
         LEFT JOIN `measurement_ext` c on a.measurement_id = c.measurement_id
         LEFT JOIN `concept` d on d.concept_id = a.measurement_concept_id 
         LEFT JOIN `concept` e on e.concept_id = a.unit_concept_id
    WHERE
        EXTRACT(YEAR FROM a.MEASUREMENT_DATE) >= 2016
        AND A.MEASUREMENT_CONCEPT_ID in (3012888, 3004249, 4154790, 4152194, 3034703, 3018586, 
        903115, 903118, 3028737, 44789316, 44789315, 4248524, 4232915, 3019962, 3009395)
        "

# Destination for the blood-pressure export. The output is sharded CSV;
# there is no date component, so re-exporting overwrites.
ehr_bp_path <- file.path(
  Sys.getenv("WORKSPACE_BUCKET"), "bq_exports", Sys.getenv("OWNER_EMAIL"),
  # strftime(lubridate::now(), "%Y%m%d"),  # uncomment to keep one dated copy per day
  "ehr", "bp_*.csv"
)

In [None]:
# ONLY NEED TO RUN ONCE. Note that this call is active, so every run
# re-exports and overwrites the shards at ehr_bp_path.
bq_table_save(
  x = bq_dataset_query(
    Sys.getenv("WORKSPACE_CDR"),
    query = get_bp_sql,
    billing = Sys.getenv("GOOGLE_PROJECT")
  ),
  destination_uris = ehr_bp_path,
  destination_format = "CSV"
)

# Read a (possibly sharded) BigQuery CSV export from the workspace bucket
# into one tibble.
#   export_path: gs:// path of the export. It may contain a `*` wildcard,
#                because BigQuery writes large results as several numbered shards.
#   Returns: every matching shard, row-bound with bind_rows().
# Column types are guessed per shard. The snippet this replaces tried to
# reuse the first shard's spec, but that code was dead: `cols()` is never
# NULL, and the assignment only changed a copy local to the closure.
# gsutil's stderr is kept out of the file listing, so an error message is
# never treated as a path. Paths are shell-quoted.
read_bq_export_from_workspace_bucket <- function(export_path) {
  csv_files <- system2("gsutil", args = c("ls", shQuote(export_path)),
                       stdout = TRUE, stderr = FALSE)
  if (length(csv_files) == 0) {
    stop("No exported files match ", export_path, call. = FALSE)
  }
  shards <- map(csv_files, function(csv) {
    message(str_glue("Loading {csv}."))
    read_csv(pipe(paste("gsutil cat", shQuote(csv))),
             col_types = cols(), show_col_types = FALSE)
  })
  bind_rows(shards)
}
bp <- read_bq_export_from_workspace_bucket(ehr_bp_path)

In [None]:
# Tag each blood-pressure reading with its year and count readings per year.
bp[["year"]] <- year(bp[["MEASUREMENT_DATE"]])
table(bp[["year"]])

In [None]:
# Columns kept in every measurement export, then one CSV per year to
# <bucket>/ehr/. Only the years in this range are written.
keepvars <- c("MEASUREMENT", "MEASUREMENT_CONCEPT_ID", "PERSON_ID",
              "VISIT_OCCURRENCE_ID", "MEASUREMENT_DATE", "VALUE_AS_NUMBER",
              "UNIT", "year")
for (yr in 2023:2023) {
  print(yr)
  subset <- bp[bp$year == yr, keepvars]
  write_csv(subset, paste0("bp_", yr, ".csv"), "/ehr/")
}

In [None]:
# Disabled check: uncomment to list the per-year blood-pressure files in <bucket>/ehr/.
# my_bucket <- Sys.getenv('WORKSPACE_BUCKET')
# system(paste0("gsutil ls ", my_bucket, "/ehr/bp_*.csv"), intern=T)

## Respiratory Rate

In [None]:
# Record counts (2016+) and names for the two respiratory-rate concept IDs
# used in the respiratory-rate query below.
rr <- bq_table_download(
  bq_project_query(
    BILLING_PROJECT_ID,
    page_size = 25000,
    query = str_glue('
SELECT
  a.measurement_concept_id,
  b.concept_name,
  count(*) as n
FROM
 `{CDR}.measurement` a
 join `{CDR}.concept` b on a.measurement_concept_id = b.concept_id
WHERE
        EXTRACT(YEAR FROM MEASUREMENT_DATE) >= 2016 
        AND A.MEASUREMENT_CONCEPT_ID in (3024171, 4313591)
GROUP BY 
 a.measurement_concept_id,
 b.concept_name
ORDER BY
 n desc
LIMIT 20
')
  )
)

In [None]:
# Display the respiratory-rate concept counts.
rr

In [None]:
# Respiratory-rate query.
# Selects measurement rows dated 2016 or later for concepts 3024171 and
# 4313591 (checked in the discovery step above). The cohort is:
#   - people with EHR data, and
#   - an event on a selectable, non-standard cb_criteria concept under
#     path 3000000694.
# Adds src_id and the measurement and unit names.
# The SQL is a plain string literal; paste(..., sep = "") was a no-op.
get_rr_sql <- "
    SELECT
        a.PERSON_ID,
        a.MEASUREMENT_CONCEPT_ID,
        a.MEASUREMENT_DATE,
        a.VALUE_AS_NUMBER,
        a.VISIT_OCCURRENCE_ID,
        c.SRC_ID,
        d.CONCEPT_NAME AS MEASUREMENT,
        e.CONCEPT_NAME AS UNIT
    FROM
        `measurement` a
         JOIN (SELECT DISTINCT a.PERSON_ID FROM `cb_search_all_events` a 
                   join `cb_search_person` b on a.PERSON_ID = b.PERSON_ID
                WHERE b.has_ehr_data = 1  
                 AND  (concept_id IN (SELECT distinct concept_id FROM `cb_criteria` 
                 WHERE path LIKE '%3000000694%' and is_standard = 0 AND is_selectable = 1)) 
         ) b on a.person_id = b.person_id 
         LEFT JOIN `measurement_ext` c on a.measurement_id = c.measurement_id
         LEFT JOIN `concept` d on d.concept_id = a.measurement_concept_id 
         LEFT JOIN `concept` e on e.concept_id = a.unit_concept_id
    WHERE
        EXTRACT(YEAR FROM a.MEASUREMENT_DATE) >= 2016
        AND A.MEASUREMENT_CONCEPT_ID in (3024171, 4313591)
        "

# Destination for the respiratory-rate export. The output is sharded CSV;
# there is no date component, so re-exporting overwrites.
ehr_rr_path <- file.path(
  Sys.getenv("WORKSPACE_BUCKET"), "bq_exports", Sys.getenv("OWNER_EMAIL"),
  # strftime(lubridate::now(), "%Y%m%d"),  # uncomment to keep one dated copy per day
  "ehr", "rr_*.csv"
)

In [None]:
# ONLY NEED TO RUN ONCE. Note that this call is active, so every run
# re-exports and overwrites the shards at ehr_rr_path.
bq_table_save(
  x = bq_dataset_query(
    Sys.getenv("WORKSPACE_CDR"),
    query = get_rr_sql,
    billing = Sys.getenv("GOOGLE_PROJECT")
  ),
  destination_uris = ehr_rr_path,
  destination_format = "CSV"
)

# Read a (possibly sharded) BigQuery CSV export from the workspace bucket
# into one tibble.
#   export_path: gs:// path of the export. It may contain a `*` wildcard,
#                because BigQuery writes large results as several numbered shards.
#   Returns: every matching shard, row-bound with bind_rows().
# Column types are guessed per shard. The snippet this replaces tried to
# reuse the first shard's spec, but that code was dead: `cols()` is never
# NULL, and the assignment only changed a copy local to the closure.
# gsutil's stderr is kept out of the file listing, so an error message is
# never treated as a path. Paths are shell-quoted.
read_bq_export_from_workspace_bucket <- function(export_path) {
  csv_files <- system2("gsutil", args = c("ls", shQuote(export_path)),
                       stdout = TRUE, stderr = FALSE)
  if (length(csv_files) == 0) {
    stop("No exported files match ", export_path, call. = FALSE)
  }
  shards <- map(csv_files, function(csv) {
    message(str_glue("Loading {csv}."))
    read_csv(pipe(paste("gsutil cat", shQuote(csv))),
             col_types = cols(), show_col_types = FALSE)
  })
  bind_rows(shards)
}
rr <- read_bq_export_from_workspace_bucket(ehr_rr_path)

In [None]:
# Tag each respiratory-rate reading with its year and count readings per year.
rr[["year"]] <- year(rr[["MEASUREMENT_DATE"]])
table(rr[["year"]])

In [None]:
# Columns kept in every measurement export, then one CSV per year to
# <bucket>/ehr/. Only the years in this range are written.
keepvars <- c("MEASUREMENT", "MEASUREMENT_CONCEPT_ID", "PERSON_ID",
              "VISIT_OCCURRENCE_ID", "MEASUREMENT_DATE", "VALUE_AS_NUMBER",
              "UNIT", "year")
for (yr in 2021:2023) {
  print(yr)
  subset <- rr[rr$year == yr, keepvars]
  write_csv(subset, paste0("rr_", yr, ".csv"), "/ehr/")
}

## Other

Include: oxygen sat, BMI, calcium, creatinine, glucose, hemoglobin, potassium, chloride, sodium, urea

In [None]:
# Top 20 measurement concepts by record count since 2016 (all people).
# Used to decide which "other" measurements to pull, since pulling
# everything is too much data.
t <- bq_table_download(
  bq_project_query(
    BILLING_PROJECT_ID,
    page_size = 25000,
    query = str_glue('
SELECT
  a.measurement_concept_id,
  b.concept_name,
  count(*) as n
FROM
 `{CDR}.measurement` a
 join `{CDR}.concept` b on a.measurement_concept_id = b.concept_id
WHERE
        EXTRACT(YEAR FROM MEASUREMENT_DATE) >= 2016 
GROUP BY 
 a.measurement_concept_id,
 b.concept_name
ORDER BY
 n desc
LIMIT 20
')
  )
)

In [None]:
# "Other" measurements query.
# Selects measurement rows dated 2016 or later for the hand-picked concept
# list below (oxygen saturation, BMI, labs, ...; see the section header).
# The cohort is:
#   - people with EHR data, and
#   - an event on a selectable, non-standard cb_criteria concept under
#     path 3000000694.
# Adds the unit name, src_id and the measurement name.
# The SQL is a plain string literal; paste(..., sep = "") was a no-op.
get_measurements_sql <- "
    SELECT
        a.PERSON_ID,
        a.MEASUREMENT_CONCEPT_ID,
        a.MEASUREMENT_DATE,
        a.VALUE_AS_NUMBER,
        a.VISIT_OCCURRENCE_ID,
        e.CONCEPT_NAME AS UNIT, 
        c.SRC_ID,
        d.CONCEPT_NAME AS MEASUREMENT
    FROM
        `measurement` a
         JOIN (SELECT DISTINCT a.PERSON_ID FROM `cb_search_all_events` a 
                   join `cb_search_person` b on a.PERSON_ID = b.PERSON_ID
                WHERE b.has_ehr_data = 1  
                 AND  (concept_id IN (SELECT distinct concept_id FROM `cb_criteria` 
                 WHERE path LIKE '%3000000694%' and is_standard = 0 AND is_selectable = 1)) 
         ) b on a.person_id = b.person_id 
         LEFT JOIN `measurement_ext` c on a.measurement_id = c.measurement_id
         LEFT JOIN `concept` d on d.concept_id = a.measurement_concept_id 
         LEFT JOIN `concept` e on e.concept_id = a.unit_concept_id
    WHERE
        EXTRACT(YEAR FROM a.MEASUREMENT_DATE) >= 2016
        AND A.MEASUREMENT_CONCEPT_ID IN (40762499, 3025315,
             3038553, 3004501, 3023103, 3019550, 3013682, 3016723, 3014576, 3006906,
             3000963, 3004410, 3005673, 2212392, 4197971, 3034639, 4184637, 3007263, 2212393,
            42869630, 40762352, 36304734, 3003309, 3034962, 3000483, 3024629, 3005131, 3039896, 3011424,
            3014053, 3004077, 3037110, 3020399, 3033408,3022192,4017787)
        "

In [None]:
# Destination for the "other" measurements export. The output is sharded
# CSV; there is no date component, so re-exporting overwrites.
ehr_msrs_path <- file.path(
  Sys.getenv("WORKSPACE_BUCKET"), "bq_exports", Sys.getenv("OWNER_EMAIL"),
  # strftime(lubridate::now(), "%Y%m%d"),  # uncomment to keep one dated copy per day
  "ehr", "msrs_*.csv"
)

# ONLY NEED TO RUN ONCE. Note that this call is active, so every run
# re-exports and overwrites the shards.
bq_table_save(
  x = bq_dataset_query(
    Sys.getenv("WORKSPACE_CDR"),
    query = get_measurements_sql,
    billing = Sys.getenv("GOOGLE_PROJECT")
  ),
  destination_uris = ehr_msrs_path,
  destination_format = "CSV"
)

In [None]:
# to delete files
# NOTE(review): this disabled cleanup targets the *drugs* export
# (drugs_*.csv) even though it sits in the measurements section. Check the
# pattern before uncommenting. The paths here also hard-code one user's
# email instead of Sys.getenv("OWNER_EMAIL").
#my_bucket <- Sys.getenv('WORKSPACE_BUCKET')
#system(paste0("gsutil rm ", my_bucket, "/bq_exports/azink@researchallofus.org/ehr/drugs_*.csv"), intern=T)

# to peep files (list the exported measurement shards)
#my_bucket <- Sys.getenv('WORKSPACE_BUCKET')
#system(paste0("gsutil ls ", my_bucket, "/bq_exports/azink@researchallofus.org/ehr/msrs_*.csv"), intern=T)

In [None]:
# Read a (possibly sharded) BigQuery CSV export from the workspace bucket
# into one tibble.
#   export_path: gs:// path of the export. It may contain a `*` wildcard,
#                because BigQuery writes large results as several numbered shards.
#   Returns: every matching shard, row-bound with bind_rows().
# Column types are guessed per shard. The snippet this replaces tried to
# reuse the first shard's spec, but that code was dead: `cols()` is never
# NULL, and the assignment only changed a copy local to the closure.
# gsutil's stderr is kept out of the file listing, so an error message is
# never treated as a path. Paths are shell-quoted.
read_bq_export_from_workspace_bucket <- function(export_path) {
  csv_files <- system2("gsutil", args = c("ls", shQuote(export_path)),
                       stdout = TRUE, stderr = FALSE)
  if (length(csv_files) == 0) {
    stop("No exported files match ", export_path, call. = FALSE)
  }
  shards <- map(csv_files, function(csv) {
    message(str_glue("Loading {csv}."))
    read_csv(pipe(paste("gsutil cat", shQuote(csv))),
             col_types = cols(), show_col_types = FALSE)
  })
  bind_rows(shards)
}
msrs_ehr <- read_bq_export_from_workspace_bucket(ehr_msrs_path)

In [None]:
# Tag each measurement with its year and count records per year.
msrs_ehr[["year"]] <- year(msrs_ehr[["MEASUREMENT_DATE"]])
table(msrs_ehr[["year"]])

In [None]:
# Columns kept in every measurement export, then one CSV per year to
# <bucket>/ehr/. Only the years in this range are written.
keepvars <- c("MEASUREMENT", "MEASUREMENT_CONCEPT_ID", "PERSON_ID",
              "VISIT_OCCURRENCE_ID", "MEASUREMENT_DATE", "VALUE_AS_NUMBER",
              "UNIT", "year")
for (yr in 2016:2018) {
  print(yr)
  subset <- msrs_ehr[msrs_ehr$year == yr, keepvars]
  write_csv(subset, paste0("msrs_", yr, ".csv"), "/ehr/")
}

# Visit Occurrence

Pull in visit type 

In [None]:
# Visits query.
# Selects visit_occurrence rows starting in 2016 or later for the cohort:
#   - people with EHR data, and
#   - an event on a selectable, non-standard cb_criteria concept under
#     path 3000000694.
# Adds src_id and the visit type name. Other tables link to these rows via
# VISIT_OCCURRENCE_ID.
# The SQL is a plain string literal; paste(..., sep = "") was a no-op.
get_visits_sql <- "
    SELECT
        a.PERSON_ID,
        a.VISIT_OCCURRENCE_ID,
        a.VISIT_CONCEPT_ID,
        a.VISIT_START_DATE,
        a.VISIT_END_DATE, 
        c.SRC_ID,
        d.CONCEPT_NAME AS VISIT_TYPE
    FROM
        `visit_occurrence` a
         JOIN (SELECT DISTINCT a.PERSON_ID FROM `cb_search_all_events` a 
                   join `cb_search_person` b on a.PERSON_ID = b.PERSON_ID
                WHERE b.has_ehr_data = 1  
                 AND  (concept_id IN (SELECT distinct concept_id FROM `cb_criteria` 
                 WHERE path LIKE '%3000000694%' and is_standard = 0 AND is_selectable = 1)) 
         ) b on a.person_id = b.person_id 
         LEFT JOIN `visit_occurrence_ext` c on a.visit_occurrence_id = c.visit_occurrence_id
         LEFT JOIN `concept` d on d.concept_id = a.visit_concept_id 
    WHERE
        EXTRACT(YEAR FROM a.VISIT_START_DATE) >= 2016
        "

In [None]:
# Destination for the visits export. The output is sharded CSV; there is no
# date component, so re-exporting overwrites.
ehr_visits_path <- file.path(
  Sys.getenv("WORKSPACE_BUCKET"), "bq_exports", Sys.getenv("OWNER_EMAIL"),
  # strftime(lubridate::now(), "%Y%m%d"),  # uncomment to keep one dated copy per day
  "ehr", "visits_*.csv"
)

# ONLY NEED TO RUN ONCE. Note that this call is active, so every run
# re-exports and overwrites the shards.
bq_table_save(
  x = bq_dataset_query(
    Sys.getenv("WORKSPACE_CDR"),
    query = get_visits_sql,
    billing = Sys.getenv("GOOGLE_PROJECT")
  ),
  destination_uris = ehr_visits_path,
  destination_format = "CSV"
)

In [None]:
# Disabled check: uncomment to list the exported visit shards.
# NOTE(review): the path hard-codes one user's email instead of Sys.getenv("OWNER_EMAIL").
#my_bucket <- Sys.getenv('WORKSPACE_BUCKET')
#system(paste0("gsutil ls ", my_bucket, "/bq_exports/azink@researchallofus.org/ehr/visits_*.csv"), intern=T)

In [None]:
# Read a (possibly sharded) BigQuery CSV export from the workspace bucket
# into one tibble.
#   export_path: gs:// path of the export. It may contain a `*` wildcard,
#                because BigQuery writes large results as several numbered shards.
#   Returns: every matching shard, row-bound with bind_rows().
# Column types are guessed per shard. The snippet this replaces tried to
# reuse the first shard's spec, but that code was dead: `cols()` is never
# NULL, and the assignment only changed a copy local to the closure.
# gsutil's stderr is kept out of the file listing, so an error message is
# never treated as a path. Paths are shell-quoted.
read_bq_export_from_workspace_bucket <- function(export_path) {
  csv_files <- system2("gsutil", args = c("ls", shQuote(export_path)),
                       stdout = TRUE, stderr = FALSE)
  if (length(csv_files) == 0) {
    stop("No exported files match ", export_path, call. = FALSE)
  }
  shards <- map(csv_files, function(csv) {
    message(str_glue("Loading {csv}."))
    read_csv(pipe(paste("gsutil cat", shQuote(csv))),
             col_types = cols(), show_col_types = FALSE)
  })
  bind_rows(shards)
}
visits_ehr <- read_bq_export_from_workspace_bucket(ehr_visits_path)

In [None]:
# Tag each visit with its start year and count visits per year.
visits_ehr[["year"]] <- year(visits_ehr[["VISIT_START_DATE"]])
table(visits_ehr[["year"]])

In [None]:
# Write one CSV per visit start year (all columns) to <bucket>/ehr/.
# Only the years in this range are written.
for (yr in 2020:2023) {
  print(yr)
  subset <- visits_ehr[visits_ehr$year == yr, ]
  write_csv(subset, paste0("visits_", yr, ".csv"), "/ehr/")
}