In [1]:
import pandas as pd
from prediction_utils.extraction_utils.database import BQDatabase

In [2]:
# Database client from prediction_utils, built with its default arguments.
# Every query in the cells below is issued through this object.
db = BQDatabase()



In [3]:
# Names substituted into every SQL template in this notebook:
#   dataset_project / dataset       -> where the OMOP CDM tables are read from
#   rs_dataset_project / rs_dataset -> where the tables produced here are written
config_dict = dict(
    dataset_project='som-rit-phi-starr-prod',
    # Alternative source dataset, left disabled:
    # dataset='starr_omop_cdm5_deid_1pcent_latest',
    dataset='starr_omop_cdm5_deid_latest',
    rs_dataset_project='som-nero-phi-nigam-starr',
    rs_dataset='temp_dataset',
)

#### Defining CVD concepts (for determining history)

In [4]:
# Step 1: select the ICD9CM concepts whose code starts with 410, 411, 413, 414,
# 430, 431, 432, 433, 434, 436 or 428, or equals 427.31 exactly, and write them
# to the `cvd_icd9_concepts` table in the results dataset.
query = """
    WITH cvd_icd9_concepts as (
        SELECT concept_id, concept_name, concept_code 
        FROM {dataset_project}.{dataset}.concept
        WHERE vocabulary_id = 'ICD9CM'
            AND (
                concept_code LIKE '410%'
                OR concept_code LIKE '411%' 
                OR concept_code LIKE '413%' 
                OR concept_code LIKE '414%' 
                OR concept_code LIKE '430%' 
                OR concept_code LIKE '431%' 
                OR concept_code LIKE '432%' 
                OR concept_code LIKE '433%'
                OR concept_code LIKE '434%' 
                OR concept_code LIKE '436%' 
                OR concept_code = '427.31' 
                OR concept_code LIKE '428%'
            )
    )
    SELECT * FROM cvd_icd9_concepts
""".format_map(config_dict)
db.execute_sql_to_destination_table(query, "{rs_dataset_project}.{rs_dataset}.cvd_icd9_concepts".format_map(config_dict))
# Step 2: follow the 'Maps to' relationships from those ICD9CM concepts to their
# target concepts, then join to concept_ancestor to collect every descendant of
# each target. Despite its name, the CTE `cvd_concept_ancestors` (and therefore
# the `cvd_standard_concepts` destination table) holds descendant concept ids.
# No DISTINCT is applied, so a concept id can appear more than once.
# NOTE(review): this presumably relies on concept_ancestor containing a self-row
# for each concept so that the mapped concepts themselves are retained - confirm.
query = """
    WITH cvd_standard_concepts as (
        SELECT concept_id_2 as concept_id
        FROM {rs_dataset_project}.{rs_dataset}.cvd_icd9_concepts t1
        INNER JOIN {dataset_project}.{dataset}.concept_relationship as t2 
            ON t1.concept_id = t2.concept_id_1
        WHERE relationship_id = 'Maps to'
    ),
    cvd_concept_ancestors as (
        SELECT descendant_concept_id as concept_id
        FROM cvd_standard_concepts t1
        INNER JOIN {dataset_project}.{dataset}.concept_ancestor as t2
            ON t1.concept_id = t2.ancestor_concept_id
    )
    SELECT *
    FROM cvd_concept_ancestors
""".format_map(config_dict)
db.execute_sql_to_destination_table(query, "{rs_dataset_project}.{rs_dataset}.cvd_standard_concepts".format_map(config_dict))

#### Defining ASCVD concepts (MI + Stroke)

