# Data Pipeline for Metformin vs DPP4 Study

In [1]:
%pip install sqlparse

Collecting sqlparse
  Using cached sqlparse-0.4.2-py3-none-any.whl (42 kB)
Installing collected packages: sqlparse
Successfully installed sqlparse-0.4.2
Note: you may need to restart the kernel to use updated packages.


In [2]:
import snowflake.connector
import getpass
import sys
import pandas as pd
import sqlparse

### Connections

In [3]:
# Configure Snowflake Connector
SNOWFLAKE_CREDS_DICT = {
    "SNOWFLAKE_USER": "YWEI",
    "SNOWFLAKE_PASSWORD": getpass.getpass()
}
SNOWFLAKE_CONFIG_DICT = {
    'account':'komodohealth',
    'database':'SANDBOX_KOMODO',
    'warehouse': 'XLARGE_WH',
    'role': 'ANALYST'
}
SF_DICT = {
  'sfURL':SNOWFLAKE_CONFIG_DICT['account'] + '.snowflakecomputing.com',
  'sfUser':SNOWFLAKE_CREDS_DICT['SNOWFLAKE_USER'],
  'sfPassword':SNOWFLAKE_CREDS_DICT['SNOWFLAKE_PASSWORD'],
  'sfDatabase':SNOWFLAKE_CONFIG_DICT['database'],
  'sfWarehouse':SNOWFLAKE_CONFIG_DICT['warehouse'],
  'sfRole': SNOWFLAKE_CONFIG_DICT['role'],
  'tracing':'All',
}   
ctx = snowflake.connector.connect(
  user=SF_DICT['sfUser'],
  password=SF_DICT['sfPassword'],
  account=SNOWFLAKE_CONFIG_DICT['account'],
  )

········


In [4]:
# Switch database, warehouse and role
ctx.cursor().execute('USE ROLE ' + SF_DICT['sfRole'])
ctx.cursor().execute('USE WAREHOUSE ' + SF_DICT['sfWarehouse'])
ctx.cursor().execute('USE ' + SF_DICT['sfDatabase'])
ctx.cursor().execute('USE SCHEMA ' + 'YWEI')

<snowflake.connector.cursor.SnowflakeCursor at 0x7f87eb2b9490>

In [5]:
sql = """
select * from METDPP4_DEF_V4
"""
pd.read_sql(sql, ctx)



Unnamed: 0,CODE,CODETYPE,DESCRIPTION,TYPE,NOTES
0,E08,ICD10CM,Diabetes mellitus due to underlying condition,Diabetes due to underlying condition,
1,E080,ICD10CM,Diabetes mellitus due to underlying condition ...,Diabetes due to underlying condition,
2,E0800,ICD10CM,Diabetes mellitus due to underlying condition ...,Diabetes due to underlying condition,
3,E0801,ICD10CM,Diabetes mellitus due to underlying condition ...,Diabetes due to underlying condition,
4,E081,ICD10CM,Diabetes mellitus due to underlying condition ...,Diabetes due to underlying condition,
...,...,...,...,...,...
621,33738,IN RxCUI,pioglitazone,Thiazolidinediones,
622,72610,IN RxCUI,troglitazone,Thiazolidinediones,
623,84108,IN RxCUI,rosiglitazone,Thiazolidinediones,
624,J1815,HCPCS,"Injection, insulin, per 5 units",Insulins,


### Data Asset Needed

Cohort Creation (2016-2021)
1. All Metformin/DPP4 Rx Claims
2. All Diabetes claims

Covariates (2017-2020)
1. CCI Variables 2017-2020 for eligible cohort (or all patients?)
2. ER visits 
3. CKD
4. Obesity

Outcome (2017 - )
1. Hypoglycemia
2. ER visits

Solution:   
1. Query all Rx from the list  
2. Query all Mx from the list restricted to patients with any Rx  


### Helper Functions for each step

In [6]:
def send_query(connection = None, sql = "", execute = False, verbose = True):
    if not execute or verbose:
        print(sqlparse.format(sql, reindent = True))
        
    if execute and connection is not None:
        connection.cursor().execute(sql)
    return

def load_data(connection = None, table = "", limit = None):
#     sql = """
#     select * from METDPP4_V3_COHORT_FUNNEL_183_BY_YEAR 
#     ORDER BY cohort, year, criteria
#     """
    sql = f"""
        select * from {table}
    """
    if limit is not None:
        sql += f"""
            limit {limit}
        """
    sql += ";"
    #pd.set_option('display.max_rows', 100)
    df = pd.read_sql(sql, connection)
    return df

