In [None]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.ml.feature import *
import pandas

In [None]:
# Load every column of the international-call base table into a Spark DataFrame.
base_table = "VZW_SOI_PRD_TBLS.vzsoi_intl_call_base_tbl_v1"
features1 = spark.sql("select * from " + base_table)

In [None]:
# Columns carried forward from the base table: identifiers, timestamps,
# account/billing attributes, the target (primary_topic) and the five
# primaryN_topic columns, plus plan/travel indicators and prior counts.
selected_columns = [
    'mtn', 'call_start_tm_stmp', 'ethnicity', 'age', 'tenure', 'pplan_eff_dt', 'prepaid_ind',
    'access_amt', 'data_access_amt', 'feat_amt', 'late_pymnt_chrg_amt', 'occ_amt',
    'bill_6_mth_avg', 'bill_curr_amt',
    'msg_allow_share_ind', 'data_allow_share_ind', 'voice_allow_share_ind',
    'bill_cycle_dt', 'free_plan', 'ccd1', 'primary_topic', 'call_ts1', 'tvl1', 'cl_desc1',
    'primary1_topic', 'primary2_topic', 'primary3_topic', 'primary4_topic', 'primary5_topic',
    'line_type', 'tvl_pass_included', 'can_mex_free_plan', 'prev_tvl_count', 'prev_call_count',
]
features = features1.select(*selected_columns)

In [None]:
# Rows without a primary_topic cannot be used as labelled examples; drop them.
features = features.na.drop(subset=['primary_topic'])

In [None]:
# Cast age and the prior-count columns to double, and the billing amount
# columns to float, so the later numeric-column steps pick them up.
double_columns = ['age', 'prev_tvl_count', 'prev_call_count']
float_columns = ['access_amt', 'data_access_amt', 'feat_amt', 'late_pymnt_chrg_amt',
                 'occ_amt', 'bill_6_mth_avg', 'bill_curr_amt']
for column_name in double_columns:
    features = features.withColumn(column_name, features[column_name].cast(DoubleType()))
for column_name in float_columns:
    features = features.withColumn(column_name, features[column_name].cast(FloatType()))

In [None]:
# Missing bill amounts and missing prior travel/call counts are replaced by 0.
for column_name in ['bill_6_mth_avg', 'bill_curr_amt', 'prev_tvl_count', 'prev_call_count']:
    features = features.withColumn(
        column_name,
        when(col(column_name).isNull(), 0).otherwise(col(column_name)),
    )

In [None]:
# Lower-case free_plan and label missing values as 'paid'.
free_plan_lower = lower(col('free_plan'))
features = features.withColumn(
    'free_plan',
    when(free_plan_lower.isNull(), 'paid').otherwise(free_plan_lower),
)

In [None]:
# access_data_diff: access_amt minus data_access_amt.
features = features.withColumn('access_data_diff', features['access_amt'] - features['data_access_amt'])

# bill_shock: relative change of the current bill against the 6-month average.
# bill_6_mth_avg is set to 0 for missing values earlier in this notebook, so the
# denominator can be zero. The explicit guard yields null for those rows (the
# same result Spark gives for x/0 in non-ANSI mode) and avoids a divide-by-zero
# error when ANSI SQL mode is enabled. The nulls are handled by the median
# imputation that follows.
features = features.withColumn(
    'bill_shock',
    when(
        features['bill_6_mth_avg'] != 0,
        (features['bill_curr_amt'] - features['bill_6_mth_avg']) / features['bill_6_mth_avg'],
    ),
)

In [None]:
## filter numeric cols
# A comprehension is used rather than the builtin filter(): the star import from
# pyspark.sql.functions can rebind the name `filter` to the Spark SQL function.
numeric_types = {"bigint", "double", "float"}
num_cols = [name for name, dtype in features.dtypes if dtype in numeric_types]

### Compute a dict with <col_name, median_value>
median_dict = dict()
for c in num_cols:
    # Approximate median (relative error 0.001).
    quantiles = features.stat.approxQuantile(c, [0.5], 0.001)
    # approxQuantile returns an empty list when the column holds no non-null
    # values; such columns are skipped instead of raising IndexError.
    if quantiles:
        median_dict[c] = quantiles[0]

In [None]:
# Impute remaining nulls in each numeric column with that column's median.
features = features.fillna(median_dict)

In [None]:
# Day differences between the call start timestamp and (a) tvl1, (b) bill_cycle_dt.
features = (
    features
    .withColumn('tvl1_call1', datediff(col('call_start_tm_stmp'), col('tvl1')))
    .withColumn('bill_call1', datediff(col('call_start_tm_stmp'), col('bill_cycle_dt')))
)

In [None]:
# A missing day difference (null) is replaced with a default gap of 60.
for gap_column in ['tvl1_call1', 'bill_call1']:
    features = features.withColumn(
        gap_column,
        when(col(gap_column).isNull(), 60).otherwise(col(gap_column)),
    )

In [None]:
# Categorical columns: label missing values explicitly as 'Unknown'.
categorical_columns = [
    'ethnicity', 'msg_allow_share_ind', 'data_allow_share_ind', 'voice_allow_share_ind',
    'ccd1', 'free_plan', 'cl_desc1', 'line_type', 'tvl_pass_included', 'can_mex_free_plan',
]
for column_name in categorical_columns:
    features = features.withColumn(
        column_name,
        when(col(column_name).isNull(), 'Unknown').otherwise(col(column_name)),
    )