In [5]:
# Step 1: select the ICD9CM concepts used as ASCVD outcomes and write them to
# `ascvd_icd9_concepts`.
#   MI:     codes starting with 410, except those matching '410._2'
#           (in LIKE, '_' matches exactly one character, so this is 410.<x>2).
#   Stroke: codes starting with 430, 431 or 436, plus codes starting with 433 or
#           434 except those matching '433._0' / '434._0'.
query = """
    WITH ascvd_icd9_concepts as (
        SELECT concept_id, concept_name, concept_code 
        FROM {dataset_project}.{dataset}.concept
        WHERE vocabulary_id = 'ICD9CM'
            AND (
                    ( -- MI
                        concept_code LIKE '410%'
                        AND concept_code NOT LIKE '410._2'
                    )
                OR 
                    ( -- Stroke
                        concept_code LIKE "430%" OR 
                        concept_code LIKE "431%" OR 
                        (concept_code LIKE "433%" AND concept_code NOT LIKE "433._0") OR 
                        (concept_code LIKE "434%" AND concept_code NOT LIKE "434._0") OR 
                        concept_code LIKE "436%"
                    )
            )
    )
    SELECT * FROM ascvd_icd9_concepts
""".format_map(config_dict)
db.execute_sql_to_destination_table(query, "{rs_dataset_project}.{rs_dataset}.ascvd_icd9_concepts".format_map(config_dict))

# Step 2: map those ICD9CM concepts through 'Maps to' and expand each target to
# all of its descendants via concept_ancestor. The result is written to
# `ascvd_standard_concepts`; the CTE named `ascvd_concept_ancestors` actually
# holds descendant concept ids, and duplicates are not removed.
query = """
    WITH ascvd_standard_concepts as (
        SELECT concept_id_2 as concept_id
        FROM {rs_dataset_project}.{rs_dataset}.ascvd_icd9_concepts t1
        INNER JOIN {dataset_project}.{dataset}.concept_relationship as t2 
            ON t1.concept_id = t2.concept_id_1
        WHERE relationship_id = 'Maps to'
    ),
    ascvd_concept_ancestors as (
        SELECT descendant_concept_id as concept_id
        FROM ascvd_standard_concepts t1
        INNER JOIN {dataset_project}.{dataset}.concept_ancestor as t2
            ON t1.concept_id = t2.ancestor_concept_id
    )
    SELECT *
    FROM ascvd_concept_ancestors
""".format_map(config_dict)
db.execute_sql_to_destination_table(query, "{rs_dataset_project}.{rs_dataset}.ascvd_standard_concepts".format_map(config_dict))

#### Defining CHD concepts

In [6]:
# Step 1: select the ICD9CM concepts whose code starts with 411, 413 or 414 and
# write them to `chd_icd9_concepts`.
query = """
    WITH chd_icd9_concepts as (
        SELECT concept_id, concept_name, concept_code 
        FROM {dataset_project}.{dataset}.concept
        WHERE vocabulary_id = 'ICD9CM'
            AND (
                concept_code LIKE '411%' 
                OR concept_code LIKE '413%' 
                OR concept_code LIKE '414%' 
            )
    )
    SELECT * FROM chd_icd9_concepts
""".format_map(config_dict)
db.execute_sql_to_destination_table(query, "{rs_dataset_project}.{rs_dataset}.chd_icd9_concepts".format_map(config_dict))
# Step 2: map those ICD9CM concepts through 'Maps to' and expand each target to
# all of its descendants via concept_ancestor. The result is written to
# `chd_standard_concepts`; the CTE named `chd_concept_ancestors` actually holds
# descendant concept ids, and duplicates are not removed.
query = """
    WITH chd_standard_concepts as (
        SELECT concept_id_2 as concept_id
        FROM {rs_dataset_project}.{rs_dataset}.chd_icd9_concepts t1
        INNER JOIN {dataset_project}.{dataset}.concept_relationship as t2 
            ON t1.concept_id = t2.concept_id_1
        WHERE relationship_id = 'Maps to'
    ),
    chd_concept_ancestors as (
        SELECT descendant_concept_id as concept_id
        FROM chd_standard_concepts t1
        INNER JOIN {dataset_project}.{dataset}.concept_ancestor as t2
            ON t1.concept_id = t2.ancestor_concept_id
    )
    SELECT *
    FROM chd_concept_ancestors
""".format_map(config_dict)
db.execute_sql_to_destination_table(query, "{rs_dataset_project}.{rs_dataset}.chd_standard_concepts".format_map(config_dict))

#### Defining statins