In [7]:
def get_encounters(
    connection = None,
    prefix = 'METDPP4_V4',
    query_start_date = '2016-01-01',
    query_end_date = '2022-12-31',
    mx_version = '20220405',
    rx_version = '20220409',
    cci_version = '20220404',
    execute = False,
    verbose = True
    ):
    '''
    query all data needed from encounters into tables
    '''
    
    rx_enc = f"MAP_ENCOUNTERS.RX_ENCOUNTERS_{rx_version}.RX_ENCOUNTER_LS_GA"
    rx_def = 'METDPP4_V4_DEF_NDC'
    
    mx_lite_enc = f"MAP_ENCOUNTERS.MX_ENCOUNTERS_{mx_version}.ENCOUNTERSMX_LITE_LS_GA"
    mx_enc = f"MAP_ENCOUNTERS.MX_ENCOUNTERS_{mx_version}.ENCOUNTERSMX_LS_GA"
    mx_visit = f"MAP_ENCOUNTERS.MX_ENCOUNTERS_{mx_version}.VISITS_LS_GA"
    
    mx_def = 'METDPP4_V4_DEF'
    mx_def_ckd = 'METDPP4_V4_DEF_CKD'
    mx_def_obesity = 'METDPP4_V4_DEF_OBESITY'
    mx_def_cci = f"MAP_VOCABULARY.RXNORM_{cci_version}.CHARLSON_CODE"
    
    bene = f"MAP_ENCOUNTERS.MX_ENCOUNTERS_{mx_version}.BENEFICIARY_LS_GA"
    
    dpp4_list = "'%alogliptin%', '%linagliptin%', '%sitagliptin%', '%saxagliptin%'"
    
    # create all Rx tables
    sql_rx = f"""
        create or replace table {prefix}_ALL_RX as
        select upk_key2, claim_date, 
            patient_dob, patient_gender, patient_state, patient_zip,
            ndc, days_supply, quantity_dispensed,
            case 
                when NDC in (select NDC from {rx_def} 
                    where PRODUCT_NAME ilike '%metformin%' 
                        and multi_ingredient = FALSE)
                then 'metformin'
                when NDC in (select NDC from {rx_def} 
                    where PRODUCT_NAME ilike any ({dpp4_list})
                        and multi_ingredient = FALSE)
                then 'dpp4'
                when NDC in (select NDC from {rx_def} 
                    where PRODUCT_NAME ilike any ({dpp4_list}) 
                        and PRODUCT_NAME ilike '%metformin%')
                then 'metformin_dpp4'
                when NDC in (select NDC from {rx_def} 
                    where PRODUCT_NAME ilike '%metformin%' 
                        and multi_ingredient = TRUE)
                then 'metformin_multi'
                when NDC in (select NDC from {rx_def} 
                    where PRODUCT_NAME ilike any ({dpp4_list}) 
                        and multi_ingredient = TRUE)
                then 'dpp4_multi'
                else 'other'
            end as cohort
        from {rx_enc}
        where claim_date >= '{query_start_date}'
            and ndc in (select ndc from {rx_def})
        ;
    """
    
    # create all Mx tables, subset to patients with target Rx
    sql_mx = f"""
        create or replace table {prefix}_ALL_MX as
        select upk_key2, claim_date, 
            patient_dob, patient_gender, patient_state, patient_zip,
            diagnosis_array, procedure_array
        from {mx_lite_enc}
        where claim_date >= '{query_start_date}'
            and (
            arrays_overlap(
                diagnosis_array,
                (select array_agg(code) from {mx_def} where codetype = 'ICD10CM')
            ) or
            arrays_overlap(
                procedure_array,
                (select array_agg(code) from {mx_def} where codetype = 'HCPCS')
            ) or
            arrays_overlap(
                diagnosis_array,
                (select array_agg(code) from {mx_def_ckd})
            ) or            
            arrays_overlap(
                diagnosis_array,
                (select array_agg(code) from {mx_def_obesity})
            ) or            
            arrays_overlap(
                diagnosis_array,
                (select array_agg(icd) from {mx_def_cci})
            )
        ) and upk_key2 in (
            select distinct upk_key2 
            from {prefix}_ALL_RX
            where cohort in ('metformin', 'dpp4')
        )
        ;
    """
    
    # create all ER tables, subset to patients with target Rx
    sql_er = f"""
        create or replace table {prefix}_ALL_ER as
        select upk_key2, claim_date,
            visit_start_date, visit_end_date, visit_setting_of_care
        from {mx_enc} e
        inner join {mx_visit} v
        on e.visit_id = v.visit_id
        where claim_date >= '{query_start_date}'
        and visit_setting_of_care = 'Emergency Room Visit'
        and upk_key2 in (
            select distinct upk_key2 
            from {prefix}_ALL_RX
            where cohort in ('metformin', 'dpp4')
        )
        ;
    """
    
    # create all bene tables, subset to patients with target Rx
    sql_bene = f"""
        create or replace table {prefix}_ALL_BENE as
        select upk_key2, patient_dob, patient_gender, patient_state, patient_zip,
            eligibility_start_date, eligibility_end_date, closed_start_date, closed_end_date,
            medical_coverage_indicator, pharmacy_coverage_indicator, closed_indicator
        from {bene}
        where upk_key2 in (
            select distinct upk_key2 
            from {prefix}_ALL_RX
            where cohort in ('metformin', 'dpp4')
        )
        ;
    """
    
    # send_query(connection, sql_rx, execute, verbose)
    send_query(connection, sql_mx, execute, verbose)
    send_query(connection, sql_er, execute, verbose)
    send_query(connection, sql_bene, execute, verbose)
    
    return

In [8]:
# get_encounters(connection = ctx, execute = True, verbose = False)

In [9]:
def get_sql_ce_by_type(type, prefix = 'METDPP4_V4', grace_period = 45):
    if type == 'mx':
        cov_ind = 'MEDICAL_COVERAGE_INDICATOR'
    elif type == 'rx':
        cov_ind = 'PHARMACY_COVERAGE_INDICATOR'

    sql_ce = f"""    
        with bene_clean as (
            select distinct upk_key2,
            coalesce(closed_start_date, eligibility_start_date) as start_date,
            least(coalesce(closed_end_date, eligibility_end_date), current_date) as end_date
        from {prefix}_ALL_BENE AS bene
        where {cov_ind}
            and start_date <= end_date
            and start_date <= current_date
            and closed_start_date is not null and closed_end_date is not null and closed_indicator 
       )
       ,gap as (
           select upk_key2, 
            '1970-01-01' as gap_start_date, min(start_date-1) as gap_end_date
            from bene_clean
            group by upk_key2
            union all
            select upk_key2,
                max(end_date + 1) over (partition by upk_key2 order by start_date) as gap_start_date,
                lead(start_date - 1) over (partition by upk_key2 order by start_date) as gap_end_date
            from bene_clean
            qualify gap_end_date - gap_start_date + 1 >= {grace_period} 
            union all
            select upk_key2, 
                max(end_date+1) as gap_start_date, current_date as gap_end_date
            from bene_clean
            group by upk_key2
        )
        select upk_key2,
        gap_end_date+1 as start_date,
            lead(gap_start_date-1) over (partition by upk_key2 order by gap_start_date) as end_date
        from gap
        qualify end_date IS NOT null
    """
    return sql_ce
    
