<h1 style="text-align: center;">mandatory use</h1>

## import libraries, set constants, define helper functions

In [None]:
import polars as pl
from Levenshtein import ratio
from datetime import datetime, timedelta, date
# for using tableau API:
# import tableauserverclient as TSC
# import toml

constants

In [None]:
# ---- search credit -------------------------------------------------------------------------------------------------------------------
# patient name similarity between rx and search to give search credit
RATIO = 0.7
# patient name similarity between rx and search to give search credit for partial searches
PARTIAL_RATIO = 0.5
# max number of days before an rx was written where searching should receive credit
DAYS_BEFORE = 7
# remove veterinarians from the data
FILTER_VETS = True

# save progress and detail files such as search_results, dispensations_results, overlaps_active
TESTING = False

# ---- supplementary information -------------------------------------------------------------------------------------------------------
# add additional information to the results: overlapping dispensations, opioids to opioid naive patients, etc
SUPPLEMENT = True

# patient name similarity between opioid and benzo prescriptions to confirm overlap
OVERLAP_RATIO = 0.9
# last, part, both; how overlaps are counted, see readme
OVERLAP_TYPE = 'last'
# patient name similarity between 2 opioid rx to confirm patient is NOT opioid naive
NAIVE_RATIO = 0.7
# mme threshold for single rx
MME_THRESHOLD = 90

# ---- tableau API ---------------------------------------------------------------------------------------------------------------------
# pull tableau files using the API, requires a secrets.toml file, see TABAPIREADME.md
TABLEAU_API = False
# workbook name in tableau
WORKBOOK_NAME = 'mu'
# pull data based on last month, if False, set the two dates below
AUTO_DATE = True
FIRST_WRITTEN_DATE = date(2024, 4, 1)
LAST_WRITTEN_DATE = date(2024, 4, 30)

In [None]:
# fail fast on a misconfigured OVERLAP_TYPE; the setting is only checked when SUPPLEMENT is enabled.
# ValueError is still an Exception subclass, so anything catching Exception keeps working.
if SUPPLEMENT and OVERLAP_TYPE not in ('last', 'part', 'both'):
    raise ValueError(f'{OVERLAP_TYPE} is not one of last, part, both, please set OVERLAP_TYPE accordingly')

helper functions

In [None]:
def add_days(n, d=None):
    """Return ``d`` shifted by ``n`` days (a negative ``n`` moves backwards).

    Args:
        n: number of days to add.
        d: the ``date`` or ``datetime`` to shift. When omitted, ``datetime.today()``
            is read at call time, so the default never goes stale in a
            long-running notebook kernel.

    Returns:
        ``d + timedelta(days=n)``.
    """
    if d is None:
        d = datetime.today()
    return d + timedelta(days=n)

In [None]:
def filter_vets(df):
    """Drop the ``animal_name`` column, optionally removing animal rows first.

    When the module-level ``FILTER_VETS`` flag is on, only rows whose
    ``animal_name`` is exactly ``'Unspecified'`` or ``'~'`` are kept
    (presumably the placeholders used when no animal is recorded). The
    ``animal_name`` column is removed in both cases.

    Args:
        df: a polars frame that has an ``animal_name`` column.

    Returns:
        The frame without ``animal_name``, filtered if ``FILTER_VETS`` is set.
    """
    if not FILTER_VETS:
        return df.drop('animal_name')

    animal = pl.col('animal_name')
    no_animal_recorded = (animal == 'Unspecified') | (animal == '~')
    return df.filter(no_animal_recorded).drop('animal_name')