In [7]:
# Step 1: select the ATC concepts with the twelve listed C10AA / C10BA / C10BX
# codes and write them to `antilipid_atc_concepts`. The tables use the name
# "antilipid" while the section heading calls them statins.
query = """
    WITH antilipid_atc_concepts as (
        SELECT concept_id, concept_name, concept_code 
        FROM {dataset_project}.{dataset}.concept
        WHERE vocabulary_id = 'ATC'
            AND (
                concept_code IN (
                    'C10AA01',
                    'C10AA07',
                    'C10AA02',
                    'C10AA08',
                    'C10AA04',
                    'C10AA03',
                    'C10BA05',
                    'C10AA06',
                    'C10BX03',
                    'C10BA01',
                    'C10BA02',
                    'C10BA04'
                )
            )
    )
    SELECT * FROM antilipid_atc_concepts
""".format_map(config_dict)
db.execute_sql_to_destination_table(query, "{rs_dataset_project}.{rs_dataset}.antilipid_atc_concepts".format_map(config_dict))
# Step 2: map those ATC concepts through 'Maps to' and expand each target to all
# of its descendants via concept_ancestor. The result is written to
# `antilipid_standard_concepts`; the CTE named `antilipid_concept_ancestors`
# actually holds descendant concept ids, and duplicates are not removed.
query = """
    WITH antilipid_standard_concepts as (
        SELECT concept_id_2 as concept_id
        FROM {rs_dataset_project}.{rs_dataset}.antilipid_atc_concepts t1
        INNER JOIN {dataset_project}.{dataset}.concept_relationship as t2 
            ON t1.concept_id = t2.concept_id_1
        WHERE relationship_id = 'Maps to'
    ),
    antilipid_concept_ancestors as (
        SELECT descendant_concept_id as concept_id
        FROM antilipid_standard_concepts t1
        INNER JOIN {dataset_project}.{dataset}.concept_ancestor as t2
            ON t1.concept_id = t2.ancestor_concept_id
    )
    SELECT *
    FROM antilipid_concept_ancestors
""".format_map(config_dict)
db.execute_sql_to_destination_table(query, "{rs_dataset_project}.{rs_dataset}.antilipid_standard_concepts".format_map(config_dict))

#### Defining index times

In [8]:
# Build the table of candidate index visits (`ascvd_cohort_table`).
#
# `visits` keeps visit_occurrence rows with visit_concept_id = 9202, joined to
# person and observation_period on person_id, and derives three durations in
# years (day difference / 365.25):
#   age_in_years      - birth_datetime to visit_start_date
#   years_since_start - observation_period_start_date to visit_start_date
#   years_until_end   - visit_start_date to observation_period_end_date
# The final SELECT keeps visits with at least one year since the observation
# period start, an observation period ending on or before 2020-12-31, and an age
# of at least 18 years.
#
# NOTE(review): 9202 is presumably the OMOP "Outpatient Visit" concept - confirm.
# NOTE(review): the observation_period join is on person_id only, so a person
# with more than one observation period would produce one row per (visit, period)
# pair - confirm the source data has a single period per person.
query = """
WITH visits AS (
    SELECT t1.person_id, visit_occurrence_id, visit_concept_id, birth_datetime, visit_start_date, observation_period_start_date, observation_period_end_date,
        CAST(DATE_DIFF(CAST(visit_start_date AS DATE), CAST(birth_datetime AS DATE), DAY) AS FLOAT64) / 365.25 as age_in_years,
        CAST(DATE_DIFF(CAST(visit_start_date AS DATE), CAST(observation_period_start_date AS DATE), DAY) AS FLOAT64) / 365.25 as years_since_start,
        CAST(DATE_DIFF(CAST(observation_period_end_date AS DATE), CAST(visit_start_date AS DATE), DAY) AS FLOAT64) / 365.25 as years_until_end
    FROM {dataset_project}.{dataset}.visit_occurrence t1
    INNER JOIN {dataset_project}.{dataset}.person as t2
        ON t1.person_id = t2.person_id
    INNER JOIN {dataset_project}.{dataset}.observation_period as t3
        ON t1.person_id = t3.person_id
    WHERE 
        visit_concept_id = 9202
)
SELECT * 
FROM visits
WHERE years_since_start >= 1
    AND observation_period_end_date <= '2020-12-31'
    AND age_in_years >= 18.0
""".format_map(config_dict)
db.execute_sql_to_destination_table(
    query, 
    "{rs_dataset_project}.{rs_dataset}.ascvd_cohort_table".format_map(config_dict)
)

