### Importing Required Packages

In [0]:
from pyspark.sql.functions import *
import pyspark.sql.functions as f
import pyspark.sql.types as t
import warnings
warnings.filterwarnings('ignore')
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
import numpy as np
from sklearn.ensemble import RandomForestClassifier
import os



In [0]:
from sklearn.preprocessing import LabelEncoder, OneHotEncoder,StandardScaler
from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from sklearn.model_selection import KFold,cross_val_score
import sklearn.metrics as sk_metrics
from sklearn.metrics import roc_auc_score
from sklearn.metrics import confusion_matrix
from sklearn.metrics import f1_score

In [0]:
#Setting to prevent OoM issue while handling large chunk of data
spark.conf.set("spark.sql.execution.arrow.enabled", "false")
spark.conf.set("spark.sql.execution.arrow.pyspark.fallback.enabled","false")

### Input Parameters

In [0]:
dbutils.widgets.removeAll()

In [0]:
dbutils.widgets.text("Country Name", "Germany")
dbutils.widgets.text("Product Name", "TRELEGY")
dbutils.widgets.text("Target DB Name", "opportunity_engine")
dbutils.widgets.text("Target Table Name Prefix", "opportunityengine")
dbutils.widgets.text("Start Yearmonth", "202103")
dbutils.widgets.text("End Yearmonth", "202209")
# dbutils.widgets.text("First Yearmonth", "202210")
# dbutils.widgets.text("Second Yearmonth", "202211")
# dbutils.widgets.text("Third Yearmonth", "202212")

In [0]:
country_name = dbutils.widgets.get("Country Name")
product_name = dbutils.widgets.get("Product Name")
tgt_dbname = dbutils.widgets.get("Target DB Name")
tgt_tblname_prefix = dbutils.widgets.get("Target Table Name Prefix")
start_yearmonth = dbutils.widgets.get("Start Yearmonth")
end_yearmonth = dbutils.widgets.get("End Yearmonth")
# first_yearmonth = dbutils.widgets.get("First Yearmonth")
# second_yearmonth = dbutils.widgets.get("Second Yearmonth")
# third_yearmonth = dbutils.widgets.get("Third Yearmonth")

In [0]:
Train1 = spark.sql(f"select * from {tgt_dbname}.{tgt_tblname_prefix.lower()}_{country_name.lower()}_{product_name.lower()}_v3 where yearmonth <= {end_yearmonth}")
Test = spark.sql(f"select * from {tgt_dbname}.{tgt_tblname_prefix.lower()}_{country_name.lower()}_{product_name.lower()}_v3 where yearmonth > {end_yearmonth}")

In [0]:
#Get the split of the Switch to F2F in Train and Test test
print(Train1.filter(f.col("MONTHS_SINCE_DIGITAL_to_F2F_SWITCH").isNotNull()).count())
print(Test.filter(f.col("MONTHS_SINCE_DIGITAL_to_F2F_SWITCH").isNotNull()).count())
print(Train1.filter(f.col("MONTHS_SINCE_NO_ACTIVITY_to_F2F_SWITCH").isNotNull()).count())
print(Test.filter(f.col("MONTHS_SINCE_NO_ACTIVITY_to_F2F_SWITCH").isNotNull()).count())

9780
132
13783
337


In [0]:
Train1 = Train1.withColumn(
    'Switch_toF2F',
    f.when(((f.col("MONTHS_SINCE_DIGITAL_to_F2F_SWITCH").isNotNull()) | (f.col("MONTHS_SINCE_NO_ACTIVITY_to_F2F_SWITCH").isNotNull())), 1)\
    .otherwise(0)
)

# Train = Train1.filter((f.col("Switch_toF2F") == 1) | ((f.col("YEARMONTH") == end_yearmonth) & (f.col("INITIATION_CANDIDATE") == 1)) )
Train = Train1.filter((f.col("Switch_toF2F") == 1) | ((f.col("YEARMONTH") == end_yearmonth)) )