In [None]:
if TABLEAU_API:
    # the tableau-only packages are imported here, under the flag, so that setting TABLEAU_API = True
    # works without also having to uncomment imports elsewhere in the notebook
    import tableauserverclient as TSC
    import toml

    # connection details are read from secrets.toml, see TABAPIREADME.md
    with open('secrets.toml', 'r') as f:
        secrets = toml.load(f)

    server = secrets['tableau']['server']
    site = secrets['tableau']['site']
    token_name = secrets['tableau']['token_name']
    token_value = secrets['tableau']['token_value']

    tableau_auth = TSC.PersonalAccessTokenAuth(token_name, token_value, site)
    # NOTE(review): 'verify': False presumably turns off TLS certificate verification for every request
    # to the tableau server; left unchanged here, confirm that this is intentional
    tableau_server = TSC.Server(server, use_server_version=True, http_options={'verify':False})

    def csv_from_view_id(file_name:str, view_id:str, filters:dict|None=None) -> None:
        """Download the view ``view_id`` as csv and write it to ``data/{file_name}.csv``.

        Args:
            file_name: name of the csv to write, without directory or extension.
            view_id: id of the tableau view to download.
            filters: optional mapping of view filter name -> value; each pair is
                added to the csv request with ``options.vf``.
        """
        if filters:
            options = TSC.CSVRequestOptions()
            for k, v in filters.items():
                options.vf(k, v)
        else:
            options = None
        view = tableau_server.views.get_by_id(view_id)
        tableau_server.views.populate_csv(view, options)
        with open(f'data/{file_name}.csv', 'wb') as f:
            f.write(b''.join(view.csv))

    def find_view_luid(view_name:str, workbook_name:str) -> str:
        """Return the id of the view ``view_name`` inside the workbook ``workbook_name``.

        The first workbook and the first view with a matching name are used.

        Raises:
            IndexError: if no workbook or no view with the given name exists.
        """
        all_workbooks = list(TSC.Pager(tableau_server.workbooks))
        matching_workbooks = [workbook for workbook in all_workbooks if workbook.name == workbook_name]
        if not matching_workbooks:
            raise IndexError(f'no tableau workbook named {workbook_name!r} was found')
        searched_workbook = matching_workbooks[0]

        tableau_server.workbooks.populate_views(searched_workbook)
        matching_views = [view for view in searched_workbook.views if view.name == view_name]
        if not matching_views:
            raise IndexError(f'no view named {view_name!r} was found in workbook {workbook_name!r}')
        return matching_views[0].id

    with tableau_server.auth.sign_in(tableau_auth):
        print('finding dispensations luid...')
        disp_luid = find_view_luid('dispensations', WORKBOOK_NAME)
        print(disp_luid)
        print('finding searches luid...')
        searches_luid = find_view_luid('searches', WORKBOOK_NAME)
        print(searches_luid)
        print('finding id luid...')
        id_luid = find_view_luid('ID', WORKBOOK_NAME)
        print(id_luid)
        print('finding active_rx luid...')
        active_rx_luid = find_view_luid('active_rx', WORKBOOK_NAME)
        print(active_rx_luid)
        print('finding naive_rx luid...')
        naive_rx_luid = find_view_luid('naive_rx', WORKBOOK_NAME)
        print(naive_rx_luid)

        # written-date range: the previous calendar month, or the configured dates
        if AUTO_DATE:
            today = date.today()
            last_of_month = add_days(-1, today.replace(day=1))
            first_of_month = last_of_month.replace(day=1)
        else:
            last_of_month = LAST_WRITTEN_DATE
            first_of_month = FIRST_WRITTEN_DATE
        # search range: DAYS_BEFORE days before the first written date through 1 day after the last
        last_for_search = add_days(1, last_of_month)
        first_for_search = add_days(-DAYS_BEFORE, first_of_month)

        filters = {
            'first_of_month':first_of_month, 'last_of_month':last_of_month,
            'first_for_search':first_for_search, 'last_for_search':last_for_search
        }

        # the progress messages are built from the file name actually written, so they cannot drift from it
        for file_name, luid in (
            ('dispensations_data', disp_luid),
            ('searches_data', searches_luid),
            ('ID_data', id_luid),
            ('active_rx_data', active_rx_luid),
            ('naive_rx_data', naive_rx_luid),
        ):
            print(f'pulling {file_name}...')
            csv_from_view_id(file_name, luid, filters)
            print(f'wrote data/{file_name}.csv')

## read in and prepare data for processing

In [None]:
# registered users, with the tableau export headers renamed to short snake_case names
users = (
    pl.scan_csv('data/ID_data.csv', infer_schema_length=10000)
    .rename({
        'Associated DEA Number(s)':'dea_number(s)', 'User ID':'true_id', 'User Full Name':'user_full_name', 'State Professional License':'license_number',
        'Specialty Level 1':'specialty_1', 'Specialty Level 2':'specialty_2', 'Specialty Level 3':'specialty_3'
    })
)