### Assign labels

In [9]:
## ASCVD Outcomes (define event times)
# mi_stroke_outcomes:
#   distinct (person_id, condition_start_date) pairs for conditions whose concept
#   is in ascvd_standard_concepts and that start after at least one of the
#   person's cohort visits.
# chd_death_outcomes:
#   the same for chd_standard_concepts, additionally requiring a death record
#   whose death_date is 1 to 365 days after the condition start. The event date
#   is the condition start date, not the death date.
# min_outcome_times_per_index_date:
#   for every cohort visit, the earliest event date strictly after
#   visit_start_date across both outcome sets.
# The final SELECT adds days_until_event (event_date minus visit_start_date, in
# days). Visits with no later event produce no row in `ascvd_outcomes`.
query = """
WITH mi_stroke_outcomes AS (
    SELECT DISTINCT t3.person_id, t1.condition_start_date as event_date
    FROM {dataset_project}.{dataset}.condition_occurrence t1
    INNER JOIN {rs_dataset_project}.{rs_dataset}.ascvd_standard_concepts as t2 ON
        t1.condition_concept_id = t2.concept_id
    INNER JOIN {rs_dataset_project}.{rs_dataset}.ascvd_cohort_table as t3 ON
        t1.person_id = t3.person_id
    WHERE t1.condition_start_date > t3.visit_start_date
), chd_death_outcomes AS (
    SELECT DISTINCT t1.person_id, t1.condition_start_date as event_date
    FROM {dataset_project}.{dataset}.condition_occurrence t1
    INNER JOIN {rs_dataset_project}.{rs_dataset}.chd_standard_concepts as t2 ON
        t1.condition_concept_id = t2.concept_id
    INNER JOIN {rs_dataset_project}.{rs_dataset}.ascvd_cohort_table as t3 ON
        t1.person_id = t3.person_id
    INNER JOIN {dataset_project}.{dataset}.death as t4 ON
        t1.person_id = t4.person_id
    WHERE t1.condition_start_date > t3.visit_start_date 
        AND DATE_DIFF(t4.death_date, t1.condition_start_date, DAY) > 0 
        AND DATE_DIFF(t4.death_date, t1.condition_start_date, DAY) <= 365
), all_outcome_times AS (
    SELECT * FROM mi_stroke_outcomes
    UNION ALL
    SELECT * FROM chd_death_outcomes
), min_outcome_times_per_index_date AS (
    SELECT person_id, visit_occurrence_id, visit_start_date, MIN(event_date) as event_date
    FROM {rs_dataset_project}.{rs_dataset}.ascvd_cohort_table
    INNER JOIN all_outcome_times USING (person_id)
    WHERE event_date > visit_start_date
    GROUP BY person_id, visit_occurrence_id, visit_start_date
)
SELECT *, DATE_DIFF(event_date, visit_start_date, DAY) as days_until_event
FROM min_outcome_times_per_index_date
""".format_map(config_dict)
db.execute_sql_to_destination_table(
    query, 
    "{rs_dataset_project}.{rs_dataset}.ascvd_outcomes".format_map(config_dict)
)

In [10]:
## Statin History (label index visits that have prior statin exposure)
# Emits one distinct row per cohort visit for which the person has at least one
# drug_exposure whose concept is in antilipid_standard_concepts and whose start
# date is strictly before visit_start_date. has_statin_history is the constant 1;
# visits without prior exposure are absent from `statin_history`.
query = """
WITH statin_exposure AS (
    SELECT DISTINCT t3.person_id, t3.visit_occurrence_id, t3.visit_start_date, 1 as has_statin_history
    FROM {dataset_project}.{dataset}.drug_exposure t1
    INNER JOIN {rs_dataset_project}.{rs_dataset}.antilipid_standard_concepts as t2 ON
        t1.drug_concept_id = t2.concept_id
    INNER JOIN {rs_dataset_project}.{rs_dataset}.ascvd_cohort_table as t3 ON
        t1.person_id = t3.person_id
    WHERE drug_exposure_start_date < visit_start_date
) SELECT * FROM statin_exposure
""".format_map(config_dict)
db.execute_sql_to_destination_table(
    query, 
    "{rs_dataset_project}.{rs_dataset}.statin_history".format_map(config_dict)
)