Test = Test.withColumn(
    'Switch_toF2F',
    f.when((f.col("MONTHS_SINCE_DIGITAL_to_F2F_SWITCH").isNotNull()) | (f.col("MONTHS_SINCE_NO_ACTIVITY_to_F2F_SWITCH").isNotNull()), 1)\
    .otherwise(0)
)


In [0]:
print(Train.count())
print(Test.count())

87958
194316


#### Data Preprocessing

In [0]:
def convert_toPandas(sparkdf):
  sparkdf_pd= sparkdf.toPandas()
  sparkdf_pd = sparkdf_pd.dropna(subset=['Switch_toF2F'])
  sparkdf_pd.replace(r'\s+', np.nan, regex=True)
  sparkdf_pd.fillna(value = np.nan,inplace=True) 
  sparkdf.unpersist()
  return sparkdf_pd
  

In [0]:
Train_df = convert_toPandas(Train)
Test_df = convert_toPandas(Test)

In [0]:
def data_conversion_split(df):
  X=df[[ ]]
  Y=df[['Switch_toF2F' ]]
  pd.set_option('mode.chained_assignment', None)  
  X[['CORE_GSK_DO_NOT_CONTACT_STATUS_CHANGED', 'CORE_GSK_EMAIL_CONSENT_DECLINED','RX_GSK_DO_NOT_MAIL_IND', 'RX_GSK_DO_NOT_PHONE_IND', 'CORE_GSK_PRIVACY_NOTICES_SERVED', 'RX_ACCESS_INFORMATION','GENDER', 'RX_ACADEMIC_TITLE', 'RX_HCP_ACCOUNT_TYPE', 'RX_MEDICAL_INTEREST', 'SALUTATION', 'SPECIALTY_1', 'SPECIALTY_2','RX_PRESCRIBER_IND', 'RX_GOVERNMENT_OFFICIAL_IND', 'RX_CLINICAL_INVESTIGATOR_IND', 'RX_DISPENSING_HCP_IND', 'RX_EE_IND', 'SPEAKER_IND', 'PRODUCT_ID','RX_ATTITUDINAL_SEGMENTATION', 'HAS_ACTIVE_PLAN', 'MASS_EMAIL_CONSENT', 'CRM_EMAIL_CONSENT' ,'RX_SEGMENT','RX_CUSTOMER_JOURNEY','CURRENCY_CODE']]=X[['CORE_GSK_DO_NOT_CONTACT_STATUS_CHANGED', 'CORE_GSK_EMAIL_CONSENT_DECLINED','RX_GSK_DO_NOT_MAIL_IND', 'RX_GSK_DO_NOT_PHONE_IND', 'CORE_GSK_PRIVACY_NOTICES_SERVED', 'RX_ACCESS_INFORMATION','GENDER', 'RX_ACADEMIC_TITLE', 'RX_HCP_ACCOUNT_TYPE', 'RX_MEDICAL_INTEREST', 'SALUTATION', 'SPECIALTY_1', 'SPECIALTY_2','RX_PRESCRIBER_IND', 'RX_GOVERNMENT_OFFICIAL_IND', 'RX_CLINICAL_INVESTIGATOR_IND', 'RX_DISPENSING_HCP_IND', 'RX_EE_IND', 'SPEAKER_IND', 'PRODUCT_ID','RX_ATTITUDINAL_SEGMENTATION', 'HAS_ACTIVE_PLAN', 'MASS_EMAIL_CONSENT', 'CRM_EMAIL_CONSENT' ,'RX_SEGMENT','RX_CUSTOMER_JOURNEY','CURRENCY_CODE']].astype(str)
  return X,Y

In [0]:
X_Train,Y_Train = data_conversion_split(Train_df)
X_Test,Y_Test=data_conversion_split(Test_df)

In [0]:
#Get the shape of Train and Test set
print(X_Train.shape)
print(X_Test.shape)

(87958, 56)
(194316, 56)


#### Define the preprocessing pipeline

