In [1]:
# import packages
import os
from pyspark import SparkContext, SparkConf
import numpy as np
# import seaborn as sns
# sns.set(rc={'figure.figsize':(15,5)},font_scale=1.2)
# color = ["#e74c3c", "#2ecc71"]
# sns.set_palette(color)
# sns.set_style("whitegrid")
import matplotlib.pyplot as plt

import pandas as pd
# Remove pandas' display caps: every column and every row of a displayed frame is shown.
pd.set_option('display.max_columns', None) 
pd.set_option('display.max_rows', None)


# Print the SPARK_HOME environment variable; raises KeyError if it is not set.
print(os.environ['SPARK_HOME'])
from pyspark.sql import SparkSession, HiveContext, SQLContext
from pyspark.sql.functions import when, desc
from pyspark.sql.types import *
from pyspark.sql import Row, functions as F
from pyspark.sql.functions import to_date, trunc, unix_timestamp, date_format
from pyspark.sql.window import Window
from datetime import datetime
from pyspark.storagelevel import StorageLevel

In [2]:
# Load the IVR call-disposition code dictionary (CSV with a header row).
ivr_call_disp_cd_dictionary = spark.read.csv(
    'adl://prodrxperso.azuredatalakestore.net/rxperso/dev/extrnl/ivr_call_disp_cd_dictionary.csv',
    header=True)

# Keep the four lookup columns, then restrict to entries of file type 'RR'.
rr_ivr_call_disp_cd_dictionary = (
    ivr_call_disp_cd_dictionary
    .select('ivr_call_disp_cd', 'ivr_file_typ_cd', 'ivr_call_disp_dsc', 'ivr_gnrl_disp_cd')
    .filter(F.col('ivr_file_typ_cd') == 'RR')
)

In [3]:
  #rxoppty_script_final data prep
# Columns kept from rx_rtl_base.rxoppty_script_final.
cols = ['rxc_pat_id', 'store_nbr', 'rx_nbr', 'rx_fill_nbr', 'run_dt', 'pgm_cd',
        'sub_pgm_cd', 'con_track_typ', 'dispnd_ndc', 'antcptd_refill_due_dt', 'fill_dt']

# Collapse the raw sub-program codes into the labels used by the rest of the notebook.
# Codes that match none of the branches are passed through unchanged.
sub_pgm_cd_normalized = (
    F.when(F.col('sub_pgm_cd').isin(['RR-IVR', 'RR-CC']), 'RR')
     .when(F.col('sub_pgm_cd') == 'RR_ARDD', 'ARDD')
     .when(F.col('sub_pgm_cd') == 'RR_ARDD2', 'POST2_ARDD')
     .when(F.col('sub_pgm_cd') == 'RR_EXP', 'EXP')
     .otherwise(F.col('sub_pgm_cd'))
)

# Opportunities for the RR program between 2018-06-01 and 2019-01-31 with
# con_track_typ == 0, limited to the three sub-programs analysed below.
# The former `day` (weekday name) column was removed: it was never part of
# `cols`, so the select() discarded it and it could not affect the result.
# NOTE(review): the upper bound is 2019-01-31 00:00; if run_dt carries a
# time of day, rows later on Jan 31 are excluded -- confirm this is intended.
rxoppty_target = (
    spark.table('rx_rtl_base.rxoppty_script_final')
    .withColumn('sub_pgm_cd', sub_pgm_cd_normalized)
    .filter(F.col('sub_pgm_cd').isin(['RR', 'ARDD', 'POST2_ARDD']))
    .filter(F.col('run_dt') >= datetime(2018, 6, 1, 0, 0))
    .filter(F.col('run_dt') <= datetime(2019, 1, 31, 0, 0))
    .filter(F.col('pgm_cd') == 'RR')
    .filter(F.col('con_track_typ') == 0)
    .select(*cols)
    .distinct()
)

# Rename the patient id so it can be told apart from the disposition-side id
# after the join, then repartition and mark the frame for caching.
rxoppty_target = (
    rxoppty_target
    .withColumnRenamed('rxc_pat_id', 'rx_rxc_pat_id')
    .repartition(300)
    .persist(StorageLevel.MEMORY_ONLY_SER)
)

# ivr_disposition data prep: columns kept from aoscore_stg.ivr_disposition
# (plus the derived and dictionary columns added below).
cols = ['rxc_pat_id', 'store_nbr', 'rx_nbr', 'rx_fill_nbr', 'ivr_src_file_crt_dt', 'ivr_file_typ_cd',
        'ivr_call_disp_cd', 'ivr_call_disp_dsc', 'ivr_gnrl_disp_cd', 'sub_pgm_cd', 'channel', 'disp_flg',
        'outreach_typ_cd', 'mobile_number', 'ivr_refl_req_cd']

disp_cd_col = F.col('ivr_call_disp_cd')
outreach_typ_col = F.col('outreach_typ_cd')

# Sub-program: both RTS outreach types collapse to 'Fill0RTS'; others pass through.
sub_pgm_cd_expr = (
    F.when((outreach_typ_col == 'Fill0RTS') | (outreach_typ_col == 'RTS_RR'), 'Fill0RTS')
     .otherwise(outreach_typ_col)
)

