# Binary Classification on Spark

This notebook is a short demonstration of PySpark. See the [official documentation](https://spark.apache.org/docs/latest/) for a detailed explanation.

## With scikit-learn

First we walk through a binary-classification workflow with scikit-learn. We will then execute a similar workflow with Spark.

In [1]:
import numpy as np
import pandas as pd

In [2]:
from sklearn.datasets import load_breast_cancer

# Load the breast-cancer dataset shipped with scikit-learn.
bcancer = load_breast_cancer()

# One column per feature, plus a trailing 'label' column holding the class
# name looked up from each integer target code.
df = pd.DataFrame(data=bcancer.data, columns=bcancer.feature_names).assign(
    label=[bcancer.target_names[code] for code in bcancer.target]
)

In [3]:
# Preview the first rows of the pandas frame (feature columns plus 'label').
df.head()

Unnamed: 0,mean radius,mean texture,mean perimeter,mean area,mean smoothness,mean compactness,mean concavity,mean concave points,mean symmetry,mean fractal dimension,...,worst texture,worst perimeter,worst area,worst smoothness,worst compactness,worst concavity,worst concave points,worst symmetry,worst fractal dimension,label
0,17.99,10.38,122.8,1001.0,0.1184,0.2776,0.3001,0.1471,0.2419,0.07871,...,17.33,184.6,2019.0,0.1622,0.6656,0.7119,0.2654,0.4601,0.1189,malignant
1,20.57,17.77,132.9,1326.0,0.08474,0.07864,0.0869,0.07017,0.1812,0.05667,...,23.41,158.8,1956.0,0.1238,0.1866,0.2416,0.186,0.275,0.08902,malignant
2,19.69,21.25,130.0,1203.0,0.1096,0.1599,0.1974,0.1279,0.2069,0.05999,...,25.53,152.5,1709.0,0.1444,0.4245,0.4504,0.243,0.3613,0.08758,malignant
3,11.42,20.38,77.58,386.1,0.1425,0.2839,0.2414,0.1052,0.2597,0.09744,...,26.5,98.87,567.7,0.2098,0.8663,0.6869,0.2575,0.6638,0.173,malignant
4,20.29,14.34,135.1,1297.0,0.1003,0.1328,0.198,0.1043,0.1809,0.05883,...,16.67,152.2,1575.0,0.1374,0.205,0.4,0.1625,0.2364,0.07678,malignant


In [4]:
from sklearn.preprocessing import StandardScaler
# Learn the centring and scaling statistics from every column except 'label'.
scaler = StandardScaler(with_mean=True, with_std=True).fit(df.drop('label', axis=1))

In [5]:
# Apply the fitted scaler, wrap the resulting array in a frame that reuses the
# feature column names, and re-attach the class label.
dg = pd.DataFrame(
    scaler.transform(df.drop('label', axis=1)),
    columns=df.columns[:-1],  # every column except the trailing 'label'
).assign(label=df['label'])
dg.tail()

Unnamed: 0,mean radius,mean texture,mean perimeter,mean area,mean smoothness,mean compactness,mean concavity,mean concave points,mean symmetry,mean fractal dimension,...,worst texture,worst perimeter,worst area,worst smoothness,worst compactness,worst concavity,worst concave points,worst symmetry,worst fractal dimension,label
564,2.110995,0.721473,2.060786,2.343856,1.041842,0.21906,1.947285,2.320965,-0.312589,-0.931027,...,0.1177,1.752563,2.015301,0.378365,-0.273318,0.664512,1.629151,-1.360158,-0.709091,malignant
565,1.704854,2.085134,1.615931,1.723842,0.102458,-0.017833,0.693043,1.263669,-0.217664,-1.058611,...,2.047399,1.42194,1.494959,-0.69123,-0.39482,0.236573,0.733827,-0.531855,-0.973978,malignant
566,0.702284,2.045574,0.672676,0.577953,-0.840484,-0.03868,0.046588,0.105777,-0.809117,-0.895587,...,1.374854,0.579001,0.427906,-0.809587,0.350735,0.326767,0.414069,-1.104549,-0.318409,malignant
567,1.838341,2.336457,1.982524,1.735218,1.525767,3.272144,3.296944,2.658866,2.137194,1.043695,...,2.237926,2.303601,1.653171,1.430427,3.904848,3.197605,2.289985,1.919083,2.219635,malignant
568,-1.808401,1.221792,-1.814389,-1.347789,-3.112085,-1.150752,-1.114873,-1.26182,-0.82007,-0.561032,...,0.76419,-1.432735,-1.075813,-1.859019,-1.207552,-1.305831,-1.745063,-0.048138,-0.751207,benign


In [6]:
from sklearn.model_selection import GridSearchCV

# Separate the scaled frame into the response and the feature matrix
# consumed by the grid searches below.
y = dg['label']
X = dg.drop('label', axis=1)

In [7]:
from sklearn.linear_model import LogisticRegression
# Grid over the inverse regularisation strength C and the penalty type.
param_plr = {'C': [0.01, 0.1, 1, 10], 'penalty': ['l1', 'l2']}
# Pin solver='liblinear', which supports both 'l1' and 'l2'. Relying on the
# library default breaks the 'l1' candidates on scikit-learn >= 0.22, where
# the default solver became 'lbfgs' (no L1 support).
grid_plr = GridSearchCV(LogisticRegression(solver='liblinear'), param_plr,
                        cv=5, scoring='accuracy')
grid_plr.fit(X, y);

In [8]:
# Mean cross-validated accuracy (cv=5) for every (C, penalty) candidate.
pd.DataFrame(grid_plr.cv_results_)[['params','mean_test_score']]

Unnamed: 0,params,mean_test_score
0,"{'penalty': 'l1', 'C': 0.01}",0.924429
1,"{'penalty': 'l2', 'C': 0.01}",0.964851
2,"{'penalty': 'l1', 'C': 0.1}",0.973638
3,"{'penalty': 'l2', 'C': 0.1}",0.982425
4,"{'penalty': 'l1', 'C': 1}",0.975395
5,"{'penalty': 'l2', 'C': 1}",0.97891
6,"{'penalty': 'l1', 'C': 10}",0.963093
7,"{'penalty': 'l2', 'C': 10}",0.970123


In [9]:
from sklearn.tree import DecisionTreeClassifier
# Candidate tree depths; the classifier's random_state is fixed at 3.
param_tree = {'max_depth': [2, 3, 4, 5]}
grid_tree = GridSearchCV(
    estimator=DecisionTreeClassifier(random_state=3),
    param_grid=param_tree,
    scoring='accuracy',
    cv=5,
)
grid_tree.fit(X, y);

In [10]:
# Mean cross-validated accuracy (cv=5) for every max_depth candidate.
pd.DataFrame(grid_tree.cv_results_)[['params','mean_test_score']]

Unnamed: 0,params,mean_test_score
0,{'max_depth': 2},0.927944
1,{'max_depth': 3},0.924429
2,{'max_depth': 4},0.927944
3,{'max_depth': 5},0.924429


In [11]:
# Write the unscaled pandas frame to disk so that Spark can read it back.
# reset_index() turns the row index into an explicit 'index' column, and
# index=False stops pandas from writing the (new) index a second time.
df.reset_index().to_csv('/tmp/bcancer.csv', index=False)

## PySpark

In [12]:
from pyspark.sql import SparkSession

## obtain the SparkSession for this notebook (getOrCreate reuses an existing one)
## NOTE(review): the config key/value pair looks like a placeholder -- confirm it is intended
spark = (
    SparkSession.builder
    .appName("Binary classification on Spark")
    .config("spark.some.config.option", "some-value")
    .getOrCreate()
)

In [13]:
## load the data: read the CSV file, taking the column names from its header line
## NOTE: `df` now names a Spark DataFrame and no longer the pandas frame used in the scikit-learn part
df = spark.read.csv('/tmp/bcancer.csv', header=True)
print(df.columns)

['index', 'mean radius', 'mean texture', 'mean perimeter', 'mean area', 'mean smoothness', 'mean compactness', 'mean concavity', 'mean concave points', 'mean symmetry', 'mean fractal dimension', 'radius error', 'texture error', 'perimeter error', 'area error', 'smoothness error', 'compactness error', 'concavity error', 'concave points error', 'symmetry error', 'fractal dimension error', 'worst radius', 'worst texture', 'worst perimeter', 'worst area', 'worst smoothness', 'worst compactness', 'worst concavity', 'worst concave points', 'worst symmetry', 'worst fractal dimension', 'label']


In [14]:
## show the schema (data types of columns); the CSV was read without
## schema inference, so every column is reported as a string
df.printSchema()

root
 |-- index: string (nullable = true)
 |-- mean radius: string (nullable = true)
 |-- mean texture: string (nullable = true)
 |-- mean perimeter: string (nullable = true)
 |-- mean area: string (nullable = true)
 |-- mean smoothness: string (nullable = true)
 |-- mean compactness: string (nullable = true)
 |-- mean concavity: string (nullable = true)
 |-- mean concave points: string (nullable = true)
 |-- mean symmetry: string (nullable = true)
 |-- mean fractal dimension: string (nullable = true)
 |-- radius error: string (nullable = true)
 |-- texture error: string (nullable = true)
 |-- perimeter error: string (nullable = true)
 |-- area error: string (nullable = true)
 |-- smoothness error: string (nullable = true)
 |-- compactness error: string (nullable = true)
 |-- concavity error: string (nullable = true)
 |-- concave points error: string (nullable = true)
 |-- symmetry error: string (nullable = true)
 |-- fractal dimension error: string (nullable = true)
 |-- worst radius:

In [15]:
## convert the data types: cast every column except the trailing 'label' to float.
## This is done in one select() rather than with withColumn() inside a loop:
## each withColumn() call adds another projection to the query plan, and the
## PySpark documentation recommends a single select() for multiple columns.
df = df.select(
    [df[name].cast('float').alias(name) for name in df.columns[:-1]]
    + [df[df.columns[-1]]]  # 'label' stays a string
)

In [16]:
## check the result of the cast: every column except 'label' should now be float
df.printSchema()

root
 |-- index: float (nullable = true)
 |-- mean radius: float (nullable = true)
 |-- mean texture: float (nullable = true)
 |-- mean perimeter: float (nullable = true)
 |-- mean area: float (nullable = true)
 |-- mean smoothness: float (nullable = true)
 |-- mean compactness: float (nullable = true)
 |-- mean concavity: float (nullable = true)
 |-- mean concave points: float (nullable = true)
 |-- mean symmetry: float (nullable = true)
 |-- mean fractal dimension: float (nullable = true)
 |-- radius error: float (nullable = true)
 |-- texture error: float (nullable = true)
 |-- perimeter error: float (nullable = true)
 |-- area error: float (nullable = true)
 |-- smoothness error: float (nullable = true)
 |-- compactness error: float (nullable = true)
 |-- concavity error: float (nullable = true)
 |-- concave points error: float (nullable = true)
 |-- symmetry error: float (nullable = true)
 |-- fractal dimension error: float (nullable = true)
 |-- worst radius: float (nullable = tr

In [17]:
## feature variables: all columns except the leading 'index' and the trailing 'label'
print(df.columns[1:-1])

['mean radius', 'mean texture', 'mean perimeter', 'mean area', 'mean smoothness', 'mean compactness', 'mean concavity', 'mean concave points', 'mean symmetry', 'mean fractal dimension', 'radius error', 'texture error', 'perimeter error', 'area error', 'smoothness error', 'compactness error', 'concavity error', 'concave points error', 'symmetry error', 'fractal dimension error', 'worst radius', 'worst texture', 'worst perimeter', 'worst area', 'worst smoothness', 'worst compactness', 'worst concavity', 'worst concave points', 'worst symmetry', 'worst fractal dimension']


In [18]:
from pyspark.ml.feature import StringIndexer
## StringIndexer maps every distinct string in 'label' to a numeric index,
## written to a new 'target' column (by default the most frequent label gets index 0)
indexer = StringIndexer(outputCol='target', inputCol='label')

## sanity check: fit the indexer, apply it, and compare 'label' with 'target'
output0 = indexer.fit(df).transform(df)
output0.select('index', 'label', 'target').show()

+-----+---------+------+
|index|    label|target|
+-----+---------+------+
|  0.0|malignant|   1.0|
|  1.0|malignant|   1.0|
|  2.0|malignant|   1.0|
|  3.0|malignant|   1.0|
|  4.0|malignant|   1.0|
|  5.0|malignant|   1.0|
|  6.0|malignant|   1.0|
|  7.0|malignant|   1.0|
|  8.0|malignant|   1.0|
|  9.0|malignant|   1.0|
| 10.0|malignant|   1.0|
| 11.0|malignant|   1.0|
| 12.0|malignant|   1.0|
| 13.0|malignant|   1.0|
| 14.0|malignant|   1.0|
| 15.0|malignant|   1.0|
| 16.0|malignant|   1.0|
| 17.0|malignant|   1.0|
| 18.0|malignant|   1.0|
| 19.0|   benign|   0.0|
+-----+---------+------+
only showing top 20 rows



In [19]:
from pyspark.ml.feature import VectorAssembler
## VectorAssembler packs the listed columns into a single vector column;
## df.columns[1:-1] leaves out the leading 'index' and the trailing 'label'
assembler = VectorAssembler(outputCol='features', inputCols=df.columns[1:-1])

## sanity check: show the first two rows without truncating the vectors
output1 = assembler.transform(output0)
output1.select('features', 'target').show(n=2, truncate=False)

+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------+
|features                                                                                                                                                                                                                                                                                                                                                                                                                                    

In [20]:
from pyspark.ml.feature import StandardScaler
## standardise the feature vectors into z-scores (withMean centres, withStd scales)
## NOTE: `scaler` now names the Spark scaler, replacing the scikit-learn one defined earlier
scaler = StandardScaler(withMean=True, withStd=True,
                        inputCol="features", outputCol="scaledFeatures")

## sanity check: fit on the assembled data and look at five scaled rows
output2 = scaler.fit(output1).transform(output1)
output2.select('index', 'scaledFeatures', 'target').show(5)

+-----+--------------------+------+
|index|      scaledFeatures|target|
+-----+--------------------+------+
|  0.0|[1.09609946770103...|   1.0|
|  1.0|[1.82821189310576...|   1.0|
|  2.0|[1.57849935861176...|   1.0|
|  3.0|[-0.7682333050599...|   1.0|
|  4.0|[1.74875817547660...|   1.0|
+-----+--------------------+------+
only showing top 5 rows



In [21]:
from pyspark.ml.classification import LogisticRegression
## penalised logistic regression: regParam sets the penalty strength and
## elasticNetParam the mix between the L2 (0) and L1 (1) penalties
lr = LogisticRegression(featuresCol='scaledFeatures', labelCol='target',
                        maxIter=10, regParam=0.1, elasticNetParam=0.1)

## training: fit on the scaled data, then score the same rows to inspect the prediction columns
output3 = lr.fit(output2).transform(output2)
output3.select('index', 'label', 'target',
               'rawPrediction', 'probability', 'prediction').show()

+-----+---------+------+--------------------+--------------------+----------+
|index|    label|target|       rawPrediction|         probability|prediction|
+-----+---------+------+--------------------+--------------------+----------+
|  0.0|malignant|   1.0|[-6.0735711701612...|[0.00229764301676...|       1.0|
|  1.0|malignant|   1.0|[-2.5681949401107...|[0.07121360239452...|       1.0|
|  2.0|malignant|   1.0|[-4.6129242295229...|[0.00982526566767...|       1.0|
|  3.0|malignant|   1.0|[-3.1390600606333...|[0.04152451298545...|       1.0|
|  4.0|malignant|   1.0|[-2.6198289474893...|[0.06787311464596...|       1.0|
|  5.0|malignant|   1.0|[-0.6017401354838...|[0.35394567918113...|       1.0|
|  6.0|malignant|   1.0|[-2.5739435099823...|[0.07083431514422...|       1.0|
|  7.0|malignant|   1.0|[-0.6894290207694...|[0.33416010245534...|       1.0|
|  8.0|malignant|   1.0|[-1.7618308092031...|[0.14656119212615...|       1.0|
|  9.0|malignant|   1.0|[-3.1709857963004...|[0.04027229669087..

In [22]:
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

In [23]:
## create a pipeline: label indexing -> feature assembly -> scaling -> logistic regression.
## The scaler and the classifier take their input column names from the
## preceding stage's getOutputCol() instead of repeating the literal names.
indexer = StringIndexer(outputCol='target', inputCol='label')
assembler = VectorAssembler(outputCol='features', inputCols=df.columns[1:-1])
scaler = StandardScaler(withMean=True, withStd=True,
                        inputCol=assembler.getOutputCol(), outputCol="scaledFeatures")
plr = LogisticRegression(labelCol='target', featuresCol=scaler.getOutputCol())

pipeline_plr = Pipeline(stages=[indexer, assembler, scaler, plr])

In [24]:
## parameter grid: every combination of the listed regParam and elasticNetParam values
param_plr = (
    ParamGridBuilder()
    .addGrid(plr.regParam, [0.1, 1, 10])
    .addGrid(plr.elasticNetParam, [0, 0.5, 1])
    .build()  ## list of dicts
)

In [25]:
## each grid element is a dict keyed by Param objects; a Param carries its
## parent estimator id, its name and a doc string explaining the parameter
param_plr[0]

{Param(parent='LogisticRegression_452f9d9f53cc0684355e', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty.'): 0,
 Param(parent='LogisticRegression_452f9d9f53cc0684355e', name='regParam', doc='regularization parameter (>= 0).'): 0.1}

In [26]:
## create an Estimator executing cross-validation
## - the evaluator scores on the indexed 'target' column; metricName spells out
##   the evaluator's default (area under the ROC curve) so the reported scores
##   are not mistaken for accuracy
## - seed fixes the random assignment of rows to folds, so re-running the
##   notebook reproduces the same scores
cv_plr = CrossValidator(estimator=pipeline_plr,
                        estimatorParamMaps=param_plr,
                        evaluator=BinaryClassificationEvaluator(labelCol='target',
                                                                metricName='areaUnderROC'),
                        numFolds=3,
                        seed=3)

In [27]:
## run the 3-fold cross-validation over every parameter map in param_plr;
## by fitting the CrossValidator we obtain a Transformer
model_plr = cv_plr.fit(df)

In [28]:
## the average CV score of each parameter map, listed in the same order as param_plr;
## the score is the evaluator's metric (areaUnderROC unless a metricName is set), not accuracy
model_plr.avgMetrics

[0.9950977405808112,
 0.9906988772985421,
 0.9851063433903431,
 0.9924913853725812,
 0.5,
 0.5,
 0.9868870445310425,
 0.5,
 0.5]

In [29]:
from collections import defaultdict

def validation_result(grid,metrics):
    """Tabulate a parameter grid next to its cross-validation scores.

    Parameters
    ----------
    grid : iterable of dict
        One mapping per grid point. Keys are objects exposing a ``name``
        attribute; values are the parameter values.
    metrics : sequence
        One score per grid point, in the same order as ``grid``.

    Returns
    -------
    pandas.DataFrame
        One row per grid point, with one column per parameter name and a
        final ``score`` column.
    """
    # Collect the values column-wise, keyed by parameter name.
    columns = {}
    for point in grid:
        for key, value in point.items():
            columns.setdefault(key.name, []).append(value)

    table = pd.DataFrame(columns)
    table['score'] = metrics
    return table

In [30]:
## show the results of the cross-validation: one row per parameter map with its average score
validation_result(param_plr, model_plr.avgMetrics)

Unnamed: 0,elasticNetParam,regParam,score
0,0.0,0.1,0.995098
1,0.0,1.0,0.990699
2,0.0,10.0,0.985106
3,0.5,0.1,0.992491
4,0.5,1.0,0.5
5,0.5,10.0,0.5
6,1.0,0.1,0.986887
7,1.0,1.0,0.5
8,1.0,10.0,0.5


In [31]:
### decision tree
## NOTE: this import replaces the scikit-learn DecisionTreeClassifier imported earlier in the notebook
from pyspark.ml.classification import DecisionTreeClassifier
tree = DecisionTreeClassifier(featuresCol=scaler.getOutputCol(), labelCol='target')
## grid over the maximum tree depth
param_tree = ParamGridBuilder().addGrid(tree.maxDepth, [2,3,5,7,11]).build()

## same preprocessing stages as the logistic-regression pipeline, with the tree as the final stage
pipeline_tree = Pipeline(stages=[indexer, assembler, scaler, tree])

## metricName spells out the evaluator's default (area under the ROC curve);
## seed fixes the random assignment of rows to folds so the scores are reproducible
cv_tree = CrossValidator(estimator=pipeline_tree,
                         estimatorParamMaps=param_tree,
                         evaluator=BinaryClassificationEvaluator(labelCol='target',
                                                                 metricName='areaUnderROC'),
                         numFolds=3,
                         seed=3)

In [32]:
## run the 3-fold cross-validation over every maxDepth candidate in param_tree
model_tree = cv_tree.fit(df)

In [33]:
## show the average cross-validation score for each maxDepth candidate
validation_result(param_tree, model_tree.avgMetrics)

Unnamed: 0,maxDepth,score
0,2,0.923427
1,3,0.95768
2,5,0.938555
3,7,0.942534
4,11,0.942534