In [None]:
# Group individual ccd1 codes: BHS and DOM become 'CARR'; ITA, FRA and GBR
# become 'EU'. Every other value is left unchanged.
features = features.withColumn('ccd1', when(col('ccd1').isin('BHS', 'DOM'), 'CARR').otherwise(col('ccd1')))
features = features.withColumn('ccd1', when(col('ccd1').isin('ITA', 'FRA', 'GBR'), 'EU').otherwise(col('ccd1')))

In [None]:
# Collapse every ccd1 value outside the retained categories into 'Others'.
# The preceding cell rewrites ITA/FRA/GBR to 'EU' and BHS/DOM to 'CARR', so the
# keep-list must contain those grouped labels; otherwise both groups are
# immediately folded into 'Others'. The raw ITA/FRA/GBR codes stay in the list
# so this cell also behaves sensibly when the grouping cell has not been run.
# NOTE(review): keeping 'CARR' as its own category is assumed from the grouping
# cell - confirm.
kept_ccd1_values = ['ITA', 'FRA', 'GBR', 'EU', 'CARR', 'MEX', 'CAN', 'CSP']
features = features.withColumn(
    'ccd1',
    when(~col('ccd1').isin(kept_ccd1_values), 'Others').otherwise(col('ccd1')),
)

In [None]:
def _bucketizer_for(column, cut_points):
    """Build a Bucketizer for `column` writing to `<column>_buckets`.

    `cut_points` are the interior split values; -Inf and +Inf are added as the
    outermost splits.
    """
    return Bucketizer(
        splits=[float('-Inf')] + list(cut_points) + [float('Inf')],
        inputCol=column,
        outputCol=column + "_buckets",
    )


age_bucketizer = _bucketizer_for("age", [0, 24, 39, 54, 74])
tvl1_call1_bucketizer = _bucketizer_for("tvl1_call1", [0, 1, 2, 7, 30])
bill_call1_bucketizer = _bucketizer_for("bill_call1", [0, 1, 2, 7, 30])
access_amt_bucketizer = _bucketizer_for("access_amt", [0, 10, 50, 100, 150])
access_data_diff_bucketizer = _bucketizer_for("access_data_diff", [0, 10])
feat_amt_bucketizer = _bucketizer_for("feat_amt", [0, 10, 20, 100])
late_pymnt_chrg_amt_bucketizer = _bucketizer_for("late_pymnt_chrg_amt", [0, 6, 10])
occ_amt_bucketizer = _bucketizer_for("occ_amt", [-10, 0, 10])
prev_tvl_count_bucketizer = _bucketizer_for("prev_tvl_count", [0, 1, 2, 3, 5, 10])
prev_call_count_bucketizer = _bucketizer_for("prev_call_count", [0, 1, 2, 3, 5, 10])

In [None]:
# Apply every bucketizer in turn; handleInvalid="keep" keeps rows with invalid
# values instead of failing or dropping them.
all_bucketizers = [
    age_bucketizer,
    tvl1_call1_bucketizer,
    bill_call1_bucketizer,
    access_amt_bucketizer,
    access_data_diff_bucketizer,
    feat_amt_bucketizer,
    late_pymnt_chrg_amt_bucketizer,
    occ_amt_bucketizer,
    prev_tvl_count_bucketizer,
    prev_call_count_bucketizer,
]
for bucketizer in all_bucketizers:
    features = bucketizer.setHandleInvalid("keep").transform(features)

In [None]:
#'call_start_tm_stmp','ethnicity','age','tenure','pplan_eff_dt','prepaid_ind','access_amt','data_access_amt','feat_amt','late_pymnt_chrg_amt','occ_amt','bill_6_mth_avg','bill_curr_amt','msg_allow_share_ind','data_allow_share_ind','voice_allow_share_ind', 'bill_cycle_dt','free_plan','ccd1','primary_topic','call_ts1','tvl1'
# Sanity check: number of rows labelled 'PHONE SETTINGS' before the labels are renamed.
features.filter(col('primary_topic') == 'PHONE SETTINGS').count()

In [None]:
# Normalise the target labels in one pass:
#   FRAUD, STOLEN & DAMAGED, PLAN & TRAVEL PASS -> 'plan_travelpass'
#   PHONE SETTINGS -> 'phone_settings', BILLING -> 'billing', CRUISE -> 'cruise'
# Any other value is kept as it is.
topic_col = col('primary_topic')
features = features.withColumn(
    'primary_topic',
    when(topic_col.isin('FRAUD', 'STOLEN & DAMAGED', 'PLAN & TRAVEL PASS'), 'plan_travelpass')
    .when(topic_col == 'PHONE SETTINGS', 'phone_settings')
    .when(topic_col == 'BILLING', 'billing')
    .when(topic_col == 'CRUISE', 'cruise')
    .otherwise(topic_col),
)

