### Predicting Customer Bank Term Deposit Subscription

#### The data is related with direct marketing campaigns of a Portuguese banking institution. The marketing campaigns were based on phone calls. Often, more than one contact to the same client was required, in order to access if the product (bank term deposit) would be ('yes') or not ('no') subscribed. By building predictive models using machine learning algorithms, we can predict whether a clinet is subscribing to term deposit due to the campaign conducted.The classification goal is to predict if the client will subscribe a term deposit (variable y)

In [3]:
# File location and type
file_location = "/FileStore/tables/bank_full-bd3df.csv"
file_type = "csv"

# CSV options
infer_schema = "false"
first_row_is_header = "true"
delimiter = ";"

# The applied options are for CSV files. For other file types, these will be ignored.
df = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location)

display(df)

age,job,marital,education,default,balance,housing,loan,contact,day,month,duration,campaign,pdays,previous,poutcome,y
58,management,married,tertiary,no,2143,yes,no,unknown,5,may,261,1,-1,0,unknown,no
44,technician,single,secondary,no,29,yes,no,unknown,5,may,151,1,-1,0,unknown,no
33,entrepreneur,married,secondary,no,2,yes,yes,unknown,5,may,76,1,-1,0,unknown,no
47,blue-collar,married,unknown,no,1506,yes,no,unknown,5,may,92,1,-1,0,unknown,no
33,unknown,single,unknown,no,1,no,no,unknown,5,may,198,1,-1,0,unknown,no
35,management,married,tertiary,no,231,yes,no,unknown,5,may,139,1,-1,0,unknown,no
28,management,single,tertiary,no,447,yes,yes,unknown,5,may,217,1,-1,0,unknown,no
42,entrepreneur,divorced,tertiary,yes,2,yes,no,unknown,5,may,380,1,-1,0,unknown,no
58,retired,married,primary,no,121,yes,no,unknown,5,may,50,1,-1,0,unknown,no
43,technician,single,secondary,no,593,yes,no,unknown,5,may,55,1,-1,0,unknown,no


### About Data

#### We have used dataset from UCI Machine Learning Repository.The data is related with direct marketing campaigns of a Portuguese banking institution.Each data entry involves the information about the customer which has features such as: 
categorical variables : 1.Job, 2.Marital, 3.Education, 4.Housing, 5.Loan, 6. Month, 7.Poutcome, 8.Contact and 9. y (dependent varaible)
Numeric Data Types : 1.Age, 2.Balance, 3.Day, 4.campaign,5.pdays 6.previous.

In the dataset the number of rows which contain the data for a customer subscribing to a term deposit is around 12% of the total data

For the purpose of the project, the dependent variable is whether the customer will subscribe to term deposit or not.

#### Data Pre-processing

In [7]:
df.printSchema()

In [8]:
from pyspark.sql.types import IntegerType, FloatType, StringType, DoubleType, TimestampType
from pyspark.sql.functions import when,count,col
from pyspark.sql.functions import *

In [9]:
df = df.withColumn('age',df['age'].cast(IntegerType())).\
         withColumn('balance',df['balance'].cast(IntegerType())).\
         withColumn('day',df['day'].cast(IntegerType())).\
         withColumn('duration',df['duration'].cast(IntegerType())).\
         withColumn('campaign',df['campaign'].cast(IntegerType())).\
         withColumn('pdays',df['pdays'].cast(IntegerType())).\
         withColumn('previous',df['previous'].cast(IntegerType())).\
         withColumn('y',regexp_replace('y', 'no', '0')).\
         withColumn('y',regexp_replace('y', 'yes', '1'))

df = df.withColumn('y',df['y'].cast(IntegerType()))

In [10]:
df.printSchema()

In [11]:
df.show(5)

In [12]:
# Checking for count of null values present in each column
df.select([count(when(col(i).isNull(),i)).alias(i) for i in df.columns]).show()

In [13]:
df.createOrReplaceTempView("bank_data")

### Analysis

#### Number of subscribed and non-subscribed customers for term deposits .

In [16]:
df.groupby(df.y).count().show()

####Top 5 professions with term deposits subscribed

In [18]:
%sql

select 
 job , count(*) as number from bank_data where y=1 group by job order by number desc limit 5;

job,number
management,1301
technician,840
blue-collar,708
admin.,631
retired,516


#####Month with highest percentage of term deposits

In [20]:
from pyspark.sql.window import Window
from pyspark.sql import functions as F


