# Jonathan Halverson
# Tuesday, December 27, 2016
# Wine classification in Spark 2

Here we work a standard machine learning binary classification problem with a twist: the records of the third class are split at random between classes 0 and 1, so the classifier cannot be very accurate and its performance is worth examining. For the EDA see the appropriate notebook in the machine_learning directory.

Here is a nice notebook by Ben Sadeghi on a related topic:
http://nbviewer.jupyter.org/github/bensadeghi/pyspark-churn-prediction/blob/master/churn-prediction.ipynb

In [None]:
from __future__ import print_function
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[4]").appName("Wine classification").getOrCreate()

In [None]:
df = spark.read.csv('../../machine_learning/wine.csv', header=False, inferSchema=True)
df.sample(False, 0.1).show()

In [None]:
df.printSchema()

Class labels must begin with 0 and count up in Spark, so classes 1 and 2 become 0 and 1. Here we will only consider a binary classification problem, so we randomly assign each class 3 record to one of the other two classes -- this will lead to mistakes by the classifier, which will allow for an interesting validation:

In [None]:
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
from random import random as rng
from random import seed

seed(123456)
def randomFlip3rd(x):
    if (x == 3):
        if (rng() > 0.5):
            return 0
        else:
            return 1
    else:
        return x - 1

In [None]:
trans3rd = udf(randomFlip3rd, IntegerType())
df = df.withColumn('_c0', trans3rd(df._c0))

In [None]:
df.sample(False, 0.1).show()

In [None]:
df.groupby('_c0').count().toPandas()

We see that the two classes appear in roughly equal proportions, so stratified sampling is not required.
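
The class balance can also be sanity-checked outside Spark. The sketch below applies the same relabeling rule in plain Python and converts the resulting counts into proportions. The class sizes (59, 71, 48) are an assumption about the wine file, and `random_flip_3rd` is a stand-in for the UDF above that takes its own random generator; the exact counts will differ from the Spark output.

```python
import random
from collections import Counter

def random_flip_3rd(label, rand):
    """Map labels 1 and 2 to 0 and 1; send label 3 to 0 or 1 with equal probability."""
    if label == 3:
        return 0 if rand.random() > 0.5 else 1
    return label - 1

# Assumed class sizes (59, 71, 48); check them against your own copy of the data.
labels = [1] * 59 + [2] * 71 + [3] * 48

rand = random.Random(123456)  # a local generator keeps this sketch reproducible
new_labels = [random_flip_3rd(x, rand) for x in labels]

counts = Counter(new_labels)
total = float(len(new_labels))
proportions = {label: n / total for label, n in counts.items()}
print(counts, proportions)
```

If the two proportions were far apart, a stratified split would be worth considering.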

Note that even with local[4] only one partition is being used, most likely because the input file is small:

In [None]:
df.rdd.getNumPartitions()

In [None]:
df.printSchema()

Let's change the data type of _c5 and _c13 to double:

In [None]:
df = df.withColumn('_c5', df['_c5'].cast('double'))
df = df.withColumn('_c13', df['_c13'].cast('double'))
df.printSchema()

Let's give the columns more meaningful names:

In [None]:
columns = ['Class', 'Alcohol', 'Malic acid', 'Ash', 'Alcalinity of ash', 'Magnesium', 'Total phenols', \
           'Flavanoids', 'Nonflavanoid phenols', 'Proanthocyanins', 'Color intensity', 'Hue', \
           'OD280/OD315 of diluted wines', 'Proline']

In [None]:
for u, v in zip(df.schema.names, columns):
    df = df.withColumnRenamed(u, v)

In [None]:
df.printSchema()

Here is an alternative version of assigning the column names:

In [None]:
from functools import reduce  # reduce is not a builtin in Python 3
wineRaw = reduce(lambda data, i: data.withColumnRenamed(df.schema.names[i], columns[i]), range(len(columns)), df)
wineRaw.sample(False, 0.05).toPandas().applymap(lambda x: round(x, 1))

Here are the descriptive statistics -- of course, no standardization has been performed yet:

In [None]:
wineRaw.select(wineRaw.schema.names[1:]).toPandas().describe().applymap(lambda x: round(x, 1))

Reformat the data into a new dataframe with features as a vector:

In [None]:
from pyspark.sql import Row
from pyspark.ml.linalg import Vectors

In [None]:
wineRaw = wineRaw.select('Class', 'Ash', 'Hue', 'Alcohol', 'Flavanoids').rdd.map(lambda row: Row(label=row.Class, features=Vectors.dense(row[1:]))).toDF()
wineRaw.sample(False, 0.1).show(5)

Now that we have the correct format, a train-test split can be performed before we standardize:

In [None]:
trainUnSTD, testUnSTD = wineRaw.randomSplit([0.7, 0.3])
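
A random split of this kind assigns each row independently, so the resulting sizes are only approximately 70/30, and without a fixed seed they change from run to run. Here is a minimal plain-Python sketch of that idea. It illustrates per-row sampling and is not Spark's implementation; `bernoulli_split` and the seed value are made up for the example.

```python
import random

def bernoulli_split(rows, train_fraction, seed):
    """Send each row to train with probability train_fraction, otherwise to test."""
    rand = random.Random(seed)
    train, test = [], []
    for row in rows:
        if rand.random() < train_fraction:
            train.append(row)
        else:
            test.append(row)
    return train, test

rows = list(range(178))  # stand-in for the records
train_rows, test_rows = bernoulli_split(rows, 0.7, seed=42)
print(len(train_rows), len(test_rows))
```

Passing the same seed reproduces the same split, which makes later results easier to compare.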

Let's standardize the data by making the mean and variance 0 and 1, respectively, for each column:

In [None]:
from pyspark.ml.feature import StandardScaler
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures", withStd=True, withMean=True)
scalerModel = scaler.fit(trainUnSTD)
train = scalerModel.transform(trainUnSTD).cache()
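
The important detail is that the means and standard deviations come from the training set only and are then reused for any other data. A minimal plain-Python sketch of that fit/transform pattern follows. The numbers are made up, `fit_scaler` and `apply_scaler` are hypothetical helpers, and the sketch divides by n - 1 on the assumption that the scaler uses the corrected sample standard deviation.

```python
from math import sqrt

def fit_scaler(rows):
    """Compute the per-column mean and sample standard deviation (n - 1 denominator)."""
    n = float(len(rows))
    columns = list(zip(*rows))
    means = [sum(col) / n for col in columns]
    stds = [sqrt(sum((x - m) ** 2 for x in col) / (n - 1))
            for col, m in zip(columns, means)]
    return means, stds

def apply_scaler(rows, means, stds):
    """Center and scale each column using previously fitted statistics."""
    return [[(x - m) / s for x, m, s in zip(row, means, stds)] for row in rows]

# Made-up numbers for illustration only
train_rows = [[1.0, 10.0], [2.0, 20.0], [3.0, 60.0]]
test_rows = [[2.0, 30.0]]

means, stds = fit_scaler(train_rows)
train_scaled = apply_scaler(train_rows, means, stds)
test_scaled = apply_scaler(test_rows, means, stds)  # reuses the training statistics
print(means, stds, test_scaled)
```

Fitting the scaler on the full data set before splitting would leak information from the test set into the training features.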

In [None]:
train.printSchema()

In [None]:
train.show(5)

Let's check that the standardized features have a mean of 0 and a variance of 1:

In [None]:
train.rdd.map(lambda row: row.scaledFeatures.values.tolist()).toDF().toPandas().describe().applymap(lambda x: round(x, 1))

Now that the wine DataFrame is properly formatted, we create an ML model with cross-validation and hyperparameter optimization:

In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.classification import LogisticRegression

lr = LogisticRegression(featuresCol='scaledFeatures', labelCol='label', maxIter=10, threshold=0.5)
pipeline = Pipeline(stages=[lr])

paramGrid = ParamGridBuilder().addGrid(lr.regParam, [1.0, 0.1, 0.01]).addGrid(lr.elasticNetParam, [1.0, 0.1, 0.01]).build()
bce = BinaryClassificationEvaluator(metricName="areaUnderROC")
crossval = CrossValidator(estimator=pipeline, estimatorParamMaps=paramGrid, evaluator=bce, numFolds=5)
cvModel = crossval.fit(train)
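
It helps to know how much work this grid search implies. The sketch below counts the parameter combinations and model fits in plain Python and writes out the elastic net penalty that the two parameters control. The penalty formula follows the form given in the Spark documentation as I understand it, and `elastic_net_penalty` is a hypothetical helper, not a Spark function.

```python
from itertools import product

reg_params = [1.0, 0.1, 0.01]
elastic_net_params = [1.0, 0.1, 0.01]
num_folds = 5

# Every (regParam, elasticNetParam) pair is tried once per fold
grid = list(product(reg_params, elastic_net_params))
fits_during_cv = len(grid) * num_folds
total_fits = fits_during_cv + 1  # plus one refit of the best setting on all training data

def elastic_net_penalty(weights, reg_param, alpha):
    """reg_param * (alpha * L1 norm + (1 - alpha) / 2 * squared L2 norm)."""
    l1 = sum(abs(w) for w in weights)
    l2_squared = sum(w * w for w in weights)
    return reg_param * (alpha * l1 + (1.0 - alpha) / 2.0 * l2_squared)

print(len(grid), fits_during_cv, total_fits)
```

With alpha equal to 1 the penalty is pure L1 (lasso) and with alpha equal to 0 it is pure L2 (ridge). The list of average metrics returned by the cross-validator should have one entry per grid point, 9 in this case.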

Here are some details about the cross-validation procedure and the final coefficients:

In [None]:
cvModel.avgMetrics

In [None]:
cvModel.bestModel.stages[0].coefficients

In [None]:
cvModel.bestModel.stages[0].intercept

Let's evaluate the model using the test data. We begin by standardizing the test data using the previous standardizer object:

In [None]:
test = scalerModel.transform(testUnSTD).cache()
prediction = cvModel.transform(test)
prediction.sample(False, 0.2).show()

Here are the probabilities for class 0 and class 1 for each record:

In [None]:
prediction.select('probability').rdd.map(lambda row: row.probability.values.tolist()).toDF().toPandas().applymap(lambda x: round(x, 7))[:5]

The raw prediction is the inner product of the feature vector and the coefficients plus the intercept (the margin); Spark stores it as the pair (-margin, margin):

In [None]:
rawPred = prediction.select('rawPrediction').rdd.map(lambda row: row.rawPrediction.values.tolist()).toDF().toPandas().applymap(lambda x: round(x, 4))[:5]
rawPred

In [None]:
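import numpy as np
# column 0 holds the negated margin of the first record
dp = rawPred.iloc[0, 0]
# logistic function of dp and of -dp: the probabilities of class 0 and class 1, respectively
np.exp(dp) / (1 + np.exp(dp)), np.exp(-dp) / (1 + np.exp(-dp))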
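
The chain from features to probabilities can be written out end to end. Here is a minimal plain-Python sketch, assuming made-up feature values, coefficients and intercept rather than the ones fitted in this notebook.

```python
from math import exp

def margin(features, coefficients, intercept):
    """Inner product of features and coefficients, plus the intercept."""
    return sum(x * w for x, w in zip(features, coefficients)) + intercept

def sigmoid(z):
    """Logistic function mapping a real number to the interval (0, 1)."""
    return 1.0 / (1.0 + exp(-z))

# Hypothetical values for illustration only
features = [0.5, -1.2, 0.3, 0.8]
coefficients = [0.4, 0.1, -0.7, 1.5]
intercept = -0.2

m = margin(features, coefficients, intercept)
raw_prediction = [-m, m]                  # layout used for binary problems
probability = [sigmoid(-m), sigmoid(m)]   # P(class 0), P(class 1)
predicted_class = 1 if probability[1] > 0.5 else 0  # threshold of 0.5
print(raw_prediction, probability, predicted_class)
```

The two probabilities always sum to 1, and a positive margin corresponds to a class 1 probability above 0.5.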

Let's compute two summary metrics for evaluation purposes, the area under the ROC curve and the area under the precision-recall curve:

In [None]:
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", labelCol="label")
evaluator.evaluate(prediction, {evaluator.metricName: "areaUnderROC"})

In [None]:
evaluator.evaluate(prediction, {evaluator.metricName: "areaUnderPR"})
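
The area under the ROC curve has a useful interpretation: it is the probability that a randomly chosen positive record receives a higher score than a randomly chosen negative one. The sketch below computes it that way in plain Python on made-up scores. This pairwise definition is an illustration, not how the evaluator above computes the metric, so small numerical differences from Spark are to be expected.

```python
def auc_from_scores(scores, labels):
    """Fraction of (positive, negative) pairs ranked correctly; ties count as half."""
    pos = [s for s, y in zip(scores, labels) if y == 1]
    neg = [s for s, y in zip(scores, labels) if y == 0]
    if not pos or not neg:
        raise ValueError("need at least one positive and one negative record")
    wins = 0.0
    for p in pos:
        for n in neg:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(pos) * len(neg))

# Made-up scores (e.g. class 1 probabilities) and true labels
scores = [0.9, 0.8, 0.35, 0.6, 0.2]
labels = [1, 1, 1, 0, 0]
auc = auc_from_scores(scores, labels)
print(auc)
```

A value of 0.5 corresponds to random ranking and 1.0 to a perfect ranking of positives above negatives.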

In [None]:
predictions_and_labels = prediction.select('prediction', 'label')
predictions_and_labels = predictions_and_labels.withColumn('prediction', predictions_and_labels['prediction'].cast('integer'))
predictions_and_labels.show(48)

In [None]:
tp = predictions_and_labels.filter('prediction == 1 and label == 1').count()
tp

In [None]:
fp = predictions_and_labels.filter('prediction == 1 and label == 0').count()
fp

In [None]:
tn = predictions_and_labels.filter('prediction == 0 and label == 0').count()
tn

In [None]:
fn = predictions_and_labels.filter('prediction == 0 and label == 1').count()
fn

In [None]:
accuracy = float(tp + tn) / (tp + tn + fp + fn)
accuracy

In [None]:
# when the prediction is 1, how often is it correct
# or the proportion of predicted positive cases that are actually positive
precision = float(tp) / (tp + fp)
precision

In [None]:
# classification error
ce = float(fn + fp) / (tp + tn + fp + fn)
ce

In [None]:
# sensitivity (or recall or true positive rate): when the actual value is positive, how often is the prediction correct
# the proportion of actual positive cases which are correctly identified
recall = float(tp) / (tp + fn)
recall

In [None]:
# the proportion of actual negative cases which are correctly identified
specificity = float(tn) / (fp + tn)
specificity

In [None]:
# (false positive rate) when the actual value is negative, how often is the prediction incorrect
fpr = float(fp) / (tn + fp)
fpr