def get_ce(
    connection = None,
    prefix = 'METDPP4_V4',
    grace_period = 45,
    execute = False,
    verbose = True):
    
    sql_ce_mx = get_sql_ce_by_type(type = "mx", prefix = prefix, grace_period = grace_period)
    sql_ce_rx = get_sql_ce_by_type(type = "rx", prefix = prefix, grace_period = grace_period)
    
    sql_ce = f"""  
        create or replace table {prefix}_ELIG as
        with mx as ({sql_ce_mx}), rx as ({sql_ce_rx})
        select mx.upk_key2, 
            greatest(mx.start_date, rx.start_date) as start_date,
            least(mx.end_date, rx.end_date) as end_date
            from mx inner join rx
            on mx.upk_key2 = rx.upk_key2
            where mx.start_date <= rx.end_date and mx.end_date >= rx.start_date
        ;
    """
    
    send_query(connection, sql_ce, execute, verbose)
    return

In [10]:
def get_sql_split_fun(grace_period = 45):
    sql_split = f"""
create or replace function SPLIT_RANGES(dates variant)
   returns variant
   language javascript
as '
return DATES
            .sort(function (a, b) {{ return a.start - b.start || a.end - b.end; }})
            .reduce(function (r, a) {{
                  var last = r[r.length - 1] || [];
                  if (last.start <= a.start && a.start <= last.end + {grace_period}*24*60*60) {{
                        if (last.end < a.end) {{
                              last.end = a.end;
                        }}
                        return r;
                  }}
                  return r.concat(a);
            }}, []);
'
;"""
    return sql_split

def get_sql_ce_by_type_kh(type, prefix = 'METDPP4_V4', grace_period = 45):
    if type == 'mx':
        cov_ind = 'MEDICAL_COVERAGE_INDICATOR'
    elif type == 'rx':
        cov_ind = 'PHARMACY_COVERAGE_INDICATOR'

    sql_ce = f"""    
        with b as (
            SELECT distinct upk_key2, 
            closed_start_date AS start_date_date, 
            date_part('EPOCH_SECOND', to_timestamp(start_date_date)) as start_date, 
            least(closed_end_date, current_date) as end_date_date,
            date_part('EPOCH_SECOND', to_timestamp(end_date_date)) as end_date, 
            max(end_date) over (partition by upk_key2, start_date) as max_end_date, 
            min(start_date) over (partition by upk_key2, end_date) as min_start_date
        FROM {prefix}_ALL_BENE
        WHERE {cov_ind} 
            and closed_start_date IS NOT NULL 
            AND closed_end_date IS NOT NULL 
            AND start_date <= end_date 
            AND start_date <= date_part('EPOCH_SECOND', to_timestamp(current_date)) 
            and closed_indicator 
            qualify max_end_date=end_date and min_start_date=start_date
        ),
        g as (
            select upk_key2, 
                arrayagg(object_construct('start', start_date, 'end', end_date)) as ranges 
                from b
                group by upk_key2
        ), 
        non_overlap as (
            select upk_key2, ranges, SPLIT_RANGES(ranges) as clean_ranges from g
        )
        select upk_key2, 
            dateadd(day, 1, to_date(to_timestamp(clean_range.value:start))) as start_date,
            dateadd(day, 1, to_date(to_timestamp(clean_range.value:end))) as end_date
        from non_overlap, lateral flatten(input => clean_ranges) as clean_range
    """
    return sql_ce

def get_ce_kh(
    connection = None,
    prefix = 'METDPP4_V4',
    grace_period = 45,
    execute = False,
    verbose = True):
    
    sql_split = get_sql_split_fun(grace_period = grace_period)
    sql_ce_mx = get_sql_ce_by_type_kh(type = "mx", prefix = prefix, grace_period = grace_period)
    sql_ce_rx = get_sql_ce_by_type_kh(type = "rx", prefix = prefix, grace_period = grace_period)
    
    sql_ce = f"""  
        create or replace table {prefix}_ELIG_KH as
        with mx as ({sql_ce_mx}), rx as ({sql_ce_rx})
        select mx.upk_key2, 
            greatest(mx.start_date, rx.start_date) as start_date,
            least(mx.end_date, rx.end_date) as end_date
            from mx inner join rx
            on mx.upk_key2 = rx.upk_key2
            where mx.start_date <= rx.end_date and mx.end_date >= rx.start_date
        ;
    """
    send_query(connection, sql_split, execute, verbose)
    send_query(connection, sql_ce, execute, verbose)
    return    

In [11]:
# get_ce(connection = ctx, execute = True, verbose = False)

In [12]:
# this generate enrollment tables identical to the results above
# get_ce_kh(connection = ctx, execute = True, verbose = False)