df.groupby('month','y').count().withColumnRenamed("count","NumberOfSubscriptions").\
withColumn("Percentage",F.round(F.col("NumberOfSubscriptions")*100/F.sum("NumberOfSubscriptions").over(Window.partitionBy()),2)).sort(["y","Percentage"],ascending=[False,False]).select(['month','Percentage']).show(1,False)

#### Top 5 age group's with highest number of term deposit subscriptions

In [22]:
%sql

select
  AgeGroup,
  sum(case when y= 1 then 1 else 0 end) as SuccessfulCampaigns,
  count(y) as TotalCampaigns
from (
    select 
     case when age <= 20 then '0-20'
          when age > 20 and age <= 30  then '20-30'
          when age > 30 and age <= 40  then '30-40'
          when age > 40 and age <= 50  then '40-50'
          when age > 50 and age <= 60  then '50-60'
          when age > 60 and age <= 70  then '60-70'
          when age > 70 and age <= 80  then '70-80'
          when age > 80 and age <= 90  then '80-90'
          when age > 90 and age <= 100  then '90-100'
     end as AgeGroup,
     y
    from bank_data
)
 group by AgeGroup
 order by SuccessfulCampaigns desc limit 5

AgeGroup,SuccessfulCampaigns,TotalCampaigns
30-40,1812,17687
20-30,1112,6933
40-50,1019,11239
50-60,811,8067
60-70,284,701


#### Breakdown of campaign success by Marital status of the person in percentages

In [24]:
%sql

select MaritalStatus ,
cast(SuccessfulCampaigns*100/TotalCampaigns as decimal(4,2)) as CampaignSuccessPercentage
from (
    select
      marital as MaritalStatus,
      sum(case when y= 1 then 1 else 0 end) as SuccessfulCampaigns,
      count(y) as TotalCampaigns
    from bank_data
      group by marital
)
order by CampaignSuccessPercentage desc
limit 5

MaritalStatus,CampaignSuccessPercentage
single,14.95
divorced,11.95
married,10.12


#### success percentage of term deposit campaign for customers who already have either credit default history or housing or personal loan

In [26]:
%sql

select cast(SuccessCount*100/TotalCount as decimal(4,2)) as SuccessPercentage
from (

  select
    sum(case when (default = "yes" or housing = "yes" or loan = "yes") and y = 1 then 1 else 0 end) as SuccessCount,
    sum(case when default = "yes" or housing = "yes" or loan = "yes" then 1 else 0 end) as TotalCount
  from bank_data

)

SuccessPercentage
7.69


#### Top 5 job categories with highest campaign success percentage

In [28]:
%sql

select JobCategory,
cast(SuccessfulCampaigns*100/TotalCampaigns as decimal(4,2)) as CampaignSuccessPercentage
from (
    select
      job as JobCategory,
      sum(case when y= 1 then 1 else 0 end) as SuccessfulCampaigns,
      count(y) as TotalCampaigns
    from bank_data
      group by job
)
order by CampaignSuccessPercentage desc
limit 5

JobCategory,CampaignSuccessPercentage
student,28.68
retired,22.79
unemployed,15.5
management,13.76
admin.,12.2


#### Campaign Success Percentage by Contact Communication Type

In [30]:
%sql

select ContactType,
cast(SuccessfulCampaigns*100/TotalCampaigns as decimal(4,2)) as SuccessPercentage
from (
    select
      contact as ContactType,
      sum(case when y= 1 then 1 else 0 end) as SuccessfulCampaigns,
      count(y) as TotalCampaigns
    from bank_data
      group by contact
)
order by SuccessPercentage desc

ContactType,SuccessPercentage
cellular,14.92
telephone,13.42
unknown,4.07


#### Top 5 job categories which needed highest number of contacts in this campaign for subscribing to term deposit on an average

In [32]:
%sql

select JobCategory, 
  cast(TotalContactsMade/SuccessfulCampaigns as decimal(4,2)) as Average_Contacts_Made_Per_One_SuccessfulCampaign
from(
    select job as JobCategory,
      sum(campaign) as TotalContactsMade,
      sum(case when y= 1 then 1 else 0 end) as SuccessfulCampaigns
    from bank_data
    group by JobCategory
)
order by Average_Contacts_Made_Per_One_SuccessfulCampaign desc
limit 5

JobCategory,Average_Contacts_Made_Per_One_SuccessfulCampaign
blue-collar,38.72
entrepreneur,33.85
housemaid,32.09
services,30.6
unknown,28.03


