# Ensemble models - pyspark

The code was partially borrowed from [here](http://people.stat.sc.edu/haigang/improvement.html).

The variable importance can be used in other analysis.

In [2]:
import findspark
findspark.init()
from pyspark import SparkContext
from pyspark import SQLContext

In [3]:
import numpy as np
import pandas as pd
pd.options.display.max_columns = None

In [4]:
from pyspark.sql.functions import *
from pyspark.ml.classification import  RandomForestClassifier
from pyspark.ml.feature import StringIndexer, OneHotEncoderEstimator, VectorAssembler, VectorSlicer
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.linalg import Vectors
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit

In [5]:
sc = SparkContext()
spark = SQLContext(sc)

In [6]:
## data
## where: https://archive.ics.uci.edu/ml/machine-learning-databases/00222/

data_file = './data/bank-additional/bank-additional-full.csv'

In [7]:
df = spark.read.option("delimiter", ";").csv(data_file,header = True, inferSchema = True)

In [8]:
df.count()

41188

In [9]:
df.dtypes

[('age', 'int'),
 ('job', 'string'),
 ('marital', 'string'),
 ('education', 'string'),
 ('default', 'string'),
 ('housing', 'string'),
 ('loan', 'string'),
 ('contact', 'string'),
 ('month', 'string'),
 ('day_of_week', 'string'),
 ('duration', 'int'),
 ('campaign', 'int'),
 ('pdays', 'int'),
 ('previous', 'int'),
 ('poutcome', 'string'),
 ('emp.var.rate', 'double'),
 ('cons.price.idx', 'double'),
 ('cons.conf.idx', 'double'),
 ('euribor3m', 'double'),
 ('nr.employed', 'double'),
 ('y', 'string')]

In [10]:
## rename the column names
df = df.toDF(*(c.replace('.', '_') for c in df.columns))

In [11]:
#df.limit(5).toPandas()

In [12]:
#df.describe().toPandas()

In [13]:
#df.groupBy('y').count().toPandas()

### train and test data split

In [14]:
fractions = df.select("y").distinct().withColumn("fraction", lit(0.8)).rdd.collectAsMap()
df_train = df.sampleBy('y',fractions,seed=17)
df_test = df.subtract(df_train)

In [15]:
df_test.columns

['age',
 'job',
 'marital',
 'education',
 'default',
 'housing',
 'loan',
 'contact',
 'month',
 'day_of_week',
 'duration',
 'campaign',
 'pdays',
 'previous',
 'poutcome',
 'emp_var_rate',
 'cons_price_idx',
 'cons_conf_idx',
 'euribor3m',
 'nr_employed',
 'y']

### Data preprocessing

In [16]:
# one hot encoding and assembling
encoding_var = [i[0] for i in df.dtypes if (i[1]=='string') & (i[0]!='y')]
num_var = [i[0] for i in df.dtypes if ((i[1]=='int') | (i[1]=='double')) & (i[0]!='y')]

'''from string to interger'''
string_indexes = [StringIndexer(inputCol = c, outputCol = 'IDX_' + c, handleInvalid = 'keep') for c in encoding_var]
'''from interger to binary vectors'''
onehot_indexes = [OneHotEncoderEstimator(inputCols = ['IDX_' + c], outputCols = ['OHE_' + c]) for c in encoding_var]
label_indexes = StringIndexer(inputCol = 'y', outputCol = 'label', handleInvalid = 'keep')


## The input for the model should be binary vectors
assembler = VectorAssembler(inputCols = num_var + ['OHE_' + c for c in encoding_var], outputCol = "features")

### Ensemble models

In [17]:
from pyspark.ml.classification import RandomForestClassifier, GBTClassifier,LogisticRegression, NaiveBayes

rf = RandomForestClassifier(numTrees=20)
xgb = GBTClassifier(maxIter= 10)
lr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.2)
#nb = NaiveBayes(smoothing= 0.5, modelType="multinomial")

