# ML Pipelines with Spark ML

This example covers the concepts of Estimator, Transformer, and Param.


The Pipelines API is largely inspired by the scikit-learn project.

https://spark.apache.org/docs/latest/ml-pipeline.html

In [2]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.classification import LogisticRegression

## Keywords

* Vectors: mathematically, a vector has magnitude and direction; in MLlib, a vector is an ordered collection of numeric values, typically the features of one data point. A local vector has integer-typed, 0-based indices and double-typed values, and is stored on a single machine.

* MLlib supports two types of local vectors: dense and sparse. A dense vector is backed by a double array representing its entry values, while a sparse vector is backed by two parallel arrays: indices and values. Being sparse means that it does not explicitly store every coordinate: entries not listed in the indices are zero.

Examples:

- A vector (1.0, 0.0, 3.0) can be represented in dense format as [1.0, 0.0, 3.0] or in sparse format as (3, [0, 2], [1.0, 3.0]), where 3 is the size of the vector.

- The dense vector (1, 2, 0, 0, 5, 0, 9, 0, 0) can be represented in sparse format as (9, [0, 1, 4, 6], [1.0, 2.0, 5.0, 9.0]), where 9 is the size of the vector.
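As a sketch of the sparse layout described above, the hypothetical helpers below convert between a plain Python list and an MLlib-style (size, indices, values) triple. They are illustrations only, not pyspark functions; in PySpark itself the same sparse vector can be built with `Vectors.sparse(9, [0, 1, 4, 6], [1.0, 2.0, 5.0, 9.0])` and turned back into a dense array with `.toArray()`.

```python
# Plain-Python sketch of MLlib's sparse layout: (size, indices, values).
# Hypothetical helpers for illustration only; they are not part of pyspark.

def to_sparse(dense):
    """Return (size, indices, values), keeping only the nonzero entries."""
    indices = [i for i, v in enumerate(dense) if v != 0]
    values = [float(dense[i]) for i in indices]
    return len(dense), indices, values


def to_dense(size, indices, values):
    """Rebuild the full list of entries, filling unstored positions with 0.0."""
    dense = [0.0] * size
    for i, v in zip(indices, values):
        dense[i] = v
    return dense


sparse = to_sparse([1, 2, 0, 0, 5, 0, 9, 0, 0])
print(sparse)             # (9, [0, 1, 4, 6], [1.0, 2.0, 5.0, 9.0])
print(to_dense(*sparse))  # [1.0, 2.0, 0.0, 0.0, 5.0, 0.0, 9.0, 0.0, 0.0]
```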

<br>

* Transformers: a Transformer converts one DataFrame into another, typically by appending one or more columns. Technically, a Transformer implements a transform() method.

- A feature transformer might take a DataFrame, read a column (e.g., text), map it into a new column (e.g., feature vectors), and output a new DataFrame with the mapped column appended.
- A learning model might take a DataFrame, read the column containing feature vectors, predict the label for each feature vector, and output a new DataFrame with predicted labels appended as a column.
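The feature-transformer case can be pictured with a small sketch. It assumes a list of row dicts as a stand-in for a DataFrame, and `SplitWords` is a hypothetical class loosely modeled on Spark's `Tokenizer` (which lowercases text and splits it on whitespace); it is not Spark code.

```python
# Toy stand-in for a DataFrame: a list of row dicts.
# SplitWords is a hypothetical Transformer-like class, not a Spark API.

class SplitWords:
    """Reads input_col and writes a list of lowercase words to output_col."""

    def __init__(self, input_col, output_col):
        self.input_col = input_col
        self.output_col = output_col

    def transform(self, rows):
        # Build new rows with the extra column; the input rows are left unchanged.
        return [
            {**row, self.output_col: row[self.input_col].lower().split()}
            for row in rows
        ]


rows = [{"id": 0, "text": "Spark ML pipelines"}]
out = SplitWords("text", "words").transform(rows)
print(out)  # [{'id': 0, 'text': 'Spark ML pipelines', 'words': ['spark', 'ml', 'pipelines']}]
```

Returning new rows instead of mutating the input mirrors the way a Spark transform produces a new DataFrame.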

<br>

* Estimators: an Estimator abstracts a learning algorithm, or any algorithm that fits or trains on data. Technically, an Estimator implements a fit() method, which accepts a DataFrame and produces a Model, which is a Transformer.

- For example, a learning algorithm such as LogisticRegression is an Estimator, and calling fit() trains a LogisticRegressionModel, which is a Model and hence a Transformer.
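The Estimator/Model relationship can be sketched in plain Python. `MeanCenterer` and `MeanCentererModel` are hypothetical classes, not Spark's: fit() learns a statistic from the data and returns a Model, and that Model counts as a Transformer because it has a transform() method.

```python
# Hypothetical Estimator/Model pair in plain Python (not Spark classes).

class MeanCenterer:
    """Estimator: learns the mean of input_col."""

    def __init__(self, input_col, output_col):
        self.input_col = input_col
        self.output_col = output_col

    def fit(self, rows):
        # Learn the mean from the training rows and wrap it in a Model.
        mean = sum(row[self.input_col] for row in rows) / len(rows)
        return MeanCentererModel(self.input_col, self.output_col, mean)


class MeanCentererModel:
    """Model (a Transformer): subtracts the learned mean."""

    def __init__(self, input_col, output_col, mean):
        self.input_col = input_col
        self.output_col = output_col
        self.mean = mean

    def transform(self, rows):
        return [{**row, self.output_col: row[self.input_col] - self.mean} for row in rows]


train = [{"x": 1.0}, {"x": 2.0}, {"x": 3.0}]
model = MeanCenterer("x", "x_centered").fit(train)
print(model.mean)                     # 2.0
print(model.transform([{"x": 5.0}]))  # [{'x': 5.0, 'x_centered': 3.0}]
```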

<br>

* Pipeline

In a machine learning workflow, data passes through several stages, e.g. StringIndexer, VectorAssembler, VectorIndexer and RandomForestClassifier. A Pipeline chains these stages into a single workflow. The DataFrame, as the central data structure, is enriched at each stage of the pipeline.

<br>

* There are two types of stages:

- Estimator: produces a model when its fit() method is called, e.g. StringIndexer, VectorIndexer and RandomForestClassifier.

- Transformer: transforms a DataFrame, usually by appending columns to the input DataFrame, e.g. VectorAssembler or any fitted model.
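A Pipeline can be pictured with the hypothetical plain-Python sketch below; none of these classes are Spark's. It assumes a duck-typing rule: a stage with a fit() method is treated as an Estimator. During fitting, each Estimator is fit on the current data and replaced by its Model, so the resulting PipelineModel holds only Transformers.

```python
# Hypothetical plain-Python Pipeline, loosely following the Spark ML contract.

class Pipeline:
    def __init__(self, stages):
        self.stages = stages

    def fit(self, rows):
        fitted = []
        for stage in self.stages:
            # Estimators are fit on the current data; Transformers are used as-is.
            model = stage.fit(rows) if hasattr(stage, "fit") else stage
            rows = model.transform(rows)  # pass the enriched data to the next stage
            fitted.append(model)
        return PipelineModel(fitted)


class PipelineModel:
    def __init__(self, stages):
        self.stages = stages  # only Transformers

    def transform(self, rows):
        for stage in self.stages:
            rows = stage.transform(rows)
        return rows


class AddSquare:  # Transformer stage: appends x * x as "x2"
    def transform(self, rows):
        return [{**r, "x2": r["x"] ** 2} for r in rows]


class MaxScaler:  # Estimator stage: learns the max of "x2"
    def fit(self, rows):
        return MaxScalerModel(max(r["x2"] for r in rows))


class MaxScalerModel:  # Model: divides "x2" by the learned max
    def __init__(self, top):
        self.top = top

    def transform(self, rows):
        return [{**r, "x2_scaled": r["x2"] / self.top} for r in rows]


pipeline_model = Pipeline([AddSquare(), MaxScaler()]).fit([{"x": 1.0}, {"x": 2.0}])
print(pipeline_model.transform([{"x": 1.0}]))  # [{'x': 1.0, 'x2': 1.0, 'x2_scaled': 0.25}]
```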

In [4]:
# Prepare training data from a list of (label, features) tuples.
df_training = spark.createDataFrame([
    (1.0, Vectors.dense([0.0, 1.1, 0.1])),
    (0.0, Vectors.dense([2.0, 1.0, -1.0])),
    (0.0, Vectors.dense([2.0, 1.3, 1.0])),
    (1.0, Vectors.dense([0.0, 1.2, -0.5]))], ["label", "features"])

## Features and labels

Briefly, features are the input; the label is the output.

A feature is one column of the data in your input set. For instance, if you're trying to predict the type of pet someone will choose, your input features might include age, home region, family income, etc. The label is the final choice, such as dog, fish, iguana, rock, etc.
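To make the split concrete, the sketch below turns made-up customer records into the same (label, features) tuple layout that df_training uses. The record fields and values are hypothetical, chosen only for illustration.

```python
# Hypothetical customer records; "bought" is the label, the rest are features.
records = [
    {"age": 34, "income": 52000.0, "bought": 1.0},
    {"age": 51, "income": 38000.0, "bought": 0.0},
]

FEATURE_COLS = ["age", "income"]  # inputs to the model
LABEL_COL = "bought"              # value the model should learn to predict

# Same (label, features) layout as df_training.
examples = [(r[LABEL_COL], [float(r[c]) for c in FEATURE_COLS]) for r in records]
print(examples)  # [(1.0, [34.0, 52000.0]), (0.0, [51.0, 38000.0])]
```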

In [6]:
# The training dataset
df_training.show()

In [7]:
# Create a LogisticRegression instance. This instance is an Estimator.
lr = LogisticRegression(maxIter=10, regParam=0.01)

## Logistic Regression


* Logistic regression is widely used to predict a binary response, that is, an outcome that can take only two values, "0" and "1", representing results such as pass/fail, win/lose, alive/dead or healthy/sick.
* It estimates the probability of an event, and the event itself is encoded in binary format, i.e. 0 or 1.
* Example: to estimate whether a customer will buy my product, I would run a logistic regression on the relevant data with a binary dependent variable (1 = Yes; 0 = No).
* maxIter - the maximum number of iterations of the optimizer.
* regParam - the regularization parameter, i.e. the strength of the regularization penalty.
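How a logistic regression turns features into a probability can be sketched with the logistic (sigmoid) function. The weights and intercept below are made-up values, and `predict` is a hypothetical helper; it assumes the usual binary formulation of a linear score squashed into (0, 1), then compared against a threshold (0.5 by default, as in Spark).

```python
import math


def sigmoid(z):
    """Logistic function: maps any real score to a probability in (0, 1)."""
    return 1.0 / (1.0 + math.exp(-z))


def predict(weights, intercept, features, threshold=0.5):
    # Linear score w.x + b, squashed into P(label = 1), then thresholded.
    z = sum(w * x for w, x in zip(weights, features)) + intercept
    p = sigmoid(z)
    return p, 1.0 if p > threshold else 0.0


print(sigmoid(0.0))                           # 0.5
print(predict([2.0, -1.0], 0.0, [1.0, 1.0]))  # z = 1.0, so p is about 0.731 and the prediction is 1.0
```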


## Linear Regression

* Linear regression models a linear relationship between a dependent variable and one or more independent variables, which is useful for estimating how the dependent variable changes when the independent variables change.


<br>

* Graphically, linear regression fits a straight line to the plotted data, whereas logistic regression produces an S-shaped (sigmoid) curve.
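For contrast with the S-shaped curve, here is a minimal ordinary-least-squares sketch for a single independent variable. `fit_line` is a hypothetical helper and the data points are made up; it uses the closed-form slope (covariance over variance) rather than the iterative optimizers Spark uses.

```python
def fit_line(xs, ys):
    """Ordinary least squares for y = slope * x + intercept (one feature)."""
    n = len(xs)
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    var = sum((x - mean_x) ** 2 for x in xs)
    slope = cov / var
    return slope, mean_y - slope * mean_x


slope, intercept = fit_line([1.0, 2.0, 3.0, 4.0], [3.0, 5.0, 7.0, 9.0])
print(slope, intercept)  # 2.0 1.0
```

Unlike the sigmoid, the fitted line is unbounded: at x = 10 it predicts 21.0, which is one reason a straight line is a poor model for 0/1 outcomes.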

In [9]:
# Print out the parameters, documentation, and any default values.
print("LogisticRegression parameters:\n" + lr.explainParams() + "\n")

- explainParams() - returns the documentation of all params, with their default values (if any) and user-supplied values.

In [11]:
# Learn a LogisticRegression model. This uses the parameters stored in lr.
model1 = lr.fit(df_training)

* fit(dataset, params=None)

Fits a model to the input dataset, with optional parameters that override the embedded params.

Returns the fitted model (or a list of models, if a list of param maps is passed).

In [13]:
model1.write().overwrite().save("/FileStore/tables/modle1")

Saving a model writes its metadata as JSON (under metadata/) and its model data, e.g. coefficients, as Parquet (under data/).

In [15]:
%fs ls /FileStore/tables/modle1/data

In [16]:
%fs head /FileStore/tables/modle1/data/part-00000-tid-6344610780192366248-71efd7d1-6830-4c3c-8f6a-bacc284fff6f-144-c000.snappy.parquet

In [17]:
%fs ls /FileStore/tables/modle1/metadata

In [18]:
%fs head /FileStore/tables/modle1/metadata/part-00000 

In [19]:
# Since model1 is a Model (i.e., a transformer produced by an Estimator),
# we can view the parameters it used during fit().
# This prints the parameter (name: value) pairs, where names are unique IDs for this
# LogisticRegression instance.
print("Model 1 was fit using parameters: ")
print(model1.extractParamMap())

* extractParamMap(extra=None)

Extracts the embedded default param values and user-supplied values, and then merges them with extra values from input into a flat param map, where the latter value is used if there exist conflicts, i.e., with ordering: default param values < user-supplied values < extra.

Returns the merged param map.
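The merge ordering can be illustrated with plain dictionaries, where later dicts win on conflicts. This is a sketch, not Spark's implementation: Spark keys param maps by Param objects, whereas the hypothetical maps below use name strings. The defaults shown are LogisticRegression's (maxIter=100, regParam=0.0, threshold=0.5).

```python
# Param maps keyed by name for readability (Spark keys them by Param objects).
defaults = {"maxIter": 100, "regParam": 0.0, "threshold": 0.5}
user_supplied = {"maxIter": 10, "regParam": 0.01}  # like LogisticRegression(maxIter=10, regParam=0.01)
extra = {"maxIter": 30}                            # like a paramMap passed to fit()

# Later dicts override earlier ones: default < user-supplied < extra.
merged = {**defaults, **user_supplied, **extra}
print(merged)  # {'maxIter': 30, 'regParam': 0.01, 'threshold': 0.5}
```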

In [21]:
# We may alternatively specify parameters using a Python dictionary as a paramMap
paramMap = {lr.maxIter: 20}
paramMap[lr.maxIter] = 30  # Specify 1 Param, overwriting the original maxIter.
paramMap.update({lr.regParam: 0.1, lr.threshold: 0.55})  # Specify multiple Params.

In [22]:
# You can combine paramMaps, which are python dictionaries.
paramMap2 = {lr.probabilityCol: "myProbability"}  # Change output column name
paramMapCombined = paramMap.copy()
paramMapCombined.update(paramMap2)

In [23]:
# Now learn a new model using the paramMapCombined parameters.
# Values in paramMapCombined override any values set earlier for the same params (in the constructor or via lr.set* methods).
model2 = lr.fit(df_training, paramMapCombined)
print("Model 2 was fit using parameters: ")
print(model2.extractParamMap())

In [24]:
# Prepare the test dataset
df_test = spark.createDataFrame([
    (1.0, Vectors.dense([-1.0, 1.5, 1.3])),
    (0.0, Vectors.dense([3.0, 2.0, -0.1])),
    (1.0, Vectors.dense([0.0, 2.2, -1.5]))], ["label", "features"])

In [25]:
# Make predictions on test data using the Transformer.transform() method.
# The fitted LogisticRegressionModel's transform() only reads the 'features' column as input.
# Note that model2.transform() outputs a "myProbability" column instead of the usual
# 'probability' column since we renamed the lr.probabilityCol parameter previously.
prediction = model2.transform(df_test)
result = prediction.select("features", "label", "myProbability", "prediction") \
    .collect()

* transform(dataset, params=None) - Transforms the input dataset with optional parameters.

Parameters:

- dataset - input dataset, which is an instance of pyspark.sql.DataFrame
- params - an optional param map that overrides embedded params

Returns the transformed dataset.

In [27]:
for row in result:
    print("features=%s, label=%s -> prob=%s, prediction=%s"
          % (row.features, row.label, row.myProbability, row.prediction))
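The printed prediction follows from the probability column and the threshold set in paramMap (0.55). The sketch below assumes the binary rule that the prediction is 1.0 only when P(label = 1) exceeds the threshold; `to_prediction` is a hypothetical helper and the probability vectors are made up, not the notebook's actual output.

```python
def to_prediction(probability, threshold=0.5):
    """probability = [P(label=0), P(label=1)]; returns 1.0 only above threshold."""
    return 1.0 if probability[1] > threshold else 0.0


# Hypothetical probability vector, not the notebook's actual output.
print(to_prediction([0.48, 0.52]))                  # 1.0 with the default threshold 0.5
print(to_prediction([0.48, 0.52], threshold=0.55))  # 0.0 with model2's threshold 0.55
```

Raising the threshold trades recall for precision: fewer rows are labeled 1.0, but each such label requires stronger evidence.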