In [33]:
%sql

select Month,
  cast(SuccessfulCampaigns*100/TotalCampaigns as decimal(4,2)) as SuccessPercentage,
  SuccessfulCampaigns,
  TotalCampaigns
from (
    select
      Month,
      sum(case when y= 1 then 1 else 0 end) as SuccessfulCampaigns,
      count(y) as TotalCampaigns
    from bank_data
      group by Month
)
order by SuccessPercentage desc limit 5

Month,SuccessPercentage,SuccessfulCampaigns,TotalCampaigns
mar,51.99,248,477
dec,46.73,100,214
sep,46.46,269,579
oct,43.77,323,738
apr,19.68,577,2932


In [34]:
data=df.select(['age','job','marital','education','default','balance','housing','loan','contact','day','month',
                                 'campaign','pdays','previous','poutcome','y'])

In [35]:
data=data.dropna()

In [36]:
train_data,test_data=data.randomSplit([0.8,0.2])

### Models

#### Linear SVC Model

In [39]:
from pyspark.ml.classification import LinearSVC
from pyspark.ml.feature import VectorAssembler,StringIndexer,StandardScaler
from pyspark.ml import Pipeline

In [40]:
# Use StringIndexer to convert the categorical columns to hold numerical data
# handleInvalid is set to 'keep' which means that when a new class name is encountered in test dataset, in that case the StringIndexer will fail to find the label, and an exception will be raised, in order to avoid that and create new labels when new class in encountered 'keep' value for handleInvalid param helps

job_indexer = StringIndexer(inputCol='job',outputCol='job_index',handleInvalid='keep')
marital_indexer = StringIndexer(inputCol='marital',outputCol='marital_index',handleInvalid='keep')
education_indexer = StringIndexer(inputCol='education',outputCol='education_index',handleInvalid='keep')
default_indexer = StringIndexer(inputCol='default',outputCol='default_index',handleInvalid='keep')
housing_indexer = StringIndexer(inputCol='housing',outputCol='housing_index',handleInvalid='keep')
loan_indexer = StringIndexer(inputCol='loan',outputCol='loan_index',handleInvalid='keep')
contact_indexer = StringIndexer(inputCol='contact',outputCol='contact_index',handleInvalid='keep')
month_indexer = StringIndexer(inputCol='month',outputCol='month_index',handleInvalid='keep')
poutcome_indexer = StringIndexer(inputCol='poutcome',outputCol='poutcome_index',handleInvalid='keep')

In [41]:
linearsvc_assembler = VectorAssembler(inputCols=['age','job_index','marital_index','education_index',
                                       'default_index','housing_index','loan_index','contact_index','month_index',
                                       'day','campaign','pdays','previous','poutcome_index'],
                            outputCol="unscaled_features")

In [42]:
# Since SVM's do not perform well on the data that is not scaled, scaling the data for better performance of the Linear SVC Model

scaler = StandardScaler(inputCol="unscaled_features",outputCol="features")

In [43]:
linear_svc_model = LinearSVC(labelCol='y')

In [44]:
# creating pipeline for all the above actions to be performed

linear_svc_pipe = Pipeline(stages=[job_indexer,marital_indexer,education_indexer,default_indexer,
                        housing_indexer,loan_indexer,contact_indexer,month_indexer,poutcome_indexer,
                        linearsvc_assembler,scaler,linear_svc_model])

In [45]:
fit_Linear_svc_model = linear_svc_pipe.fit(train_data)

In [46]:
# Store the results in a dataframe

results_linear_svc = fit_Linear_svc_model.transform(test_data)

In [47]:
results_linear_svc.select(['y','prediction']).show()

### Logistic Regression

In [49]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import VectorAssembler,StringIndexer,OneHotEncoderEstimator
from pyspark.ml import Pipeline

In [50]:
# OneHotEncoderEstimator converts the indexed data into a vector which will be effectively handled by Logistic Regression model

data_encoder = OneHotEncoderEstimator(inputCols=['job_index','marital_index','education_index',
                                                 'contact_index','month_index','poutcome_index','day'],
                                      outputCols=['job_index_enc','marital_index_enc','education_index_enc',
                                                 'contact_index_enc','month_index_enc','poutcome_index_enc','day_enc'],
                                      handleInvalid='keep')