In [11]:
# CVD History (label index visits that have a prior CVD condition)
# Emits one distinct row per cohort visit for which the person has at least one
# condition_occurrence whose concept is in cvd_standard_concepts and whose start
# date is strictly before visit_start_date. has_cvd_history is the constant 1;
# visits without a prior CVD condition are absent from `cvd_history`.
query = """
WITH cvd_history AS (
    SELECT DISTINCT t3.person_id, t3.visit_occurrence_id, t3.visit_start_date, 1 as has_cvd_history
    FROM {dataset_project}.{dataset}.condition_occurrence t1
    INNER JOIN {rs_dataset_project}.{rs_dataset}.cvd_standard_concepts as t2 ON
        t1.condition_concept_id = t2.concept_id
    INNER JOIN {rs_dataset_project}.{rs_dataset}.ascvd_cohort_table as t3 ON
        t1.person_id = t3.person_id
    WHERE condition_start_date < visit_start_date
) SELECT * FROM cvd_history
""".format_map(config_dict)
db.execute_sql_to_destination_table(
    query, 
    "{rs_dataset_project}.{rs_dataset}.cvd_history".format_map(config_dict)
)

In [12]:
## Attach the history and time to event labels to the cohort table
# cohort_with_history: LEFT JOINs the cohort visits to statin_history and
#   cvd_history on (person_id, visit_occurrence_id, visit_start_date). IFNULL
#   turns the missing flags of unmatched visits into 0, so both history columns
#   are always 0 or 1.
# cohort_with_labels: LEFT JOINs ascvd_outcomes on the same three keys, so
#   event_date and days_until_event are NULL for visits with no outcome row.
# The result is written to `ascvd_cohort_labeled`.
query = """
WITH cohort_with_history AS (
    SELECT * EXCEPT(has_statin_history, has_cvd_history),
            IFNULL(has_statin_history, 0) as has_statin_history, 
            IFNULL(has_cvd_history, 0) as has_cvd_history
    FROM {rs_dataset_project}.{rs_dataset}.ascvd_cohort_table
    LEFT JOIN {rs_dataset_project}.{rs_dataset}.statin_history USING (person_id, visit_occurrence_id, visit_start_date)
    LEFT JOIN {rs_dataset_project}.{rs_dataset}.cvd_history USING (person_id, visit_occurrence_id, visit_start_date)
), cohort_with_labels AS (
    SELECT *
    FROM cohort_with_history
    LEFT JOIN {rs_dataset_project}.{rs_dataset}.ascvd_outcomes USING (person_id, visit_start_date, visit_occurrence_id)
) SELECT * FROM cohort_with_labels
""".format_map(config_dict)
db.execute_sql_to_destination_table(
    query, 
    "{rs_dataset_project}.{rs_dataset}.ascvd_cohort_labeled".format_map(config_dict)
)

In [13]:
# Per-horizon label settings, keyed by label name:
#   max_index_date      - latest visit_start_date admitted as an index visit
#   event_followup_days - length of the window in which an event counts as positive
label_config = {
    label_name: {
        'max_index_date': latest_index_date,
        'event_followup_days': followup_days,
    }
    for label_name, latest_index_date, followup_days in (
        ('ascvd_1yr', "2018-12-31", 365),
        ('ascvd_5yr', '2014-12-31', 5 * 365.25),
        ('ascvd_10yr', '2009-12-31', 10 * 365.25),
    )
}

