## Illustration of PySpark ML usage on Bank Marketing Dataset.
This notebook was written by **[Baligh Mnassri]** and runs on a Spark cluster, using the Python programming language, on [databricks cloud community edition].
[Baligh Mnassri]: https://github.com/mnassrib
[databricks cloud community edition]: https://databricks.com/try-databricks
Problem statement and dataset can be found here: https://archive.ics.uci.edu/ml/datasets/Bank+Marketing

The data are provided to test more computationally demanding machine learning algorithms. The classification goal is to predict whether the customer will subscribe to a term deposit (yes or no; variable: deposit).

These data relate to direct marketing campaigns (phone calls) of a Portuguese banking institution.

###### I imported the dataset from [Kaggle] and then uploaded it directly to [databricks cloud].

[Kaggle]: https://www.kaggle.com/rouseguy/bankbalanced/data

[databricks cloud]: https://docs.databricks.com/user-guide/importing-data.html#import-data

#### Attribute Information:
##### Input variables:
* bank client data:
  * 1 - age (numeric)
  * 2 - job : type of job (categorical: 'admin.','blue-collar','entrepreneur','housemaid','management','retired','self-employed','services','student','technician','unemployed','unknown')
  * 3 - marital : marital status (categorical: 'divorced','married','single','unknown'; note: 'divorced' means divorced or widowed)
  * 4 - education (categorical: 'basic.4y','basic.6y','basic.9y','high.school','illiterate','professional.course','university.degree','unknown')
  * 5 - default: has credit in default? (categorical: 'no','yes','unknown')
  * 6 - balance: balance level
  * 7 - housing: has housing loan? (categorical: 'no','yes','unknown')
  * 8 - loan: has personal loan? (categorical: 'no','yes','unknown')
* related to the last contact of the current campaign:
  * 9 - contact: contact communication type (categorical: 'cellular','telephone') 
  * 10 - day_of_week: last contact day of the week (categorical: 'mon','tue','wed','thu','fri')
  * 11 - month: last contact month of year (categorical: 'jan', 'feb', 'mar', ..., 'nov', 'dec')
  * 12 - duration: last contact duration, in seconds (numeric). Important note: this attribute highly affects the output target (e.g., if duration=0 then y='no'). Yet, the duration is not known before a call is performed. Also, after the end of the call y is obviously known. Thus, this input should only be included for benchmark purposes and should be discarded if the intention is to have a realistic predictive model.
* other attributes:
  * 13 - campaign: number of contacts performed during this campaign and for this client (numeric, includes last contact)
  * 14 - pdays: number of days that passed by after the client was last contacted from a previous campaign (numeric; 999 means client was not previously contacted)
  * 15 - previous: number of contacts performed before this campaign and for this client (numeric)
  * 16 - poutcome: outcome of the previous marketing campaign (categorical: 'failure','nonexistent','success')

##### Output variable:
* desired target:
  * 17 - deposit - has the client subscribed to a term deposit? (binary: 'yes', 'no')

### Importing the required libraries

In [3]:
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.sql.functions import col
from pyspark.ml.feature import StringIndexer, IndexToString, VectorAssembler, VectorIndexer
# OneHotEncoder is a Transformer in Spark 2.x and an Estimator in Spark 3.x; both work inside a Pipeline.
# OneHotEncoderEstimator was removed in Spark 3.0, so it is not imported here.
from pyspark.ml.feature import OneHotEncoder

from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.evaluation import BinaryClassificationEvaluator

from pyspark.ml.classification import LogisticRegression, DecisionTreeClassifier, RandomForestClassifier

from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

#### Beginning with SparkSession

***The entry point into all functionality in Spark is the SparkSession class.
To create a basic SparkSession, just use SparkSession.builder***

In [5]:
spark = SparkSession.builder.appName("Spark ML applied on Bank Marketing dataset").getOrCreate()

Read the data by giving the dataset path after uploading it to [databricks cloud].
[databricks cloud]: https://docs.databricks.com/user-guide/importing-data.html#import-data

In [7]:
bank_data_path = "/FileStore/tables/bank.csv"

bank_df = spark.read.csv(bank_data_path, header=True, inferSchema=True)

In [8]:
bank_df.printSchema()

In [9]:
bank_df.show(5)

Number of customers in the dataframe

In [11]:
clients_count = bank_df.count()
print("Number of customers is {}".format(clients_count))

Number of customers who subscribed to a term deposit vs. those who did not

In [13]:
groupBy_clients = bank_df.groupBy("deposit").count()

groupBy_clients.show()

In [14]:
display(groupBy_clients)

| deposit | count |
|---------|------:|
| no      | 5873  |
| yes     | 5289  |
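The two classes are close to balanced. As a reference point for the accuracies reported later, the counts above imply a majority-class baseline: a trivial model that always predicts `no` would be right about 52.6% of the time. A plain-Python sketch of that arithmetic (no Spark needed), assuming these two counts cover the whole dataset:

```python
# Class counts reported by groupBy("deposit").count() above
deposit_counts = {"no": 5873, "yes": 5289}

total = sum(deposit_counts.values())                      # 11162 customers
majority = max(deposit_counts, key=deposit_counts.get)    # most frequent class
baseline_accuracy = deposit_counts[majority] / total      # always predict `majority`

print(total, majority, round(baseline_accuracy, 4))       # 11162 no 0.5262
```

Any classifier below should comfortably beat this value to be considered useful.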


Summary statistics for the numeric variables

In [16]:
bank_df.describe([t[0] for t in bank_df.dtypes if t[1] == 'int']).show()

In [17]:
display(bank_df.groupBy("job").count())

| job           | count |
|---------------|------:|
| management    | 2566  |
| retired       | 778   |
| unknown       | 70    |
| self-employed | 405   |
| student       | 360   |
| blue-collar   | 1944  |
| entrepreneur  | 328   |
| admin.        | 1334  |
| technician    | 1823  |
| services      | 923   |


In [18]:
display(bank_df.groupBy("housing", "deposit").count())

| housing | deposit | count |
|---------|---------|------:|
| no      | no      | 2527  |
| no      | yes     | 3354  |
| yes     | yes     | 1935  |
| yes     | no      | 3346  |


### Data preprocessing

The following function, initially inspired by the code [here], first indexes each categorical column with the `StringIndexer` and then converts the indexed categories into one-hot encoded binary vectors with the `OneHotEncoder`; each encoded vector is appended to the row as a new column. Next, the `VectorAssembler` combines all the feature columns, both the numeric columns and the one-hot encoded vector columns, into a single vector column named `features`. Finally, the `StringIndexer` is used once more to encode the string labels into label indices (`indexedLabel`), which also adds metadata to the label column.

A Pipeline chains multiple Transformers and Estimators together, so running these stages as a Pipeline puts the data through all of the feature transformations described above in a single call.
[here]: https://runawayhorse001.github.io/LearningApacheSpark/classification.html
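To make the two encoding steps concrete, here is a plain-Python sketch (not Spark code) of what they do to one column. It assumes `StringIndexer`'s default ordering (`stringOrderType="frequencyDesc"`, most frequent value gets index 0.0) and `OneHotEncoder`'s default `dropLast=True`, under which the last category is encoded as an all-zero vector. Breaking frequency ties alphabetically is an assumption of this sketch, and Spark produces sparse vectors where this uses plain lists.

```python
from collections import Counter

def string_index(values):
    """Map each distinct string to an index, most frequent first.

    Ties are broken alphabetically (an assumption for this sketch).
    """
    freq = Counter(values)
    ordered = sorted(freq, key=lambda v: (-freq[v], v))
    return {v: float(i) for i, v in enumerate(ordered)}

def one_hot(index, num_categories, drop_last=True):
    """Dense one-hot vector; with drop_last the last category becomes all zeros."""
    size = num_categories - 1 if drop_last else num_categories
    vec = [0.0] * size
    if int(index) < size:
        vec[int(index)] = 1.0
    return vec

housing_values = ["yes", "no", "no", "yes", "no"]
housing_index = string_index(housing_values)          # {'no': 0.0, 'yes': 1.0}
housing_encoded = [one_hot(housing_index[v], len(housing_index)) for v in housing_values]
print(housing_index)
print(housing_encoded)                                # [[0.0], [1.0], [1.0], [0.0], [1.0]]
```

Dropping the last category keeps the encoded columns linearly independent, which matters mostly for the logistic regression model.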

In [21]:
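def get_dummy(df, categoricalCols, continuousCols, labelCol):
    # Index each categorical string column (most frequent value -> 0.0)
    indexers = [StringIndexer(inputCol=c, outputCol="{0}_indexed".format(c))
                for c in categoricalCols]

    # One-hot encode each indexed column into a binary vector column
    encoders = [OneHotEncoder(inputCol=indexer.getOutputCol(),
                              outputCol="{0}_encoded".format(indexer.getOutputCol()))
                for indexer in indexers]

    # Combine the one-hot vectors and the numeric columns into a single "features" vector
    assembler = VectorAssembler(inputCols=[encoder.getOutputCol() for encoder in encoders]
                                + continuousCols, outputCol="features")

    # Encode the string label ('no'/'yes') as a numeric label index
    labelIndexer = StringIndexer(inputCol=labelCol, outputCol='indexedLabel')

    pipeline = Pipeline(stages=indexers + encoders + [assembler, labelIndexer])

    model = pipeline.fit(df)
    data = model.transform(df)

    # Keep a copy of the original string label for readability
    data = data.withColumn('label', col(labelCol))

    # Return the transformed data and the fitted label indexer (last pipeline stage);
    # its .labels attribute maps label indices back to the original strings
    return data.select('features', 'indexedLabel', 'label'), model.stages[-1]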

Once we have defined our lists of categorical and numerical variables, we can transform the data:

In [23]:
categoricalColumns = ['job', 'marital', 'education', 'default', 'housing', 'loan', 'contact', 'poutcome']
numericCols = ['age', 'balance', 'duration', 'campaign', 'pdays', 'previous']
(bank_df, labelindexer) = get_dummy(bank_df, categoricalColumns, numericCols, 'deposit')
bank_df.show(5)

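**It is essential to fit the following ``featureIndexer`` model on the whole ``bank_df`` dataframe**: the indexer learns the set of values of each categorical feature from the data it is fitted on, so fitting it on all rows ensures that every value appearing later in the training or test split is known to it. *``VectorIndexer`` automatically identifies categorical features and indexes them. With ``maxCategories=4``, features with more than 4 distinct values are treated as continuous.*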
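The rule `VectorIndexer` applies when deciding which vector slots are categorical can be sketched in plain Python: a feature is flagged categorical when it has at most `maxCategories` distinct values in the data the indexer is fitted on. Every one-hot slot produced above only holds 0.0 and 1.0, so all of them fall on the categorical side. The rows below are hypothetical, and the sketch covers only the decision rule, not the re-mapping of category values that Spark also performs.

```python
def categorical_feature_mask(rows, max_categories=4):
    """Return, per feature position, whether it would be treated as categorical.

    A feature is flagged categorical when it has at most max_categories
    distinct values across all rows; otherwise it is left continuous.
    """
    num_features = len(rows[0])
    distinct = [set() for _ in range(num_features)]
    for row in rows:
        for j, value in enumerate(row):
            distinct[j].add(value)
    return [len(values) <= max_categories for values in distinct]

# Hypothetical rows laid out as [one-hot slot, age, campaign]
example_rows = [
    [1.0, 59.0, 1.0],
    [0.0, 56.0, 1.0],
    [1.0, 41.0, 2.0],
    [0.0, 55.0, 1.0],
    [1.0, 54.0, 2.0],
]
print(categorical_feature_mask(example_rows))   # [True, False, True]
```

In this tiny sample `campaign` looks categorical only because it takes two values; on the full dataset it has many more and would be treated as continuous.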

In [25]:
featureIndexer = VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(bank_df)

featureIndexer.transform(bank_df).show(5)

In [26]:
bank_df.show(5, False)

### Data splitting

Now that the dataset is ready, let's randomly split it into training (80%) and test (20%) sets, setting a seed for reproducibility.
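`randomSplit` samples rows rather than cutting the data at an exact position, so the printed counts are close to, but generally not exactly, 80% and 20% of the rows. A simplified plain-Python model of that behaviour, assuming each row is assigned independently with probability proportional to its weight (this will not reproduce Spark's actual split for the same seed):

```python
import random

def random_split(rows, weights, seed):
    """Assign each row independently to a split with probability
    proportional to its weight (a simplified model of randomSplit)."""
    total = sum(weights)
    cumulative, acc = [], 0.0
    for w in weights:
        acc += w / total
        cumulative.append(acc)
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()
        for i, bound in enumerate(cumulative):
            # the last split also catches any floating-point rounding at 1.0
            if u < bound or i == len(cumulative) - 1:
                splits[i].append(row)
                break
    return splits

train_rows, test_rows = random_split(list(range(11162)), [0.8, 0.2], seed=10)
print(len(train_rows), len(test_rows))   # roughly 80% / 20%, not exact
```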

In [28]:
(trainingData, testData) = bank_df.randomSplit([0.8, 0.2], seed=10)
print("Training Dataset Count: " + str(trainingData.count()))
print("Test Dataset Count: " + str(testData.count()))

In [29]:
print("The first 5 samples of the Training Dataset:")
trainingData.show(5, False)
print("The first 5 samples of the Test Dataset:")
testData.show(5, False)

### Fit and evaluate models

We are now ready to try out some of the Binary Classification algorithms available in the Pipelines API.

Of these algorithms, the following also support multiclass classification in the Python API:
- Decision Tree Classifier
- Random Forest Classifier

These are the general steps we will take to build our models:
- Create initial model using the training set
- Tune parameters with a parameter grid (`ParamGridBuilder`) and 5-fold cross-validation (`CrossValidator`)
- Evaluate the best model obtained from the Cross Validation using the test set
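The idea behind 5-fold cross-validation can be sketched in plain Python: the training rows are divided into 5 folds, and each fold serves once as the validation set while the other 4 are used for training. This is a simplified shuffle-and-cut version; Spark's `CrossValidator` assigns rows to folds by random sampling, so its fold sizes are only approximately equal.

```python
import random

def k_fold_indices(n, k, seed):
    """Shuffle row indices and cut them into k (nearly) equal folds."""
    indices = list(range(n))
    random.Random(seed).shuffle(indices)
    return [indices[i::k] for i in range(k)]

folds = k_fold_indices(n=20, k=5, seed=100)
for i, validation in enumerate(folds):
    # all the other folds form the training set for this round
    training = [idx for j, fold in enumerate(folds) if j != i for idx in fold]
    print(f"round {i}: train={len(training)} validate={len(validation)}")
```

Averaging the validation metric over the 5 rounds gives one score per parameter setting, which is what the grid search compares.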

We use the `BinaryClassificationEvaluator` to evaluate our models, which uses [areaUnderROC] as the default metric.

[areaUnderROC]: https://en.wikipedia.org/wiki/Receiver_operating_characteristic#Area_under_the_curve
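The area under the ROC curve measures how well a score ranks positive examples above negative ones, so it should be computed from a continuous score such as the `rawPrediction` or `probability` column (the evaluator's default `rawPredictionCol` is `rawPrediction`) rather than from the 0/1 `prediction` column. The plain-Python sketch below computes AUC as the probability that a random positive outranks a random negative, on hypothetical scores, and shows how thresholding the same scores to 0/1 lowers the value:

```python
def area_under_roc(scores, labels):
    """AUC as the probability that a random positive outranks a random
    negative (ties count as one half)."""
    positives = [s for s, y in zip(scores, labels) if y == 1]
    negatives = [s for s, y in zip(scores, labels) if y == 0]
    wins = 0.0
    for p in positives:
        for q in negatives:
            if p > q:
                wins += 1.0
            elif p == q:
                wins += 0.5
    return wins / (len(positives) * len(negatives))

true_labels = [1, 1, 1, 0, 0, 0]
prob_class1 = [0.9, 0.7, 0.4, 0.6, 0.3, 0.2]        # hypothetical P(class 1)
hard_preds = [1.0 if p >= 0.5 else 0.0 for p in prob_class1]

auc_from_probability = area_under_roc(prob_class1, true_labels)   # 8/9
auc_from_prediction = area_under_roc(hard_preds, true_labels)     # 6/9
print(auc_from_probability, auc_from_prediction)
```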

### Logistic Regression

For more details about [Logistic Regression], read the [classification and regression] section of the MLlib Programming Guide.
In the Pipelines API, Logistic Regression, like the other linear methods, supports Elastic-Net regularization.

[classification and regression]: https://spark.apache.org/docs/latest/ml-classification-regression.html
[Logistic Regression]: https://spark.apache.org/docs/latest/ml-classification-regression.html#logistic-regression
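As we read the MLlib guide, the elastic-net penalty mixes an L1 and an L2 term, with `regParam` playing the role of $\lambda$ and `elasticNetParam` the role of $\alpha$ ($\alpha = 0$ gives a pure L2/ridge penalty, $\alpha = 1$ a pure L1/lasso penalty). Written out as the regularized objective, with $L$ the logistic loss over the training data:

$$
\min_{\mathbf{w}} \; L(\mathbf{w}) + \lambda \left( \alpha \lVert \mathbf{w} \rVert_1 + \frac{1 - \alpha}{2} \lVert \mathbf{w} \rVert_2^2 \right), \qquad \alpha \in [0, 1], \; \lambda \ge 0
$$

For example, with `regParam = 0.5` and `elasticNetParam = 0.5`, the L1 norm is weighted by $\lambda\alpha = 0.25$ and the squared L2 norm by $\lambda(1-\alpha)/2 = 0.125$.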

Create initial LogisticRegression model and then train it with the Training Data

In [32]:
#lr = LogisticRegression(labelCol="indexedLabel", featuresCol="indexedFeatures") # use this line instead to train on the indexedFeatures column produced by featureIndexer
lr = LogisticRegression(labelCol="indexedLabel", featuresCol="features")

Pipeline architecture:
  * Convert the indexed predictions back to the original labels (`IndexToString`)
  * Chain the feature indexer, the logistic regression model and the label converter in a Pipeline
  * Train the model
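`IndexToString` simply looks up each predicted index in the `labels` list it is given, which is why the list learned by the fitted label indexer (`labelindexer.labels`) is passed in: any other ordering would silently swap the class names. A plain-Python sketch of the lookup, where the label list is an assumption derived from the class counts above (`StringIndexer` orders labels by descending frequency, and `no` is more frequent than `yes`):

```python
# Assumed value of labelindexer.labels for this dataset
label_names = ["no", "yes"]

def index_to_string(prediction, labels):
    """Map a numeric class index (0.0, 1.0, ...) back to its string label."""
    return labels[int(prediction)]

example_predictions = [0.0, 1.0, 1.0, 0.0]
print([index_to_string(p, label_names) for p in example_predictions])   # ['no', 'yes', 'yes', 'no']
```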

In [34]:
labelConverter = IndexToString(inputCol="prediction", outputCol="predictedLabel", labels=labelindexer.labels) 

pipeline = Pipeline(stages=[featureIndexer, lr, labelConverter])

lrModel = pipeline.fit(trainingData)

Make predictions on the test data using the ``transform()`` method. The logistic regression stage only reads the column given by its ``featuresCol`` parameter (here ``features``), so the ``indexedFeatures`` column added by ``featureIndexer`` is ignored.

In [36]:
predictions = lrModel.transform(testData)

predictions.show(5)

You can display the contents of `predictions` to view the model's predictions and the probability of each class. Any column of the schema above can be selected; usually the most interesting ones are the label, the prediction and the probability:

In [38]:
predictions.select("features", "label", "probability", "predictedLabel").show(5)

**Compute the model accuracy**

You can create a DataFrame with the label and the prediction to count the rows of each class in the label and in the prediction:

In [40]:
cm = predictions.select("label", "predictedLabel")
cm.groupby('label').agg({'label': 'count'}).show()
cm.groupby('predictedLabel').agg({'predictedLabel': 'count'}).show()

In [41]:
predictions.groupBy('label', 'predictedLabel').count().show()

For instance, the test dataset contains 1021 customers who subscribed to a deposit and 1197 who did not. The classifier, however, predicted that 972 clients would subscribe. The accuracy is the number of correctly classified rows divided by the total number of rows.
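The accuracy computation in the next cell boils down to the following plain-Python sketch, shown on hypothetical (label, predictedLabel) rows:

```python
def accuracy(pairs):
    """Fraction of (label, predictedLabel) pairs where the two agree."""
    correct = sum(1 for label, predicted in pairs if label == predicted)
    return correct / len(pairs)

# Hypothetical (label, predictedLabel) rows
label_pred_pairs = [("yes", "yes"), ("no", "no"), ("no", "yes"), ("yes", "no"), ("no", "no")]
print(accuracy(label_pred_pairs))   # 0.6, since 3 of 5 agree
```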

In [43]:
print("The Accuracy for test set is {}".format(cm.filter(cm.label == cm.predictedLabel).count()/cm.count()))

The accuracy and other metrics can also be computed with ``MulticlassClassificationEvaluator``:

In [45]:
evaluator = MulticlassClassificationEvaluator(labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
print("The Accuracy for test set is {}".format(evaluator.evaluate(predictions)))

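**Using the RDD-based API to compute other metrics**

We can also generate a confusion matrix to better see the results of the predictions. The ``MulticlassMetrics`` and ``BinaryClassificationMetrics`` classes of ``pyspark.mllib.evaluation`` work only with RDDs, so we have to convert our DataFrame of (prediction, label) into an RDD. ``BinaryClassificationMetrics`` expects (score, label) pairs: a continuous score, such as the predicted probability of class 1, yields meaningful areas under the ROC and PR curves, whereas a hard 0/1 prediction reduces each curve to a single operating point.

``confusionMatrix()`` returns a DenseMatrix whose rows represent the actual classes and whose columns represent the predicted classes, both ordered by ascending class label. The diagonal from top left to bottom right holds the observations that were predicted correctly.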
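A plain-Python sketch of the same layout (rows = actual class, columns = predicted class, ascending order), together with precision, recall and F1 for the positive class (index 1.0), on hypothetical (prediction, label) pairs:

```python
def confusion_matrix(pairs, classes):
    """Rows = actual class, columns = predicted class, both in ascending order."""
    position = {c: i for i, c in enumerate(sorted(classes))}
    matrix = [[0] * len(classes) for _ in classes]
    for prediction, label in pairs:
        matrix[position[label]][position[prediction]] += 1
    return matrix

def precision_recall_f1(matrix, positive=1):
    """Metrics for the class at row/column `positive` of the matrix."""
    tp = matrix[positive][positive]
    predicted_positive = sum(row[positive] for row in matrix)   # column sum
    actual_positive = sum(matrix[positive])                      # row sum
    precision = tp / predicted_positive
    recall = tp / actual_positive
    f1 = 2 * precision * recall / (precision + recall)
    return precision, recall, f1

# Hypothetical (prediction, indexedLabel) pairs
pred_label_pairs = [(1.0, 1.0), (1.0, 1.0), (0.0, 1.0), (1.0, 0.0), (0.0, 0.0), (0.0, 0.0)]
example_matrix = confusion_matrix(pred_label_pairs, classes=[0.0, 1.0])
print(example_matrix)                        # [[2, 1], [1, 2]]
print(precision_recall_f1(example_matrix))   # each value is 2/3 here
```

Precision divides the true positives by the column total (everything predicted positive), recall by the row total (everything actually positive).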

In [47]:
from pyspark.mllib.evaluation import BinaryClassificationMetrics
from pyspark.mllib.evaluation import MulticlassMetrics

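# (prediction, label) pairs for the multiclass metrics
predictionAndLabel = predictions.select("prediction", "indexedLabel").rdd.map(lambda r: (float(r[0]), float(r[1])))
# (score, label) pairs for the binary metrics: the score is P(class 1)
scoreAndLabel = predictions.select("probability", "indexedLabel").rdd.map(lambda r: (float(r[0][1]), float(r[1])))

# Instantiate metrics objects
metricsMulti = MulticlassMetrics(predictionAndLabel)
metricsBinary = BinaryClassificationMetrics(scoreAndLabel)
# Statistics for the positive class (index 1.0, i.e. 'yes'); fMeasure needs an explicit label in Spark 3
confusionMatrix = metricsMulti.confusionMatrix()
precision = metricsMulti.precision(1.0)
recall = metricsMulti.recall(1.0)
f1Score = metricsMulti.fMeasure(1.0)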
print("Summary Stats")
print("Confusion Matrix = \n %s" % confusionMatrix)
print("Precision = %s" % precision) 
print("Recall = %s" % recall) 
print("F1 Score = %s" % f1Score) 

# Area under precision-recall curve 
print("Area under PR = %s" % metricsBinary.areaUnderPR) 
# Area under ROC curve 
print("Area under ROC = %s" % metricsBinary.areaUnderROC)

**Compute the area under ROC metric**

In [49]:
# AUC needs a continuous score, so use rawPrediction rather than the 0/1 prediction
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", labelCol="indexedLabel")
print("The area under ROC for test set is {}".format(evaluator.evaluate(predictions)))

Now, we will try tuning the model with the ``ParamGridBuilder`` and the ``CrossValidator``. If you are unsure what params are available for tuning, you can use ``explainParams()`` to print a list of all params and their definitions.

In [51]:
print(lr.explainParams())

**Hyperparameter tuning using 5-fold cross validation**

In the following example, we specify 3 values for regParam, 3 values for elasticNetParam and 3 values for maxIter, so the grid contains 3 x 3 x 3 = 27 parameter settings for CrossValidator to choose from.
We use a 5-fold CrossValidator, so each setting is trained and evaluated 5 times.
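The size of the search can be checked with a plain-Python sketch that enumerates the same candidate values as the `ParamGridBuilder` call in the next cell. Each setting is fitted once per fold, and `CrossValidator` then refits the best setting on the whole training set, so the total number of model fits is 27 x 5 + 1:

```python
from itertools import product

# Candidate values copied from the ParamGridBuilder call in the next cell
reg_params = [0.01, 0.5, 2.0]
elastic_net_params = [0.0, 0.5, 1.0]
max_iters = [1, 5, 10]
num_folds = 5

grid = [
    {"regParam": r, "elasticNetParam": a, "maxIter": m}
    for r, a, m in product(reg_params, elastic_net_params, max_iters)
]

# one fit per (setting, fold), plus the final refit of the best setting
total_fits = len(grid) * num_folds + 1
print(len(grid), total_fits)   # 27 136
```

Because the cost grows multiplicatively with every added parameter, `parallelism=10` is used to train several of these models at the same time.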

***Create ParamGrid for Cross Validation***

In [54]:
paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.01, 0.5, 2.0])
             .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
             .addGrid(lr.maxIter, [1, 5, 10])
             .build())

evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", labelCol="indexedLabel")

***Create and run 5-fold CrossValidator***

In [56]:
#cv = CrossValidator(estimator=lr, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5)
#pipeline = Pipeline(stages=[featureIndexer, cv, labelConverter])
#cvModel = pipeline.fit(trainingData)

pipeline = Pipeline(stages=[featureIndexer, lr, labelConverter]) 
cv = CrossValidator(estimator=pipeline, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5, parallelism=10, seed=100)
cvModel = cv.fit(trainingData)

`cvModel` makes predictions with the best model found by cross-validation, refitted on the whole training set. Use the test set, or new data, to measure its performance.

In [58]:
predictions = cvModel.transform(testData)

predictions.select("features", "label", "probability", "predictedLabel").show(5)

***Evaluate the best model***

In [60]:
evaluator = MulticlassClassificationEvaluator(labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
print("The Accuracy for test set is {}".format(evaluator.evaluate(predictions)))

In [61]:
predictionAndLabel = predictions.select("prediction", "indexedLabel").rdd.map(lambda r: (float(r[0]), float(r[1])))
scoreAndLabel = predictions.select("probability", "indexedLabel").rdd.map(lambda r: (float(r[0][1]), float(r[1])))

# Instantiate metrics objects
metricsMulti = MulticlassMetrics(predictionAndLabel)
metricsBinary = BinaryClassificationMetrics(scoreAndLabel)
# Statistics for the positive class (index 1.0)
confusionMatrix = metricsMulti.confusionMatrix()
precision = metricsMulti.precision(1.0)
recall = metricsMulti.recall(1.0)
f1Score = metricsMulti.fMeasure(1.0)
print("Summary Stats")
print("Confusion Matrix = \n %s" % confusionMatrix)
print("Precision = %s" % precision) 
print("Recall = %s" % recall) 
print("F1 Score = %s" % f1Score) 

# Area under precision-recall curve 
print("Area under PR = %s" % metricsBinary.areaUnderPR) 
# Area under ROC curve 
print("Area under ROC = %s" % metricsBinary.areaUnderROC)

In [62]:
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", labelCol="indexedLabel")
print("The area under ROC for test set is {}".format(evaluator.evaluate(predictions)))

### Decision Trees

Decision trees are a popular family of classification and regression methods. You can read more about [Decision Trees](http://spark.apache.org/docs/latest/mllib-decision-tree.html) in the Spark MLlib Programming Guide.
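By default, Spark's `DecisionTreeClassifier` measures node impurity with the Gini index (`impurity="gini"`) and, at each node, picks the candidate split with the largest information gain: the parent's impurity minus the size-weighted impurity of the two children. Continuous features are first discretized into at most `maxBins` bins to produce the candidate splits. A plain-Python sketch of the impurity and gain computations on hypothetical labels:

```python
from collections import Counter

def gini(labels):
    """Gini impurity: 1 - sum_k p_k^2 over the class proportions p_k."""
    n = len(labels)
    return 1.0 - sum((count / n) ** 2 for count in Counter(labels).values())

def split_gain(parent, left, right):
    """Impurity decrease achieved by splitting parent into left and right."""
    n = len(parent)
    weighted = len(left) / n * gini(left) + len(right) / n * gini(right)
    return gini(parent) - weighted

parent_node = ["yes", "yes", "no", "no"]
print(gini(parent_node))                                          # 0.5
print(split_gain(parent_node, ["yes", "yes"], ["no", "no"]))      # 0.5, a perfect split
```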

In [64]:
# Create initial Decision Tree Model
dt = DecisionTreeClassifier(labelCol="indexedLabel", featuresCol="features")

# Train model with Training Data.
dtModel = dt.fit(trainingData)

# Make predictions on test data.
predictions = dtModel.transform(testData)

# Evaluate the model by computing the metrics. 
evaluator = MulticlassClassificationEvaluator(labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
print("The Accuracy for test set is {}".format(evaluator.evaluate(predictions)))

print("===============================================")

predictionAndLabel = predictions.select("prediction", "indexedLabel").rdd.map(lambda r: (float(r[0]), float(r[1])))
scoreAndLabel = predictions.select("probability", "indexedLabel").rdd.map(lambda r: (float(r[0][1]), float(r[1])))

# Instantiate metrics objects
metricsMulti = MulticlassMetrics(predictionAndLabel)
metricsBinary = BinaryClassificationMetrics(scoreAndLabel)
# Statistics for the positive class (index 1.0)
confusionMatrix = metricsMulti.confusionMatrix()
precision = metricsMulti.precision(1.0)
recall = metricsMulti.recall(1.0)
f1Score = metricsMulti.fMeasure(1.0)
print("Summary Stats")
print("Confusion Matrix = \n %s" % confusionMatrix)
print("Precision = %s" % precision) 
print("Recall = %s" % recall) 
print("F1 Score = %s" % f1Score) 

# Area under precision-recall curve 
print("Area under PR = %s" % metricsBinary.areaUnderPR) 
# Area under ROC curve 
print("Area under ROC = %s" % metricsBinary.areaUnderROC)

print("===============================================")

evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", labelCol="indexedLabel")
print("The area under ROC for test set is {}".format(evaluator.evaluate(predictions)))


**Hyperparameter tuning using 5-fold cross validation**

In [66]:
paramGrid = (ParamGridBuilder()
             .addGrid(dt.maxDepth, [1, 2, 6, 10])
             .addGrid(dt.maxBins, [20, 40, 80])
             .build())

evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", labelCol="indexedLabel")

pipeline = Pipeline(stages=[featureIndexer, dt, labelConverter]) 
cv = CrossValidator(estimator=pipeline, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5, parallelism=10, seed=100)
cvModel = cv.fit(trainingData)

predictions = cvModel.transform(testData)

# Evaluate the best model
evaluator = MulticlassClassificationEvaluator(labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
print("The Accuracy for test set is {}".format(evaluator.evaluate(predictions)))

print("===============================================")

predictionAndLabel = predictions.select("prediction", "indexedLabel").rdd.map(lambda r: (float(r[0]), float(r[1])))
scoreAndLabel = predictions.select("probability", "indexedLabel").rdd.map(lambda r: (float(r[0][1]), float(r[1])))

# Instantiate metrics objects
metricsMulti = MulticlassMetrics(predictionAndLabel)
metricsBinary = BinaryClassificationMetrics(scoreAndLabel)
# Statistics for the positive class (index 1.0)
confusionMatrix = metricsMulti.confusionMatrix()
precision = metricsMulti.precision(1.0)
recall = metricsMulti.recall(1.0)
f1Score = metricsMulti.fMeasure(1.0)
print("Summary Stats")
print("Confusion Matrix = \n %s" % confusionMatrix)
print("Precision = %s" % precision) 
print("Recall = %s" % recall) 
print("F1 Score = %s" % f1Score) 

# Area under precision-recall curve 
print("Area under PR = %s" % metricsBinary.areaUnderPR) 
# Area under ROC curve 
print("Area under ROC = %s" % metricsBinary.areaUnderROC)

print("===============================================")

evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", labelCol="indexedLabel")
print("The area under ROC for test set is {}".format(evaluator.evaluate(predictions)))

### Random Forest

You can read more about [Random Forest](https://spark.apache.org/docs/2.2.0/ml-classification-regression.html#random-forest-classifier) in the Spark MLlib Programming Guide.
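As far as we understand Spark's implementation, a `RandomForestClassifier` combines its trees (20 by default, `numTrees`) by aggregating each tree's class distribution at the leaf an example reaches, a form of soft voting, rather than by a simple majority vote of hard predictions. The plain-Python sketch below averages hypothetical per-tree [P(no), P(yes)] vectors for one client and picks the class with the highest average; it is a simplified illustration, not Spark's code.

```python
def forest_probability(tree_distributions):
    """Average the class-probability vectors produced by the individual trees."""
    num_trees = len(tree_distributions)
    num_classes = len(tree_distributions[0])
    return [sum(dist[k] for dist in tree_distributions) / num_trees
            for k in range(num_classes)]

# Hypothetical [P(no), P(yes)] leaf distributions from three trees for one client
tree_outputs = [[0.8, 0.2], [0.4, 0.6], [0.2, 0.8]]
forest_prob = forest_probability(tree_outputs)
forest_pred = max(range(len(forest_prob)), key=forest_prob.__getitem__)
print(forest_prob, forest_pred)   # about [0.467, 0.533] and class 1 ('yes')
```

Averaging many decorrelated trees, each grown on a bootstrap sample with random feature subsets, is what makes the forest less prone to overfitting than a single deep tree.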

In [68]:
# Create initial Random Forest Classifier
rf = RandomForestClassifier(labelCol="indexedLabel", featuresCol="features")

# Train model with Training Data.
rfModel = rf.fit(trainingData)

# Make predictions on test data.
predictions = rfModel.transform(testData)

# Evaluate the model by computing the metrics. 
evaluator = MulticlassClassificationEvaluator(labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
print("The Accuracy for test set is {}".format(evaluator.evaluate(predictions)))

print("===============================================")

predictionAndLabel = predictions.select("prediction", "indexedLabel").rdd.map(lambda r: (float(r[0]), float(r[1])))
scoreAndLabel = predictions.select("probability", "indexedLabel").rdd.map(lambda r: (float(r[0][1]), float(r[1])))

# Instantiate metrics objects
metricsMulti = MulticlassMetrics(predictionAndLabel)
metricsBinary = BinaryClassificationMetrics(scoreAndLabel)
# Statistics for the positive class (index 1.0)
confusionMatrix = metricsMulti.confusionMatrix()
precision = metricsMulti.precision(1.0)
recall = metricsMulti.recall(1.0)
f1Score = metricsMulti.fMeasure(1.0)
print("Summary Stats")
print("Confusion Matrix = \n %s" % confusionMatrix)
print("Precision = %s" % precision) 
print("Recall = %s" % recall) 
print("F1 Score = %s" % f1Score) 

# Area under precision-recall curve 
print("Area under PR = %s" % metricsBinary.areaUnderPR) 
# Area under ROC curve 
print("Area under ROC = %s" % metricsBinary.areaUnderROC)

print("===============================================")

evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", labelCol="indexedLabel")
print("The area under ROC for test set is {}".format(evaluator.evaluate(predictions)))

**Hyperparameter tuning using 5-fold cross validation**

In [70]:
paramGrid = (ParamGridBuilder()
             .addGrid(rf.maxDepth, [2, 4, 6])
             .addGrid(rf.maxBins, [20, 60])
             .addGrid(rf.numTrees, [5, 20])
             .build())

evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", labelCol="indexedLabel")

pipeline = Pipeline(stages=[featureIndexer, rf, labelConverter]) 
cv = CrossValidator(estimator=pipeline, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5, parallelism=10, seed=100)
cvModel = cv.fit(trainingData)

predictions = cvModel.transform(testData)

# Evaluate the best model
evaluator = MulticlassClassificationEvaluator(labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
print("The Accuracy for test set is {}".format(evaluator.evaluate(predictions)))

print("===============================================")

predictionAndLabel = predictions.select("prediction", "indexedLabel").rdd.map(lambda r: (float(r[0]), float(r[1])))
scoreAndLabel = predictions.select("probability", "indexedLabel").rdd.map(lambda r: (float(r[0][1]), float(r[1])))

# Instantiate metrics objects
metricsMulti = MulticlassMetrics(predictionAndLabel)
metricsBinary = BinaryClassificationMetrics(scoreAndLabel)
# Statistics for the positive class (index 1.0)
confusionMatrix = metricsMulti.confusionMatrix()
precision = metricsMulti.precision(1.0)
recall = metricsMulti.recall(1.0)
f1Score = metricsMulti.fMeasure(1.0)
print("Summary Stats")
print("Confusion Matrix = \n %s" % confusionMatrix)
print("Precision = %s" % precision) 
print("Recall = %s" % recall) 
print("F1 Score = %s" % f1Score) 

# Area under precision-recall curve 
print("Area under PR = %s" % metricsBinary.areaUnderPR) 
# Area under ROC curve 
print("Area under ROC = %s" % metricsBinary.areaUnderROC)

print("===============================================")

evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", labelCol="indexedLabel")
print("The area under ROC for test set is {}".format(evaluator.evaluate(predictions)))

## Conclusion
This tutorial works through a binary classification example with Spark ML in Python. The data relate to direct marketing campaigns (phone calls) of a Portuguese banking institution. Three classifiers are tested: logistic regression, decision trees and random forests. For each of them, several metrics are computed on the test set, both for an initial model and after hyperparameter tuning with 5-fold cross-validation.