In [51]:
logistic_assembler = VectorAssembler(inputCols=['age','balance','job_index_enc','marital_index_enc','education_index_enc',
                                       'default_index','housing_index','loan_index','contact_index_enc','month_index_enc',
                                       'day_enc','campaign','pdays','previous','poutcome_index_enc'],
                            outputCol="features")

In [52]:
# Creating an object for the Logistic Regression model

logreg_model = LogisticRegression(labelCol='y')

In [53]:
# Pipeline is used to pass the data through indexer and assembler simultaneously. Also, it helps to pre-rocess the test data
# in the same way as that of the train data. It also 

logreg_pipe = Pipeline(stages=[job_indexer,marital_indexer,education_indexer,default_indexer,
                        housing_indexer,loan_indexer,contact_indexer,month_indexer,poutcome_indexer,
                        data_encoder,logistic_assembler,logreg_model])

In [54]:
fit_logreg_model=logreg_pipe.fit(train_data)

In [55]:
# Storing the results in a dataframe

logreg_results = fit_logreg_model.transform(test_data)

In [56]:
logreg_results.select(['y','prediction']).show()

### Decision Trees

In [58]:
# Import the required libraries

from pyspark.ml.classification import DecisionTreeClassifier

In [59]:
# Vector assembler is used to create a vector of input features

generic_assembler = VectorAssembler(inputCols=['age','job_index','marital_index','education_index',
                                       'default_index','housing_index','loan_index','contact_index','month_index',
                                       'day','campaign','pdays','previous','poutcome_index'],
                            outputCol="features")

In [60]:
# Create an object for the Logistic Regression model
# Use the parameter maxBins and assign a value that is equal to or more than the number of categories in any sigle feature

dt_model = DecisionTreeClassifier(labelCol='y',maxBins=10000)

In [61]:
# Pipeline is used to pass the data through indexer and assembler simultaneously. Also, it helps to pre-rocess the test data
# in the same way as that of the train data. It also 

dt_pipe = Pipeline(stages=[job_indexer,marital_indexer,education_indexer,default_indexer,
                        housing_indexer,loan_indexer,contact_indexer,month_indexer,poutcome_indexer,
                        generic_assembler,dt_model])

In [62]:
dt_fit_model = dt_pipe.fit(train_data)

In [63]:
# Store the results in a dataframe

dt_results = dt_fit_model.transform(test_data)

### Random Forest Classifier

In [65]:
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [66]:
rf_model = RandomForestClassifier(labelCol="y")

In [67]:
# creating pipeline for Random Forest Classifier

rf_pipe = Pipeline(stages=[job_indexer,marital_indexer,education_indexer,default_indexer,
                        housing_indexer,loan_indexer,contact_indexer,month_indexer,poutcome_indexer,
                        generic_assembler,rf_model])

In [68]:
# fitting data into the pipeline

rf_fit_model=rf_pipe.fit(train_data)

In [69]:
# Store the results in a dataframe

rf_results = rf_fit_model.transform(test_data)

In [70]:
rf_results.select(['y','prediction']).show()

### Gradient Boosting Classifier

In [72]:
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [73]:
gbt_model = GBTClassifier(labelCol="y", maxIter=100)

In [74]:
# creating pipeline for Gradinet Boosting Classifier

gbt_pipe = Pipeline(stages=[job_indexer,marital_indexer,education_indexer,default_indexer,
                        housing_indexer,loan_indexer,contact_indexer,month_indexer,poutcome_indexer,
                        generic_assembler,gbt_model])

In [75]:
gbt_fit_model = gbt_pipe.fit(train_data)

In [76]:
gbt_results = gbt_fit_model.transform(test_data)

### Weighted Logistic Regression

#### In our dataset the count of positive class(customer subscribed to term deposit) is 5289 out of 45211 total instances present in the dataset and the negative class count is ~39,000 and because of this the number of 1's in the column to be predicted is very less when we pass the training dataset for model, in order to handle this imbalancing there is a technique in the Logistic Regression where we use a hyperParameter called "weightCol" while intializing the model, in our case we created a additional column in training dataset where we assigned the Balancing Ratio (number of negtaive instances/ total instances in dataset) to the rows which are classified to fall positive class and (1-Balancing Ratio) for the negative instances so that model would get trained better when this weighted column is introduced. "ColumnWeights" is the column we have created for purpose and below is the reference link that we have used

https://medium.com/@dhiraj.p.rai/logistic-regression-in-spark-ml-8a95b5f5434c