In [None]:
# Normalise primary1_topic in one pass:
#   null -> 'Unknown'
#   FRAUD, STOLEN & DAMAGED, PLAN & TRAVEL PASS -> 'plan_travelpass'
#   PHONE SETTINGS -> 'phone_settings', BILLING -> 'billing', CRUISE -> 'cruise'
# Any other value is kept as it is.
topic_col = col('primary1_topic')
features = features.withColumn(
    'primary1_topic',
    when(topic_col.isNull(), 'Unknown')
    .when(topic_col.isin('FRAUD', 'STOLEN & DAMAGED', 'PLAN & TRAVEL PASS'), 'plan_travelpass')
    .when(topic_col == 'PHONE SETTINGS', 'phone_settings')
    .when(topic_col == 'BILLING', 'billing')
    .when(topic_col == 'CRUISE', 'cruise')
    .otherwise(topic_col),
)

In [None]:
# Normalise primary2_topic in one pass:
#   null -> 'Unknown'
#   FRAUD, STOLEN & DAMAGED, PLAN & TRAVEL PASS -> 'plan_travelpass'
#   PHONE SETTINGS -> 'phone_settings', BILLING -> 'billing', CRUISE -> 'cruise'
# Any other value is kept as it is.
topic_col = col('primary2_topic')
features = features.withColumn(
    'primary2_topic',
    when(topic_col.isNull(), 'Unknown')
    .when(topic_col.isin('FRAUD', 'STOLEN & DAMAGED', 'PLAN & TRAVEL PASS'), 'plan_travelpass')
    .when(topic_col == 'PHONE SETTINGS', 'phone_settings')
    .when(topic_col == 'BILLING', 'billing')
    .when(topic_col == 'CRUISE', 'cruise')
    .otherwise(topic_col),
)

In [None]:
# Normalise primary3_topic in one pass:
#   null -> 'Unknown'
#   FRAUD, STOLEN & DAMAGED, PLAN & TRAVEL PASS -> 'plan_travelpass'
#   PHONE SETTINGS -> 'phone_settings', BILLING -> 'billing', CRUISE -> 'cruise'
# Any other value is kept as it is.
topic_col = col('primary3_topic')
features = features.withColumn(
    'primary3_topic',
    when(topic_col.isNull(), 'Unknown')
    .when(topic_col.isin('FRAUD', 'STOLEN & DAMAGED', 'PLAN & TRAVEL PASS'), 'plan_travelpass')
    .when(topic_col == 'PHONE SETTINGS', 'phone_settings')
    .when(topic_col == 'BILLING', 'billing')
    .when(topic_col == 'CRUISE', 'cruise')
    .otherwise(topic_col),
)

In [None]:
# Normalise primary4_topic in one pass:
#   null -> 'Unknown'
#   FRAUD, STOLEN & DAMAGED, PLAN & TRAVEL PASS -> 'plan_travelpass'
#   PHONE SETTINGS -> 'phone_settings', BILLING -> 'billing', CRUISE -> 'cruise'
# Any other value is kept as it is.
topic_col = col('primary4_topic')
features = features.withColumn(
    'primary4_topic',
    when(topic_col.isNull(), 'Unknown')
    .when(topic_col.isin('FRAUD', 'STOLEN & DAMAGED', 'PLAN & TRAVEL PASS'), 'plan_travelpass')
    .when(topic_col == 'PHONE SETTINGS', 'phone_settings')
    .when(topic_col == 'BILLING', 'billing')
    .when(topic_col == 'CRUISE', 'cruise')
    .otherwise(topic_col),
)

In [None]:
# Normalise primary5_topic in one pass:
#   null -> 'Unknown'
#   FRAUD, STOLEN & DAMAGED, PLAN & TRAVEL PASS -> 'plan_travelpass'
#   PHONE SETTINGS -> 'phone_settings', BILLING -> 'billing', CRUISE -> 'cruise'
# Any other value is kept as it is.
topic_col = col('primary5_topic')
features = features.withColumn(
    'primary5_topic',
    when(topic_col.isNull(), 'Unknown')
    .when(topic_col.isin('FRAUD', 'STOLEN & DAMAGED', 'PLAN & TRAVEL PASS'), 'plan_travelpass')
    .when(topic_col == 'PHONE SETTINGS', 'phone_settings')
    .when(topic_col == 'BILLING', 'billing')
    .when(topic_col == 'CRUISE', 'cruise')
    .otherwise(topic_col),
)

In [None]:
# Target encoder: indexes the primary_topic strings into a numeric 'label' column.
labelIndexer = StringIndexer(
    inputCol='primary_topic',
    outputCol='label',
)

In [None]:
# Inspect the column names and Spark types after the preprocessing cells.
features.dtypes

In [None]:
# Create a StringIndexer
# Index the ethnicity strings ...
ethnicity_idx = StringIndexer(
    inputCol="ethnicity",
    outputCol="ethnicity_index",
)

# ... then one-hot encode the resulting index.
ethnicity_index_vec = OneHotEncoder(
    inputCol="ethnicity_index",
    outputCol="ethnicity_index_fact",
)

In [None]:
def _categorical_stages(column):
    """Return (StringIndexer, OneHotEncoder) for a categorical column.

    The indexer writes `<column>_index`; the encoder reads it and writes
    `<column>_fact`.
    """
    index_col = column + "_index"
    indexer = StringIndexer(inputCol=column, outputCol=index_col)
    encoder = OneHotEncoder(inputCol=index_col, outputCol=column + "_fact")
    return indexer, encoder