# Channel, decided by the first matching rule on the disposition code.
channel_expr = (
    F.when(disp_cd_col.like('%SMS%'), 'SMS')
     .when(disp_cd_col.like('%CON01%'), 'SMS')
     .when(disp_cd_col.isNull(), 'Not Outreached')
     .when(disp_cd_col.like('B%'), 'Bypassed')
     .otherwise('IVR')
)

# Disposition flag: codes starting with 'B' are bypassed, missing codes were not sent.
disp_flg_expr = (
    F.when(disp_cd_col.substr(0, 1) == 'B', 'Bypassed')
     .when(disp_cd_col.isNull(), 'Not Sent')
     .otherwise('Outreached')
)

# Only 'Outreached' RR-file rows in the study window survive the final filter.
# Note: the next cell rebinds rx_disp to a parquet snapshot.
rx_disp = (
    spark.table('aoscore_stg.ivr_disposition')
    .withColumn('sub_pgm_cd', sub_pgm_cd_expr)
    .withColumn('channel', channel_expr)
    .withColumn('disp_flg', disp_flg_expr)
    .filter(F.col('ivr_src_file_crt_dt') >= datetime(2018, 6, 1, 0, 0))
    .filter(F.col('ivr_src_file_crt_dt') <= datetime(2019, 1, 31, 0, 0))
    .filter(F.col('ivr_file_typ_cd') == 'RR')
    .withColumn('mobile_number', F.concat(F.lit('001'), F.col('phone_nbr')))
    .filter(F.col('sub_pgm_cd').isin(['RR', 'ARDD', 'POST2_ARDD']))
    .join(rr_ivr_call_disp_cd_dictionary, ['ivr_file_typ_cd', 'ivr_call_disp_cd'], 'left')
    .drop(rr_ivr_call_disp_cd_dictionary.ivr_file_typ_cd)
    .drop(rr_ivr_call_disp_cd_dictionary.ivr_call_disp_cd)
    .select(*cols)
    .distinct()
    .filter(F.col('disp_flg') == 'Outreached')
)



In [4]:
# Rebind rx_disp to a saved parquet snapshot. This replaces the rx_disp frame
# built from aoscore_stg.ivr_disposition in the previous cell.
# NOTE(review): no cell visible here writes this path, so the snapshot's
# provenance and schema cannot be confirmed from this notebook.
rx_disp = spark.read.parquet('/opt/prod/common/rx_personal/users/c067715/rx_disp')

In [5]:
# Preview the first two rows of rx_disp.
rx_disp.show(2)

In [6]:
# Windows: one per prescription fill ordered by run date, one per patient and run date.
fill_keys = ['store_nbr', 'rx_nbr', 'rx_fill_nbr']
w = Window.partitionBy(*fill_keys).orderBy(F.col('run_dt'))
multi_rx_w = Window.partitionBy('disp_rxc_pat_id', 'run_dt').orderBy()

# Replace nulls with the literal string 'NULL' so they show up in ptnt_response_seq.
# NOTE(review): na.fill with a string targets every string column, not only
# ivr_refl_req_cd -- confirm that is intended (e.g. for the patient id that is
# coalesced after the join with rxoppty_target).
rx_disp = rx_disp.na.fill('NULL')

refl_req_col = F.col('ivr_refl_req_cd')
rx_disp = (
    rx_disp
    .withColumnRenamed('ivr_src_file_crt_dt', 'run_dt')
    .withColumnRenamed('rxc_pat_id', 'disp_rxc_pat_id')
    .withColumn('ivr_refl_req_cd', F.when(refl_req_col.isNull(), 'NULL').otherwise(refl_req_col))
)

# Comma-separated sequences collected over the run-date-ordered fill window.
# NOTE(review): `w` is ordered and has no explicit frame, so each row presumably
# carries the sequence up to its own run date rather than the full one -- confirm.
for seq_name, source_name in [('ptnt_response_seq', 'ivr_refl_req_cd'),
                              ('attempt_seq', 'sub_pgm_cd'),
                              ('channel_seq', 'channel')]:
    rx_disp = rx_disp.withColumn(
        seq_name, F.concat_ws(", ", F.collect_list(F.col(source_name)).over(w)))

# Count distinct prescription fills per patient and run date, and flag more than one.
rx_disp = (
    rx_disp
    .withColumn('rx_unique', F.concat('store_nbr', 'rx_nbr', 'rx_fill_nbr'))
    .withColumn('num_rx', F.size(F.collect_set('rx_unique').over(multi_rx_w)))
    .withColumn('multi_rx_flg', F.when(F.col('num_rx') > 1, 1).otherwise(0))
)


# rr_target: dispositions matched to opportunities on fill, run date and sub-program.
# The patient id prefers the disposition side and falls back to the opportunity side.
oppty_join_keys = ['store_nbr', 'rx_nbr', 'rx_fill_nbr', 'run_dt', 'sub_pgm_cd']
df_rx_disp_join_rxoppty = (
    rx_disp
    .join(rxoppty_target, oppty_join_keys, 'inner')
    .withColumnRenamed('dispnd_ndc', 'ndc')
    .withColumn('rxc_pat_id', F.coalesce(F.col('disp_rxc_pat_id'), F.col('rx_rxc_pat_id')))
    .drop('disp_rxc_pat_id', 'rx_rxc_pat_id')
)