In [79]:
from pyspark.ml.feature import StringIndexer
indexer_job = StringIndexer(inputCol="job", outputCol="jobIndex")
indexed_job_df = indexer_job.fit(df).transform(df)
indexed_job_df.show()

In [80]:
#Further expanding the same logic to other categorical columns.

from pyspark.ml.feature import StringIndexer
indexer_marital = StringIndexer(inputCol="marital", outputCol="maritalIndex")
indexed_job_marital_df = indexer_marital.fit(indexed_job_df).transform(indexed_job_df)

In [81]:
indexer_education = StringIndexer(inputCol="education", outputCol="educationIndex")
indexed_jme_df = indexer_education.fit(indexed_job_marital_df).transform(indexed_job_marital_df)

In [82]:
indexer_default = StringIndexer(inputCol="default", outputCol="defaultIndex")
indexed_jmed_df = indexer_default.fit(indexed_jme_df).transform(indexed_jme_df)

In [83]:
indexer_housing = StringIndexer(inputCol="housing", outputCol="housingIndex")
indexed_jmedh_df = indexer_housing.fit(indexed_jmed_df).transform(indexed_jmed_df)

In [84]:
indexer_loan = StringIndexer(inputCol="loan", outputCol="loanIndex")
indexed_jmedhl_df = indexer_loan.fit(indexed_jmedh_df).transform(indexed_jmedh_df)

In [85]:
indexer_contact = StringIndexer(inputCol="contact", outputCol="contactIndex")
indexed_jmedhlc_df = indexer_contact.fit(indexed_jmedhl_df).transform(indexed_jmedhl_df)

In [86]:
indexer_month = StringIndexer(inputCol="month", outputCol="monthIndex")
indexed_jmedhlcm_df = indexer_month.fit(indexed_jmedhlc_df).transform(indexed_jmedhlc_df)

In [87]:
indexer_poutcome = StringIndexer(inputCol="poutcome", outputCol="poutcomeIndex")
indexed_jmedhlcmp_df = indexer_poutcome.fit(indexed_jmedhlcm_df).transform(indexed_jmedhlcm_df)

In [88]:
cols = indexed_jmedhlcmp_df.columns
for c in {"job","marital","education","default","housing","loan","contact","month","poutcome",'y'}:
  cols.remove(c)

# Let us import the vector assembler

from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=cols,outputCol="features")

# Now let us use the transform method to transform our dataset

raw_df=assembler.transform(indexed_jmedhlcmp_df)
raw_df.select("features").show(truncate=False)

In [89]:
# scaling the features column

from pyspark.ml.feature import StandardScaler
standardscaler = StandardScaler().setInputCol("features").setOutputCol("features_scaled")
raw_df = standardscaler.fit(raw_df).transform(raw_df)
raw_df.select("features","features_scaled").show(5)

In [90]:
train_data_wl, test_data_wl = raw_df.randomSplit([0.8, 0.2])

In [91]:
dataset_size = df.count()
numNegatives = df.filter(df.y == "false").count()
BalancingRatio= numNegatives/dataset_size

In [92]:
train_data_wl = train_data_wl.withColumn("classWeights", when(train_data_wl.y == "true",BalancingRatio).otherwise(1-BalancingRatio))

In [93]:
# Feature selection using chisquareSelector

from pyspark.ml.feature import ChiSqSelector
css = ChiSqSelector(featuresCol='features_scaled',outputCol='Aspect',labelCol='y',fpr=0.05)
train_data_wl=css.fit(train_data_wl).transform(train_data_wl)
test_data_wl=css.fit(test_data_wl).transform(test_data_wl)
test_data_wl.select("Aspect").show(5,truncate=False)

In [94]:
from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(labelCol="y", featuresCol="Aspect",weightCol="classWeights",maxIter=10)
model=lr.fit(train_data_wl)
predict_train=model.transform(train_data_wl)
predict_test=model.transform(test_data_wl)
predict_test.select("y","prediction").show(10)

#### Model's Evaluation

#### 1. Accuracy

In [97]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from sklearn.metrics import confusion_matrix

In [98]:
#Linear SVC
linearsvc_acc_eval = MulticlassClassificationEvaluator(
    labelCol="y", predictionCol="prediction", metricName="accuracy")

# Logistic Regression
logreg_acc_eval = MulticlassClassificationEvaluator(
    labelCol="y", predictionCol="prediction", metricName="accuracy")