msg_allow_share_ind_idx, msg_allow_share_ind_vec = _categorical_stages("msg_allow_share_ind")
data_allow_share_ind_idx, data_allow_share_ind_vec = _categorical_stages("data_allow_share_ind")
voice_allow_share_ind_idx, voice_allow_share_ind_vec = _categorical_stages("voice_allow_share_ind")
free_plan_idx, free_plan_vec = _categorical_stages("free_plan")
line_type_idx, line_type_vec = _categorical_stages("line_type")
tvl_pass_included_idx, tvl_pass_included_vec = _categorical_stages("tvl_pass_included")
can_mex_free_plan_idx, can_mex_free_plan_vec = _categorical_stages("can_mex_free_plan")

In [None]:
# Indexer + one-hot encoder for each of primary1_topic .. primary5_topic.
# Each indexer writes `<column>_index`; each encoder writes `<column>_fact`.
(primary1_topic_idx, primary1_topic_vec,
 primary2_topic_idx, primary2_topic_vec,
 primary3_topic_idx, primary3_topic_vec,
 primary4_topic_idx, primary4_topic_vec,
 primary5_topic_idx, primary5_topic_vec) = [
    stage
    for topic_column in ("primary1_topic", "primary2_topic", "primary3_topic",
                         "primary4_topic", "primary5_topic")
    for stage in (
        StringIndexer(inputCol=topic_column, outputCol=topic_column + "_index"),
        OneHotEncoder(inputCol=topic_column + "_index", outputCol=topic_column + "_fact"),
    )
]

In [None]:
# Indexer / one-hot encoder pairs for a set of *_ind indicator columns.
# NOTE(review): none of these input columns appear in the column list selected
# into `features` earlier in this notebook, and none of these stages are passed
# to the Pipeline built below - they look unused here; confirm before relying
# on them.
plans_visited_ind_idx = StringIndexer(inputCol="plans_visited_ind",outputCol="plans_visited_ind_index")
plans_visited_ind_vec = OneHotEncoder(inputCol="plans_visited_ind_index",outputCol="plans_visited_ind_fact")
intl_plans_visited_ind_idx = StringIndexer(inputCol="intl_plans_visited_ind",outputCol="intl_plans_visited_ind_index")
intl_plans_visited_ind_vec = OneHotEncoder(inputCol="intl_plans_visited_ind_index",outputCol="intl_plans_visited_ind_fact")
trvln_outsid_us_ind_idx = StringIndexer(inputCol="trvln_outsid_us_ind",outputCol="trvln_outsid_us_ind_index")
trvln_outsid_us_ind_vec = OneHotEncoder(inputCol="trvln_outsid_us_ind_index",outputCol="trvln_outsid_us_ind_fact")
trip_planner_ind_idx = StringIndexer(inputCol="trip_planner_ind",outputCol="trip_planner_ind_index")
trip_planner_ind_vec = OneHotEncoder(inputCol="trip_planner_ind_index",outputCol="trip_planner_ind_fact")
while_outside_us_ind_idx = StringIndexer(inputCol="while_outside_us_ind",outputCol="while_outside_us_ind_index")
while_outside_us_ind_vec = OneHotEncoder(inputCol="while_outside_us_ind_index",outputCol="while_outside_us_ind_fact")
support_ind_idx = StringIndexer(inputCol="support_ind",outputCol="support_ind_index")
support_ind_vec = OneHotEncoder(inputCol="support_ind_index",outputCol="support_ind_fact")
billing_and_payment_ind_idx = StringIndexer(inputCol="billing_and_payment_ind",outputCol="billing_and_payment_ind_index")
billing_and_payment_ind_vec = OneHotEncoder(inputCol="billing_and_payment_ind_index",outputCol="billing_and_payment_ind_fact")
plan_and_account_ind_idx = StringIndexer(inputCol="plan_and_account_ind",outputCol="plan_and_account_ind_index")
plan_and_account_ind_vec = OneHotEncoder(inputCol="plan_and_account_ind_index",outputCol="plan_and_account_ind_fact")
device_troubleshooting_assistant_ind_idx = StringIndexer(inputCol="device_troubleshooting_assistant_ind",outputCol="device_troubleshooting_assistant_ind_index")
device_troubleshooting_assistant_ind_vec = OneHotEncoder(inputCol="device_troubleshooting_assistant_ind_index",outputCol="device_troubleshooting_assistant_ind_fact")
international_services_ind_idx = StringIndexer(inputCol="international_services_ind",outputCol="international_services_ind_index")
international_services_ind_vec = OneHotEncoder(inputCol="international_services_ind_index",outputCol="international_services_ind_fact")
bill_ind_idx = StringIndexer(inputCol="bill_ind",outputCol="bill_ind_index")
bill_ind_vec = OneHotEncoder(inputCol="bill_ind_index",outputCol="bill_ind_fact")
overview_ind_idx = StringIndexer(inputCol="overview_ind",outputCol="overview_ind_index")
overview_ind_vec = OneHotEncoder(inputCol="overview_ind_index",outputCol="overview_ind_fact")
next_bill_estimate_ind_idx = StringIndexer(inputCol="next_bill_estimate_ind",outputCol="next_bill_estimate_ind_index")
next_bill_estimate_ind_vec = OneHotEncoder(inputCol="next_bill_estimate_ind_index",outputCol="next_bill_estimate_ind_fact")
payment_history_ind_idx = StringIndexer(inputCol="payment_history_ind",outputCol="payment_history_ind_index")
payment_history_ind_vec = OneHotEncoder(inputCol="payment_history_ind_index",outputCol="payment_history_ind_fact")
family_controls_ind_idx = StringIndexer(inputCol="family_controls_ind",outputCol="family_controls_ind_index")
family_controls_ind_vec = OneHotEncoder(inputCol="family_controls_ind_index",outputCol="family_controls_ind_fact")