In [13]:
def get_all_potential(
    connection = None,
    prefix = 'METDPP4_V4',
    cohort_start_date = '2017-01-01',
    cohort_end_date = '2020-12-31',
    execute = False,
    verbose = True):
    '''
    this table contains all Rx that is possibly an index date
    index date is not necessarily the first rx. 
    Someone could have had an Rx many years earlier but not eligible,
    then another Rx many years later becomes an eligible index date.
    '''

    mx_def = 'METDPP4_V4_DEF'
    
    sql_all_potential = f"""  
        create or replace table {prefix}_ALL_POTENTIAL as
        with input_rx as (
            select upk_key2, claim_date, cohort, days_supply, quantity_dispensed
            from {prefix}_ALL_RX
            where cohort in ('metformin', 'dpp4')
                and claim_date >= '{cohort_start_date}' and claim_date <= '{cohort_end_date}'),
        single_rx as (
            select upk_key2, claim_date, cohort,
                max(days_supply) as days_supply, 
                max(quantity_dispensed) as quantity_dispensed,
                count(*) as n_claim
            from input_rx
            group by upk_key2, claim_date, cohort),
        single_rx_elig as (
            select single_rx.upk_key2, claim_date,
                count_if(start_date - claim_date <= -365 and end_date - claim_date >= -1) as elig_b_365_1,
                count_if(start_date - claim_date <= 0 and end_date - claim_date >= 183) as elig_f_0_183,
                count_if(start_date - claim_date <= 0 and end_date - claim_date >= 365) as elig_f_0_365
            from single_rx
            left join {prefix}_ELIG elig
            on single_rx.upk_key2 = elig.upk_key2
            group by single_rx.upk_key2, claim_date),
        diab as (
            select upk_key2, claim_date from {prefix}_ALL_MX
            where arrays_overlap(DIAGNOSIS_ARRAY, (
                select array_agg(code) from {mx_def} 
                where codetype = 'ICD10CM' 
                    and description ilike '%iabetes%'
                    and description not ilike '%ypoglycemia%'
                ))
        ),
        single_rx_diab as (
            select single_rx.upk_key2, single_rx.claim_date, 
                count_if(-365 <= diab.claim_date - single_rx.claim_date and diab.claim_date - single_rx.claim_date <= -31) as n_diab_b_365_31,
                count_if(-30 <= diab.claim_date - single_rx.claim_date and diab.claim_date - single_rx.claim_date <= -1) as n_diab_b_30_1,
                count_if(0 <= diab.claim_date - single_rx.claim_date and diab.claim_date - single_rx.claim_date <= 183) as n_diab_f_0_183,
                count_if(0 <= diab.claim_date - single_rx.claim_date and diab.claim_date - single_rx.claim_date <= 365) as n_diab_f_0_365
            from single_rx
            left join diab
            on single_rx.upk_key2 = diab.upk_key2
                and -365 <= diab.claim_date - single_rx.claim_date
                and diab.claim_date - single_rx.claim_date <= 365
            group by single_rx.upk_key2, single_rx.claim_date
        ),
        all_rx as (
            select upk_key2, claim_date, cohort from {prefix}_ALL_RX
            union
            select upk_key2, claim_date, 'insulins' as cohort from {prefix}_ALL_MX
            where arrays_overlap(procedure_array, (
                select array_agg(code) from {mx_def} 
                where codetype = 'HCPCS' 
                    and description ilike '%nsulins%'
                ))
        ),
        single_rx_clean as (
            select single_rx.upk_key2, single_rx.claim_date,
                count_if(-365 <= all_rx.claim_date - single_rx.claim_date  
                         and all_rx.claim_date - single_rx.claim_date <= -1 
                         and all_rx.cohort = 'metformin'
                        ) as n_met_b_365_1,
                count_if(-365 <= all_rx.claim_date - single_rx.claim_date 
                         and all_rx.claim_date - single_rx.claim_date <= -1 
                         and all_rx.cohort = 'dpp4'
                        ) as n_dpp4_b_365_1,
                count_if(-365 <= all_rx.claim_date - single_rx.claim_date 
                         and all_rx.claim_date - single_rx.claim_date <= -1 
                         and all_rx.cohort not in ('metformin', 'dpp4')
                        ) as n_multi_b_365_1,
                count_if(0 <= all_rx.claim_date - single_rx.claim_date 
                         and all_rx.claim_date - single_rx.claim_date <= 183 
                         and all_rx.cohort = 'metformin'
                        ) as n_met_f_0_183,
                count_if(0 <= all_rx.claim_date - single_rx.claim_date 
                         and all_rx.claim_date - single_rx.claim_date <= 183
                         and all_rx.cohort = 'dpp4'
                        ) as n_dpp4_f_0_183,
                count_if(0 <= all_rx.claim_date - single_rx.claim_date 
                         and all_rx.claim_date - single_rx.claim_date <= 183
                         and all_rx.cohort not in ('metformin', 'dpp4')
                        ) as n_multi_f_0_183,
                count_if(0 <= all_rx.claim_date - single_rx.claim_date 
                         and all_rx.claim_date - single_rx.claim_date <= 365 
                         and all_rx.cohort = 'metformin'
                        ) as n_met_f_0_365,
                count_if(0 <= all_rx.claim_date - single_rx.claim_date 
                         and all_rx.claim_date - single_rx.claim_date <= 365
                         and all_rx.cohort = 'dpp4'
                        ) as n_dpp4_f_0_365,
                count_if(0 <= all_rx.claim_date - single_rx.claim_date 
                         and all_rx.claim_date - single_rx.claim_date <= 365
                         and all_rx.cohort not in ('metformin', 'dpp4')
                        ) as n_multi_f_0_365
            from single_rx
            inner join all_rx
            on single_rx.upk_key2 = all_rx.upk_key2
                and -365 <= all_rx.claim_date - single_rx.claim_date 
                and all_rx.claim_date - single_rx.claim_date <= 365
            group by single_rx.upk_key2, single_rx.claim_date
        )
        select r.upk_key2, r.claim_date,
            cohort, n_claim,
            count(distinct cohort) over(partition by r.upk_key2, r.claim_date) as n_cohort,
            elig_b_365_1, elig_f_0_183, elig_f_0_365,
            n_met_b_365_1, n_dpp4_b_365_1, n_multi_b_365_1,
            n_met_f_0_183, n_dpp4_f_0_183, n_multi_f_0_183,
            n_met_f_0_365, n_dpp4_f_0_365, n_multi_f_0_365,
            n_diab_b_365_31, n_diab_b_30_1, n_diab_f_0_183, n_diab_f_0_365
        from single_rx r
        left join single_rx_elig e
        on r.upk_key2 = e.upk_key2 and r.claim_date = e.claim_date
        left join single_rx_clean c
        on r.upk_key2 = c.upk_key2 and r.claim_date = c.claim_date
        left join single_rx_diab d
        on r.upk_key2 = d.upk_key2 and r.claim_date = d.claim_date
        ;
    """
    send_query(connection, sql_all_potential, execute, verbose)  
    return
    

In [14]:
# get_all_potential(connection = ctx, execute = True, verbose = False)