# Drug lookup: GCN and GPI4 drug class keyed by NDC.
drug_df = (
    spark.read.parquet('adl://prodrxperso.azuredatalakestore.net/rxperso/dev/dl/dl_drug')
    .select('pas_drug_gcn', 'ndc_final', 'gpi4_drug_class')
    .withColumnRenamed('ndc_final', 'ndc')
    .withColumnRenamed('pas_drug_gcn', 'gcn')
)

# Attach drug attributes; rows whose NDC is missing from the lookup are dropped (inner join).
ivr_rxoppty_drug = df_rx_disp_join_rxoppty.join(drug_df, 'ndc', 'inner')

# Per-fill windows ordered by run date: `w` over all rows, `w_o` split by disp_flg.
w = Window.partitionBy('store_nbr', 'rx_nbr', 'rx_fill_nbr').orderBy(F.col('run_dt'))
w_o = Window.partitionBy('store_nbr', 'rx_nbr', 'rx_fill_nbr', 'disp_flg').orderBy(F.col('run_dt'))

# Attempt counters were previously cast to decimal(1,0), which holds at most 9;
# with Spark's default (non-ANSI) cast a larger count becomes NULL. The wider
# decimal keeps the same type family without losing attempts 10 and above.
attempt_nbr_type = "decimal(10,0)"
is_outreached = F.col('disp_flg') == 'Outreached'

# Run-date and attempt statistics per fill. The *_outreached columns are only
# populated on rows whose disp_flg is 'Outreached' (NULL otherwise).
# NOTE(review): the windows are ordered with no explicit frame, so max/sum are
# presumably running values up to the current row -- confirm.
ivr_rxoppty_drug_unique = (
    ivr_rxoppty_drug
    .select('disp_flg', 'store_nbr', 'rx_nbr', 'rx_fill_nbr', 'run_dt', 'sub_pgm_cd', 'ivr_call_disp_cd')
    .withColumn('rr_max_run_dt_outreached', F.when(is_outreached, F.max('run_dt').over(w_o)))
    .withColumn('rr_first_run_dt_outreached', F.when(is_outreached, F.min('run_dt').over(w_o)))
    .withColumn('rr_first_sub_pgm_cd_outreached', F.when(is_outreached, F.first('sub_pgm_cd').over(w_o)))
    .withColumn('rr_attempt_nbr_outreached',
                F.when(is_outreached, F.sum(F.lit(1)).over(w_o).cast(attempt_nbr_type)))
    .withColumn('rr_max_run_dt', F.max('run_dt').over(w))
    .withColumn('rr_first_run_dt', F.min('run_dt').over(w))
    .withColumn('rr_attempt_nbr', F.sum(F.lit(1)).over(w).cast(attempt_nbr_type))
)

# Attach the per-row attempt statistics back onto the full-width frame.
# NOTE(review): the join keys are not shown to be unique per row; duplicated
# key combinations would multiply rows here -- confirm.
stats_join_keys = ['store_nbr', 'rx_nbr', 'rx_fill_nbr', 'run_dt',
                   'sub_pgm_cd', 'ivr_call_disp_cd', 'disp_flg']
ivr_rxoppty_drug_v1 = ivr_rxoppty_drug.join(ivr_rxoppty_drug_unique, stats_join_keys, 'left')

# Per fill: highest attempt number and latest run date.
rx_fill_keys = ['store_nbr', 'rx_nbr', 'rx_fill_nbr']
ivr_rxoppty_drug_grp_max_atmpt_nbr = (
    ivr_rxoppty_drug_v1
    .groupBy(*rx_fill_keys)
    .agg(F.max('rr_attempt_nbr').alias('rr_max_attempt_nbr'),
         F.max('run_dt').alias('rr_max_run_dt'))
    .repartition(300)
    .persist(StorageLevel.MEMORY_ONLY_SER)
)

# Write v1 to parquet and read it straight back, so later steps start from the stored copy.
v1_path = '/opt/prod/common/rx_personal/users/c083365/ivr_rxoppty_drug_v1'
ivr_rxoppty_drug_v1.repartition(400).write.parquet(v1_path, mode='overwrite')
ivr_rxoppty_drug_v1 = spark.read.parquet(v1_path)

# Swap the row-level rr_attempt_nbr / rr_max_run_dt for the per-fill aggregates.
ivr_rxoppty_drug_v2 = (
    ivr_rxoppty_drug_v1
    .join(ivr_rxoppty_drug_grp_max_atmpt_nbr, rx_fill_keys, 'left')
    .drop(ivr_rxoppty_drug_v1.rr_attempt_nbr)
    .drop(ivr_rxoppty_drug_v1.rr_max_run_dt)
)

# Windows per fill, latest first. `w` is defined here but not used by the
# statements below.
latest_keys = ['store_nbr', 'rx_nbr', 'rx_fill_nbr']
wo = Window.partitionBy(*latest_keys).orderBy(F.col('rr_max_run_dt_outreached').desc())
w = Window.partitionBy(*latest_keys).orderBy(F.col('rr_max_run_dt').desc())

# Keep, per fill, the outreached row ranked first by rr_max_run_dt_outreached.
# Rows that are not 'Outreached' get a NULL rowNum and are removed by the filter.
outreached_rank = F.when(F.col('disp_flg') == 'Outreached', F.row_number().over(wo))
ivr_rxoppty_drug_v1_latest = (
    ivr_rxoppty_drug_v2
    .withColumn('rowNum', outreached_rank)
    .filter(F.col('rowNum') == 1)
)

