# Machine Learning with PySpark and MLlib: Solving a Binary Classification Problem

Here, we will learn how to build a **Binary Classification** application using **PySpark** and **MLlib Pipelines API**. 

We tried **Logistic Regression**, **Decision Tree**, **Random Forest**, and **Gradient-Boosted Tree** algorithms and **Gradient Boosting Tree**  performed best on the data set.

**[Apache Spark](https://spark.apache.org/)**, once a component of the **[Hadoop](http://hadoop.apache.org/)** ecosystem, is now becoming the Big-Data platform of choice for enterprises. It is a powerful open source engine that provides real-time stream processing, interactive processing, graph processing, in-memory processing as well as batch processing with very fast speed, ease of use and standard interface.

In [None]:
!pip install pyspark

In [None]:

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the "../input/" directory.

import os
for dirname, _, filenames in os.walk('/kaggle/input'):
    for filename in filenames:
        print(os.path.join(dirname, filename))

# Any results you write to the current directory are saved as output.

## Exploring Dataset

We will use the same data set when we built a [Logistic Regression](https://towardsdatascience.com/building-a-logistic-regression-in-python-step-by-step-becd4d56c9c8/) in Python, and it is related to direct marketing campaigns (phone calls) of a Portuguese banking institution. The classification goal is to predict whether the client will subscribe (Yes/No) to a term deposit. The dataset can be downloaded from [Kaggle](https://www.kaggle.com/rouseguy/bankbalanced/data).

In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('ml-bank').getOrCreate()
sdf = spark.read.csv('../input/bankbalanced/bank.csv', header = True, inferSchema = True)
sdf.printSchema()

**Input variables:** age, job, marital, education, default, balance, housing, loan, contact, day, month, duration, campaign, pdays, previous, poutcome

**Output variable:** deposit

Have a peek of the first five observations. Pandas data frame is prettier than Spark DataFrame show() method.

In [None]:
import pandas as pd

pd.DataFrame(sdf.take(5), columns=sdf.columns).transpose()

Dataset classes are perfect balanced.

In [None]:
sdf.toPandas().groupby(['deposit']).size()

### Summary statistics for numeric variables

In [None]:
numeric_features = [t[0] for t in sdf.dtypes if t[1] == 'int']
sdf.select(numeric_features).describe().toPandas().transpose()

### Correlations between independent variables

In [None]:

numeric_data = sdf.select(numeric_features).toPandas()
axs = pd.plotting.scatter_matrix(numeric_data, figsize=(8, 8));
n = len(numeric_data.columns)

for i in range(n):
    v = axs[i, 0]
    v.yaxis.label.set_rotation(0)
    v.yaxis.label.set_ha('right')
    v.set_yticks(())
    h = axs[n-1, i]
    h.xaxis.label.set_rotation(90)
    h.set_xticks(())

It’s obvious that there aren’t highly correlated numeric variables. Therefore, we will keep all of them for the model. 

However, day and month columns are not really useful, we will remove these two columns.

In [None]:
sdf = sdf.select(
    'age', 'job', 'marital', 'education', 'default', 'balance', 'housing', 'loan', 
    'contact', 'duration', 'campaign', 'pdays', 'previous', 'poutcome', 'deposit'
)
cols = sdf.columns
sdf.printSchema()

## Preparing Dataset for Machine Learning

The process includes **Category Indexing**, **One-Hot Encoding** and **VectorAssembler** (a feature transformer that merges multiple columns into a vector column).

This  code  indexes each categorical column using the <code>StringIndexer</code>, then converts the indexed categories into *one-hot encoded* variables. The resulting output has the binary vectors appended to the end of each row. We use the <code>StringIndexer</code> again to encode our labels to label indices. Next, we use the <code>VectorAssembler</code>  to combine all the feature columns into a single vector column.

In [None]:
from pyspark.ml.feature import OneHotEncoderEstimator, StringIndexer, VectorAssembler

stages = []
categoricalColumns = [
    'job', 'marital', 'education', 'default', 'housing', 'loan', 'contact', 'poutcome'
]

for categoricalCol in categoricalColumns:
    stringIndexer = StringIndexer(inputCol = categoricalCol, outputCol = categoricalCol + 'Index')
    encoder = OneHotEncoderEstimator(
        inputCols=[stringIndexer.getOutputCol()], 
        outputCols=[categoricalCol + "classVec"]
    )
    stages += [stringIndexer, encoder]
    
label_stringIdx = StringIndexer(inputCol = 'deposit', outputCol = 'label')
stages += [label_stringIdx]
numericCols = ['age', 'balance', 'duration', 'campaign', 'pdays', 'previous']
assemblerInputs = [c + "classVec" for c in categoricalColumns] + numericCols
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages += [assembler]

### Pipeline

We use <code>Pipeline</code> to chain multiple Transformers and Estimators together to specify the **Machine Learning** workflow. A <code>Pipeline</code>’s stages are specified as an ordered array.

In [None]:
from pyspark.ml import Pipeline

pipeline = Pipeline(stages = stages)
pipelineModel = pipeline.fit(sdf)
sdf = pipelineModel.transform(sdf)
selectedCols = ['label', 'features'] + cols
sdf = sdf.select(selectedCols)
sdf.printSchema()

We now have <code>features</code> column and <code>label</code> column.

In [None]:
pdf = pd.DataFrame(sdf.take(5), columns=sdf.columns)
pdf.iloc[:,0:2] 


In [None]:
len(pdf.features[0])

### Split dataset into train and test set

Randomly split data into train and test sets, and set seed for reproducibility.

In [None]:
train, test = sdf.randomSplit([0.7, 0.3], seed = 2018)
print("Training Dataset Count: " + str(train.count()))
print("Test Dataset Count: " + str(test.count()))

## Logistic Regression Model


In [None]:
from pyspark.ml.classification import LogisticRegression

lr = LogisticRegression(featuresCol = 'features', labelCol = 'label', maxIter=10)
lrModel = lr.fit(train)

We can obtain the coefficients by using <code>LogisticRegressionModel</code>’s attributes.

In [None]:
import matplotlib.pyplot as plt
import numpy as np

beta = np.sort(lrModel.coefficients)
plt.plot(beta)
plt.ylabel('Beta Coefficients')
plt.show()

Summarize the model over the training set, we can also obtain the **ROC Receiver-Operating Characteristic)** and the **Area under ROC** (<code>areaUnderROC</code>).

In [None]:
trainingSummary = lrModel.summary
lrROC = trainingSummary.roc.toPandas()

plt.plot(lrROC['FPR'],lrROC['TPR'])
plt.ylabel('False Positive Rate')
plt.xlabel('True Positive Rate')
plt.title('ROC Curve')
plt.show()

print('Training set areaUnderROC: ' + str(trainingSummary.areaUnderROC))

### Precision and recall

In [None]:
pr = trainingSummary.pr.toPandas()
plt.plot(pr['recall'],pr['precision'])
plt.ylabel('Precision')
plt.xlabel('Recall')
plt.show()

### Make predictions on the test set

In [None]:
lrPreds = lrModel.transform(test)
lrPreds.select('age', 'job', 'label', 'rawPrediction', 'prediction', 'probability').show(10)

In [None]:
lrPreds.show()

### Evaluate the Logistic Regression model

In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

lrEval = BinaryClassificationEvaluator()
print('Test Area Under ROC', lrEval.evaluate(lrPreds))

## Decision Tree Classifier

**Decision trees** are widely used since they are easy to interpret, handle categorical features, extend to the multi-class classification, do not require feature scaling, and are able to capture non-linearities and feature interactions.

In [None]:
from pyspark.ml.classification import DecisionTreeClassifier

dt = DecisionTreeClassifier(featuresCol = 'features', labelCol = 'label', maxDepth = 3)
dtModel = dt.fit(train)
dtPreds = dtModel.transform(test)
dtPreds.select('age', 'job', 'label', 'rawPrediction', 'prediction', 'probability').show(10)

### Evaluate the Decision Tree model

One simple decision tree performed poorly because it is too weak given the range of different features. The prediction accuracy of decision trees can be improved by Ensemble methods, such as Random Forest and Gradient-Boosted Tree.

In [None]:
dtEval = BinaryClassificationEvaluator()
dtROC = dtEval.evaluate(dtPreds, {dtEval.metricName: "areaUnderROC"})
print("Test Area Under ROC: " + str(dtROC))

## Random Forest Classifier

In [None]:
from pyspark.ml.classification import RandomForestClassifier

rf = RandomForestClassifier(featuresCol = 'features', labelCol = 'label')
rfModel = rf.fit(train)
rfPreds = rfModel.transform(test)
rfPreds.select('age', 'job', 'label', 'rawPrediction', 'prediction', 'probability').show(10)

### Evaluate the Random Forest Classifier

In [None]:
rfEval = BinaryClassificationEvaluator()
rfROC = rfEval.evaluate(rfPreds, {rfEval.metricName: "areaUnderROC"})
print("Test Area Under ROC: " + str(rfROC))

## Gradient-Boosted Tree Classifier

In [None]:
from pyspark.ml.classification import GBTClassifier

gbt = GBTClassifier(maxIter=10)
gbtModel = gbt.fit(train)
gbtPreds = gbtModel.transform(test)
gbtPreds.select('age', 'job', 'label', 'rawPrediction', 'prediction', 'probability').show(10)

### Evaluate the Gradient-Boosted Tree Classifier

In [None]:
gbtEval = BinaryClassificationEvaluator()
gbtROC = gbtEval.evaluate(gbtPreds, {gbtEval.metricName: "areaUnderROC"})
print("Test Area Under ROC: " + str(gbtROC))

**Gradient-Boosted Tree** achieved the best results, we will try tuning this model with the <code>ParamGridBuilder</code> and the <code>CrossValidator</code>. 

Before that we can use <code>explainParams()</code> to print a list of all params and their definitions to understand what params available for tuning.

In [None]:
print(gbt.explainParams())

In [None]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

paramGrid = (ParamGridBuilder()
             .addGrid(gbt.maxDepth, [2, 4, 6])
             .addGrid(gbt.maxBins, [20, 60])
             .addGrid(gbt.maxIter, [10, 20])
             .build())

cv = CrossValidator(estimator=gbt, estimatorParamMaps=paramGrid, evaluator=gbtEval, numFolds=5)

# Run cross validations.  
# This can take some minutes since it is training over 20 trees!
cvModel = cv.fit(train)
cvPreds = cvModel.transform(test)
gbtEval.evaluate(cvPreds)