In [None]:
# Indexer / one-hot encoder pairs for the mfapp_* indicator columns.
# Each indexer writes `<column>_index` and each encoder writes `<column>_fact`.
# NOTE(review): these input columns are not in the column list selected into
# `features` earlier in this notebook and the stages are not passed to the
# Pipeline built below - confirm whether they are still needed.
mfapp_plans_visited_ind_idx = StringIndexer(inputCol="mfapp_plans_visited_ind",outputCol="mfapp_plans_visited_ind_index")
mfapp_plans_visited_ind_vec = OneHotEncoder(inputCol="mfapp_plans_visited_ind_index",outputCol="mfapp_plans_visited_ind_fact")
mfapp_intl_plans_visited_ind_idx = StringIndexer(inputCol="mfapp_intl_plans_visited_ind",outputCol="mfapp_intl_plans_visited_ind_index")
mfapp_intl_plans_visited_ind_vec = OneHotEncoder(inputCol="mfapp_intl_plans_visited_ind_index",outputCol="mfapp_intl_plans_visited_ind_fact")
mfapp_trvln_outsid_us_ind_idx = StringIndexer(inputCol="mfapp_trvln_outsid_us_ind",outputCol="mfapp_trvln_outsid_us_ind_index")
mfapp_trvln_outsid_us_ind_vec = OneHotEncoder(inputCol="mfapp_trvln_outsid_us_ind_index",outputCol="mfapp_trvln_outsid_us_ind_fact")
mfapp_trip_planner_ind_idx = StringIndexer(inputCol="mfapp_trip_planner_ind",outputCol="mfapp_trip_planner_ind_index")
mfapp_trip_planner_ind_vec = OneHotEncoder(inputCol="mfapp_trip_planner_ind_index",outputCol="mfapp_trip_planner_ind_fact")
mfapp_while_outside_us_ind_idx = StringIndexer(inputCol="mfapp_while_outside_us_ind",outputCol="mfapp_while_outside_us_ind_index")
mfapp_while_outside_us_ind_vec = OneHotEncoder(inputCol="mfapp_while_outside_us_ind_index",outputCol="mfapp_while_outside_us_ind_fact")
mfapp_support_ind_idx = StringIndexer(inputCol="mfapp_support_ind",outputCol="mfapp_support_ind_index")
# Output column fixed: it previously wrote "support_ind_fact", colliding with
# the non-mfapp support_ind encoder's output column.
mfapp_support_ind_vec = OneHotEncoder(inputCol="mfapp_support_ind_index",outputCol="mfapp_support_ind_fact")
mfapp_billing_and_payment_ind_idx = StringIndexer(inputCol="mfapp_billing_and_payment_ind",outputCol="mfapp_billing_and_payment_ind_index")
mfapp_billing_and_payment_ind_vec = OneHotEncoder(inputCol="mfapp_billing_and_payment_ind_index",outputCol="mfapp_billing_and_payment_ind_fact")
mfapp_plan_and_account_ind_idx = StringIndexer(inputCol="mfapp_plan_and_account_ind",outputCol="mfapp_plan_and_account_ind_index")
mfapp_plan_and_account_ind_vec = OneHotEncoder(inputCol="mfapp_plan_and_account_ind_index",outputCol="mfapp_plan_and_account_ind_fact")
mfapp_device_troubleshooting_assistant_ind_idx = StringIndexer(inputCol="mfapp_device_troubleshooting_assistant_ind",outputCol="mfapp_device_troubleshooting_assistant_ind_index")
mfapp_device_troubleshooting_assistant_ind_vec = OneHotEncoder(inputCol="mfapp_device_troubleshooting_assistant_ind_index",outputCol="mfapp_device_troubleshooting_assistant_ind_fact")
mfapp_international_services_ind_idx = StringIndexer(inputCol="mfapp_international_services_ind",outputCol="mfapp_international_services_ind_index")
mfapp_international_services_ind_vec = OneHotEncoder(inputCol="mfapp_international_services_ind_index",outputCol="mfapp_international_services_ind_fact")
mfapp_bill_ind_idx = StringIndexer(inputCol="mfapp_bill_ind",outputCol="mfapp_bill_ind_index")
mfapp_bill_ind_vec = OneHotEncoder(inputCol="mfapp_bill_ind_index",outputCol="mfapp_bill_ind_fact")
mfapp_overview_ind_idx = StringIndexer(inputCol="mfapp_overview_ind",outputCol="mfapp_overview_ind_index")
mfapp_overview_ind_vec = OneHotEncoder(inputCol="mfapp_overview_ind_index",outputCol="mfapp_overview_ind_fact")
mfapp_next_bill_estimate_ind_idx = StringIndexer(inputCol="mfapp_next_bill_estimate_ind",outputCol="mfapp_next_bill_estimate_ind_index")
mfapp_next_bill_estimate_ind_vec = OneHotEncoder(inputCol="mfapp_next_bill_estimate_ind_index",outputCol="mfapp_next_bill_estimate_ind_fact")
mfapp_payment_history_ind_idx = StringIndexer(inputCol="mfapp_payment_history_ind",outputCol="mfapp_payment_history_ind_index")
mfapp_payment_history_ind_vec = OneHotEncoder(inputCol="mfapp_payment_history_ind_index",outputCol="mfapp_payment_history_ind_fact")
mfapp_family_controls_ind_idx = StringIndexer(inputCol="mfapp_family_controls_ind",outputCol="mfapp_family_controls_ind_index")
mfapp_family_controls_ind_vec = OneHotEncoder(inputCol="mfapp_family_controls_ind_index",outputCol="mfapp_family_controls_ind_fact")