# Second pass: keep the first row per fill when ordered by disp_flg descending.
w1 = Window.partitionBy(*latest_keys).orderBy(F.col('disp_flg').desc())
ivr_rxoppty_drug_v1_latest = (
    ivr_rxoppty_drug_v1_latest
    .withColumn('rowNum2', F.row_number().over(w1))
    .filter(F.col('rowNum2') == 1)
)


def prefix_columns(prefix, df):
    """Return `df` with `prefix` prepended to every column name.

    Column order is preserved; the renaming is done with a single
    ``df.toDF(...)`` call over the new names.
    """
    return df.toDF(*[prefix + name for name in df.columns])

# Prefix every column with 'o_' so the outreach-side columns stay distinguishable
# from the prescription-fill columns joined in later.
ivr_rxoppty_drug_v1_latest = prefix_columns('o_', ivr_rxoppty_drug_v1_latest)

# Store the prefixed frame as parquet (400 partitions, overwriting earlier output).
latest_path = '/opt/prod/common/rx_personal/users/c083365/ivr_rxoppty_drug_v1_latest'
ivr_rxoppty_drug_v1_latest.repartition(400).write.parquet(latest_path, mode='overwrite')

# Prescription fills from 2018-06-01 on. A missing pickup date is allowed;
# a present one must be on/after both the window start and the fill date.
fills_start = datetime(2018, 6, 1, 0, 0)
fill_dt_col = F.col('fill_dt')
pkup_dt_col = F.col('pkup_dt')
curr_join_drug = (
    spark.read.parquet('adl://prodrxperso.azuredatalakestore.net/rxperso/dev/dl/dl_rxfill_base')
    .filter(fill_dt_col >= fills_start)
    .filter((pkup_dt_col >= fills_start) | pkup_dt_col.isNull())
    .filter((pkup_dt_col >= fill_dt_col) | pkup_dt_col.isNull())
)

# Store configuration, read from CSV with a header row and no explicit schema.
store_rts = (
    spark.read.csv('adl://prodrxperso.azuredatalakestore.net/rxperso/dev/extrnl/store_configuration.csv',
                   header=True)
    .select('store_nbr', 'rts_days_in_bin_nbr')
)

# Attach rts_days_in_bin_nbr per store, then drop the duplicate store key.
join_con_rts = [ivr_rxoppty_drug_v1_latest.o_store_nbr == store_rts.store_nbr]
ivr_oppty_final = (
    ivr_rxoppty_drug_v1_latest
    .join(store_rts, join_con_rts, 'left')
    .drop('store_nbr')
)
                                
# Match each outreach to fills for the same patient, GCN and GPI4 drug class.
join_cond = [ivr_oppty_final.o_rxc_pat_id == curr_join_drug.rxc_pat_id,
             ivr_oppty_final.o_gcn == curr_join_drug.pas_drug_gcn,
             ivr_oppty_final.o_gpi4_drug_class == curr_join_drug.gpi4_drug_class]


def _days_between(end_name, start_name):
    """Whole days from the date of `start_name` to the date of `end_name`."""
    return F.datediff(F.to_date(F.col(end_name)), F.to_date(F.col(start_name)))


# Keep fills made on/after the last outreach and no later than 24 days after the
# anticipated refill due date, then derive the timing columns. rr_target is 1 when
# the pickup delay after fill is known and within the store's rts_days_in_bin_nbr.
pickup_after_fill = F.col('rr_time_to_pickup_after_fill')
refill_deadline = F.date_add(F.to_date(F.col('o_antcptd_refill_due_dt')), 24)
oppty_to_script = (
    ivr_oppty_final
    .join(curr_join_drug, join_cond, 'inner')
    .withColumn('RR_target_filter', F.col('o_rr_max_run_dt_outreached') <= F.col('fill_dt'))
    .filter(F.col('RR_target_filter') == 1)
    .filter(F.col('fill_dt') <= refill_deadline)
    .withColumn('rr_time_to_fill_after_reached', _days_between('fill_dt', 'o_rr_max_run_dt_outreached'))
    .withColumn('rr_time_to_pickup_after_fill', _days_between('pkup_dt', 'fill_dt'))
    .withColumn('rr_target',
                F.when((pickup_after_fill <= F.col('rts_days_in_bin_nbr')) &
                       pickup_after_fill.isNotNull(), 1).otherwise(0))
    .withColumn('rr_time_to_pickup_after_reached', _days_between('pkup_dt', 'o_rr_max_run_dt_outreached'))
    .withColumn('rr_time_to_pickup_rel_ardd', _days_between('pkup_dt', 'o_antcptd_refill_due_dt'))
    .withColumn('rr_time_to_fill_rel_ardd', _days_between('fill_dt', 'o_antcptd_refill_due_dt'))
)


# One candidate fill per outreach: the earliest by fill date, then pickup date.
o_fill_keys = ['o_store_nbr', 'o_rx_nbr', 'o_rx_fill_nbr']
w = Window.partitionBy(*o_fill_keys).orderBy(F.col('fill_dt'), F.col('pkup_dt'))

df_1 = (
    oppty_to_script
    .withColumn('rowNum_dups', F.row_number().over(w))
    .filter(F.col('rowNum_dups') == 1)
    .drop('rowNum_dups')
)