# each user dea gets its own row so a prescriber gets credit for searches on prescriptions with any of their registered deas
users_explode = (
    users
    .with_columns(
        pl.col('dea_number(s)').str.to_uppercase().str.split(',').alias('dea_number')
    )
    .explode('dea_number')
    .with_columns(
        # strip after splitting: stripping the whole comma separated string first would leave the space
        # in "AB1234567, CD7654321" attached to the second dea, which then never matches in a join
        pl.col('dea_number').str.strip_chars()
    )
    .select('true_id', 'dea_number')
)

In [None]:
pattern = r'^[A-Za-z]{2}\d{7}$' # 2 letters followed by 7 digits

# tableau export header -> column name used in the rest of the notebook
_disp_headers = {
    'Month, Day, Year of Patient Birthdate': 'disp_dob',
    'Month, Day, Year of Written At': 'written_date',
    'Month, Day, Year of Filled At': 'filled_date',
    'Month, Day, Year of Dispensations Created At': 'disp_created_date',
    'Prescriber First Name': 'prescriber_first_name',
    'Prescriber Last Name': 'prescriber_last_name',
    'Orig Patient First Name': 'patient_first_name',
    'Orig Patient Last Name': 'patient_last_name',
    'Prescriber DEA': 'prescriber_dea',
    'Generic Name':'generic_name',
    'Prescription Number':'rx_number',
    'AHFS Description':'ahfs',
    'Daily MME':'mme',
    'Days Supply':'days_supply',
    'Animal Name':'animal_name',
}

_disp_dates = pl.col(['disp_dob', 'written_date', 'filled_date', 'disp_created_date']).str.to_date('%B %d, %Y')
_disp_dea = pl.col('prescriber_dea').str.to_uppercase().str.strip_chars()
_disp_patient = (
    (pl.col('patient_first_name') + ' ' + pl.col('patient_last_name')).str.to_uppercase().alias('patient_name')
)
_disp_prescriber = (
    (pl.col('prescriber_first_name') + ' ' + pl.col('prescriber_last_name')).str.to_uppercase().alias('prescriber_name')
)

# window of search dates that can earn credit for an rx
_credit_start = pl.col('written_date').dt.offset_by(f'-{DAYS_BEFORE}d').alias('start_date')
_credit_end = pl.col('written_date').dt.offset_by('1d').alias('end_date')   # extra day to account for bamboo's issues handling UTC

dispensations = (
    pl.scan_csv('data/dispensations_data.csv', infer_schema_length=10000)
    .rename(_disp_headers)
    .with_columns(_disp_dates, _disp_dea, _disp_patient, _disp_prescriber)
    # keep only rows whose prescriber dea looks like a dea number
    .filter(pl.col('prescriber_dea').str.contains(pattern))
    # attach the registered user id, if any, for the prescriber dea
    .join(users_explode, how='left', left_on='prescriber_dea', right_on='dea_number', coalesce=True)
    .collect()
    .with_columns(_credit_start, _credit_end)
    .drop('patient_first_name', 'patient_last_name', 'prescriber_first_name', 'prescriber_last_name')
)

dispensations = filter_vets(dispensations)

In [None]:
# for filtering searches to only the days we could potentially need:
# DAYS_BEFORE days before the earliest written date through 1 day after the latest
first_of_month = dispensations['written_date'].min()
last_of_month = dispensations['written_date'].max()
min_date = add_days(-DAYS_BEFORE, first_of_month)
max_date = add_days(1, last_of_month)

searches = (
    pl.scan_csv('data/searches_data.csv', infer_schema_length=10000)
    .rename({'Month, Day, Year of Search Creation Date': 'created_date', 'Month, Day, Year of Searched DOB':
            'search_dob', 'Searched First Name': 'first_name', 'Searched Last Name': 'last_name',
            'Partial First Name?': 'partial_first', 'Partial Last Name?': 'partial_last', 'True ID': 'true_id'})
    .with_columns(
        pl.col(['search_dob', 'created_date']).str.to_date('%B %d, %Y'),
        (pl.col('first_name') + ' ' + pl.col('last_name')).str.to_uppercase().alias('full_name'),
        (pl.col('partial_first') | pl.col('partial_last')).alias('partial')
    )
    .filter(
        pl.col('created_date').is_between(min_date, max_date) &
        pl.col('true_id').is_in(dispensations['true_id'])
    )
    .with_columns(
        # similarity threshold each search has to meet: PARTIAL_RATIO for partial searches, RATIO otherwise.
        # native expression instead of a per-row python lambda; as before the result is Float32 and a
        # null 'partial' gives a null threshold (no branch matches)
        pl.when(pl.col('partial')).then(pl.lit(PARTIAL_RATIO, dtype=pl.Float32))
        .when(~pl.col('partial')).then(pl.lit(RATIO, dtype=pl.Float32))
        .alias('ratio_check')
    )
    .drop('first_name', 'last_name', 'partial_first', 'partial_last')
    # read and filter the csv once, then hand back a lazy frame for the join that follows
    .collect()
    .lazy()
)

