# Source : https://towardsdatascience.com/machine-learning-with-pyspark-and-mllib-solving-a-binary-classification-problem-96396065d2aa

In [None]:

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

import os
# Show which files are available in the notebook's input directory.
print(os.listdir("../input"))


In [None]:
# IPython shell escape: install PySpark into the notebook environment.
!pip install pyspark

In [None]:
from pyspark.sql import SparkSession

# Reuse an existing session if one is already running, otherwise start one.
spark = SparkSession.builder.appName("vivek_spark_app").getOrCreate()

# The bank CSV is semicolon-delimited with a header row; column types are
# inferred by Spark. The target column "y" is renamed to "deposit".
sdf = spark.read.load(
    "../input/bank.csv",
    format="csv",
    sep=";",
    header=True,
    inferSchema=True,
).withColumnRenamed("y", "deposit")
sdf.printSchema()

In [None]:
# Collect the Spark DataFrame into a local pandas DataFrame.
pdf = sdf.toPandas()
# Bare expression so the notebook renders the frame as the cell output.
pdf

In [None]:
# Preview the first five rows, transposed so that each column of `sdf`
# is shown as a row (easier to read with many columns).
pd.DataFrame(sdf.take(5), columns=sdf.columns).transpose()

In [None]:
# Names of the columns whose Spark dtype string is 'int'.
num_features = [name for name, dtype in sdf.dtypes if dtype == 'int']
num_features

### https://spark.apache.org/docs/latest/ml-features.html#stringindexer
### https://spark.apache.org/docs/latest/ml-features.html#onehotencoderestimator
### https://spark.apache.org/docs/latest/ml-features.html#vectorassembler

In [48]:
from pyspark.ml.feature import StringIndexer, VectorAssembler

try:
    # Spark 2.3 / 2.4 name of the multi-column one-hot estimator.
    from pyspark.ml.feature import OneHotEncoderEstimator
except ImportError:
    # Spark 3.0 removed OneHotEncoderEstimator; it was renamed to
    # OneHotEncoder and keeps the same inputCols/outputCols interface.
    from pyspark.ml.feature import OneHotEncoder as OneHotEncoderEstimator

# Build the list of pipeline stages that turn the raw columns into a
# 'label' column and a single 'features' vector column.
categoricalColumns = ['job', 'marital', 'education', 'default', 'housing', 'loan', 'contact', 'poutcome']
stages = []
for categoricalCol in categoricalColumns:
    # String category -> numeric index -> one-hot vector.
    stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol + 'Index')
    encoder = OneHotEncoderEstimator(inputCols=[stringIndexer.getOutputCol()], outputCols=[categoricalCol + "classVec"])
    stages += [stringIndexer, encoder]

# Index the string target column 'deposit' into a numeric 'label' column.
label_stringIdx = StringIndexer(inputCol='deposit', outputCol='label')
stages += [label_stringIdx]

# Concatenate the one-hot vectors and the numeric columns into 'features'.
numericCols = ['age', 'balance', 'duration', 'campaign', 'pdays', 'previous']
assemblerInputs = [c + "classVec" for c in categoricalColumns] + numericCols
print(assemblerInputs)
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
print([assembler])
stages += [assembler]
print(stages)

['jobclassVec', 'maritalclassVec', 'educationclassVec', 'defaultclassVec', 'housingclassVec', 'loanclassVec', 'contactclassVec', 'poutcomeclassVec', 'age', 'balance', 'duration', 'campaign', 'pdays', 'previous']
[VectorAssembler_31a1ec005d1f]
[StringIndexer_85c9ffc6d54e, OneHotEncoderEstimator_350a52e7db43, StringIndexer_293849b93586, OneHotEncoderEstimator_20aa317bce70, StringIndexer_4643093f60b6, OneHotEncoderEstimator_e6ace1af176f, StringIndexer_47c6e89afead, OneHotEncoderEstimator_5b5ea6091e4f, StringIndexer_9ae623a5eb28, OneHotEncoderEstimator_18ae1eb47e41, StringIndexer_1f976458f01c, OneHotEncoderEstimator_f65015e3d2eb, StringIndexer_1a82d8c8935d, OneHotEncoderEstimator_aad460c1002a, StringIndexer_5ce9db597dc4, OneHotEncoderEstimator_753a9c84b04e, StringIndexer_a267d6bcd402, VectorAssembler_31a1ec005d1f]


