# Spark ML package
* This is an introduction to Spark's machine learning library.
* Spark has two ML APIs; note the differences below.
    - Spark MLlib
        - spark.mllib contains the legacy API built on top of RDDs (in maintenance mode since Spark 2.0).
    - Spark ML
        - spark.ml provides a higher-level API built on top of DataFrames for constructing ML pipelines.
        - very similar to sklearn: Estimators are trained with `fit()`, and Transformers add new columns with `transform()`.

In [3]:
# The cell that loads the accelerometer data into the DataFrame `df` was removed by Watson Studio for sharing.

+---+---+---+--------------------+------------+
|  x|  y|  z|              source|       class|
+---+---+---+--------------------+------------+
| 30| 36| 52|Accelerometer-201...|Climb_stairs|
| 30| 36| 32|Accelerometer-201...|Climb_stairs|
| 32| 30| 36|Accelerometer-201...|Climb_stairs|
| 32| 30| 36|Accelerometer-201...|Climb_stairs|
| 32| 30| 36|Accelerometer-201...|Climb_stairs|
| 32| 33| 36|Accelerometer-201...|Climb_stairs|
| 32| 30| 36|Accelerometer-201...|Climb_stairs|
| 30| 37| 50|Accelerometer-201...|Climb_stairs|
| 31| 37| 50|Accelerometer-201...|Climb_stairs|
| 29| 38| 50|Accelerometer-201...|Climb_stairs|
| 32| 39| 48|Accelerometer-201...|Climb_stairs|
| 32| 41| 46|Accelerometer-201...|Climb_stairs|
| 34| 39| 44|Accelerometer-201...|Climb_stairs|
| 34| 39| 41|Accelerometer-201...|Climb_stairs|
| 31| 39| 42|Accelerometer-201...|Climb_stairs|
| 32| 42| 43|Accelerometer-201...|Climb_stairs|
| 31| 43| 43|Accelerometer-201...|Climb_stairs|
| 28| 46| 44|Accelerometer-201...|Climb_stairs|

## pyspark.ml.feature
* StringIndexer
    - maps a string column of labels to a column of label indices. If the input column is numeric, it is cast to string and the string values are indexed. The indices are in [0, numLabels). By default, labels are ordered by frequency, so the most frequent label gets index 0.
    
* OneHotEncoder
    - maps a column of category indices to a column of binary vectors. By default the last category is not included (configurable via `dropLast`), to avoid collinearity.
    
* VectorAssembler
    - merges multiple columns into a vector column.
    
* Normalizer
    - normalizes a vector to have unit norm using the given p-norm.

### StringIndexer

In [4]:
from pyspark.ml.feature import StringIndexer

indexer = StringIndexer(inputCol="class", outputCol="classIndex")
# fit() learns the label-to-index mapping; transform() appends the classIndex column
indexed = indexer.fit(df).transform(df)
indexed.show()
indexed.select('classIndex').distinct().show()

+---+---+---+--------------------+------------+----------+
|  x|  y|  z|              source|       class|classIndex|
+---+---+---+--------------------+------------+----------+
| 30| 36| 52|Accelerometer-201...|Climb_stairs|       3.0|
| 30| 36| 32|Accelerometer-201...|Climb_stairs|       3.0|
| 32| 30| 36|Accelerometer-201...|Climb_stairs|       3.0|
| 32| 30| 36|Accelerometer-201...|Climb_stairs|       3.0|
| 32| 30| 36|Accelerometer-201...|Climb_stairs|       3.0|
| 32| 33| 36|Accelerometer-201...|Climb_stairs|       3.0|
| 32| 30| 36|Accelerometer-201...|Climb_stairs|       3.0|
| 30| 37| 50|Accelerometer-201...|Climb_stairs|       3.0|
| 31| 37| 50|Accelerometer-201...|Climb_stairs|       3.0|
| 29| 38| 50|Accelerometer-201...|Climb_stairs|       3.0|
| 32| 39| 48|Accelerometer-201...|Climb_stairs|       3.0|
| 32| 41| 46|Accelerometer-201...|Climb_stairs|       3.0|
| 34| 39| 44|Accelerometer-201...|Climb_stairs|       3.0|
| 34| 39| 41|Accelerometer-201...|Climb_stairs|       3.0|

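The default ordering (most frequent label gets index 0) can be reproduced in a few lines of plain Python. This is an illustration, not Spark's implementation: `string_index` is a hypothetical helper, the labels are made up, and breaking ties alphabetically is an assumption made here to keep the sketch deterministic. Note that the indices are doubles, which is why the output above shows `3.0`.

```python
from collections import Counter


def string_index(labels):
    """Map each distinct label to a float index, most frequent label first."""
    counts = Counter(labels)
    # Descending frequency; ties broken alphabetically (assumption for determinism)
    ordered = sorted(counts, key=lambda lab: (-counts[lab], lab))
    mapping = {lab: float(i) for i, lab in enumerate(ordered)}
    return mapping, [mapping[lab] for lab in labels]


labels = ["Walk", "Climb_stairs", "Walk", "Eat_soup", "Walk", "Climb_stairs"]
mapping, indices = string_index(labels)
print(mapping)  # {'Walk': 0.0, 'Climb_stairs': 1.0, 'Eat_soup': 2.0}
print(indices)  # [0.0, 1.0, 0.0, 2.0, 0.0, 1.0]
```

Because the indexer is fitted on `df`, a label that never appeared there makes `transform()` fail under the default `handleInvalid="error"`; `"skip"` and `"keep"` are the alternatives.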
### OneHotEncoder

In [5]:
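from pyspark.ml.feature import OneHotEncoder

# Spark 3.x: OneHotEncoder is an Estimator, so it must be fitted before transforming.
# (In Spark 2.x it was a plain Transformer and encoder.transform(indexed) worked directly.)
encoder = OneHotEncoder(inputCol="classIndex", outputCol="categoryVec")
encoded = encoder.fit(indexed).transform(indexed)
encoded.show()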

+---+---+---+--------------------+------------+----------+--------------+
|  x|  y|  z|              source|       class|classIndex|   categoryVec|
+---+---+---+--------------------+------------+----------+--------------+
| 30| 36| 52|Accelerometer-201...|Climb_stairs|       3.0|(12,[3],[1.0])|
| 30| 36| 32|Accelerometer-201...|Climb_stairs|       3.0|(12,[3],[1.0])|
| 32| 30| 36|Accelerometer-201...|Climb_stairs|       3.0|(12,[3],[1.0])|
| 32| 30| 36|Accelerometer-201...|Climb_stairs|       3.0|(12,[3],[1.0])|
| 32| 30| 36|Accelerometer-201...|Climb_stairs|       3.0|(12,[3],[1.0])|
| 32| 33| 36|Accelerometer-201...|Climb_stairs|       3.0|(12,[3],[1.0])|
| 32| 30| 36|Accelerometer-201...|Climb_stairs|       3.0|(12,[3],[1.0])|
| 30| 37| 50|Accelerometer-201...|Climb_stairs|       3.0|(12,[3],[1.0])|
| 31| 37| 50|Accelerometer-201...|Climb_stairs|       3.0|(12,[3],[1.0])|
| 29| 38| 50|Accelerometer-201...|Climb_stairs|       3.0|(12,[3],[1.0])|
| 32| 39| 48|Accelerometer-201...|Climb_stairs|       3.0|(12,[3],[1.0])|

In [7]:
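# One row per class index; 12.0 (the last category) becomes the empty vector because dropLast=True
encoded.select('classIndex', 'categoryVec').distinct().show()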

+----------+---------------+
|classIndex|    categoryVec|
+----------+---------------+
|       8.0| (12,[8],[1.0])|
|       4.0| (12,[4],[1.0])|
|      12.0|     (12,[],[])|
|       7.0| (12,[7],[1.0])|
|       1.0| (12,[1],[1.0])|
|       0.0| (12,[0],[1.0])|
|       5.0| (12,[5],[1.0])|
|       2.0| (12,[2],[1.0])|
|      10.0|(12,[10],[1.0])|
|      11.0|(12,[11],[1.0])|
|       6.0| (12,[6],[1.0])|
|       3.0| (12,[3],[1.0])|
|       9.0| (12,[9],[1.0])|
+----------+---------------+

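The table above shows `dropLast` at work: there are 13 class indices (0.0 to 12.0), the vectors have size 12, and the last index, 12.0, is encoded as the all-zero vector `(12,[],[])`. A plain-Python sketch of the same encoding, returning the `(size, [indices], [values])` sparse notation Spark prints; `one_hot` is a hypothetical helper, not Spark's API:

```python
def one_hot(index, num_categories, drop_last=True):
    """Return (size, indices, values), mirroring Spark's SparseVector printout."""
    i = int(index)
    if not 0 <= i < num_categories:
        raise ValueError("index out of range")
    size = num_categories - 1 if drop_last else num_categories
    if i < size:
        return (size, [i], [1.0])
    # With drop_last, the last category maps to the all-zero vector
    return (size, [], [])


print(one_hot(3.0, 13))                    # (12, [3], [1.0])
print(one_hot(12.0, 13))                   # (12, [], [])
print(one_hot(12.0, 13, drop_last=False))  # (13, [12], [1.0])
```

Here the encoder is applied to the label only for demonstration; the classifiers further down take the numeric label index directly.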


### VectorAssembler

In [8]:
from pyspark.ml.feature import VectorAssembler

vectorAssembler = VectorAssembler(inputCols=["x", "y", "z"], outputCol="features")
features_vectorized = vectorAssembler.transform(encoded)
features_vectorized.show(5)

+---+---+---+--------------------+------------+----------+--------------+----------------+
|  x|  y|  z|              source|       class|classIndex|   categoryVec|        features|
+---+---+---+--------------------+------------+----------+--------------+----------------+
| 30| 36| 52|Accelerometer-201...|Climb_stairs|       3.0|(12,[3],[1.0])|[30.0,36.0,52.0]|
| 30| 36| 32|Accelerometer-201...|Climb_stairs|       3.0|(12,[3],[1.0])|[30.0,36.0,32.0]|
| 32| 30| 36|Accelerometer-201...|Climb_stairs|       3.0|(12,[3],[1.0])|[32.0,30.0,36.0]|
| 32| 30| 36|Accelerometer-201...|Climb_stairs|       3.0|(12,[3],[1.0])|[32.0,30.0,36.0]|
| 32| 30| 36|Accelerometer-201...|Climb_stairs|       3.0|(12,[3],[1.0])|[32.0,30.0,36.0]|
+---+---+---+--------------------+------------+----------+--------------+----------------+
only showing top 5 rows

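VectorAssembler concatenates the listed columns, in `inputCols` order, into one vector; numeric values are cast to double (hence `30` becomes `30.0`), and a vector-valued input column would be flattened into the result. A plain-Python sketch of that concatenation; `assemble` is a hypothetical helper, and the vector column is shown as a dense list for simplicity:

```python
def assemble(row, input_cols):
    """Concatenate scalar and list-valued columns into one flat list of floats."""
    out = []
    for col in input_cols:
        value = row[col]
        if isinstance(value, (list, tuple)):
            out.extend(float(v) for v in value)  # vector column: flatten
        else:
            out.append(float(value))             # numeric column: cast to double
    return out


row = {"x": 30, "y": 36, "z": 52, "categoryVec": [0.0, 0.0, 0.0, 1.0]}
print(assemble(row, ["x", "y", "z"]))                 # [30.0, 36.0, 52.0]
print(assemble(row, ["x", "y", "z", "categoryVec"]))  # [30.0, 36.0, 52.0, 0.0, 0.0, 0.0, 1.0]
```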


### Normalizer

In [10]:
from pyspark.ml.feature import Normalizer

normalizer = Normalizer(inputCol="features", outputCol="features_norm", p=1.0)
l1NormData = normalizer.transform(features_vectorized)
l1NormData.show(5)

+---+---+---+--------------------+------------+----------+--------------+----------------+--------------------+
|  x|  y|  z|              source|       class|classIndex|   categoryVec|        features|       features_norm|
+---+---+---+--------------------+------------+----------+--------------+----------------+--------------------+
| 30| 36| 52|Accelerometer-201...|Climb_stairs|       3.0|(12,[3],[1.0])|[30.0,36.0,52.0]|[0.25423728813559...|
| 30| 36| 32|Accelerometer-201...|Climb_stairs|       3.0|(12,[3],[1.0])|[30.0,36.0,32.0]|[0.30612244897959...|
| 32| 30| 36|Accelerometer-201...|Climb_stairs|       3.0|(12,[3],[1.0])|[32.0,30.0,36.0]|[0.32653061224489...|
| 32| 30| 36|Accelerometer-201...|Climb_stairs|       3.0|(12,[3],[1.0])|[32.0,30.0,36.0]|[0.32653061224489...|
| 32| 30| 36|Accelerometer-201...|Climb_stairs|       3.0|(12,[3],[1.0])|[32.0,30.0,36.0]|[0.32653061224489...|
+---+---+---+--------------------+------------+----------+--------------+----------------+--------------------+
only showing top 5 rows

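With `p=1.0`, each vector is divided by its L1 norm, the sum of absolute values. For the first row, 30 + 36 + 52 = 118, so the first component is 30/118, which matches the truncated `0.25423728813559...` above. A plain-Python check; `p_normalize` is a hypothetical helper, and returning zero vectors unchanged is this sketch's own choice:

```python
def p_normalize(vec, p=1.0):
    """Scale vec to unit p-norm; an all-zero vector is returned unchanged."""
    norm = sum(abs(v) ** p for v in vec) ** (1.0 / p)
    if norm == 0:
        return list(vec)
    return [v / norm for v in vec]


print([round(v, 4) for v in p_normalize([30.0, 36.0, 52.0])])  # [0.2542, 0.3051, 0.4407]
print([round(v, 4) for v in p_normalize([30.0, 36.0, 32.0])])  # [0.3061, 0.3673, 0.3265]
```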
In [11]:
# Keep only the encoded label and the normalized features
df_train = l1NormData.select("categoryVec", "features_norm")
df_train.show(5)

+--------------+--------------------+
|   categoryVec|       features_norm|
+--------------+--------------------+
|(12,[3],[1.0])|[0.25423728813559...|
|(12,[3],[1.0])|[0.30612244897959...|
|(12,[3],[1.0])|[0.32653061224489...|
|(12,[3],[1.0])|[0.32653061224489...|
|(12,[3],[1.0])|[0.32653061224489...|
+--------------+--------------------+
only showing top 5 rows



### Combine all preprocessing steps in a Pipeline

In [13]:
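from pyspark.ml import Pipeline

# Stages run in order; fit() fits the Estimator stages (StringIndexer, and OneHotEncoder in Spark 3.x)
pipeline = Pipeline(stages=[indexer, encoder, vectorAssembler, normalizer])
model = pipeline.fit(df)
# No classifier yet, so the result holds engineered features, not predictions
transformed = model.transform(df)
transformed.show(5)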

+---+---+---+--------------------+------------+----------+--------------+----------------+--------------------+
|  x|  y|  z|              source|       class|classIndex|   categoryVec|        features|       features_norm|
+---+---+---+--------------------+------------+----------+--------------+----------------+--------------------+
| 30| 36| 52|Accelerometer-201...|Climb_stairs|       3.0|(12,[3],[1.0])|[30.0,36.0,52.0]|[0.25423728813559...|
| 30| 36| 32|Accelerometer-201...|Climb_stairs|       3.0|(12,[3],[1.0])|[30.0,36.0,32.0]|[0.30612244897959...|
| 32| 30| 36|Accelerometer-201...|Climb_stairs|       3.0|(12,[3],[1.0])|[32.0,30.0,36.0]|[0.32653061224489...|
| 32| 30| 36|Accelerometer-201...|Climb_stairs|       3.0|(12,[3],[1.0])|[32.0,30.0,36.0]|[0.32653061224489...|
| 32| 30| 36|Accelerometer-201...|Climb_stairs|       3.0|(12,[3],[1.0])|[32.0,30.0,36.0]|[0.32653061224489...|
+---+---+---+--------------------+------------+----------+--------------+----------------+--------------------+
only showing top 5 rows

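`Pipeline.fit` walks the stages in order: a Transformer stage just transforms the data, while an Estimator stage is first fitted on the output of the earlier stages and then replaced by its fitted model; the resulting PipelineModel applies every fitted stage in sequence. A toy, plain-Python sketch of that logic, using lists of dicts in place of DataFrames. `ToyPipeline`, `AddColumn` and `LearnMax` are hypothetical classes, not Spark's, and the sketch is simplified: Spark only transforms the data as far as the last Estimator during `fit()`, whereas this version transforms through every stage.

```python
class AddColumn:
    """Hypothetical Transformer: derives a new column from an existing one."""

    def __init__(self, inputCol, outputCol, fn):
        self.inputCol, self.outputCol, self.fn = inputCol, outputCol, fn

    def transform(self, rows):
        # New rows with an extra column; the input rows are not modified
        return [dict(r, **{self.outputCol: self.fn(r[self.inputCol])}) for r in rows]


class LearnMax:
    """Hypothetical Estimator: fit() learns a column max and returns a scaling Transformer."""

    def __init__(self, inputCol, outputCol):
        self.inputCol, self.outputCol = inputCol, outputCol

    def fit(self, rows):
        m = max(r[self.inputCol] for r in rows)
        return AddColumn(self.inputCol, self.outputCol, lambda v: v / m)


class ToyPipeline:
    def __init__(self, stages):
        self.stages = stages

    def fit(self, rows):
        fitted = []
        for stage in self.stages:
            # Estimators are fitted on the output of the previous stages
            model = stage.fit(rows) if hasattr(stage, "fit") else stage
            fitted.append(model)
            rows = model.transform(rows)  # feed the next stage
        return ToyPipeline(fitted)  # every stage is now a Transformer

    def transform(self, rows):
        for stage in self.stages:
            rows = stage.transform(rows)
        return rows


rows = [{"x": 30}, {"x": 32}, {"x": 40}]
pipeline = ToyPipeline([AddColumn("x", "x2", lambda v: v * 2), LearnMax("x2", "x2_scaled")])
model = pipeline.fit(rows)
print([r["x2_scaled"] for r in model.transform(rows)])  # [0.75, 0.8, 1.0]
```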
## Spark ML models
### Classification
* LinearSVC: new in Spark 2.2
    - supports only L2 regularization
    - optimizes the hinge loss
    - binary classification only
* LogisticRegression
* DecisionTreeClassifier
* GBTClassifier: binary classification only
* RandomForestClassifier
* NaiveBayes

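To make "hinge loss with L2 regularization" concrete: with 0/1 labels mapped to y in {-1, +1}, the per-example hinge loss is max(0, 1 - y(w·x + b)), which is zero once the example sits on the correct side with margin at least 1. The plain-Python sketch below evaluates the textbook objective (mean hinge loss plus `reg / 2 * ||w||²`); that scaling is a common convention and an assumption here, not necessarily the exact objective Spark optimizes internally, and `hinge_objective` is a hypothetical helper.

```python
def hinge_objective(w, b, X, labels, reg):
    """Mean hinge loss plus an L2 penalty on the weights (not on the intercept).

    labels are 0/1 as in spark.ml; they are mapped to -1/+1 for the margin.
    """
    total = 0.0
    for x, label in zip(X, labels):
        y = 1.0 if label == 1 else -1.0
        margin = y * (sum(wi * xi for wi, xi in zip(w, x)) + b)
        total += max(0.0, 1.0 - margin)  # zero loss once margin >= 1
    penalty = 0.5 * reg * sum(wi * wi for wi in w)  # assumed reg/2 * ||w||^2 scaling
    return total / len(X) + penalty


X = [[2.0, 0.0], [0.0, 2.0]]
labels = [1, 0]
print(hinge_objective([1.0, -1.0], 0.0, X, labels, reg=0.0))    # 0.0  (both margins are 2)
print(hinge_objective([0.25, -0.25], 0.0, X, labels, reg=0.0))  # 0.5  (both margins are 0.5)
print(hinge_objective([1.0, -1.0], 0.0, X, labels, reg=0.5))    # 0.5  (penalty only)
```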

In [80]:
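from pyspark.ml.feature import StringIndexer

indexer = StringIndexer(inputCol="class", outputCol="classIndex")
dfindexed = indexer.fit(df).transform(df)
# Uncomment to keep only two classes, e.g. for the binary-only GBTClassifier or LinearSVC
# dfindexed = dfindexed.where('classIndex==0 or classIndex==1')
# Classifiers read the label from the "label" column by default
dfindexed = dfindexed.withColumnRenamed('classIndex', 'label')
# 75/25 train/test split; pass seed=... to make it reproducible
training_df, test_df = dfindexed.randomSplit([0.75, 0.25])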

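`randomSplit([0.75, 0.25])` does not cut the data at an exact row count: the weights are normalized and each row is assigned to a split at random, so the split sizes only approximate 75%/25% and change between runs unless a `seed` is given. A plain-Python sketch of that per-row assignment; it illustrates the idea and is not Spark's sampling code, and `random_split` is a hypothetical helper:

```python
import random


def random_split(rows, weights, seed=None):
    """Assign each row independently to one split, with probability proportional to its weight."""
    rng = random.Random(seed)
    total = float(sum(weights))
    # Cumulative bounds, e.g. [0.75, 1.0] for weights [0.75, 0.25]
    bounds, acc = [], 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()
        for i, bound in enumerate(bounds):
            # The last split also catches any floating-point shortfall in the bounds
            if u < bound or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits


train, test = random_split(range(10_000), [0.75, 0.25], seed=42)
print(len(train), len(test))  # roughly 7500 and 2500; exact counts depend on the seed
```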
In [81]:

from pyspark.ml.feature import VectorAssembler, Normalizer
from pyspark.ml.classification import LogisticRegression

vectorAssembler = VectorAssembler(inputCols=['x', 'y', 'z'], outputCol='features')
normalizer = Normalizer(inputCol='features', outputCol='features_norm', p=1.0)
# Train on the normalized features instead of the default "features" column
lr = LogisticRegression(maxIter=10, featuresCol='features_norm')

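With 13 classes, LogisticRegression's default `family="auto"` picks the multinomial (softmax) model: one coefficient vector and intercept per class, with the class scores turned into probabilities by the softmax function, and the prediction is the index of the largest probability. A plain-Python sketch of the softmax step only; `softmax` is a hypothetical helper and the scores are made-up numbers:

```python
import math


def softmax(scores):
    """Convert raw class scores to probabilities that sum to 1."""
    m = max(scores)  # subtract the max for numerical stability
    exps = [math.exp(s - m) for s in scores]
    total = sum(exps)
    return [e / total for e in exps]


scores = [2.0, 1.0, 0.1]                     # hypothetical raw scores for 3 classes
probs = softmax(scores)
prediction = float(probs.index(max(probs)))  # index of the most likely class, as a double
print([round(p, 3) for p in probs], prediction)  # [0.659, 0.242, 0.099] 0.0
```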
In [86]:
from pyspark.ml.classification import (DecisionTreeClassifier, GBTClassifier,
                                       RandomForestClassifier, NaiveBayes)

dt = DecisionTreeClassifier(maxDepth=5, featuresCol='features_norm')
# GBTClassifier handles binary labels only, so it is left out of the 13-class loop below
gbt = GBTClassifier(maxIter=2, maxDepth=3, featuresCol='features_norm')
rf = RandomForestClassifier(numTrees=3, maxDepth=5, featuresCol='features_norm')
# NaiveBayes (multinomial by default) requires non-negative feature values
nb = NaiveBayes(featuresCol='features_norm')

In [87]:
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Uses the default labelCol="label" and predictionCol="prediction"
evaluator = MulticlassClassificationEvaluator(metricName="accuracy")

# Prints the test accuracy of lr, dt, rf and nb, in that order
for fitmodel in [lr, dt, rf, nb]:
    pipeline = Pipeline(stages=[vectorAssembler, normalizer, fitmodel])
    model = pipeline.fit(training_df)
    pred = model.transform(test_df)
    print(evaluator.evaluate(pred))

0.2946619855753592
0.3829147645059617
0.3865601830560946
0.13043868405998676

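The four numbers are the test accuracies of `lr`, `dt`, `rf` and `nb`, in loop order. Accuracy is the fraction of rows whose `prediction` equals `label`; one way to judge such numbers is to compare them with a majority-class baseline that always predicts the most frequent training label. A plain-Python sketch of both on made-up lists; `accuracy` and `majority_baseline` are hypothetical helpers:

```python
from collections import Counter


def accuracy(labels, predictions):
    """Fraction of positions where the prediction equals the label."""
    return sum(y == p for y, p in zip(labels, predictions)) / len(labels)


def majority_baseline(train_labels, test_labels):
    """Accuracy of always predicting the most common training label."""
    majority = Counter(train_labels).most_common(1)[0][0]
    return accuracy(test_labels, [majority] * len(test_labels))


train_labels = [0.0, 0.0, 0.0, 1.0, 2.0]
test_labels = [0.0, 1.0, 0.0, 2.0]
predictions = [0.0, 1.0, 2.0, 2.0]
print(accuracy(test_labels, predictions))            # 0.75
print(majority_baseline(train_labels, test_labels))  # 0.5
```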

### Regression
* LinearRegression
* DecisionTreeRegressor: supports both continuous and categorical features
* GeneralizedLinearRegression
* GBTRegressor
* RandomForestRegressor