In [15]:
### all following steps separate between 183 or 365 at-risk window
def get_first_potential(
    connection = None,
    prefix = 'METDPP4_V4',
    at_risk = 183,
    execute = False,
    verbose = True):
    
    sql_first_potential = f"""  
        create or replace table {prefix}_FIRST_POTENTIAL_{at_risk} as
        select * from {prefix}_ALL_POTENTIAL
        where n_met_b_365_1 = 0 and n_dpp4_b_365_1 = 0 and n_multi_b_365_1 = 0
            and elig_b_365_1 > 0
            -- relax this condition
            -- and n_diab_b_365_31 = 0
            and n_cohort = 1
            and elig_f_0_{at_risk} > 0
            and ((cohort = 'metformin' and n_dpp4_f_0_{at_risk} + n_multi_f_0_{at_risk} = 0)
                or (cohort = 'dpp4' and n_met_f_0_{at_risk} + n_multi_f_0_{at_risk} = 0)
                )
            -- relax this condition
            -- and n_diab_b_30_1 + n_diab_f_0_{at_risk} >= 1
        qualify claim_date = min(claim_date) over (partition by upk_key2)
        ;
    """
    send_query(connection, sql_first_potential, execute, verbose)
    return

In [16]:
# get_first_potential(connection = ctx, execute = True, verbose = False)

In [17]:
### all following steps separate between 183 or 365 at-risk window
def get_adherence(
    connection = None,
    prefix = 'METDPP4_V4',
    at_risk = 183,
    execute = False,
    verbose = True):
    
    sql_adherence = f"""  
        create or replace table {prefix}_ADHERENCE_{at_risk} as
        with 
        input_rx as (
            select upk_key2, cohort, claim_date, days_supply
            from {prefix}_ALL_RX 
            where cohort in ('metformin', 'dpp4')
        ),
        distinct_claims as (
            select distinct e.upk_key2, e.cohort,
                d.claim_date as fill_start_date,
                case
                    when d.days_supply <= 3 then 30
                    when d.days_supply >= 360 then 30
                    else d.days_supply
                end as rx_days_supply_clean
            from {prefix}_FIRST_POTENTIAL_{at_risk} e
            left join input_rx d 
            on e.upk_key2 = d.upk_key2 and e.cohort = d.cohort 
                and e.claim_date <= d.claim_date and e.claim_date + {at_risk} >= d.claim_date
        )
        -- Patient aggregations
        , patient_aggs as (
        select *
             , min(fill_start_date) over (partition by upk_key2) as first_period_date
             , dateadd('day', rx_days_supply_clean, fill_start_date) as fill_end_date
             , fill_end_date - fill_start_date as revised_days_supply
             , max(fill_end_date) over (partition by upk_key2) as last_period_date
             , count(*) over (partition by upk_key2) as n_claims
        from distinct_claims
        )
        -- For each patient, calculate non-overlapping time frames between fill_start_date and fill_end_date
        , pdc_dates as (
            select s1.upk_key2,
                   s1.cohort,
                   s1.first_period_date,
                   s1.fill_start_date,
                   MIN(t1.fill_end_date) as min_fill_end_date,
                   datediff('day', s1.fill_start_date, least(min_fill_end_date, s1.first_period_date + {at_risk} - 1)) as covered_days
            from patient_aggs s1
            inner join patient_aggs t1 on s1.fill_start_date <= t1.fill_end_date
                                          and s1.upk_key2 = t1.upk_key2
            and not exists (select * from patient_aggs t2
                           where t1.fill_end_date >= t2.fill_start_date and t1.fill_end_date < t2.fill_end_date
                           and t1.upk_key2 = t2.upk_key2
                           )
            where not exists (select * from patient_aggs s2
                             where s1.fill_start_date > s2.fill_start_date and s1.fill_start_date <= s2.fill_end_date
                             and s1.upk_key2 = s2.upk_key2
                             )
            group by 1,2,3,4
            order by s1.fill_start_date)
        --Calculate numerator for PDC
        , pdc_results as (
          select upk_key2,
            sum(covered_days) as days_in_period_covered
          from pdc_dates
          group by 1
        )
        select distinct dc.upk_key2, dc.cohort,
            first_period_date as claim_date,
            first_period_date,
            last_period_date,
            datediff('day', first_period_date, last_period_date) as days_in_period,
            {at_risk} as days_in_period2,
            days_in_period_covered,
            sum(revised_days_supply) over (partition by dc.upk_key2) as sum_days_supply,
            round(sum_days_supply/days_in_period,2) as mpr,
            round(days_in_period_covered/days_in_period,2) as pdc,
            round(least(days_in_period_covered, {at_risk})/days_in_period2,2) as pdc2,
            iff(mpr > 0.8, TRUE, FALSE) as mpr_compliant,
            iff(pdc > 0.8, TRUE, FALSE) as pdc_compliant,
            iff(pdc2 > 0.8, TRUE, FALSE) as pdc2_compliant
        from patient_aggs dc
        left join pdc_results pdc on pdc.upk_key2 = dc.upk_key2
        ;
    """
    send_query(connection, sql_adherence, execute, verbose)
    return

In [18]:
# get_adherence(connection = ctx, execute = True, verbose = False)

In [19]:
### all following steps separate between 183 or 365 at-risk window
def get_final_cohort(
    connection = None,
    prefix = 'METDPP4_V4',
    at_risk = 183,
    execute = False,
    verbose = True):
    
    sql_final = f"""  
        create or replace table {prefix}_FINAL_{at_risk} as
        select distinct *, 
            year(claim_date) - year(patient_dob) as age
        from (
            select distinct a.*, c.pdc2,
            first_value(patient_dob) over (partition by a.upk_key2 order by patient_dob nulls last) as patient_dob,
            first_value(patient_gender) over (partition by a.upk_key2 order by patient_gender nulls last) as patient_gender,
            first_value(patient_state) over (partition by a.upk_key2 order by patient_state nulls last) as patient_state,
            first_value(patient_zip) over (partition by a.upk_key2 order by patient_zip nulls last) as patient_zip
        from {prefix}_FIRST_POTENTIAL_{at_risk} a
        left join {prefix}_ALL_BENE b
        on a.upk_key2 = b.upk_key2 and a.claim_date <= b.closed_end_date and a.claim_date >= b.closed_start_date
        left join {prefix}_ADHERENCE_{at_risk} c
        on a.upk_key2 = c.upk_key2
        -- where pdc2 > 0.8 -- not used
        );
    """
    send_query(connection, sql_final, execute, verbose)
    return

