# Machine Learning Library (MLlib) Guide

(1) Goals of Spark MLlib
- to make practical machine learning scalable and easy


(2) What MLlib provides
- ML Algorithms: common learning algorithms such as classification, regression, clustering, and collaborative filtering
- Featurization: feature extraction, transformation, dimensionality reduction, and selection
- Pipelines: tools for constructing, evaluating, and tuning ML Pipelines
- Persistence: saving and loading algorithms, models, and Pipelines
- Utilities: linear algebra, statistics, data handling, etc.

## Announcement: DataFrame-based API is primary API

(1) As of Spark 2.0,
- the RDD-based APIs in the spark.mllib package are in maintenance mode
- the DataFrame-based API in the spark.ml package is the primary ML API for Spark

(2) Why the DataFrame-based API in the spark.ml package is preferred:
 - DataFrames provide a more user-friendly API than RDDs.
 - DataFrames bring Spark Datasources and SQL/DataFrame queries to ML workflows.
 - The DataFrame-based API is uniform across ML algorithms and across multiple languages.

## See 6-Appendix Notebook : Data Type

##  Basic Statistics 

### Correlation

In [0]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.stat import Correlation

data = [(Vectors.sparse(4, [(0, 1.0), (3, -2.0)]),),
        (Vectors.dense([4.0, 5.0, 0.0, 3.0]),),
        (Vectors.dense([6.0, 7.0, 0.0, 8.0]),),
        (Vectors.sparse(4, [(0, 9.0), (3, 1.0)]),)]
df = spark.createDataFrame(data, ["features"])
df.show()
r1 = Correlation.corr(df, "features").head()
print("Pearson correlation matrix:\n" + str(r1[0]))

r2 = Correlation.corr(df, "features", "spearman").head()
print("Spearman correlation matrix:\n" + str(r2[0]))


+--------------------+
|            features|
+--------------------+
|(4,[0,3],[1.0,-2.0])|
|   [4.0,5.0,0.0,3.0]|
|   [6.0,7.0,0.0,8.0]|
| (4,[0,3],[9.0,1.0])|
+--------------------+

Pearson correlation matrix:
DenseMatrix([[1.        , 0.05564149,        nan, 0.40047142],
             [0.05564149, 1.        ,        nan, 0.91359586],
             [       nan,        nan, 1.        ,        nan],
             [0.40047142, 0.91359586,        nan, 1.        ]])
Spearman correlation matrix:
DenseMatrix([[1.        , 0.10540926,        nan, 0.4       ],
             [0.10540926, 1.        ,        nan, 0.9486833 ],
             [       nan,        nan, 1.        ,        nan],
             [0.4       , 0.9486833 ,        nan, 1.        ]])
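The Pearson entries above can be checked by hand. The following is a minimal plain-Python sketch, not Spark's implementation: the helper name `pearson` is made up for illustration, and the sparse vectors are expanded into dense rows. It also suggests why the third column produces `nan`: that column is all zeros, so its variance is zero and the correlation is undefined.

```python
import math


def pearson(xs, ys):
    """Pearson correlation of two equal-length sequences.

    Returns NaN when either sequence has zero variance (hypothetical helper,
    written only to illustrate the formula).
    """
    n = len(xs)
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    var_x = sum((x - mean_x) ** 2 for x in xs)
    var_y = sum((y - mean_y) ** 2 for y in ys)
    if var_x == 0 or var_y == 0:
        return float("nan")
    return cov / math.sqrt(var_x * var_y)


# The four feature vectors from the cell above, with sparse entries filled in as 0.0.
rows = [
    [1.0, 0.0, 0.0, -2.0],
    [4.0, 5.0, 0.0, 3.0],
    [6.0, 7.0, 0.0, 8.0],
    [9.0, 0.0, 0.0, 1.0],
]
cols = list(zip(*rows))  # transpose: one tuple per feature column

r03 = pearson(cols[0], cols[3])  # compare with entry [0][3] of the Pearson matrix
r02 = pearson(cols[0], cols[2])  # column 2 is constant, so this is NaN
print(r03, r02)
```

The value of `r03` agrees with the 0.40047142 entry printed by `Correlation.corr` above.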


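### Hypothesis testing

* ChiSquareTest conducts Pearson's chi-squared independence test for every feature against the label.

In [0]: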
from pyspark.ml.linalg import Vectors
from pyspark.ml.stat import ChiSquareTest

data = [(0.0, Vectors.dense(0.5, 10.0)),
        (0.0, Vectors.dense(1.5, 20.0)),
        (1.0, Vectors.dense(1.5, 30.0)),
        (0.0, Vectors.dense(3.5, 30.0)),
        (0.0, Vectors.dense(3.5, 40.0)),
        (1.0, Vectors.dense(3.5, 40.0))]
df = spark.createDataFrame(data, ["label", "features"])
df.show()
r = ChiSquareTest.test(df, "features", "label").head()
print("pValues: " + str(r.pValues))
print("degreesOfFreedom: " + str(r.degreesOfFreedom))
print("statistics: " + str(r.statistics))

+-----+----------+
|label|  features|
+-----+----------+
|  0.0|[0.5,10.0]|
|  0.0|[1.5,20.0]|
|  1.0|[1.5,30.0]|
|  0.0|[3.5,30.0]|
|  0.0|[3.5,40.0]|
|  1.0|[3.5,40.0]|
+-----+----------+

pValues: [0.6872892787909721,0.6822703303362126]
degreesOfFreedom: [2, 3]
statistics: [0.75,1.5]
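To see where these numbers come from, here is a small plain-Python sketch of Pearson's chi-squared independence statistic. It is an illustration under stated assumptions, not Spark's code: the function name `chi_square` is hypothetical, and each distinct feature value is treated as one category. The p-value shortcut `exp(-x / 2)` applies only to the 2-degrees-of-freedom case.

```python
import math
from collections import Counter


def chi_square(feature_values, labels):
    """Return (statistic, degrees_of_freedom) for a feature/label contingency table."""
    n = len(labels)
    observed = Counter(zip(feature_values, labels))  # cell counts
    feature_totals = Counter(feature_values)         # row totals
    label_totals = Counter(labels)                   # column totals
    statistic = 0.0
    for f, f_total in feature_totals.items():
        for l, l_total in label_totals.items():
            expected = f_total * l_total / n
            statistic += (observed[(f, l)] - expected) ** 2 / expected
    dof = (len(feature_totals) - 1) * (len(label_totals) - 1)
    return statistic, dof


# Same data as the DataFrame above, split into one list per feature.
labels = [0.0, 0.0, 1.0, 0.0, 0.0, 1.0]
feature0 = [0.5, 1.5, 1.5, 3.5, 3.5, 3.5]
feature1 = [10.0, 20.0, 30.0, 30.0, 40.0, 40.0]

stat0, dof0 = chi_square(feature0, labels)
stat1, dof1 = chi_square(feature1, labels)

# With 2 degrees of freedom the chi-squared survival function is exp(-x / 2).
p0 = math.exp(-stat0 / 2)
print(stat0, dof0, p0)
print(stat1, dof1)
```

The statistics and degrees of freedom agree with the `ChiSquareTest` output above, and `p0` agrees with the first p-value.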


### Summarizer

* Summarizer provides vector column summary statistics for a DataFrame.
* Available metrics are the column-wise max, min, mean, sum, variance, std, and number of nonzeros, as well as the total count.
* https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.stat.Summarizer.html

In [0]:
from pyspark.ml.stat import Summarizer
from pyspark.sql import Row
from pyspark.ml.linalg import Vectors

df = sc.parallelize([Row(weight=1.0, features=Vectors.dense(1.0, 1.0, 1.0)),
                     Row(weight=0.0, features=Vectors.dense(1.0, 2.0, 3.0))]).toDF()
df.show()
# create summarizer for multiple metrics "mean" and "count"
summarizer = Summarizer.metrics("mean", "count")

# compute statistics for multiple metrics with weight
df.select(summarizer.summary(df.features, df.weight)).show(truncate=False)

# compute statistics for multiple metrics without weight
df.select(summarizer.summary(df.features)).show(truncate=False)

# compute statistics for single metric "mean" with weight
df.select(Summarizer.mean(df.features, df.weight)).show(truncate=False)

# compute statistics for single metric "mean" without weight
df.select(Summarizer.mean(df.features)).show(truncate=False)

+------+-------------+
|weight|     features|
+------+-------------+
|   1.0|[1.0,1.0,1.0]|
|   0.0|[1.0,2.0,3.0]|
+------+-------------+

+-----------------------------------+
|aggregate_metrics(features, weight)|
+-----------------------------------+
|{[1.0,1.0,1.0], 1}                 |
+-----------------------------------+

+--------------------------------+
|aggregate_metrics(features, 1.0)|
+--------------------------------+
|{[1.0,1.5,2.0], 2}              |
+--------------------------------+

+--------------+
|mean(features)|
+--------------+
|[1.0,1.0,1.0] |
+--------------+

+--------------+
|mean(features)|
+--------------+
|[1.0,1.5,2.0] |
+--------------+
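The difference between the weighted and unweighted means above can be reproduced with a few lines of plain Python. This is only a sketch of the arithmetic, assuming the weighted mean is the weight-scaled sum divided by the total weight; `weighted_mean` is a made-up helper, not part of the Summarizer API.

```python
def weighted_mean(vectors, weights=None):
    """Column-wise mean of equal-length vectors, optionally weighted per row."""
    if weights is None:
        weights = [1.0] * len(vectors)  # unweighted case: every row counts once
    total_weight = sum(weights)
    dim = len(vectors[0])
    return [
        sum(w * v[i] for v, w in zip(vectors, weights)) / total_weight
        for i in range(dim)
    ]


vectors = [[1.0, 1.0, 1.0], [1.0, 2.0, 3.0]]
weights = [1.0, 0.0]

with_weight = weighted_mean(vectors, weights)  # second row has weight 0 and drops out
without_weight = weighted_mean(vectors)
print(with_weight)
print(without_weight)
```

Because the second row has weight 0.0, only the first row contributes to the weighted mean, which is consistent with the `[1.0,1.0,1.0]` result shown above.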



## Data sources

* image data source
* LIBSVM data source : sample_libsvm_data.txt

In [0]:
df = spark.read.format("libsvm").option("numFeatures", "780").load("/FileStore/sample_libsvm_data.txt")
df.show(10)

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|(780,[127,128,129...|
|  1.0|(780,[158,159,160...|
|  1.0|(780,[124,125,126...|
|  1.0|(780,[152,153,154...|
|  1.0|(780,[151,152,153...|
|  0.0|(780,[129,130,131...|
|  1.0|(780,[158,159,160...|
|  1.0|(780,[99,100,101,...|
|  0.0|(780,[154,155,156...|
|  0.0|(780,[127,128,129...|
+-----+--------------------+
only showing top 10 rows
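For readers unfamiliar with the LIBSVM text format, the sketch below parses one line of the form `label index:value index:value ...`. It is a plain-Python illustration, not the Spark reader: `parse_libsvm_line` is a hypothetical helper and the sample line is invented for the example. Indices in the file are one-based, and the sketch shifts them to zero-based, which is how they appear in the loaded `features` column.

```python
def parse_libsvm_line(line):
    """Parse 'label idx:val idx:val ...' into (label, zero_based_indices, values)."""
    parts = line.split()
    label = float(parts[0])
    indices, values = [], []
    for item in parts[1:]:
        index, value = item.split(":")
        indices.append(int(index) - 1)  # file indices are one-based
        values.append(float(value))
    return label, indices, values


# Invented sample line, not copied from sample_libsvm_data.txt.
sample_line = "0 128:51 129:159 130:253"
label, indices, values = parse_libsvm_line(sample_line)
print(label, indices, values)
```

In the cell above, the `numFeatures` option fixes the vector size to 780, so the data source does not need to infer the dimension from the file.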



## ML Pipelines

* The concept of ML Pipelines
   - ML Pipelines provide a uniform set of high-level APIs built on top of DataFrames that help users create and tune practical machine learning pipelines.
   - The Pipeline concept is inspired by scikit-learn pipelines. https://scikit-learn.org/stable/modules/compose.html
   
### Main concepts in Pipelines
* MLlib standardizes APIs for machine learning algorithms to make it easier to combine multiple algorithms into a single pipeline, or workflow. 
  - DataFrame: This ML API uses DataFrame from Spark SQL as an ML dataset, which can hold a variety of data types. E.g., a DataFrame could have different columns storing text, feature vectors, true labels, and predictions.
  - Transformer: A Transformer is an algorithm which can transform one DataFrame into another DataFrame. E.g., an ML model is a Transformer which transforms a DataFrame with features into a DataFrame with predictions.

  - Estimator: An Estimator is an algorithm which can be fit on a DataFrame to produce a Transformer. E.g., a learning algorithm is an Estimator which trains on a DataFrame and produces a model.

  - Pipeline: A Pipeline chains multiple Transformers and Estimators together to specify an ML workflow.

  - Parameter: All Transformers and Estimators now share a common API for specifying parameters.

### DataFrame
* This API adopts the DataFrame from Spark SQL in order to support a variety of data types.
* DataFrame supports many basic and structured types
  - the types listed in the Spark SQL : https://spark.apache.org/docs/2.4.4/sql-reference.html
  - ML Vector: in Python, MLlib recognizes a NumPy array or a Python list (e.g., [1, 2, 3]) as a dense vector, and an MLlib SparseVector or a SciPy csc_matrix with a single column as a sparse vector
* A DataFrame can be created either implicitly or explicitly from a regular RDD. 

### Pipeline components
#### Transformers
* A Transformer is an abstraction that includes feature transformers and learned models. 
* Technically, a Transformer implements a method transform(), which converts one DataFrame into another, generally by appending one or more columns. 
   - A feature transformer might take a DataFrame, read a column (e.g., text), map it into a new column (e.g., feature vectors), and output a new DataFrame with the mapped column appended.
   - A learning model might take a DataFrame, read the column containing feature vectors, predict the label for each feature vector, and output a new DataFrame with predicted labels appended as a column.
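The "read a column, append a column" behaviour of a feature transformer can be sketched without Spark. The example below is a toy stand-in for a HashingTF-style transformer, assuming rows are plain dictionaries; the function name `add_hashed_features` is hypothetical, and `zlib.crc32` is used only because it is deterministic, not because Spark uses it.

```python
import zlib


def add_hashed_features(rows, num_features=8):
    """Return new rows with a 'features' column appended; input rows are not modified."""
    out = []
    for row in rows:
        vector = [0.0] * num_features
        for word in row["words"]:
            # Hashing trick: map each word to a bucket and count occurrences.
            bucket = zlib.crc32(word.encode("utf-8")) % num_features
            vector[bucket] += 1.0
        out.append({**row, "features": vector})
    return out


rows = [{"id": 0, "words": ["spark", "f", "spark"]}]
transformed = add_hashed_features(rows)
print(transformed[0]["features"])
```

The output rows keep every original column and gain one new column, which mirrors how transform() generally appends columns rather than replacing the DataFrame's contents.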

#### Estimators
* An Estimator abstracts the concept of a learning algorithm or any algorithm that fits or trains on data.
* Technically, an Estimator implements a method fit(), which accepts a DataFrame and produces a Model, which is a Transformer.
* Example
  - A learning algorithm such as LogisticRegression is an Estimator,
  - and calling fit() trains a LogisticRegressionModel, which is a Model and hence a Transformer.
  
#### Properties of pipeline components  
* Transformer.transform() and Estimator.fit() are both stateless.
* Each instance of a Transformer or Estimator has a unique ID, which is useful in specifying parameters (discussed below).

### Pipeline
* In machine learning, it is common to run a sequence of algorithms to process and learn from data.
  + (Example) A simple text document processing workflow might include several stages:
     - Split each document’s text into words.
     - Convert each document’s words into a numerical feature vector.
     - Learn a prediction model using the feature vectors and labels.
*   MLlib represents such a workflow as a Pipeline, which consists of a sequence of PipelineStages (Transformers and Estimators) to be run in a specific order. 

### How it works
* A Pipeline is specified as a sequence of stages, and each stage is either a Transformer or an Estimator.
* These stages are run in order, and the input DataFrame is transformed as it passes through each stage. 
  + For Transformer stages, the transform() method is called on the DataFrame. 
  + For Estimator stages, the fit() method is called to produce a Transformer (which becomes part of the PipelineModel, or fitted Pipeline), and that Transformer’s transform() method is called on the DataFrame.
  
* (Figure. Example)  training time usage of a Pipeline for simple text document workflow  
<img src="https://spark.apache.org/docs/latest/img/ml-Pipeline.png" width="500px" height="350px" title="training time usage of a Pipeline for simple text document workflow" ></img><br/>
  + Top row of the figure: the stages of the Pipeline
    - Transformers: Tokenizer and HashingTF (blue)
    - Estimator: LogisticRegression (red)
  + Bottom row of the figure: data flowing through the Pipeline when fit() is called
    - cylinders indicate DataFrames. 
    - The Pipeline.fit() method is called on the original DataFrame, which has raw text documents and labels. 
    - The Tokenizer.transform() method splits the raw text documents into words, adding a new column with words to the DataFrame. 
    - The HashingTF.transform() method converts the words column into feature vectors, adding a new column with those vectors to the DataFrame. 
    - Now, since LogisticRegression is an Estimator, the Pipeline first calls LogisticRegression.fit() to produce a LogisticRegressionModel. 
    - If the Pipeline had more Estimators, it would call the LogisticRegressionModel’s transform() method on the DataFrame before passing the DataFrame to the next stage.
    

* (Figure. Example) Pipeline is an Estimator. 
  + A Pipeline is an Estimator. Thus, after a Pipeline’s fit() method runs, it produces a PipelineModel, which is a Transformer. 
  + This PipelineModel is used at test time; the figure below illustrates this usage.    
<img src="https://spark.apache.org/docs/latest/img/ml-PipelineModel.png" width="500px" height="350px" title="Pipeline is an Estimator." ></img><br/>  
  + the PipelineModel has the same number of stages as the original Pipeline, but all Estimators in the original Pipeline have become Transformers. 
    - When the PipelineModel’s transform() method is called on a test dataset, 
    - the data are passed through the fitted pipeline in order. 
    - Each stage’s transform() method updates the dataset and passes it to the next stage.
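The fit/transform mechanics described in this section can be condensed into a toy, Spark-free sketch. Everything here is an assumption made for illustration: rows are plain dictionaries instead of a DataFrame, `KeywordClassifier` and `KeywordModel` are invented stand-ins for a real learning algorithm, and the toy classes only borrow the names `Pipeline` and `PipelineModel`. Unlike the behaviour described above, the sketch calls transform() after every stage, including the last one, which is a harmless simplification.

```python
class Tokenizer:
    """Transformer: appends a 'words' column derived from 'text'."""

    def transform(self, rows):
        return [{**row, "words": row["text"].lower().split()} for row in rows]


class KeywordModel:
    """Transformer produced by KeywordClassifier.fit()."""

    def __init__(self, keywords):
        self.keywords = keywords

    def transform(self, rows):
        return [
            {**row, "prediction": 1.0 if self.keywords & set(row["words"]) else 0.0}
            for row in rows
        ]


class KeywordClassifier:
    """Estimator: learns which words appear only in positive examples."""

    def fit(self, rows):
        positive, negative = set(), set()
        for row in rows:
            (positive if row["label"] == 1.0 else negative).update(row["words"])
        return KeywordModel(positive - negative)


class PipelineModel:
    """Fitted pipeline: every stage is a Transformer."""

    def __init__(self, stages):
        self.stages = stages

    def transform(self, rows):
        for stage in self.stages:
            rows = stage.transform(rows)
        return rows


class Pipeline:
    """Estimator made of an ordered list of stages."""

    def __init__(self, stages):
        self.stages = stages

    def fit(self, rows):
        fitted = []
        for stage in self.stages:
            if hasattr(stage, "fit"):
                stage = stage.fit(rows)  # Estimator stage becomes a Transformer
            fitted.append(stage)
            rows = stage.transform(rows)  # pass transformed data to the next stage
        return PipelineModel(fitted)


training = [
    {"text": "a b c d e spark", "label": 1.0},
    {"text": "b d", "label": 0.0},
    {"text": "spark f g h", "label": 1.0},
    {"text": "hadoop mapreduce", "label": 0.0},
]
toy_model = Pipeline([Tokenizer(), KeywordClassifier()]).fit(training)
predictions = [
    row["prediction"]
    for row in toy_model.transform([{"text": "spark i j k"}, {"text": "l m n"}])
]
print(predictions)
```

The fitted `toy_model` has the same number of stages as the original pipeline, and its second stage is the model returned by fit() rather than the estimator itself. The toy predictions are not expected to match those of a real LogisticRegression.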
    
### Details
* DAG Pipelines: 
  + The examples given here are all for linear Pipelines, i.e., Pipelines in which each stage uses data produced by the previous stage.
  + It is possible to create non-linear Pipelines as long as the data flow graph forms a Directed Acyclic Graph (DAG).
* Runtime checking:
  + Since Pipelines can operate on DataFrames with varied types, they cannot use compile-time type checking. 
  + Pipelines and PipelineModels instead do runtime checking before actually running the Pipeline. This type checking is done using the DataFrame schema, a description of the data types of columns in the DataFrame.
* Unique Pipeline stages: 
  + A Pipeline’s stages should be unique instances. 
  + E.g., the same instance myHashingTF should not be inserted into the Pipeline twice since Pipeline stages must have unique IDs.
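The two checks described above (schema-based type checking before any data is processed, and uniqueness of stage instances) can be illustrated with a toy validator. This is a sketch under assumptions, not how Spark implements it: stages are plain dictionaries with invented keys, type names are plain strings, and `validate_pipeline` is a hypothetical function.

```python
def validate_pipeline(stages, schema):
    """Check stage uniqueness and column types; return the resulting schema."""
    if len({id(stage) for stage in stages}) != len(stages):
        raise ValueError("Pipeline stages must be unique instances")
    schema = dict(schema)  # work on a copy; no data is touched at this point
    for stage in stages:
        actual = schema.get(stage["input_col"])
        if actual != stage["input_type"]:
            raise TypeError(
                f"{stage['name']} expects column {stage['input_col']!r} of type "
                f"{stage['input_type']!r}, found {actual!r}"
            )
        schema[stage["output_col"]] = stage["output_type"]
    return schema


tokenizer = {
    "name": "tokenizer",
    "input_col": "text", "input_type": "string",
    "output_col": "words", "output_type": "array<string>",
}
hashing_tf = {
    "name": "hashingTF",
    "input_col": "words", "input_type": "array<string>",
    "output_col": "features", "output_type": "vector",
}

final_schema = validate_pipeline([tokenizer, hashing_tf], {"text": "string"})
print(final_schema)
```

Passing the same `tokenizer` object twice raises `ValueError`, and running `hashing_tf` without a preceding tokenizer raises `TypeError` because the `words` column is missing from the schema.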
  
### Parameters
* MLlib Estimators and Transformers use a uniform API for specifying parameters.
* There are two main ways to pass parameters to an algorithm:
  + Set parameters for an instance. E.g., if lr is an instance of LogisticRegression, one could call lr.setMaxIter(10) to make lr.fit() use at most 10 iterations. This API resembles the API used in spark.mllib package.
  + Pass a ParamMap to fit() or transform(). Any parameters in the ParamMap will override parameters previously specified via setter methods.
* Parameters belong to specific instances of Estimators and Transformers. 
  + For example, if we have two LogisticRegression instances lr1 and lr2, then we can build a ParamMap with both maxIter parameters specified: ParamMap(lr1.maxIter -> 10, lr2.maxIter -> 20) in Scala, or the dictionary {lr1.maxIter: 10, lr2.maxIter: 20} in Python.
  + This is useful if there are two algorithms with the maxIter parameter in a Pipeline.  
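The idea that a parameter belongs to one specific instance, and that a ParamMap overrides values set through setters, can be sketched in plain Python. The classes below are invented for illustration and only mimic the behaviour described above: `ToyEstimator`, `Param`, and `effective_params` are hypothetical names, and a parameter is modelled as a (parent ID, name) pair.

```python
import itertools
from collections import namedtuple

# A parameter is identified by its owning instance's unique ID plus its name.
Param = namedtuple("Param", ["parent_uid", "name"])
_uid_counter = itertools.count()


class ToyEstimator:
    def __init__(self, max_iter=100):
        self.uid = f"ToyEstimator_{next(_uid_counter)}"
        self.maxIter = Param(self.uid, "maxIter")
        self._values = {self.maxIter: max_iter}

    def setMaxIter(self, value):
        self._values[self.maxIter] = value
        return self

    def effective_params(self, param_map=None):
        """Merge instance values with a param map; the param map wins."""
        merged = dict(self._values)
        for param, value in (param_map or {}).items():
            if param.parent_uid == self.uid:  # ignore params owned by other instances
                merged[param] = value
        return {param.name: value for param, value in merged.items()}


lr1 = ToyEstimator().setMaxIter(10)
lr2 = ToyEstimator()

# One map can carry maxIter for both instances because the keys differ.
param_map = {lr1.maxIter: 30, lr2.maxIter: 20}
print(lr1.effective_params())           # value from the setter
print(lr1.effective_params(param_map))  # param map overrides the setter
print(lr2.effective_params(param_map))
```

Keying parameters by instance is what lets a single map configure several stages of one Pipeline without ambiguity.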
  
### ML persistence: Saving and Loading Pipelines  
* ML persistence (saving and loading models and Pipelines) works across Scala, Java, and Python.
 
#### Backwards compatibility for ML persistence
* In general, if you save an ML model or Pipeline in one version of Spark, you should be able to load it back and use it in a future version of Spark.
* However, there are rare exceptions, described below.
   + Model persistence: Is a model or Pipeline saved using Apache Spark ML persistence in Spark version X loadable by Spark version Y?
     - Major versions: No guarantees, but best-effort.
     - Minor and patch versions: Yes; these are backwards compatible.
     - Note about the format: There are no guarantees for a stable persistence format, but model loading itself is designed to be backwards compatible.
   + Model behavior: Does a model or Pipeline in Spark version X behave identically in Spark version Y?
     - Major versions: No guarantees, but best-effort.
     - Minor and patch versions: Identical behavior, except for bug fixes.

### Code Examples

#### Example: Estimator, Transformer, and Param
This example covers the concepts of Estimator, Transformer, and Param.

In [0]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.classification import LogisticRegression

# Prepare training data from a list of (label, features) tuples.
training = spark.createDataFrame([
    (1.0, Vectors.dense([0.0, 1.1, 0.1])),
    (0.0, Vectors.dense([2.0, 1.0, -1.0])),
    (0.0, Vectors.dense([2.0, 1.3, 1.0])),
    (1.0, Vectors.dense([0.0, 1.2, -0.5]))], ["label", "features"])

# Create a LogisticRegression instance. This instance is an Estimator.
lr = LogisticRegression(maxIter=10, regParam=0.01)
# Print out the parameters, documentation, and any default values.
print("LogisticRegression parameters:\n" + lr.explainParams() + "\n")

# Learn a LogisticRegression model. This uses the parameters stored in lr.
model1 = lr.fit(training)

# Since model1 is a Model (i.e., a transformer produced by an Estimator),
# we can view the parameters it used during fit().
# This prints the parameter (name: value) pairs, where names are unique IDs for this
# LogisticRegression instance.
print("Model 1 was fit using parameters: ")
print(model1.extractParamMap())

# We may alternatively specify parameters using a Python dictionary as a paramMap
paramMap = {lr.maxIter: 20}
paramMap[lr.maxIter] = 30  # Specify 1 Param, overwriting the original maxIter.
# Specify multiple Params.
paramMap.update({lr.regParam: 0.1, lr.threshold: 0.55})  # type: ignore

# You can combine paramMaps, which are python dictionaries.
# Change output column name
paramMap2 = {lr.probabilityCol: "myProbability"}  # type: ignore
paramMapCombined = paramMap.copy()
paramMapCombined.update(paramMap2)  # type: ignore

# Now learn a new model using the paramMapCombined parameters.
# paramMapCombined overrides all parameters set earlier via lr.set* methods.
model2 = lr.fit(training, paramMapCombined)
print("Model 2 was fit using parameters: ")
print(model2.extractParamMap())

# Prepare test data
test = spark.createDataFrame([
    (1.0, Vectors.dense([-1.0, 1.5, 1.3])),
    (0.0, Vectors.dense([3.0, 2.0, -0.1])),
    (1.0, Vectors.dense([0.0, 2.2, -1.5]))], ["label", "features"])

# Make predictions on test data using the Transformer.transform() method.
# LogisticRegression.transform will only use the 'features' column.
# Note that model2.transform() outputs a "myProbability" column instead of the usual
# 'probability' column since we renamed the lr.probabilityCol parameter previously.
prediction = model2.transform(test)
result = prediction.select("features", "label", "myProbability", "prediction") \
    .collect()

for row in result:
    print("features=%s, label=%s -> prob=%s, prediction=%s"
          % (row.features, row.label, row.myProbability, row.prediction))

LogisticRegression parameters:
aggregationDepth: suggested depth for treeAggregate (>= 2). (default: 2)
elasticNetParam: the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty. (default: 0.0)
family: The name of family which is a description of the label distribution to be used in the model. Supported options: auto, binomial, multinomial (default: auto)
featuresCol: features column name. (default: features)
fitIntercept: whether to fit an intercept term. (default: True)
labelCol: label column name. (default: label)
lowerBoundsOnCoefficients: The lower bounds on coefficients if fitting under bound constrained optimization. The bound matrix must be compatible with the shape (1, number of features) for binomial regression, or (number of classes, number of features) for multinomial regression. (undefined)
lowerBoundsOnIntercepts: The lower bounds on intercepts if fitting under bound constrained optimization. The bou

#### Example: Pipeline
* This example follows the simple text document Pipeline illustrated in the figures above.

In [0]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer

# Prepare training documents from a list of (id, text, label) tuples.
training = spark.createDataFrame([
    (0, "a b c d e spark", 1.0),
    (1, "b d", 0.0),
    (2, "spark f g h", 1.0),
    (3, "hadoop mapreduce", 0.0)
], ["id", "text", "label"])

# Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10, regParam=0.001)
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])

# Fit the pipeline to training documents.
model = pipeline.fit(training)

# Prepare test documents, which are unlabeled (id, text) tuples.
test = spark.createDataFrame([
    (4, "spark i j k"),
    (5, "l m n"),
    (6, "spark hadoop spark"),
    (7, "apache hadoop")
], ["id", "text"])

# Make predictions on test documents and print columns of interest.
prediction = model.transform(test)
selected = prediction.select("id", "text", "probability", "prediction")
for row in selected.collect():
    rid, text, prob, prediction = row  # type: ignore
    print(
        "(%d, %s) --> prob=%s, prediction=%f" % (
            rid, text, str(prob), prediction   # type: ignore
        )
    )

(4, spark i j k) --> prob=[0.6292098489668488,0.37079015103315116], prediction=0.000000
(5, l m n) --> prob=[0.984770006762304,0.015229993237696027], prediction=0.000000
(6, spark hadoop spark) --> prob=[0.13412348342566147,0.8658765165743385], prediction=1.000000
(7, apache hadoop) --> prob=[0.9955732114398529,0.00442678856014711], prediction=0.000000