In [None]:
def _index_then_encode(bucket_col, encoded_col):
    """Return (StringIndexer, OneHotEncoder) for a bucketised column.

    The indexer reads `bucket_col` and writes `<bucket_col>_index`; the encoder
    reads that index column and writes `encoded_col`.
    """
    index_col = bucket_col + "_index"
    return (
        StringIndexer(inputCol=bucket_col, outputCol=index_col),
        OneHotEncoder(inputCol=index_col, outputCol=encoded_col),
    )


age_idx, age_vec = _index_then_encode("age_buckets", "age_buckets_fact")
#call_estd_date_idx = StringIndexer(inputCol="call_estd_date_buckets",outputCol="call_estd_date_buckets_index")
#call_estd_date_vec = OneHotEncoder(inputCol="call_estd_date_buckets_index",outputCol="call_estd_date_buckets_fact")
tvl1_call1_idx, tvl1_call1_vec = _index_then_encode("tvl1_call1_buckets", "tvl1_call1_buckets_fact")
bill_call1_idx, bill_call1_vec = _index_then_encode("bill_call1_buckets", "bill_call1_buckets_fact")
access_amt_idx, access_amt_vec = _index_then_encode("access_amt_buckets", "access_amt_buckets_fact")
access_data_diff_idx, access_data_diff_vec = _index_then_encode("access_data_diff_buckets", "access_data_diff_buckets_fact")
feat_amt_idx, feat_amt_vec = _index_then_encode("feat_amt_buckets", "feat_amt_buckets_fact")
# The remaining encoders write `<name>_fact` without the `_buckets` infix.
late_pymnt_chrg_amt_idx, late_pymnt_chrg_amt_vec = _index_then_encode("late_pymnt_chrg_amt_buckets", "late_pymnt_chrg_amt_fact")
occ_amt_idx, occ_amt_vec = _index_then_encode("occ_amt_buckets", "occ_amt_fact")
prev_tvl_count_idx, prev_tvl_count_vec = _index_then_encode("prev_tvl_count_buckets", "prev_tvl_count_fact")
prev_call_count_idx, prev_call_count_vec = _index_then_encode("prev_call_count_buckets", "prev_call_count_fact")

In [None]:
# Index and one-hot encode the grouped ccd1 values.
recent_cntry_idx = StringIndexer(
    inputCol="ccd1",
    outputCol="ccd1_index",
)
recent_cntry_vec = OneHotEncoder(
    inputCol="ccd1_index",
    outputCol="ccd1_fact",
)

In [None]:
# Concatenate every one-hot encoded column into the single 'features_final' vector.
assembler_inputs = [
    "ethnicity_index_fact",
    "msg_allow_share_ind_fact",
    "data_allow_share_ind_fact",
    "voice_allow_share_ind_fact",
    "age_buckets_fact",
    "tvl1_call1_buckets_fact",
    "bill_call1_buckets_fact",
    "free_plan_fact",
    "line_type_fact",
    "tvl_pass_included_fact",
    "can_mex_free_plan_fact",
    "primary1_topic_fact",
    "primary2_topic_fact",
    "primary3_topic_fact",
    "primary4_topic_fact",
    "primary5_topic_fact",
    "access_amt_buckets_fact",
    "access_data_diff_buckets_fact",
    "feat_amt_buckets_fact",
    "late_pymnt_chrg_amt_fact",
    "occ_amt_fact",
    "prev_tvl_count_fact",
    "prev_call_count_fact",
    "ccd1_fact",
]
vec_assembler = VectorAssembler(inputCols=assembler_inputs, outputCol="features_final")