In [49]:
from pyspark.ml import Pipeline

# Remember the original column names before the pipeline adds its own.
cols = sdf.columns

# Fit every stage on the full dataset.
pipeline = Pipeline(stages=stages)
pipelineModel = pipeline.fit(sdf)

# Apply the fitted stages, then keep only the label, the assembled feature
# vector and the original columns.
selectedCols = ['label', 'features', *cols]
sdf = pipelineModel.transform(sdf).select(selectedCols)
sdf.printSchema()

root
 |-- label: double (nullable = false)
 |-- features: vector (nullable = true)
 |-- age: integer (nullable = true)
 |-- job: string (nullable = true)
 |-- marital: string (nullable = true)
 |-- education: string (nullable = true)
 |-- default: string (nullable = true)
 |-- balance: integer (nullable = true)
 |-- housing: string (nullable = true)
 |-- loan: string (nullable = true)
 |-- contact: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- duration: integer (nullable = true)
 |-- campaign: integer (nullable = true)
 |-- pdays: integer (nullable = true)
 |-- previous: integer (nullable = true)
 |-- poutcome: string (nullable = true)
 |-- deposit: string (nullable = true)



In [None]:
# Peek at the first two assembled feature vectors ...
print(sdf.select('features').take(2))
# ... and the first two labels (bare expression: shown as the cell output).
sdf.select('label').take(2)

In [None]:
# Randomly split the rows with weights 0.7 (train) and 0.3 (test); the fixed
# seed keeps the split reproducible between runs.
train, test = sdf.randomSplit([0.7, 0.3], seed=2018)
print("Training Dataset Count: " + str(train.count()))
print("Test Dataset Count: " + str(test.count()))


In [None]:
from pyspark.ml.classification import DecisionTreeClassifier

# A shallow tree (maxDepth=3) is used to avoid overfitting.
dt = DecisionTreeClassifier(
    featuresCol='features',
    labelCol='label',
    maxDepth=3,
)

# Fit on the training split, then predict on the held-out test split.
dtModel = dt.fit(train)
predictions_dt = dtModel.transform(test)

# Show the first ten predictions next to a few of the input columns.
predictions_dt.select(
    'age', 'job', 'label', 'rawPrediction', 'prediction', 'probability'
).show(10)

In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
# NOTE: `eval` shadows the Python builtin of the same name; the name is kept
# because later cells call `eval.evaluate(...)`.
eval = BinaryClassificationEvaluator()
# Area under the ROC curve for the decision-tree predictions.
eval.evaluate(predictions_dt,{eval.metricName: "areaUnderROC"})

# Random Forest Classifier

In [None]:
from pyspark.ml.classification import RandomForestClassifier

# Random forest with default hyper-parameters on the same feature/label columns.
rf = RandomForestClassifier(
    featuresCol='features',
    labelCol='label',
)

# Fit on the training split, then predict on the held-out test split.
rfModel = rf.fit(train)
predictions_rf = rfModel.transform(test)

# Collect the predictions next to a few input columns as a pandas DataFrame.
predictions_rf.select(
    'age', 'job', 'label', 'rawPrediction', 'prediction', 'probability'
).toPandas()


In [None]:
# Fresh evaluator (the name `eval` shadows the Python builtin; kept because a
# later cell calls `eval.evaluate(...)`).
eval = BinaryClassificationEvaluator()
# Area under the ROC curve for the random-forest predictions.
eval.evaluate(predictions_rf,{eval.metricName: 'areaUnderROC'})

# Gradient-Boosted Tree Classifier


In [None]:
from pyspark.ml.classification import GBTClassifier

# Gradient-boosted trees with default hyper-parameters; the feature and label
# columns are spelled out (they are the library defaults) to mirror the other
# classifiers in this notebook.
gbt = GBTClassifier(
    featuresCol='features',
    labelCol='label',
)

# Fit on the training split, then predict on the held-out test split.
gbtModel = gbt.fit(train)
gbtPrediction = gbtModel.transform(test)

# Collect the predictions next to a few input columns as a pandas DataFrame.
gbtPrediction.select(
    'age', 'job', 'label', 'rawPrediction', 'prediction', 'probability'
).toPandas()


In [None]:
# Area under the ROC curve for the gradient-boosted tree predictions; relies on
# the `eval` evaluator created in an earlier cell.
eval.evaluate(gbtPrediction,{eval.metricName: "areaUnderROC"})