## process dispensations for searches

In [None]:
# a search can count for an rx when it was made inside the rx's start_date..end_date window ...
_search_in_window = pl.col('created_date').is_between(pl.col('start_date'), pl.col('end_date'))
# ... for a patient with the same date of birth
_search_same_dob = pl.col('disp_dob') == pl.col('search_dob')
# Levenshtein similarity between the searched name and the rx patient name
_search_name_ratio = (
    pl.struct(['full_name', 'patient_name'])
    .map_elements(lambda x: ratio(x['full_name'], x['patient_name']), return_dtype=pl.Float32)
    .alias('ratio')
)

# one row per rx (rx_number, prescriber_dea, written_date) that has at least one qualifying search
dispensations_with_searches = (
    dispensations
    .lazy()
    .join(searches, how='left', on='true_id', coalesce=True)
    .filter(_search_in_window & _search_same_dob)
    .with_columns(_search_name_ratio)
    .collect(streaming=True)
    # the search's own threshold (ratio_check) decides whether the name is close enough
    .filter(pl.col('ratio') >= pl.col('ratio_check'))
    .unique(subset=['rx_number','prescriber_dea','written_date'])
    .select('rx_number','prescriber_dea','written_date')
    .with_columns(pl.lit(True).alias('search'))
)

In [None]:
# columns that identify a single rx
_final_rx_key = ['rx_number','prescriber_dea','written_date']
# registered prescribers are identified by true_id, everyone else by their dea
_final_id_expr = pl.col('true_id').fill_null(pl.col('prescriber_dea')).alias('final_id')

# every dispensation, flagged with whether a qualifying search was found for it
final_dispensations = (
    dispensations
    .join(dispensations_with_searches, how='left', on=_final_rx_key, coalesce=True)
    .fill_null(False)
    .unique(subset=_final_rx_key)
    .with_columns(_final_id_expr)
)

pattern_cap = r'^([A-Za-z]{2}\d{7})$' # 2 letters followed by 7 digits
deas = dispensations.select('prescriber_dea', 'prescriber_name').lazy()

# percentage of a prescriber's dispensations that had a qualifying search
_result_rate = ((pl.col('search') / pl.col('len')) * 100).alias('rate')
# final_id parsed as an integer user id; null when it is not numeric
_result_true_id = pl.col('final_id').str.to_integer(base=10, strict=False).cast(pl.Int64).alias('true_id')
# final_id kept as a dea when it looks like one; null otherwise
_result_unreg_dea = pl.col('final_id').str.extract(pattern_cap).alias('unreg_dea')

_result_columns = [
    'final_id', 'prescriber_name', 'dea_number(s)', 'license_number', 'specialty_1', 'specialty_2', 'specialty_3',
    'dispensations', 'searches', 'rate', 'registered'
]

results = (
    final_dispensations
    .group_by(['final_id'])
    .agg([pl.len(), pl.col('search').sum()])
    .with_columns(_result_rate, _result_true_id, _result_unreg_dea)
    .rename({'len':'dispensations', 'search':'searches'})
    .lazy()
    # user details for numeric ids, prescriber name from the dispensations for dea ids
    .join(users, how='left', on='true_id', coalesce=True)
    .join(deas, how='left', left_on='unreg_dea', right_on='prescriber_dea', coalesce=True)
    .unique('final_id')
    .with_columns(
        pl.col('user_full_name').fill_null(pl.col('prescriber_name')),
        pl.col('dea_number(s)').fill_null(pl.col('unreg_dea')),
        pl.col('true_id').is_not_null().alias('registered')
    )
    .drop('prescriber_name')
    .rename({'user_full_name':'prescriber_name'})
    .select(_result_columns)
    .collect()
)

write progress to csvs