In [None]:
# Second assembler configured with the same input columns and the same
# 'features_final' output column as vec_assembler.
assembler_inputs_v1 = [
    "ethnicity_index_fact",
    "msg_allow_share_ind_fact",
    "data_allow_share_ind_fact",
    "voice_allow_share_ind_fact",
    "age_buckets_fact",
    "tvl1_call1_buckets_fact",
    "bill_call1_buckets_fact",
    "free_plan_fact",
    "line_type_fact",
    "tvl_pass_included_fact",
    "can_mex_free_plan_fact",
    "primary1_topic_fact",
    "primary2_topic_fact",
    "primary3_topic_fact",
    "primary4_topic_fact",
    "primary5_topic_fact",
    "access_amt_buckets_fact",
    "access_data_diff_buckets_fact",
    "feat_amt_buckets_fact",
    "late_pymnt_chrg_amt_fact",
    "occ_amt_fact",
    "prev_tvl_count_fact",
    "prev_call_count_fact",
    "ccd1_fact",
]
vec_assembler_v1 = VectorAssembler(inputCols=assembler_inputs_v1, outputCol="features_final")

In [None]:
# Two-stage list: label indexer followed by the second assembler.
# NOTE(review): the Pipeline built below is given its own explicit stage list,
# so this variable appears to be used for display only - confirm.
stages=[labelIndexer,vec_assembler_v1]

In [None]:
# Display the stage list.
stages

In [None]:
# Import Pipeline
from pyspark.ml import Pipeline

# Make the pipeline
# Stage order is kept exactly as originally written: every indexer precedes its
# encoder, the assembler runs after all encoders, and the label indexer is last.
pipeline_stages = [
    ethnicity_idx, ethnicity_index_vec,
    msg_allow_share_ind_idx, data_allow_share_ind_idx, voice_allow_share_ind_idx,
    age_idx, tvl1_call1_idx, bill_call1_idx, free_plan_idx,
    msg_allow_share_ind_vec, data_allow_share_ind_vec, voice_allow_share_ind_vec,
    age_vec, tvl1_call1_vec, bill_call1_vec, free_plan_vec,
    line_type_idx, line_type_vec,
    tvl_pass_included_idx, tvl_pass_included_vec,
    can_mex_free_plan_idx, can_mex_free_plan_vec,
    primary1_topic_idx, primary1_topic_vec,
    primary2_topic_idx, primary2_topic_vec,
    primary3_topic_idx, primary3_topic_vec,
    primary4_topic_idx, primary4_topic_vec,
    primary5_topic_idx, primary5_topic_vec,
    access_amt_idx, access_amt_vec,
    access_data_diff_idx, access_data_diff_vec,
    feat_amt_idx, feat_amt_vec,
    late_pymnt_chrg_amt_idx, late_pymnt_chrg_amt_vec,
    occ_amt_idx, occ_amt_vec,
    prev_tvl_count_idx, prev_tvl_count_vec,
    prev_call_count_idx, prev_call_count_vec,
    recent_cntry_idx, recent_cntry_vec,
    vec_assembler, labelIndexer,
]
dataset_pipe = Pipeline(stages=pipeline_stages)

In [None]:
# 80/20 train/test split. A fixed seed makes the split repeatable across
# re-runs on the same input; without it every run trains and evaluates on
# different rows.
training, testing = features.randomSplit([0.8, 0.2], seed=42)

# Fit the feature pipeline on the training part only, then transform it.
training_data_pipeline = dataset_pipe.fit(training)
training_datapiped = training_data_pipeline.transform(training)

In [None]:
import pandas as pd
# Lift the pandas display limits, then show the first 10 assembled feature vectors.
pd.options.display.max_columns = None
pd.options.display.max_rows = None
training_datapiped.select('features_final').limit(10).toPandas()

In [None]:
# The original line called `pipeline.fit(...)`, but no variable named `pipeline`
# is defined anywhere in this notebook (NameError on a fresh kernel). The only
# Pipeline built here is `dataset_pipe`, so that one is fitted.
# NOTE(review): confirm this is the pipeline that was intended.
pipeline_model = dataset_pipe.fit(training)

In [None]:
# Apply the fitted pipeline model to the training split.
training_transform=pipeline_model.transform(training)

In [None]:
# One aggregate per column: count the rows where that column is null.
# The result is a single-row pandas frame keyed by column name.
null_count_exprs = [
    count(when(col(column_name).isNull(), column_name)).alias(column_name)
    for column_name in training_datapiped.columns
]
test_null = training_datapiped.select(null_count_exprs).toPandas()

In [None]:
import pandas as pd
# Show the full null-count table without pandas truncating rows or columns.
pd.options.display.max_columns = None
pd.options.display.max_rows = None
test_null

In [None]:
# Write the per-column null counts to CSV through pandas.
# NOTE(review): test_null is a pandas DataFrame, so this writes to the driver's
# local filesystem; the '/user/...' path looks like an HDFS-style home
# directory - confirm the intended destination.
test_null.to_csv('/user/vjain01/null_check')

In [None]:
from pyspark.ml import Pipeline,PipelineModel
# Persist the fitted feature pipeline and load it back.
# write().overwrite() is used instead of save(): a plain save() raises when the
# target path already exists, which makes this cell fail on every re-run.
pipeline_model_path = "/user/vjain01/final_nosc_vf"
training_data_pipeline.write().overwrite().save(pipeline_model_path)
load_dataset_pipe = PipelineModel.load(pipeline_model_path)

In [None]:
# Transform the held-out split with the pipeline model reloaded from storage.
testing_datapipe=load_dataset_pipe.transform(testing)