# Number of candidate fills that matched each outreach.
df_2 = (
    oppty_to_script
    .groupBy(*o_fill_keys)
    .agg(F.sum(F.lit(1)).alias('n_records_for_target_idenf'))
)

# Previously df_1 was joined to df_2, projected back down to df_2's own columns,
# and then joined to df_1 a second time. df_1 holds one row per key, so a single
# join gives the same rows and columns with one fewer shuffle.
# `oppty_to_script_final_v0` is kept as a name for the per-key counts.
oppty_to_script_final_v0 = df_2
oppty_to_script_final = df_1.join(df_2, o_fill_keys, 'inner')
                                
# Join the chosen target fill back onto the latest-outreach table.
target_cols = ['o_store_nbr', 'o_rx_nbr', 'o_rx_fill_nbr', 'ndc', 'rxc_pat_id',
               'store_nbr', 'rx_nbr', 'rx_fill_nbr', 'pkup_dt', 'fill_dt',
               'gpi4_drug_class', 'rr_time_to_fill_after_reached',
               'rr_time_to_pickup_after_fill',
               'rr_time_to_pickup_after_reached',
               'rr_time_to_pickup_rel_ardd', 'rr_time_to_fill_rel_ardd',
               'rr_target', 'n_records_for_target_idenf']
oppty_to_script_unique_rx = oppty_to_script_final.select(*target_cols)

# Left join keeps every outreach; outreaches with no matching fill get rr_target = 0.
df_final = (
    ivr_rxoppty_drug_v1_latest
    .join(oppty_to_script_unique_rx, ['o_store_nbr', 'o_rx_nbr', 'o_rx_fill_nbr'], 'left')
    .withColumn('rr_target', F.when(F.col('rr_target').isNull(), 0).otherwise(F.col('rr_target')))
    .repartition(400)
)

# Store the result as parquet (300 partitions, overwriting earlier output).
df_final_path = '/opt/prod/common/rx_personal/users/c083365/df_final'
df_final.repartition(300).write.parquet(df_final_path, mode='overwrite')

In [7]:
# Row count of rx_disp. Uses the print() call form (as the import cell does),
# which is valid on both Python 2 and Python 3.
print(rx_disp.count())

In [8]:
# Print the column names and types of df_final.
df_final.printSchema()

In [9]:
# Flag rows with at most three outreached attempts (the test is <= 3, despite the
# "less_than_3" name). The statement used to end in a dangling `\` continuation
# with nothing after it; it is now a complete expression.
df_final = df_final.withColumn(
    'less_than_3_attempt_flg',
    F.when(F.col('o_rr_attempt_nbr_outreached') <= 3, 1).otherwise(0))

In [10]:
def _seq_endpoint(seq_name, values, like_template):
    """Build the first/last-element column for a ', '-separated sequence column.

    For rows with at least two outreached attempts, the first value in `values`
    whose LIKE pattern (``like_template % value``) matches the sequence is
    returned. All other rows get the sequence column itself.
    """
    multi_attempt = F.col('o_rr_attempt_nbr_outreached') >= 2
    seq_col = F.col(seq_name)
    chain = None
    for value in values:
        matched = multi_attempt & seq_col.like(like_template % value)
        chain = F.when(matched, value) if chain is None else chain.when(matched, value)
    return chain.otherwise(seq_col)


# LIKE templates: sequence starts with "<value>," / sequence ends with ", <value>".
starts_with = '%s,%%'
ends_with = '%%, %s'
response_values = ['N', 'NULL', 'Y', 'S']
pgm_values = ['ARDD', 'RR', 'POST2_ARDD']
channel_values = ['IVR', 'SMS']

# less_than_3_attempt_flg: at most three outreached attempts (the test is <= 3).
# valid_fill_flg: fill happened no later than 24 days after the anticipated refill due date.
df_final = (
    df_final
    .withColumn('less_than_3_attempt_flg',
                F.when(F.col('o_rr_attempt_nbr_outreached') <= 3, 1).otherwise(0))
    .withColumn('valid_fill_flg',
                F.when(F.col('rr_time_to_fill_rel_ardd') <= 24, 1).otherwise(0))
    .withColumn('first_response', _seq_endpoint('o_ptnt_response_seq', response_values, starts_with))
    .withColumn('last_response', _seq_endpoint('o_ptnt_response_seq', response_values, ends_with))
    .withColumn('first_pgm', _seq_endpoint('o_attempt_seq', pgm_values, starts_with))
    .withColumn('last_pgm', _seq_endpoint('o_attempt_seq', pgm_values, ends_with))
    .withColumn('first_channel', _seq_endpoint('o_channel_seq', channel_values, starts_with))
    .withColumn('last_channel', _seq_endpoint('o_channel_seq', channel_values, ends_with))
)

# Attempt sequences accepted in the cleaned set: each program at most once, in
# the order RR -> ARDD -> POST2_ARDD.
allowed_attempt_seqs = ['RR', 'ARDD', 'POST2_ARDD',
                        'RR, ARDD', 'RR, POST2_ARDD', 'ARDD, POST2_ARDD',
                        'RR, ARDD, POST2_ARDD']