In [20]:
# get_final_cohort(connection = ctx, execute = True, verbose = False)

In [25]:
### all following steps separate between 183 or 365 at-risk window
def get_cohort_funnel(
    connection = None,
    prefix = 'METDPP4_V4',
    at_risk = 183,
    by_year = True,
    execute = False,
    verbose = True):
    
    if by_year:
        year_date_str = ', year(claim_date) as year'
        year_by_str = ', year'
        year_name_str = '_by_year'
    else:
        year_date_str = ''
        year_by_str = ''
        year_name_str = ''
    
    keep_str = f"""
        cohort{year_date_str}, count(distinct upk_key2) as n_bene, count(*) as n_rx
    """
    
    sql_funnel = f"""  
        
        create or replace table {prefix}_COHORT_FUNNEL_{at_risk}{year_name_str} as
        
        select 'a. all users in study window' as criteria, {keep_str}
        from {prefix}_ALL_POTENTIAL
        group by cohort{year_by_str}

        union all
        select 'b. continuous enrollment 365 days prior' as criteria, {keep_str}
        from {prefix}_ALL_POTENTIAL
        where elig_b_365_1 > 0
        group by cohort{year_by_str}

        union all
        select 'c. drug cleaning 365 days prior' as criteria, {keep_str}
        from {prefix}_ALL_POTENTIAL
        where elig_b_365_1 > 0
            and n_met_b_365_1 = 0 and n_dpp4_b_365_1 = 0 and n_multi_b_365_1 = 0
        group by cohort{year_by_str}

        union all
        select 'd. no same day two drugs' as criteria, {keep_str}
        from {prefix}_ALL_POTENTIAL
        where elig_b_365_1 > 0
            and n_met_b_365_1 = 0 and n_dpp4_b_365_1 = 0 and n_multi_b_365_1 = 0
            and n_cohort = 1
        group by cohort{year_by_str}

        -- relax this block
        /*
        union all
        select 'e. no diabetes dx 365 to 31 days prior' as criteria, {keep_str}
        from {prefix}_ALL_POTENTIAL
        where elig_b_365_1 > 0
            and n_met_b_365_1 = 0 and n_dpp4_b_365_1 = 0 and n_multi_b_365_1 = 0
            and n_cohort = 1
            and n_diab_b_365_31 = 0
        group by cohort{year_by_str}
        */

        union all
        select 'f. continuous enrollment {at_risk} days after index date' as criteria, {keep_str}
        from {prefix}_ALL_POTENTIAL
        where elig_b_365_1 > 0
            and n_met_b_365_1 = 0 and n_dpp4_b_365_1 = 0 and n_multi_b_365_1 = 0
            and n_cohort = 1
            -- and n_diab_b_365_31 = 0
            and elig_f_0_{at_risk} > 0
        group by cohort{year_by_str}

        union all
        select 'g. no drug switching {at_risk} days after index date' as criteria, {keep_str}
        from {prefix}_ALL_POTENTIAL
        where elig_b_365_1 > 0
            and n_met_b_365_1 = 0 and n_dpp4_b_365_1 = 0 and n_multi_b_365_1 = 0
            and n_cohort = 1
            -- and n_diab_b_365_31 = 0
            and elig_f_0_{at_risk} > 0
            and ((cohort = 'metformin' and n_dpp4_f_0_{at_risk} + n_multi_f_0_{at_risk} = 0) 
                or (cohort = 'dpp4' and n_met_f_0_{at_risk} + n_multi_f_0_{at_risk} = 0))
        group by cohort{year_by_str}

        -- relax this
        /*
        --union all
        --select 'h. w/ diabetes dx 30 days prior to {at_risk} days after' as criteria, {keep_str}
        --from {prefix}_ALL_POTENTIAL
        --where elig_b_365_1 > 0
        --    and n_met_b_365_1 = 0 and n_dpp4_b_365_1 = 0 and n_multi_b_365_1 = 0
        --    and n_cohort = 1
        --    and n_diab_b_365_31 = 0
        --    and elig_f_0_{at_risk} > 0
        --    and ((cohort = 'metformin' and n_dpp4_f_0_{at_risk} + n_multi_f_0_{at_risk} = 0) 
        --        or (cohort = 'dpp4' and n_met_f_0_{at_risk} + n_multi_f_0_{at_risk} = 0))
        --    and n_diab_b_30_1 + n_diab_f_0_{at_risk} >= 1
        --group by cohort{year_by_str}
        */
        
        union all
        select 'i. pick first eligible episode' as criteria, {keep_str}
        from {prefix}_FIRST_POTENTIAL_{at_risk}
        group by cohort{year_by_str}

        union all
        select 'j. PDC > 80%' as criteria, {keep_str}
        from {prefix}_ADHERENCE_{at_risk}
        where pdc2_compliant = TRUE
        group by cohort{year_by_str}
        ;
    """
    send_query(connection, sql_funnel, execute, verbose)
    return

In [27]:
get_cohort_funnel(connection = ctx, execute = True, verbose = False)

In [28]:
get_cohort_funnel(connection = ctx, by_year = False, execute = True, verbose = False)