In [None]:
# when TESTING, dump the per-prescriber results and the per-rx detail next to the notebook
if TESTING:
    for _progress_frame, _progress_path in (
        (results, 'search_results.csv'),
        (final_dispensations, 'dispensations_results.csv'),
    ):
        _progress_frame.write_csv(_progress_path)

## supplementary information (overlap, over MME threshold, opioid naive, etc)

### opi and benzo counts

In [None]:
# number of opioid dispensations per prescriber (ahfs description contains 'OPIOID').
# only the row count per final_id is used, so no start/end date columns are derived here
opi_count = (
    final_dispensations
    .filter(pl.col('ahfs').str.contains('OPIOID'))
    .group_by('final_id')
    .len()
    .rename({'len':'opi_rx'})
)

# number of benzodiazepine dispensations per prescriber (ahfs description contains 'BENZO')
benzo_count = (
    final_dispensations
    .filter(pl.col('ahfs').str.contains('BENZO'))
    .group_by('final_id')
    .len()
    .rename({'len':'benzo_rx'})
)

# add counts of opi and benzo disps; prescribers without any get 0
results = (
    results
    .join(opi_count, how='left', on='final_id', coalesce=True)
    .with_columns(
        pl.col('opi_rx').fill_null(0)
    )
    .join(benzo_count, how='left', on='final_id', coalesce=True)
    .with_columns(
        pl.col('benzo_rx').fill_null(0)
    )
)

### overlap

In [None]:
if SUPPLEMENT:
    # active prescriptions, used below to find overlapping opioid / benzo prescriptions
    active = (
        pl.scan_csv('data/active_rx_data.csv', infer_schema_length=10000)
        .rename({
            'Month, Day, Year of Patient Birthdate':'dob', 'Month, Day, Year of Filled At':'filled_date',
            'Month, Day, Year of Dispensations Created At':'create_date', 'Month, Day, Year of Written At':'written_date',
            'Orig Patient First Name':'patient_first_name', 'Orig Patient Last Name':'patient_last_name', 'Prescriber DEA':'dea',
            'AHFS Description':'ahfs', 'Month, Day, Year of rx_end':'rx_end', 'Animal Name':'animal_name'
        })
        .with_columns(
            # upper-case and strip the dea before the join, the same way the dispensations data is prepared;
            # users_explode holds upper-cased deas, so a lower-case or padded dea would never match
            pl.col('dea').str.to_uppercase().str.strip_chars()
        )
        .join(users_explode, how='left', left_on='dea', right_on='dea_number', coalesce=True)
        .with_columns(
            pl.col(['filled_date', 'create_date', 'written_date', 'rx_end', 'dob']).str.to_date('%B %d, %Y'),
            # registered prescribers are identified by true_id, everyone else by their dea
            pl.col('true_id').fill_null(pl.col('dea')).alias('final_id'),
            (pl.col('patient_first_name') + ' ' + pl.col('patient_last_name')).str.to_uppercase().alias('patient_name'),
        )
        .drop('true_id', 'patient_first_name', 'patient_last_name')
    )

    active = filter_vets(active)

    # split by drug class using the ahfs description
    benzo_active = active.filter(pl.col('ahfs').str.contains('BENZO'))
    opi_active = active.filter(pl.col('ahfs').str.contains('OPIOID'))

