# Apache Spark
Apache Spark, once regarded as a component of the Hadoop ecosystem, is now becoming the big-data platform of choice for enterprises. It is an open source engine that supports real-time stream processing, interactive processing, graph processing, and batch processing, with in-memory execution, ease of use, and a standard interface.

# Exploring The Data
We will use a sample of the Criteo click-through data set. The classification goal is to predict whether a user will click a specific advertisement (Yes/No), which is the quantity behind the click-through rate (CTR).

# Binary Classification Problem (CRITEO_CTR)
The Pipelines API provides a higher-level API built on top of DataFrames for constructing ML pipelines.

Binary Classification is the task of predicting a binary label. E.g., is an email spam or not spam? Should I show this ad to this user or not? Will it rain tomorrow or not? 

## Load Data
We will read in the Criteo dataset from criteo-labe, using the CSV data source for Spark. Setting header = True takes the column names from the first row, and inferSchema = True lets Spark infer the column types.

In [1]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('Large-Scale Dynamic Distributed Machine Learning Benchmarking').getOrCreate()
df = spark.read.csv('data/day_4_E.csv', header = True, inferSchema = True)
df.printSchema()

root
 |-- Label: integer (nullable = true)
 |-- I1: double (nullable = true)
 |-- I2: double (nullable = true)
 |-- I3: double (nullable = true)
 |-- I4: double (nullable = true)
 |-- I5: double (nullable = true)
 |-- I6: double (nullable = true)
 |-- I7: double (nullable = true)
 |-- I8: string (nullable = true)
 |-- I9: integer (nullable = true)
 |-- I10: double (nullable = true)
 |-- I11: double (nullable = true)
 |-- I12: double (nullable = true)
 |-- I13: double (nullable = true)
 |-- C1: string (nullable = true)
 |-- C2: string (nullable = true)
 |-- C3: string (nullable = true)
 |-- C4: string (nullable = true)
 |-- C5: string (nullable = true)
 |-- C6: string (nullable = true)
 |-- C7: string (nullable = true)
 |-- C8: string (nullable = true)
 |-- C9: string (nullable = true)
 |-- C10: string (nullable = true)
 |-- C11: string (nullable = true)
 |-- C12: string (nullable = true)
 |-- C13: string (nullable = true)
 |-- C14: integer (nullable = true)
 |-- C15: string (nullable =

# Dataset Review

## Input variables:

"I1", "I2", "I3", "I4", "I5", "I6", "I7", "I8", "I9", "I10", "I11", "I12", "I13", "C1", "C2", "C3", "C4", "C5", "C6", "C7", "C8", "C9", "C10", "C11", "C12", "C13", "C14", "C15", "C16", "C17", "C18", "C19", "C20", "C21", "C22", "C23", "C24", "C25", "C26"

## Output variable: "Label" (1 = Yes, 0 = No)

### Take a peek at the first five observations.
### A pandas DataFrame displays more neatly than Spark's DataFrame.show()

In [2]:
import pandas as pd
#pd.DataFrame(df.take(5), columns=df.columns).transpose()

In [5]:
pd.DataFrame(df.take(5), columns=df.columns)

Unnamed: 0,Label,I1,I2,I3,I4,I5,I6,I7,I8,I9,...,C18,C19,C20,C21,C22,C23,C24,C25,C26,I9_bin
0,1,1.426553,6.021023,1.386294,0.0,0.0,0.0,0.693147,2.9444389791664403,1,...,b8170bba,9512c20b,47849e55,73b3f46d,d994ba60,1,6d4ed687,337b81aa,b757e957,0
1,1,1.426553,4.691348,2.078408,0.0,0.0,0.89284,0.0,-inf,0,...,b8170bba,9512c20b,156cbe87,96fbe197,15562d5d,1,301c171b,337b81aa,ed10571d,0
2,1,1.426553,1.098612,2.078408,0.0,0.0,0.89284,0.0,0.0,0,...,b8170bba,9512c20b,156cbe87,96fbe197,15562d5d,1,0ca53ef8,d632d034,b757e957,0
3,1,1.426553,2.70805,2.078408,0.0,0.0,0.89284,0.0,2.6390573296152584,0,...,a1eb1511,9512c20b,da876e8c,b7d223df,0c2a56ba,1,5dea4c3e,73bcdfc8,b757e957,0
4,1,1.4088,7.391415,1.386294,0.0,0.0,0.0,0.0,5.043425116919247,27,...,0e45c477,9512c20b,47849e55,73b3f46d,d994ba60,1,32547d2f,321935cd,b757e957,2


### Our classes are imbalanced (about 28% positive)

In [3]:
df.groupby('Label').count().toPandas()

Unnamed: 0,Label,count
0,1,184086
1,0,483329
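
The counts above can be turned into a class ratio with a few lines of arithmetic. The sketch below also derives inverse-frequency class weights; that weighting scheme is an assumption added for illustration and is not something this notebook applies.

```python
# Class counts copied from the groupby output above.
counts = {1: 184086, 0: 483329}

total = sum(counts.values())
positive_rate = counts[1] / total

# Inverse-frequency weights, so that each class contributes equally in aggregate.
# This weighting scheme is an illustrative assumption.
class_weights = {label: total / (len(counts) * n) for label, n in counts.items()}

print(f"total rows: {total}")
print(f"positive rate: {positive_rate:.4f}")
print(f"class weights: {class_weights}")
```

If weighting turns out to be useful, Spark's LogisticRegression accepts a weightCol parameter that could carry such per-row weights.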


In [7]:
df.dtypes

[('Label', 'int'),
 ('I1', 'double'),
 ('I2', 'double'),
 ('I3', 'double'),
 ('I4', 'double'),
 ('I5', 'double'),
 ('I6', 'double'),
 ('I7', 'double'),
 ('I8', 'string'),
 ('I9', 'int'),
 ('I10', 'double'),
 ('I11', 'double'),
 ('I12', 'double'),
 ('I13', 'double'),
 ('C1', 'string'),
 ('C2', 'string'),
 ('C3', 'string'),
 ('C4', 'string'),
 ('C5', 'string'),
 ('C6', 'string'),
 ('C7', 'string'),
 ('C8', 'string'),
 ('C9', 'string'),
 ('C10', 'string'),
 ('C11', 'string'),
 ('C12', 'string'),
 ('C13', 'string'),
 ('C14', 'int'),
 ('C15', 'string'),
 ('C16', 'int'),
 ('C17', 'int'),
 ('C18', 'string'),
 ('C19', 'string'),
 ('C20', 'string'),
 ('C21', 'string'),
 ('C22', 'string'),
 ('C23', 'int'),
 ('C24', 'string'),
 ('C25', 'string'),
 ('C26', 'string'),
 ('I9_bin', 'int')]

### Summary statistics for numeric variables

In [8]:
numeric_features = [t[0] for t in df.dtypes if t[1] == 'int']
df.select(numeric_features).describe().toPandas().transpose()

Unnamed: 0,0,1,2,3,4
summary,count,mean,stddev,min,max
Label,667415,0.27581939273165873,0.446926565113209,0,1
I9,667415,9.480125559059955,12.09530144381858,0,95
C14,667415,0.37489567959964937,0.48409633354005055,0,1
C16,667415,0.37489567959964937,0.48409633354005055,0,1
C17,667415,0.37489567959964937,0.48409633354005055,0,1
C23,667415,0.37489567959964937,0.48409633354005055,0,1
I9_bin,667415,0.9307926852108508,0.8477607624730499,0,2


In [9]:
numeric_features_double = [t[0] for t in df.dtypes if t[1] == 'double']
df.select(numeric_features_double).describe().toPandas().transpose()

Unnamed: 0,0,1,2,3,4
summary,count,mean,stddev,min,max
I1,667415,1.1841115167200893,0.35947613242977,0.5265890341390446,1.874791384027278
I2,667415,5.190329586356789,1.5539756025601303,0.6931471805599453,8.330984087895091
I3,667415,1.7736005470472096,0.7879161140024145,0.0,4.110873864173311
I4,667415,0.30590741170143343,0.3441797392202564,0.0,0.6931471805599453
I5,667415,0.2708738361273557,0.338205517522918,0.0,0.6931471805599453
I6,667415,0.45190440200812115,0.7577983778738169,0.0,3.1570004211501135
I7,667415,0.05204395330052398,0.211770267089723,0.0,1.252762968495368
I10,667415,0.19121387345043264,0.3209294007296766,0.0,1.252762968495368
I11,667415,0.2708738361273557,0.338205517522918,0.0,0.6931471805599453


In [7]:
string_features = [t[0] for t in df.dtypes if t[1] == 'string']
df.select(string_features).describe().toPandas().transpose()

Unnamed: 0,0,1,2,3,4
summary,count,mean,stddev,min,max
I8,667415,2.5244974824555655,2.1082299939160927,-inf,7.277593209447151
C1,667415,Infinity,,00001f65,fffdcdf8
C2,667415,Infinity,,00007246,fffd60ee
C3,667415,Infinity,,0000803f,fff0eed3
C4,667415,Infinity,,0012f703,ffe2b3ca
C5,667415,Infinity,,0001c733,ffece438
C6,667415,,,6fcd6dcb,919c68e4
C7,667415,Infinity,,000112e6,fff605eb
C8,667415,Infinity,,00982daf,ffac3596


### Correlations between independent variables.

In [6]:
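# Select the int and double columns explicitly; string columns cannot be plotted.
numeric_features = [t[0] for t in df.dtypes if t[1] in ('int', 'double')]
numeric_data = df.select(numeric_features).toPandas()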
axs = pd.plotting.scatter_matrix(numeric_data, figsize=(8, 8));
n = len(numeric_data.columns)
for i in range(n):
    v = axs[i, 0]
    v.yaxis.label.set_rotation(0)
    v.yaxis.label.set_ha('right')
    v.set_yticks(())
    h = axs[n-1, i]
    h.xaxis.label.set_rotation(90)
    h.set_xticks(())

The scatter matrix gives a visual check for highly correlated numeric variables. We do not see any, so we will keep all of them for the model. The select below lists every column explicitly, so no columns are dropped at this step.

In [4]:
df = df.select('Label', 'I1', 'I2', 'I3', 'I4', 'I5', 'I6', 'I7', 'I8', 'I9', 'I10', 'I11', 'I12', 'I13', 'C1', 'C2', 'C3', 'C4', 'C5', 'C6', 'C7', 'C8', 'C9', 'C10', 'C11', 'C12', 'C13', 'C14', 'C15', 'C16', 'C17', 'C18', 'C19', 'C20', 'C21', 'C22', 'C23', 'C24', 'C25', 'C26', 'I9_bin')
cols = df.columns
df.printSchema()

root
 |-- Label: integer (nullable = true)
 |-- I1: double (nullable = true)
 |-- I2: double (nullable = true)
 |-- I3: double (nullable = true)
 |-- I4: double (nullable = true)
 |-- I5: double (nullable = true)
 |-- I6: double (nullable = true)
 |-- I7: double (nullable = true)
 |-- I8: string (nullable = true)
 |-- I9: integer (nullable = true)
 |-- I10: double (nullable = true)
 |-- I11: double (nullable = true)
 |-- I12: double (nullable = true)
 |-- I13: double (nullable = true)
 |-- C1: string (nullable = true)
 |-- C2: string (nullable = true)
 |-- C3: string (nullable = true)
 |-- C4: string (nullable = true)
 |-- C5: string (nullable = true)
 |-- C6: string (nullable = true)
 |-- C7: string (nullable = true)
 |-- C8: string (nullable = true)
 |-- C9: string (nullable = true)
 |-- C10: string (nullable = true)
 |-- C11: string (nullable = true)
 |-- C12: string (nullable = true)
 |-- C13: string (nullable = true)
 |-- C14: integer (nullable = true)
 |-- C15: string (nullable =

# Preprocess Data
Since we are going to try algorithms like Logistic Regression, we will have to convert the categorical variables in the dataset into numeric variables. There are two ways we can do this.

#### Category Indexing

This assigns a numeric value to each category from {0, 1, 2, ..., numCategories-1}. It introduces an implicit ordering among the categories, so it is more suitable for ordinal variables (e.g., Poor: 0, Average: 1, Good: 2).

#### One-Hot Encoding

This converts categories into binary vectors with at most one nonzero value (e.g., Blue: [1, 0], Green: [0, 1], Red: [0, 0]).
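
The two encodings can be sketched in plain Python. This is a minimal illustration, not Spark's implementation: fit_index and one_hot are hypothetical helpers, the frequency-first ordering mirrors StringIndexer's default, and the alphabetical tie-break is an assumption of the sketch.

```python
from collections import Counter

def fit_index(values):
    """Map each category to an index, most frequent first (ties broken alphabetically)."""
    freq = Counter(values)
    ordered = sorted(freq, key=lambda c: (-freq[c], c))
    return {category: i for i, category in enumerate(ordered)}

def one_hot(index, num_categories, drop_last=True):
    """Binary vector with at most one nonzero entry.

    With drop_last=True the last category is encoded as all zeros,
    so the vector has num_categories - 1 entries.
    """
    size = num_categories - 1 if drop_last else num_categories
    vec = [0] * size
    if index < size:
        vec[index] = 1
    return vec

colors = ["Blue", "Green", "Blue", "Red", "Blue", "Green"]
mapping = fit_index(colors)
for color in ["Blue", "Green", "Red"]:
    print(color, mapping[color], one_hot(mapping[color], len(mapping)))
```

Dropping the last category is why three colors need only two vector entries: the all-zero vector stands for the remaining category.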

In this dataset, we have a binary variable, Label (one or zero), and nominal categorical variables (C1, C11, C26, etc.). For simplicity's sake, we will use One-Hot Encoding to convert all categorical variables into binary vectors. Prediction accuracy could likely be improved by choosing an appropriate encoding for each categorical column.

Here, we will use a combination of StringIndexer and OneHotEncoderEstimator to convert the categorical variables. The OneHotEncoderEstimator will return a SparseVector. (In Spark 3.0 and later, OneHotEncoderEstimator was renamed to OneHotEncoder.)

Since we will have more than one stage of feature transformations, we use a Pipeline to tie the stages together. This simplifies our code.

## Preparing Data for Machine Learning
This step combines Category Indexing, One-Hot Encoding, and VectorAssembler, a feature transformer that merges multiple columns into a vector column.

In [5]:
from pyspark.ml.feature import OneHotEncoderEstimator, StringIndexer, VectorAssembler

categoricalColumns = ['I8', 'C1', 'C2', 'C3', 'C4', 'C5', 'C6', 'C7', 'C8', 'C9', 'C10', 'C11', 'C12', 'C13', 'C15', 'C18', 'C19', 'C20', 'C21', 'C22', 'C24', 'C25', 'C26']
stages = [] # stages in our Pipeline

for categoricalCol in categoricalColumns:
    # Category Indexing with StringIndexer
    stringIndexer = StringIndexer(inputCol = categoricalCol, outputCol = categoricalCol + 'Index')
    # Use OneHotEncoder to convert categorical variables into binary SparseVectors
    # encoder = OneHotEncoderEstimator(inputCol=categoricalCol + "Index", outputCol=categoricalCol + "classVec")
    encoder = OneHotEncoderEstimator(inputCols=[stringIndexer.getOutputCol()], outputCols=[categoricalCol + "classVec"])
    # Add stages.  These are not run here, but will run all at once later on.
    stages += [stringIndexer, encoder]

# We use the StringIndexer again to encode our labels to label indices.
# Convert label into label indices using the StringIndexer
label_stringIdx = StringIndexer(inputCol = 'Label', outputCol = 'final_label')
stages += [label_stringIdx]



The above code basically indexes each categorical column using the StringIndexer, and then converts the indexed categories into one-hot encoded variables. The resulting output has the binary vectors appended to the end of each row.



Next, we will use the VectorAssembler to combine all the feature columns into a single vector column. This will include both the numeric columns and the one-hot encoded binary vector columns in our dataset.

In [6]:
# Transform all features into a vector using VectorAssembler
numericCols = ['I1', 'I2', 'I3', 'I4', 'I5', 'I6', 'I7','I9', 'I10', 'I11', 'I12', 'I13', 'C14', 'C16', 'C17', 'C23', 'I9_bin']
assemblerInputs = [c + "classVec" for c in categoricalColumns] + numericCols
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages += [assembler]
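
Conceptually, the assembler concatenates the vector-valued and scalar columns of each row into one flat feature vector, in the order given by inputCols. A minimal sketch in plain Python, where assemble is a hypothetical helper and the row values are made up for illustration:

```python
def assemble(row, input_cols):
    """Concatenate scalar and list-valued columns of one row into a flat feature list."""
    features = []
    for col in input_cols:
        value = row[col]
        if isinstance(value, (list, tuple)):
            features.extend(float(v) for v in value)
        else:
            features.append(float(value))
    return features

# Hypothetical row: one one-hot encoded column plus two numeric columns.
row = {"C1classVec": [0, 1, 0], "I1": 1.426553, "I9": 27}
print(assemble(row, ["C1classVec", "I1", "I9"]))
```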

The code above is taken from Databricks' official site. We finally run our stages as a Pipeline. This puts the data through all of the feature transformations we described in a single call.

### Pipeline

We use Pipeline to chain multiple Transformers and Estimators together to specify our machine learning workflow. A Pipeline’s stages are specified as an ordered array.
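
Why the order of stages matters can be seen in a toy version of the fit/transform chaining. The classes and functions below (Indexer, Scaler, fit_pipeline, transform_pipeline) are hypothetical stand-ins written for this sketch and are not part of Spark.

```python
class Indexer:
    """Toy estimator: learns a category -> index mapping in fit()."""
    def __init__(self, input_col, output_col):
        self.input_col, self.output_col = input_col, output_col
        self.mapping = None

    def fit(self, rows):
        categories = sorted({r[self.input_col] for r in rows})
        self.mapping = {c: i for i, c in enumerate(categories)}
        return self

    def transform(self, rows):
        return [{**r, self.output_col: self.mapping[r[self.input_col]]} for r in rows]


class Scaler:
    """Toy estimator: learns the column maximum in fit(), divides by it in transform()."""
    def __init__(self, input_col, output_col):
        self.input_col, self.output_col = input_col, output_col
        self.max_value = None

    def fit(self, rows):
        self.max_value = max(r[self.input_col] for r in rows)
        return self

    def transform(self, rows):
        return [{**r, self.output_col: r[self.input_col] / self.max_value} for r in rows]


def fit_pipeline(stages, rows):
    """Fit each stage in order, feeding it the output of the stages before it."""
    fitted = []
    for stage in stages:
        model = stage.fit(rows)
        rows = model.transform(rows)
        fitted.append(model)
    return fitted


def transform_pipeline(fitted, rows):
    """Apply every fitted stage in order."""
    for model in fitted:
        rows = model.transform(rows)
    return rows


rows = [{"C1": "b"}, {"C1": "a"}, {"C1": "c"}]
stages = [Indexer("C1", "C1Index"), Scaler("C1Index", "C1Scaled")]
fitted = fit_pipeline(stages, rows)
result = transform_pipeline(fitted, rows)
print(result)
```

The second stage reads a column that only exists after the first stage has run, which is the same dependency the encoder stages have on the StringIndexer stages.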

In [7]:
from pyspark.ml import Pipeline
# Create a Pipeline.
pipeline = Pipeline(stages = stages)
# Run the feature transformations.
#  - fit() computes feature statistics as needed.
#  - transform() actually transforms the features.
pipelineModel = pipeline.fit(df)
df = pipelineModel.transform(df)
# Keep relevant columns
selectedCols = ['final_label', 'features'] + cols
df = df.select(selectedCols)
df.printSchema()

root
 |-- final_label: double (nullable = false)
 |-- features: vector (nullable = true)
 |-- Label: integer (nullable = true)
 |-- I1: double (nullable = true)
 |-- I2: double (nullable = true)
 |-- I3: double (nullable = true)
 |-- I4: double (nullable = true)
 |-- I5: double (nullable = true)
 |-- I6: double (nullable = true)
 |-- I7: double (nullable = true)
 |-- I8: string (nullable = true)
 |-- I9: integer (nullable = true)
 |-- I10: double (nullable = true)
 |-- I11: double (nullable = true)
 |-- I12: double (nullable = true)
 |-- I13: double (nullable = true)
 |-- C1: string (nullable = true)
 |-- C2: string (nullable = true)
 |-- C3: string (nullable = true)
 |-- C4: string (nullable = true)
 |-- C5: string (nullable = true)
 |-- C6: string (nullable = true)
 |-- C7: string (nullable = true)
 |-- C8: string (nullable = true)
 |-- C9: string (nullable = true)
 |-- C10: string (nullable = true)
 |-- C11: string (nullable = true)
 |-- C12: string (nullable = true)
 |-- C13: strin

In [15]:
pd.DataFrame(df.take(5), columns=df.columns).transpose()

As you can see, we now have a features column and a final_label column.

Randomly split data into train and test sets, and set seed for reproducibility.
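
A seeded split can be pictured as one random draw per row, which is also why the resulting sizes are only approximately 70/30. The sketch below is plain Python with a hypothetical random_split helper; it illustrates the idea and does not reproduce Spark's sampler or its exact row assignment.

```python
import random

def random_split(rows, weights, seed):
    """Assign each row to a split by drawing one uniform number per row."""
    total = sum(weights)
    # Cumulative boundaries, e.g. [0.7, 1.0] for weights [0.7, 0.3].
    bounds = []
    running = 0.0
    for w in weights:
        running += w / total
        bounds.append(running)
    bounds[-1] = 1.0  # guard against float rounding
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()
        for i, bound in enumerate(bounds):
            if u < bound:
                splits[i].append(row)
                break
    return splits

rows = list(range(1000))
train_rows, test_rows = random_split(rows, [0.7, 0.3], seed=2017)
print(len(train_rows), len(test_rows))
```

Running the function twice with the same seed gives the same split, which is the reproducibility the seed argument is there for.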

In [9]:
train, test = df.randomSplit([0.7, 0.3], seed = 2017)
print("Training Dataset Count: " + str(train.count()))
print("Test Dataset Count: " + str(test.count()))

# Fit and Evaluate Models
We are now ready to try out some of the Binary Classification algorithms available in the Pipelines API.

Out of these algorithms, the following also support multiclass classification with the Python API:

#### Decision Tree Classifier
#### Random Forest Classifier

These are the general steps we will take to build our models:

#### Create initial model using the training set
#### Tune parameters with a ParamGrid and 5-fold Cross Validation
#### Evaluate the best model obtained from the Cross Validation using the test set

We use the BinaryClassificationEvaluator to evaluate our models, which uses areaUnderROC as the default metric.
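
To make the metric concrete, area under the ROC curve can be read as the probability that a randomly chosen positive example receives a higher score than a randomly chosen negative one. The sketch below computes it that way in plain Python; area_under_roc is a hypothetical helper and the labels and scores are made up. It is quadratic in the number of examples, so it is only meant for small illustrations.

```python
def area_under_roc(labels, scores):
    """AUC as the probability that a random positive outranks a random negative.

    Ties between a positive and a negative score count as one half.
    """
    positives = [s for y, s in zip(labels, scores) if y == 1]
    negatives = [s for y, s in zip(labels, scores) if y == 0]
    if not positives or not negatives:
        raise ValueError("need at least one positive and one negative example")
    wins = 0.0
    for p in positives:
        for n in negatives:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(positives) * len(negatives))

labels = [1, 0, 1, 0, 0]
scores = [0.9, 0.4, 0.35, 0.2, 0.1]
auc = area_under_roc(labels, scores)
print(auc)
```

A value of 0.5 corresponds to random ranking and 1.0 to a perfect ranking, which makes the metric less sensitive to class imbalance than plain accuracy.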

# Logistic Regression Model

In [None]:
from pyspark.ml.classification import LogisticRegression
# Create initial LogisticRegression model.
# 'final_label' is the indexed label column produced by the pipeline above.
lr = LogisticRegression(featuresCol = 'features', labelCol = 'final_label', maxIter=10)
# Train model with Training Data
lrModel = lr.fit(train)

### We can obtain the coefficients by using LogisticRegressionModel’s attributes.

In [None]:
import matplotlib.pyplot as plt
import numpy as np

beta = np.sort(lrModel.coefficients)

plt.plot(beta)
plt.ylabel('Beta Coefficients')
plt.show()
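
As a reminder of what the coefficients mean, a logistic regression prediction is the sigmoid of a weighted sum of the features plus an intercept. A minimal sketch in plain Python, where predict_probability is a hypothetical helper and the coefficient and feature values are made up for illustration:

```python
import math

def predict_probability(coefficients, intercept, features):
    """Logistic regression: sigmoid of the linear score w . x + b."""
    score = intercept + sum(w * x for w, x in zip(coefficients, features))
    return 1.0 / (1.0 + math.exp(-score))

# Hypothetical coefficients and feature values, for illustration only.
coefficients = [0.8, -1.2, 0.0]
intercept = -0.5
features = [1.0, 0.5, 3.0]
probability = predict_probability(coefficients, intercept, features)
print(probability)
```

In the fitted model, lrModel.coefficients and lrModel.intercept hold the corresponding values; a coefficient of zero means the feature does not move the prediction.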

Summarizing the model over the training set, we can also obtain the receiver-operating characteristic and areaUnderROC.

In [None]:
trainingSummary = lrModel.summary

roc = trainingSummary.roc.toPandas()
plt.plot(roc['FPR'],roc['TPR'])
plt.ylabel('True Positive Rate')
plt.xlabel('False Positive Rate')
plt.title('ROC Curve')
plt.show()

print('Training set areaUnderROC: ' + str(trainingSummary.areaUnderROC))

# Precision and recall.

In [None]:
pr = trainingSummary.pr.toPandas()
plt.plot(pr['recall'],pr['precision'])
plt.ylabel('Precision')
plt.xlabel('Recall')
plt.show()

Set the model threshold to maximize F-Measure

In [None]:
f = trainingSummary.fMeasureByThreshold.toPandas()
plt.plot(f['threshold'],f['F-Measure'])
plt.ylabel('F-Measure')
plt.xlabel('Threshold')
plt.show()
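
The plot shows F-Measure per threshold; actually picking the threshold means taking the one with the highest value. The sketch below does that on a handful of made-up labels and scores in plain Python. The helpers f_measure and best_threshold are hypothetical, and treating a score equal to the threshold as a positive prediction is an assumption of the sketch.

```python
def f_measure(precision, recall, beta=1.0):
    """Weighted harmonic mean of precision and recall (F1 when beta is 1)."""
    if precision == 0.0 and recall == 0.0:
        return 0.0
    b2 = beta * beta
    return (1 + b2) * precision * recall / (b2 * precision + recall)

def best_threshold(labels, scores, thresholds):
    """Return (threshold, F-measure) for the threshold with the highest F-measure."""
    best = (None, -1.0)
    for t in thresholds:
        predicted = [1 if s >= t else 0 for s in scores]
        tp = sum(1 for y, p in zip(labels, predicted) if y == 1 and p == 1)
        fp = sum(1 for y, p in zip(labels, predicted) if y == 0 and p == 1)
        fn = sum(1 for y, p in zip(labels, predicted) if y == 1 and p == 0)
        precision = tp / (tp + fp) if tp + fp else 0.0
        recall = tp / (tp + fn) if tp + fn else 0.0
        f = f_measure(precision, recall)
        if f > best[1]:
            best = (t, f)
    return best

labels = [1, 0, 1, 1, 0, 0]
scores = [0.9, 0.6, 0.55, 0.4, 0.3, 0.1]
threshold, f1 = best_threshold(labels, scores, [0.2, 0.35, 0.5, 0.7])
print(threshold, f1)
```

With the Spark model, the same selection would be made over the rows of fMeasureByThreshold, and the chosen value could then be passed to the estimator with lr.setThreshold before refitting.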

In [None]:
# Make predictions on test data using the transform() method.
# lrModel.transform() will only use the 'features' column.
predictions = lrModel.transform(test)
predictions.select('I1', 'C1', 'final_label', 'rawPrediction', 'prediction', 'probability').show(10)

## Evaluate our Logistic Regression model

In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

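# The evaluator's default labelCol is 'label'; set it explicitly to the indexed label column.
evaluator = BinaryClassificationEvaluator(labelCol = 'final_label')
print('Test Area Under ROC', evaluator.evaluate(predictions))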

In [None]:
evaluator.getMetricName()

Pretty good.

Try tuning the model with the ParamGridBuilder and the CrossValidator.
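
A minimal sketch of such a tuning setup follows, assuming the final_label and features columns from the pipeline above. SEARCH_SPACE, count_candidates and build_cross_validator are hypothetical names introduced here, and the grid values are illustrative rather than tuned for this data. ParamGridBuilder, CrossValidator and BinaryClassificationEvaluator are the pyspark.ml classes of those names.

```python
from itertools import product

# Hypothetical search space; the values are illustrative, not tuned for this data.
SEARCH_SPACE = {
    "regParam": [0.01, 0.1, 1.0],
    "elasticNetParam": [0.0, 0.5, 1.0],
    "maxIter": [10, 20],
}

def count_candidates(space):
    """Number of parameter combinations the grid will contain."""
    return len(list(product(*space.values())))

def build_cross_validator(label_col="final_label", features_col="features", num_folds=5):
    """Build a CrossValidator for logistic regression over SEARCH_SPACE."""
    # pyspark is imported here so that count_candidates can be used without Spark installed.
    from pyspark.ml.classification import LogisticRegression
    from pyspark.ml.evaluation import BinaryClassificationEvaluator
    from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

    lr = LogisticRegression(featuresCol=features_col, labelCol=label_col)
    grid = (ParamGridBuilder()
            .addGrid(lr.regParam, SEARCH_SPACE["regParam"])
            .addGrid(lr.elasticNetParam, SEARCH_SPACE["elasticNetParam"])
            .addGrid(lr.maxIter, SEARCH_SPACE["maxIter"])
            .build())
    # Default metric is areaUnderROC, matching the evaluation above.
    evaluator = BinaryClassificationEvaluator(labelCol=label_col)
    return CrossValidator(estimator=lr, estimatorParamMaps=grid,
                          evaluator=evaluator, numFolds=num_folds)

print("candidates:", count_candidates(SEARCH_SPACE))
```

Calling build_cross_validator().fit(train) would train each of the 18 candidates on 5 folds, which is 90 fits plus a final refit on the full training set, so it may be worth starting with a smaller grid or a sample of the data.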