# `switch` / `switch_channel` are 0 when the first and last response / channel
# are equal, 1 otherwise.
df_final_clean = (
    df_final
    .filter(F.col('less_than_3_attempt_flg') == 1)
    .filter(F.col('o_attempt_seq').isin(allowed_attempt_seqs))
    .withColumn('switch',
                F.when(F.col('first_response') == F.col('last_response'), 0).otherwise(1))
    .withColumn('switch_channel',
                F.when(F.col('first_channel') == F.col('last_channel'), 0).otherwise(1))
)

In [11]:
%python
import scipy
import pkg_resources
# Print the installed scipy version. As a bare mid-cell expression its value was
# discarded, because only a cell's last expression is displayed.
print(pkg_resources.get_distribution('scipy').version)
# List the libraries installed through the notebook library utility.
dbutils.library.list()




In [12]:
# Install scipy 1.2.0 for this notebook session, then restart the Python process.
# NOTE(review): restartPython() presumably discards all Python state, so frames
# built in earlier cells (df_final, rx_disp, ...) would need to be rebuilt before
# the cells below can run -- confirm, and consider moving installs to the top.
dbutils.library.installPyPI('scipy','1.2.0')
dbutils.library.restartPython()

In [13]:
# Install quinn for this notebook session, then restart the Python process.
# No version is given here, unlike the scipy install, so the resolved version may
# differ between runs.
# NOTE(review): restartPython() presumably discards all Python state defined in
# earlier cells -- confirm.
dbutils.library.installPyPI('quinn')
dbutils.library.restartPython()

In [14]:
%python
import scipy
import pkg_resources
# Print the installed quinn version. As a bare mid-cell expression its value was
# discarded, because only a cell's last expression is displayed.
print(pkg_resources.get_distribution('quinn').version)
# List the libraries installed through the notebook library utility.
dbutils.library.list()

In [15]:
from quinn.extensions import *

In [16]:
# Total rows, distinct outreach-side fills and distinct target-side fills in
# df_final, printed space-separated on one line. Uses the print() call form so the
# cell is valid on both Python 2 and Python 3 with the same output.
print('{} {} {}'.format(
    df_final.count(),
    df_final.select('o_store_nbr', 'o_rx_nbr', 'o_rx_fill_nbr').distinct().count(),
    df_final.select('store_nbr', 'rx_nbr', 'rx_fill_nbr').distinct().count()))

In [17]:
# Row count of the cleaned set (print() call form: valid on Python 2 and 3).
print(df_final_clean.count())

In [18]:
# Row count of the opportunity table (print() call form: valid on Python 2 and 3).
print(rxoppty_target.count())

In [19]:
# Print the column names and types of rxoppty_target.
rxoppty_target.printSchema()

In [20]:
# Opportunity row counts per normalized sub-program code.
rxoppty_target.groupby('sub_pgm_cd').count().show()

In [21]:
# Row count of rx_disp (print() call form: valid on Python 2 and 3).
print(rx_disp.count())

In [22]:
# Disposition row counts per disp_flg value.
rx_disp.groupby('disp_flg').count().show()

In [23]:
# Same per-disp_flg counts as the previous cell, kept as a DataFrame and not shown.
# rx_disp_a is not referenced by any other cell visible in this notebook.
rx_disp_a = rx_disp.groupby('disp_flg').count()

In [24]:
# Outreached disposition rows per sub-program, largest group first.
rx_disp.filter(F.col('disp_flg')=='Outreached')\
       .groupby('sub_pgm_cd').count()\
       .sort(desc('count')).show()

In [25]:
# Number of distinct prescription fills in rx_disp (print() call form: valid on
# Python 2 and 3).
print(rx_disp.select('store_nbr', 'rx_nbr', 'rx_fill_nbr').distinct().count())

In [26]:
# Number of distinct target-side fills in df_final (print() call form: valid on
# Python 2 and 3).
print(df_final.select('store_nbr', 'rx_nbr', 'rx_fill_nbr').distinct().count())

In [27]:
# Row count of df_final (print() call form: valid on Python 2 and 3).
print(df_final.count())

In [28]:
# Print the column names and types of df_final.
df_final.printSchema()

In [29]:
# Show two rows with exactly three outreached attempts and their response sequence.
df_final.filter(F.col('o_rr_attempt_nbr_outreached')==3).select('o_rr_attempt_nbr_outreached','o_ptnt_response_seq').show(2)

In [30]:
# Same query as the previous cell, showing ten rows instead of two.
df_final.filter(F.col('o_rr_attempt_nbr_outreached')==3).select('o_rr_attempt_nbr_outreached','o_ptnt_response_seq').show(10)

In [31]:
# Print the column names and types of rx_disp.
rx_disp.printSchema()

In [32]:
# Disposition row counts per patient-response sequence, most frequent first.
rx_disp.groupby('ptnt_response_seq').count()\
       .sort(desc('count')).show()

In [33]:
# List fills whose response sequence is exactly 'Y, Y', with their run dates.
rx_disp.filter((F.col('ptnt_response_seq') == 'Y, Y')).select('store_nbr', 'rx_nbr', 'rx_fill_nbr','run_dt').show()


In [34]:
# Spot check: all disposition rows for store 8, prescription '2154191' (up to 1000 rows).
rx_disp.filter(F.col('store_nbr') == 8) \
        .filter(F.col('rx_nbr') == '2154191').show(1000)