In [0]:
def preprocessing_pipeline(numeric_features,categorical_features):
  numeric_transformer = Pipeline(
  steps=[("imputer_num", SimpleImputer(strategy='constant',fill_value=0)), 
         ("scaler", StandardScaler())]
                                )

  categorical_transformer = Pipeline(
  steps=[
    ("imputer_cat", SimpleImputer(strategy='constant',fill_value='missing')),  
    ("encoder", OneHotEncoder(handle_unknown="ignore"))]
                                    )
  preprocessor = ColumnTransformer(
  transformers=[
      ("cat", categorical_transformer, categorical_features),
      ("num", numeric_transformer, numeric_features)
      
                ],remainder='passthrough'
                                  )
  return preprocessor


In [0]:
numeric_features = [ 'SHARE_ELEBRATO_ELLIPTA', 'SHARE_TRELEGY','SHARE_TRIMBOW','SHARE_RELVAR', 'SHARE_SITT', 'SHARE_LABA/LAMA', 'SHARE_ICS/LABA','SHARE_ONCE_DAILY', 'SHARE_TWICE_DAILY', 'SHARE_MDI','SHARE_DPI','VOL_ELEBRATO_ELLIPTA','VOL_TRELEGY','VOL_SITT','VOL_TRIMBOW','VOL_LABA/LAMA','VOL_TRIMBOW_NEXTHALER_+_PI','VOL_TRIXEO','AVG_SALES','MONTHS_SINCE_DIGITAL_to_F2F_SWITCH','MONTHS_SINCE_NO_ACTIVITY_to_F2F_SWITCH','MONTHS_SINCE_F2F_to_DIGITAL_SWITCH','TOTAL_F2F_CALL','TOTAL_EDETAIL_CALLS_IPAD','TOTAL_EDETAIL_CALLS','AVG_F2F_CALLS_OUT_OF_TOTAL_CALLS','AVG_SAMPLE_DROPS_PER_F2F_CALL','AVG_F2F_CALLS_WITH_EDETAIL','AVG_SLIDE_VIEWS_EDETAIL_CALLS']

categorical_features=['CORE_GSK_DO_NOT_CONTACT_STATUS_CHANGED', 'CORE_GSK_EMAIL_CONSENT_DECLINED','RX_GSK_DO_NOT_MAIL_IND', 'RX_GSK_DO_NOT_PHONE_IND', 'CORE_GSK_PRIVACY_NOTICES_SERVED', 'RX_ACCESS_INFORMATION','GENDER', 'RX_ACADEMIC_TITLE', 'RX_HCP_ACCOUNT_TYPE', 'RX_MEDICAL_INTEREST', 'SALUTATION','SPECIALTY_1','SPECIALTY_2','RX_PRESCRIBER_IND', 'RX_GOVERNMENT_OFFICIAL_IND', 'RX_CLINICAL_INVESTIGATOR_IND', 'RX_DISPENSING_HCP_IND', 'RX_EE_IND', 'SPEAKER_IND', 'PRODUCT_ID','RX_ATTITUDINAL_SEGMENTATION', 'HAS_ACTIVE_PLAN', 'MASS_EMAIL_CONSENT', 'CRM_EMAIL_CONSENT' ,'RX_SEGMENT','RX_CUSTOMER_JOURNEY','CURRENCY_CODE']

preprocess= preprocessing_pipeline(numeric_features,categorical_features)

In [0]:
#Convert to Numpy Array after call to preprocess function
X_Train_transformed=(preprocess.fit_transform(X_Train)).toarray()
X_Test_transformed=(preprocess.transform(X_Test)).toarray()
Y_Train_transformed = (Y_Train).to_numpy()
Y_Test_transformed = (Y_Test).to_numpy()

#### Perform K-Fold Cross validation

In [0]:
# #Create an instance of RandomForestClassifier
# models= RandomForestClassifier()

In [0]:
# # Validate the RandomForest model using K-Fold Cross Validation
# kfold = KFold(n_splits=3, shuffle=True, random_state=11)
# scores1 = []
# for train_index, test_index in kfold.split(X_TrainCV_transformed):
#     Xtrain, Xcv = X_TrainCV_transformed[train_index], X_TrainCV_transformed[test_index]
#     Ytrain, Ycv = Y_TrainCV_transformed[train_index], Y_TrainCV_transformed[test_index]
#     models.fit(Xtrain, Ytrain.ravel())
#     score1 = models.score(Xcv, Ycv)
#     scores1.append(score1)
# mean1=np.mean(scores1)
# std1=np.std(scores1)
# print(f'Score during Cross-validation are:{scores1}')
# print(f"\nMean Score with standard deviation is: {mean1:.2f} +/- {std1:.2f}")