In [None]:
if SUPPLEMENT:
    def _overlapping_pairs(in_window):
        """Return benzo/opioid rx pairs for the same patient that satisfy ``in_window``.

        Benzo rows are joined to opioid rows on ``dob`` (opioid columns get the
        ``_opi`` suffix), filtered with the ``in_window`` expression, and kept
        only when the two patient names have a similarity of at least
        ``OVERLAP_RATIO``. The result is a collected DataFrame.
        """
        name_similarity = (
            pl.struct(['patient_name_opi', 'patient_name'])
            .map_elements(lambda x: ratio(x['patient_name_opi'], x['patient_name']), return_dtype=pl.Float32)
            .alias('ratio')
        )
        return (
            benzo_active
            .join(opi_active, how='left', on='dob', suffix='_opi', coalesce=True)
            .filter(in_window)
            .with_columns(name_similarity)
            .filter(pl.col('ratio') >= OVERLAP_RATIO)
            .collect(streaming=True)
        )

    def _overlap_counts(pairs, benzo_rows, opi_rows, count_name):
        """Count overlapping rx per prescriber and return them as column ``count_name``.

        Pairs matching ``benzo_rows`` are counted for the benzo prescriber
        (``final_id``), pairs matching ``opi_rows`` for the opioid prescriber
        (``final_id_opi``); the two counts are then added up per ``final_id``.
        """
        benzo_counts = (
            pairs
            .filter(benzo_rows)
            .select('final_id')
            .group_by('final_id')
            .len()
        )
        opi_counts = (
            pairs
            .filter(opi_rows)
            .select('final_id_opi')
            .rename({'final_id_opi':'final_id'})
            .group_by('final_id')
            .len()
        )
        return (
            pl.concat([benzo_counts, opi_counts])
            .group_by('final_id')
            .sum()
            .rename({'len':count_name})
        )

    # rx written inside the reporting period
    benzo_in_period = pl.col('written_date').is_between(first_of_month, last_of_month)
    opi_in_period = pl.col('written_date_opi').is_between(first_of_month, last_of_month)

    if OVERLAP_TYPE in ['part', 'both']:
        # 'part': either rx was written while the other was active (filled_date..rx_end), and both
        # prescribers of a pair are counted
        overlap_active = _overlapping_pairs(
            (pl.col('written_date_opi').is_between(pl.col('filled_date'), pl.col('rx_end'))) |
            (pl.col('written_date').is_between(pl.col('filled_date_opi'), pl.col('rx_end_opi')))
        )

        if TESTING:
            overlap_active.write_csv('overlaps_part.csv')

        all_overlaps = _overlap_counts(
            overlap_active, benzo_in_period, opi_in_period, 'overlapping_rx_part'
        )

        # add count of overlapping rx to the results
        results = (
            results
            .join(all_overlaps, how='left', on='final_id', coalesce=True)
            .with_columns(
                pl.col('overlapping_rx_part').fill_null(0)
            )
        )

    if OVERLAP_TYPE in ['last', 'both']:
        # 'last': only the rx written later of the two is counted.
        # using create_date + 1 day for start date, adjust for reporting frequency
        overlap_active = _overlapping_pairs(
            (pl.col('written_date_opi').is_between((pl.col('create_date') + pl.duration(days=1)), pl.col('rx_end'))) |
            (pl.col('written_date').is_between((pl.col('create_date_opi') + pl.duration(days=1)), pl.col('rx_end_opi')))
        )

        if TESTING:
            overlap_active.write_csv('overlaps_last.csv')

        # NOTE(review): the strict comparisons mean a pair written on the same day is counted for
        # neither prescriber; confirm against the readme that this is intended
        all_overlaps = _overlap_counts(
            overlap_active,
            benzo_in_period & (pl.col('written_date') > pl.col('written_date_opi')),
            opi_in_period & (pl.col('written_date') < pl.col('written_date_opi')),
            'overlapping_rx_last',
        )

        # add count of overlapping rx to the results
        results = (
            results
            .join(all_overlaps, how='left', on='final_id', coalesce=True)
            .with_columns(
                pl.col('overlapping_rx_last').fill_null(0)
            )
        )

### over MME threshold

In [None]:
# rx whose daily mme is at or above the single-rx threshold
_mme_at_or_over = pl.col('mme') >= MME_THRESHOLD

# number of such rx per prescriber
over_mme = (
    final_dispensations
    .filter(_mme_at_or_over)
    .group_by('final_id')
    .agg(pl.len().alias('rx_over_mme_threshold'))
)

# add count of rx over the mme threshold to the results; prescribers without any get 0
results = results.join(over_mme, how='left', on='final_id', coalesce=True)
results = results.with_columns(pl.col('rx_over_mme_threshold').fill_null(0))

### opioid naive

In [None]:
if SUPPLEMENT:
    # tableau export header -> column name used below
    _naive_headers = {
        'Orig Patient First Name':'patient_first_name',
        'Orig Patient Last Name':'patient_last_name',
        'Max. naive_end':'naive_end',
        'Month, Day, Year of Patient Birthdate':'dob',
        'Month, Day, Year of Filled At':'naive_filled_date',
        'Animal Name':'animal_name',
    }
    _naive_patient = (
        (pl.col('patient_first_name') + ' ' + pl.col('patient_last_name')).str.to_uppercase().alias('naive_patient_name')
    )

    # rx used to decide whether a patient is opioid naive; stays lazy until it is joined
    naive = filter_vets(
        pl.scan_csv('data/naive_rx_data.csv', infer_schema_length=10000)
        .rename(_naive_headers)
        .with_columns(
            # naive_end comes in a numeric m/d/Y format, the other two dates are spelled out
            pl.col(['dob', 'naive_filled_date']).str.to_date('%B %-d, %Y'),
            pl.col('naive_end').str.to_date('%-m/%-d/%Y'),
            _naive_patient,
        )
        .drop('patient_first_name', 'patient_last_name')
    )

