#  1 Readme

1. "Helpers" loads everything needed for the subsequent R processing (configuration, helper functions, shared variables).
2. The sections after that download the raw All of Us data. There are two ways to download it:

#### 1. Live, direct
Live from BigQuery (bq_table_download):

Always freshest Tier-8 data.

Great for smaller result sets or ad-hoc pulls.

No intermediate files on disk.


#### 2. Manual via Google Cloud Storage (GCS)
Export → GCS → Read (bq_table_save + gsutil + read_csv):

Useful for very large tables or when you want to persist a snapshot in GCS.

Allows “download once, read many times” workflows.

Shares the CSVs across team members or jobs.


The standard approach is the GCS download: the data are queried from All of Us (AoU) via BigQuery and exported directly to our Google Cloud bucket. From there they are read and merged onto df_covariates. Separate data entities are extracted in several consecutive steps.

IMPORTANT: If you move these scripts to your own workspace, make sure to change the path for the google cloud bucket, this is different for every workspace!

#TODO: Store survey_paths in temporary variables to replace manual exchange of that line

# Helpers

In [None]:
# Load "config.r", which provides the utility functions used in this notebook.
# Sourcing it is also expected to trigger loading of:
    
    # user_config.JSON (including key for project_config)
    # project_config.JSON
    # preprocessing_visualizations.R
    # preprocessing_functions.R

# NOTE(review): `user` is presumably read by config.r to select the matching
# entry in user_config.JSON -- confirm.
user <- "Jan" 
source("config.r")



# If certain packages are not installed yet via requirements.txt, install them here via
# install.packages("package_name")

In [None]:
# Debug: print the directory that the intermediate covariate tables are written to.
# NOTE(review): data_path is not defined in this notebook; presumably it is set
# by config.r -- confirm.
data_path

# Covariates

## Date of primary consent (LIVE)

### Extract

In [None]:
# Get the date of primary consent for every participant.
# - https://support.researchallofus.org/hc/en-us/articles/13176125767188-How-to-find-participant-enrollment-data
# - used later to compute age from the date of birth

# Dataset identifier from the WORKSPACE_CDR environment variable; it is
# interpolated into the table names of the query below.
DATASET <- Sys.getenv('WORKSPACE_CDR')