In [30]:
### all following steps separate between 183 or 365 at-risk window
def get_covariates(
    connection = None,
    prefix = 'METDPP4_V4',
    at_risk = 183,
    cci_version = '20220404',
    execute = False,
    verbose = True):
    
    sql_cov = f"""
        create or replace table {prefix}_COV_{at_risk} as
        select a.upk_key2, a.claim_date, a.cohort,
            count_if(arrays_overlap(DIAGNOSIS_ARRAY, (select array_agg(code) from METDPP4_V4_DEF_CKD))) as ckd,
            count_if(arrays_overlap(DIAGNOSIS_ARRAY, (select array_agg(code) from METDPP4_V4_DEF_OBESITY))) as obesity
        from {prefix}_FINAL_{at_risk} a
        left join {prefix}_ALL_MX b
        on a.upk_key2 = b.upk_key2 and a.claim_date >= b.claim_date and a.claim_date - 365 <= b.claim_date
        group by a.upk_key2, a.claim_date, a.cohort
        ;
    """
    
    sql_er = f"""
        create or replace table {prefix}_ER_{at_risk} as
        select a.upk_key2, a.claim_date, a.cohort, count(distinct b.visit_start_date) as er
        from {prefix}_FINAL_{at_risk} a
        left join {prefix}_ALL_ER b
            on a.upk_key2 = b.upk_key2 and a.claim_date >= b.visit_start_date and a.claim_date - 365 <= b.visit_end_date
        group by a.upk_key2, a.claim_date, a.cohort
        ;
    """
    
    cci_def = f"MAP_VOCABULARY.RXNORM_{cci_version}.CHARLSON_CODE"
    cci_score = f"MAP_VOCABULARY.RXNORM_{cci_version}.CHARLSON_SCORE"
    
    sql_cci = f"""
        create or replace table {prefix}_CCI_{at_risk} as
        with mx as(
            select distinct a.upk_key2, a.claim_date, a.cohort, charlson_category
            from {prefix}_FINAL_{at_risk} a
            left join {prefix}_ALL_MX b
            on a.upk_key2 = b.upk_key2 and a.claim_date >= b.claim_date and a.claim_date - 365 <= b.claim_date
            left join {cci_def} c
            on array_contains(icd::variant, DIAGNOSIS_ARRAY)
            union
            select upk_key2, claim_date, cohort, case 
                when age < 50 then '< 50'
                when age < 60 then '50-59'
                when age < 70 then '60-69'
                when age < 80 then '70-79'
                when age >= 80 then '80+'
                else '< 50'
            end as charlson_category
            from {prefix}_FINAL_{at_risk}
        )
        select upk_key2, claim_date, cohort,
            sum(charlson_score) as charlson_score
        from mx a
        left join {cci_score} d
        on a.CHARLSON_CATEGORY = d.CHARLSON_CATEGORY
        group by a.upk_key2, a.claim_date, a.cohort
        ;
    """
    
    sql_combine = f"""
        create or replace table {prefix}_FINAL_W_COV_{at_risk} as
        select a.*, b.ckd, b.obesity, c.charlson_score, d.er
        from {prefix}_FINAL_{at_risk} a
        left join {prefix}_COV_{at_risk} b
        on a.upk_key2 = b.upk_key2
        left join {prefix}_CCI_{at_risk} c
        on a.upk_key2 = c.upk_key2
        left join {prefix}_ER_{at_risk} d
        on a.upk_key2 = d.upk_key2
        ;
    """
    send_query(connection, sql_cov, execute, verbose)
    send_query(connection, sql_er, execute, verbose)
    send_query(connection, sql_cci, execute, verbose)
    send_query(connection, sql_combine, execute, verbose)     
    return

In [31]:
# get_covariates(connection = ctx, execute = True, verbose = False)

In [32]:
### all following steps separate between 183 or 365 at-risk window
def get_outcome(
    connection = None,
    prefix = 'METDPP4_V4',
    at_risk = 183,
    execute = False,
    verbose = True):
    
    mx_def = 'METDPP4_V4_DEF'
    
    sql_outcome = f"""
        create or replace table {prefix}_OUTCOME_{at_risk} as
        with hypo as (
        select distinct a.upk_key2, 'hypoglycemia' as event, b.claim_date
        from {prefix}_FINAL_{at_risk} a
        left join {prefix}_ALL_MX b
        on a.upk_key2 = b.upk_key2 and a.claim_date < b.claim_date
        where arrays_overlap(DIAGNOSIS_ARRAY, (
            select array_agg(code) 
            from {mx_def} 
            where codetype = 'ICD10CM' 
                and description ilike '%ypoglycemia%'
            ))
        ),
        er as (
        select distinct a.upk_key2, 'er' as event, b.claim_date
        from {prefix}_FINAL_{at_risk} a
        left join {prefix}_ALL_ER b
        on a.upk_key2 = b.upk_key2 and a.claim_date < b.claim_date
        where b.claim_date is not null
        ),
        hypo_er as (
        select distinct hypo.upk_key2, 'er_hypoglycemia' as event, hypo.claim_date
        from hypo
        inner join er
        on hypo.upk_key2 = er.upk_key2 and hypo.claim_date = er.claim_date
        )
        --select upk_key2, event, min(claim_date) as first_event_date from (
        select * from hypo
        union
        select * from er
        union 
        select * from hypo_er
        --) group by upk_key2, event
        ;
    """
        
    send_query(connection, sql_outcome, execute, verbose)
    return

In [33]:
# get_outcome(connection = ctx, execute = True, verbose = False)

In [13]:
# pipeline to run all steps
# parameters
prefix = 'METDPP4_V4'
query_start_date = '2016-01-01'
query_end_date = '2022-12-31'
mx_version = '20220405'
rx_version = '20220409'
cci_version = '20220404'
cohort_start_date = '2017-01-01'
cohort_end_date = '2020-12-31'
grace_period = 45
at_risk = 183 

# # pipeline
# # 1. get all encounters/patient data needed
# # 1.1 get all Rx claims for target drugs
# # 1.2 for patients ever used metformin/dpp4, get Mx claims for cohort condition, covariates and outcome
# # 1.3 for patients ever used metformin/dpp4, get patient table
# get_encounters(
#     prefix = prefix,
#     query_start_date = query_start_date,
#     query_end_date = query_end_date,
#     mx_version = mx_version,
#     rx_version = rx_version,
#     cci_version = cci_version,
#     connection = ctx, 
#     execute = True, 
#     verbose = False)