In [35]:
# Spot check: store 8, prescription '2154191', fill 0.
# show() prints the table itself and returns None, so wrapping it in `print`
# only added a stray "None" line.
(rx_disp
    .filter(F.col('store_nbr') == 8)
    .filter(F.col('rx_nbr') == '2154191')
    .filter(F.col('rx_fill_nbr') == 0)
    .select('store_nbr', 'rx_nbr', 'rx_fill_nbr', 'run_dt', 'ivr_refl_req_cd', 'ptnt_response_seq')
    .show())

In [36]:
# Spot check: store 8, prescription '2154191', fill 1.
# show() prints the table itself and returns None, so wrapping it in `print`
# only added a stray "None" line.
(rx_disp
    .filter(F.col('store_nbr') == 8)
    .filter(F.col('rx_nbr') == '2154191')
    .filter(F.col('rx_fill_nbr') == 1)
    .select('store_nbr', 'rx_nbr', 'rx_fill_nbr', 'run_dt', 'ivr_refl_req_cd', 'ptnt_response_seq')
    .show())

In [37]:
# List up to 100 fills whose response sequence is exactly 'N, N, N', with their run dates.
rx_disp.filter((F.col('ptnt_response_seq') == 'N, N, N')).select('store_nbr', 'rx_nbr', 'rx_fill_nbr','run_dt').show(100)

In [38]:
# Spot check: store 3, prescription '722470', fill 0.
# show() prints the table itself and returns None, so wrapping it in `print`
# only added a stray "None" line.
(rx_disp
    .filter(F.col('store_nbr') == 3)
    .filter(F.col('rx_nbr') == '722470')
    .filter(F.col('rx_fill_nbr') == 0)
    .select('store_nbr', 'rx_nbr', 'rx_fill_nbr', 'run_dt', 'ivr_refl_req_cd', 'ptnt_response_seq')
    .show())

In [39]:
# Spot check: store 47, prescription '324996', fill 4.
# show() prints the table itself and returns None, so wrapping it in `print`
# only added a stray "None" line.
(rx_disp
    .filter(F.col('store_nbr') == 47)
    .filter(F.col('rx_nbr') == '324996')
    .filter(F.col('rx_fill_nbr') == 4)
    .select('store_nbr', 'rx_nbr', 'rx_fill_nbr', 'run_dt', 'ivr_refl_req_cd', 'ptnt_response_seq')
    .show())

In [40]:
# Print the column names and types of df_final.
df_final.printSchema()

In [41]:


# Spot check in df_final: store 47, prescription '324996', fill 4.
# show() prints the table itself and returns None, so wrapping it in `print`
# only added a stray "None" line.
(df_final
    .filter(F.col('o_store_nbr') == 47)
    .filter(F.col('o_rx_nbr') == '324996')
    .filter(F.col('o_rx_fill_nbr') == 4)
    .select('o_store_nbr', 'o_rx_nbr', 'o_rx_fill_nbr', 'o_rr_max_run_dt',
            'o_ivr_refl_req_cd', 'o_ptnt_response_seq')
    .show())

In [42]:
# Row count of df_final (print() call form: valid on Python 2 and 3).
print(df_final.count())

In [43]:
# Rows per response sequence with their share of all df_final rows.
# 'percentage' is a fraction (count / total) rounded to two decimals.
total_rows = df_final.count()
(df_final
    .groupby('o_ptnt_response_seq')
    .agg(F.count('*').alias('count'),
         (F.count('*') / total_rows).cast("decimal(18,2)").alias('percentage'))
    .sort(desc('o_ptnt_response_seq'))
    .show(1000))

In [44]:
# Rows per response sequence, sorted by sequence in descending order (up to 1000 rows).
df_final\
    .groupby('o_ptnt_response_seq').count()\
    .sort(desc('o_ptnt_response_seq')).show(1000)

In [45]:
# Summary per response sequence for fills with exactly one outreached attempt.
# The denominators are hard-coded totals per attempt count; where they come from
# is not shown in this notebook (TODO confirm). Because of the == 1 filter only
# the first branch can apply here.
attempt_nbr = F.col('o_rr_attempt_nbr_outreached')
row_count = F.count('*')
share_of_attempt_group = (
    F.when(attempt_nbr == 1, (row_count / 48937081).cast("decimal(18,2)"))
     .when(attempt_nbr == 2, (row_count / 24571819).cast("decimal(18,2)"))
     .when(attempt_nbr == 3, (row_count / 9893367).cast("decimal(18,2)"))
)
(df_final
    .filter(F.col('less_than_3_attempt_flg') == 1)
    .filter(attempt_nbr == 1)
    .groupby('o_ptnt_response_seq', 'less_than_3_attempt_flg', 'o_rr_attempt_nbr_outreached')
    .agg(row_count.alias('count'),
         share_of_attempt_group.alias('percentage'),
         F.avg('valid_fill_flg').cast("decimal(18,2)").alias('valid_fill_rate'),
         F.avg('rr_target').cast("decimal(18,2)").alias('pickup_rate'))
    .sort('o_ptnt_response_seq')
    .show(1000))

