# Ensemble models - PySpark


## Goal
To examplify the uses of ensemble models in PySpark as the ensemble models in [previous project using sklearn and keras](https://github.com/tankwin08/ensemble-models-ML-DL-) and predict if the client will subscribe (yes/no) a term deposit (variable y) using market campaign data.


## Ensemble models

Ensemble modeling is a process where multiple diverse models are created to predict an outcome, either by using many different modeling algorithms or using different training data sets. The ensemble model then aggregates the prediction of each base model and results in once final prediction for the unseen data. The motivation for using ensemble models is to reduce the generalization error of the prediction. As long as the base models are diverse and independent, the prediction error of the model decreases when the ensemble approach is used.

The approach seeks the wisdom of crowds in making a prediction. Even though the ensemble model has multiple base models within the model, it acts and performs as a single model.

A signle model generally suffers from high bias or high variance due to data quality, train and test data drift, distribution of hypothesis...

The goal of the modelling to find a method wit low bias and low variance. Thus, it is common to **aggregate** several base models to provide solutions.

### Aggregate strategies

There are multiple ways to conduct aggregation and improve the model performance either from accuracy or robustness. 

**1 Baggging**

The bagging strategy is built on the bootstrap sampling. In short, it built multiple classifer independently and in parallel using data derived from resampling from the training set. Then aggregate these classifiers using average processing or major vote to redce the variability of prediction. However the accuracy/point estimate is not improved.


**2 Boosting**

 
The boosting is similar to bagging strategy to some extent in terms of resampling methods. But it differs in two major ways:

    1 how trees are built: The Bagging method builds each tree independently while Boosting method builds one tree at a time. This additive model (ensemble) works in a forward stage-wise manner, introducing a weak learner to improve the shortcomings of existing weak learners. 
    
    2 Results combination: The Bagging method combine results at the end of the process (by averaging or "majority rules") while the boosting combines results along the way.
    
If you carefully tune parameters, boosting can result in better performance than bagging. However, boosting may not be a good choice if you have a lot of noise, as it can result in overfitting. They also tend to be harder to tune than bagging method.


**3 Stacking**

Stacking provide a whole new different way to combine classifers. There are two major differences:

    1 stacking often considers heterogeneous weak learners (different learning algorithms are combined) whereas bagging and boosting consider mainly homogeneous weak learners.
    
    2 The stacking uses a second layer model which uses the predictions of weak classifiers such as bagging and bostting results as input.


In this project, the stacking strategy was used to predict if the client will subscribe (yes/no) a term deposit (variable y) using market campaign data.


## Data

The data used for this project can be downloaded from [here](https://archive.ics.uci.edu/ml/machine-learning-databases/00222/).

The explination of the data can be found [here](https://archive.ics.uci.edu/ml/datasets/bank+marketing).

The data is related with **direct marketing campaigns of a Portuguese banking institution**. The marketing campaigns were based on phone calls. Often, more than one contact to the same client was required, in order to access if the product (bank term deposit) would be ('yes') or not ('no') subscribed. 

### Input variables:

**bank client data:**

    1 - age (numeric)
    2 - job : type of job (categorical: 'admin.','blue-collar','entrepreneur','housemaid','management','retired','self-employed','services','student','technician','unemployed','unknown')
    3 - marital : marital status (categorical: 'divorced','married','single','unknown'; note: 'divorced' means divorced or widowed)
    4 - education (categorical: 'basic.4y','basic.6y','basic.9y','high.school','illiterate','professional.course','university.degree','unknown')
    5 - default: has credit in default? (categorical: 'no','yes','unknown')
    6 - housing: has housing loan? (categorical: 'no','yes','unknown')
    7 - loan: has personal loan? (categorical: 'no','yes','unknown')
    
** related with the last contact of the current campaign:**

    8 - contact: contact communication type (categorical: 'cellular','telephone') 
    9 - month: last contact month of year (categorical: 'jan', 'feb', 'mar', ..., 'nov', 'dec')
    10 - day_of_week: last contact day of the week (categorical: 'mon','tue','wed','thu','fri')
    11 - duration: last contact duration, in seconds (numeric). 
    
**Important note:** Attribute 11 highly affects the output target (e.g., if duration=0 then y='no'). Yet, the duration is not known before a call is performed. Also, after the end of the call y is obviously known. Thus, this input should only be included for benchmark purposes and should be discarded if the intention is to have a realistic predictive model.


**other attributes:**

    12 - campaign: number of contacts performed during this campaign and for this client (numeric, includes last contact)
    13 - pdays: number of days that passed by after the client was last contacted from a previous campaign (numeric; 999 means client was not previously contacted)
    14 - previous: number of contacts performed before this campaign and for this client (numeric)
    15 - poutcome: outcome of the previous marketing campaign (categorical: 'failure','nonexistent','success')

**social and economic context attributes**

    16 - emp.var.rate: employment variation rate - quarterly indicator (numeric)
    17 - cons.price.idx: consumer price index - monthly indicator (numeric) 
    18 - cons.conf.idx: consumer confidence index - monthly indicator (numeric) 
    19 - euribor3m: euribor 3 month rate - daily indicator (numeric)
    20 - nr.employed: number of employees - quarterly indicator (numeric)

**Output variable (desired target):**

    21 - y - has the client subscribed a term deposit? (binary: 'yes','no')
    
    
    
    
## References

[Machine Learning Case Study with Spark: Make it better](http://people.stat.sc.edu/haigang/improvement.html).



## Setup pyspark

If this is your first time to set up pyspark, please follow the [intructions](https://medium.com/@naomi.fridman/install-pyspark-to-run-on-jupyter-notebook-on-windows-4ec2009de21f) to set it up properly.


In [1]:
import findspark
findspark.init()
from pyspark import SparkContext
from pyspark import SQLContext

In [2]:
import numpy as np
import pandas as pd
pd.options.display.max_columns = None

In [3]:
from pyspark.sql.functions import *
from pyspark.ml.classification import  RandomForestClassifier
from pyspark.ml.feature import StringIndexer, OneHotEncoderEstimator, VectorAssembler, VectorSlicer
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.linalg import Vectors
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit
import pyspark.sql.functions as F

In [4]:
sc = SparkContext()
spark = SQLContext(sc)

## 1 Import data

In [5]:
## data
data_file = './data/bank-additional/bank-additional-full.csv'

In [6]:
df = spark.read.option("delimiter", ";").csv(data_file,header = True, inferSchema = True)

In [7]:
df.count()

41188

In [8]:
df.dtypes

[('age', 'int'),
 ('job', 'string'),
 ('marital', 'string'),
 ('education', 'string'),
 ('default', 'string'),
 ('housing', 'string'),
 ('loan', 'string'),
 ('contact', 'string'),
 ('month', 'string'),
 ('day_of_week', 'string'),
 ('duration', 'int'),
 ('campaign', 'int'),
 ('pdays', 'int'),
 ('previous', 'int'),
 ('poutcome', 'string'),
 ('emp.var.rate', 'double'),
 ('cons.price.idx', 'double'),
 ('cons.conf.idx', 'double'),
 ('euribor3m', 'double'),
 ('nr.employed', 'double'),
 ('y', 'string')]

## 2 Data explorations

In [9]:
## rename the column names
df = df.toDF(*(c.replace('.', '_') for c in df.columns))

In [10]:
df.limit(5).toPandas()

Unnamed: 0,age,job,marital,education,default,housing,loan,contact,month,day_of_week,duration,campaign,pdays,previous,poutcome,emp_var_rate,cons_price_idx,cons_conf_idx,euribor3m,nr_employed,y
0,56,housemaid,married,basic.4y,no,no,no,telephone,may,mon,261,1,999,0,nonexistent,1.1,93.994,-36.4,4.857,5191.0,no
1,57,services,married,high.school,unknown,no,no,telephone,may,mon,149,1,999,0,nonexistent,1.1,93.994,-36.4,4.857,5191.0,no
2,37,services,married,high.school,no,yes,no,telephone,may,mon,226,1,999,0,nonexistent,1.1,93.994,-36.4,4.857,5191.0,no
3,40,admin.,married,basic.6y,no,no,no,telephone,may,mon,151,1,999,0,nonexistent,1.1,93.994,-36.4,4.857,5191.0,no
4,56,services,married,high.school,no,no,yes,telephone,may,mon,307,1,999,0,nonexistent,1.1,93.994,-36.4,4.857,5191.0,no


In [11]:
df.describe().toPandas()

Unnamed: 0,summary,age,job,marital,education,default,housing,loan,contact,month,day_of_week,duration,campaign,pdays,previous,poutcome,emp_var_rate,cons_price_idx,cons_conf_idx,euribor3m,nr_employed,y
0,count,41188.0,41188,41188,41188,41188,41188,41188,41188,41188,41188,41188.0,41188.0,41188.0,41188.0,41188,41188.0,41188.0,41188.0,41188.0,41188.0,41188
1,mean,40.02406040594348,,,,,,,,,,258.2850101971448,2.567592502670681,962.4754540157328,0.1729629989317276,,0.0818855006317839,93.57566436828918,-40.50260027191787,3.6212908128585366,5167.035910944004,
2,stddev,10.421249980934055,,,,,,,,,,259.27924883646455,2.770013542902331,186.9109073447414,0.4949010798392892,,1.57095974051703,0.5788400489541355,4.628197856174595,1.7344474048512557,72.25152766825924,
3,min,17.0,admin.,divorced,basic.4y,no,no,no,cellular,apr,fri,0.0,1.0,0.0,0.0,failure,-3.4,92.201,-50.8,0.634,4963.6,no
4,max,98.0,unknown,unknown,unknown,yes,yes,yes,telephone,sep,wed,4918.0,56.0,999.0,7.0,success,1.4,94.767,-26.9,5.045,5228.1,yes


In [12]:
df.groupBy('y').count().toPandas()

Unnamed: 0,y,count
0,no,36548
1,yes,4640


In [13]:
### check if missing values in the columns
from pyspark.sql.functions import col,sum
df.select(*(sum(col(c).isNull().cast("int")).alias(c) for c in df.columns)).show()

+---+---+-------+---------+-------+-------+----+-------+-----+-----------+--------+--------+-----+--------+--------+------------+--------------+-------------+---------+-----------+---+
|age|job|marital|education|default|housing|loan|contact|month|day_of_week|duration|campaign|pdays|previous|poutcome|emp_var_rate|cons_price_idx|cons_conf_idx|euribor3m|nr_employed|  y|
+---+---+-------+---------+-------+-------+----+-------+-----+-----------+--------+--------+-----+--------+--------+------------+--------------+-------------+---------+-----------+---+
|  0|  0|      0|        0|      0|      0|   0|      0|    0|          0|       0|       0|    0|       0|       0|           0|             0|            0|        0|          0|  0|
+---+---+-------+---------+-------+-------+----+-------+-----+-----------+--------+--------+-----+--------+--------+------------+--------------+-------------+---------+-----------+---+



In [14]:
## select data type = string
columnList = [item[0] for item in df.dtypes if item[1].startswith('string')]

In [15]:
columnList

['job',
 'marital',
 'education',
 'default',
 'housing',
 'loan',
 'contact',
 'month',
 'day_of_week',
 'poutcome',
 'y']

In [16]:
### check each category's number of groups
final_sum = []
for col in columnList:
    
    #col = columnList[0]
    summ = pd.DataFrame(df.select(col).groupby(col).count().orderBy(F.desc_nulls_first("count")).collect(),columns=["value","count"]) 
    final_sum.append(summ)

In [17]:
from pyspark.sql.functions import col, countDistinct

df.agg(*(countDistinct(col(c)).alias(c) for c in columnList)).show()

+---+-------+---------+-------+-------+----+-------+-----+-----------+--------+---+
|job|marital|education|default|housing|loan|contact|month|day_of_week|poutcome|  y|
+---+-------+---------+-------+-------+----+-------+-----+-----------+--------+---+
| 12|      4|        8|      3|      3|   3|      2|   10|          5|       3|  2|
+---+-------+---------+-------+-------+----+-------+-----+-----------+--------+---+



## 3 Train and test data split

In [18]:
fractions = df.select("y").distinct().withColumn("fraction", lit(0.8)).rdd.collectAsMap()
df_train = df.sampleBy('y',fractions,seed=17)
df_test = df.subtract(df_train)

In [19]:
df_test.columns

['age',
 'job',
 'marital',
 'education',
 'default',
 'housing',
 'loan',
 'contact',
 'month',
 'day_of_week',
 'duration',
 'campaign',
 'pdays',
 'previous',
 'poutcome',
 'emp_var_rate',
 'cons_price_idx',
 'cons_conf_idx',
 'euribor3m',
 'nr_employed',
 'y']

### 3.1 Data preprocessing

In [20]:
# one hot encoding and assembling
encoding_var = [i[0] for i in df.dtypes if (i[1]=='string') & (i[0]!='y')]
num_var = [i[0] for i in df.dtypes if ((i[1]=='int') | (i[1]=='double')) & (i[0]!='y')]

'''from string to interger'''
string_indexes = [StringIndexer(inputCol = c, outputCol = 'IDX_' + c, handleInvalid = 'keep') for c in encoding_var]
'''from interger to binary vectors'''
onehot_indexes = [OneHotEncoderEstimator(inputCols = ['IDX_' + c], outputCols = ['OHE_' + c]) for c in encoding_var]
label_indexes = StringIndexer(inputCol = 'y', outputCol = 'label', handleInvalid = 'keep')


## The input for the model should be binary vectors
assembler = VectorAssembler(inputCols = num_var + ['OHE_' + c for c in encoding_var], outputCol = "features")

### 3.2 Feature importances

To avoid the overfitting issue, the feature selection was conducted.

In [21]:
from FeatureImportanceSelector import ExtractFeatureImp, FeatureImpSelector

In [22]:
rf = RandomForestClassifier(labelCol="label", featuresCol="features")
fi_pipe = Pipeline(stages = string_indexes + onehot_indexes + [assembler, label_indexes, rf])

In [23]:
## model fit
mod = fi_pipe.fit(df_train)
pred_train = mod.transform(df_train) ##get the predicted train values

In [24]:
ExtractFeatureImp(mod.stages[-1].featureImportances, pred_train, "features").head(10)

Unnamed: 0,idx,name,score
1,1,duration,0.218672
9,9,nr_employed,0.170947
3,3,pdays,0.162137
62,62,OHE_poutcome_success,0.135583
8,8,euribor3m,0.103517
7,7,cons_conf_idx,0.04852
6,6,cons_price_idx,0.031215
60,60,OHE_poutcome_nonexistent,0.020316
51,51,OHE_month_oct,0.015218
52,52,OHE_month_sep,0.00913


## 4 Ensemble models

### 4.1 Used all input variables

In [25]:
from pyspark.ml.classification import RandomForestClassifier, GBTClassifier,LogisticRegression, NaiveBayes

rf = RandomForestClassifier(numTrees=20)
xgb = GBTClassifier(maxIter= 10)
lr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.2)
#nb = NaiveBayes(smoothing= 0.5, modelType="multinomial")

methods = {"random forest": rf,
           "logistic regression": lr,
          "boosting tree": xgb ##this needs to be different from others
          #"naive bayes": nb
          }

In [26]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
fitted_models ={}

for method_name, method in methods.items():
    
    method.setPredictionCol("prediction_" + method_name)
    if method_name != "boosting tree":
        method.setProbabilityCol("probability_" + method_name)
        method.setRawPredictionCol("raw_prediction_" + method_name)
        sel_col = "probability_" + method_name
    else:
        sel_col = "probability"
    

    pipe = Pipeline(stages = string_indexes + onehot_indexes + [assembler, label_indexes, method])
    # need to keep fitted model somewhere
    fitted_models[method_name] = pipe.fit(df_train)
    df_test = fitted_models[method_name].transform(df_test)
    
    filter_col1 = [col for col in df_test.columns if col.startswith('IDX')]
    filter_col2 = [col for col in df_test.columns if col.startswith('OHE')]
    drop_cols = filter_col1 + filter_col2 + ['features','label']
    
    evaluator= BinaryClassificationEvaluator(rawPredictionCol=sel_col, metricName= "areaUnderROC")
    print(evaluator.evaluate(df_test))
    if method_name != list(methods.keys())[len(methods)-1]: ##if it is the last layer, we will not drop columns
        df_test = df_test.drop(*drop_cols)

0.9173651541898475
0.8935541330413904
0.9463653404925537


In [27]:
df_test.limit(3).toPandas()

Unnamed: 0,age,job,marital,education,default,housing,loan,contact,month,day_of_week,duration,campaign,pdays,previous,poutcome,emp_var_rate,cons_price_idx,cons_conf_idx,euribor3m,nr_employed,y,raw_prediction_random forest,probability_random forest,prediction_random forest,raw_prediction_logistic regression,probability_logistic regression,prediction_logistic regression,IDX_job,IDX_marital,IDX_education,IDX_default,IDX_housing,IDX_loan,IDX_contact,IDX_month,IDX_day_of_week,IDX_poutcome,OHE_job,OHE_marital,OHE_education,OHE_default,OHE_housing,OHE_loan,OHE_contact,OHE_month,OHE_day_of_week,OHE_poutcome,features,label,rawPrediction,probability,prediction_boosting tree
0,60,retired,divorced,high.school,no,yes,no,telephone,may,thu,223,2,999,0,nonexistent,1.1,93.994,-36.4,4.855,5191.0,no,"[18.469669090507765, 1.5303309094922366, 0.0]","[0.9234834545253883, 0.07651654547461183, 0.0]",0.0,"[2.412882704278391, 0.33597119273934295, -4.69...","[0.8879918088105491, 0.11128016241138071, 0.00...",0.0,5.0,2.0,1.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,"(0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 1.0, 0.0)","(0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0)","(1.0, 0.0, 0.0)","(1.0, 0.0, 0.0)","(1.0, 0.0, 0.0)","(0.0, 1.0)","(1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(1.0, 0.0, 0.0, 0.0, 0.0)","(1.0, 0.0, 0.0)","(60.0, 223.0, 2.0, 999.0, 0.0, 1.1, 93.994, -3...",0.0,"[1.3070615799780285, -1.3070615799780285]","[0.9317650097070745, 0.06823499029292546]",0.0
1,26,student,single,university.degree,no,yes,no,telephone,may,wed,217,1,999,0,nonexistent,1.1,93.994,-36.4,4.859,5191.0,no,"[18.459292407360678, 1.5407075926393248, 0.0]","[0.9229646203680337, 0.07703537963196623, 0.0]",0.0,"[2.4128949176825527, 0.3359552684012981, -4.69...","[0.8879945178373172, 0.1112773707595281, 0.000...",0.0,10.0,1.0,0.0,0.0,0.0,0.0,1.0,0.0,2.0,0.0,"(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 1.0, 0.0, 0.0)","(1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0)","(1.0, 0.0, 0.0)","(1.0, 0.0, 0.0)","(1.0, 0.0, 0.0)","(0.0, 1.0)","(1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 1.0, 0.0, 0.0)","(1.0, 0.0, 0.0)","(26.0, 217.0, 1.0, 999.0, 0.0, 1.1, 93.994, -3...",0.0,"[1.3119994885913469, -1.3119994885913469]","[0.9323902330034555, 0.06760976699654453]",0.0
2,38,unknown,married,unknown,unknown,yes,no,telephone,may,thu,270,1,999,0,nonexistent,1.1,93.994,-36.4,4.86,5191.0,no,"[19.1173812019021, 0.8826187980979053, 0.0]","[0.9558690600951048, 0.04413093990489526, 0.0]",0.0,"[2.412796884645821, 0.3360900132153952, -4.693...","[0.8879714787209175, 0.11130038890590825, 0.00...",0.0,11.0,0.0,6.0,1.0,0.0,0.0,1.0,0.0,0.0,0.0,"(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(1.0, 0.0, 0.0, 0.0)","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0)","(0.0, 1.0, 0.0)","(1.0, 0.0, 0.0)","(1.0, 0.0, 0.0)","(0.0, 1.0)","(1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(1.0, 0.0, 0.0, 0.0, 0.0)","(1.0, 0.0, 0.0)","(38.0, 270.0, 1.0, 999.0, 0.0, 1.1, 93.994, -3...",0.0,"[1.3070615799780285, -1.3070615799780285]","[0.9317650097070745, 0.06823499029292546]",0.0


### Second layer prediction

To bulid a model on these probability columns and see how well it can predict

In [28]:
prediction_vars = [var for var in df_test.columns if var.startswith("probability")]
vs_second_layers = VectorAssembler(inputCols= prediction_vars, outputCol= "second_layer_input")

In [29]:
second_layer = RandomForestClassifier(featuresCol= "second_layer_input", labelCol= "label",  probabilityCol = "second_layer_output")

In [30]:
# To aviod existig column problems
method_name = 'RF2'

second_layer.setPredictionCol("prediction_" + method_name)
second_layer.setProbabilityCol("probability_" + method_name)
second_layer.setRawPredictionCol("raw_prediction_" + method_name)

RandomForestClassifier_b1589c9e4ad6

In [31]:
pipe1 = Pipeline(stages = [vs_second_layers, second_layer])

model_second_layer = pipe1.fit(df_test)
df_test1 =model_second_layer.transform(df_test)

### Evaluation 

In [32]:
evaluator1= BinaryClassificationEvaluator(rawPredictionCol="probability_" + method_name, metricName= "areaUnderROC")

In [33]:
print(evaluator1.evaluate(df_test1))

0.9455766541155921


#### Confusion matrix (as our data in fact is imblance,overall accuray is not engogh)

In [48]:
from sklearn.metrics import confusion_matrix
from sklearn import metrics
from sklearn.metrics import classification_report

In [50]:
y_test = df_test1.select('label').toPandas().apply(lambda x : x[0], 1).values.tolist()

In [63]:
#y_test = df_test1.select('label').toPandas().apply(lambda x : x[0], 1).values.tolist()
y_test_pred = df_test1.select('prediction_RF2').toPandas().apply(lambda x : x[0], 1).values.tolist()

In [64]:
cm_test = confusion_matrix(y_test, y_test_pred)

In [65]:
cm_test

array([[7110,  269],
       [ 398,  520]], dtype=int64)

### 4.2 Used Feature importance + ensemble models

In [34]:
fractions = df.select("y").distinct().withColumn("fraction", lit(0.8)).rdd.collectAsMap()
df_train2 = df.sampleBy('y',fractions,seed=17)
df_test2 = df.subtract(df_train2)

In [35]:
from pyspark.ml.classification import RandomForestClassifier, GBTClassifier,LogisticRegression, NaiveBayes

rf = RandomForestClassifier(numTrees=20,labelCol="label", featuresCol="features")

rf2 = RandomForestClassifier(labelCol="label", featuresCol="features_subset")
xgb2 = GBTClassifier(maxIter= 10,labelCol="label", featuresCol="features_subset")
lr2 = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.2,labelCol="label", featuresCol="features_subset")
#nb = NaiveBayes(smoothing= 0.5, modelType="multinomial")

methods = {"random forest": rf2,
           "logistic regression": lr2,
          "boosting tree": xgb2 ##this needs to be different from others
          #"naive bayes": nb
          }

In [36]:
##feature selection, which include rf. thus, no need to put it in the pipeline
fis = FeatureImpSelector(estimator = rf, selectorType = "numTopFeatures",
                         numTopFeatures = 7, outputCol = "features_subset")


In [37]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
fitted_models2 ={}

for method_name, method in methods.items():
    
    method.setPredictionCol("prediction_" + method_name)
    if method_name != "boosting tree":
        method.setProbabilityCol("probability_" + method_name)
        method.setRawPredictionCol("raw_prediction_" + method_name)
        sel_col = "probability_" + method_name
    else:
        sel_col = "probability"
    

    pipe2 = Pipeline(stages = string_indexes + onehot_indexes + [assembler, label_indexes,fis, method])
    # need to keep fitted model somewhere
    fitted_models2[method_name] = pipe2.fit(df_train2)
    df_test2 = fitted_models2[method_name].transform(df_test2)
    
    filter_col1 = [col for col in df_test1.columns if col.startswith('IDX')]
    filter_col2 = [col for col in df_test1.columns if col.startswith('OHE')]
    drop_cols = filter_col1 + filter_col2 + ['features','features_subset','label']
    
    evaluator= BinaryClassificationEvaluator(rawPredictionCol=sel_col, metricName= "areaUnderROC")
    print(evaluator.evaluate(df_test2))
    if method_name != list(methods.keys())[len(methods)-1]: ##if it is the last layer, we will not drop columns
        df_test2 = df_test2.drop(*drop_cols)

0.9283402584204542
0.9205904644310935
0.9451353292819138


### Second layer prediction

To bulid a model on these probability columns and see how well it can predict

In [38]:
prediction_vars2 = [var for var in df_test2.columns if var.startswith("probability")]
vs_second_layers2 = VectorAssembler(inputCols= prediction_vars2, outputCol= "second_layer_input")

In [39]:
second_layer2 = RandomForestClassifier(featuresCol= "second_layer_input", labelCol= "label",  probabilityCol = "second_layer_output")

In [45]:
# To aviod existig column problems
method_name = 'RF22'

second_layer2.setPredictionCol("prediction_" + method_name)
second_layer2.setProbabilityCol("probability_" + method_name)
second_layer2.setRawPredictionCol("raw_prediction_" + method_name)

RandomForestClassifier_42e25de3b51f

In [42]:
pipe2 = Pipeline(stages = [vs_second_layers2, second_layer2])

model_second_layer2 = pipe2.fit(df_test2)
df_test22 =model_second_layer2.transform(df_test2)

### Evaluation

In [46]:
evaluator2 = BinaryClassificationEvaluator(rawPredictionCol="probability_" + method_name, metricName= "areaUnderROC")

In [47]:
print(evaluator2.evaluate(df_test22))

0.9454093950299399


In [66]:
y_test2 = df_test22.select('label').toPandas().apply(lambda x : x[0], 1).values.tolist()
y_test_pred2 = df_test22.select('prediction_RF22').toPandas().apply(lambda x : x[0], 1).values.tolist()

In [67]:
confusion_matrix(y_test2, y_test_pred2)

array([[7162,  217],
       [ 420,  498]], dtype=int64)

## Conclusions

It seems adding the feature selection did not improve the model performance in this case. Of course, there are other parameters such as the number of paramters from the feature selection step. Another factor which significantly affect the model performance will be the imblance data. The minorty group didn't captur well with the current methods.

The ensemble model can make our results become more stable as three methods both give us very similar performance, but it did not improve the final classification results.