#### Train and Predict using RandomForest

In [0]:
# Train the RandomForest model
models= RandomForestClassifier(n_estimators=2000)
models.fit(X_Train_transformed, Y_Train_transformed.ravel())
Y_pred = models.predict_proba(X_Test_transformed)[:, 1]
# Y_pred = models.predict(X_Test_transformed)
print('ROC_AUC score is:',roc_auc_score(Y_Test_transformed, Y_pred))

ROC_AUC score is: 0.9937056837177867


In [0]:
#Set the filter criteria
Predicted= np.array([0]*len(Y_pred))
for i in range(len(Predicted)):
            if Y_pred[i] >0.5:   #If we set threshold to 0.01 we get ROC of more than 0.95 but doesn`t solve purpose
                Predicted[i] = 1

In [0]:
#Build the Confusion Matrix,ROC_AUC and check number of 1s predicted
print('Confusion Matrix:\n',confusion_matrix(Y_Test_transformed, Predicted))
print('\nROC_AUC score is:',roc_auc_score(Y_Test_transformed, Predicted))
count = 0
for i in range(len(Y_pred)):
  if Predicted[i] ==1:
    count+=1
print('\nNumber of 1 is:',count)

Confusion Matrix:
 [[193834     13]
 [   177    292]]

ROC_AUC score is: 0.8112671080591851

Number of 1 is: 305


####Map the Rep. data with the rest of data

In [0]:
# #Read the data and convert it to desired format
# rephcp = spark.read.table(f"{tgt_dbname}.RepHCP_mapping_{country_name.lower()}_{product_name.lower()}")
# rephcp_df =rephcp.toPandas()
# rephcp.unpersist()

Out[26]: DataFrame[ACCOUNT_ID: bigint, USER_ID: string]

#### Store the results in database

In [0]:
Account_df = Test_df[['ACCOUNT_ID' , 'PRODUCT_NAME' ,'RX_EU5_GSK_NANO_BRICK' ,'NANOBRICK' , 'RX_SEGMENT','RX_CUSTOMER_JOURNEY','YEARMONTH']]
Initiation_df =pd.DataFrame(np.c_[Y_pred] ,columns= ["INITIATION_PROBABILITY"])
Account_df.reset_index(drop=True, inplace=True)
Initiation_df.reset_index(drop=True, inplace=True)
Initiation= pd.concat([Account_df,Initiation_df], axis=1, ignore_index=False)
Initiation = Initiation[Initiation["INITIATION_PROBABILITY"]>0.5]
# Initiation_joined = Initiation.merge(rephcp_df, on='ACCOUNT_ID', how='left')
# print(Initiation_joined.head())
# print(len(Initiation_joined))

In [0]:
mySchema = t.StructType([ t.StructField("ACCOUNT_ID", t.LongType(), True)\
                       ,t.StructField("PRODUCT_NAME", t.StringType(), True)\
                      ,t.StructField("RX_EU5_GSK_NANO_BRICK", t.StringType(), True)\
                      ,t.StructField("NANOBRICK", t.StringType(), True)\
                      ,t.StructField("RX_SEGMENT", t.StringType(), True)\
                      ,t.StructField("RX_CUSTOMER_JOURNEY", t.StringType(), True)\
                       ,t.StructField("YEARMONTH", t.DoubleType(), True)\
                       ,t.StructField("INITIATION_PROBABILITY", t.FloatType(), True)\
                      #,t.StructField("USER_ID", t.StringType(), True) 
                      ])
Py_Initiation =  spark.createDataFrame(Initiation,schema=mySchema)
Py_Initiation.write.mode("overwrite").option("mergeSchema", "true").saveAsTable(
    f"{tgt_dbname}.initiation_new_{country_name.lower()}_{product_name.lower()}"
)

In [0]:
# spark.sql('drop table opportunity_engine.initiation_new_germany_trelegy')