## PySpark Classification
#### Reference Libraries

1. [StringIndexer](#stringindexer)
2. [OneHotEncoder](#onehotencoder)
3. [VectorAssembler](#vectorassembler)
4. [Pipeline](#pipeline)
5. [BinaryClassificationEvaluator & MulticlassClassificationEvaluator](#classificationevaluator)
6. [ParamGrid & CrossValidator](#paramgrid&crossvalidator)
7. [StandardScaler](#stdscaler)

Go to [Main Scripts](#maincode)

<a id="stringindexer"></a>
#### 1. StringIndexer
Encode a **string column of labels** to a column of label indices. 
- The indices are in [0, numLabels), ordered by label frequencies, so the most frequent label gets index 0. 
- If the input column is numeric, we cast it to string and index the string values.
- There are two strategies to handle unseen labels when you have fit on one dataset and use it to transform another:
  - throw an exception (which is the default)
  - skip the row containing the unseen label entirely -> setHandleInvalid("skip")
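
The indexing rule above can be sketched in plain Python. This is an illustration of the idea rather than Spark's implementation: the helper names `fit_string_indexer` and `transform_labels` are hypothetical, and the alphabetical tie-break between equally frequent labels is an assumption of the sketch.

```python
from collections import Counter


def fit_string_indexer(labels):
    """Map each label to an index, most frequent label first.

    Ties are broken alphabetically here; that tie-break is an assumption
    of this sketch, not a guarantee for every Spark version.
    """
    counts = Counter(str(label) for label in labels)  # numeric input is cast to string
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(index) for index, label in enumerate(ordered)}


def transform_labels(mapping, labels, handle_invalid="error"):
    """Apply a fitted mapping; unseen labels either raise or are skipped."""
    indexed = []
    for label in labels:
        key = str(label)
        if key in mapping:
            indexed.append((label, mapping[key]))
        elif handle_invalid == "skip":
            continue  # drop the row that holds the unseen label
        else:
            raise ValueError("Unseen label: %s" % key)
    return indexed


mapping = fit_string_indexer(["a", "b", "c", "a", "a", "c", "d"])
print(mapping)  # {'a': 0.0, 'c': 1.0, 'b': 2.0, 'd': 3.0}
print(transform_labels(mapping, ["a", "e", "c"], handle_invalid="skip"))
```

With the default `handle_invalid="error"`, the unseen label `"e"` would raise instead of being dropped.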

<a id="onehotencoder"></a>
#### 2. OneHotEncoder
Map a column of label indices to a column of binary vectors, with at most a single one-value. 
- **Input column must be Numeric** (use StringIndexer prior to OneHotEncoder if the categorical is labelled as string)
- This encoding allows algorithms which expect continuous features, such as Logistic Regression, to use categorical features.
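- Returns a sparse vector, `SparseVector(vector_size, [nonzero_indices], [nonzero_values])`. For example, `SparseVector(3, [0, 2], [1.0, 1.0])` => `[1.0, 0.0, 1.0]` (this example only illustrates the notation; a one-hot vector has at most one nonzero entry)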
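
A minimal sketch of the encoding in plain Python, assuming the encoder's default behaviour of dropping the last category (`dropLast=True`), which is why a vector can contain no one-value at all. The helpers `one_hot_sparse` and `to_dense` are hypothetical names, and the tuple `(size, indices, values)` only mimics the `SparseVector` notation.

```python
def one_hot_sparse(index, num_categories, drop_last=True):
    """Return (size, indices, values), mirroring SparseVector(size, indices, values)."""
    size = num_categories - 1 if drop_last else num_categories
    position = int(index)
    if position < size:
        return (size, [position], [1.0])
    return (size, [], [])  # the dropped last category becomes the all-zero vector


def to_dense(sparse):
    """Expand the sparse triple into a plain list of floats."""
    size, indices, values = sparse
    dense = [0.0] * size
    for position, value in zip(indices, values):
        dense[position] = value
    return dense


for category_index in [0.0, 1.0, 2.0, 3.0]:
    encoded = one_hot_sparse(category_index, num_categories=4)
    print(category_index, encoded, to_dense(encoded))
```

With four categories the vectors have length 3, and the category with index 3.0 is encoded as `[0.0, 0.0, 0.0]`.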

In [None]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer

df = spark.createDataFrame([
    (0, "a"),
    (1, "b"),
    (2, "c"),
    (3, "a"),
    (4, "a"),
    (5, "c"),
    (6, "d")
], ["id", "category"])

stringIndexer = StringIndexer(inputCol="category", outputCol="categoryIndex")
model = stringIndexer.fit(df)
indexed = model.transform(df)

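encoder = OneHotEncoder(inputCol="categoryIndex", outputCol="categoryVec")
# Spark 2.x: OneHotEncoder is a Transformer, so transform() can be called directly.
# In Spark 3.x it is an Estimator: use encoder.fit(indexed).transform(indexed) instead.
encoded = encoder.transform(indexed)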
encoded.show()

<a id="vectorassembler"></a>
#### 3. VectorAssembler
Combine a list of columns into a single vector column. It is useful to combine raw features and features generated by different transformers into a single feature vector, to train ML models (e.g. logistic regression and decision trees)
- In each row, the values of the input columns will be concatenated into a vector (in the specified order)
- Accepts input types: **numeric, boolean, vector**
- It is a Transformer: only "transform", no "fit"
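
The concatenation can be pictured with a few lines of plain Python. This is only a sketch: `assemble` is a hypothetical helper, rows are modelled as dicts, vectors as lists, and the boolean column `isActive` is invented for illustration.

```python
def assemble(row, input_cols):
    """Concatenate numeric, boolean, and list-like values in the given column order."""
    features = []
    for name in input_cols:
        value = row[name]
        if isinstance(value, (list, tuple)):
            features.extend(float(v) for v in value)  # vector column: append each element
        else:
            features.append(float(value))  # numeric or boolean column: append one element
    return features


row = {"categoryIndex": 1.0, "categoryVec": [0.0, 1.0, 0.0], "isActive": True}
print(assemble(row, ["categoryIndex", "categoryVec", "isActive"]))
# [1.0, 0.0, 1.0, 0.0, 1.0]
```

Changing the order of `input_cols` changes the order of the entries in the resulting vector.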

In [None]:
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(inputCols=["categoryIndex", "categoryVec"], outputCol="features")
output = assembler.transform(encoded)
output.show()

<a id="pipeline"></a>
#### 4. Pipeline
Tie multiple stages of ML tasks together (e.g. feature transformations) in order to simplify the code
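
Conceptually, fitting a pipeline fits each stage in order and passes the transformed data to the next stage. The sketch below shows that idea in plain Python; the classes `AddOne`, `Center`, and `CenterModel` and the two helper functions are hypothetical and are not part of PySpark.

```python
class AddOne(object):
    """Hypothetical transformer-only stage: has transform() but no fit()."""

    def transform(self, data):
        return [x + 1 for x in data]


class Center(object):
    """Hypothetical estimator stage: fit() learns the mean and returns a fitted model."""

    def fit(self, data):
        mean = sum(data) / float(len(data))
        return CenterModel(mean)


class CenterModel(object):
    def __init__(self, mean):
        self.mean = mean

    def transform(self, data):
        return [x - self.mean for x in data]


def fit_pipeline(stages, data):
    """Fit estimators in order, feeding each stage the output of the previous one."""
    fitted = []
    for stage in stages:
        model = stage.fit(data) if hasattr(stage, "fit") else stage
        data = model.transform(data)
        fitted.append(model)
    return fitted


def transform_pipeline(fitted_stages, data):
    """Apply every fitted stage in order."""
    for model in fitted_stages:
        data = model.transform(data)
    return data


fitted = fit_pipeline([AddOne(), Center()], [1.0, 2.0, 3.0])
print(transform_pipeline(fitted, [1.0, 2.0, 3.0]))  # [-1.0, 0.0, 1.0]
```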

In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler

df = spark.createDataFrame([
    (0, "a"),
    (1, "b"),
    (2, "c"),
    (3, "a"),
    (4, "a"),
    (5, "c"),
    (6, "d")
], ["id", "category"])

stages = [] # stages in our Pipeline
stringIndexer = StringIndexer(inputCol="category", outputCol="categoryIndex")
encoder = OneHotEncoder(inputCol="categoryIndex", outputCol="categoryVec")
assembler = VectorAssembler(inputCols=["categoryIndex", "categoryVec"], outputCol="features")
stages += [stringIndexer, encoder, assembler]
pipeline = Pipeline(stages=stages) # create a Pipeline

pipelineModel = pipeline.fit(df)
encoded = pipelineModel.transform(df)
encoded.show()

<a id="classificationevaluator"></a>
#### 5. BinaryClassificationEvaluator & MulticlassClassificationEvaluator
`BinaryClassificationEvaluator` evaluates binary classification and expects two input columns: **rawPrediction and label**
- Column "rawPrediction" can be of type double (binary 0/1 prediction, or probability of label 1) or of type vector (length-2 vector of raw predictions, scores, or label probabilities).
- `MulticlassClassificationEvaluator` evaluates multiclass classification and expects two input columns: **prediction and label**
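
The default metric of `BinaryClassificationEvaluator` is the area under the ROC curve. As a rough illustration of what that number means, the sketch below computes it in plain Python as the fraction of (positive, negative) pairs that the scores rank correctly. The function `area_under_roc` and the sample scores are hypothetical, and Spark derives the value from the ROC curve rather than from this pairwise loop.

```python
def area_under_roc(scores, labels):
    """AUC as the fraction of (positive, negative) pairs ranked correctly.

    Ties between a positive and a negative score count as half a win.
    """
    positives = [s for s, y in zip(scores, labels) if y == 1.0]
    negatives = [s for s, y in zip(scores, labels) if y == 0.0]
    wins = 0.0
    for p in positives:
        for n in negatives:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(positives) * len(negatives))


scores = [0.1, 0.4, 0.35, 0.8]   # probability of label 1 for each row
labels = [0.0, 0.0, 1.0, 1.0]
print(area_under_roc(scores, labels))  # 0.75
```

Three of the four positive/negative pairs are ordered correctly (only 0.35 versus 0.4 is not), hence 0.75.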

<a id="paramgrid&crossvalidator"></a>
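#### 6. ParamGrid & CrossValidator
`ParamGridBuilder` builds a grid of hyperparameter combinations. `CrossValidator` fits the estimator on every combination using k-fold cross validation and keeps the best model according to the evaluator.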


In [None]:
from pyspark.mllib.evaluation import MultilabelMetrics
predictionAndLabels = sc.parallelize([([0.0, 1.0], [0.0, 2.0]), ([0.0, 2.0], [0.0, 1.0]), 
                                      ([], [0.0]), ([2.0], [2.0]), ([2.0, 0.0], [2.0, 0.0]), 
                                      ([0.0, 1.0, 2.0], [0.0, 1.0]), ([1.0], [1.0, 2.0])])

print(predictionAndLabels.take(2))
print(type(predictionAndLabels))
metrics = MultilabelMetrics(predictionAndLabels)

# Per-label metrics
print(metrics.precision(0.0))
print(metrics.recall(1.0))
print(metrics.f1Measure(2.0))
# Metrics averaged over all rows
print(metrics.precision())
print(metrics.recall())
print(metrics.f1Measure())
# Micro-averaged metrics and overall scores
print(metrics.microPrecision)
print(metrics.microRecall)
print(metrics.microF1Measure)
print(metrics.hammingLoss)
print(metrics.subsetAccuracy)
print(metrics.accuracy)
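
To make the row-averaged numbers less opaque, here is a plain-Python sketch that recomputes precision, recall, and accuracy for the same (prediction, label) pairs. It assumes the usual per-row definitions (overlap divided by the number of predictions, labels, or their union, then averaged over rows); `ratio` and `document_averaged` are hypothetical helper names.

```python
pairs = [([0.0, 1.0], [0.0, 2.0]), ([0.0, 2.0], [0.0, 1.0]),
         ([], [0.0]), ([2.0], [2.0]), ([2.0, 0.0], [2.0, 0.0]),
         ([0.0, 1.0, 2.0], [0.0, 1.0]), ([1.0], [1.0, 2.0])]


def ratio(numerator, denominator):
    """Safe division: an empty denominator contributes 0."""
    return numerator / float(denominator) if denominator else 0.0


def document_averaged(pairs):
    """Average per-row precision, recall, and accuracy over all rows."""
    precision = recall = accuracy = 0.0
    for predicted, actual in pairs:
        predicted, actual = set(predicted), set(actual)
        overlap = len(predicted & actual)
        precision += ratio(overlap, len(predicted))
        recall += ratio(overlap, len(actual))
        accuracy += ratio(overlap, len(predicted | actual))
    n = float(len(pairs))
    return precision / n, recall / n, accuracy / n


precision, recall, accuracy = document_averaged(pairs)
print(round(precision, 4), round(recall, 4), round(accuracy, 4))
# 0.6667 0.6429 0.5476
```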

<a id="stdscaler"></a>
#### 7. StandardScaler

In [None]:
from pyspark.ml.feature import StandardScaler, MinMaxScaler 
from pyspark.ml.linalg import Vectors
# from pyspark.sql.functions import col, round

df = spark.createDataFrame([(Vectors.dense([-1.0, 2.5, 0.0]),),
                            (Vectors.dense([2.0, 0.0, 1.5]),),
                            (Vectors.dense([3.0, 1.0, -0.5]),)],
                           ['features'])

scaler_minmax = MinMaxScaler(inputCol='features', outputCol='scaledMinmax_feature')
scalerModel = scaler_minmax.fit(df)
scaledData = scalerModel.transform(df)
scaledData.show()

scaler_std = StandardScaler(inputCol='features', outputCol="scaledStd_feature", withStd=True, withMean=False)
scalerModel = scaler_std.fit(df)
scaledData = scalerModel.transform(df)
scaledData.show()

# spark.createDataFrame([(2.546,)], ['a']).select(round('a', 1).alias('r')).collect()
# scaledData.select(round('scaledStd_feature', 1).alias('r')).collect()
# scaledData.rdd.map(lambda x: map(lambda y: round(y,4), x))
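
The two scalers used in this cell can be checked by hand on a single feature column. The sketch below does so in plain Python for the first component of the three vectors. It assumes that standard scaling with `withStd=True, withMean=False` divides by the sample standard deviation (n - 1 denominator) without centering.

```python
import statistics

column = [-1.0, 2.0, 3.0]  # first component of the three feature vectors

# Min-max scaling to [0, 1]: (x - min) / (max - min)
low, high = min(column), max(column)
minmax_scaled = [(x - low) / (high - low) for x in column]

# Standard scaling with withStd=True, withMean=False: x / std, no centering.
# This sketch assumes the sample standard deviation (n - 1 denominator).
std = statistics.stdev(column)
std_scaled = [x / std for x in column]

print(minmax_scaled)                       # [0.0, 0.75, 1.0]
print([round(x, 4) for x in std_scaled])
```

Because the mean is not subtracted, the standard-scaled values keep their original signs.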

<a id="maincode"></a>
#### Main Scripts

In [4]:
import pandas as pd
import numpy as np

from pyspark.sql import Row

from sklearn import datasets
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler ###One-Hot Encoding

from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.tree import RandomForest

In [5]:
iris = datasets.load_iris()
X = iris.data
Y = iris.target

iris_rdd = sc.parallelize(np.c_[X,Y]).map(lambda x: Row(sepal_length=float(x[0]),
                                                        sepal_width=float(x[1]),
                                                        petal_length=float(x[2]),
                                                        petal_width=float(x[3]),
                                                        target=str(x[4])))
iris_rddDF = spark.createDataFrame(iris_rdd)
cols = iris_rddDF.columns
print('Columns:', cols)
print('')
iris_rddDF.show(3)

Columns: ['petal_length', 'petal_width', 'sepal_length', 'sepal_width', 'target']

+------------+-----------+------------+-----------+------+
|petal_length|petal_width|sepal_length|sepal_width|target|
+------------+-----------+------------+-----------+------+
|         1.4|        0.2|         5.1|        3.5|   0.0|
|         1.4|        0.2|         4.9|        3.0|   0.0|
|         1.3|        0.2|         4.7|        3.2|   0.0|
+------------+-----------+------------+-----------+------+
only showing top 3 rows



In [7]:
###One-Hot Encoding
categoricalColumns = []
stages = [] # stages in our Pipeline
for categoricalCol in categoricalColumns:
  # Category Indexing with StringIndexer
  stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol+"Index")
  # Use OneHotEncoder to convert categorical variables into binary SparseVectors
  encoder = OneHotEncoder(inputCol=categoricalCol+"Index", outputCol=categoricalCol+"classVec")
  # Add stages.  These are not run here, but will run all at once later on.
  stages += [stringIndexer, encoder]

In [8]:
# Convert label into label indices using the StringIndexer
label_stringIdx = StringIndexer(inputCol = "target", outputCol = "label")
stages += [label_stringIdx]

In [9]:
# Transform all features into a vector using VectorAssembler
numericCols = ["sepal_length", "sepal_width", "petal_length", "petal_width"]
# A list comprehension works in both Python 2 and 3 (map() returns an iterator in Python 3)
assemblerInputs = [c + "classVec" for c in categoricalColumns] + numericCols
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages += [assembler]

In [10]:
# Create a Pipeline.
pipeline = Pipeline(stages=stages)
# Run the feature transformations.
#  - fit() computes feature statistics as needed.
#  - transform() actually transforms the features.
pipelineModel = pipeline.fit(iris_rddDF)
iris_rddDF = pipelineModel.transform(iris_rddDF)

# Keep relevant columns
selectedcols = ["label", "features"] + cols
iris_rddDF = iris_rddDF.select(selectedcols)
iris_rddDF.show()

+-----+-----------------+------------+-----------+------------+-----------+------+
|label|         features|petal_length|petal_width|sepal_length|sepal_width|target|
+-----+-----------------+------------+-----------+------------+-----------+------+
|  0.0|[5.1,3.5,1.4,0.2]|         1.4|        0.2|         5.1|        3.5|   0.0|
|  0.0|[4.9,3.0,1.4,0.2]|         1.4|        0.2|         4.9|        3.0|   0.0|
|  0.0|[4.7,3.2,1.3,0.2]|         1.3|        0.2|         4.7|        3.2|   0.0|
|  0.0|[4.6,3.1,1.5,0.2]|         1.5|        0.2|         4.6|        3.1|   0.0|
|  0.0|[5.0,3.6,1.4,0.2]|         1.4|        0.2|         5.0|        3.6|   0.0|
|  0.0|[5.4,3.9,1.7,0.4]|         1.7|        0.4|         5.4|        3.9|   0.0|
|  0.0|[4.6,3.4,1.4,0.3]|         1.4|        0.3|         4.6|        3.4|   0.0|
|  0.0|[5.0,3.4,1.5,0.2]|         1.5|        0.2|         5.0|        3.4|   0.0|
|  0.0|[4.4,2.9,1.4,0.2]|         1.4|        0.2|         4.4|        2.9|   0.0|
|  0

In [None]:
print(type(iris_rddDF))

In [None]:
### Randomly split data into training, validation, and test sets. Set seed for reproducibility
(trainingData, validateData, testData) = iris_rddDF.randomSplit([0.5, 0.1, 0.4], seed=100)
print(trainingData.count())
print(validateData.count())
print(testData.count())

In [None]:
from pyspark.ml.classification import RandomForestClassifier

# Create an initial RandomForest model.
rf = RandomForestClassifier(labelCol="label", featuresCol="features", numTrees=2)

# Train model with Training Data
rfModel = rf.fit(trainingData)

In [None]:
# Make predictions on test data using the Transformer.transform() method.
predictions = rfModel.transform(testData)

In [None]:
predictions.printSchema()

# "rawPrediction": Vector of length # classes, with the counts of training instance labels at the tree node which makes the prediction
# "probability": Vector of length # classes, equal to rawPrediction normalized to a multinomial distribution
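
The relationship between the two columns can be sketched in plain Python: dividing each raw score by the total yields a vector that sums to 1, and the predicted class is the index of its largest entry. The function `to_probability` and the sample vector are hypothetical.

```python
def to_probability(raw_prediction):
    """Divide each raw score by the total so the entries sum to 1."""
    total = float(sum(raw_prediction))
    if total == 0.0:
        return list(raw_prediction)
    return [value / total for value in raw_prediction]


raw = [1.0, 3.0, 0.0]  # hypothetical rawPrediction for a 3-class problem
probability = to_probability(raw)
prediction = float(probability.index(max(probability)))
print(probability, prediction)  # [0.25, 0.75, 0.0] 1.0
```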

In [None]:
selected = predictions.select("label", "prediction", "probability", "rawPrediction", "features")
selected.show()

In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Evaluate model (the default metric of MulticlassClassificationEvaluator is f1)
evaluator = MulticlassClassificationEvaluator()
print(evaluator.evaluate(predictions))

from pyspark.mllib.evaluation import MultilabelMetrics
result = predictions.select(['prediction','label']).rdd
# print result.map(lambda x: (x.prediction, x.label)).take(5)
metrics = MultilabelMetrics(result.map(lambda x: ([x.prediction], [x.label])))

# Summary stats
print("Recall = %s" % metrics.recall())
print("Precision = %s" % metrics.precision())
print("F1 measure = %s" % metrics.f1Measure())
print("Accuracy = %s" % metrics.accuracy)

In [None]:
# Create ParamGrid for Cross Validation
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

paramGrid = (ParamGridBuilder()
             .addGrid(rf.maxDepth, [2, 4, 6])
             .addGrid(rf.maxBins, [20, 60])
             .addGrid(rf.numTrees, [5, 20])
             .build())
# 3 values for maxDepth, 2 values for maxBins, and 2 values for numTrees.
# This grid will have 3 x 2 x 2 = 12 parameter settings for CrossValidator to choose from. 
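
The grid is simply the Cartesian product of the candidate values. A plain-Python sketch with `itertools.product` (plain dicts stand in for Spark's parameter maps here) confirms the count of 12:

```python
from itertools import product

grid = {
    "maxDepth": [2, 4, 6],
    "maxBins": [20, 60],
    "numTrees": [5, 20],
}

names = list(grid)
settings = [dict(zip(names, values)) for values in product(*(grid[n] for n in names))]

print(len(settings))   # 12
print(settings[0])     # {'maxDepth': 2, 'maxBins': 20, 'numTrees': 5}
```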

In [None]:
# Create 5-fold CrossValidator
cv = CrossValidator(estimator=rf, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5)

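# Run cross validations. This can take about 6 minutes: it fits 12 parameter settings x 5 folds = 60 models
# (with up to 20 trees each), then refits the best setting on all of trainingData.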
cvModel = cv.fit(trainingData)
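
What 5-fold cross validation does to the training rows can be sketched in plain Python. The helper `k_fold_indices` is hypothetical and assigns rows to folds round-robin for determinism, which is an assumption of the sketch; a real splitter would typically assign rows to folds at random.

```python
def k_fold_indices(num_rows, num_folds):
    """Yield (train, validation) index lists; each row is validated exactly once.

    Rows are assigned round-robin here for determinism; that is an assumption
    of this sketch (a real splitter would typically assign folds at random).
    """
    for fold in range(num_folds):
        validation = [i for i in range(num_rows) if i % num_folds == fold]
        train = [i for i in range(num_rows) if i % num_folds != fold]
        yield train, validation


folds = list(k_fold_indices(num_rows=10, num_folds=5))
for train, validation in folds:
    print(validation, len(train))

num_settings = 3 * 2 * 2                   # size of the parameter grid
print(num_settings * len(folds))           # 60 model fits across all folds
```

Each parameter setting is scored by averaging the evaluator's metric over the five validation folds, and the setting with the best average wins.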

In [None]:
# Use the validation set here so we can measure the accuracy of our model on data it was not trained on
predictions = cvModel.transform(validateData)

In [None]:
# cvModel uses the best model found from the Cross Validation
# Evaluate best model
evaluator.evaluate(predictions)

In [None]:
# View Best model's predictions and probabilities of each prediction class
selected = predictions.select("label", "prediction", "probability", "rawPrediction", "features")
selected.show()

In [None]:
# View Best model's parameter settings
bestModel = cvModel.bestModel
print('numTrees:', bestModel.getNumTrees)
print('Tree Weights:', bestModel.treeWeights)
print('Feature Importances:', bestModel.featureImportances)
print('Total numNodes:', bestModel.totalNumNodes)
print(bestModel.params)
# print 'Trees:', bestModel.trees
# print 'Description:', bestModel.toDebugString

In [None]:
# Generate predictions for the held-out test set
finalPredictions = bestModel.transform(testData)

In [None]:
# Evaluate best model
evaluator.evaluate(finalPredictions)

In [None]:
test = pd.concat([finalPredictions.select('prediction').toPandas(), finalPredictions.select('label').toPandas()], axis=1)
# True if at least one prediction differs from its label
any(test['prediction'] - test['label'])

### Others

In [None]:
test1 = sc.parallelize([(0.0, 1.0, 3.0), (0.0, 1.0, 4.0), (0.0, 2.0, 5.0)])
print(test1.take(1))
print(test1.map(lambda x: ((x[0], x[1]), x[2])).distinct().take(10))

In [3]:
from pyspark.mllib.stat import Statistics
import numpy as np
data = sc.parallelize(
    [np.array([1.0, np.nan]), np.array([2.0, 20.0]), np.array([5.0, 33.0]), np.array([5.0, 33.0])]
)  # an RDD of Vectors

# calculate the correlation matrix using Pearson's method. Use "spearman" for Spearman's method.
# If a method is not specified, Pearson's method will be used by default.
print(Statistics.corr(data, method="pearson"))

[[  1.  nan]
 [ nan   1.]]
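
The `nan` entries appear because a single missing value contaminates every sum that enters the correlation. The plain-Python sketch below reproduces that effect with a hypothetical `pearson` helper and shows one possible remedy, dropping incomplete rows before computing the correlation; whether dropping rows is appropriate depends on the data.

```python
import math


def pearson(xs, ys):
    """Pearson correlation coefficient of two equally long sequences."""
    n = float(len(xs))
    mean_x, mean_y = sum(xs) / n, sum(ys) / n
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    var_x = sum((x - mean_x) ** 2 for x in xs)
    var_y = sum((y - mean_y) ** 2 for y in ys)
    return cov / math.sqrt(var_x * var_y)


rows = [(1.0, float("nan")), (2.0, 20.0), (5.0, 33.0), (5.0, 33.0)]

xs, ys = zip(*rows)
print(pearson(xs, ys))  # nan: a single NaN poisons every sum it touches

# Drop rows that contain a NaN, then recompute
clean = [(x, y) for x, y in rows if not (math.isnan(x) or math.isnan(y))]
xs, ys = zip(*clean)
print(round(pearson(xs, ys), 6))  # 1.0
```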