In [None]:
# Number of 'plan_travelpass' rows in the transformed training set.
# The original referenced `training_data_pipe`, which is never defined in this
# notebook; the transformed training frame is `training_datapiped`.
training_datapiped.where(training_datapiped['primary_topic'] == 'plan_travelpass').count()

In [None]:
# Cap each topic at 36000 training rows so the classes are balanced.
(training_data_pipe_bill,
 training_data_pipe_crus,
 training_data_pipe_plan,
 training_data_pipe_ph) = [
    training_datapiped.filter(training_datapiped.primary_topic == topic_label).limit(36000)
    for topic_label in ('billing', 'cruise', 'plan_travelpass', 'phone_settings')
]

In [None]:
import functools 
def unionAll(dfs):
    """Union a sequence of DataFrames into one.

    Each later frame is first re-selected into the column order of the running
    result, so frames whose columns are ordered differently still line up.
    A single-element sequence returns that element unchanged.
    """
    def _append(combined, nxt):
        aligned = nxt.select(combined.columns)
        return combined.union(aligned)

    return functools.reduce(_append, dfs)
# Stack the four capped per-topic samples into the balanced training set.
balanced_parts = [
    training_data_pipe_bill,
    training_data_pipe_crus,
    training_data_pipe_plan,
    training_data_pipe_ph,
]
training_data_pipe_all = unionAll(balanced_parts)

In [None]:
# Print up to 1000 label values from the balanced training set for a visual check.
training_data_pipe_all.select('label').show(1000)

In [None]:
from pyspark.ml.classification import LogisticRegression
# Logistic regression on the assembled feature vector; predictions are written
# to 'Prediction_Value'.
lr_params = dict(
    featuresCol="features_final",
    labelCol="label",
    predictionCol="Prediction_Value",
)
lr = LogisticRegression(**lr_params)

# Fit the model
lrModel = lr.fit(training_data_pipe_all)

In [None]:
from pyspark.ml.classification import  RandomForestClassifier
# Random forest on the assembled feature vector.
# Note: the raw-prediction output column is the one named 'Probability_value'.
rf_params = dict(
    featuresCol="features_final",
    labelCol="label",
    predictionCol="Prediction_Value",
    rawPredictionCol='Probability_value',
    maxDepth=16,
    numTrees=10000,
    minInstancesPerNode=100,
)
rf = RandomForestClassifier(**rf_params)

# Fit the model
rfModel = rf.fit(training_data_pipe_all)

In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
# Score the held-out split with the logistic regression model and report accuracy.
predictions = lrModel.transform(testing_datapipe)
evaluatorRF = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="Prediction_Value",
    metricName="accuracy",
)
accuracy = evaluatorRF.evaluate(predictions)
test_error = 1.0 - accuracy

print("Accuracy = %g" % accuracy)
print("Test Error = %g" % test_error)

In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
# Score the held-out split with the random forest model.
predictions = rfModel.transform(testing_datapipe)
evaluatorRFF = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="Prediction_Value",
    metricName="accuracy",
)
accuracy = evaluatorRFF.evaluate(predictions)

In [None]:
# Report the accuracy computed in the previous cell and its complement.
test_error = 1.0 - accuracy
print("Accuracy = %g" % accuracy)
print("Test Error = %g" % test_error)

In [None]:
from pyspark.mllib.evaluation import BinaryClassificationMetrics, MulticlassMetrics
# Confusion matrix from (prediction, label) pairs of the latest `predictions` frame.
predictions_cfp = predictions.select("Prediction_Value", "label").rdd
metrics = MulticlassMetrics(predictions_cfp)
confusion = metrics.confusionMatrix()
print(confusion)

In [None]:
##########Cross Validation###############
# Ten independent 80/20 random splits (seeds 0..9). For each split the feature
# pipeline is refit on the training part, every topic is capped at 36000 rows,
# a logistic regression is trained on the balanced sample and its accuracy on
# the held-out part is printed.
lr = LogisticRegression(featuresCol="features_final",
                        labelCol="label",
                        predictionCol="Prediction_Value")
evaluatorRF = MulticlassClassificationEvaluator(labelCol="label", predictionCol="Prediction_Value", metricName="accuracy")

for i in range(10):
    training, testing = features.randomSplit([0.8, 0.2], seed=i)

    # Fit the feature pipeline on this split's training part only.
    training_data_pipeline = dataset_pipe.fit(training)
    training_datapiped = training_data_pipeline.transform(training)
    testing_datapipe = training_data_pipeline.transform(testing)

    # Balanced training sample: at most 36000 rows per topic.
    (training_data_pipe_bill,
     training_data_pipe_crus,
     training_data_pipe_plan,
     training_data_pipe_ph) = [
        training_datapiped.filter(training_datapiped.primary_topic == topic_label).limit(36000)
        for topic_label in ('billing', 'cruise', 'plan_travelpass', 'phone_settings')
    ]
    training_data_pipe_all = unionAll([training_data_pipe_bill, training_data_pipe_crus,
                                       training_data_pipe_plan, training_data_pipe_ph])

    # Fit the model
    lrModel = lr.fit(training_data_pipe_all)
    predictions = lrModel.transform(testing_datapipe)
    accuracy = evaluatorRF.evaluate(predictions)
    print("Accuracy = %g" % accuracy)
    print("Test Error = %g" % (1.0 - accuracy))