In [14]:
# SQL template for the per-horizon cohort tables. Besides the config_dict names
# it expects {event_followup_days} and {max_index_date}.
#   - keeps rows of ascvd_cohort_labeled whose visit_start_date is on or before
#     {max_index_date}
#   - adds the label column `ascvd`: 1 when days_until_event is non-NULL and lies
#     between 0 and {event_followup_days} (BETWEEN is inclusive), otherwise 0.
#     The explicit IS NOT NULL check makes the label 0 rather than NULL for
#     visits with no recorded event.
cohort_filter_query = """
    SELECT *, CAST(((days_until_event is NOT NULL) AND (days_until_event BETWEEN 0 AND {event_followup_days})) AS INT64) AS ascvd
    FROM {rs_dataset_project}.{rs_dataset}.ascvd_cohort_labeled
    WHERE visit_start_date <= CAST("{max_index_date}" AS DATE)
"""

In [15]:
# Materialise one labelled cohort table per prediction horizon.
# For each horizon, cohort_filter_query is filled with the shared config plus the
# matching label_config entry ('ascvd_<horizon>') and written to
# `ascvd_cohort_<horizon>`. The horizons run in the order 1yr, 5yr, 10yr.
for horizon in ("1yr", "5yr", "10yr"):
    db.execute_sql_to_destination_table(
        cohort_filter_query.format_map(
            {**config_dict, **label_config["ascvd_{}".format(horizon)]}
        ),
        "{rs_dataset_project}.{rs_dataset}.ascvd_cohort_{horizon}".format(
            horizon=horizon, **config_dict
        ),
    )

In [16]:
# Template that keeps exactly one index visit per person from {base_query}.
#   rnd - FARM_FINGERPRINT of person_id concatenated with visit_occurrence_id, so
#         the visit chosen for a person depends only on those two ids
#   pos - rank of the visit within its person when ordered by rnd; only pos = 1
#         is kept
# prediction_id is a fingerprint of a freshly generated UUID, so it takes a new
# value every time the query is run.
sample_query = """
    SELECT * EXCEPT (rnd, pos), 
    FARM_FINGERPRINT(GENERATE_UUID()) as prediction_id
    FROM (
        SELECT *, ROW_NUMBER() OVER(PARTITION BY person_id ORDER BY rnd) AS pos
        FROM (
            SELECT 
                *,
                FARM_FINGERPRINT(CONCAT(CAST(person_id AS STRING), CAST(visit_occurrence_id AS STRING))) as rnd
            FROM {base_query}
        )
    )
    WHERE pos = 1
"""

# Sample each horizon's cohort table into `<cohort table>_sampled`, in the order
# 1yr, 5yr, 10yr.
for horizon in ("1yr", "5yr", "10yr"):
    cohort_table = "{rs_dataset_project}.{rs_dataset}.ascvd_cohort_{horizon}".format(
        horizon=horizon, **config_dict
    )
    db.execute_sql_to_destination_table(
        sample_query.format_map({**config_dict, 'base_query': cohort_table}),
        cohort_table + "_sampled",
    )

In [17]:
# Download the three sampled cohort tables. One template serves all horizons;
# only the table suffix differs.
sampled_cohort_query = """
    SELECT * 
    FROM {rs_dataset_project}.{rs_dataset}.ascvd_cohort_{horizon}_sampled
"""
sampled_cohorts = {}
for horizon in ("1yr", "5yr", "10yr"):
    # `query` is reassigned at cell level on purpose, so it ends up holding the
    # 10yr query exactly as it did when the three reads were written out by hand.
    query = sampled_cohort_query.format(horizon=horizon, **config_dict)
    sampled_cohorts[horizon] = db.read_sql_query(query)

# Keep the per-horizon variable names for any cells that follow.
cohort_1yr = sampled_cohorts["1yr"]
cohort_5yr = sampled_cohorts["5yr"]
cohort_10yr = sampled_cohorts["10yr"]

Downloading: 100%|██████████| 563824/563824 [00:04<00:00, 127668.42rows/s]
Downloading: 100%|██████████| 247099/247099 [00:02<00:00, 90002.92rows/s]
Downloading: 100%|██████████| 54510/54510 [00:01<00:00, 31928.03rows/s]