In [46]:
# Response sequences starting with 'N' and their share of all df_final rows.
# The pattern "N%" matches sequences beginning with 'N' as well as with 'NULL'.
total_rows = df_final.count()
(df_final
    .filter(F.col('o_ptnt_response_seq').like("N%"))
    .groupby('o_ptnt_response_seq')
    .agg(F.count('*').alias('count'),
         (F.count('*') / total_rows).cast("decimal(18,2)").alias('percentage'))
    .sort(desc('o_ptnt_response_seq'))
    .show(1000))

In [47]:
# Row counts for response sequences matching "N%"; that pattern covers sequences
# beginning with 'N' as well as with 'NULL'.
df_final\
    .filter(F.col('o_ptnt_response_seq').like("N%"))\
    .groupby('o_ptnt_response_seq').count()\
    .sort(desc('o_ptnt_response_seq')).show(1000)

In [48]:
# Summary per first/last response for fills with exactly three outreached attempts.
# The denominators are hard-coded totals per attempt count; where they come from
# is not shown in this notebook (TODO confirm). Because of the == 3 filter only
# the last branch can apply here.
attempt_nbr = F.col('o_rr_attempt_nbr_outreached')
row_count = F.count('*')
share_of_attempt_group = (
    F.when(attempt_nbr == 1, (row_count / 48937081).cast("decimal(18,2)"))
     .when(attempt_nbr == 2, (row_count / 24571819).cast("decimal(18,2)"))
     .when(attempt_nbr == 3, (row_count / 9893367).cast("decimal(18,2)"))
)
(df_final
    .filter(F.col('less_than_3_attempt_flg') == 1)
    .filter(attempt_nbr == 3)
    .groupby('first_response', 'last_response', 'o_ptnt_response_seq', 'o_rr_attempt_nbr_outreached')
    .agg(row_count.alias('count'),
         share_of_attempt_group.alias('percentage'),
         F.avg('valid_fill_flg').cast("decimal(18,2)").alias('valid_fill_rate'),
         F.avg('rr_target').cast("decimal(18,2)").alias('pickup_rate'))
    .sort('first_response', 'last_response')
    .show(1000))

In [49]:
# Total rows, distinct outreach-side fills and distinct target-side fills in
# df_final_clean, printed space-separated on one line. Uses the print() call form
# so the cell is valid on both Python 2 and Python 3 with the same output.
print('{} {} {}'.format(
    df_final_clean.count(),
    df_final_clean.select('o_store_nbr', 'o_rx_nbr', 'o_rx_fill_nbr').distinct().count(),
    df_final_clean.select('store_nbr', 'rx_nbr', 'rx_fill_nbr').distinct().count()))

In [50]:
# Row count, share of all rows, valid-fill rate and pickup rate, split by whether
# a fill had at most three outreached attempts.
total_rows = df_final.count()
(df_final
    .groupby('less_than_3_attempt_flg')
    .agg(F.count('*').alias('count'),
         (F.count('*') / total_rows).cast("decimal(18,2)").alias('percentage'),
         F.avg('valid_fill_flg').cast("decimal(18,2)").alias('valid_fill_rate'),
         F.avg('rr_target').cast("decimal(18,2)").alias('pickup_rate'))
    .sort(desc('less_than_3_attempt_flg'))
    .show())

In [51]:
# Earliest and latest outreach run date present in df_final.
df_final.agg(F.min('o_run_dt'),F.max('o_run_dt')).show()

In [52]:
# Cleaned set split by number of outreached attempts: row count, share of the
# cleaned set, valid-fill rate and pickup rate.
clean_total_rows = df_final_clean.count()
(df_final_clean
    .groupby('o_rr_attempt_nbr_outreached')
    .agg(F.count('*').alias('count'),
         (F.count('*') / clean_total_rows).cast("decimal(18,2)").alias('percentage'),
         F.avg('valid_fill_flg').cast("decimal(18,2)").alias('valid_fill_rate'),
         F.avg('rr_target').cast("decimal(18,2)").alias('pickup_rate'))
    .sort('o_rr_attempt_nbr_outreached')
    .show())

In [53]:
# Summary per response sequence for fills with exactly one outreached attempt.
# The denominators are hard-coded totals per attempt count; where they come from
# is not shown in this notebook (TODO confirm). Because of the == 1 filter only
# the first branch can apply here.
attempt_nbr = F.col('o_rr_attempt_nbr_outreached')
row_count = F.count('*')
share_of_attempt_group = (
    F.when(attempt_nbr == 1, (row_count / 48937081).cast("decimal(18,2)"))
     .when(attempt_nbr == 2, (row_count / 24571819).cast("decimal(18,2)"))
     .when(attempt_nbr == 3, (row_count / 9893367).cast("decimal(18,2)"))
)
(df_final
    .filter(F.col('less_than_3_attempt_flg') == 1)
    .filter(attempt_nbr == 1)
    .groupby('o_ptnt_response_seq', 'less_than_3_attempt_flg', 'o_rr_attempt_nbr_outreached')
    .agg(row_count.alias('count'),
         share_of_attempt_group.alias('percentage'),
         F.avg('valid_fill_flg').cast("decimal(18,2)").alias('valid_fill_rate'),
         F.avg('rr_target').cast("decimal(18,2)").alias('pickup_rate'))
    .sort('o_ptnt_response_seq')
    .show(1000))

In [54]:
# List the DBFS entry for the pyperso 1.3 egg.
dbutils.fs.ls("dbfs:/prod/rxperso/pyperso-1.3-py2.7.egg")