In [None]:
if SUPPLEMENT:
    # step 1: opioid dispensations whose patient is NOT opioid naive, i.e. a row in `naive` exists with
    # the same dob, a similar enough name (>= NAIVE_RATIO), and a naive_filled_date..naive_end window
    # that contains the rx written_date. these rows are marked opi_naive = False
    naive_disps = (
        final_dispensations
        .lazy()
        .filter(pl.col('ahfs').str.contains('OPIOID'))
        .join(naive, how='left', left_on='disp_dob', right_on='dob', coalesce=True)
        .filter(
            pl.col('written_date').is_between(pl.col('naive_filled_date'), pl.col('naive_end'))
        )
        .with_columns(
            pl.struct(['patient_name', 'naive_patient_name'])
            .map_elements(lambda x: ratio(x['patient_name'], x['naive_patient_name']), return_dtype=pl.Float32).alias('ratio')
        )
        .filter(
            pl.col('ratio') >= NAIVE_RATIO
        )
        .with_columns(
            pl.lit(False).alias('opi_naive')
        )
        # one row per prescriber + rx number is enough to mark the rx
        .unique(subset=['final_id', 'rx_number'])
    )

    # step 2: join the markers back onto all dispensations and count, per prescriber, the opioid
    # dispensations that were not marked in step 1
    naive_disps = (
        final_dispensations
        .lazy()
        .join(naive_disps, how='left', on=['final_id', 'rx_number'], coalesce=True)
        .select('final_id', 'ahfs', 'opi_naive')
        .with_columns(
            pl.col('opi_naive').fill_null(True),
            # NOTE(review): both expressions in this with_columns presumably read the incoming frame, so
            # opi_naive here is still False / null (not the filled value above). an unmarked opioid row
            # then gives True & null = null, which fill_null turns into True; a non-opioid row gives
            # False. a row with a null ahfs would also end up True - confirm that cannot happen
            ((pl.col('ahfs').str.contains('OPIOID')) & pl.col('opi_naive')).fill_null(True).alias('opi_to_opi_naive')
        )
        .select('final_id', 'opi_to_opi_naive')
        .filter(
            pl.col('opi_to_opi_naive')
        )
        .group_by('final_id').len()
        .rename({'len':'opi_to_opi_naive'})
        .collect()
    )

    # add number of opioid dispensations to opioid naive patients to results
    results = (
        results
        .join(naive_disps, how='left', on='final_id', coalesce=True)
        .with_columns(
            # prescribers without any such dispensation get 0
            pl.col('opi_to_opi_naive').fill_null(0)
        )

    )

## results

In [None]:
# fewest searches first; ties are ordered by most dispensations first
results = (
    results
    .sort(['searches', 'dispensations'], descending=[False, True])
)

import calendar

# the file name is built from the first and last written dates found in the dispensations
start_month = calendar.month_name[first_of_month.month].lower()
start_year = first_of_month.year
end_month = calendar.month_name[last_of_month.month].lower()
end_year = last_of_month.year

tail = 'full' if SUPPLEMENT else 'base'

# a single month gets a single-month name; the year is compared as well so that a range such as
# april 2023 - april 2024 is not mistaken for one month
if (start_month, start_year) == (end_month, end_year):
    result_file_name = f'{start_month}{start_year}_mandatory_use_{tail}.csv'
else:
    result_file_name = f'{start_month}{start_year}-{end_month}{end_year}_mandatory_use_{tail}.csv'

results.write_csv(result_file_name)
# last expression of the cell, shown as the cell output
f'{result_file_name} saved'

In [None]:
# overall totals and search rate across all prescribers.
# only the two count columns are summed; the text columns of the results are not aggregated
stats = (
    results
    .select('dispensations', 'searches')
    .sum()
    .with_columns(
        ((pl.col('searches') / pl.col('dispensations')) * 100).round(2).alias('rate')
    )
    .select('dispensations', 'searches', 'rate')
)

# last expression of the cell, shown as the cell output
stats