#Decision Trees
dt_acc_eval = MulticlassClassificationEvaluator(
    labelCol="y", predictionCol="prediction", metricName="accuracy")

#Random Forest Classifier
rf_acc_eval = MulticlassClassificationEvaluator(
    labelCol="y", predictionCol="prediction", metricName="accuracy")

#Gradient Boosting Classifier
gb_acc_eval = MulticlassClassificationEvaluator(
    labelCol="y", predictionCol="prediction", metricName="accuracy")

#Weighted Logistic Regression
wlr_acc_eval = MulticlassClassificationEvaluator(
    labelCol="y", predictionCol="prediction", metricName="accuracy")

In [99]:
#Linear SVC
accuracy_linear_svc = linearsvc_acc_eval.evaluate(results_linear_svc)

#Logistic Regression
logreg_accuracy = logreg_acc_eval.evaluate(logreg_results)

#Decision Trees
dt_accuracy = dt_acc_eval.evaluate(dt_results)

#Random Forest Classifier
rf_accuracy = rf_acc_eval.evaluate(rf_results)

#Gradient Boosting Classifier
gb_accuracy = rf_acc_eval.evaluate(gbt_results)

#Weighted Logistic Regression
wlr_accuracy = wlr_acc_eval.evaluate(predict_test)

In [100]:
#Linear SVC
print("The accuracy of linear SVC model is {}".format(accuracy_linear_svc))

#Logistic Regression
print("The accuracy of Logistic Regression model is {}".format(logreg_accuracy))

#Decision Trees
print("The accuracy of Decision Trees model is {}".format(dt_accuracy))

#Random Forest Classifier
print("The accuracy of Random Forest Classifier model is {}".format(rf_accuracy))

#Gradient Boosting Classifier
print("The accuracy of Grdaient Boosting Classifier model is {}".format(gb_accuracy))

#Weighted Logistic Regression 
print("The accuracy of Weighted Logistic Regression is {}".format(wlr_accuracy))

#### 2. Confusion Matrix

In [102]:
# Linear SVC
from sklearn.metrics import confusion_matrix
y_true = results_linear_svc.select("y")
y_true = y_true.toPandas()
y_pred = results_linear_svc.select("prediction")
y_pred = y_pred.toPandas()
confusion_matrix = confusion_matrix(y_true, y_pred)
print("confusion matrix for Linear SVC Model: \n {}".format(confusion_matrix))

# Logistic Regression
from sklearn.metrics import confusion_matrix
logreg_y_true = logreg_results.select("y")
logreg_y_true = logreg_y_true.toPandas()
logreg_y_pred = logreg_results.select("prediction")
logreg_y_pred = logreg_y_pred.toPandas()
logreg_cnf_matrix = confusion_matrix(logreg_y_true, logreg_y_pred)
print("confusion matrix for Logistic Regression Model \n {}".format(logreg_cnf_matrix))

#Decision Trees
from sklearn.metrics import confusion_matrix
dt_y_true = dt_results.select("y")
dt_y_true = dt_y_true.toPandas()
dt_y_pred = dt_results.select("prediction")
dt_y_pred = dt_y_pred.toPandas()
dt_cnf_matrix = confusion_matrix(dt_y_true, dt_y_pred)
print("confusion matrix for Decision Trees Model\n {}".format(dt_cnf_matrix))

#Random Forest Classifier
from sklearn.metrics import confusion_matrix
rf_y_true = rf_results.select("y")
rf_y_true = rf_y_true.toPandas()
rf_y_pred = rf_results.select("prediction")
rf_y_pred = rf_y_pred.toPandas()
rf_cnf_matrix = confusion_matrix(rf_y_true, rf_y_pred)
print("confusion matrix for Random Forest Classifier \n {}".format(rf_cnf_matrix))

#Gradient Boosting Classifier
from sklearn.metrics import confusion_matrix
gb_y_true = gbt_results.select("y")
gb_y_true = gb_y_true.toPandas()
gb_y_pred = gbt_results.select("prediction")
gb_y_pred = gb_y_pred.toPandas()
gb_cnf_matrix = confusion_matrix(gb_y_true, gb_y_pred)
print("confusion matrix for Gradient Boosting Classifier \n {}".format(gb_cnf_matrix))