# Run the query live and download the result into memory.
# The SQL returns one row per person_id with the earliest observation_date among
# observations whose source concept descends from the 'Consent PII' module concept.
# NOTE(review): the "-- Compute the count ..." comment inside the SQL string is
# stale; the query returns dates, not a count.
# NOTE(review): page_size is passed to bq_project_query(), not to
# bq_table_download() -- confirm that this is intended.
primary_consent_date_df <- bq_table_download(bq_project_query(
    Sys.getenv("GOOGLE_PROJECT"), page_size = 25000,
    query = str_glue("
-- Compute the count of unique participants in our All of Us cohort.
SELECT DISTINCT
    person_id,
    MIN(observation_date) AS primary_consent_date
FROM 
    `{DATASET}.concept`
JOIN 
    `{DATASET}.concept_ancestor` 
        ON concept_id = ancestor_concept_id
JOIN 
    `{DATASET}.observation` 
        ON descendant_concept_id = observation_source_concept_id
WHERE 
    concept_name = 'Consent PII' AND concept_class_id = 'Module'
GROUP BY 1
")))

# Quick look at the downloaded table and its column types
head(primary_consent_date_df)
str(primary_consent_date_df)

In [None]:
# Debug: list the objects in the current environment and re-inspect the
# structure of the consent date table.
ls()
str(primary_consent_date_df)

### Process and save

In [None]:
# Sanity-check the primary consent dates, then start the covariate table from
# them and save it as step 1.1.
dim(primary_consent_date_df)
min(primary_consent_date_df$primary_consent_date, na.rm = TRUE)

# Number of missing consent dates (the label now names the column being checked)
print(paste("NAs in 'primary_consent_date' column: ", sum(is.na(primary_consent_date_df$primary_consent_date))))

# Dates before 2006 or in the future are implausible. na.rm = TRUE keeps a
# single missing date from turning the whole count into NA.
print(paste("Out of range from 2006-today: ", sum(
  primary_consent_date_df$primary_consent_date < as.Date("2006-01-01") |
    primary_consent_date_df$primary_consent_date > Sys.Date(),
  na.rm = TRUE
)))

# Values whose YYYY-MM-DD rendering differs from their plain text form.
# format() is used instead of as.character(<Date>, format = ...).
print(paste("Inconsistent format: ", sum(
  format(primary_consent_date_df$primary_consent_date, "%Y-%m-%d") !=
    as.character(primary_consent_date_df$primary_consent_date),
  na.rm = TRUE
)))

# The consent dates are the first columns of the covariate table
df_covariates <- primary_consent_date_df

write.table(df_covariates, file.path(data_path, "dataframes/df_covariates_1.1.txt"),
            sep = "\t", quote = FALSE, row.names = FALSE)


## Age, Sex, Ethnicity (Via GCS)

### Extract

In [None]:
# This query represents dataset "df_Covariates" for domain "person" and was generated for All of Us Controlled Tier Dataset v7
# It selects, per person: the gender, race, ethnicity and sex-at-birth concept
# ids with their concept names, and birth_datetime exposed as date_of_birth.
# The table names are unqualified; they are presumably resolved against the
# WORKSPACE_CDR dataset passed to bq_dataset_query() below -- confirm.
dataset_03039562_person_sql <- paste("
    SELECT
        person.person_id,
        person.gender_concept_id,
        p_gender_concept.concept_name as gender,
        person.birth_datetime as date_of_birth,
        person.race_concept_id,
        p_race_concept.concept_name as race,
        person.ethnicity_concept_id,
        p_ethnicity_concept.concept_name as ethnicity,
        person.sex_at_birth_concept_id,
        p_sex_at_birth_concept.concept_name as sex_at_birth 
    FROM
        `person` person 
    LEFT JOIN
        `concept` p_gender_concept 
            ON person.gender_concept_id = p_gender_concept.concept_id 
    LEFT JOIN
        `concept` p_race_concept 
            ON person.race_concept_id = p_race_concept.concept_id 
    LEFT JOIN
        `concept` p_ethnicity_concept 
            ON person.ethnicity_concept_id = p_ethnicity_concept.concept_id 
    LEFT JOIN
        `concept` p_sex_at_birth_concept 
            ON person.sex_at_birth_concept_id = p_sex_at_birth_concept.concept_id", sep="")

# Formulate a Cloud Storage destination path for the data exported from BigQuery.
# The path is built from the workspace bucket, the owner's e-mail and today's date.
# NOTE: By default data exported multiple times on the same day will overwrite older copies.
#       But data exported on different days will be written to a new location so that historical
#       copies can be kept as the dataset definition is changed.
person_03039562_path <- file.path(
  Sys.getenv("WORKSPACE_BUCKET"),
  "bq_exports",
  Sys.getenv("OWNER_EMAIL"),
  strftime(lubridate::now(), "%Y%m%d"),  # Comment out this line if you want the export to always overwrite.
  "person_03039562",
  "person_03039562_*.csv")
message(str_glue('The data will be written to {person_03039562_path}. Use this path when reading ',
                 'the data into your notebooks in the future.'))

# Perform the query and export the dataset to Cloud Storage as CSV files.
# The "*" in the file name allows the export to be split into several CSV shards.
# NOTE: You only need to run `bq_table_save` once. After that, you can
#       just read data from the CSVs in Cloud Storage.
bq_table_save(
  bq_dataset_query(Sys.getenv("WORKSPACE_CDR"), dataset_03039562_person_sql, billing = Sys.getenv("GOOGLE_PROJECT")),
  person_03039562_path,
  destination_format = "CSV")



In [None]:
# Read a sharded BigQuery CSV export from Cloud Storage into one data frame.
#
# Args:
#   export_path: Cloud Storage path or wildcard pattern (gs://...) matching the
#     exported CSV shards; it is handed to `gsutil ls`.
#   col_types: readr column specification applied to every shard. The default
#     forces the text columns of the person export to character. Pass NULL to
#     let readr guess from the first shard and reuse that specification for the
#     remaining shards.
#
# Returns: all shards row-bound in the order in which `gsutil ls` listed them.
# Stops with an error if the listing fails or matches no file.
#
# NOTE: Alternatively you can `gsutil -m cp {person_03039562_path}` to copy these files
#       to the Jupyter disk.
read_bq_export_from_workspace_bucket <- function(
    export_path,
    col_types = cols(gender = col_character(), race = col_character(),
                     ethnicity = col_character(), sex_at_birth = col_character())) {
  # Capture stdout only: with stderr = TRUE every gsutil warning or error line
  # would be returned as if it were a CSV path and then passed to `gsutil cat`.
  # shQuote() keeps the local shell from touching the wildcard or other
  # special characters in the path.
  csv_files <- system2("gsutil", args = c("ls", shQuote(export_path)), stdout = TRUE)
  exit_status <- attr(csv_files, "status")
  if (!is.null(exit_status) && exit_status != 0) {
    stop("`gsutil ls ", export_path, "` failed with exit status ", exit_status, call. = FALSE)
  }
  csv_files <- csv_files[nzchar(csv_files)]
  if (length(csv_files) == 0) {
    stop("No exported CSV files found at ", export_path, call. = FALSE)
  }

  chunks <- vector("list", length(csv_files))
  for (i in seq_along(csv_files)) {
    csv <- csv_files[[i]]
    message(str_glue('Loading {csv}.'))
    chunks[[i]] <- read_csv(pipe(paste("gsutil cat", shQuote(csv))),
                            col_types = col_types, show_col_types = FALSE)
    # When no specification was given, lock in the types guessed for the first
    # shard so that all shards are parsed consistently.
    if (is.null(col_types)) {
      col_types <- spec(chunks[[i]])
    }
  }
  bind_rows(chunks)
}
# Hard-coded location of an earlier export. This assignment replaces any
# person_03039562_path computed in the extract cell, so the data are read from
# this snapshot.
# IMPORTANT: the bucket differs per workspace -- adjust this path when running
# the notebook elsewhere. The commented-out line is a path from another workspace.
#person_03039562_path <- "gs://fc-secure-b96fb036-3379-4be0-8834-e7e486f2b76e/bq_exports/davidz1@researchallofus.org/20240509/person_03039562/person_03039562_*.csv"
person_03039562_path <- "gs://fc-secure-cde9a0f0-7d5a-4045-98bb-fb1d394a535b/bq_exports/janclusmann@researchallofus.org/20250701/person_03039562/person_03039562_*.csv"
dataset_03039562_person_df <- read_bq_export_from_workspace_bucket(person_03039562_path)

# Size of the loaded table
dim(dataset_03039562_person_df)

# Preview of the first rows
head(dataset_03039562_person_df, 5)

### Process

In [None]:
# Inspect the raw race / ethnicity labels and how often each race label occurs.
unique(dataset_03039562_person_df$race)
unique(dataset_03039562_person_df$ethnicity)

# Counts and proportions per race label
race_counts <- table(dataset_03039562_person_df$race)
race_proportions <- prop.table(race_counts)

# Combine counts and proportions into a data frame. Both inputs are tables, so
# data.frame() expands each of them into a label column and a frequency column:
# Count.Var1, Count.Freq, Proportion.Var1, Proportion.Freq.
race_summary <- data.frame(
  Count = race_counts,
  Proportion = race_proportions
)

# Sort by frequency, largest first, and only then print. The sort key has to be
# Count.Freq: Count.Var1 is the factor of labels, and negating a factor yields
# NA, which leaves the rows unsorted.
race_summary <- race_summary[order(-race_summary$Count.Freq), ]
print(race_summary)
pie(race_counts, main = "Proportion of Races")

# Further preprocessing of race/ethnicity see "Ethnicity Processing"

### Merge with Covariates

In [None]:
# Reload the step-1.1 table (person_id + primary consent date)
df_covariates <- data.table::fread(
  file.path(data_path, "dataframes/df_covariates_1.1.txt"),
  sep = "\t"
)
dim(df_covariates)

# Keep only the person columns needed downstream
dataset_03039562_person_df <- dataset_03039562_person_df %>%
  select(person_id, date_of_birth, sex_at_birth, race, ethnicity)

# Attach the person columns; merge() keeps only ids present in both tables
df_covariates <- merge(df_covariates, dataset_03039562_person_df, by = "person_id")

# Age in years at primary consent: day difference divided by 365.25
df_covariates$date_of_birth <- as.Date(df_covariates$date_of_birth)
df_covariates$primary_consent_date <- as.Date(df_covariates$primary_consent_date)
df_covariates$AGE <- as.numeric(
  difftime(df_covariates$primary_consent_date, df_covariates$date_of_birth, units = "days")
) / 365.25

# Optional age bands (disabled):
# df_covariates$AGE_cat <- cut(df_covariates$AGE,
#                         breaks = c(0, 10, 20, 30, 40, 50, 60, 70, 80, 90, max(df_covariates$AGE)),
#                         labels = c("0-10", "10-20", "20-30", "30-40", "40-50", "50-60", "60-70", "70-80", "80-90", ">90"))
# table(df_covariates$AGE_cat, useNA = "ifany")

# Drop the date of birth, expose sex at birth as SEX, and keep only rows whose
# SEX is "Female" or "Male"
df_covariates <- df_covariates %>%
  select(-date_of_birth) %>%
  rename(SEX = sex_at_birth) %>%
  filter(SEX %in% c("Female", "Male"))

table(df_covariates$SEX)

dim(df_covariates)
head(df_covariates)

# Factor conversion (disabled):
#df_covariates$SEX <- factor(df_covariates$SEX, levels = c(0, 1), labels = c("Female", "Male"))

#levels(df_covariates$SEX)

# Save as step 1.2
write.table(
  df_covariates,
  file.path(data_path, "dataframes/df_covariates_1.2.txt"),
  sep = "\t", quote = FALSE, row.names = FALSE
)


## BMI, Height, Weight, Waist, BP (Via GCS)

### Extract to GCS

In [None]:
# This query represents dataset "df_Covariates" for domain "measurement" and was generated for All of Us Controlled Tier Dataset v8
# The inner sub-select keeps the measurement rows whose source concept is one of
# the selectable, non-standard cb_criteria concepts located at or below the seven
# listed concept ids in the criteria path hierarchy. The outer LEFT JOINs add
# readable concept names for the standard concept, type, operator, value, unit,
# visit and source concept.
dataset_35873912_measurement_sql <- paste("
    SELECT
        measurement.person_id,
        measurement.measurement_concept_id,
        m_standard_concept.concept_name as standard_concept_name,
        m_standard_concept.concept_code as standard_concept_code,
        m_standard_concept.vocabulary_id as standard_vocabulary,
        measurement.measurement_datetime,
        measurement.measurement_type_concept_id,
        m_type.concept_name as measurement_type_concept_name,
        measurement.operator_concept_id,
        m_operator.concept_name as operator_concept_name,
        measurement.value_as_number,
        measurement.value_as_concept_id,
        m_value.concept_name as value_as_concept_name,
        measurement.unit_concept_id,
        m_unit.concept_name as unit_concept_name,
        measurement.range_low,
        measurement.range_high,
        measurement.visit_occurrence_id,
        m_visit.concept_name as visit_occurrence_concept_name,
        measurement.measurement_source_value,
        measurement.measurement_source_concept_id,
        m_source_concept.concept_name as source_concept_name,
        m_source_concept.concept_code as source_concept_code,
        m_source_concept.vocabulary_id as source_vocabulary,
        measurement.unit_source_value,
        measurement.value_source_value 
    FROM
        ( SELECT
            * 
        FROM
            `measurement` measurement 
        WHERE
            (
                measurement_source_concept_id IN (SELECT
                    DISTINCT c.concept_id 
                FROM
                    `cb_criteria` c 
                JOIN
                    (SELECT
                        CAST(cr.id as string) AS id       
                    FROM
                        `cb_criteria` cr       
                    WHERE
                        concept_id IN (903107, 903115, 903118, 903121, 903124, 903133, 903135)       
                        AND full_text LIKE '%_rank1]%'      ) a 
                        ON (c.path LIKE CONCAT('%.', a.id, '.%') 
                        OR c.path LIKE CONCAT('%.', a.id) 
                        OR c.path LIKE CONCAT(a.id, '.%') 
                        OR c.path = a.id) 
                WHERE
                    is_standard = 0 
                    AND is_selectable = 1)
            )) measurement 
    LEFT JOIN
        `concept` m_standard_concept 
            ON measurement.measurement_concept_id = m_standard_concept.concept_id 
    LEFT JOIN
        `concept` m_type 
            ON measurement.measurement_type_concept_id = m_type.concept_id 
    LEFT JOIN
        `concept` m_operator 
            ON measurement.operator_concept_id = m_operator.concept_id 
    LEFT JOIN
        `concept` m_value 
            ON measurement.value_as_concept_id = m_value.concept_id 
    LEFT JOIN
        `concept` m_unit 
            ON measurement.unit_concept_id = m_unit.concept_id 
    LEFT JOIn
        `visit_occurrence` v 
            ON measurement.visit_occurrence_id = v.visit_occurrence_id 
    LEFT JOIN
        `concept` m_visit 
            ON v.visit_concept_id = m_visit.concept_id 
    LEFT JOIN
        `concept` m_source_concept 
            ON measurement.measurement_source_concept_id = m_source_concept.concept_id", sep="")

# Formulate a Cloud Storage destination path for the data exported from BigQuery.
# The path is built from the workspace bucket, the owner's e-mail and today's date.
# NOTE: By default data exported multiple times on the same day will overwrite older copies.
#       But data exported on different days will be written to a new location so that historical
#       copies can be kept as the dataset definition is changed.
measurement_35873912_path <- file.path(
  Sys.getenv("WORKSPACE_BUCKET"),
  "bq_exports",
  Sys.getenv("OWNER_EMAIL"),
  strftime(lubridate::now(), "%Y%m%d"),  # Comment out this line if you want the export to always overwrite.
  "measurement_35873912",
  "measurement_35873912_*.csv")
message(str_glue('The data will be written to {measurement_35873912_path}. Use this path when reading ',
                 'the data into your notebooks in the future.'))

# Perform the query and export the dataset to Cloud Storage as CSV files.
# The "*" in the file name allows the export to be split into several CSV shards.
# NOTE: You only need to run `bq_table_save` once. After that, you can
#       just read data from the CSVs in Cloud Storage.
bq_table_save(
  bq_dataset_query(Sys.getenv("WORKSPACE_CDR"), dataset_35873912_measurement_sql, billing = Sys.getenv("GOOGLE_PROJECT")),
  measurement_35873912_path,
  destination_format = "CSV")



### Load from GCS

In [None]:
# Read a sharded BigQuery CSV export from Cloud Storage into one data frame.
#
# Args:
#   export_path: Cloud Storage path or wildcard pattern (gs://...) matching the
#     exported CSV shards; it is handed to `gsutil ls`.
#   col_types: readr column specification applied to every shard. The default
#     forces the text columns of the measurement export to character. Pass NULL
#     to let readr guess from the first shard and reuse that specification for
#     the remaining shards.
#
# Returns: all shards row-bound in the order in which `gsutil ls` listed them.
# Stops with an error if the listing fails or matches no file.
#
# NOTE: Alternatively you can `gsutil -m cp {measurement_35873912_path}` to copy these files
#       to the Jupyter disk.
read_bq_export_from_workspace_bucket <- function(
    export_path,
    col_types = cols(
      standard_concept_name = col_character(),
      standard_concept_code = col_character(),
      standard_vocabulary = col_character(),
      measurement_type_concept_name = col_character(),
      operator_concept_name = col_character(),
      value_as_concept_name = col_character(),
      unit_concept_name = col_character(),
      visit_occurrence_concept_name = col_character(),
      measurement_source_value = col_character(),
      source_concept_name = col_character(),
      source_concept_code = col_character(),
      source_vocabulary = col_character(),
      unit_source_value = col_character(),
      value_source_value = col_character()
    )) {
  # Capture stdout only: with stderr = TRUE every gsutil warning or error line
  # would be returned as if it were a CSV path and then passed to `gsutil cat`.
  # shQuote() keeps the local shell from touching the wildcard or other
  # special characters in the path.
  csv_files <- system2("gsutil", args = c("ls", shQuote(export_path)), stdout = TRUE)
  exit_status <- attr(csv_files, "status")
  if (!is.null(exit_status) && exit_status != 0) {
    stop("`gsutil ls ", export_path, "` failed with exit status ", exit_status, call. = FALSE)
  }
  csv_files <- csv_files[nzchar(csv_files)]
  if (length(csv_files) == 0) {
    stop("No exported CSV files found at ", export_path, call. = FALSE)
  }

  chunks <- vector("list", length(csv_files))
  for (i in seq_along(csv_files)) {
    csv <- csv_files[[i]]
    message(str_glue('Loading {csv}.'))
    chunks[[i]] <- read_csv(pipe(paste("gsutil cat", shQuote(csv))),
                            col_types = col_types, show_col_types = FALSE)
    # When no specification was given, lock in the types guessed for the first
    # shard so that all shards are parsed consistently.
    if (is.null(col_types)) {
      col_types <- spec(chunks[[i]])
    }
  }
  bind_rows(chunks)
}

# Hard-coded location of an earlier export. This assignment replaces any
# measurement_35873912_path computed in the extract cell, so the data are read
# from this snapshot.
# IMPORTANT: the bucket differs per workspace -- adjust this path when running
# the notebook elsewhere. The commented-out line is a path from another workspace.
#measurement_35873912_path <- "gs://fc-secure-b96fb036-3379-4be0-8834-e7e486f2b76e/bq_exports/davidz1@researchallofus.org/20240517/measurement_35873912/measurement_35873912_*.csv"
measurement_35873912_path <- "gs://fc-secure-cde9a0f0-7d5a-4045-98bb-fb1d394a535b/bq_exports/janclusmann@researchallofus.org/20250701/measurement_35873912/measurement_35873912_*.csv"

dataset_35873912_measurement_df <- read_bq_export_from_workspace_bucket(measurement_35873912_path)

# Size of the loaded table
dim(dataset_35873912_measurement_df)

# Preview of the first rows
head(dataset_35873912_measurement_df, 5)

### Process

In [None]:
# Reload the step-1.2 table, attach the physical measurements, fill missing BMI
# values where possible, and save the result as step 1.3.
df_covariates <- data.table::fread(file.path(data_path, "dataframes/df_covariates_1.2.txt"), sep = "\t")
dim(df_covariates)

# Keep the columns needed here and drop the combined blood pressure concept
measurement_df <- dataset_35873912_measurement_df %>%
  select(person_id, standard_concept_name, value_as_number, measurement_datetime) %>%
  filter(standard_concept_name != "Computed blood pressure systolic and diastolic, mean of 2nd and 3rd measures") %>%
  mutate(obs_date = as.Date(measurement_datetime))

# Replace the long concept names by the short column names used downstream;
# names that are not listed stay unchanged.
concept_labels <- c(
  "Body height" = "Standing height",
  "Body mass index (BMI) [Ratio]" = "BMI",
  "Computed systolic blood pressure, mean of 2nd and 3rd measures" = "SBP",
  "Computed diastolic blood pressure, mean of 2nd and 3rd measures" = "DBP",
  "Computed waist circumference, mean of closest two measures" = "Waist circumference",
  "Body weight" = "Weight"
)
has_label <- measurement_df$standard_concept_name %in% names(concept_labels)
measurement_df$standard_concept_name[has_label] <-
  unname(concept_labels[measurement_df$standard_concept_name[has_label]])

# One row per person and observation date, one column per measurement
measurement_df_wide <- measurement_df %>%
  pivot_wider(
    id_cols     = c(person_id, obs_date),
    names_from  = standard_concept_name,
    values_from = value_as_number,
    values_fn   = mean,     # take the average of duplicates in one day
    values_fill = NA        # fill in truly missing labs
  )

# NOTE(review): measurement_df_wide is keyed by person_id AND obs_date, so this
# merge yields one row per person per measurement date -- confirm that repeated
# rows per person are intended downstream.
df_covariates <- merge(df_covariates, measurement_df_wide, by = "person_id")

dim(df_covariates)
head(df_covariates)

### Impute by calculation
bmi_NA <- sum(is.na(df_covariates$BMI))
print(paste("Number of NA values in BMI before imputation:", bmi_NA))

# BMI = weight / height^2 wherever BMI is missing but weight and height exist.
# The division by 100 assumes height in cm and weight in kg -- TODO confirm units.
bmi_computable <- is.na(df_covariates$BMI) &
  !is.na(df_covariates$Weight) &
  !is.na(df_covariates$`Standing height`)
df_covariates$BMI[bmi_computable] <- df_covariates$Weight[bmi_computable] /
  (df_covariates$`Standing height`[bmi_computable] / 100)^2
bmi_NA <- sum(is.na(df_covariates$BMI))
print(paste("Number of NA values in BMI after manual calculation:", bmi_NA))

### Impute by waist circumference in categories
# The open upper bound is Inf rather than the observed maximum: the categories
# are the same, but cut() no longer fails or mislabels when the largest BMI is
# <= 29.9 or when every BMI is missing.
df_covariates$BMI_cat <- cut(df_covariates$BMI,
                             breaks = c(0, 18.5, 24.9, 29.9, Inf),
                             labels = c("Underweight", "Normal weight", "Overweight", "Obese"))
bmi_NA <- sum(is.na(df_covariates$BMI) & !is.na(df_covariates$`Waist circumference`))
print(paste("Number of NA values that could be imputed by estimation from Waist circumference:", bmi_NA))

### ********** CHANGED SEX *************** ###
# Label rows without a BMI category as obese or normal from the sex-specific
# waist cut-offs (80 for women, 94 for men; WHO definition per the original note)
df_covariates$BMI_cat[is.na(df_covariates$BMI_cat) & (df_covariates$`Waist circumference` >= 80) & (df_covariates$SEX == "Female")] <- "Obese"
df_covariates$BMI_cat[is.na(df_covariates$BMI_cat) & (df_covariates$`Waist circumference` < 80) & (df_covariates$SEX == "Female")] <- "Normal weight"
df_covariates$BMI_cat[is.na(df_covariates$BMI_cat) & (df_covariates$`Waist circumference` >= 94) & (df_covariates$SEX == "Male")] <- "Obese"
df_covariates$BMI_cat[is.na(df_covariates$BMI_cat) & (df_covariates$`Waist circumference` < 94) & (df_covariates$SEX == "Male")] <- "Normal weight"

# Mean observed BMI of the "Normal weight" and "Obese" groups, per sex
normal_men <- mean(df_covariates$BMI[df_covariates$BMI_cat == "Normal weight" & df_covariates$SEX == "Male"], na.rm = TRUE)
normal_women <- mean(df_covariates$BMI[df_covariates$BMI_cat == "Normal weight" & df_covariates$SEX == "Female"], na.rm = TRUE)
obese_men <- mean(df_covariates$BMI[df_covariates$BMI_cat == "Obese" & df_covariates$SEX == "Male"], na.rm = TRUE)
obese_women <- mean(df_covariates$BMI[df_covariates$BMI_cat == "Obese" & df_covariates$SEX == "Female"], na.rm = TRUE)

# Fill the remaining missing BMI values with the group mean matching the
# waist-derived category and sex (not great but better than just mean imputing)
impute_men <- is.na(df_covariates$BMI) & !is.na(df_covariates$`Waist circumference`) & df_covariates$SEX == "Male"
df_covariates$BMI[impute_men] <- ifelse(df_covariates$BMI_cat[impute_men] == "Normal weight", normal_men, obese_men)
impute_women <- is.na(df_covariates$BMI) & !is.na(df_covariates$`Waist circumference`) & df_covariates$SEX == "Female"
df_covariates$BMI[impute_women] <- ifelse(df_covariates$BMI_cat[impute_women] == "Normal weight", normal_women, obese_women)

bmi_NA <- sum(is.na(df_covariates$BMI))
print(paste("Number of NA values after imputation from Waist circumference:", bmi_NA))

dim(df_covariates)
head(df_covariates)

# Optional restriction to complete cases (disabled):
#dim(df_covariates[complete.cases(df_covariates[,c("BMI", "Waist circumference", "Weight", "Standing height")]), ])
#df_covariates <- df_covariates[complete.cases(df_covariates[,c("BMI", "Waist circumference", "Weight", "Standing height")]), ]

write.table(df_covariates, file.path(data_path, "dataframes/df_covariates_1.3.txt"),
            sep = "\t", quote = FALSE, row.names = FALSE)
print("Writing successful")


## Smoking (via GCS)

### Extract

In [None]:
# This query represents dataset "df_df_Smoking" for domain "survey" and was generated for All of Us Controlled Tier Dataset v7
# It selects the survey answers to the five listed question concept ids; the
# processing cell below treats them as the smoking questions.
dataset_22980624_survey_sql <- paste("
    SELECT
        answer.person_id,
        answer.survey_datetime,
        answer.survey,
        answer.question_concept_id,
        answer.question,
        answer.answer_concept_id,
        answer.answer,
        answer.survey_version_concept_id,
        answer.survey_version_name  
    FROM
        `ds_survey` answer   
    WHERE
        (
            question_concept_id IN (1585857, 1585860, 1585873, 1586159, 1586162)
        )", sep="")

# Formulate a Cloud Storage destination path for the data exported from BigQuery.
# The path is built from the workspace bucket, the owner's e-mail and today's date.
# NOTE: By default data exported multiple times on the same day will overwrite older copies.
#       But data exported on different days will be written to a new location so that historical
#       copies can be kept as the dataset definition is changed.
survey_22980624_path <- file.path(
  Sys.getenv("WORKSPACE_BUCKET"),
  "bq_exports",
  Sys.getenv("OWNER_EMAIL"),
  strftime(lubridate::now(), "%Y%m%d"),  # Comment out this line if you want the export to always overwrite.
  "survey_22980624",
  "survey_22980624_*.csv")
message(str_glue('The data will be written to {survey_22980624_path}. Use this path when reading ',
                 'the data into your notebooks in the future.'))

# Perform the query and export the dataset to Cloud Storage as CSV files.
# The "*" in the file name allows the export to be split into several CSV shards.
# NOTE: You only need to run `bq_table_save` once. After that, you can
#       just read data from the CSVs in Cloud Storage.
bq_table_save(
  bq_dataset_query(Sys.getenv("WORKSPACE_CDR"), dataset_22980624_survey_sql, billing = Sys.getenv("GOOGLE_PROJECT")),
  survey_22980624_path,
  destination_format = "CSV")



### Load from GCS

In [None]:
# Read a sharded BigQuery CSV export from Cloud Storage into one data frame.
#
# Args:
#   export_path: Cloud Storage path or wildcard pattern (gs://...) matching the
#     exported CSV shards; it is handed to `gsutil ls`.
#   col_types: readr column specification applied to every shard. The default
#     forces the text columns of the survey export to character. Pass NULL to
#     let readr guess from the first shard and reuse that specification for the
#     remaining shards.
#
# Returns: all shards row-bound in the order in which `gsutil ls` listed them.
# Stops with an error if the listing fails or matches no file.
#
# NOTE: Alternatively you can `gsutil -m cp {survey_22980624_path}` to copy these files
#       to the Jupyter disk.
read_bq_export_from_workspace_bucket <- function(
    export_path,
    col_types = cols(survey = col_character(), question = col_character(),
                     answer = col_character(), survey_version_name = col_character())) {
  # Capture stdout only: with stderr = TRUE every gsutil warning or error line
  # would be returned as if it were a CSV path and then passed to `gsutil cat`.
  # shQuote() keeps the local shell from touching the wildcard or other
  # special characters in the path.
  csv_files <- system2("gsutil", args = c("ls", shQuote(export_path)), stdout = TRUE)
  exit_status <- attr(csv_files, "status")
  if (!is.null(exit_status) && exit_status != 0) {
    stop("`gsutil ls ", export_path, "` failed with exit status ", exit_status, call. = FALSE)
  }
  csv_files <- csv_files[nzchar(csv_files)]
  if (length(csv_files) == 0) {
    stop("No exported CSV files found at ", export_path, call. = FALSE)
  }

  chunks <- vector("list", length(csv_files))
  for (i in seq_along(csv_files)) {
    csv <- csv_files[[i]]
    message(str_glue('Loading {csv}.'))
    chunks[[i]] <- read_csv(pipe(paste("gsutil cat", shQuote(csv))),
                            col_types = col_types, show_col_types = FALSE)
    # When no specification was given, lock in the types guessed for the first
    # shard so that all shards are parsed consistently.
    if (is.null(col_types)) {
      col_types <- spec(chunks[[i]])
    }
  }
  bind_rows(chunks)
}
# Hard-coded location of an earlier export. This assignment replaces any
# survey_22980624_path computed in the extract cell, so the data are read from
# this snapshot.
# IMPORTANT: the bucket differs per workspace -- adjust this path when running
# the notebook elsewhere.
survey_22980624_path <- "gs://fc-secure-cde9a0f0-7d5a-4045-98bb-fb1d394a535b/bq_exports/janclusmann@researchallofus.org/20250701/survey_22980624/survey_22980624_*.csv"
dataset_22980624_survey_df <- read_bq_export_from_workspace_bucket(survey_22980624_path)

# Size of the loaded table
dim(dataset_22980624_survey_df)

# Preview of the first rows
head(dataset_22980624_survey_df, 5)

### Process

In [None]:
# Reload the step-1.3 table, attach the smoking survey answers and derive
# "Ever smoked", "Smoking status" and "Pack years".
df_covariates <- data.table::fread(file.path(data_path, "dataframes/df_covariates_1.3.txt"), sep = "\t")
head(df_covariates)

survey_df <- dataset_22980624_survey_df %>%
  select(person_id, question, answer)

# Short column names for the five smoking questions
survey_df$question[survey_df$question == "Smoking: 100 Cigs Lifetime"] <- "Ever smoked"
survey_df$question[survey_df$question == "Smoking: Smoke Frequency"] <- "Smoking status"
survey_df$question[survey_df$question == "Smoking: Average Daily Cigarette Number"] <- "Avg daily"
survey_df$question[survey_df$question == "Smoking: Current Daily Cigarette Number"] <- "Current daily"
survey_df$question[survey_df$question == "Smoking: Number Of Years"] <- "Years"

# One row per person, one column per question.
# NOTE(review): this assumes at most one answer per person and question;
# duplicates would turn the answer columns into list columns -- confirm.
survey_df <- pivot_wider(survey_df, names_from = question, values_from = answer)

# merge() keeps only persons present in both tables
df_covariates <- merge(df_covariates, survey_df, by = "person_id")

# Change ever smoked to yes, no, NA (we don't know)
df_covariates$`Ever smoked`[df_covariates$`Ever smoked` == "100 Cigs Lifetime: Yes"] <- "Yes"
df_covariates$`Ever smoked`[df_covariates$`Ever smoked` == "100 Cigs Lifetime: No"] <- "No"
df_covariates$`Ever smoked`[grepl("PMI", df_covariates$`Ever smoked`)] <- NA

# Change avg daily and years to NA if no answer (there's no way to impute)
df_covariates$`Avg daily`[grepl("PMI", df_covariates$`Avg daily`)] <- NA
df_covariates$`Years`[grepl("PMI", df_covariates$`Years`)] <- NA

# Compute pack years = (cigarettes per day / 20) * years smoked.
# NA_real_ makes the new column numeric from the start.
df_covariates$`Pack years` <- NA_real_
has_pack_inputs <- !is.na(df_covariates$`Avg daily`) & !is.na(df_covariates$`Years`)
df_covariates$`Pack years`[has_pack_inputs] <-
  (as.numeric(df_covariates$`Avg daily`[has_pack_inputs]) / 20) *
  as.numeric(df_covariates$`Years`[has_pack_inputs])
quantile(df_covariates$`Pack years`, na.rm = TRUE)

# Impute ever smoked where it is still unknown
df_covariates$`Ever smoked`[is.na(df_covariates$`Ever smoked`) & df_covariates$`Pack years` >= 0] <- "Yes"
df_covariates$`Ever smoked`[is.na(df_covariates$`Ever smoked`) & df_covariates$`Smoking status` != "Never"] <- "Yes"
# The answers are text, so compare them as numbers. Comparing the raw strings
# with 0 is a string comparison, in which every "PMI: ..." non-answer sorts
# above "0" and would be counted as currently smoking.
current_daily <- df_covariates$`Current daily`
current_daily[grepl("PMI", current_daily)] <- NA
current_daily <- as.numeric(current_daily)
df_covariates$`Ever smoked`[is.na(df_covariates$`Ever smoked`) & current_daily > 0] <- "Yes"
#df_covariates$`Ever smoked`[is.na(df_covariates$`Ever smoked`)] <- "No"
#table(df_covariates$`Ever smoked`, useNA = "ifany")

# Impute smoking status: daily / some days -> Current; not at all, no answer
# (PMI), or ever-smoker without a status -> Previous; never-smoker -> Never
df_covariates$`Smoking status`[df_covariates$`Smoking status` == "Smoke Frequency: Every Day"] <- "Current"
df_covariates$`Smoking status`[df_covariates$`Smoking status` == "Smoke Frequency: Some Days"] <- "Current"
df_covariates$`Smoking status`[df_covariates$`Smoking status` == "Smoke Frequency: Not At All"] <- "Previous"
df_covariates$`Smoking status`[grepl("PMI", df_covariates$`Smoking status`)] <- "Previous"
df_covariates$`Smoking status`[df_covariates$`Ever smoked` == "Yes" & is.na(df_covariates$`Smoking status`)] <- "Previous"
df_covariates$`Smoking status`[df_covariates$`Ever smoked` == "No" & is.na(df_covariates$`Smoking status`)] <- "Never"

# Pack years = 0 imputed from Smoking status = Never or Ever smoked = No
df_covariates$`Pack years`[df_covariates$`Ever smoked` == "No" | df_covariates$`Smoking status` == "Never"] <- 0

# All others with a known "Ever smoked" get the mean pack years.
# NOTE(review): the mean is taken after the never-smokers were set to 0 above,
# so it includes those zeros -- confirm that this is intended.
print(paste0("Mean pack years: ", mean(df_covariates$`Pack years`, na.rm = TRUE)))
df_covariates$`Pack years`[is.na(df_covariates$`Pack years`) & !is.na(df_covariates$`Ever smoked`)] <- mean(df_covariates$`Pack years`, na.rm = TRUE)
quantile(df_covariates$`Pack years`, na.rm = TRUE)

#############################
### Should missing smoking also get mean pack years? for now no - can add later or just remove pts
#############################

# The raw answer columns are no longer needed
df_covariates <- df_covariates %>%
  select(-`Avg daily`, -`Current daily`, -`Years`)

# Remaining missing values per derived column
sum(is.na(df_covariates$`Smoking status`))
sum(is.na(df_covariates$`Ever smoked`))
sum(is.na(df_covariates$`Pack years`))

# Set factors with labels/levels (disabled):
#df_covariates["Ever smoked"] <- factor(df_covariates$`Ever smoked`, levels=c(0, 1), labels = c("No", "Yes"))
#df_covariates["Smoking status"] <- factor(df_covariates$`Smoking status`, levels=c(0, 1, 2), labels = c("Never", "Previous", "Current"))

dim(df_covariates)
head(df_covariates)


In [None]:
# Treat extreme pack-year values with the project helper, then inspect.
df_covariates <- adjust_outliers(df_covariates, "Pack years")

quantile(df_covariates[["Pack years"]], na.rm = TRUE)

dim(df_covariates)
head(df_covariates)

# Snapshot 1.4: covariates including the smoking variables.
write.table(
  df_covariates,
  file = file.path(data_path, "dataframes/df_covariates_1.4.txt"),
  sep = "\t",
  quote = FALSE,
  row.names = FALSE
)


## Alcohol

### Extract

In [None]:
# This query represents dataset "df_df_Alcohol" for domain "survey" and was generated for All of Us Controlled Tier Dataset v7
# It selects every answer row of `ds_survey` for four question concept IDs
# (presumably the four alcohol questions renamed in the Process cell - TODO confirm).
dataset_05380700_survey_sql <- paste("
    SELECT
        answer.person_id,
        answer.survey_datetime,
        answer.survey,
        answer.question_concept_id,
        answer.question,
        answer.answer_concept_id,
        answer.answer,
        answer.survey_version_concept_id,
        answer.survey_version_name  
    FROM
        `ds_survey` answer   
    WHERE
        (
            question_concept_id IN (1586198, 1586201, 1586207, 1586213)
        )", sep="")

# Formulate a Cloud Storage destination path for the data exported from BigQuery.
# NOTE: By default data exported multiple times on the same day will overwrite older copies.
#       But data exported on a different days will write to a new location so that historical
#       copies can be kept as the dataset definition is changed.
# The path is built from the WORKSPACE_BUCKET and OWNER_EMAIL environment
# variables plus today's date, so it differs per workspace, user and day.
survey_05380700_path <- file.path(
  Sys.getenv("WORKSPACE_BUCKET"),
  "bq_exports",
  Sys.getenv("OWNER_EMAIL"),
  strftime(lubridate::now(), "%Y%m%d"),  # Comment out this line if you want the export to always overwrite.
  "survey_05380700",
  "survey_05380700_*.csv")
message(str_glue('The data will be written to {survey_05380700_path}. Use this path when reading ',
                 'the data into your notebooks in the future.'))

# Perform the query and export the dataset to Cloud Storage as CSV files.
# NOTE: You only need to run `bq_table_save` once. After that, you can
#       just read data from the CSVs in Cloud Storage.
# The query runs against the WORKSPACE_CDR dataset and is billed to GOOGLE_PROJECT.
bq_table_save(
  bq_dataset_query(Sys.getenv("WORKSPACE_CDR"), dataset_05380700_survey_sql, billing = Sys.getenv("GOOGLE_PROJECT")),
  survey_05380700_path,
  destination_format = "CSV")



In [None]:
# Read the data directly from Cloud Storage into memory.
# NOTE: Alternatively you can `gsutil -m cp {survey_05380700_path}` to copy these files
#       to the Jupyter disk.
# Read a (possibly sharded) BigQuery CSV export from Cloud Storage.
#
# export_path: gs:// path or wildcard pattern matching the exported CSV files.
# Returns one data frame with all matching files row-bound together.
# Stops with the gsutil output if no file matches.
read_bq_export_from_workspace_bucket <- function(export_path) {
  # Parse the free-text survey columns as character in every shard.
  col_types <- cols(survey = col_character(), question = col_character(), answer = col_character(), survey_version_name = col_character())

  # stderr is captured so it can be reported below, but only real object URLs
  # are kept; otherwise gsutil warnings/errors would be passed to `gsutil cat`
  # as if they were file names.
  listing <- system2('gsutil', args = c('ls', export_path), stdout = TRUE, stderr = TRUE)
  csv_paths <- grep('^gs://', listing, value = TRUE)
  if (length(csv_paths) == 0) {
    stop('No exported files found at ', export_path, ':\n',
         paste(listing, collapse = '\n'), call. = FALSE)
  }

  bind_rows(
    map(csv_paths, function(csv) {
      message(str_glue('Loading {csv}.'))
      read_csv(pipe(str_glue('gsutil cat {csv}')), col_types = col_types, show_col_types = FALSE)
    }))
}
# NOTE(review): this literal replaces the path computed in the Extract cell and
# points at one specific workspace bucket and export date; adjust it when the
# notebook is moved to another workspace.
survey_05380700_path <- "gs://fc-secure-cde9a0f0-7d5a-4045-98bb-fb1d394a535b/bq_exports/janclusmann@researchallofus.org/20250701/survey_05380700/survey_05380700_*.csv"
dataset_05380700_survey_df <- survey_05380700_path %>%
  read_bq_export_from_workspace_bucket()

dim(dataset_05380700_survey_df)

head(dataset_05380700_survey_df, n = 5)

### Process

In [None]:
# Start from the covariates snapshot written after the smoking step.
df_covariates <- data.table::fread(file.path(data_path, "dataframes/df_covariates_1.4.txt"), sep = "\t")
head(df_covariates)

survey_df <- select(dataset_05380700_survey_df, person_id, question, answer)

# Shorten the survey question texts to compact column names.
question_labels <- c(
  "Alcohol: Alcohol Participant" = "Ever drank",
  "Alcohol: Drink Frequency Past Year" = "Past year",
  "Alcohol: Average Daily Drink Count" = "Avg daily",
  "Alcohol: 6 or More Drinks Occurrence" = "More six"
)
for (long_name in names(question_labels)) {
  survey_df$question[survey_df$question == long_name] <- question_labels[[long_name]]
}

# One column per question.
# NOTE(review): assumes one answer per person and question - verify.
survey_df <- pivot_wider(survey_df, names_from = question, values_from = answer)

# NOTE(review): merge() defaults to an inner join, so participants without any
# of these survey answers are dropped here - confirm this is intended.
df_covariates <- merge(df_covariates, survey_df, by = "person_id")

# Recode `Past year` answers to numbers (monthly answers are divided by 4.345;
# the values look like drinking occasions per week - TODO confirm).
# The column stays character, so the numbers are stored as text until
# as.numeric() is applied below.
past_year_values <- c(
  "Drink Frequency Past Year: Never" = 0,
  "Drink Frequency Past Year: Monthly Or Less" = 1/4.345,
  "Drink Frequency Past Year: 2 to 4 Per Month" = 3/4.345,
  "Drink Frequency Past Year: 2 to 3 Per Week" = 2.5,
  "Drink Frequency Past Year: 4 or More Per Week" = 4
)
for (answer_text in names(past_year_values)) {
  df_covariates$`Past year`[df_covariates$`Past year` == answer_text] <- past_year_values[[answer_text]]
}
df_covariates$`Past year`[grepl("PMI", df_covariates$`Past year`)] <- NA

# Recode `Avg daily` answers to the midpoint of each range (10 for the top one).
avg_daily_values <- c(
  "Average Daily Drink Count: 1 or 2" = 1.5,
  "Average Daily Drink Count: 3 or 4" = 3.5,
  "Average Daily Drink Count: 5 or 6" = 5.5,
  "Average Daily Drink Count: 7 to 9" = 8,
  "Average Daily Drink Count: 10 or More" = 10
)
for (answer_text in names(avg_daily_values)) {
  df_covariates$`Avg daily`[df_covariates$`Avg daily` == answer_text] <- avg_daily_values[[answer_text]]
}
df_covariates$`Avg daily`[grepl("PMI", df_covariates$`Avg daily`)] <- NA

# Compute alk_g_d = frequency * drinks * 14 / 7 wherever both inputs are known
# (presumably 14 g of alcohol per drink spread over 7 days - TODO confirm).
has_both <- !is.na(df_covariates$`Past year`) & !is.na(df_covariates$`Avg daily`)
df_covariates$`Alk_g_d` <- NA
df_covariates$`Alk_g_d`[has_both] <-
  as.numeric(df_covariates$`Past year`[has_both]) *
  as.numeric(df_covariates$`Avg daily`[has_both]) *
  14 / 7

# Non-drinkers are set to zero explicitly.
df_covariates$`Alk_g_d`[df_covariates$`Ever drank` == "Alcohol Participant: No"] <- 0
df_covariates$`Alk_g_d`[df_covariates$`Past year` == 0] <- 0
# Otherwise keep NA if missing - if they prefer not to answer, we cannot make assumption

df_covariates <- select(df_covariates, -c(`Ever drank`, `Past year`, `Avg daily`, `More six`))

quantile(df_covariates$`Alk_g_d`, na.rm = TRUE)

dim(df_covariates)
head(df_covariates)


In [None]:
df_covariates <- adjust_outliers(df_covariates, "Alk_g_d")

# WHO's gender-specific limits
# Daily intake is compared with a sex-specific limit: 60/40 for Path_Alk and
# 24/12 for High_Alk (Male / everyone else). Each flag is strictly 0 or 1;
# it is NA when SEX or Alk_g_d is missing.
# The previous nested ifelse() let ratios in [0.9999999999, 1) through as
# fractional values in what is meant to be a binary flag.
is_male <- df_covariates$SEX == "Male"
path_limit <- ifelse(is_male, 60, 40)
high_limit <- ifelse(is_male, 24, 12)
df_covariates$Path_Alk <- as.numeric(df_covariates$`Alk_g_d` / path_limit >= 1)
df_covariates$High_Alk <- as.numeric(df_covariates$`Alk_g_d` / high_limit >= 1)

dim(df_covariates)
head(df_covariates)

# Snapshot 1.5: covariates including the alcohol variables.
write.table(df_covariates, file.path(data_path, "dataframes/df_covariates_1.5.txt"),
            sep = "\t", quote = FALSE, row.names = FALSE)

# Number of drinkers and distribution of daily intake.
sum(!is.na(df_covariates$`Alk_g_d`) & df_covariates$`Alk_g_d` > 0)
quantile(df_covariates$`Alk_g_d`, na.rm = TRUE)
mean(df_covariates$`Alk_g_d`, na.rm = TRUE)
sd(df_covariates$`Alk_g_d`, na.rm = TRUE)


## Deprivation index

### Extract

In [None]:
# This query represents dataset "df_Deprivation" for domain "zip_code_socioeconomic" and was generated for All of Us Controlled Tier Dataset v7
# The join takes the part of observation.value_as_string before the first '*',
# casts it to INT64 and matches it to zip3_ses_map.zip3. Only observations with
# source concept 1585250 are used, and values starting with 'Res' are excluded.
# Note that zip3_as_string is exported under the column name `zip_code`.
dataset_14505310_zip_code_socioeconomic_sql <- paste("
    SELECT
        observation.person_id,
        observation.observation_datetime,
        zip_code.zip3_as_string as zip_code,
        zip_code.fraction_assisted_income as assisted_income,
        zip_code.fraction_high_school_edu as high_school_education,
        zip_code.median_income,
        zip_code.fraction_no_health_ins as no_health_insurance,
        zip_code.fraction_poverty as poverty,
        zip_code.fraction_vacant_housing as vacant_housing,
        zip_code.deprivation_index,
        zip_code.acs as american_community_survey_year 
    FROM
        `zip3_ses_map` zip_code 
    JOIN
        `observation` observation 
            ON CAST(SUBSTR(observation.value_as_string, 0, STRPOS(observation.value_as_string, '*') - 1) AS INT64) = zip_code.zip3 
            AND observation_source_concept_id = 1585250 
            AND observation.value_as_string NOT LIKE 'Res%'", sep="")

# Formulate a Cloud Storage destination path for the data exported from BigQuery.
# NOTE: By default data exported multiple times on the same day will overwrite older copies.
#       But data exported on a different days will write to a new location so that historical
#       copies can be kept as the dataset definition is changed.
# The path is built from the WORKSPACE_BUCKET and OWNER_EMAIL environment
# variables plus today's date, so it differs per workspace, user and day.
zip_code_socioeconomic_14505310_path <- file.path(
  Sys.getenv("WORKSPACE_BUCKET"),
  "bq_exports",
  Sys.getenv("OWNER_EMAIL"),
  strftime(lubridate::now(), "%Y%m%d"),  # Comment out this line if you want the export to always overwrite.
  "zip_code_socioeconomic_14505310",
  "zip_code_socioeconomic_14505310_*.csv")
message(str_glue('The data will be written to {zip_code_socioeconomic_14505310_path}. Use this path when reading ',
                 'the data into your notebooks in the future.'))

# Perform the query and export the dataset to Cloud Storage as CSV files.
# NOTE: You only need to run `bq_table_save` once. After that, you can
#       just read data from the CSVs in Cloud Storage.
# The query runs against the WORKSPACE_CDR dataset and is billed to GOOGLE_PROJECT.
bq_table_save(
  bq_dataset_query(Sys.getenv("WORKSPACE_CDR"), dataset_14505310_zip_code_socioeconomic_sql, billing = Sys.getenv("GOOGLE_PROJECT")),
  zip_code_socioeconomic_14505310_path,
  destination_format = "CSV")



In [None]:
library(bigrquery)

# Split WORKSPACE_CDR (usually "<project>.<dataset>", e.g.
# "fc-aou-cdr-prod.C2024Q3R4") into its project and dataset parts.
workspace_cdr <- Sys.getenv("WORKSPACE_CDR")
parts <- strsplit(workspace_cdr, ".", fixed = TRUE)[[1]]
project_id <- parts[1]
dataset_id <- parts[2]

# Datasets available in the CDR project (to verify names).
all_datasets <- bq_project_datasets(project_id)
print(all_datasets)

# Tables inside the workspace's CDR dataset.
cdr_dataset <- bq_dataset(project_id, dataset_id)
tables <- bq_dataset_tables(cdr_dataset)
print(tables)

# Column names and types of the zip3 socioeconomic lookup table.
zip3_table <- bq_table(project_id, dataset_id, "zip3_ses_map")
zip3_meta <- bq_table_meta(zip3_table)
zip3_meta$schema$fields


In [None]:
library(bigrquery)

# 1. get your CDR project
cdr_proj <- strsplit(Sys.getenv("WORKSPACE_CDR"), "\\.")[[1]][1]

# 2. list every dataset in that project
# bq_project_datasets() returns a list of bq_dataset objects, not a data
# frame, so `all_ds$dataset` is NULL; take the dataset name from each element.
all_ds <- bq_project_datasets(cdr_proj)
ds_ids <- vapply(all_ds, function(ds) ds$dataset, character(1))

# 3. filter to Controlled-Tier (starts with "C")
ct_ds <- grep("^C", ds_ids, value = TRUE)
print(ct_ds)

In [None]:
# debug: display the WORKSPACE_CDR environment variable used by the queries

Sys.getenv("WORKSPACE_CDR")

### Load from GCS

In [None]:
# Read the data directly from Cloud Storage into memory.
# NOTE: Alternatively you can `gsutil -m cp {zip_code_socioeconomic_14505310_path}` to copy these files
#       to the Jupyter disk.
# Read a (possibly sharded) BigQuery CSV export from Cloud Storage.
#
# export_path: gs:// path or wildcard pattern matching the exported CSV files.
# Returns one data frame with all matching files row-bound together.
# Stops with the gsutil output if no file matches.
read_bq_export_from_workspace_bucket <- function(export_path) {
  # The query exports zip3_as_string under the alias `zip_code`, so that is the
  # column that has to be kept as text; the old name matched no column.
  col_types <- cols(zip_code = col_character())

  # stderr is captured so it can be reported below, but only real object URLs
  # are kept; otherwise gsutil warnings/errors would be passed to `gsutil cat`
  # as if they were file names.
  listing <- system2('gsutil', args = c('ls', export_path), stdout = TRUE, stderr = TRUE)
  csv_paths <- grep('^gs://', listing, value = TRUE)
  if (length(csv_paths) == 0) {
    stop('No exported files found at ', export_path, ':\n',
         paste(listing, collapse = '\n'), call. = FALSE)
  }

  bind_rows(
    map(csv_paths, function(csv) {
      message(str_glue('Loading {csv}.'))
      read_csv(pipe(str_glue('gsutil cat {csv}')), col_types = col_types, show_col_types = FALSE)
    }))
}
# NOTE(review): this literal replaces the path computed in the Extract cell and
# points at one specific workspace bucket and export date; adjust it when the
# notebook is moved to another workspace.
zip_code_socioeconomic_14505310_path <- "gs://fc-secure-cde9a0f0-7d5a-4045-98bb-fb1d394a535b/bq_exports/janclusmann@researchallofus.org/20250701/zip_code_socioeconomic_14505310/zip_code_socioeconomic_14505310_*.csv"
dataset_14505310_zip_code_socioeconomic_df <- zip_code_socioeconomic_14505310_path %>%
  read_bq_export_from_workspace_bucket()

dim(dataset_14505310_zip_code_socioeconomic_df)

head(dataset_14505310_zip_code_socioeconomic_df, n = 5)

### Process

In [None]:
# Start from the covariates snapshot written after the alcohol step.
df_covariates <- data.table::fread(file.path(data_path, "dataframes/df_covariates_1.5.txt"), sep = "\t")
head(df_covariates)

# Only the deprivation index is carried over from the socioeconomic export.
dep_idx <- select(dataset_14505310_zip_code_socioeconomic_df, person_id, deprivation_index)

# NOTE(review): inner join - participants without a zip-code record are
# dropped, and a person with several records would be duplicated. Verify.
df_covariates <- merge(x = df_covariates, y = dep_idx, by = "person_id")

dim(df_covariates)
head(df_covariates)

# Snapshot 1.6: covariates including the deprivation index.
write.table(
  df_covariates,
  file = file.path(data_path, "dataframes/df_covariates_1.6.txt"),
  sep = "\t",
  quote = FALSE,
  row.names = FALSE
)


## Medications

### Extract

In [None]:
# This query represents dataset "df_df_Medications" for domain "survey" and was generated for All of Us Controlled Tier Dataset v7
# It selects every answer row of `ds_survey` for seven question concept IDs
# (presumably the treatment and family-history questions renamed in the
# Process cell - TODO confirm).
dataset_00748392_survey_sql <- paste("
    SELECT
        answer.person_id,
        answer.survey_datetime,
        answer.survey,
        answer.question_concept_id,
        answer.question,
        answer.answer_concept_id,
        answer.answer,
        answer.survey_version_concept_id,
        answer.survey_version_name  
    FROM
        `ds_survey` answer   
    WHERE
        (
            question_concept_id IN (1384437, 1384641, 43528793, 43528819, 43528820, 836799, 836800)
        )", sep="")

# Formulate a Cloud Storage destination path for the data exported from BigQuery.
# NOTE: By default data exported multiple times on the same day will overwrite older copies.
#       But data exported on a different days will write to a new location so that historical
#       copies can be kept as the dataset definition is changed.
# The path is built from the WORKSPACE_BUCKET and OWNER_EMAIL environment
# variables plus today's date, so it differs per workspace, user and day.
survey_00748392_path <- file.path(
  Sys.getenv("WORKSPACE_BUCKET"),
  "bq_exports",
  Sys.getenv("OWNER_EMAIL"),
  strftime(lubridate::now(), "%Y%m%d"),  # Comment out this line if you want the export to always overwrite.
  "survey_00748392",
  "survey_00748392_*.csv")
message(str_glue('The data will be written to {survey_00748392_path}. Use this path when reading ',
                 'the data into your notebooks in the future.'))

# Perform the query and export the dataset to Cloud Storage as CSV files.
# NOTE: You only need to run `bq_table_save` once. After that, you can
#       just read data from the CSVs in Cloud Storage.
# The query runs against the WORKSPACE_CDR dataset and is billed to GOOGLE_PROJECT.
bq_table_save(
  bq_dataset_query(Sys.getenv("WORKSPACE_CDR"), dataset_00748392_survey_sql, billing = Sys.getenv("GOOGLE_PROJECT")),
  survey_00748392_path,
  destination_format = "CSV")



In [None]:
# Read the data directly from Cloud Storage into memory.
# NOTE: Alternatively you can `gsutil -m cp {survey_00748392_path}` to copy these files
#       to the Jupyter disk.
# Read a (possibly sharded) BigQuery CSV export from Cloud Storage.
#
# export_path: gs:// path or wildcard pattern matching the exported CSV files.
# Returns one data frame with all matching files row-bound together.
# Stops with the gsutil output if no file matches.
read_bq_export_from_workspace_bucket <- function(export_path) {
  # Parse the free-text survey columns as character in every shard.
  col_types <- cols(survey = col_character(), question = col_character(), answer = col_character(), survey_version_name = col_character())

  # stderr is captured so it can be reported below, but only real object URLs
  # are kept; otherwise gsutil warnings/errors would be passed to `gsutil cat`
  # as if they were file names.
  listing <- system2('gsutil', args = c('ls', export_path), stdout = TRUE, stderr = TRUE)
  csv_paths <- grep('^gs://', listing, value = TRUE)
  if (length(csv_paths) == 0) {
    stop('No exported files found at ', export_path, ':\n',
         paste(listing, collapse = '\n'), call. = FALSE)
  }

  bind_rows(
    map(csv_paths, function(csv) {
      message(str_glue('Loading {csv}.'))
      read_csv(pipe(str_glue('gsutil cat {csv}')), col_types = col_types, show_col_types = FALSE)
    }))
}
# NOTE(review): this literal replaces the path computed in the Extract cell and
# points at one specific workspace bucket and export date; adjust it when the
# notebook is moved to another workspace.
survey_00748392_path <- "gs://fc-secure-cde9a0f0-7d5a-4045-98bb-fb1d394a535b/bq_exports/janclusmann@researchallofus.org/20250701/survey_00748392/survey_00748392_*.csv"
dataset_00748392_survey_df <- survey_00748392_path %>%
  read_bq_export_from_workspace_bucket()

dim(dataset_00748392_survey_df)

head(dataset_00748392_survey_df, n = 5)

### Process

In [None]:
# Start from the covariates snapshot written after the deprivation step.
df_covariates <- data.table::fread(file.path(data_path, "dataframes/df_covariates_1.6.txt"), sep = "\t")
head(df_covariates)

survey_df <- select(dataset_00748392_survey_df, person_id, question, answer)

# Replace the full question texts by short identifiers.
question_short_names <- c(
  "Are you currently prescribed medications and/or receiving treatment for high blood pressure (hypertension)?" = "HTN",
  "Are you currently prescribed medications and/or receiving treatment for high cholesterol?" = "High Chol",
  "Are you currently prescribed medications and/or receiving treatment for other hormone/endocrine condition(s)?" = "Hormone",
  "Are you currently prescribed medications and/or receiving treatment for type 1 diabetes?" = "T1DM",
  "Are you currently prescribed medications and/or receiving treatment for type 2 diabetes?" = "T2DM",
  "Including yourself, who in your family has had type 1 diabetes? Select all that apply." = "T1DM Fam",
  "Including yourself, who in your family has had type 2 diabetes? Select all that apply." = "T2DM Fam"
)
for (full_text in names(question_short_names)) {
  survey_df$question[survey_df$question == full_text] <- question_short_names[[full_text]]
}

# Person ids with an explicit "Yes" per treatment question. Everyone else,
# including participants with no answer at all, ends up as 0 / "No Medication".
htn_ids <- with(survey_df, person_id[question == "HTN" & answer == "Are you currently prescribed medications and/or receiving treatment for high blood pressure (hypertension)? - Yes"])
chol_ids <- with(survey_df, person_id[question == "High Chol" & answer == "Are you currently prescribed medications and/or receiving treatment for high cholesterol? - Yes"])
t1dm_ids <- with(survey_df, person_id[question == "T1DM" & answer == "Are you currently prescribed medications and/or receiving treatment for type 1 diabetes? - Yes"])
t2dm_ids <- with(survey_df, person_id[question == "T2DM" & answer == "Are you currently prescribed medications and/or receiving treatment for type 2 diabetes? - Yes"])
metabolic_ids <- unique(c(htn_ids, chol_ids, t1dm_ids, t2dm_ids))
hormone_ids <- with(survey_df, person_id[question == "Hormone" & answer == "Are you currently prescribed medications and/or receiving treatment for other hormone/endocrine condition(s)? - Yes"])

# 0 = none, 1 = metabolic, 2 = hormones. The hormone code is written last, so
# it wins for participants who are in both id sets.
medication_code <- rep(0, nrow(df_covariates))
medication_code[df_covariates$person_id %in% metabolic_ids] <- 1
medication_code[df_covariates$person_id %in% hormone_ids] <- 2
df_covariates$Medication <- factor(medication_code, levels = c(0, 1, 2),
                                   labels = c("No Medication", "Metabolic", "Hormones"))

table(df_covariates$Medication)

# Family diabetes: father, mother or sibling reported with type 1 or type 2.
# (t1dm_ids / t2dm_ids are reused here and now hold the family-history ids.)
t1dm_ids <- with(survey_df, person_id[question == "T1DM Fam" &
                                        (answer == "Including yourself, who in your family has had type 1 diabetes? - Father" |
                                         answer == "Including yourself, who in your family has had type 1 diabetes? - Mother" |
                                         answer == "Including yourself, who in your family has had type 1 diabetes? - Sibling")])
t2dm_ids <- with(survey_df, person_id[question == "T2DM Fam" &
                                        (answer == "Including yourself, who in your family has had type 2 diabetes? - Father" |
                                         answer == "Including yourself, who in your family has had type 2 diabetes? - Mother" |
                                         answer == "Including yourself, who in your family has had type 2 diabetes? - Sibling")])
dm_ids <- unique(c(t1dm_ids, t2dm_ids))

family_flag <- rep(0, nrow(df_covariates))
family_flag[df_covariates$person_id %in% dm_ids] <- 1
df_covariates$Family_diabetes <- as.factor(family_flag)

table(df_covariates$Family_diabetes)

dim(df_covariates)
head(df_covariates)

# Snapshot 1.7: covariates including medication and family history.
write.table(
  df_covariates,
  file = file.path(data_path, "dataframes/df_covariates_1.7.txt"),
  sep = "\t",
  quote = FALSE,
  row.names = FALSE
)


## Additional corrections

In [None]:
# Reload snapshot 1.7 and remove the categorical age column.
# NOTE(review): the result is not written back in this cell (the write below
# is commented out) - confirm that is intended.
df_covariates <- data.table::fread(file.path(data_path, "dataframes/df_covariates_1.7.txt"), sep = "\t")
df_covariates <- select(df_covariates, -all_of("AGE_cat"))



#write.table(df_covariates, "data/df_covariates_1.7.txt", sep="\t", quote=F, row.names=F) 

# ICD codes

## Extract

In [None]:
# EHR conditions SQL query (ONLY essential columns to reduce memory)
# Each condition_occurrence row is joined to its source concept to obtain the
# source code and name. Because the WHERE clause requires a non-NULL
# concept_code, rows without a matching source concept are dropped even though
# the join itself is a LEFT JOIN.
dataset_ehr_conditions_sql <- paste("
    SELECT
        condition_occurrence.person_id,
        c_source_concept.concept_code as source_concept_code,
        condition_occurrence.condition_start_datetime,
        c_source_concept.concept_name as source_concept_name
    FROM
        `condition_occurrence` condition_occurrence
    LEFT JOIN
        `concept` c_source_concept 
            ON condition_occurrence.condition_source_concept_id = c_source_concept.concept_id
    WHERE
        condition_occurrence.condition_start_datetime IS NOT NULL
        AND condition_occurrence.person_id IS NOT NULL
        AND c_source_concept.concept_code IS NOT NULL", sep="")

# Formulate a Cloud Storage destination path for EHR conditions data
# The path contains today's date, so it changes from day to day.
ehr_conditions_path <- file.path(
  Sys.getenv("WORKSPACE_BUCKET"),
  "bq_exports",
  Sys.getenv("OWNER_EMAIL"),
  strftime(lubridate::now(), "%Y%m%d"),
  "ehr_conditions_comprehensive",
  "ehr_conditions_comprehensive_*.csv")

message(str_glue('EHR conditions data will be written to {ehr_conditions_path}. Use this path when reading ',
                 'the data into your notebooks in the future.'))

# Perform the query and export the dataset to Cloud Storage as CSV files
# The query runs against the WORKSPACE_CDR dataset and is billed to GOOGLE_PROJECT.
bq_table_save(
  bq_dataset_query(Sys.getenv("WORKSPACE_CDR"), dataset_ehr_conditions_sql, billing = Sys.getenv("GOOGLE_PROJECT")),
  ehr_conditions_path,
  destination_format = "CSV")

## Load from GCS

In [None]:
# =============================================================================
# READ DATA FROM CLOUD STORAGE
# =============================================================================

# Read the (possibly sharded) EHR conditions CSV export from Cloud Storage.
#
# export_path: gs:// path or wildcard pattern matching the exported CSV files.
# Returns one data frame with all matching files row-bound together; all four
# columns are read as character.
# Stops with the gsutil output if no file matches.
read_ehr_conditions_from_workspace_bucket <- function(export_path) {
  col_types <- cols(
    person_id = col_character(),
    source_concept_code = col_character(),
    condition_start_datetime = col_character(),
    source_concept_name = col_character()
  )

  # stderr is captured so it can be reported below, but only real object URLs
  # are kept; otherwise gsutil warnings/errors would be passed to `gsutil cat`
  # as if they were file names.
  listing <- system2('gsutil', args = c('ls', export_path), stdout = TRUE, stderr = TRUE)
  csv_paths <- grep('^gs://', listing, value = TRUE)
  if (length(csv_paths) == 0) {
    stop('No exported files found at ', export_path, ':\n',
         paste(listing, collapse = '\n'), call. = FALSE)
  }

  bind_rows(
    map(csv_paths, function(csv) {
      message(str_glue('Loading {csv}.'))
      read_csv(pipe(str_glue('gsutil cat {csv}')), col_types = col_types, show_col_types = FALSE)
    }))
}


# Read EHR conditions data from Cloud Storage
# Use the EHR-specific reader defined above. The generic
# read_bq_export_from_workspace_bucket() in scope at this point carries the
# column types of a survey export, not of this one.
# NOTE(review): ehr_conditions_path contains the export date; when loading an
# export from an earlier day, set the path explicitly.
message("Reading comprehensive EHR conditions data from Cloud Storage...")
df_diagnosis_raw <- read_ehr_conditions_from_workspace_bucket(ehr_conditions_path)

message(str_glue("EHR conditions data loaded: {nrow(df_diagnosis_raw)} rows, {ncol(df_diagnosis_raw)} columns"))
head(df_diagnosis_raw, 5)

In [None]:
# Keep the four needed columns and drop rows lacking a code or a start date.
message("Processing EHR conditions data...")
df_diagnosis <- df_diagnosis_raw %>%
  select(person_id, source_concept_code, condition_start_datetime, source_concept_name) %>%
  filter(!is.na(source_concept_code) & !is.na(condition_start_datetime))

# Normalise codes: strip dots, then keep at most the first four characters.
df_diagnosis$source_concept_code <- substr(
  gsub(".", "", df_diagnosis$source_concept_code, fixed = TRUE),
  1, 4
)

message(str_glue("Processed diagnosis data: {nrow(df_diagnosis)} conditions"))

# ---- Data summaries ----
message("=== EHR CONDITIONS SUMMARY ===")
message(str_glue("Total conditions: {nrow(df_diagnosis)}"))
message(str_glue("Unique persons: {n_distinct(df_diagnosis$person_id)}"))

if (nrow(df_diagnosis) > 0) {
  message(str_glue("Date range: {min(as.Date(df_diagnosis$condition_start_datetime), na.rm=TRUE)} to {max(as.Date(df_diagnosis$condition_start_datetime), na.rm=TRUE)}"))

  # Twenty most frequent (code, name) pairs.
  condition_counts <- head(
    count(df_diagnosis, source_concept_code, source_concept_name, sort = TRUE),
    20
  )
  message("Top 20 condition codes:")
  print(condition_counts)

  # Frequency of the first character of each code.
  icd_distribution <- count(
    mutate(df_diagnosis, icd_chapter = str_sub(source_concept_code, 1, 1)),
    icd_chapter,
    sort = TRUE
  )
  message("ICD code chapter distribution:")
  print(icd_distribution)
}

# Save processed diagnosis data for inspection before final storage.
message("Saving processed diagnosis data...")
write.table(df_diagnosis, file = "data/df_diagnosis_processed.txt", sep = "\t", quote = FALSE, row.names = FALSE)

# NOTE(review): only df_diagnosis_processed.txt is written in this cell; the
# blood file named below is presumably produced elsewhere - verify.
closing_lines <- c(
  "=== EXTRACTION AND PROCESSING COMPLETE ===",
  "Files created for your inspection:",
  "  - data/df_blood_processed.txt",
  "  - data/df_diagnosis_processed.txt",
  "",
  "After inspection, you can rename these to:",
  "  - data/df_blood.txt",
  "  - data/df_diagnosis.txt",
  "To integrate with your existing processing pipeline."
)
for (closing_line in closing_lines) {
  message(closing_line)
}

## Postprocess

In [None]:
# Per-person ICD code records used to build the diagnosis indicators.
icd_codes_subset <- data.table::fread(file = "data/df_ICD_codes_subset.txt", sep = "\t")

dim(icd_codes_subset)
head(icd_codes_subset)


In [None]:
# Normalise codes: strip dots, then keep at most the first four characters.
icd_codes_subset$source_concept_code <- substr(
  gsub(".", "", icd_codes_subset$source_concept_code, fixed = TRUE),
  1, 4
)

head(icd_codes_subset)


In [None]:
# ICD Groups
ICD_Groups <- data.table::fread("data/ICD_Groups.txt", sep="\t")

# Attach the patients carrying each listed code. Codes that no patient has
# stay in the table with person_id = NA (left join from the code list).
pat_icds_labelled <- left_join(x = ICD_Groups, y = icd_codes_subset %>% select(person_id, source_concept_code) %>% distinct(), 
                               by = c("ICD10" = "source_concept_code")) %>% 
                     distinct()

dim(pat_icds_labelled)
#head(pat_icds_labelled)

#Check for amount of diagnosis per actual ICD, instead of group. Likely here will be the correct amounts, while summarized will be less due to parallel coded diagnosis
df_icd_groups_before <- pat_icds_labelled %>% group_by(Diagnosis, person_id) %>% summarise(occurence = n()) %>% spread(Diagnosis, occurence) 
sum_groups_before <- as.data.frame(colSums(df_icd_groups_before, na.rm=TRUE))

head(sum_groups_before)

#Groupby + summarises
# One row per person and one 0/1 column per group. The indicator columns are
# binarised explicitly and person_id is left out of it, instead of relying on
# a magic upper bound to keep person_id values from being overwritten.
df_icd_groups <- pat_icds_labelled %>%
  group_by(Group, person_id) %>%
  summarise(occurence = n(), .groups = "drop") %>%
  spread(Group, occurence) %>%
  mutate(across(-person_id, ~ as.numeric(!is.na(.x) & .x >= 1)))
# Codes without any patient are kept under the placeholder person_id 0.
df_icd_groups$person_id[is.na(df_icd_groups$person_id)] <- 0
sum_groups_after <- as.data.frame(colSums(df_icd_groups))

# Show the totals after grouping (previously the "before" table was printed twice).
head(sum_groups_after)


In [None]:
# ICD Singles
ICD_Singles <- data.table::fread("data/ICD_Singles.txt", sep="\t")

# Attach the patients carrying each listed code. Codes that no patient has
# stay in the table with person_id = NA (left join from the code list).
pat_icds_labelled <- left_join(x = ICD_Singles, y = icd_codes_subset %>% select(person_id, source_concept_code) %>% distinct(), 
                               by = c("ICD10" = "source_concept_code"))%>% 
                     distinct() 

dim(pat_icds_labelled)

#Groupby + summarises
# One row per person and one 0/1 column per diagnosis. The indicator columns
# are binarised explicitly and person_id is left out of it, instead of relying
# on a magic upper bound to keep person_id values from being overwritten.
df_icd_singles <- pat_icds_labelled %>%
  group_by(Diagnosis, person_id) %>%
  summarise(occurence = n(), .groups = "drop") %>%
  spread(Diagnosis, occurence) %>%
  mutate(across(-person_id, ~ as.numeric(!is.na(.x) & .x >= 1)))
# Codes without any patient are kept under the placeholder person_id 0.
df_icd_singles$person_id[is.na(df_icd_singles$person_id)] <- 0
sum_singles <- as.data.frame(colSums(df_icd_singles))

head(sum_singles)


In [None]:
# DM ICD codes: every person appearing in this file is flagged with DM = 1.
icd_dm_codes <- data.table::fread(file = "data/df_ICD_codes_DM.txt", sep = "\t")

icd_dm <- distinct(select(icd_dm_codes, person_id))
icd_dm$DM <- 1
icd_dm <- select(icd_dm, person_id, DM)

dim(icd_dm)
head(icd_dm)


In [None]:
# Combine single-diagnosis, group and DM indicators; a person missing from one
# of the tables gets 0 in that table's columns.
df_icd <- df_icd_singles %>%
  full_join(df_icd_groups, by = "person_id") %>%
  merge(icd_dm, by = "person_id", all = TRUE)
df_icd[is.na(df_icd)] <- 0

dim(df_icd)
head(df_icd)

### Note:
### - person_id '0' represents all codes that had NO patients diagnosed


### Include 'control' patients - patients with no diagnoses
icd_person_ids <- read.table("data/AllofUs_v7_phenotype_icd_091223_person_ids.txt", sep = "\t", header = TRUE)

# Outer merge: ids present only in the id list are added with all indicators 0.
df_icd <- merge(df_icd, icd_person_ids, by = "person_id", all = TRUE)
df_icd <- replace(df_icd, is.na(df_icd), 0)

dim(df_icd)
head(df_icd)

write.table(df_icd, file = "data/df_diagnosis.txt", sep = "\t", quote = FALSE, row.names = FALSE)


In [None]:
#Summarize ICD codes
# Count, per indicator column, how many rows equal "1".
#
# df: data frame with a `person_id` column plus one column per diagnosis.
# Returns a plain data.frame with columns Diagnosis, Occurrence and Percentage
# (share of all rows of `df`, in percent), sorted by descending Occurrence.
create_summary <- function(df) {
  n_rows <- nrow(df)
  counts <- summarise(df, across(-person_id, function(col) sum(col == "1", na.rm = TRUE)))
  long <- pivot_longer(counts, everything(), names_to = "Diagnosis", values_to = "Occurrence")
  long <- mutate(long, Percentage = (Occurrence / n_rows) * 100)
  as.data.frame(arrange(long, desc(Occurrence)))
}

# Reload the saved indicator table and summarise it.
df_icd <- data.table::fread(file = "data/df_diagnosis.txt", sep = "\t")

# Sorted by frequency first, then alphabetically by diagnosis name.
sum_diagnosis <- create_summary(df_icd)
sum_diagnosis
sum_diagnosis_sorted <- sum_diagnosis[order(sum_diagnosis[["Diagnosis"]]), ]
sum_diagnosis_sorted


## Extract DOI cases/controls

In [None]:
# ICD code records used for the disease-of-interest extraction.
icd_codes_subset <- data.table::fread(file = "data/HCC_ICD_codes_subset.txt", sep = "\t")

dim(icd_codes_subset)
head(icd_codes_subset)

# Normalise codes: strip dots, then keep at most the first four characters.
icd_codes_subset$source_concept_code <- substr(
  gsub(".", "", icd_codes_subset$source_concept_code, fixed = TRUE),
  1, 4
)

head(icd_codes_subset)

### Process DOI cases

In [None]:
##### DOI Filters from project configs #####
print(IOIs)

# An empty configuration would produce the pattern "^", which matches every
# code, so fail early instead.
stopifnot(length(IOIs) > 0)

# Build regex pattern: ^ ensures "starts with"
# NOTE(review): the codes in icd_codes_subset have no dots and at most four
# characters, so the configured codes must be given in that form.
pattern <- paste0("^", IOIs, collapse = "|")

# Apply filter
# The configured codes now define the cases. Previously this result was
# immediately overwritten by a hard-coded filter on "C220", which made the
# project configuration ineffective.
cases <- icd_codes_subset[grepl(pattern, icd_codes_subset$source_concept_code), ]

cases <- cases %>%
  mutate(condition_start_datetime = ymd_hms(condition_start_datetime))

# Filter for the first visit per unique person_id
# (rows are sorted by start time, then the first row of each person is kept)
cases_first_visit <- cases %>%
  group_by(person_id) %>%
  arrange(condition_start_datetime) %>%
  slice(1) %>%
  ungroup() %>%
  mutate(year_of_diag = sapply(condition_start_datetime, extract_year))

# Check the result
print(paste("Original number of rows:", nrow(cases)))
print(paste("Number of rows after filtering:", nrow(cases_first_visit)))
print(paste("Number of unique person_ids:", n_distinct(cases_first_visit$person_id)))

# Prepare a df_y that includes the person id (renamed to eid), the year of the
# first diagnosis and status = 1 for every case
df_y <- cases_first_visit %>% select(c("person_id", "year_of_diag")) %>% mutate(status = 1) %>% rename(eid = person_id)

df_y

write.csv(df_y, "data/df_y.csv")

### Plot DOI Cases

In [None]:
# Install the two font-related packages on demand if they are not available
if (!requireNamespace("systemfonts", quietly = TRUE)) {
  install.packages("systemfonts")
}
if (!requireNamespace("showtext", quietly = TRUE)) {
  install.packages("showtext")
}

# Absolute number of cases, passed to the plot as n_total
n_total <- nrow(df_y)

plot_included_discarded_cases(
  cases_first_visit,
  base_size = 30,
  n_total = n_total
)


## HCC Only: Explore diseases underlying DOI cases

### Rank priority of diagnosis

In [None]:
# Diagnosis groups from highest to lowest priority; also used as factor levels
priority_order <- c("Cirrhosis", "Viral Hepatitis", "CLD", "No Liver disease")

# Collect the at-risk diagnoses of all cases (status == 1):
#   1. keep diagnosis rows whose ICD-10 or ICD-9 code is in par_icd_codes
#   2. attach the group annotation from Patients_at_risk via the ICD-10 code
#   3. right-join df_y so cases without any matching diagnosis are kept
# NOTE(review): pat_icds, par_icd_codes, par_subset and Patients_at_risk are
# not defined in this part of the notebook - confirm they are loaded before
# this cell runs.
pat_cld <- pat_icds[pat_icds$diag_icd10 %in% par_icd_codes | pat_icds$diag_icd9 %in% par_icd_codes, ] %>%
  select(c("eid", "diag_icd9", "diag_icd10", "epistart")) %>%
  left_join(Patients_at_risk, by = c("diag_icd10" = "ICD10")) %>%
  right_join(df_y, by = "eid") %>%
  subset(status == 1) %>%
  select(-c("location_name", "location_code", "location_nr", "location_country", "country_code", "split_ext", "split_int"))

# Rows without a group get the diagnosis date as episode start. Both sides use
# the same row mask so every row receives its own date (assigning the full
# column into the subset would recycle/misalign the values).
# NOTE(review): the df_y built in this notebook has year_of_diag, not
# date_of_diag - confirm where date_of_diag comes from.
no_group <- is.na(pat_cld$Group)
pat_cld$epistart[no_group] <- as.Date(pat_cld$date_of_diag[no_group])

pat_cld$Group[no_group] <- "No Liver disease"
pat_cld$Group[!pat_cld$Group %in% par_subset] <- "No Liver disease" #Replace all non-matching groups with "No LD"
pat_cld$Group <- factor(pat_cld$Group, levels = priority_order)

summary(pat_cld$Group)

# Map a vector of diagnosis group labels to a numeric rank (1 = most
# important). Labels outside the four known groups, including NA, get rank 5.
priority <- function(diagnosis) {
  rank_by_group <- c(
    "Cirrhosis" = 1,
    "Viral Hepatitis" = 2,
    "CLD" = 3,
    "No Liver disease" = 4
  )

  # Name-based lookup; as.character() makes factors match by label
  ranks <- unname(rank_by_group[as.character(diagnosis)])

  # Assign a lower priority to other diagnoses
  ranks[is.na(ranks)] <- 5
  ranks
}


# Keep pat_cld grouped by participant
pat_cld <- group_by(pat_cld, eid)

# Node 0 represents first visit to hospital after assessment:
# per participant keep the rows at the earliest epistart and, among those,
# the single row with the highest-priority group
pat_cld_node0 <- pat_cld %>%
  mutate(Priority = priority(Group)) %>%
  group_by(eid) %>%
  filter(epistart == min(epistart)) %>%
  arrange(eid, Priority) %>%
  slice(1) %>%
  ungroup() %>%
  select(-Priority)

# Participants per group at node 0, with rounded percentages
summary_node0 <- pat_cld_node0 %>%
  count(Group, name = "Count") %>%
  mutate(
    Time = "First \nEHR",
    Order = 1,
    Priority = priority(Group)
  ) %>%
  arrange(Priority) %>%
  mutate(Percentage = round(Count / sum(Count) * 100))

# Node 1: highest-priority group over ALL episodes of a participant.
# Not restricted to the last episode (epistart == max(epistart)): better to
# take all incidents than just the last, as not all diags get coded everytime
pat_cld_node1 <- pat_cld %>%
  mutate(Priority = priority(Group)) %>%
  group_by(eid) %>%
  arrange(eid, Priority) %>%
  slice(1) %>%
  ungroup() %>%
  select(-Priority)

# Participants per group at node 1, with rounded percentages
summary_node1 <- pat_cld_node1 %>%
  count(Group, name = "Count") %>%
  mutate(
    Time = paste0("Prior to\n", DOI),
    Order = 2,
    Priority = priority(Group)
  ) %>%
  arrange(Priority) %>%
  mutate(Percentage = round(Count / sum(Count) * 100))

# View the summaries
print(summary_node0)
print(summary_node1)

# Merge timepoints
combined_data <- rbind(summary_node0, summary_node1)

stacked_bars_time_comparison(combined_data, base_size = 22)


## Explore Cirrhosis

In [None]:
# Print the row count and the number of distinct person_id values of `df`,
# labelled with `name`.
check_unique_participants <- function(df, name) {
  report <- c(
    paste("Checking", name),
    paste("Total rows:", nrow(df)),
    paste("Unique participants:", n_distinct(df$person_id))
  )

  for (line in report) {
    print(line)
  }

  # Same invisible result as ending on the last print() call
  invisible(report[[3]])
}


# Check cirrhosis_cases and HCC_cases.
# Both tables are only created further down in this cell, so in a fresh
# session they do not exist yet at this point. Skip the check in that case
# instead of aborting the whole cell with "object not found".
if (exists("cirrhosis_cases")) {
  check_unique_participants(cirrhosis_cases, "Cirrhosis Cases")
}
if (exists("HCC_cases")) {
  check_unique_participants(HCC_cases, "HCC Cases")
}


# Extract the diagnosis rows for a set of ICD codes.
#
# Arguments:
#   icd_codes_subset - table with at least person_id, source_concept_code and
#                      condition_start_datetime
#   codes            - character vector of codes; matched EXACTLY against
#                      source_concept_code (no prefix matching)
#   disease_name     - label written to the `disease` column
#   select_visits    - "first": earliest row per person_id; "all": every row
# Returns the columns person_id, source_concept_code, date_of_diag (Date) and
# disease. Prints a short processing summary as a side effect.
# Stops with an error for any other select_visits value.
process_icd_codes <- function(icd_codes_subset, codes, disease_name, select_visits = "first") {
  # Fail fast on an unsupported option, before any parsing work is done
  if (!(length(select_visits) == 1 && select_visits %in% c("first", "all"))) {
    stop("Invalid select_visits option. Use 'first' or 'all'.")
  }

  # Filter for the specified ICD codes
  cases <- icd_codes_subset[icd_codes_subset$source_concept_code %in% codes, ]

  # Convert condition_start_datetime to datetime format
  cases <- cases %>%
    mutate(condition_start_datetime = ymd_hms(condition_start_datetime))

  if (select_visits == "first") {
    # Sort chronologically, then keep the first row of every person_id group
    processed_cases <- cases %>%
      group_by(person_id) %>%
      arrange(condition_start_datetime) %>%
      slice(1) %>%
      ungroup()
  } else {
    # Keep all visits
    processed_cases <- cases
  }

  # Add the diagnosis date and the disease label. `!!` injects the argument
  # value so a data column that happens to be called disease_name cannot
  # shadow it; mutate() also copes with zero-row input.
  processed_cases <- processed_cases %>%
    mutate(
      date_of_diag = as.Date(condition_start_datetime),
      disease = !!disease_name
    )

  # Print summary information
  print(paste("Processing", disease_name, "cases:"))
  print(paste("ICD codes used:", paste(codes, collapse = ", ")))
  print(paste("Original number of rows:", nrow(cases)))
  print(paste("Number of rows after processing:", nrow(processed_cases)))
  print(paste("Number of unique person_ids:", n_distinct(processed_cases$person_id)))

  # Optional: Count of cases per ICD code
  if (length(codes) > 1) {
    code_counts <- processed_cases %>%
      group_by(source_concept_code) %>%
      summarise(count = n()) %>%
      arrange(desc(count))
    print("Cases per ICD code:")
    print(code_counts)
  }

  processed_cases <- processed_cases %>%
    select(c("person_id", "source_concept_code", "date_of_diag", "disease"))
  return(processed_cases)
}

# ICD-10 codes used as the cirrhosis definition.
# NOTE(review): process_icd_codes() matches codes exactly, and
# source_concept_code was cut to four characters without the dot in an earlier
# cell. The three-character "R18" therefore only matches rows stored as plain
# "R18" - confirm that sub-codes such as R18.0/R18.8 are not needed.
cirrhosis_codes <- c("K703", "K743", "K745", "K746", "K767", "I850", "I859", "R18")

# First recorded diagnosis per participant for cirrhosis and for the DOI
cirrhosis_cases <- process_icd_codes(icd_codes_subset, cirrhosis_codes, "Cirrhosis", "first")

HCC_cases <- process_icd_codes(icd_codes_subset, "C220", DOI, "first")

# Sanity check: rows vs. unique participants in both tables.
# paste() (not paste0) keeps a space between "Cases with" and the DOI label.
check_unique_participants(cirrhosis_cases, "Cirrhosis Cases")
check_unique_participants(HCC_cases, paste("Cases with", DOI))

head(cirrhosis_cases)
head(HCC_cases)
dim(cirrhosis_cases)
dim(HCC_cases)


In [None]:
# Window (in days) within which cirrhosis and HCC count as simultaneous
time_threshold <- 90
print(time_threshold)

cirrhosis_cases <- cirrhosis_cases %>%
  mutate(date_of_diag = as.Date(date_of_diag))

HCC_cases <- HCC_cases %>%
  mutate(date_of_diag = as.Date(date_of_diag))

# Full outer join on person_id: every participant with cirrhosis and/or HCC.
# The argument is spelled out as `suffixes`; the abbreviated `suffix` only
# worked through partial argument matching.
early_cirrhosis_cases <- merge(
  cirrhosis_cases, HCC_cases,
  by = "person_id",
  suffixes = c("_cirrhosis", "_HCC"),
  all = TRUE
)

# Number of participants without an HCC diagnosis
sum(is.na(early_cirrhosis_cases$date_of_diag_HCC))

early_cirrhosis_cases <- early_cirrhosis_cases %>%
  mutate(
    # Days from cirrhosis diagnosis to HCC diagnosis; positive = cirrhosis
    # came first. NA when either date is missing.
    time_to_hcc = case_when(
      !is.na(date_of_diag_HCC) & !is.na(date_of_diag_cirrhosis) ~
        as.numeric(difftime(date_of_diag_HCC, date_of_diag_cirrhosis, units = "days")),
      TRUE ~ NA_real_
    ),
    # Conditions are evaluated top to bottom; the first match wins
    cirrhosis_status = case_when(
      is.na(date_of_diag_cirrhosis) & is.na(date_of_diag_HCC) ~ "Neither Cirrhosis nor HCC",
      is.na(date_of_diag_HCC) ~ "Cirrhosis but No HCC",
      is.na(date_of_diag_cirrhosis) ~ "HCC but No Cirrhosis",
      time_to_hcc > time_threshold ~ "Cirrhosis prior to HCC",
      time_to_hcc >= -time_threshold & time_to_hcc <= time_threshold ~ "Simultaneous Cirrhosis + HCC",
      time_to_hcc < -time_threshold ~ "Cirrhosis after HCC",
      TRUE ~ "Error in date calculation"
    )
  )


early_cirrhosis_cases




In [None]:
# Number and share of participants per cirrhosis/HCC timing category
case_analysis <- early_cirrhosis_cases %>%
  group_by(cirrhosis_status) %>%
  summarise(count = n()) %>%
  mutate(percentage = count / sum(count) * 100)

print("Cirrhosis cases analysis:")
print(case_analysis)

# Keep participants whose cirrhosis has no HCC or precedes it by more than
# the threshold
early_cirrhosis_only <- filter(
  early_cirrhosis_cases,
  cirrhosis_status %in% c("Cirrhosis but No HCC", "Cirrhosis prior to HCC")
)

# One row per participant with the indicator column cirrhosis = 1
df_early_cirrhosis <- early_cirrhosis_only %>%
  distinct(person_id) %>%
  mutate(cirrhosis = 1)

write_csv(df_early_cirrhosis, "data/df_early_cirrhosis.csv")

In [None]:
# Display the early-cirrhosis rows
early_cirrhosis_only

# Participant ids only
df_cirrhosis <- select(early_cirrhosis_only, person_id)

# Blood: Not functional yet, use "Extract quantitative phenotypes"

https://support.researchallofus.org/hc/en-us/articles/30125602539284-Introduction-to-All-of-Us-Electronic-Health-Record-EHR-Collection-and-Data-Transformation-Methods

## Extract (all blood data)

In [None]:
# Query: every measurement that has a numeric value, a timestamp, a person id
# and a named standard concept. Table names are unqualified; the dataset is
# supplied by bq_dataset_query() below.
dataset_blood_labs_sql <- "
    SELECT
        measurement.person_id,
        m_standard_concept.concept_name as standard_concept_name,
        measurement.value_as_number,
        measurement.measurement_datetime
    FROM
        `measurement` measurement 
    LEFT JOIN
        `concept` m_standard_concept 
            ON measurement.measurement_concept_id = m_standard_concept.concept_id 
    WHERE
        measurement.value_as_number IS NOT NULL
        AND measurement.measurement_datetime IS NOT NULL
        AND measurement.person_id IS NOT NULL
        AND m_standard_concept.concept_name IS NOT NULL"

# Formulate a Cloud Storage destination path for blood labs data:
# <bucket>/bq_exports/<owner email>/<today as YYYYMMDD>/blood_labs_comprehensive/
blood_labs_path <- file.path(
  Sys.getenv("WORKSPACE_BUCKET"),
  "bq_exports",
  Sys.getenv("OWNER_EMAIL"),
  strftime(lubridate::now(), "%Y%m%d"),
  "blood_labs_comprehensive",
  "blood_labs_comprehensive_*.csv"
)

message(
  str_glue(
    'Blood labs data will be written to {blood_labs_path}. Use this path when reading ',
    'the data into your notebooks in the future.'
  )
)

# Perform the query and export the dataset to Cloud Storage as CSV files
blood_labs_query <- bq_dataset_query(
  Sys.getenv("WORKSPACE_CDR"),
  dataset_blood_labs_sql,
  billing = Sys.getenv("GOOGLE_PROJECT")
)
bq_table_save(blood_labs_query, blood_labs_path, destination_format = "CSV")

## Load from GCS

#TODO: Loading every exported file at once runs into timeouts because memory is overloaded (330MB * 86)

In [None]:
# Read all exported blood-lab CSV files matching `export_path` (a gs:// path,
# wildcards allowed) and row-bind them into one tibble.
# Stops with the captured gsutil output if the listing command fails.
read_blood_labs_from_workspace_bucket <- function(export_path) {
  col_types <- cols(
    person_id = col_character(),
    standard_concept_name = col_character(),
    value_as_number = col_double(),
    measurement_datetime = col_character()
  )

  # List the exported files. stderr is captured as well so it can be shown
  # when the command fails.
  listing <- system2("gsutil", args = c("ls", export_path), stdout = TRUE, stderr = TRUE)
  exit_status <- attr(listing, "status")
  if (!is.null(exit_status) && exit_status != 0) {
    stop(
      "gsutil ls failed for ", export_path, ":\n", paste(listing, collapse = "\n"),
      call. = FALSE
    )
  }

  # Only gs:// lines are objects; anything else is diagnostic output that
  # must not be passed on to `gsutil cat`.
  csv_uris <- listing[startsWith(listing, "gs://")]

  bind_rows(
    map(csv_uris, function(csv) {
      message(str_glue('Loading {csv}.'))
      # shQuote() protects the URI from the shell that runs the command
      read_csv(
        pipe(paste("gsutil cat", shQuote(csv))),
        col_types = col_types,
        show_col_types = FALSE
      )
    })
  )
}

# Rebuild the export location. The date component is TODAY's date, so this
# only points at an export that was written on the same day.
blood_labs_path <- file.path(
  Sys.getenv("WORKSPACE_BUCKET"), "bq_exports", Sys.getenv("OWNER_EMAIL"),
  strftime(lubridate::now(), "%Y%m%d"),
  "blood_labs_comprehensive", "blood_labs_comprehensive_*.csv"
)

# Read blood labs data from Cloud Storage
message("Reading comprehensive blood labs data from Cloud Storage...")
df_blood_raw <- read_blood_labs_from_workspace_bucket(blood_labs_path)

message(
  str_glue("Blood labs data loaded: {nrow(df_blood_raw)} rows, {ncol(df_blood_raw)} columns")
)
head(df_blood_raw, 5)

## Process/merge on person_ids

In [None]:
# Function to process blood labs in batches
#
# Lists the CSV files matching `gcs_path`, reads them `batch_size` files at a
# time, optionally filters the rows, and appends every batch to `output_file`
# (tab-separated) so that only one batch is held in memory at a time.
#
# Arguments:
#   gcs_path    - gs:// path (wildcards allowed) of the exported CSV files
#   output_file - local tab-separated file; an existing file is replaced
#   batch_size  - number of files read per batch
#   filters     - NULL for no filtering, or a list. With any non-NULL list,
#                 rows with missing or non-positive values are dropped;
#                 `date_filter` (earliest date kept) and `lab_types`
#                 (concept names kept) are applied when present.
# Returns the content of `output_file` read back with fread(), or an empty
# data.table (with a warning) when no rows were written.
# A file that fails to load is reported via message() and skipped.
process_blood_labs_in_batches <- function(gcs_path,
                                          output_file = "data/df_blood_batched.txt",
                                          batch_size = 5, # Process 5 files at a time
                                          filters = NULL) {

  # Get list of all files. stderr is captured so it can be reported on
  # failure; only gs:// lines are treated as files.
  listing <- system2("gsutil", args = c("ls", gcs_path), stdout = TRUE, stderr = TRUE)
  exit_status <- attr(listing, "status")
  if (!is.null(exit_status) && exit_status != 0) {
    stop(
      "gsutil ls failed for ", gcs_path, ":\n", paste(listing, collapse = "\n"),
      call. = FALSE
    )
  }
  file_list <- listing[startsWith(listing, "gs://")]
  n_files <- length(file_list)
  n_batches <- ceiling(n_files / batch_size)

  message(str_glue('Found {n_files} files. Processing in {n_batches} batches of {batch_size} files each.'))

  # Initialize output file (remove if exists)
  if (file.exists(output_file)) file.remove(output_file)

  # Process each batch. seq_len() yields no iterations for zero batches,
  # whereas 1:0 would iterate over c(1, 0).
  for (batch in seq_len(n_batches)) {
    start_idx <- (batch - 1) * batch_size + 1
    end_idx <- min(batch * batch_size, n_files)
    batch_files <- file_list[start_idx:end_idx]

    message(str_glue('Processing batch {batch}/{n_batches}: files {start_idx} to {end_idx}'))

    # Process this batch
    batch_data <- map_dfr(batch_files, function(csv_file) {
      tryCatch({
        message(str_glue('  Loading {basename(csv_file)}'))

        # Read with data.table for speed. fread() cannot read from an R
        # connection, so the file is streamed through its `cmd` argument.
        dt <- fread(
          cmd = paste("gsutil cat", shQuote(csv_file)),
          select = c("person_id", "standard_concept_name", "value_as_number", "measurement_datetime"),
          colClasses = list(character = c("person_id", "standard_concept_name", "measurement_datetime"),
                            numeric = "value_as_number")
        )

        # Apply filters if provided
        if (!is.null(filters)) {
          # Example filters - customize as needed
          dt <- dt[!is.na(value_as_number) & value_as_number > 0]

          # Filter by date if specified (adds a measurement_date column)
          if ("date_filter" %in% names(filters)) {
            dt[, measurement_date := as.Date(measurement_datetime)]
            dt <- dt[measurement_date >= as.Date(filters$date_filter)]
          }

          # Filter by lab types if specified
          if ("lab_types" %in% names(filters)) {
            dt <- dt[standard_concept_name %in% filters$lab_types]
          }
        }

        return(as_tibble(dt))

      }, error = function(e) {
        # Best effort: report the failing file and continue with the rest
        message(str_glue('  ERROR loading {basename(csv_file)}: {e$message}'))
        return(tibble())
      })
    })

    # Save this batch to file. Append only when the file already exists, so
    # the header is written exactly once even if earlier batches were empty.
    if (nrow(batch_data) > 0) {
      fwrite(batch_data, output_file, append = file.exists(output_file), sep = "\t")
      message(str_glue('  Batch {batch} processed: {nrow(batch_data)} rows'))
    }

    # Memory cleanup
    rm(batch_data)
    gc()

    # Optional: pause between batches to let system recover
    if (batch < n_batches) Sys.sleep(2)
  }

  message(str_glue('All batches processed. Final data saved to: {output_file}'))

  # Nothing was written (no files, all files failed, or everything filtered)
  if (!file.exists(output_file)) {
    warning("No rows were written to ", output_file, "; returning an empty table.", call. = FALSE)
    return(data.table::data.table())
  }

  # Read and return final result
  fread(output_file, sep = "\t")
}

In [None]:


# =============================================================================
# USAGE EXAMPLES
# =============================================================================

# Example 1: Process with basic filters

# Define filters to reduce data size
my_filters <- list(
  date_filter = "2015-01-01", # Only data from 2015 onwards
  lab_types = c(
    "Alanine aminotransferase", "Aspartate aminotransferase",
    "Platelet count", "Glucose", "Albumin", "Hemoglobin", "Hematocrit"
  )
)

# Placeholder location - replace with the real export path of your workspace
blood_labs_path <- "gs://your-bucket/bq_exports/.../blood_labs_comprehensive/blood_labs_comprehensive_*.csv"

# Process in small batches (3 files per batch: very small batches for safety)
df_blood <- process_blood_labs_in_batches(
  gcs_path = blood_labs_path,
  output_file = "data/df_blood_batched.txt",
  batch_size = 3,
  filters = my_filters
)

# =============================================================================
# MONITOR MEMORY USAGE
# =============================================================================

# Function to monitor memory during processing.
# Triggers a garbage collection, reports the memory currently used by R via
# message() and returns that amount in megabytes.
monitor_memory <- function() {
  mem_info <- gc()
  # Column 2 of gc()'s result is the "(Mb)" column belonging to "used", i.e.
  # it is already expressed in megabytes; summing its two rows (Ncells and
  # Vcells) gives the total and needs no further unit conversion.
  used_mb <- sum(mem_info[, 2])
  message("Memory used: ", round(used_mb), " MB")
  return(used_mb)
}

# Call this periodically during processing
monitor_memory()

## Blood Concept Explorer

In [None]:

# =============================================================================
# APPROACH 1: EXPLORE AVAILABLE LAB CONCEPTS
# =============================================================================

# First, run the lab concepts explorer query (ALL CONCEPTS - no filtering).
# One row per standard concept with at least 100 numeric measurements:
# counts, date range and median/min/max of the values. The tables are
# addressed through the WORKSPACE_CDR dataset.
lab_concepts_explorer_sql <- paste0("
SELECT 
    m_standard_concept.concept_id,
    m_standard_concept.concept_name as standard_concept_name,
    m_standard_concept.concept_code as standard_concept_code,
    m_standard_concept.vocabulary_id as vocabulary,
    COUNT(*) as measurement_count,
    COUNT(DISTINCT measurement.person_id) as unique_persons,
    MIN(measurement.measurement_datetime) as earliest_date,
    MAX(measurement.measurement_datetime) as latest_date,
    APPROX_QUANTILES(measurement.value_as_number, 4)[OFFSET(2)] as median_value,
    MIN(measurement.value_as_number) as min_value,
    MAX(measurement.value_as_number) as max_value
FROM 
    `", Sys.getenv("WORKSPACE_CDR"), ".measurement` measurement
LEFT JOIN
    `", Sys.getenv("WORKSPACE_CDR"), ".concept` m_standard_concept 
        ON measurement.measurement_concept_id = m_standard_concept.concept_id 
WHERE
    measurement.value_as_number IS NOT NULL
    AND measurement.measurement_datetime IS NOT NULL
    AND m_standard_concept.concept_name IS NOT NULL
GROUP BY 
    m_standard_concept.concept_id,
    m_standard_concept.concept_name,
    m_standard_concept.concept_code,
    m_standard_concept.vocabulary_id
HAVING 
    COUNT(*) >= 100
ORDER BY 
    measurement_count DESC")

# Run the explorer query and download the result directly
message("Running lab concepts explorer query...")
lab_concepts_query <- bq_project_query(
  Sys.getenv("GOOGLE_PROJECT"),
  query = str_glue(lab_concepts_explorer_sql)
)
lab_concepts_df <- bq_table_download(lab_concepts_query)

# Save the lab concepts for inspection
write_xlsx(lab_concepts_df, "data/aou_lab_concepts_available.xlsx")
message("Available lab concepts saved to: data/aou_lab_concepts_available.xlsx")

# Display top 20 for quick inspection
message("Top 20 most common lab measurements:")
print(head(lab_concepts_df, 20))

# =============================================================================
# FUNCTION TO MATCH UKB BLOOD VALUES
# =============================================================================

# Keep the AoU lab concepts whose name contains one of the UKB lab names.
#
# Arguments:
#   ukb_file_path   - Excel file with a `lab_name` column (adjust the column
#                     name below if your file differs)
#   aou_concepts_df - data frame with a `standard_concept_name` column
# Returns the matching rows of `aou_concepts_df`. If the file does not exist,
# a message is shown and `aou_concepts_df` is returned unchanged.
# Matching is a case-insensitive, literal substring test: characters such as
# "(" or "+" in a lab name are NOT interpreted as regular-expression syntax,
# and missing or empty lab names are ignored (an empty name would otherwise
# match every concept).
match_ukb_blood_values <- function(ukb_file_path, aou_concepts_df) {
  if (!file.exists(ukb_file_path)) {
    message("UKB file not found. Please provide the correct path.")
    return(aou_concepts_df) # Return all concepts if no UKB file
  }

  # Load your UKB blood values file
  ukb_blood <- read_excel(ukb_file_path) # Adjust based on your file format
  if (!"lab_name" %in% names(ukb_blood)) {
    stop("The UKB file must contain a `lab_name` column.", call. = FALSE)
  }

  # Usable search terms: lower-cased, trimmed, non-missing, non-empty
  ukb_names <- unique(tolower(trimws(as.character(ukb_blood$lab_name))))
  ukb_names <- ukb_names[!is.na(ukb_names) & nzchar(ukb_names)]

  # A concept matches as soon as any UKB name occurs in its name
  aou_names <- tolower(aou_concepts_df$standard_concept_name)
  is_match <- rep(FALSE, length(aou_names))
  for (lab in ukb_names) {
    is_match <- is_match | grepl(lab, aou_names, fixed = TRUE)
  }

  # `!!` passes the precomputed mask itself, not a column named is_match
  matched_concepts <- filter(aou_concepts_df, !!is_match)
  return(matched_concepts)
}

# Example usage (adjust the file path to your UKB blood values file)
# matched_labs <- match_ukb_blood_values("data/ukb_blood_values.xlsx", lab_concepts_df)

# =============================================================================
# APPROACH 2: HARMONIZED EXTRACTION WITH YEARLY MEANS (RECOMMENDED)
# =============================================================================

# Build the SQL for the harmonized extraction: mean value per person, concept
# name and calendar year, for measurements from 2010 on with a value between
# 0 and 1,000,000 (both exclusive). Table names are unqualified.
#
# Arguments:
#   selected_concept_ids - NULL for all concepts, or a non-empty vector of
#                          whole-number concept IDs to restrict the query to
# Returns the SQL text as a single string.
# Stops if `selected_concept_ids` is empty or contains anything other than
# whole, non-negative numbers.
# NOTE(review): a function with the same name is defined again further down
# in this notebook; whichever definition runs last is the one in effect.
create_harmonized_blood_extraction <- function(selected_concept_ids = NULL) {

  # Render IDs as plain digit strings. Doubles go through formatC() because
  # as.character()/paste() print round values in scientific notation
  # (3000000 becomes "3e+06"). The digits-only check also guarantees that no
  # other text can be pasted into the SQL.
  whole_number_text <- function(value, arg_name) {
    is_plain_double <- is.double(value) && !inherits(value, "integer64")
    text <- if (is_plain_double) {
      formatC(value, format = "f", digits = 0)
    } else {
      as.character(value)
    }
    ok <- length(text) > 0 && all(grepl("^[0-9]+$", text))
    if (ok && is_plain_double) ok <- all(value == round(value))
    if (!ok) {
      stop(arg_name, " must be a non-empty vector of whole, non-negative numbers.", call. = FALSE)
    }
    text
  }

  # Base query for harmonized extraction
  base_harmonized_sql <- "
  SELECT
      measurement.person_id,
      m_standard_concept.concept_name as standard_concept_name,
      EXTRACT(YEAR FROM measurement.measurement_datetime) as year,
      CONCAT(CAST(measurement.person_id AS STRING), '_', CAST(EXTRACT(YEAR FROM measurement.measurement_datetime) AS STRING)) as person_id_year,
      AVG(measurement.value_as_number) as mean_value,
      COUNT(measurement.value_as_number) as measurement_count,
      MIN(measurement.measurement_datetime) as first_measurement_date,
      MAX(measurement.measurement_datetime) as last_measurement_date
  FROM 
      `measurement` measurement
  LEFT JOIN
      `concept` m_standard_concept 
          ON measurement.measurement_concept_id = m_standard_concept.concept_id 
  WHERE
      measurement.value_as_number IS NOT NULL
      AND measurement.measurement_datetime IS NOT NULL
      AND measurement.person_id IS NOT NULL
      AND m_standard_concept.concept_name IS NOT NULL
      AND measurement.measurement_datetime >= '2010-01-01'
      AND measurement.value_as_number > 0
      AND measurement.value_as_number < 1000000"

  # Add concept filtering if provided
  if (!is.null(selected_concept_ids)) {
    id_text <- whole_number_text(selected_concept_ids, "selected_concept_ids")
    concept_filter <- paste0(" AND measurement.measurement_concept_id IN (",
                             paste(id_text, collapse = ","), ")")
    base_harmonized_sql <- paste0(base_harmonized_sql, concept_filter)
  }

  # Add GROUP BY and ORDER BY
  base_harmonized_sql <- paste0(base_harmonized_sql, "
  GROUP BY 
      measurement.person_id,
      m_standard_concept.concept_name,
      EXTRACT(YEAR FROM measurement.measurement_datetime)
  HAVING 
      COUNT(measurement.value_as_number) >= 1
  ORDER BY 
      measurement.person_id,
      standard_concept_name,
      year")

  return(base_harmonized_sql)
}

# =============================================================================
# USAGE EXAMPLES FOR ALL THREE APPROACHES
# =============================================================================

# Status overview of the extraction approaches, followed by the next steps;
# every entry is emitted as its own message
invisible(lapply(
  c(
    "=== THREE EXTRACTION APPROACHES ===",
    "1. Lab Concepts Explorer: Complete ✓",
    "2. Harmonized Extraction: Ready to run",
    "3. Batch Processing: Ready to run",
    "\nNext steps:",
    "1. Inspect: data/aou_lab_concepts_available.xlsx",
    "2. Match against your UKB blood values",
    "3. Choose your preferred extraction approach"
  ),
  message
))

# Example: Run harmonized extraction with top 20 concepts
# top_20_concepts <- head(lab_concepts_df$concept_id, 20)
# harmonized_sql <- create_harmonized_blood_extraction(top_20_concepts)
# message("Harmonized extraction SQL ready. Use bq_table_save() to run it.")

# For batch processing, use the previous batch_processor functions with:
# my_filters <- NULL  # No filtering

## Harmonized Blood Data yearly means

In [None]:
# =============================================================================
# HARMONIZED BLOOD DATA EXTRACTION WITH YEARLY MEANS - R SCRIPT
# =============================================================================
# This creates aggregated blood data (mean per person-year) in R using BigQuery


# =============================================================================
# HARMONIZED EXTRACTION FUNCTION
# =============================================================================

# Build the SQL for the harmonized extraction: mean value per person, concept
# name and calendar year, for values between 0 and 1,000,000 (both
# exclusive). Tables are addressed through the WORKSPACE_CDR dataset.
#
# Arguments:
#   selected_concept_ids - NULL for all concepts, or a non-empty vector of
#                          whole-number concept IDs to restrict the query to
#   start_year           - first calendar year included (from January 1st)
#   min_measurements     - minimum number of measurements a person/concept/
#                          year needs to be kept
# Returns the SQL text as a single string.
# Stops if any argument contains something other than whole, non-negative
# numbers, or if start_year/min_measurements are not single values.
create_harmonized_blood_extraction <- function(selected_concept_ids = NULL,
                                               start_year = 2010,
                                               min_measurements = 1) {

  # Render numbers as plain digit strings. Doubles go through formatC()
  # because as.character()/paste() print round values in scientific notation
  # (3000000 becomes "3e+06"). The digits-only check also guarantees that no
  # other text can be pasted into the SQL.
  whole_number_text <- function(value, arg_name, scalar = FALSE) {
    is_plain_double <- is.double(value) && !inherits(value, "integer64")
    text <- if (is_plain_double) {
      formatC(value, format = "f", digits = 0)
    } else {
      as.character(value)
    }
    ok <- length(text) > 0 && all(grepl("^[0-9]+$", text))
    if (ok && is_plain_double) ok <- all(value == round(value))
    if (ok && scalar) ok <- length(text) == 1
    if (!ok) {
      stop(
        arg_name, " must be ",
        if (scalar) "a single whole, non-negative number." else
          "a non-empty vector of whole, non-negative numbers.",
        call. = FALSE
      )
    }
    text
  }

  start_year_text <- whole_number_text(start_year, "start_year", scalar = TRUE)
  min_measurements_text <- whole_number_text(min_measurements, "min_measurements", scalar = TRUE)

  # Base query for harmonized extraction
  base_harmonized_sql <- paste0("
  SELECT
      measurement.person_id,
      m_standard_concept.concept_name as standard_concept_name,
      EXTRACT(YEAR FROM measurement.measurement_datetime) as year,
      CONCAT(CAST(measurement.person_id AS STRING), '_', CAST(EXTRACT(YEAR FROM measurement.measurement_datetime) AS STRING)) as person_id_year,
      AVG(measurement.value_as_number) as mean_value,
      COUNT(measurement.value_as_number) as measurement_count,
      MIN(measurement.measurement_datetime) as first_measurement_date,
      MAX(measurement.measurement_datetime) as last_measurement_date
  FROM 
      `", Sys.getenv("WORKSPACE_CDR"), ".measurement` measurement
  LEFT JOIN
      `", Sys.getenv("WORKSPACE_CDR"), ".concept` m_standard_concept 
          ON measurement.measurement_concept_id = m_standard_concept.concept_id 
  WHERE
      measurement.value_as_number IS NOT NULL
      AND measurement.measurement_datetime IS NOT NULL
      AND measurement.person_id IS NOT NULL
      AND m_standard_concept.concept_name IS NOT NULL
      AND measurement.measurement_datetime >= '", start_year_text, "-01-01'
      AND measurement.value_as_number > 0
      AND measurement.value_as_number < 1000000")

  # Add concept filtering if provided
  if (!is.null(selected_concept_ids)) {
    id_text <- whole_number_text(selected_concept_ids, "selected_concept_ids")
    concept_filter <- paste0(" AND measurement.measurement_concept_id IN (",
                             paste(id_text, collapse = ","), ")")
    base_harmonized_sql <- paste0(base_harmonized_sql, concept_filter)
  }

  # Add GROUP BY and ORDER BY
  base_harmonized_sql <- paste0(base_harmonized_sql, "
  GROUP BY 
      measurement.person_id,
      m_standard_concept.concept_name,
      EXTRACT(YEAR FROM measurement.measurement_datetime)
  HAVING 
      COUNT(measurement.value_as_number) >= ", min_measurements_text, "
  ORDER BY 
      measurement.person_id,
      standard_concept_name,
      year")

  return(base_harmonized_sql)
}

# =============================================================================
# RUN HARMONIZED EXTRACTION
# =============================================================================

# Run the harmonized extraction in BigQuery and export the result as CSV
# files to the workspace bucket.
#
# Arguments:
#   selected_concept_ids - passed on to create_harmonized_blood_extraction()
#   export_name          - folder name and file prefix of the export
#   start_year           - passed on to create_harmonized_blood_extraction()
# Returns the gs:// wildcard path the files are written to:
#   <bucket>/bq_exports/<owner email>/<today YYYYMMDD>/<export_name>/<export_name>_*.csv
run_harmonized_blood_extraction <- function(selected_concept_ids = NULL,
                                           export_name = "blood_labs_harmonized",
                                           start_year = 2010) {

  # Create the SQL query
  harmonized_sql <- create_harmonized_blood_extraction(
    selected_concept_ids = selected_concept_ids,
    start_year = start_year
  )

  # Set up GCS export path
  export_day <- strftime(lubridate::now(), "%Y%m%d")
  csv_pattern <- paste0(export_name, "_*.csv")
  harmonized_path <- file.path(
    Sys.getenv("WORKSPACE_BUCKET"), "bq_exports", Sys.getenv("OWNER_EMAIL"),
    export_day, export_name, csv_pattern
  )

  message(str_glue('Harmonized blood data will be written to {harmonized_path}'))

  # Export to GCS
  query_result <- bq_dataset_query(
    Sys.getenv("WORKSPACE_CDR"),
    harmonized_sql,
    billing = Sys.getenv("GOOGLE_PROJECT")
  )
  bq_table_save(query_result, harmonized_path, destination_format = "CSV")

  harmonized_path
}

# =============================================================================
# READ HARMONIZED DATA FROM GCS
# =============================================================================

# Read all harmonized-export CSV files matching `export_path` (a gs:// path,
# wildcards allowed) and row-bind them into one tibble.
# Stops with the captured gsutil output if the listing command fails.
read_harmonized_blood_from_gcs <- function(export_path) {

  # Column layout of the harmonized export
  col_types <- cols(
    person_id = col_character(),
    standard_concept_name = col_character(),
    year = col_integer(),
    person_id_year = col_character(),
    mean_value = col_double(),
    measurement_count = col_integer(),
    first_measurement_date = col_character(),
    last_measurement_date = col_character()
  )

  # List the exported files. stderr is captured as well so it can be shown
  # when the command fails.
  listing <- system2("gsutil", args = c("ls", export_path), stdout = TRUE, stderr = TRUE)
  exit_status <- attr(listing, "status")
  if (!is.null(exit_status) && exit_status != 0) {
    stop(
      "gsutil ls failed for ", export_path, ":\n", paste(listing, collapse = "\n"),
      call. = FALSE
    )
  }

  # Only gs:// lines are objects; anything else is diagnostic output that
  # must not be passed on to `gsutil cat`.
  csv_uris <- listing[startsWith(listing, "gs://")]

  # Read the data
  df_harmonized <- bind_rows(
    map(csv_uris, function(csv) {
      message(str_glue('Loading {csv}.'))
      # shQuote() protects the URI from the shell that runs the command
      read_csv(
        pipe(paste("gsutil cat", shQuote(csv))),
        col_types = col_types,
        show_col_types = FALSE
      )
    })
  )

  message(str_glue("Harmonized blood data loaded: {nrow(df_harmonized)} rows, {ncol(df_harmonized)} columns"))

  return(df_harmonized)
}

# =============================================================================
# CONVERT TO WIDE FORMAT (Like your existing pipeline)
# =============================================================================

# Reshape the harmonized long table to wide format: one row per
# person_id / person_id_year / year and one column per standard_concept_name,
# filled with mean_value. All other columns are dropped.
convert_to_wide_format <- function(df_harmonized) {

  # Reduce to the three key columns plus name/value; the key columns are then
  # exactly the columns pivot_wider() uses as row identifiers by default
  long_values <- select(
    df_harmonized,
    person_id, person_id_year, year, standard_concept_name, mean_value
  )

  df_blood_wide <- pivot_wider(
    long_values,
    names_from = standard_concept_name,
    values_from = mean_value
  )

  message(str_glue("Wide format data: {nrow(df_blood_wide)} rows, {ncol(df_blood_wide)} columns"))

  df_blood_wide
}

# =============================================================================
# USAGE EXAMPLES
# =============================================================================

# Example 1: Extract ALL lab measurements (harmonized)
# Exports every concept from 2010 on, reads the export back, reshapes it to
# wide format, writes it to data/df_blood_harmonized.txt (tab-separated) and
# returns the wide table.
run_all_labs_harmonized <- function() {

  message("Running harmonized extraction for ALL lab measurements...")

  # NULL = all concepts, i.e. no concept filtering
  export_path <- run_harmonized_blood_extraction(
    selected_concept_ids = NULL,
    export_name = "blood_labs_all_harmonized",
    start_year = 2010
  )

  # Read the export and convert it to wide format
  df_blood_wide <- convert_to_wide_format(
    read_harmonized_blood_from_gcs(export_path)
  )

  # Save processed data
  write.table(
    df_blood_wide,
    "data/df_blood_harmonized.txt",
    sep = "\t",
    quote = FALSE,
    row.names = FALSE
  )

  message("Harmonized blood data saved to: data/df_blood_harmonized.txt")

  df_blood_wide
}

# Example 2: Extract specific concepts (after you choose them)
# Exports only the given concept IDs from 2010 on, reads the export back,
# reshapes it to wide format, writes it to
# data/df_blood_selected_harmonized.txt (tab-separated) and returns the wide
# table.
run_selected_labs_harmonized <- function(concept_ids) {

  message(str_glue("Running harmonized extraction for {length(concept_ids)} selected concepts..."))

  # Run with concept filtering
  export_path <- run_harmonized_blood_extraction(
    selected_concept_ids = concept_ids,
    export_name = "blood_labs_selected_harmonized",
    start_year = 2010
  )

  # Read and process
  df_blood_wide <- convert_to_wide_format(
    read_harmonized_blood_from_gcs(export_path)
  )

  # Save
  write.table(
    df_blood_wide,
    "data/df_blood_selected_harmonized.txt",
    sep = "\t",
    quote = FALSE,
    row.names = FALSE
  )

  df_blood_wide
}

# =============================================================================
# READY TO USE
# =============================================================================

# Usage hints for the two entry points; every entry is emitted as its own
# message
invisible(lapply(
  c(
    "=== HARMONIZED BLOOD EXTRACTION READY ===",
    "Usage:",
    "1. For ALL labs: df_blood <- run_all_labs_harmonized()",
    "2. For specific labs: df_blood <- run_selected_labs_harmonized(c(3013682, 3000963, ...))",
    "",
    "This will create data with person_id_year structure matching your existing pipeline!"
  ),
  message
))