# # 2. construct eligible cohort
# # 2.1 get continous enrollment table from patient table, using 45 days gap. Run for Mx/Rx separately then combine.
# get_ce(
#     prefix = prefix,
#     grace_period = grace_period,
#     connection = ctx, 
#     execute = True, 
#     verbose = False)

# # 2.2 create a large table for all potential index dates
# # - start from table with all metformin/dpp4 Rx (a quick dedup by date/cohort)
# # - create condition flags

# # n_cohort: number of different cohorts a patient may belong to on same date, so 1 or 2 here.

# # stats:
# # elig_{window}: continously enrolled
# # n_diab_{window}: number of diabetes encounters
# # n_{drug}_{window}: number of {drug} Rx

# # window:
# # {stat}_b_365_1: lookback 365 days to 1 day prior to index date
# # {stat}_b_365_31: lookback 365 days to 31 day prior to index date
# # {stat}_b_30_1: lookback 30 days to 1 day prior to index date
# # {stat}_f_0_183: lookforward 0 day to 183 days after index date
# # {stat}_f_0_365: lookforward 0 day to 365 days after index date

# # all variables created:
# # n_cohort,
# # elig_b_365_1, elig_f_0_183, elig_f_0_365,
# # n_met_b_365_1, n_dpp4_b_365_1, n_multi_b_365_1,
# # n_met_f_0_183, n_dpp4_f_0_183, n_multi_f_0_183,
# # n_met_f_0_365, n_dpp4_f_0_365, n_multi_f_0_365,
# # n_diab_b_365_31, n_diab_b_30_1, n_diab_f_0_183, n_diab_f_0_365

# get_all_potential(
#     prefix = prefix,
#     cohort_start_date = cohort_start_date,
#     cohort_end_date = cohort_end_date,
#     connection = ctx, 
#     execute = True, 
#     verbose = False)

# # 2.3 filter by conditions above to get first_potential index date.
# # following those logic sequentially:

# # where
# # n_met_b_365_1 = 0 and n_dpp4_b_365_1 = 0 and n_multi_b_365_1 = 0 -- no drug use before index date
# # and elig_b_365_1 > 0 -- c.e. before index date
# # --[DEPRECATED] and n_diab_b_365_31 = 0 -- no diabetes Dx (new patient)
# # and n_cohort = 1 -- only use one drug on index date
# # and elig_f_0_{at_risk} > 0 -- c.e. in at-risk window
# # and ((cohort = 'metformin' and n_dpp4_f_0_{at_risk} + n_multi_f_0_{at_risk} = 0)
# # or (cohort = 'dpp4' and n_met_f_0_{at_risk} + n_multi_f_0_{at_risk} = 0) -- no drug switching in at-risk window
# # )
# # --[DEPRECATED] and n_diab_b_30_1 + n_diab_f_0_{at_risk} >= 1 -- at least ONE diagnosis of diabetes
# # qualify claim_date = min(claim_date) over (partition by upk_key2) -- if multiple episodes are eligible, pick first one

# get_first_potential(   
#     prefix = prefix,
#     at_risk = at_risk,
#     connection = ctx, 
#     execute = True, 
#     verbose = False)

# # 2.4 get adherence in at-risk window
# # (this step was an exclusive step, but we relax it to be a step adding adherence as a covariates)
# # adherence logic mainly following AAPI adherence, but modified so that
# # first_period_date is the index date, and last_period_date is index_date + {at_risk} - 1
# # pdc2 is calculated using the modified logic.

# get_adherence(  
#     prefix = prefix,
#     at_risk = at_risk,
#     connection = ctx, 
#     execute = True, 
#     verbose = False)

# # 2.5 get final eligible cohort, this file should be unique by patient. Each patient with one eligible cohort/date.
# # pdc2 is merged as a covariate, not an exclusion.
# # row counts should be same as first_potential file.
# # demographics on index date is added (after dedup).
# get_final_cohort(  
#     prefix = prefix,
#     at_risk = at_risk,
#     connection = ctx, 
#     execute = True, 
#     verbose = False)

# # 3. construct cohort funnel table - by year and overall
# get_cohort_funnel(connection = ctx, by_year = True, execute = True, verbose = False)
# get_cohort_funnel(connection = ctx, by_year = False, execute = True, verbose = False)

# # 4. generate columns for next step analysis (only for the final cohort patients)
# # 4.1 get covariates in baseline period (365 days lookback).
# # - ckd
# # - diabetes
# # - CCI score
# # - number of ER visits

# get_covariates(
#     prefix = prefix,
#     at_risk = at_risk,
#     cci_version = cci_version,
#     connection = ctx, 
#     execute = True, 
#     verbose = False)

# 4.2 get outcomes after index date. Dedup by event/date.
# get_outcome(  
#     prefix = prefix,
#     at_risk = at_risk,
#     connection = ctx, 
#     execute = True, 
#     verbose = False)

# 5. everything else in R.

In [29]:
load_data(ctx, "METDPP4_V4_COHORT_FUNNEL_183")



Unnamed: 0,CRITERIA,COHORT,N_BENE,N_RX
0,a. all users in study window,dpp4,3359292,37287867
1,b. continuous enrollment 365 days prior,dpp4,1356523,14459069
2,c. drug cleaning 365 days prior,dpp4,131355,134147
3,d. no same day two drugs,dpp4,99641,101709
4,f. continuous enrollment 183 days after index ...,dpp4,88950,90751
5,g. no drug switching 183 days after index date,dpp4,47533,48573
6,i. pick first eligible episode,dpp4,45826,45826
7,j. PDC > 80%,dpp4,16968,16968
8,a. all users in study window,metformin,19536288,231942046
9,b. continuous enrollment 365 days prior,metformin,7768254,83806088