methods = {"random forest": rf,
           "logistic regression": lr,
          "boosting tree": xgb ##this needs to be different from others
          #"naive bayes": nb
          }

In [18]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
fitted_models ={}

for method_name, method in methods.items():
    
    method.setPredictionCol("prediction_" + method_name)
    if method_name != "boosting tree":
        method.setProbabilityCol("probability_" + method_name)
        method.setRawPredictionCol("raw_prediction_" + method_name)
        sel_col = "probability_" + method_name
    else:
        sel_col = "probability"
    

    pipe = Pipeline(stages = string_indexes + onehot_indexes + [assembler, label_indexes, method])
    # need to keep fitted model somewhere
    fitted_models[method_name] = pipe.fit(df_train)
    df_test = fitted_models[method_name].transform(df_test)
    
    filter_col1 = [col for col in df_test.columns if col.startswith('IDX')]
    filter_col2 = [col for col in df_test.columns if col.startswith('OHE')]
    drop_cols = filter_col1 + filter_col2 + ['features','label']
    
    
    
    evaluator= BinaryClassificationEvaluator(rawPredictionCol=sel_col, metricName= "areaUnderROC")
    print(evaluator.evaluate(df_test))
    if method_name != list(methods.keys())[len(methods)-1]: ##if it is the last layer, we will not drop columns
        df_test = df_test.drop(*drop_cols)

0.908785781708145
0.8935541330413904
0.9459866824566329


### Second layer prediction

To bulid a model on these probability columns and see how well it can predict

In [19]:
prediction_vars = [var for var in df_test.columns if var.startswith("probability")]
vs_second_layers = VectorAssembler(inputCols= prediction_vars, outputCol= "second_layer_input")

In [20]:
second_layer = RandomForestClassifier(featuresCol= "second_layer_input", labelCol= "label",  probabilityCol = "second_layer_output")

In [21]:
# To aviod existig column problems
method_name = 'RF2'

second_layer.setPredictionCol("prediction_" + method_name)
second_layer.setProbabilityCol("probability_" + method_name)
second_layer.setRawPredictionCol("raw_prediction_" + method_name)

RandomForestClassifier_563258491f72

In [22]:
pipe1 = Pipeline(stages = [vs_second_layers, second_layer])

In [24]:
model_second_layer = pipe1.fit(df_test)

In [25]:
df_test1 =model_second_layer.transform(df_test)

In [27]:
df_test1.columns

['age',
 'job',
 'marital',
 'education',
 'default',
 'housing',
 'loan',
 'contact',
 'month',
 'day_of_week',
 'duration',
 'campaign',
 'pdays',
 'previous',
 'poutcome',
 'emp_var_rate',
 'cons_price_idx',
 'cons_conf_idx',
 'euribor3m',
 'nr_employed',
 'y',
 'raw_prediction_random forest',
 'probability_random forest',
 'prediction_random forest',
 'raw_prediction_logistic regression',
 'probability_logistic regression',
 'prediction_logistic regression',
 'IDX_job',
 'IDX_marital',
 'IDX_education',
 'IDX_default',
 'IDX_housing',
 'IDX_loan',
 'IDX_contact',
 'IDX_month',
 'IDX_day_of_week',
 'IDX_poutcome',
 'OHE_job',
 'OHE_marital',
 'OHE_education',
 'OHE_default',
 'OHE_housing',
 'OHE_loan',
 'OHE_contact',
 'OHE_month',
 'OHE_day_of_week',
 'OHE_poutcome',
 'features',
 'label',
 'rawPrediction',
 'probability',
 'prediction_boosting tree',
 'second_layer_input',
 'raw_prediction_RF2',
 'probability_RF2',
 'prediction_RF2']

In [28]:
evaluator1= BinaryClassificationEvaluator(rawPredictionCol="probability_" + method_name, metricName= "areaUnderROC")

In [30]:
print(evaluator1.evaluate(df_test1))

0.9469240271736227