#Weighted Logistic Regression Model
from sklearn.metrics import confusion_matrix
wlr_y_true = predict_test.select("y")
wlr_y_true = wlr_y_true.toPandas()
wlr_y_pred = predict_test.select("prediction")
wlr_y_pred = wlr_y_pred.toPandas()
wlr_cnf_matrix = confusion_matrix(wlr_y_true, wlr_y_pred)
print("confusion matrix for Weighted Logistic Regression Model \n {}".format(wlr_cnf_matrix))

### Precision and Recall

In [104]:
#Linear SVC
from sklearn.metrics import precision_score,recall_score
lsvc_precision_score = precision_score(y_true,y_pred)
lsvc_recall_score = recall_score(y_true,y_pred)
print("Precision Score for Linear SVC Model: \n {}".format(lsvc_precision_score))
print("Recall Score for Linear SVC Model: \n {}".format(lsvc_recall_score))

#Logistic Regression Model
from sklearn.metrics import precision_score,recall_score
logreg_precision_score = precision_score(logreg_y_true,logreg_y_pred)
logreg_recall_score = recall_score(logreg_y_true,logreg_y_pred)
print("Precision Score for Logistic Regression Model: \n {}".format(logreg_precision_score))
print("Recall Score for Logistic Regression Model: \n {}".format(logreg_recall_score))

#Decision Trees
from sklearn.metrics import precision_score,recall_score
dt_precision_score = precision_score(dt_y_true,dt_y_pred)
dt_recall_score = recall_score(dt_y_true,dt_y_pred)
print("Precision Score for Decision Trees Model: \n {}".format(dt_precision_score))
print("Recall Score for Decision Trees Model: \n {}".format(dt_recall_score))

#Random Forest Classifier
from sklearn.metrics import precision_score,recall_score
rf_precision_score = precision_score(rf_y_true,rf_y_pred)
rf_recall_score = recall_score(rf_y_true,rf_y_pred)
print("Precision Score for Random Forest Classifier Model: \n {}".format(rf_precision_score))
print("Recall Score for Random Forest Classifier Model: \n {}".format(rf_recall_score))

#Gradient Boosting Classifier
from sklearn.metrics import precision_score,recall_score
gb_precision_score = precision_score(gb_y_true,gb_y_pred)
gb_recall_score = recall_score(gb_y_true,gb_y_pred)
print("Precision Score for Gradient Boosting Classifier Model: \n {}".format(gb_precision_score))
print("Recall Score for Gradient Boosting Classifier Model: \n {}".format(gb_recall_score))

#Weighted Logistic Regression Model
from sklearn.metrics import precision_score,recall_score
wlg_precision_score = precision_score(wlr_y_true,wlr_y_pred)
wlg_recall_score = recall_score(wlr_y_true,wlr_y_pred)
print("Precision Score for Weighted Logistic Regression Model Model: \n {}".format(wlg_precision_score))
print("Recall Score for Weighted Logistic Regression Model Model: \n {}".format(wlg_recall_score))

### Results

#### For the business case that we have taken, the predictive models that we have developed will have an impact when they are able to predict correctly whether customer would be subscribing for the term deposit offered by the bank, so in our case the focus should be on approaching/reaching out to the customers who are willing to subscribe, in this scenario the most important aspect that we should look at is how many times the model is correctly predicting that customer would subscribe when they are subscribing in reality, that is in actual positive class (where customer is subscribing) the number of times where model is predicting that customer would subscribe (predicted positive class) should be high, this ensures that we are not ignoring that particular customers who are actually willing to subscribe to term deposit, becuase this would be beneficial to the company and is the core aspect of our classification task here. 


1.Linear SVC Model has predicted that 0 customers are subscribing to term deposits when actually 1069 customers are subscribing, though this model has 88% accuracy this does not provide any value to the company by using this model since this model has predicted no customer subscribed, in our business case/scenario accuracy does not have a greater significance rather we should concentrate more on recall followed by precision metrics.


2.Logistic Regression Model has predicted that out of 1069 actual subscriptions for the term deposit, it predicted 207 times correctly and out of 7093 cases where customers did not subscribe for term deposits in real it predicted 7816 times correctly.

3.Decision Tree Classifier has predicted that out of 1069 actual subscriptions for the term deposit, it predicted 181 times correctly and out of 7093 cases where customers did not subscribe for term deposits in real it predicted 7855 times correctly.

4.Random Forest Classifier has predicted that out of 1069 actual subscriptions for the term deposit, it predicted 161 times correctly and out of 7093 cases where customers did not subscribe for term deposits in real it predicted 7863 times correctly.

