#  Managed Care - 1

- TO DO - 
    - Do Code Review
    - Potential Optimizations ? (move mp prep outside the loop? etc)
    - double check imported datasets
    - Overall QC

In [None]:
# importing modules
import polars as pl
import gc
import pandas as pd
from datetime import datetime, timedelta,date
import json
import numpy as np

In [None]:
# Run parameters are read from the weekly variables file (vars_wk.json).
with open('vars_wk.json', 'r') as json_file:
    js = json.load(json_file)

# Pull the individual settings out of the parsed JSON, in the original order.
bucket, data_date, monthly_data_date, QTD, YTD = (
    js[setting]
    for setting in ('bucket', 'data_date', 'monthly_data_date', 'QTD', 'YTD')
)

# Manual overrides kept for ad-hoc runs -
# data_date = '20240802'
# monthly_data_date = '202407'
# QTD = 1
# YTD = 7
#TODO: CONNECT TO JSON LATER

# S3 locations built from the settings above:
#   dflib - dataframe library, pln - weekly plantrak archive, mpln - monthly plantrak archive.
dflib = f's3://{bucket}/BIT/dataframes/'
pln = f's3://{bucket}/PYADM/weekly/archive/{data_date}/plantrak/'
mpln = f's3://{bucket}/PYADM/monthly/archive/{monthly_data_date}/plantrak/'

In [None]:
# Utility Functions -
def load(df, lib=dflib):
    """Read ``<lib><df>.parquet`` with polars and bind it to a global named ``df``.

    df  : file stem, also used as the name of the global variable that is created.
    lib : location prefix that the file name is appended to (defaults to ``dflib``).
    """
    parquet_path = f'{lib}{df}.parquet'
    globals()[df] = pl.read_parquet(parquet_path)
def offload(df, name, lib=dflib, ef = 'NA' ):
    """Write a global polars object to ``<lib>mc/<name>.parquet`` via pandas.

    df   : name of the global variable to export (looked up through ``globals()``).
    name : file stem of the parquet file that is written.
    lib  : location prefix; the file goes into its ``mc/`` sub-folder. The path is
           built from this argument, so a caller-supplied location is honoured
           (the default still resolves to ``dflib``).
    ef   : when left at 'NA' the global itself is exported; any other value is used
           to index the global first (e.g. a position in a list of frames).
    """
    file = f'{lib}mc/{name}.parquet'

    source = globals()[df]
    if ef != 'NA':
        # The global is a container of frames; export only the requested element.
        source = source[ef]
    source.to_pandas().to_parquet(file, index =False)

    print('Exported : ', file)

In [None]:
# Importing dependencies: mapping tables read from the docs folder plus cached dataframes.
prod_mapping = pl.read_csv(
    f's3://{bucket}/BIT/docs/productmapping_pybit.txt',
    separator='|',
)
geo_code_mapper = pl.from_pandas(
    pd.read_excel(f's3://{bucket}/BIT/docs/geo_id_full.xlsx')
)
# load() binds each parquet to a global variable of the same name.
for _cached_frame in ('mp_spec_seg_dec', 'MASTER_UNI'):
    load(_cached_frame)
# Product codes that are kept when the LAX_N data is read.
fetch_products = [
    'LI1', 'LI2', 'LI3',
    'TRU', 'AMT', 'LAC',
    'MOT', 'LUB', 'IRL',
]

---

## Importing Raw Data

### Formulary
- _Using Both Weekly and Monthly for payer name list_
- _only using Monthly for plan_type and plan_class_

In [None]:
# Processing Formulary Datasets -
# Only these columns are read from the FORMULARY parquet files.
columns_to_read = ['IMS_PLAN_ID','GROUP_TYPE','FORMULARY_GROUP_STATUS','PFAM_CD','PFAM_NAME','IRWD_FGN_NAME','BRAND']

# Monthly archive extract (the frame that plan_type / plan_class are derived from below).
fm_monthly = pl.read_parquet(
    mpln+'FORMULARY.parquet',columns = columns_to_read
)

# Weekly archive extract (in this cell only its payer names are used).
fm_weekly = pl.read_parquet(
    pln+'FORMULARY.parquet',columns = columns_to_read
)

# Consolidating list of Unique Payer Names -
# payer_id is the 1-based position of each name in the sorted, de-duplicated
# union of the monthly and weekly names, so it depends on the full name list of the run.
payer_names = (
    fm_monthly.select('IRWD_FGN_NAME')
    .vstack(fm_weekly.select('IRWD_FGN_NAME'))
    .unique()
    .sort('IRWD_FGN_NAME')
    .with_row_index(offset=1)
    .rename({'index':'payer_id'})
)

# FORMULARY: raw GROUP_TYPE value -> plan type used in the outputs.
# Values missing from this lookup are sent to 'Others' where the mapping is applied.
group_type_mapping = {
    'HIX': 'Commercial',
    'Com': 'Commercial',
    'Cash': 'Cash',
    'Voucher': 'Voucher',
    'FFS': 'FFS',
    'Mgd Medicaid': 'Mgd Medicaid',
    'Part D': 'Part D',
    'MAC A': 'Others',
}

def classify_plan_class(status):
    """Map a formulary status string to a plan class by its (case-insensitive) prefix.

    Returns "COVERED", "PREFERRED", "NON PREFERRED" or "NOT COVERED" for the
    recognised prefixes and "N_A" for anything else.
    """
    text = status.upper()
    # Checked top to bottom; the first matching prefix group wins.
    prefix_rules = (
        (("COVERED", "ON PDL"), "COVERED"),
        (("PREFERRED",), "PREFERRED"),
        (("NON-PREFERRED",), "NON PREFERRED"),
        (("NON-PDL", "NOT COVERED"), "NOT COVERED"),
    )
    for prefixes, plan_class in prefix_rules:
        if text.startswith(prefixes):
            return plan_class
    return "N_A"

# Recode BRAND 'IBR' to 'IRL'; every other BRAND value is left as it is.
fm = fm_monthly.with_columns(
        pl.when(pl.col('BRAND')=='IBR')
        .then(pl.lit('IRL'))
        .otherwise(pl.col('BRAND'))
        .alias('BRAND')
)

# Keep rows whose BRAND equals the row's PFAM_CD, or whose BRAND is an empty string.
fm = fm.filter((pl.col('PFAM_CD')==(pl.col('BRAND'))) | (pl.col('BRAND')==''))

# Derive plan_type (from GROUP_TYPE) and plan_class (from FORMULARY_GROUP_STATUS),
# rename IMS_PLAN_ID to PlanID as Int64, then drop exact duplicate rows.
fm = (
    fm
    .with_columns(
        pl.col('GROUP_TYPE').map_elements(lambda x: group_type_mapping.get(x,'Others'), return_dtype=pl.Utf8) #NOTE : IF new plan types flow , they will go to Others by default
        .fill_null('Others')
        .alias('plan_type'),
        pl.col('IMS_PLAN_ID').cast(pl.Int64)
    )
    .rename({'IMS_PLAN_ID':'PlanID'})
    .drop('GROUP_TYPE')
    # Null statuses become 'N_A' first so that classify_plan_class always receives a string.
    .with_columns(pl.col('FORMULARY_GROUP_STATUS').fill_null(pl.lit('N_A')))
    .with_columns(pl.col('FORMULARY_GROUP_STATUS').map_elements(classify_plan_class,return_dtype=pl.String).alias('plan_class'))
    .drop('FORMULARY_GROUP_STATUS')
    .unique()
)

###############
# HARD CODED -
# PlanID 13670614 is forced to plan_type 'Others' regardless of its GROUP_TYPE.
# NOTE(review): the reason for this override is not recorded here - confirm it is still required.
fm = fm.with_columns(pl.when(pl.col("PlanID") == 13670614).then(pl.lit('Others')).otherwise(pl.col("plan_type")).alias("plan_type"))
###############
# fm2: one row per (payer name, product family) whose plan_class is the title-cased,
# ' / '-joined list of the distinct plan classes found for that pair.
fm2 = (
    fm
    .select('PFAM_CD','IRWD_FGN_NAME','plan_class').unique()
    .group_by(['IRWD_FGN_NAME','PFAM_CD'])
    .agg(
        # unique() gives no ordering guarantee, so the values are sorted before joining;
        # otherwise the same pair could come out as 'A / B' in one run and 'B / A' in the next.
        pl.col('plan_class').unique().sort().str.concat(' / ').alias('plan_class')
    )
    .with_columns(pl.col('plan_class').str.to_titlecase())
)

### Plantrak

In [None]:
# Import and prepare Raw data - # INPUT : Adm Files # OUTPUT : ln
# Reads the monthly LAX_N file, keeps only the products in fetch_products and
# parses MonthKey (renamed PeriodKey) into a date.
ln = (
    pl.read_parquet(mpln+'LAX_N.parquet',columns=['IID','MonthKey','PFAM_CD','PROD_CD','PlanID','TUF','TRX','TUN']) 
    .rename({'MonthKey':'PeriodKey'})
    .filter(pl.col('PROD_CD').is_in(fetch_products)) #only keep data for BIT products
    .with_columns(pl.col('PeriodKey').cast(pl.Utf8).str.to_date("%Y%m%d")) #Convert Categorical column Back to date
)
# Distinct periods, newest first; their position becomes num_month further down.
date_list = ln['PeriodKey'].unique().sort(descending=True)

# Any PlanIDs starting with -0000002 should be excluded
# NOTE(review): the rule above mentions the prefix '0000002', but the filter below tests
# the 10-character zero-padded id for the prefix '000002' - confirm which prefix is intended.
ln = (
    ln
    .with_columns(pl.col('PlanID').cast(pl.Utf8).str.zfill(10).alias('planid_chr'))
    .filter(~pl.col('planid_chr').str.starts_with('000002'))
    .drop('planid_chr')
)

# num_month: 1 for the most recent PeriodKey, 2 for the one before it, and so on.
ln = ln.join(
    (pl.DataFrame(date_list).with_row_index(offset = 1).rename({'index':'num_month'})),
    on = 'PeriodKey', how = 'left'
)

# Attach the payer name for each PlanID; plans not found in fm get a null name.
# NOTE(review): a PlanID carrying more than one IRWD_FGN_NAME in fm would duplicate rows here - verify.
ln = (
    ln
    .join(fm.select(['PlanID','IRWD_FGN_NAME']).unique(),on='PlanID',how='left')
)

In [None]:
# GENERATOR FUNCTION FOR DATACUTS  # INPUT : ln # OUTPUT : ln1

# Helper lookup - period label -> row filter on num_month (1 = most recent month).
# Labels ending in 'c' are the current window, labels ending in 'p' the comparison window.
_num_month = pl.col('num_month')
filter_cond_dict = {
    '1c': _num_month == 1,
    '1p': _num_month == 2,
    '3c': _num_month.is_in(list(range(1, 4))),
    '3p': _num_month.is_in(list(range(4, 7))),
    '6c': _num_month.is_in(list(range(1, 7))),
    '6p': _num_month.is_in(list(range(7, 13))),
    '12c': _num_month.is_in(list(range(1, 13))),
    '12p': _num_month.is_in(list(range(13, 25))),
    'qtdc': _num_month.is_in(list(range(1, QTD + 1))),
    'qtdp': _num_month.is_in(list(range(4, 4 + QTD))),
    'ytdc': _num_month.is_in(list(range(1, YTD + 1))),
    'ytdp': _num_month.is_in(list(range(13, 13 + YTD))),
}

def get_data_cuts(df,fl):
    """Build one wide frame of TUF / TRX / TUN sums per period for each HCP, payer and product.

    df : claims frame carrying IID, IRWD_FGN_NAME, PFAM_CD, PROD_CD, TUF, TRX, TUN
         and the num_month column that the period filters act on.
    fl : when equal to 1, rows whose plan_type is Voucher, Mgd Medicaid or FFS are
         dropped; any other value keeps them.

    Returns the wide frame with plan_type, plan_class, payer_id, product_id and
    parent_product_id joined on.
    """
    keys = ['IID','IRWD_FGN_NAME','PFAM_CD','PROD_CD']

    # One aggregate per period, merged side by side. The first aggregate seeds the
    # result whatever its label is, so the loop does not depend on '1c' coming first.
    result = None
    for period,cond in filter_cond_dict.items():
        cut = (
            df.filter(cond)
            .group_by(keys)
            .agg(
                pl.col('TUF').sum().alias(f'TUF_{period}'),
                pl.col('TRX').sum().alias(f'TRX_{period}'),
                pl.col('TUN').sum().alias(f'TUN_{period}')
            )
        )
        if result is None:
            result = cut
        else:
            result = result.join(cut,on = keys,how = 'outer_coalesce')

    # Pulling in Plan Type - unmatched payer / family pairs default to 'Others'.
    # NOTE(review): a pair that carries several plan types in fm yields one row per type - verify.
    result = (
        result
        .join(
            fm.select(['IRWD_FGN_NAME','PFAM_CD','plan_type']).unique(),
            on = ['IRWD_FGN_NAME', 'PFAM_CD'], how = 'left'
        )
        .with_columns(
            pl.col('plan_type').fill_null(pl.lit('Others')),
        )
    )

    # Pulling in Plan Class - unmatched pairs default to 'N_a'.
    result = (
        result.join(fm2, on=['IRWD_FGN_NAME', 'PFAM_CD'], how='left')
        .with_columns(
            pl.col('plan_class').fill_null(pl.lit('N_a'))
        )
    )

    if fl == 1:
        # Dropping Records with Voucher , FFS , Medicaid
        result = result.filter(
            ~(pl.col('plan_type').is_in(['Voucher','Mgd Medicaid','FFS']))
        )

    # Joining Payer ID -
    result = result.join(payer_names, on ='IRWD_FGN_NAME', how = 'left')

    # Adding product_id / parent_product_id from the product mapping.
    result = result.join(
        prod_mapping.select(['code','product_id','parent_product_id']),
        left_on = 'PROD_CD', right_on='code', how = 'left'
    )

    return result

# fl = 1 : Voucher / Mgd Medicaid / FFS rows are removed.
ln1 = get_data_cuts(ln,1)
# fl = 0 : those plan types are kept.
ln1_pmix = get_data_cuts(ln,0) # this will have Medicaid, FFS and Voucher

In [None]:
# Adding Parent Product Rows - # INPUT ln1 # OUTPUT : ln2
# Names of all data-cut columns: every TUF_<period>, then every TRX_<period>, then every
# TUN_<period>, each group following the key order of filter_cond_dict.
data_cut_list = [
    f'{measure}_{p}'
    for measure in ('TUF', 'TRX', 'TUN')
    for p in filter_cond_dict
]
def get_parent_rows(ln1):
    """Append roll-up rows to the product-level cuts and attach geography columns.

    Two kinds of rows are stacked under the product-level rows of ``ln1``:
      * one per parent product for parent_product_id 2 and 35 (stored as product_id);
      * one overall market row with product_id 1 and plan_class 'N_a'.
    Rows without a payer_id or geography_id are then removed, remaining nulls are
    filled with 0.0 and the id columns are cast to Int64.
    """
    # Every data cut is summed; plan_type takes the first value found in the group.
    roll_up = {col: pl.col(col).sum() for col in data_cut_list}
    roll_up['plan_type'] = pl.col('plan_type').first()

    # lin and amt - roll-ups to parent products 2 and 35.
    parent_rows = (
        ln1
        .filter(pl.col('parent_product_id').is_in([2,35]))
        .group_by(['IID','IRWD_FGN_NAME','payer_id','parent_product_id'])
        .agg(**roll_up, plan_class=pl.col('plan_class').first())
        .rename({'parent_product_id':'product_id'})
    )
    out_cols = parent_rows.columns

    # for lax mkt - all products together, stored under product_id 1.
    market_rows = (
        ln1
        .group_by(['IID','IRWD_FGN_NAME','payer_id'])
        .agg(**roll_up)
        .with_columns(
            pl.lit(1).alias('product_id').cast(pl.Int64),
            pl.lit('N_a').alias('plan_class'),
        )
        .select(out_cols)
    )

    stacked = ln1.select(out_cols).vstack(parent_rows).vstack(market_rows)

    # Adding Geography Information and Removing Plans not present in Formulary & any White Space HCPs -
    hcp_geography = mp_spec_seg_dec[['IID','geography_id']]
    int_id_cols = [
        'IID', 'payer_id', 'geography_id',
        'region_geography_id', 'area_geography_id', 'nation_geography_id',
    ]
    return (
        stacked
        .join(hcp_geography,on='IID',how='left')
        .join(geo_code_mapper,on = 'geography_id', how = 'left')
        .filter(pl.col('payer_id').is_not_null())
        .filter(pl.col('geography_id').is_not_null())
        .fill_null(0.0) # Filling Nulls inside Data Cuts for Consistency.
        # DTYPE FIXES
        .with_columns([pl.col(name).cast(pl.Int64) for name in int_id_cols])
    )
    
# ln2 is built from the cuts that exclude Voucher / Mgd Medicaid / FFS;
# ln2_pmix from the cuts that keep them.
ln2 = get_parent_rows(ln1)
ln2_pmix = get_parent_rows(ln1_pmix)

In [None]:
# Generator Function for datacuts : #Input :  ln #Output : ln1_planid
def get_data_cuts_planid(df):
    """Build one wide frame of TUF sums per period for each HCP, plan and product.

    df : claims frame carrying IID, PlanID, PFAM_CD, PROD_CD, TUF and num_month.

    Plans without a plan_type in fm are dropped, as are Voucher / Mgd Medicaid / FFS
    plans. plan_class, product_id and parent_product_id are joined on. Payer name and
    payer_id are not attached in this function.
    """
    keys = ['IID','PlanID','PFAM_CD','PROD_CD']
    plan_keys = ['PlanID','PFAM_CD']

    # One aggregate per period, merged side by side; the first aggregate seeds the result.
    result = None
    for period,cond in filter_cond_dict.items():
        cut = df.filter(cond).group_by(keys).agg(pl.col('TUF').sum().alias(f'TUF_{period}'))
        if result is None:
            result = cut
        else:
            result = result.join(cut,on = keys,how = 'outer_coalesce')

    result = (
        result
        # Pulling Plan Type; PlanIDs not present in Formulary are dropped.
        .join(fm.select(plan_keys + ['plan_type']).unique(),on = plan_keys, how = 'left')
        .filter(pl.col('plan_type').is_not_null())
        # Pulling Plan Class. The lookup is de-duplicated first: fm can hold several rows
        # with the same (PlanID, PFAM_CD, plan_class), and joining them as-is would repeat
        # the data rows and inflate every sum taken afterwards.
        .join(fm.select(plan_keys + ['plan_class']).unique(), on=plan_keys, how='left')
        # Dropping Records with Voucher , FFS , Medicaid
        .filter(~(pl.col('plan_type').is_in(['Voucher','Mgd Medicaid','FFS'])))
        # adding product_id
        .join(prod_mapping.select(['code','product_id','parent_product_id']),left_on = 'PROD_CD', right_on='code', how = 'left')
    )
    return result

# Plan-level (PlanID) TUF cuts built from the same claims frame.
ln1_planid = get_data_cuts_planid(ln)

In [None]:
# Adding Parent Product Rows - # INPUT ln1_planid # OUTPUT : ln2_planid
# data_cut_list starts with one TUF_<period> column per key of filter_cond_dict, so the
# TUF columns are taken by that length rather than by a fixed count of 12.
tuf_cut_cols = data_cut_list[:len(filter_cond_dict)]
prod_agg_expn_list = {
    col : pl.col(col).sum() for col in tuf_cut_cols
}

# Product-level rows without the columns that are not needed for the roll-ups.
ln2_planid = ln1_planid.drop(['PFAM_CD','PROD_CD','plan_type'])

#lin and amt - roll-ups to parent products 2 and 35, kept separate per plan_class.
ln2_planid_2_35 = (
    ln2_planid
    .filter(pl.col('parent_product_id').is_in([2,35]))
    .group_by(['IID','PlanID','parent_product_id','plan_class'])
    .agg(**prod_agg_expn_list)
    .rename({'parent_product_id':'product_id'})
)

#for lax mkt - all products together, stored under product_id 1.
ln2_planid_1 = (
    ln2_planid
    .group_by(['IID','PlanID','plan_class'])
    .agg(**prod_agg_expn_list)
    .with_columns(pl.lit(1).alias('product_id').cast(pl.Int64))
)

# Stack product rows, parent-product rows and market rows in one column layout.
ln2_planid = (
    ln2_planid.select(ln2_planid_2_35.columns)
    .vstack(ln2_planid_2_35)
    .vstack(ln2_planid_1.select(ln2_planid_2_35.columns))
)

#Adding Columns -
ln2_planid = (
    ln2_planid
    # Adding Geography Information
    .join(mp_spec_seg_dec[['IID','geography_id']],on='IID',how='left')
    .join(geo_code_mapper,on = 'geography_id', how = 'left')
    # Pulling Payer Name
    .join(fm.select(['PlanID','IRWD_FGN_NAME']).unique(),on='PlanID',how='left')
    # Plan_Type
    .join(fm.select(['PlanID','plan_type']).unique(), on ='PlanID',how='left')
    #Joining Payer ID-
    .join(payer_names, on ='IRWD_FGN_NAME', how = 'left')
    # Removing Whitespace HCPs -
    .filter(pl.col('geography_id').is_not_null())
    .fill_null(0.0) # Filling Nulls inside Data Cuts for Consistency.

    # DTYPE FIXES
    .with_columns(
        pl.col('IID').cast(pl.Int64),
        pl.col('PlanID').cast(pl.Int64),
        pl.col('payer_id').cast(pl.Int64),
        pl.col('geography_id').cast(pl.Int64),
        pl.col('region_geography_id').cast(pl.Int64),
        pl.col('area_geography_id').cast(pl.Int64),
        pl.col('nation_geography_id').cast(pl.Int64),
    )

    # Sequence -
    .select(
        ['IID','PlanID','product_id','plan_class','plan_type','payer_id','IRWD_FGN_NAME','geography_id','region_geography_id','area_geography_id','nation_geography_id'] + tuf_cut_cols
    )
)

In [None]:
# FOR MEMORY CONSERVATION - the claims frame and the product-level cuts are not used below.
del ln, ln1, ln1_pmix

# offload('ln2','ln2')
# offload('ln2_planid','ln2_planid')

# for i in range(4):
#     offload('top_payers',f'top_payers_{i}',ef=i)
#     offload('top_hcps',f'top_hcps_{i}',ef=i)

---

## Ranking

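1. top_payers - For a given Geography ID and a given Plan Type: the Top 10 Payer IDs (Top 20 for the combined "Part D and Commercial" group) [Based on IBSC 6m Volume]
2. top_hcps - For a given Geography ID, Plan Type and Payer: the Top 30 HCPs [Based on IBSC 6m Volume].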

In [None]:
#Top 10 Payers For a Given Geography and PlanType -> #INPUT : ln2 # OUTPUT : top_payers

# Geography levels in the order they are processed (index 0 to 3).
levels = ['geography_id'] + [
    f'{tier}_geography_id' for tier in ('region', 'area', 'nation')
]
def get_top_payers(ln2,g):
    """Return the top payers by 6-month TUF for every value of geography column ``g``.

    Only product_id 1 rows (IBSC market volume) are ranked. Three rankings are stacked:
      * top 10 payers within each plan_type;
      * top 10 payers overall, labelled plan_type 'Total';
      * top 20 payers across Part D and Commercial, labelled 'Part D and Commercial'.
    The result holds the columns g, plan_type and payer_id.
    """
    market = ln2.filter(product_id = 1) # Only Keeping IBSC Market Volume.
    rank_scope = [g,'plan_type']

    def _keep_top(frame, limit):
        # Rank payers by TUF inside each (geography, plan_type) and keep the first `limit`.
        ranked = frame.with_columns(
            pl.col('TUF')
            .rank("ordinal",descending=True)
            .over(rank_scope)
            .alias("rank")
        )
        return ranked.filter(pl.col('rank') <= limit)

    def _payer_volume(frame, label):
        # Payer totals across plan types, tagged with a synthetic plan_type label.
        totals = frame.group_by([g,'payer_id']).agg(TUF = pl.col('TUF_6c').sum())
        return totals.with_columns(plan_type = pl.lit(label))

    per_type = _keep_top(
        market.group_by([g,'plan_type','payer_id']).agg(TUF = pl.col('TUF_6c').sum()),
        10,
    )
    layout = per_type.columns

    overall = _keep_top(_payer_volume(market, 'Total'), 10).select(layout)

    part_d_commercial = market.filter(pl.col('plan_type').is_in(['Part D', 'Commercial']))
    pdc = _keep_top(_payer_volume(part_d_commercial, 'Part D and Commercial'), 20).select(layout)

    stacked = per_type.vstack(overall).vstack(pdc)
    return stacked.sort(by = [g,'plan_type','rank']).drop(['TUF','rank'])

# Consolidating results for all Geography Levels - one frame per entry of `levels`, same order.
top_payers = [get_top_payers(ln2, level) for level in levels]

In [None]:
#Top 30 HCPs For a Given Geography and PlanType and Payer_ID -> #INPUT : ln2 # OUTPUT : top_hcps | needs top_payers to be in memory
def get_top_hcps(ln2,g,i):
    """Return the top 30 HCPs (IID) by 6-month TUF for every top payer of geography column ``g``.

    ln2 : frame with the roll-up rows; only product_id 1 rows are used.
    g   : geography column to rank within.
    i   : index into the global ``top_payers`` list; its frame limits the payers considered.

    Rankings are produced per plan_type, for the 'Total' group and for the
    'Part D and Commercial' group, then stacked into the columns g, plan_type,
    payer_id and IID.
    """
    pdc_label = (
        pl.when(pl.col('plan_type').is_in(['Part D', 'Commercial']))
        .then(pl.lit('Part D and Commercial'))
        .otherwise(None)
    )
    market = (
        ln2
        .filter(product_id = 1)
        .with_columns(
            pl.lit('Total').alias('plan_type_group1'),
            pdc_label.alias('plan_type_group2'),
        )
    )
    payers = top_payers[i]

    def _top_30(type_col):
        # Keep only rows of the top payers, matching the payer list's plan_type
        # against the chosen plan-type column of the data.
        if type_col == 'plan_type':
            limited = market.join(payers,on = [g,'plan_type','payer_id'],how = 'inner')
        else:
            limited = market.join(
                payers,
                left_on = [g,type_col,'payer_id'],
                right_on = [g,'plan_type','payer_id'],how = 'inner'
            )
        scope = [g,type_col,'payer_id']
        ranked = (
            limited
            .group_by(scope + ['IID'])
            .agg(TUF = pl.col('TUF_6c').sum())
            .with_columns(pl.col('TUF').rank("ordinal",descending=True).over(scope).alias("rank"))
            .filter(pl.col('rank') <= 30)
        )
        if type_col == 'plan_type':
            return ranked
        return ranked.rename({type_col:'plan_type'})

    per_type = _top_30('plan_type')
    overall = _top_30('plan_type_group1')
    pdc = _top_30('plan_type_group2')

    return (
        per_type
        .vstack(overall)
        .vstack(pdc)
        .sort(by = [g,'plan_type','payer_id','rank'])
        .drop(['TUF','rank'])
    )

def get_top_hcps_total(df,g):
    """Return the top 30 HCPs (IID) by 6-month TUF per plan type, across all payers.

    Works on product_id 1 rows only and ranks within geography column ``g`` for each
    plan_type, for the 'Total' group and for the 'Part D and Commercial' group.
    payer_id is set to -1 on every row to mark the all-payer ranking.
    """
    pdc_label = (
        pl.when(pl.col('plan_type').is_in(['Part D', 'Commercial']))
        .then(pl.lit('Part D and Commercial'))
        .otherwise(None)
    )
    market = (
        df
        .filter(product_id = 1)
        .with_columns(
            pl.lit('Total').alias('plan_type_group1'),
            pdc_label.alias('plan_type_group2'),
        )
    )

    def _top_30(frame, type_col):
        # HCP totals ranked inside each (geography, plan-type column) group.
        return (
            frame
            .group_by([g,type_col,'IID']).agg(TUF = pl.col('TUF_6c').sum())
            .with_columns(pl.col('TUF').rank("ordinal",descending=True).over([g,type_col]).alias("rank"))
            .filter(pl.col('rank') <= 30)
        )

    per_type = _top_30(market, 'plan_type')
    overall = _top_30(market, 'plan_type_group1').rename({'plan_type_group1':'plan_type'})
    pdc = (
        _top_30(market.filter(pl.col('plan_type_group2').is_not_null()), 'plan_type_group2')
        .rename({'plan_type_group2':'plan_type'})
    )

    return (
        per_type
        .vstack(overall)
        .vstack(pdc)
        .sort(by = [g,'plan_type','rank'])
        .drop(['TUF','rank'])
        # -1 instead of the text 'Total': the column must stay integer so the frame
        # can be vstacked with the per-payer result, whose payer_id is numeric.
        .with_columns(pl.lit(-1).cast(pl.Int64).alias('payer_id'))
        .select([g,'plan_type','payer_id','IID'])
    )

# One frame per geography level: per-payer top HCPs followed by the all-payer (payer_id -1) rows.
top_hcps = [
    get_top_hcps(ln2, level, position).vstack(get_top_hcps_total(ln2, level))
    for position, level in enumerate(levels)
]

In [None]:
#adding columns to facilitate filter joins
# plan_type_group1 is 'Total' on every row; plan_type_group2 is 'Part D and Commercial'
# for those two plan types and null for all others.
_plan_type_group_cols = [
    pl.lit('Total').alias('plan_type_group1'),
    pl.when(pl.col('plan_type').is_in(['Part D', 'Commercial']))
    .then(pl.lit('Part D and Commercial'))
    .otherwise(None)
    .alias('plan_type_group2'),
]

ln2 = ln2.with_columns(_plan_type_group_cols)
ln2_planid = ln2_planid.with_columns(_plan_type_group_cols)

In [None]:
# Storing data for re-use in MC -2
# Each list holds one frame per geography level; element k is written to <list name>_<k>.
# Plain loops are used because offload() is called only for its side effect.
for _list_name in ('top_hcps', 'top_payers'):
    for _level_idx in range(4):
        offload(_list_name, f'{_list_name}_{_level_idx}', ef = _level_idx)
print('Top Payers and Plans Exported !')

---

First Drill-down functions -

In [None]:
# cur_vol , pri_vol , vol_change, prc_vol_growth, vol_change_ind, cur_trx, cur_tun
def process_1():
    """Build the first drill-down volume metrics for each geography level.

    Reads the globals ``ln2``, ``levels`` and ``period``. For every level the current
    and prior volumes are summed per plan_type and product_id, with extra rows for the
    'Total' and 'Part D and Commercial' groups, and these columns are added:
    vol_change, prc_vol_growth, avg_trx_size and vol_change_ind.
    Returns a list with one frame per level.
    """
    # NOTE(review): `period` is not defined in this part of the notebook; it is spliced in as
    # f'TUF{period}c', so presumably it carries the leading underscore (e.g. '_6') - confirm.
    renames = {
        f'TUF{period}c':'cur_vol',
        f'TUF{period}p':'pri_vol',
        f'TRX{period}c':'cur_trx',
        f'TUN{period}c':'cur_tun',
    }
    totals = {name: pl.col(name).sum() for name in ('cur_vol','pri_vol','cur_trx','cur_tun')}
    rel_change = pl.col('vol_change')/pl.col('pri_vol')

    res = []
    for g in levels:
        base = (
            ln2
            .select([g,'plan_type','plan_type_group1','plan_type_group2','payer_id','product_id', *renames])
            .rename(renames)
        )

        per_type = base.group_by([g,'plan_type','product_id']).agg(**totals)
        layout = per_type.columns
        overall = (
            base.group_by([g,'plan_type_group1','product_id']).agg(**totals)
            .rename({'plan_type_group1' : 'plan_type'}).select(layout)
        )
        pdc = (
            base.filter(pl.col('plan_type_group2').is_not_null())
            .group_by([g,'plan_type_group2','product_id']).agg(**totals)
            .rename({'plan_type_group2' : 'plan_type'}).select(layout)
        )

        metrics = (
            per_type.vstack(overall).vstack(pdc)
            .with_columns(
                vol_change = pl.col('cur_vol') - pl.col('pri_vol'),
                # Infinite / NaN ratios (division by zero) are turned into nulls.
                prc_vol_growth = ((pl.col('cur_vol')/pl.col('pri_vol'))-1).replace([np.inf,np.nan],[None,None]),
                avg_trx_size = (pl.col('cur_tun')/pl.col('cur_trx')).replace([np.inf,np.nan],[None,None])
            )
            .with_columns(
                # 'P' above +2% relative change, 'Q' below -2%, otherwise null.
                pl.when(rel_change > 0.02).then(pl.lit('P'))
                .when(rel_change < -0.02).then(pl.lit('Q'))
                .when(pl.col('vol_change')==0).then(None)
                .otherwise(None).alias('vol_change_ind')
            )
            .drop(['cur_trx','cur_tun'])
        )
        res.append(metrics)
    return res


In [None]:
# sales_dist
def process_2(df):
    """Add ``sales_dist`` to each level frame: cur_vol divided by the 'Total' row's cur_vol.

    df : list with one frame per entry of ``levels``; it is updated in place and returned.
    The 'Total' volume is matched on geography and product_id; NaN results become 0.
    """
    for i, g in enumerate(levels):
        frame = df[i]
        total_vol = (
            frame
            .filter(pl.col('plan_type')=='Total')
            .select([g,'product_id','cur_vol'])
            .rename({'cur_vol':'Total_cur_vol'})
        )
        share = (pl.col('cur_vol')/pl.col('Total_cur_vol')).replace(np.nan,0)
        df[i] = (
            frame
            .join(total_vol,on = [g,'product_id'],how='left')
            .with_columns(share.alias('sales_dist'))
            .drop('Total_cur_vol')
        )
    return df

In [None]:
# sales_dist_bnch
def process_3(df):
    """Add ``sales_dist_bnch``: the sales_dist of the next geography level up.

    df : list with one frame per entry of ``levels``; it is updated in place and returned.
    Levels 0 to 2 look up their parent through ``geo_code_mapper``; the last level
    is benchmarked against itself.
    """
    for i, g in enumerate(levels):
        frame = df[i]
        if i == 3:
            # Top level: there is no parent, so the benchmark equals its own value.
            df[i] = frame.with_columns(sales_dist_bnch = pl.col('sales_dist'))
            continue

        parent_level = levels[i+1]
        parent_dist = (
            df[i+1]
            .select([parent_level,'plan_type','product_id','sales_dist'])
            .rename({'sales_dist':'sales_dist_bnch'})
        )
        child_to_parent = geo_code_mapper.select(g,parent_level).unique()
        df[i] = (
            frame
            .join(child_to_parent,on = g , how='left')
            .join(parent_dist, on = [parent_level,'plan_type','product_id'],how = 'left')
            .drop(parent_level)
        )
    return df

In [None]:
# prc_vol_growth_bnch
def process_4(df):
    """Add ``prc_vol_growth_bnch`` to each level frame.

    df : list with one frame per entry of ``levels``; it is updated in place and returned.
    Benchmarks used:
      * level 0 -> the prc_vol_growth of its level-1 parent;
      * levels 1 and 2 -> the prc_vol_growth of their level-3 parent;
      * level 3 -> its own prc_vol_growth.
    """
    bench_name = {'prc_vol_growth':'prc_vol_growth_bnch'}
    # The benchmark lookups read only columns this function never changes, so they are
    # built once up front instead of on every pass of the loop.
    # for terr ->
    f_region = df[1].select([levels[1],'plan_type','product_id','prc_vol_growth']).rename(bench_name)
    # for Region, Area ->
    f_nation = df[3].select([levels[3],'plan_type','product_id','prc_vol_growth']).rename(bench_name)

    for i in range(4):
        g = levels[i]
        f = df[i]
        if i == 0:
            f = (
                f
                .join(geo_code_mapper.select(levels[0],levels[1]).unique(),on = g , how='left')
                .join(f_region, on = [levels[1],'plan_type','product_id'],how = 'left')
                .drop(levels[1])
            )
        elif i in (1, 2):
            f = (
                f
                .join(geo_code_mapper.select(levels[i],levels[3]).unique(),on = g , how='left')
                .join(f_nation, on = [levels[3],'plan_type','product_id'],how = 'left')
                .drop(levels[3])
            )
        else:
            f = f.with_columns(prc_vol_growth_bnch = pl.col('prc_vol_growth'))
        df[i] = f

    return df

In [None]:
# cur_shr, pri_shr, shr_change, prc_shr_growth, shr_change_ind
def process_5(df):
    """Add share metrics to each level frame.

    df : list with one frame per entry of ``levels``; it is updated in place and returned.
    A row's share is its volume divided by the product_id 1 volume of the same geography
    and plan_type. Columns added: cur_shr, pri_shr, shr_change, prc_shr_growth and
    shr_change_ind.
    """
    rel_change = pl.col('shr_change')/pl.col('pri_shr')
    for i, g in enumerate(levels):
        frame = df[i]
        # Denominator: the product_id 1 rows of the same frame.
        market_vol = (
            frame
            .filter(product_id = 1)
            .select([g,'plan_type','cur_vol','pri_vol'])
            .rename({'cur_vol':'lax_cur_vol','pri_vol':'lax_pri_vol'})
        )
        with_share = (
            frame
            .join(market_vol,on = [g,'plan_type'],how = 'left')
            .with_columns(
                (pl.col('cur_vol') / pl.col('lax_cur_vol')).alias('cur_shr'),
                (pl.col('pri_vol') / pl.col('lax_pri_vol')).alias('pri_shr')
            )
        )
        with_growth = with_share.with_columns(
            shr_change = pl.col('cur_shr') - pl.col('pri_shr'),
            # Infinite / NaN ratios are turned into nulls.
            prc_shr_growth = ((pl.col('cur_shr')/pl.col('pri_shr'))-1).replace([np.inf,np.nan],[None,None])
        )
        df[i] = (
            with_growth
            .with_columns(
                # 'P' above +2% relative change, 'Q' below -2%, otherwise null.
                pl.when(rel_change > 0.02).then(pl.lit('P'))
                .when(rel_change < -0.02).then(pl.lit('Q'))
                .when(pl.col('shr_change')==0).then(None)
                .otherwise(None).alias('shr_change_ind')
            )
            .drop(['lax_cur_vol','lax_pri_vol'])
        )
    return df

In [None]:
# prc_shr_growth_bnch
def process_6(df):
    """Add ``prc_shr_growth_bnch`` to each level frame.

    df : list with one frame per entry of ``levels``; it is updated in place and returned.
    Benchmarks used:
      * level 0 -> the prc_shr_growth of its level-1 parent;
      * levels 1 and 2 -> the prc_shr_growth of their level-3 parent;
      * level 3 -> its own prc_shr_growth.
    """
    bench_name = {'prc_shr_growth':'prc_shr_growth_bnch'}
    # The benchmark lookups read only columns this function never changes, so they are
    # built once up front instead of on every pass of the loop.
    # for terr ->
    f_region = df[1].select([levels[1],'plan_type','product_id','prc_shr_growth']).rename(bench_name)
    # for Region, Area ->
    f_nation = df[3].select([levels[3],'plan_type','product_id','prc_shr_growth']).rename(bench_name)

    for i in range(4):
        g = levels[i]
        f = df[i]
        if i == 0:
            f = (
                f
                .join(geo_code_mapper.select(levels[0],levels[1]).unique(),on = g , how='left')
                .join(f_region, on = [levels[1],'plan_type','product_id'],how = 'left')
                .drop(levels[1])
            )
        elif i in (1, 2):
            f = (
                f
                .join(geo_code_mapper.select(levels[i],levels[3]).unique(),on = g , how='left')
                .join(f_nation, on = [levels[3],'plan_type','product_id'],how = 'left')
                .drop(levels[3])
            )
        else:
            f = f.with_columns(prc_shr_growth_bnch = pl.col('prc_shr_growth'))
        df[i] = f

    return df

In [None]:
# prc_vol_growth_ind ,prc_shr_growth_ind
def process_7(df):
    """Add ``prc_vol_growth_ind`` and ``prc_shr_growth_ind`` to the first four frames of ``df``.

    An indicator is 'L' when the metric is greater than its ``_bnch`` benchmark and the
    literal text '\\N' otherwise. ``df`` is updated in place and returned.
    """
    def _beats_benchmark(metric):
        # 'L' when the metric exceeds its benchmark column, '\N' in every other case.
        return (
            pl.when(pl.col(metric) > pl.col(f'{metric}_bnch'))
            .then(pl.lit('L'))
            .otherwise(pl.lit('\\N'))
            .alias(f'{metric}_ind')
        )

    indicators = [_beats_benchmark('prc_vol_growth'), _beats_benchmark('prc_shr_growth')]
    for i in range(4):
        df[i] = df[i].with_columns(indicators)
    return df

In [None]:
# For Payer Access
def process_8(df):
    """Add one current-volume column per plan_class (payer access) to each level frame.

    df : list with one frame per entry of ``levels``; it is updated in place and returned.
    Reads the globals ``ln2_planid``, ``levels`` and ``period``. Volumes are summed per
    geography, plan_type, plan_class and product_id - including the 'Total' and
    'Part D and Commercial' groups - spread into one column per plan_class, null-filled
    with 0 and left-joined on geography, plan_type and product_id.
    """
    # NOTE(review): `period` is not defined in this part of the notebook; it is spliced in as
    # f'TUF{period}c', so presumably it carries the leading underscore (e.g. '_6') - confirm.
    agg_expn = {'cur_vol':pl.col('cur_vol').sum()}

    for i in range(4):
        g = levels[i]
        f = df[i]

        source_df = (
            ln2_planid
            .select(['PlanID','payer_id','product_id',g,'plan_class','plan_type','plan_type_group1','plan_type_group2',f'TUF{period}c'])
            .rename({f'TUF{period}c':'cur_vol'})
        )

        # Long-form totals for the three plan-type views, all in one column layout.
        per_type = source_df.group_by([g,'plan_type','plan_class','product_id']).agg(**agg_expn)
        overall = (
            source_df
            .group_by([g,'plan_type_group1','plan_class','product_id']).agg(**agg_expn)
            .rename({'plan_type_group1' : 'plan_type'})
        )
        pdc = (
            source_df.filter(pl.col('plan_type_group2').is_not_null())
            .group_by([g,'plan_type_group2','plan_class','product_id']).agg(**agg_expn)
            .rename({'plan_type_group2' : 'plan_type'})
        )

        # Pivot once over the stacked rows. Pivoting each view separately and then selecting
        # the first view's columns fails whenever a view lacks one of the plan classes
        # (e.g. a class that never occurs under Part D / Commercial); here such a gap
        # simply becomes a 0 after fill_null.
        source_df1 = (
            per_type.vstack(overall).vstack(pdc)
            .pivot(columns = 'plan_class',index = [g,'plan_type','product_id'],values = 'cur_vol',aggregate_function = 'sum',maintain_order = True)
            .fill_null(0)
        )

        df[i] = f.join(source_df1,on=[g,'plan_type','product_id'],how='left')
    return df

Second Drill-down functions -

In [None]:
# cur_vol , pri_vol , vol_change, prc_vol_growth, vol_change_ind, cur_trx, cur_tun
def process_9():
    """Build the second drill-down (payer-level) volume metrics for each geography level.

    Reads the globals ``ln2``, ``levels``, ``top_payers`` and ``period``. Rows are limited
    to the payers listed in ``top_payers`` for the level, summed per plan_type, product_id
    and payer_id (with extra rows for the 'Total' and 'Part D and Commercial' groups), and
    these columns are added: vol_change, prc_vol_growth, avg_trx_size and vol_change_ind.
    Returns a list with one frame per level.
    """
    # NOTE(review): `period` is not defined in this part of the notebook; it is spliced in as
    # f'TUF{period}c', so presumably it carries the leading underscore (e.g. '_6') - confirm.
    renames = {
        f'TUF{period}c':'cur_vol',
        f'TUF{period}p':'pri_vol',
        f'TRX{period}c':'cur_trx',
        f'TUN{period}c':'cur_tun',
    }
    totals = {name: pl.col(name).sum() for name in ('cur_vol','pri_vol','cur_trx','cur_tun')}
    rel_change = pl.col('vol_change')/pl.col('pri_vol')
    # (label in top_payers, group column used for matching, group column to discard)
    group_views = (
        ('Total', 'plan_type_group1', 'plan_type_group2'),
        ('Part D and Commercial', 'plan_type_group2', 'plan_type_group1'),
    )

    res = []
    for i, g in enumerate(levels):
        payers = top_payers[i]
        group_keys = [g,'plan_type','product_id','payer_id']
        base = (
            ln2
            .select([g,'plan_type','plan_type_group1','plan_type_group2','payer_id','product_id', *renames])
            .rename(renames)
        )

        # Top payers inside each real plan_type.
        per_type = (
            base
            .join(payers,on = [g,'plan_type','payer_id'],how = 'inner')
            .drop(['plan_type_group1','plan_type_group2'])
            .group_by(group_keys).agg(**totals)
        )
        layout = per_type.columns

        # Top payers of the two synthetic groups, matched through the helper columns.
        stacked = per_type
        for label, match_col, other_col in group_views:
            view = (
                base
                .join(
                    payers.filter(plan_type = label),
                    left_on = [g,match_col,'payer_id'],right_on = [g,'plan_type','payer_id'],how = 'inner'
                )
                .drop([other_col,'plan_type']).rename({match_col : 'plan_type'}).select(layout)
                .group_by(group_keys).agg(**totals)
            )
            stacked = stacked.vstack(view)

        metrics = (
            stacked
            .with_columns(
                vol_change = pl.col('cur_vol') - pl.col('pri_vol'),
                # Infinite / NaN ratios (division by zero) are turned into nulls.
                prc_vol_growth = ((pl.col('cur_vol')/pl.col('pri_vol'))-1).replace([np.inf,np.nan],[None,None]),
                avg_trx_size = (pl.col('cur_tun')/pl.col('cur_trx')).replace([np.inf,np.nan],[None,None])
            )
            .with_columns(
                # 'P' above +2% relative change, 'Q' below -2%, otherwise null.
                pl.when(rel_change > 0.02).then(pl.lit('P'))
                .when(rel_change < -0.02).then(pl.lit('Q'))
                .when(pl.col('vol_change')==0).then(None)
                .otherwise(None).alias('vol_change_ind')
            )
            .drop(['cur_trx','cur_tun'])
        )
        res.append(metrics)
    return res

In [None]:
# sales_dist , sales_dist_bnch ,prc_vol_growth_bnch ,prc_shr_growth_bnch
def process_10(df):
    """Add sales distribution and layer-1 benchmark columns to the layer-2 frames.

    For each of the four entries in ``levels`` the matching frame in ``df``
    gains:

    * ``sales_dist_bnch``, ``prc_vol_growth_bnch``, ``prc_shr_growth_bnch`` -
      the ``sales_dist`` / ``prc_vol_growth`` / ``prc_shr_growth`` values of
      the ``temp1`` row with the same level key, plan_type and product_id.
    * ``sales_dist`` - the row's ``cur_vol`` divided by the ``cur_vol`` of the
      ``temp1`` row with plan_type 'Total' for the same level key and product.

    Reads the module-level ``levels`` and ``temp1``. ``df`` is updated in
    place and also returned.
    """
    benchmark_names = {
        'sales_dist': 'sales_dist_bnch',
        'prc_vol_growth': 'prc_vol_growth_bnch',
        'prc_shr_growth': 'prc_shr_growth_bnch',
    }
    # NaN (0/0) ratios are reported as 0.
    distribution = (pl.col('cur_vol') / pl.col('Total_cur_vol')).replace(np.nan, 0)

    for idx in range(4):
        geo = levels[idx]
        layer1 = temp1[idx]

        # Denominator: the layer-1 'Total' volume per level key and product.
        totals = (
            layer1
            .filter(plan_type='Total')
            .select([geo, 'product_id', 'cur_vol'])
            .rename({'cur_vol': 'Total_cur_vol'})
        )
        benchmarks = (
            layer1
            .select([geo, 'plan_type', 'product_id', *benchmark_names])
            .rename(benchmark_names)
        )

        enriched = df[idx].join(totals, on=[geo, 'product_id'], how='left')
        enriched = enriched.join(benchmarks, on=[geo, 'plan_type', 'product_id'], how='left')
        df[idx] = (
            enriched
            .with_columns(distribution.alias('sales_dist'))
            .drop('Total_cur_vol')
        )
    return df

In [None]:
# cur_shr, pri_shr, shr_change, prc_shr_growth, shr_change_ind
def process_11(df):
    """Add share metrics to each layer-2 frame (one frame per entry in ``levels``).

    A row's share is its volume divided by the volume of the
    ``product_id == 1`` row that has the same level key, plan_type and
    payer_id. Columns added:

    * ``cur_shr`` / ``pri_shr`` - current and prior share.
    * ``shr_change`` - ``cur_shr - pri_shr``.
    * ``prc_shr_growth`` - ``cur_shr / pri_shr - 1`` with inf and NaN turned
      into null.
    * ``shr_change_ind`` - 'P' when the relative change exceeds +2%, 'Q' when
      it is below -2%, null otherwise (including when the change is exactly 0).

    Reads the module-level ``levels``. ``df`` is updated in place and also
    returned.
    """
    relative_change = pl.col('shr_change') / pl.col('pri_shr')

    for i in range(4):
        g = levels[i]
        f = df[i]
        keys = [g, 'plan_type', 'payer_id']

        # Denominator volumes: the product_id == 1 row of each key combination.
        f_ibsc = (
            f
            .filter(product_id=1)
            .select(keys + ['cur_vol', 'pri_vol'])
            .rename({'cur_vol': 'lax_cur_vol', 'pri_vol': 'lax_pri_vol'})
        )
        f = (
            f
            .join(f_ibsc, on=keys, how='left')
            .with_columns(
                (pl.col('cur_vol') / pl.col('lax_cur_vol')).alias('cur_shr'),
                (pl.col('pri_vol') / pl.col('lax_pri_vol')).alias('pri_shr'),
            )
            .with_columns(
                shr_change=pl.col('cur_shr') - pl.col('pri_shr'),
                prc_shr_growth=(
                    (pl.col('cur_shr') / pl.col('pri_shr')) - 1
                ).replace([np.inf, np.nan], [None, None]),
            )
            .with_columns(
                # The zero-change guard must run BEFORE the threshold tests:
                # 0 / 0 is NaN, and polars orders NaN above every other float
                # in comparisons, so an unchanged row with zero prior share
                # would otherwise satisfy "> 0.02" and be flagged 'P'.
                pl.when(pl.col('shr_change') == 0).then(pl.lit(None, dtype=pl.Utf8))
                .when(relative_change > 0.02).then(pl.lit('P'))
                .when(relative_change < -0.02).then(pl.lit('Q'))
                .otherwise(None)
                .alias('shr_change_ind')
            )
            .drop(['lax_cur_vol', 'lax_pri_vol'])
        )
        df[i] = f
    return df

In [None]:
# For Payer Access -
def process_12(df):
    """Attach per-``plan_class`` current-volume columns to the layer-2 frames.

    For each entry in ``levels`` the plan-level volumes in ``ln2_planid`` are
    restricted to the payers listed in ``top_payers``, summed, and pivoted so
    that every ``plan_class`` value becomes its own column. This is done three
    times - keyed on ``plan_type``, ``plan_type_group1`` and
    ``plan_type_group2`` - and the three results are stacked, null-filled with
    0 and left-joined onto the frame.

    Reads the module-level ``levels``, ``ln2_planid``, ``top_payers`` and
    ``period``. ``df`` is updated in place and also returned.
    """
    def _class_volumes(volumes, payers, geo, plan_col):
        """Sum cur_vol for the top payers and pivot it by plan_class.

        ``plan_col`` names the column that plays the role of ``plan_type``.
        Rows where it is null cannot match the inner join, so they drop out.
        """
        return (
            volumes
            .select([
                pl.col(geo),
                pl.col('product_id'),
                pl.col('payer_id'),
                pl.col('plan_class'),
                pl.col(plan_col).alias('plan_type'),
                pl.col('cur_vol'),
            ])
            .join(payers, on=[geo, 'plan_type', 'payer_id'], how='inner')
            .group_by([geo, 'plan_type', 'product_id', 'plan_class', 'payer_id'])
            .agg(cur_vol=pl.col('cur_vol').sum())
            .pivot(
                columns='plan_class',
                index=[geo, 'plan_type', 'product_id', 'payer_id'],
                values='cur_vol',
                aggregate_function='sum',
                maintain_order=True,
            )
        )

    for idx in range(4):
        geo = levels[idx]
        payers = top_payers[idx]
        plan_volumes = (
            ln2_planid
            .select(['PlanID','payer_id','product_id',geo,'plan_class','plan_type','plan_type_group1','plan_type_group2',f'TUF{period}c'])
            .rename({f'TUF{period}c':'cur_vol'})
        )

        by_plan_type = _class_volumes(plan_volumes, payers, geo, 'plan_type')
        # The grouped slices are aligned to the first slice's column layout
        # so that they can be stacked.
        layout = by_plan_type.columns
        by_group1 = _class_volumes(plan_volumes, payers, geo, 'plan_type_group1').select(layout)
        by_group2 = _class_volumes(plan_volumes, payers, geo, 'plan_type_group2').select(layout)

        access = by_plan_type.vstack(by_group1).vstack(by_group2).fill_null(0)
        df[idx] = df[idx].join(
            access, on=[geo, 'plan_type', 'product_id', 'payer_id'], how='left'
        )
    return df

Third Drill-Down Functions -

In [None]:
 # cur_vol , pri_vol , vol_change, prc_vol_growth, vol_change_ind, cur_trx, cur_tun , avg_trx_size
def process_13():
    """Build the layer-3 (HCP-level) volume frames, one per entry in ``levels``.

    Volumes from ``ln2`` are restricted to the rows listed in ``top_hcps`` and
    summed per level key, product, plan_type, payer_id and IID. Six slices are
    stacked, in this order:

    * ``plan_type`` x ``payer_id``, then ``plan_type`` x payer -1
    * ``plan_type_group1`` x ``payer_id``, then ``plan_type_group1`` x payer -1
    * ``plan_type_group2`` x ``payer_id``, then ``plan_type_group2`` x payer -1

    where payer -1 is the placeholder id used for the "all payers" rows.

    Each frame then gets ``vol_change``, ``prc_vol_growth``, ``avg_trx_size``
    and ``vol_change_ind`` ('P' above +2% relative change, 'Q' below -2%, null
    otherwise, including when the change is exactly 0). ``cur_trx`` and
    ``cur_tun`` are dropped once ``avg_trx_size`` has been derived.

    Reads the module-level ``levels``, ``ln2``, ``top_hcps`` and ``period``.

    Returns:
        list of four polars DataFrames, in ``levels`` order.
    """
    # (column acting as plan_type, column acting as payer_id), in stack order.
    slices = [
        ('plan_type', 'payer_id'),
        ('plan_type', 'payer_id_group1'),
        ('plan_type_group1', 'payer_id'),
        ('plan_type_group1', 'payer_id_group1'),
        ('plan_type_group2', 'payer_id'),
        ('plan_type_group2', 'payer_id_group1'),
    ]
    measures = ['cur_vol', 'pri_vol', 'cur_trx', 'cur_tun']
    agg_expn = {m: pl.col(m).sum() for m in measures}
    relative_change = pl.col('vol_change') / pl.col('pri_vol')

    res = []
    for i in range(4):
        g = levels[i]
        source_df = (
            ln2
            .select([g,'plan_type','plan_type_group1','plan_type_group2','payer_id','IID','product_id',f'TUF{period}c',f'TUF{period}p',f'TRX{period}c',f'TUN{period}c'])
            .rename({f'TUF{period}c':'cur_vol',f'TUF{period}p':'pri_vol',f'TRX{period}c':'cur_trx',f'TUN{period}c':'cur_tun'})
            # Placeholder payer id used to match the "all payers" rows of top_hcps.
            .with_columns(pl.lit(-1).cast(pl.Int64).alias('payer_id_group1'))
        )

        stacked = None
        for plan_col, payer_col in slices:
            # Project the chosen columns onto the canonical key names, then
            # keep only the combinations listed in top_hcps. Rows whose key is
            # null (e.g. no plan_type_group2) cannot match the inner join.
            part = (
                source_df
                .select(
                    [
                        pl.col(g),
                        pl.col('product_id'),
                        pl.col('IID'),
                        pl.col(plan_col).alias('plan_type'),
                        pl.col(payer_col).alias('payer_id'),
                    ]
                    + [pl.col(m) for m in measures]
                )
                .join(top_hcps[i], on=[g, 'plan_type', 'payer_id', 'IID'], how='inner')
                .group_by([g, 'product_id', 'plan_type', 'payer_id', 'IID'])
                .agg(**agg_expn)
            )
            stacked = part if stacked is None else stacked.vstack(part)

        df = (
            stacked
            .with_columns(
                vol_change=pl.col('cur_vol') - pl.col('pri_vol'),
                prc_vol_growth=(
                    (pl.col('cur_vol') / pl.col('pri_vol')) - 1
                ).replace([np.inf, np.nan], [None, None]),
                avg_trx_size=(
                    pl.col('cur_tun') / pl.col('cur_trx')
                ).replace([np.inf, np.nan], [None, None]),
            )
            .with_columns(
                # The zero-change guard must run BEFORE the threshold tests:
                # 0 / 0 is NaN, and polars orders NaN above every other float
                # in comparisons, so a row with zero current and prior volume
                # would otherwise satisfy "> 0.02" and be flagged 'P'.
                pl.when(pl.col('vol_change') == 0).then(pl.lit(None, dtype=pl.Utf8))
                .when(relative_change > 0.02).then(pl.lit('P'))
                .when(relative_change < -0.02).then(pl.lit('Q'))
                .otherwise(None)
                .alias('vol_change_ind')
            )
            .drop(['cur_trx', 'cur_tun'])
        )
        res.append(df)
    return res

In [None]:
# sales_dist , sales_dist_bnch ,prc_vol_growth_bnch ,prc_shr_growth_bnch
def process_14(df):
    """Add sales distribution and benchmark columns to the layer-3 frames.

    The benchmark source is ``temp2`` stacked with ``temp1``; the ``temp1``
    rows are given ``payer_id`` -1 so that they line up with the "all payers"
    rows. Each frame in ``df`` is left-joined to that source on level key,
    product_id, plan_type and payer_id and gains:

    * ``sales_dist_bnch``, ``prc_vol_growth_bnch``, ``prc_shr_growth_bnch`` -
      copied from the source's ``sales_dist`` / ``prc_vol_growth`` /
      ``prc_shr_growth``.
    * ``sales_dist`` - the row's ``cur_vol`` divided by the source's
      ``cur_vol``; NaN results are reported as 0.

    Reads the module-level ``levels``, ``temp1`` and ``temp2``. ``df`` is
    updated in place and also returned.
    """
    carried = ['cur_vol', 'sales_dist', 'prc_vol_growth', 'prc_shr_growth']
    benchmark_names = {
        'cur_vol': 'Total_cur_vol',
        'sales_dist': 'sales_dist_bnch',
        'prc_vol_growth': 'prc_vol_growth_bnch',
        'prc_shr_growth': 'prc_shr_growth_bnch',
    }
    distribution = (pl.col('cur_vol') / pl.col('Total_cur_vol')).replace(np.nan, 0)

    for idx in range(4):
        keys = [levels[idx], 'product_id', 'plan_type', 'payer_id']
        wanted = keys + carried

        payer_rows = temp2[idx].select(wanted)
        all_payer_rows = (
            temp1[idx]
            .with_columns(pl.lit(-1).cast(pl.Int64).alias('payer_id'))
            .select(wanted)
        )
        benchmarks = payer_rows.vstack(all_payer_rows).rename(benchmark_names)

        df[idx] = (
            df[idx]
            .join(benchmarks, on=keys, how='left')
            .with_columns(distribution.alias('sales_dist'))
            .drop('Total_cur_vol')
        )
    return df

In [None]:
# cur_shr, pri_shr, shr_change, prc_shr_growth, shr_change_ind
def process_15(df):
    """Add share metrics to each layer-3 frame (one frame per entry in ``levels``).

    A row's share is its volume divided by the volume of the
    ``product_id == 1`` row that has the same level key, plan_type, payer_id
    and IID. Columns added:

    * ``cur_shr`` / ``pri_shr`` - current and prior share.
    * ``shr_change`` - ``cur_shr - pri_shr``.
    * ``prc_shr_growth`` - ``cur_shr / pri_shr - 1`` with inf and NaN turned
      into null.
    * ``shr_change_ind`` - 'P' when the relative change exceeds +2%, 'Q' when
      it is below -2%, null otherwise (including when the change is exactly 0).

    Reads the module-level ``levels``. ``df`` is updated in place and also
    returned.
    """
    relative_change = pl.col('shr_change') / pl.col('pri_shr')

    for i in range(4):
        g = levels[i]
        f = df[i]
        keys = [g, 'plan_type', 'payer_id', 'IID']

        # Denominator volumes: the product_id == 1 row of each key combination.
        f_ibsc = (
            f
            .filter(product_id=1)
            .select(keys + ['cur_vol', 'pri_vol'])
            .rename({'cur_vol': 'lax_cur_vol', 'pri_vol': 'lax_pri_vol'})
        )
        f = (
            f
            .join(f_ibsc, on=keys, how='left')
            .with_columns(
                (pl.col('cur_vol') / pl.col('lax_cur_vol')).alias('cur_shr'),
                (pl.col('pri_vol') / pl.col('lax_pri_vol')).alias('pri_shr'),
            )
            .with_columns(
                shr_change=pl.col('cur_shr') - pl.col('pri_shr'),
                prc_shr_growth=(
                    (pl.col('cur_shr') / pl.col('pri_shr')) - 1
                ).replace([np.inf, np.nan], [None, None]),
            )
            .with_columns(
                # The zero-change guard must run BEFORE the threshold tests:
                # 0 / 0 is NaN, and polars orders NaN above every other float
                # in comparisons, so an unchanged row with zero prior share
                # would otherwise satisfy "> 0.02" and be flagged 'P'.
                pl.when(pl.col('shr_change') == 0).then(pl.lit(None, dtype=pl.Utf8))
                .when(relative_change > 0.02).then(pl.lit('P'))
                .when(relative_change < -0.02).then(pl.lit('Q'))
                .otherwise(None)
                .alias('shr_change_ind')
            )
            .drop(['lax_cur_vol', 'lax_pri_vol'])
        )
        df[i] = f
    return df

In [None]:
# For Payer Access - 
def process_16(df):
    """Attach per-``plan_class`` current-volume columns to the layer-3 frames.

    For each entry in ``levels`` the plan-level volumes in ``ln2_planid`` are
    restricted to the rows listed in ``top_hcps``, summed, and pivoted so that
    every ``plan_class`` value becomes its own column. Six slices are built -
    keyed on ``plan_type``, ``plan_type_group1`` and ``plan_type_group2``,
    each once per real ``payer_id`` and once for the placeholder payer -1 -
    then stacked, null-filled with 0 and left-joined onto the frame.

    Reads the module-level ``levels``, ``ln2_planid``, ``top_hcps`` and
    ``period``. ``df`` is updated in place and also returned.
    """
    def _class_volumes(volumes, hcps, geo, plan_col, payer_col):
        """Sum cur_vol for the listed HCPs and pivot it by plan_class.

        ``plan_col`` / ``payer_col`` name the columns that play the roles of
        ``plan_type`` / ``payer_id``. Rows where either is null cannot match
        the inner join, so they drop out.
        """
        return (
            volumes
            .select([
                pl.col(geo),
                pl.col('product_id'),
                pl.col('IID'),
                pl.col('plan_class'),
                pl.col(plan_col).alias('plan_type'),
                pl.col(payer_col).alias('payer_id'),
                pl.col('cur_vol'),
            ])
            .join(hcps, on=[geo, 'plan_type', 'payer_id', 'IID'], how='inner')
            .group_by([geo, 'plan_type', 'product_id', 'plan_class', 'payer_id', 'IID'])
            .agg(cur_vol=pl.col('cur_vol').sum())
            .pivot(
                columns='plan_class',
                index=[geo, 'plan_type', 'product_id', 'payer_id', 'IID'],
                values='cur_vol',
                aggregate_function='sum',
                maintain_order=True,
            )
        )

    for idx in range(4):
        geo = levels[idx]
        hcps = top_hcps[idx]
        plan_volumes = (
            ln2_planid
            .select(['IID','PlanID','payer_id','product_id',geo,'plan_class','plan_type','plan_type_group1','plan_type_group2',f'TUF{period}c'])
            .rename({f'TUF{period}c':'cur_vol'})
            # Placeholder payer id used to match the "all payers" rows of top_hcps.
            .with_columns(pl.lit(-1).cast(pl.Int64).alias('payer_id_group1'))
        )

        # One stacked pair (real payers + placeholder payer) per plan column.
        pairs = []
        for plan_col in ('plan_type', 'plan_type_group1', 'plan_type_group2'):
            per_payer = _class_volumes(plan_volumes, hcps, geo, plan_col, 'payer_id')
            all_payers = _class_volumes(plan_volumes, hcps, geo, plan_col, 'payer_id_group1')
            pairs.append(per_payer.vstack(all_payers.select(per_payer.columns)))

        by_plan_type, by_group1, by_group2 = pairs
        layout = by_plan_type.columns
        access = (
            by_plan_type
            .vstack(by_group1.select(layout))
            .vstack(by_group2.select(layout))
            .fill_null(0)
        )

        # NOTE(review): this fill_null(0) runs on the whole joined frame, not
        # only on the plan_class columns, so null values in other columns that
        # accept an integer fill (presumably prc_vol_growth, cur_shr, ...) also
        # become 0. process_12 does not do this for layer 2 - confirm intent.
        df[idx] = (
            df[idx]
            .join(access, on=[geo, 'plan_type', 'product_id', 'payer_id', 'IID'], how='left')
            .fill_null(0)
        )
    return df

In [None]:
# For Payer Mix -
def process_17(df):
    """Attach payer-mix percentage columns to the layer-3 frames.

    Volumes in ``ln2_pmix`` are summed per IID, product and plan_type and
    pivoted to one column per plan_type. Each of the six reported plan types
    is divided by the row total (which also includes 'Voucher') to give a
    ``prc_<plan type>`` column; NaN ratios are reported as 0. The result is
    left-joined onto every frame on ``IID`` and ``product_id``.

    Reads the module-level ``ln2_pmix`` and ``period``. ``df`` is updated in
    place and also returned.
    """
    reported_types = ['Part D', 'Mgd Medicaid', 'Commercial', 'Cash', 'FFS', 'Others']
    # 'Voucher' contributes to the total but gets no prc_ column of its own.
    pivoted_types = reported_types + ['Voucher']

    mix_exprs = [
        (pl.col(name) / pl.col('total')).replace(np.nan, 0).alias(f'prc_{name}')
        for name in reported_types
    ]

    volumes = (
        ln2_pmix
        .select(['IID', 'product_id', 'plan_type', f'TUF{period}c'])
        .rename({f'TUF{period}c': 'TUF'})
        .group_by(['IID', 'product_id', 'plan_type'])
        .agg(TUF=pl.col('TUF').sum())
    )
    wide = volumes.pivot(
        values='TUF',
        index=['IID', 'product_id'],
        columns='plan_type',
        aggregate_function='sum',
        maintain_order=True,
    ).fill_null(0)
    iid_pmix_data = (
        wide
        .with_columns(pl.sum_horizontal(pivoted_types).alias('total'))
        .with_columns(*mix_exprs)
        .drop(pivoted_types + ['total'])
    )

    for idx in range(4):
        df[idx] = df[idx].join(iid_pmix_data, on=['IID', 'product_id'], how='left')
    return df

Miscellaneous Functions -

In [None]:
# inputs : temp1, temp2, temp3 # output : temp4 -> all levels combined , all geos levels also combined 
def dataframe_reorg_util():
    """Combine ``temp1``, ``temp2`` and ``temp3`` into one frame across all levels.

    For each of the four levels:

    * Layer 1 (``temp1``) is extended with a copy of its plan_type 'Total'
      rows whose plan_type is the literal ``\\N``, its IID, payer_id and
      payer-mix columns are set to ``\\N``, and it is put into the column
      order of ``temp3``.
    * Layer 2 (``temp2``) is extended with the ``temp1`` rows under
      ``payer_id`` -1, its IID and payer-mix columns are set to ``\\N``, and
      it is put into the same order.
    * Layers 2 and 3 are cast column-by-column to the dtypes of the level-0
      layer-1 frame so that all three layers can be stacked.

    The per-level stacks are then stacked together, with each level's key
    column renamed to ``levels[0]``.

    Returns:
        A single polars DataFrame containing every layer and level.
    """
    null_token = pl.lit('\\N')
    mix_columns = ['prc_Cash','prc_Commercial','prc_FFS','prc_Mgd Medicaid','prc_Others','prc_Part D']
    layer1_blanks = ['IID', 'payer_id'] + mix_columns
    layer2_blanks = ['IID'] + mix_columns

    reference_dtypes = None  # dtypes of the level-0 layer-1 frame
    per_level = []
    for idx in range(4):
        layout = temp3[idx].columns
        layer1_src = temp1[idx]

        # Layer 1
        total_copy = (
            layer1_src
            .filter(plan_type='Total')
            .with_columns(null_token.alias('plan_type'))
        )
        layer1 = (
            layer1_src
            .vstack(total_copy)
            .with_columns(*[null_token.alias(name) for name in layer1_blanks])
            .select(layout)
        )
        if reference_dtypes is None:
            reference_dtypes = layer1.dtypes

        # Casts are matched positionally: this level's column names, level 0's dtypes.
        casts = [
            pl.col(name).cast(dtype).alias(name)
            for name, dtype in zip(layer1.columns, reference_dtypes)
        ]

        # Layer 2
        layer2_src = temp2[idx]
        all_payer_rows = (
            layer1_src
            .with_columns(pl.lit(-1).cast(pl.Int64).alias('payer_id'))
            .select(layer2_src.columns)
        )
        layer2 = (
            layer2_src
            .vstack(all_payer_rows)
            .with_columns(*[null_token.alias(name) for name in layer2_blanks])
            .select(layout)
            .with_columns(*casts)
        )

        # Layer 3
        layer3 = temp3[idx].with_columns(*casts)

        per_level.append(layer1.vstack(layer2).vstack(layer3))

    # Fold every level into the first one under a common key column name.
    combined = per_level[0]
    target_columns = per_level[0].columns
    for idx in range(1, 4):
        aligned = per_level[idx].rename({levels[idx]: levels[0]}).select(target_columns)
        combined = combined.vstack(aligned)
    return combined

In [None]:
# Feed Creation - >
def get_feed(df):
    """Convert the consolidated frame into the export feed layout.

    Steps: rename columns to the feed names, relabel the '-1' payer id as
    'TOTAL', upper-case the known payer types to match the SAS feed, add
    ``REPORTTYPE`` and ``PERIOD``, look up payer and physician names, add the
    columns that are not calculated as ``\\N``, and select the export order.

    Reads the module-level ``MASTER_UNI``, ``payer_names``,
    ``period_col_values`` and ``period_num``.

    Args:
        df: consolidated polars DataFrame (output of ``dataframe_reorg_util``).

    Returns:
        polars DataFrame with exactly the columns listed in ``export_order``.
    """
    rename_mapping = {
        'geography_id': 'GEOGRAPHY_ID',
        'product_id': 'PRODUCT_ID',
        'plan_type': 'PAYERTYPE',
        'payer_id': 'PAYER_ID',
        'IID': 'PHYSICIAN_ID',
        'cur_vol': 'CURRENT_VOL',
        'pri_vol': 'PRIOR_VOL',
        'vol_change': 'VOL_CHANGE',
        'prc_vol_growth': 'PRC_VOL_GROWTH',
        'avg_trx_size': 'AVG_TRX_SIZE',
        'vol_change_ind': 'VOL_CHANGE_IND',
        'sales_dist_bnch': 'SALES_DISTRIBUTION_BENCHMARK',
        'prc_vol_growth_bnch': 'PRC_BENCHMARK_VOL_GROWTH',
        'prc_shr_growth_bnch': 'PRC_BENCHMARK_SHR_GROWTH',
        'sales_dist': 'SALES_DISTRIBUTION',
        'cur_shr': 'CURRENT_SHR',
        'pri_shr': 'PRIOR_SHR',
        'shr_change': 'SHR_CHANGE',
        'prc_shr_growth': 'PRC_SHR_GROWTH',
        'shr_change_ind': 'SHR_CHANGE_IND',
        'prc_vol_growth_ind': 'VOL_GROWTH_IND',
        'prc_shr_growth_ind': 'SHR_GROWTH_IND',
        'PREFERRED': 'PREFERRED',
        'COVERED': 'COVERED',
        'N_A': 'NOT_AVAILABLE',
        'NOT COVERED': 'NOT_COVERED',
        'prc_Part D': 'MEDICARE_PART_D',
        'prc_Mgd Medicaid': 'MANAGED_MEDICAID',
        'prc_Commercial': 'COMMERCIAL',
        'prc_Cash': 'CASH',
        'prc_FFS': 'FFS',
        'prc_Others': 'OTHER',
    }

    export_order = [
        'PRODUCT_ID', 'GEOGRAPHY_ID', 'PAYERTYPE', 'PAYER_ID', 'PHYSICIAN_ID',
        'REPORTTYPE', 'PERIOD', 'PAYER_NAME', 'PHYSICIAN_NAME',
        'SALES_DISTRIBUTION', 'SALES_DISTRIBUTION_BENCHMARK',
        'CURRENT_VOL', 'PRIOR_VOL', 'VOL_CHANGE', 'VOL_CHANGE_IND',
        'PRC_VOL_GROWTH', 'PRC_BENCHMARK_VOL_GROWTH', 'VOL_GROWTH_IND',
        'CURRENT_SHR', 'PRIOR_SHR', 'SHR_CHANGE', 'SHR_CHANGE_IND',
        'PRC_SHR_GROWTH', 'PRC_BENCHMARK_SHR_GROWTH', 'SHR_GROWTH_IND',
        'PREFERRED', 'COVERED', 'NOT_COVERED', 'NOT_AVAILABLE',
        'COMMERCIAL', 'MEDICARE_PART_D', 'MANAGED_MEDICAID',
        'FFS', 'CASH', 'OTHER',
        'PREFERRED_MARKET_ACCESS', 'PREFERRED_MARKET_SHARE', 'AVG_TRX_SIZE',
        'COVERED_PA_ST', 'UNKNOWN', 'NOT_APPLICABLE',
    ]

    # Payer types that are relabelled for the feed; any other value passes through.
    payertype_labels = {
        'Others': 'OTHERS',
        'Commercial': 'COMMERCIAL',
        'Cash': 'CASH',
        'Total': 'TOTAL',
        'Part D': 'PART D',
        'Part D and Commercial': 'PARTANDCOM',
    }

    # Feed columns that are not calculated here and are exported as \N.
    blank_columns = [
        'PREFERRED_MARKET_ACCESS', 'PREFERRED_MARKET_SHARE',
        'COVERED_PA_ST', 'UNKNOWN', 'NOT_APPLICABLE',
    ]

    # Lookup: physician id (as string) -> "FirstName LastName".
    physician_lookup = MASTER_UNI.select([
        pl.col('IID').cast(pl.Utf8).alias('PHYSICIAN_ID'),
        pl.concat_str(
            [pl.col('FirstName'), pl.col('LastName')],
            separator=' ',
            ignore_nulls=True,
        ).alias('PHYSICIAN_NAME'),
    ])

    # Lookup: payer id (as string) -> payer name (IRWD_FGN_NAME).
    payer_lookup = (
        payer_names
        .with_columns(pl.col('payer_id').cast(pl.Utf8).alias('PAYER_ID'))
        .rename({'IRWD_FGN_NAME': 'PAYER_NAME'})
    )

    feed = df.rename(rename_mapping)
    feed = feed.with_columns(
        # '-1' marks the "all payers" rows; it could not be relabelled earlier
        # because the layers had to share a dtype for stacking.
        pl.col('PAYER_ID').replace('-1', 'TOTAL'),
        pl.col('PAYERTYPE').replace(payertype_labels),
        REPORTTYPE=pl.lit('MONTHLY'),
        PERIOD=pl.lit(period_col_values[period_num]),
    )

    feed = feed.join(payer_lookup, on='PAYER_ID', how='left')
    # The 'TOTAL' payer has no entry in the lookup, so name it explicitly.
    feed = feed.with_columns(
        pl.when(pl.col('PAYER_ID') == 'TOTAL')
        .then(pl.lit('Total'))
        .otherwise(pl.col('PAYER_NAME'))
        .alias('PAYER_NAME')
    )

    feed = feed.join(physician_lookup, on='PHYSICIAN_ID', how='left')
    feed = feed.with_columns(pl.col('PHYSICIAN_NAME').fill_null('\\N'))

    feed = feed.with_columns(*[pl.lit('\\N').alias(name) for name in blank_columns])
    return feed.select(export_order)

## Period Loop

In [None]:
# MAIN EXECUTION LOOP - builds and exports one feed file per reporting period.
#
# Each iteration sets the module-level names `period_num` and `period`, then
# rebuilds `temp1`, `temp2` and `temp3`. Several process_* helpers read these
# names as globals (e.g. process_10 reads `temp1`, process_14 reads `temp1` and
# `temp2`, get_feed reads `period_num`), so the statement order below matters.

# Maps each period key to the value written to the feed's PERIOD column.
period_col_values = {1:'1-MONTH',3:'3-MONTHS',6:'6-MONTHS',12:'12-MONTHS','qtd':'QTD','ytd':'YTD'}
# PN is the number used in the output file name.
# NOTE(review): PN jumps from 5 to 7 (there is no P6 file) - confirm this is intended.
for period_num,PN in zip([1,3,6,12,'qtd','ytd'],[1,2,3,4,5,7]):
    # Suffix used to pick the source columns, e.g. f'TUF{period}c'.
    period = f'_{period_num}'
    # LAYER 1
    temp1 = process_1() # cur_vol , pri_vol , vol_change, prc_vol_growth, vol_change_ind, cur_trx, cur_tun , avg_trx_size
    temp1 = process_2(temp1) # sales dist
    temp1 = process_3(temp1) # sales dist_bnch
    temp1 = process_4(temp1) # prc_vol_growth_bnch
    temp1 = process_5(temp1) # cur_shr, pri_shr, shr_change, prc_shr_growth, shr_change_ind
    temp1 = process_6(temp1) # prc_shr_growth_bnch
    temp1 = process_7(temp1) # prc_vol_growth_ind ,prc_shr_growth_ind
    temp1 = process_8(temp1) # Payer Access

    # LAYER 2 (uses temp1 for totals and benchmarks)
    temp2 = process_9() # cur_vol , pri_vol , vol_change, prc_vol_growth, vol_change_ind, cur_trx, cur_tun , avg_trx_size
    temp2 = process_10(temp2) # sales_dist , sales_dist_bnch ,prc_vol_growth_bnch ,prc_shr_growth_bnch
    temp2 = process_11(temp2) # cur_shr, pri_shr, shr_change, prc_shr_growth, shr_change_ind
    temp2 = process_7(temp2) # prc_vol_growth_ind ,prc_shr_growth_ind
    temp2 = process_12(temp2) # Payer Access

    # LAYER 3 (uses temp1 and temp2 for benchmarks)
    temp3 = process_13()  # cur_vol , pri_vol , vol_change, prc_vol_growth, vol_change_ind, cur_trx, cur_tun , avg_trx_size
    temp3 = process_14(temp3) # sales_dist , sales_dist_bnch ,prc_vol_growth_bnch ,prc_shr_growth_bnch
    temp3 = process_15(temp3) # cur_shr, pri_shr, shr_change, prc_shr_growth, shr_change_ind
    temp3 = process_7(temp3) # prc_vol_growth_ind ,prc_shr_growth_ind
    temp3 = process_16(temp3) # Payer Access
    temp3 = process_17(temp3) # Payer Mix

    # Consolidate all layers and levels, then convert to the feed layout.
    temp4 = dataframe_reorg_util()
    feed_dataset = get_feed(temp4)

    # NOTE(review): the bucket name is hard-coded here, while the input paths
    # earlier in the notebook are built from the `bucket` value loaded from
    # vars_wk.json - confirm whether this should be f's3://{bucket}/...'.
    outfile = f's3://vortex-staging-a65ced90/BIT/output/ManagedCare/Monthly_ManagedCare_SalesPerformance_P{PN}_Feed.txt'

    # Pipe-delimited text with CRLF line endings.
    feed_dataset.to_pandas().to_csv(outfile,sep='|', lineterminator='\r\n',index=False)

    print('Exported Feed : ',PN)

In [None]:
# Timing: ~97 seconds for data prep, ~526 seconds for the period loop

---