5.Gradient Boosting Classifier has predicted that out of 1069 actual subscriptions for the term deposit, it predicted 278 times correctly and out of 7093 cases where customers did not subscribe for term deposits in real it predicted 7774 times correctly.


6.Weighted Logistic Regression has predicted that out of 1069 actual subscriptions for the term deposit, it predicted 340 times correctly and out of 7093 cases where customers did not subscribe for term deposits in real it predicted 7864 times correctly.

Based on the above explanation and by looking at the above True Positive Count and Flase Negative count from the confusion matrix of different models Weighted Logistic Expression is the best model compared to remaining models since it has the maximum count for an "Actual Yes" and "Predicted Yes" Scenario which is of the prime importance to us and low value for "Actual Yes" and Predicted "No Scenario"

Based on the above explanation when we calculate the precision and recall score for each model using the respective confusion matrices and considering there needs to be a trade between the precision and recall we can say that Weighted Logistic Regression (precision 66.5% and recall 32 %) is the best model we have got to use on this dataset whether a customer would be subscribing to term deposit or not

### Area under ROC and PR Curves

In [108]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

AUC_evaluator = BinaryClassificationEvaluator(rawPredictionCol='prediction',labelCol='y',metricName='areaUnderROC')
PR_evaluator = BinaryClassificationEvaluator(rawPredictionCol='prediction',labelCol='y',metricName='areaUnderPR')

#Linear SVC
AUC = AUC_evaluator.evaluate(results_linear_svc)
print("Area under the ROC curve for Linear SVC Model is {}".format(AUC))
PR = PR_evaluator.evaluate(results_linear_svc)
print("Area under the PR curve for Linear SVC Model is {}\n".format(PR))

#Logistic Regression
AUC = AUC_evaluator.evaluate(logreg_results)
print("Area under the ROC curve for Logistic Regression Model is {}".format(AUC))
PR = PR_evaluator.evaluate(logreg_results)
print("Area under the PR curve for Logistic Regression Model is {}\n".format(PR))

#Decision Trees
AUC = AUC_evaluator.evaluate(dt_results)
print("Area under the ROC curve for Decision Trees is {}".format(AUC))
PR = PR_evaluator.evaluate(dt_results)
print("Area under the PR curve for Decision Trees is {}\n".format(PR))

#Random Forests
AUC = AUC_evaluator.evaluate(rf_results)
print("Area under the ROC curve for Random Forests is {}".format(AUC))
PR = PR_evaluator.evaluate(rf_results)
print("Area under the PR curve for Random Forests is {}\n".format(PR))

#Gradient Boosting Classifier
AUC = AUC_evaluator.evaluate(gbt_results)
print("Area under the ROC curve for Gradient Boosting Classifier is {}".format(AUC))
PR = PR_evaluator.evaluate(gbt_results)
print("Area under the PR curve for Gradient Boosting Classifier is {}\n".format(PR))

#Weighted Logistic Regression
AUC = AUC_evaluator.evaluate(predict_test)
print("Area under the ROC curve for Weighted Logistic Regression Model is {}".format(AUC))
PR = PR_evaluator.evaluate(predict_test)
print("Area under the PR curve for Weighted Logistic Regression Model is {}\n".format(PR))

### ROC Curve Plots

In [110]:
from sklearn.metrics import roc_curve
import matplotlib
import matplotlib.pyplot as plt

y_true_array = [y_true, logreg_y_true, dt_y_true, rf_y_true, gb_y_true, wlr_y_true]
y_pred_array = [y_pred, logreg_y_pred, dt_y_pred, rf_y_pred, gb_y_pred, wlr_y_pred]

for i in range(len(y_true_array)):
  fpr, tpr, thresholds = roc_curve(y_true_array[i], y_pred_array[i] ,pos_label=1)
  plt.plot(fpr, tpr)
  plt.plot([0, 1], [0, 1], 'k--')
  plt.axis([0, 1, 0, 1])
  if i==0:
    plt.xlabel('ROC Curve of Linear SVC Model')
  elif i==1:
    plt.xlabel("ROC Curve of Logistic Regression Model")
  elif i == 2:
    plt.xlabel("ROC Curve of Decision Trees Classifier")
  elif i == 3:
    plt.xlabel("ROC Curve of Random Forest Classifier")
  elif i == 4:
    plt.xlabel("ROC Curve of Gradient Boosting Classifier")
  elif i == 5:
    plt.xlabel("ROC Curve of Weighted Logistic Regression Model")
  display(plt.show())