# Spark MLlib

![](https://blog.osservatori.net/hubfs/AI/machine-learning.jpg)
[Osservatori.net](https://blog.osservatori.net/it_it/machine-learning-come-funziona-apprendimento-automatico)

# @reboot

In [1]:
import findspark
# Locate the Spark installation and make pyspark importable before importing it
findspark.init()
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("TapDataFrame").getOrCreate()
spark

<div class="jumbotron">
    <center>
        <b>MLlib</b> is Apache Spark's scalable machine learning library.
    </center>
</div>

## Ease of Use

***Usable in Java, Scala, Python, and R.***

MLlib fits into Spark's APIs and interoperates with NumPy in Python (as of Spark 0.9) and R libraries (as of Spark 1.5). You can use any Hadoop data source (e.g. HDFS, HBase, or local files), making it easy to plug into Hadoop workflows.

```python
from pyspark.ml.clustering import KMeans

data = spark.read.format("libsvm")\
  .load("hdfs://...")

# Fit a k-means model with 10 clusters
model = KMeans(k=10).fit(data)
```

## Performance

***High-quality algorithms, 100x faster than MapReduce.***

Spark excels at iterative computation, enabling MLlib to run fast. At the same time, we care about algorithmic performance: MLlib contains high-quality algorithms that leverage iteration, and can yield better results than the one-pass approximations sometimes used on MapReduce.

![](https://spark.apache.org/images/logistic-regression.png)

## Algorithms and Utilities

*Algorithms*

* Classification: logistic regression, naive Bayes,...
* Regression: generalized linear regression, survival regression,...
* Decision trees, random forests, and gradient-boosted trees
* Recommendation: alternating least squares (ALS)
* Clustering: K-means, Gaussian mixtures (GMMs),...
* Topic modeling: latent Dirichlet allocation (LDA)
* Frequent itemsets, association rules, and sequential pattern mining


*Utilities*

* Feature transformations: standardization, normalization, hashing,...
* ML Pipeline construction
* Model evaluation and hyper-parameter tuning
* ML persistence: saving and loading models and Pipelines
* Distributed linear algebra: SVD, PCA,...

## Announcement: DataFrame-based API is primary API
https://spark.apache.org/docs/latest/ml-guide.html

The MLlib RDD-based API is now in maintenance mode.

As of Spark 2.0, the RDD-based APIs in the spark.mllib package have entered maintenance mode. The primary Machine Learning API for Spark is now the DataFrame-based API in the spark.ml package.

* DataFrames provide a more user-friendly API than RDDs. The many benefits of DataFrames include Spark Datasources, SQL/DataFrame queries, Tungsten and Catalyst optimizations, and uniform APIs across languages.
* The DataFrame-based API for MLlib provides a uniform API across ML algorithms and across multiple languages.
* DataFrames facilitate practical ML Pipelines, particularly feature transformations. See the Pipelines guide for details.

**What is “Spark ML”?**

“Spark ML” is not an official name, but it is occasionally used to refer to the MLlib DataFrame-based API.

## Highlights in 3.0

The list below highlights some of the new features and enhancements added to MLlib in the `3.0`
release of Spark:

* Multiple columns support was added to `Binarizer` ([SPARK-23578](https://issues.apache.org/jira/browse/SPARK-23578)), `StringIndexer` ([SPARK-11215](https://issues.apache.org/jira/browse/SPARK-11215)), `StopWordsRemover` ([SPARK-29808](https://issues.apache.org/jira/browse/SPARK-29808)) and PySpark `QuantileDiscretizer` ([SPARK-22796](https://issues.apache.org/jira/browse/SPARK-22796)).
* Tree-Based Feature Transformation was added
    ([SPARK-13677](https://issues.apache.org/jira/browse/SPARK-13677)).
* Two new evaluators `MultilabelClassificationEvaluator` ([SPARK-16692](https://issues.apache.org/jira/browse/SPARK-16692)) and `RankingEvaluator` ([SPARK-28045](https://issues.apache.org/jira/browse/SPARK-28045)) were added.
* Sample weights support was added in `DecisionTreeClassifier/Regressor` ([SPARK-19591](https://issues.apache.org/jira/browse/SPARK-19591)), `RandomForestClassifier/Regressor` ([SPARK-9478](https://issues.apache.org/jira/browse/SPARK-9478)), `GBTClassifier/Regressor` ([SPARK-9612](https://issues.apache.org/jira/browse/SPARK-9612)),  `MulticlassClassificationEvaluator` ([SPARK-24101](https://issues.apache.org/jira/browse/SPARK-24101)), `RegressionEvaluator` ([SPARK-24102](https://issues.apache.org/jira/browse/SPARK-24102)), `BinaryClassificationEvaluator` ([SPARK-24103](https://issues.apache.org/jira/browse/SPARK-24103)), `BisectingKMeans` ([SPARK-30351](https://issues.apache.org/jira/browse/SPARK-30351)), `KMeans` ([SPARK-29967](https://issues.apache.org/jira/browse/SPARK-29967)) and `GaussianMixture` ([SPARK-30102](https://issues.apache.org/jira/browse/SPARK-30102)).
* R API for `PowerIterationClustering` was added
    ([SPARK-19827](https://issues.apache.org/jira/browse/SPARK-19827)).
* Added Spark ML listener for tracking ML pipeline status
    ([SPARK-23674](https://issues.apache.org/jira/browse/SPARK-23674)).
* Fit with validation set was added to Gradient Boosted Trees in Python
    ([SPARK-24333](https://issues.apache.org/jira/browse/SPARK-24333)).
* [`RobustScaler`](https://spark.apache.org/docs/latest/ml-features.html#robustscaler) transformer was added
    ([SPARK-28399](https://issues.apache.org/jira/browse/SPARK-28399)).
* [`Factorization Machines`](https://spark.apache.org/docs/latest/ml-classification-regression.html#factorization-machines) classifier and regressor were added
    ([SPARK-29224](https://issues.apache.org/jira/browse/SPARK-29224)).
* Gaussian Naive Bayes Classifier ([SPARK-16872](https://issues.apache.org/jira/browse/SPARK-16872)) and Complement Naive Bayes Classifier ([SPARK-29942](https://issues.apache.org/jira/browse/SPARK-29942)) were added.
* ML function parity between Scala and Python
    ([SPARK-28958](https://issues.apache.org/jira/browse/SPARK-28958)).
* `predictRaw` is made public in all the Classification models. `predictProbability` is made public in all the Classification models except `LinearSVCModel`
    ([SPARK-30358](https://issues.apache.org/jira/browse/SPARK-30358)).

## Data Types

### Local vector
A local vector has integer-typed and 0-based indices and double-typed values, stored on a single machine. 

MLlib supports two types of local vectors: dense and sparse. 

A dense vector is backed by a double array representing its entry values, while a sparse vector is backed by two parallel arrays: indices and values. 

For example, a vector (1.0, 0.0, 3.0) can be represented in dense format as [1.0, 0.0, 3.0] or in sparse format as (3, [0, 2], [1.0, 3.0]), where 3 is the size of the vector.
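
To make the two layouts concrete, here is a minimal pure-Python sketch (no Spark required) that converts between them. The helper names `to_sparse` and `to_dense` are hypothetical and exist only for this illustration; in PySpark the corresponding constructors are `Vectors.dense` and `Vectors.sparse` from `pyspark.ml.linalg`.

```python
def to_sparse(dense):
    """Return (size, indices, values), keeping only the non-zero entries."""
    indices = [i for i, v in enumerate(dense) if v != 0.0]
    values = [dense[i] for i in indices]
    return len(dense), indices, values


def to_dense(size, indices, values):
    """Rebuild the full list of entries from the sparse triple."""
    dense = [0.0] * size
    for i, v in zip(indices, values):
        dense[i] = v
    return dense


sparse = to_sparse([1.0, 0.0, 3.0])
print(sparse)             # (3, [0, 2], [1.0, 3.0])
print(to_dense(*sparse))  # [1.0, 0.0, 3.0]
```

The sparse form pays off when most entries are zero, as in the 262144-dimensional hashed text features that appear later in this notebook.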


# Basic Statistics

## Correlation

*Correlation* computes the correlation matrix for the input Dataset of Vectors using the specified method (Pearson by default, or Spearman, as in the example below).
The output is a DataFrame that contains the correlation matrix of the column of vectors.

In [4]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.stat import Correlation

datasetA = [1.0,2.0,3.0,4.0]
datasetB = [2.0,4.0,6.0,8.0]

data = [
        (Vectors.dense(datasetA),),
        (Vectors.dense(datasetB),)
       ]
# The trailing comma makes each element a one-item tuple, i.e. a row with a single column
# https://www.w3schools.com/python/gloss_python_tuple_one_item.asp
# Credits Ernesto Casablanca, TAP 2021-04-26

df = spark.createDataFrame(data, ["features"])
df.show()

+-----------------+
|         features|
+-----------------+
|[1.0,2.0,3.0,4.0]|
|[2.0,4.0,6.0,8.0]|
+-----------------+



In [5]:
r1 = Correlation.corr(df, "features").head()
print("Pearson correlation matrix:\n" + str(r1[0]))

r2 = Correlation.corr(df, "features", "spearman").head()
print("Spearman correlation matrix:\n" + str(r2[0]))

Pearson correlation matrix:
DenseMatrix([[1., 1., 1., 1.],
             [1., 1., 1., 1.],
             [1., 1., 1., 1.],
             [1., 1., 1., 1.]])
Spearman correlation matrix:
DenseMatrix([[1., 1., 1., 1.],
             [1., 1., 1., 1.],
             [1., 1., 1., 1.],
             [1., 1., 1., 1.]])
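
Every entry is 1 because the DataFrame has only two rows and each column doubles from the first row to the second, so any pair of columns is perfectly linearly related. A minimal pure-Python sketch of the Pearson computation reproduces this; the helper `pearson` is a hypothetical name for illustration, not part of Spark.

```python
import math


def pearson(x, y):
    """Pearson correlation coefficient of two equally long sequences."""
    n = len(x)
    mean_x, mean_y = sum(x) / n, sum(y) / n
    cov = sum((a - mean_x) * (b - mean_y) for a, b in zip(x, y))
    var_x = sum((a - mean_x) ** 2 for a in x)
    var_y = sum((b - mean_y) ** 2 for b in y)
    return cov / math.sqrt(var_x * var_y)


rows = [[1.0, 2.0, 3.0, 4.0],
        [2.0, 4.0, 6.0, 8.0]]
columns = list(zip(*rows))  # each column is one variable observed in both rows

matrix = [[pearson(ci, cj) for cj in columns] for ci in columns]
for line in matrix:
    print([round(v, 6) for v in line])
```

With more rows and less regular data the off-diagonal entries would fall anywhere between -1 and 1.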


## Hypothesis testing
Hypothesis testing is a statistical tool for deciding whether a result is statistically significant or could have occurred by chance. spark.ml currently supports Pearson’s Chi-squared (χ²) tests for independence.

ChiSquareTest conducts Pearson’s independence test for every feature against the label. For each feature, the (feature, label) pairs are converted into a contingency matrix for which the Chi-squared statistic is computed. All label and feature values must be categorical.

In [6]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.stat import ChiSquareTest

#https://it.wikipedia.org/wiki/Test_chi_quadrato
data = [(1, Vectors.dense(333,388)),
        (2, Vectors.dense(333,322)),
        (3, Vectors.dense(333,314)),
        (4, Vectors.dense(333,316)),
        (5, Vectors.dense(333,344)),
        (6, Vectors.dense(333,316))]
df = spark.createDataFrame(data, ["label", "features"])
df.show()
r = ChiSquareTest.test(df, "features", "label").head()
print("pValues: " + str(r.pValues))
print("degreesOfFreedom: " + str(r.degreesOfFreedom))
print("statistics: " + str(r.statistics))

+-----+-------------+
|label|     features|
+-----+-------------+
|    1|[333.0,388.0]|
|    2|[333.0,322.0]|
|    3|[333.0,314.0]|
|    4|[333.0,316.0]|
|    5|[333.0,344.0]|
|    6|[333.0,316.0]|
+-----+-------------+

pValues: [1.0,0.24239216167051258]
degreesOfFreedom: [0, 20]
statistics: [0.0,24.00000000000001]
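
The statistics and degrees of freedom above can be checked by hand. For one feature, Pearson's statistic is the sum of (observed - expected)^2 / expected over the cells of the (feature value, label) contingency table, with (rows - 1) * (columns - 1) degrees of freedom. The sketch below is plain Python rather than the Spark implementation, and `chi_square` is a hypothetical helper name. The first feature is constant, so its table has a single row; the second has 5 distinct values across 6 labels.

```python
from collections import Counter


def chi_square(feature_values, labels):
    """Pearson chi-squared statistic and degrees of freedom for one feature."""
    n = len(labels)
    observed = Counter(zip(feature_values, labels))
    feature_totals = Counter(feature_values)
    label_totals = Counter(labels)
    statistic = 0.0
    for f, f_total in feature_totals.items():
        for l, l_total in label_totals.items():
            expected = f_total * l_total / n
            statistic += (observed[(f, l)] - expected) ** 2 / expected
    dof = (len(feature_totals) - 1) * (len(label_totals) - 1)
    return statistic, dof


labels = [1, 2, 3, 4, 5, 6]
first = [333] * 6
second = [388, 322, 314, 316, 344, 316]

print(chi_square(first, labels))   # (0.0, 0)
stat, dof = chi_square(second, labels)
print(round(stat, 6), dof)         # 24.0 20
```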


## Summarizer
Summarizer provides summary statistics for a vector column of a DataFrame.

Available metrics are the column-wise max, min, mean, variance, and number of nonzeros, as well as the total count.

In [7]:
from pyspark.ml.stat import Summarizer
from pyspark.sql import Row
from pyspark.ml.linalg import Vectors

# Use spark.sparkContext to obtain the SparkContext (sc)

df = spark.sparkContext.parallelize(
    [Row(weight=1.0, features=Vectors.dense(1.0, 1.0, 1.0)),
     Row(weight=0.0, features=Vectors.dense(1.0, 2.0, 3.0))]
    ).toDF()

# create summarizer for multiple metrics "mean" and "count"
summarizer = Summarizer.metrics("mean", "count")

summarizer

<pyspark.ml.stat.SummaryBuilder at 0x7f4cd86484d0>

In [8]:
# compute statistics for multiple metrics with weight
df.select(summarizer.summary(df.features, df.weight)).show(truncate=False)

+-----------------------------------+
|aggregate_metrics(features, weight)|
+-----------------------------------+
|{[1.0,1.0,1.0], 1}                 |
+-----------------------------------+



In [9]:
# compute statistics for multiple metrics without weight
df.select(summarizer.summary(df.features)).show(truncate=False)

+--------------------------------+
|aggregate_metrics(features, 1.0)|
+--------------------------------+
|{[1.0,1.5,2.0], 2}              |
+--------------------------------+



In [22]:
# compute statistics for single metric "mean" with weight
df.select(Summarizer.mean(df.features, df.weight)).show(truncate=False)

+--------------+
|mean(features)|
+--------------+
|[1.0,1.0,1.0] |
+--------------+



In [8]:
# compute statistics for single metric "mean" without weight
df.select(Summarizer.mean(df.features)).show(truncate=False)

+--------------+
|mean(features)|
+--------------+
|[1.0,1.5,2.0] |
+--------------+
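
The weighted and unweighted results differ because the second row has weight 0.0 and therefore contributes nothing to the weighted mean. A small pure-Python sketch of the column-wise weighted mean shows the arithmetic; the helper `weighted_mean` is hypothetical and is not how Spark computes it internally.

```python
def weighted_mean(vectors, weights=None):
    """Column-wise mean of equally long vectors, optionally weighted per row."""
    if weights is None:
        weights = [1.0] * len(vectors)
    total = sum(weights)
    size = len(vectors[0])
    return [sum(w * v[i] for v, w in zip(vectors, weights)) / total
            for i in range(size)]


features = [[1.0, 1.0, 1.0], [1.0, 2.0, 3.0]]
print(weighted_mean(features, [1.0, 0.0]))  # [1.0, 1.0, 1.0]
print(weighted_mean(features))              # [1.0, 1.5, 2.0]
```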



# Pipelines

MLlib standardizes APIs for machine learning algorithms to make it easier to combine multiple algorithms into a single pipeline, or workflow. 

This section covers the key concepts introduced by the Pipelines API, where the pipeline concept is mostly inspired by the scikit-learn project.

**DataFrame**

This ML API uses DataFrame from Spark SQL as an ML dataset, which can hold a variety of data types. 

E.g., a DataFrame could have different columns storing text, feature vectors, true labels, and predictions.

**Transformer** 

A Transformer is an algorithm which can transform one DataFrame into another DataFrame.

E.g., an ML model is a Transformer which transforms a DataFrame with features into a DataFrame with predictions.

**Estimator**

An Estimator is an algorithm which can be fit on a DataFrame to produce a Transformer. 

E.g., a learning algorithm is an Estimator which trains on a DataFrame and produces a model.

**Pipeline**

A Pipeline chains multiple Transformers and Estimators together to specify an ML workflow.

**Parameter** 

All Transformers and Estimators now share a common API for specifying parameters.

### DataFrame
Machine learning can be applied to a wide variety of data types, such as vectors, text, images, and structured data. 

This API adopts the DataFrame from Spark SQL in order to support a variety of data types.

DataFrame supports many basic and structured types; see the Spark SQL datatype reference for a list of supported types. In addition to the types listed in the Spark SQL guide, DataFrame can use ML Vector types.

A DataFrame can be created either implicitly or explicitly from a regular RDD. See the code examples below and the Spark SQL programming guide for examples.

Columns in a DataFrame are named. The code examples below use names such as “text,” “features,” and “label.”

### Transformers
A Transformer is an abstraction that includes feature transformers and learned models. 

Technically, a Transformer implements a method transform(), which converts one DataFrame into another, generally by appending one or more columns. 

For example:
* A feature transformer might take a DataFrame, read a column (e.g., text), map it into a new column (e.g., feature vectors), and output a new DataFrame with the mapped column appended.
* A learning model might take a DataFrame, read the column containing feature vectors, predict the label for each feature vector, and output a new DataFrame with predicted labels appended as a column.

### Estimators
An Estimator abstracts the concept of a learning algorithm or any algorithm that fits or trains on data. 

Technically, an Estimator implements a method fit(), which accepts a DataFrame and produces a Model, which is a Transformer. 

For example, a learning algorithm such as LogisticRegression is an Estimator, 
and calling fit() trains a LogisticRegressionModel, which is a Model and hence a Transformer.

## Pipeline
In machine learning, it is common to run a sequence of algorithms to process and learn from data. E.g., a simple text document processing workflow might include several stages:

* Split each document’s text into words.
* Convert each document’s words into a numerical feature vector.
* Learn a prediction model using the feature vectors and labels.

MLlib represents such a workflow as a Pipeline, which consists of a sequence of PipelineStages (Transformers and Estimators) to be run in a specific order. 

### How it works
A Pipeline is specified as a sequence of stages, and each stage is either a Transformer or an Estimator. 

These stages are run in order, and the input DataFrame is transformed as it passes through each stage. 

For Transformer stages, the transform() method is called on the DataFrame. 

For Estimator stages, the fit() method is called to produce a Transformer (which becomes part of the PipelineModel, or fitted Pipeline), and that Transformer’s transform() method is called on the DataFrame.
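
The control flow described above can be sketched in a few lines of plain Python. This is a toy model of the idea, not Spark's implementation: all class names (`ToyTokenizer`, `ToyWordVote`, `ToyPipeline`, and so on) are hypothetical, a list of dicts stands in for a DataFrame, and the "learning algorithm" is a simple word vote rather than logistic regression. The four training rows are the same ones used in the Spark example later in this section.

```python
class ToyTokenizer:
    """Transformer: adds a 'words' column by lowercasing and splitting 'text'."""
    def transform(self, rows):
        return [{**r, "words": r["text"].lower().split()} for r in rows]


class ToyWordVoteModel:
    """Transformer produced by fitting: adds a 'prediction' column."""
    def __init__(self, scores):
        self.scores = scores

    def transform(self, rows):
        out = []
        for r in rows:
            total = sum(self.scores.get(w, 0) for w in r["words"])
            out.append({**r, "prediction": 1.0 if total > 0 else 0.0})
        return out


class ToyWordVote:
    """Estimator: learns a score per word (+1 per positive doc, -1 per negative)."""
    def fit(self, rows):
        scores = {}
        for r in rows:
            vote = 1 if r["label"] == 1.0 else -1
            for w in r["words"]:
                scores[w] = scores.get(w, 0) + vote
        return ToyWordVoteModel(scores)


class ToyPipelineModel:
    """Fitted pipeline: every stage is a Transformer."""
    def __init__(self, stages):
        self.stages = stages

    def transform(self, rows):
        for stage in self.stages:
            rows = stage.transform(rows)
        return rows


class ToyPipeline:
    def __init__(self, stages):
        self.stages = stages

    def fit(self, rows):
        fitted = []
        for stage in self.stages:
            if hasattr(stage, "fit"):      # Estimator: fit first, keep the model
                stage = stage.fit(rows)
            rows = stage.transform(rows)   # pass the transformed data onward
            fitted.append(stage)
        return ToyPipelineModel(fitted)


toy_training = [
    {"id": 0, "text": "a b c d e spark", "label": 1.0},
    {"id": 1, "text": "b d", "label": 0.0},
    {"id": 2, "text": "spark f g h", "label": 1.0},
    {"id": 3, "text": "hadoop mapreduce", "label": 0.0},
]
toy_model = ToyPipeline([ToyTokenizer(), ToyWordVote()]).fit(toy_training)
toy_predictions = toy_model.transform([{"id": 4, "text": "spark i j k"},
                                       {"id": 7, "text": "apache hadoop"}])
print([(r["id"], r["prediction"]) for r in toy_predictions])
# [(4, 1.0), (7, 0.0)]
```

Note that the fitted pipeline holds only Transformers: the `ToyWordVote` estimator has been replaced by the `ToyWordVoteModel` it produced, which is why the same object can be reused on unlabeled data.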

### A simple text document workflow

![](https://spark.apache.org/docs/latest/img/ml-Pipeline.png)

Above, the top row represents a Pipeline with three stages. The first two (Tokenizer and HashingTF) are Transformers (blue), and the third (LogisticRegression) is an Estimator (red). The bottom row represents data flowing through the pipeline, where cylinders indicate DataFrames. The Pipeline.fit() method is called on the original DataFrame, which has raw text documents and labels. 

The Tokenizer.transform() method splits the raw text documents into words, adding a new column with words to the DataFrame. 

The HashingTF.transform() method converts the words column into feature vectors, adding a new column with those vectors to the DataFrame. Now, since LogisticRegression is an Estimator, the Pipeline first calls LogisticRegression.fit() to produce a LogisticRegressionModel.

![](https://spark.apache.org/docs/latest/img/ml-PipelineModel.png)

In the figure above, the PipelineModel has the same number of stages as the original Pipeline, but all Estimators in the original Pipeline have become Transformers. When the PipelineModel’s transform() method is called on a test dataset, the data are passed through the fitted pipeline in order. Each stage’s transform() method updates the dataset and passes it to the next stage.

Pipelines and PipelineModels help to ensure that training and test data go through identical feature processing steps.

### ML persistence: Saving and Loading Pipelines
It is often worthwhile to save a model or a pipeline to disk for later use. Model import/export was added to the Pipeline API in Spark 1.6. As of Spark 2.3, the DataFrame-based API in spark.ml and pyspark.ml has complete coverage.

ML persistence works across Scala, Java and Python. However, R currently uses a modified format, so models saved in R can only be loaded back in R.

In [10]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer

# Prepare training documents from a list of (id, text, label) tuples.
training = spark.createDataFrame([
    (0, "a b c d e spark", 1.0),
    (1, "b d", 0.0),
    (2, "spark f g h", 1.0),
    (3, "hadoop mapreduce", 0.0)
], ["id", "text", "label"])
training.show()

+---+----------------+-----+
| id|            text|label|
+---+----------------+-----+
|  0| a b c d e spark|  1.0|
|  1|             b d|  0.0|
|  2|     spark f g h|  1.0|
|  3|hadoop mapreduce|  0.0|
+---+----------------+-----+



In [11]:
# Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10, regParam=0.001)
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])


In [12]:
# Fit the pipeline to training documents.
model = pipeline.fit(training)
model

PipelineModel_bfc473e6780c

In [21]:
# Prepare test documents, which are unlabeled (id, text) tuples.
test = spark.createDataFrame([
    (4, "spark i j k"),
    (5, "batman"),
    (6, "spark a hadoop"),
    (7, "apache hadoop")
], ["id", "text"])
test.show()

+---+--------------+
| id|          text|
+---+--------------+
|  4|   spark i j k|
|  5|        batman|
|  6|spark a hadoop|
|  7| apache hadoop|
+---+--------------+



In [22]:
# Make predictions on test documents and print columns of interest.
prediction = model.transform(test)

In [23]:
prediction.show()

+---+--------------+------------------+--------------------+--------------------+--------------------+----------+
| id|          text|             words|            features|       rawPrediction|         probability|prediction|
+---+--------------+------------------+--------------------+--------------------+--------------------+----------+
|  4|   spark i j k|  [spark, i, j, k]|(262144,[19036,68...|[-1.6609033227473...|[0.15964077387874...|       1.0|
|  5|        batman|          [batman]|(262144,[178334],...|[1.64218895265635...|[0.83783256854766...|       0.0|
|  6|spark a hadoop|[spark, a, hadoop]|(262144,[107107,1...|[-2.0753881790526...|[0.11151207471547...|       1.0|
|  7| apache hadoop|  [apache, hadoop]|(262144,[68303,19...|[4.00817033336806...|[0.98215753334442...|       0.0|
+---+--------------+------------------+--------------------+--------------------+--------------------+----------+



In [24]:
selected = prediction.select("id", "text", "probability", "prediction")
for row in selected.collect():
    # Use a distinct name so the loop does not overwrite the `prediction` DataFrame
    rid, text, prob, pred = row
    print("(%d, %s) --> prob=%s, prediction=%f" % (rid, text, str(prob), pred))

(4, spark i j k) --> prob=[0.1596407738787412,0.8403592261212588], prediction=1.000000
(5, batman) --> prob=[0.8378325685476614,0.16216743145233858], prediction=0.000000
(6, spark a hadoop) --> prob=[0.11151207471547728,0.8884879252845227], prediction=1.000000
(7, apache hadoop) --> prob=[0.9821575333444208,0.017842466655579203], prediction=0.000000


In [25]:
# Make predictions on training documents and print columns of interest.
prediction = model.transform(training)
selected = prediction.select("id", "text", "probability", "prediction")
for row in selected.collect():
    # Use a distinct name so the loop does not overwrite the `prediction` DataFrame
    rid, text, prob, pred = row
    print("(%d, %s) --> prob=%s, prediction=%f" % (rid, text, str(prob), pred))

(0, a b c d e spark) --> prob=[0.0021342419881406768,0.9978657580118593], prediction=1.000000
(1, b d) --> prob=[0.9959176174854043,0.004082382514595695], prediction=0.000000
(2, spark f g h) --> prob=[0.0014541569986711246,0.9985458430013289], prediction=1.000000
(3, hadoop mapreduce) --> prob=[0.9982978367343561,0.0017021632656438745], prediction=0.000000


In [26]:
model.stages

[Tokenizer_896363d5c6be,
 HashingTF_03b04dbc6b7d,
 LogisticRegressionModel: uid=LogisticRegression_df4027938854, numClasses=2, numFeatures=262144]

In [27]:
# Extract the training summary from the LogisticRegressionModel,
# which is the third stage of the fitted pipeline
modelSummary = model.stages[2].summary

modelSummary.accuracy

1.0

In [28]:
modelSummary

<pyspark.ml.classification.BinaryLogisticRegressionTrainingSummary at 0x7f4ce8291cd0>

In [29]:
test2 = spark.createDataFrame([
    (8, "mapreduce hadopp spark")
], ["id", "text"])

# Make predictions on the new test document and print columns of interest.
prediction = model.transform(test2)
selected = prediction.select("id", "text", "probability", "prediction")
for row in selected.collect():
    # Use a distinct name so the loop does not overwrite the `prediction` DataFrame
    rid, text, prob, pred = row
    print("(%d, %s) --> prob=%s, prediction=%f" % (rid, text, str(prob), pred))

(8, mapreduce hadopp spark) --> prob=[0.6693126798261014,0.33068732017389857], prediction=0.000000


## Extracting, transforming and selecting features
https://spark.apache.org/docs/latest/ml-features.html#extracting-transforming-and-selecting-features

* Extraction: Extracting features from “raw” data
* Transformation: Scaling, converting, or modifying features
* Selection: Selecting a subset from a larger set of features
* Locality Sensitive Hashing (LSH): This class of algorithms combines aspects of feature transformation with other algorithms.

### Word2Vec
Word2Vec computes distributed vector representations of words.

The main advantage of distributed representations is that similar words are close in the vector space, which makes generalization to novel patterns easier and model estimation more robust.

Distributed vector representations have been shown to be useful in many natural language processing applications such as named entity recognition, disambiguation, parsing, tagging and machine translation.


In [30]:
from pyspark.ml.feature import Word2Vec

# Input data: Each row is a bag of words from a sentence or document.
documentDF = spark.createDataFrame([
    ("Hi I heard about Spark".split(" "), ),
    ("I wish Java could use case classes".split(" "), ),
    ("Logistic regression models are neat".split(" "), )
], ["text"])
 
documentDF.show(truncate=False)

+------------------------------------------+
|text                                      |
+------------------------------------------+
|[Hi, I, heard, about, Spark]              |
|[I, wish, Java, could, use, case, classes]|
|[Logistic, regression, models, are, neat] |
+------------------------------------------+



In [31]:
# Learn a mapping from words to Vectors.
word2Vec = Word2Vec(vectorSize=3, minCount=0, inputCol="text", outputCol="result")
model = word2Vec.fit(documentDF)
model

Word2VecModel: uid=Word2Vec_7b2f7e4a7bea, numWords=16, vectorSize=3

In [32]:
result = model.transform(documentDF)
for row in result.collect():
    text, vector = row
    print("Text: [%s] => \nVector: %s\n" % (", ".join(text), str(vector)))

Text: [Hi, I, heard, about, Spark] => 
Vector: [0.038781297206878666,-0.0880258545279503,-0.015223285555839539]

Text: [I, wish, Java, could, use, case, classes] => 
Vector: [0.013235008610146386,-0.014181817748716899,-0.018456950783729553]

Text: [Logistic, regression, models, are, neat] => 
Vector: [0.02225852077826858,-0.0025158967822790147,-0.028219491243362427]
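
Each vector above summarizes a whole document: the Word2VecModel's transform() averages the vectors of the words in the document. The sketch below shows that averaging, together with the cosine similarity commonly used to compare word vectors, in plain Python. The three-dimensional embeddings are made-up numbers chosen for illustration, not output of the model above; the helper names are hypothetical, and the sketch assumes every word is in the vocabulary.

```python
import math

# Hypothetical 3-dimensional word vectors, invented for this example.
embeddings = {
    "spark":  [0.9, 0.1, 0.0],
    "hadoop": [0.8, 0.2, 0.1],
    "neat":   [0.0, 0.1, 0.9],
}


def document_vector(words, embeddings):
    """Average the vectors of the given words (all assumed to be known)."""
    vectors = [embeddings[w] for w in words]
    size = len(vectors[0])
    return [sum(vec[i] for vec in vectors) / len(vectors) for i in range(size)]


def cosine(u, v):
    """Cosine similarity: 1.0 means same direction, 0.0 means orthogonal."""
    dot = sum(a * b for a, b in zip(u, v))
    norm_u = math.sqrt(sum(a * a for a in u))
    norm_v = math.sqrt(sum(b * b for b in v))
    return dot / (norm_u * norm_v)


doc_vec = document_vector(["spark", "hadoop"], embeddings)
print([round(x, 3) for x in doc_vec])

# Words used in similar contexts should point in similar directions.
close = cosine(embeddings["spark"], embeddings["hadoop"])
far = cosine(embeddings["spark"], embeddings["neat"])
print(close > far)   # True
```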



# Classification and regression
https://spark.apache.org/docs/latest/ml-classification-regression.html#classification-and-regression

## Logistic regression
Logistic regression is a popular method to predict a categorical response. It is a special case of Generalized Linear models that predicts the probability of the outcomes. In spark.ml logistic regression can be used to predict a binary outcome by using binomial logistic regression, or it can be used to predict a multiclass outcome by using multinomial logistic regression. Use the family parameter to select between these two algorithms, or leave it unset and Spark will infer the correct variant.
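
For the binomial case, the model turns a linear margin (the dot product of coefficients and features, plus the intercept) into a probability with the logistic function. In the Pipeline example earlier, the first test document has a `rawPrediction` that starts with -1.6609033227473 and a class-0 `probability` of about 0.1596; the plain-Python sketch below checks that the second is the logistic function of the first. The raw value is truncated in the notebook output, so the comparison is approximate.

```python
import math


def sigmoid(z):
    """Logistic function: maps any real margin to a probability in (0, 1)."""
    return 1.0 / (1.0 + math.exp(-z))


raw_class0 = -1.6609033227473       # first entry of rawPrediction (truncated)
p_class0 = sigmoid(raw_class0)
p_class1 = 1.0 - p_class0           # the two class probabilities sum to 1

print(round(p_class0, 6), round(p_class1, 6))  # 0.159641 0.840359
```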

> Multinomial logistic regression can be used for binary classification by setting the family param to “multinomial”. It will produce two sets of coefficients and two intercepts.



In [33]:
from pyspark.ml.classification import LogisticRegression

# Load training data
training = spark.read.format("libsvm").load("../spark/dataset/sample_libsvm_data.txt")
#training.show(truncate=False)
training.count()

100

In [34]:
lr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)

# Fit the model
lrModel = lr.fit(training)

# Print the coefficients and intercept for logistic regression
print("Coefficients: " + str(lrModel.coefficients))
print("Intercept: " + str(lrModel.intercept))

Coefficients: (692,[244,263,272,300,301,328,350,351,378,379,405,406,407,428,433,434,455,456,461,462,483,484,489,490,496,511,512,517,539,540,568],[-7.353983524188241e-05,-9.102738505589566e-05,-0.0001946743054690423,-0.00020300642473486603,-3.147618331486458e-05,-6.842977602660821e-05,1.5883626898236275e-05,1.4023497091368928e-05,0.0003543204752496838,0.00011443272898171099,0.00010016712383666487,0.0006014109303795511,0.0002840248179122765,-0.00011541084736508905,0.000385996886312906,0.0006350195574241097,-0.00011506412384575733,-0.0001527186586498689,0.0002804933808994214,0.0006070117471191665,-0.0002008459663247435,-0.00014210755792901347,0.0002739010341160883,0.0002773045624496811,-9.838027027269408e-05,-0.00038085224435175833,-0.00025315198008554285,0.0002774771477075434,-0.00024436197639191286,-0.0015394744687597679,-0.00023073328411330604])
Intercept: 0.22456315961250245


In [28]:
# We can also use the multinomial family for binary classification
mlr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8, family="multinomial")

# Fit the model
mlrModel = mlr.fit(training)
 
# Print the coefficients and intercepts for logistic regression with multinomial family
print("Multinomial coefficients: " + str(mlrModel.coefficientMatrix))
print("Multinomial intercepts: " + str(mlrModel.interceptVector))

Multinomial coefficients: 2 X 692 CSRMatrix
(0,244) 0.0
(0,263) 0.0001
(0,272) 0.0001
(0,300) 0.0001
(0,350) -0.0
(0,351) -0.0
(0,378) -0.0
(0,379) -0.0
(0,405) -0.0
(0,406) -0.0006
(0,407) -0.0001
(0,428) 0.0001
(0,433) -0.0
(0,434) -0.0007
(0,455) 0.0001
(0,456) 0.0001
..
..
Multinomial intercepts: [-0.12065879445860596,0.12065879445860596]


In [38]:
# Extract the summary from the returned LogisticRegressionModel instance trained
# in the earlier example
trainingSummary = lrModel.summary

# Obtain the objective per iteration
objectiveHistory = trainingSummary.objectiveHistory
print("objectiveHistory:")
for objective in objectiveHistory:
    print(objective)

objectiveHistory:
0.6833149135741672
0.6662875751473734
0.6217068546034616
0.6127265245887888
0.6060347986802872
0.6031750687571563
0.5969621534836272
0.594074303198312
0.5906089243339021
0.5894724576491043
0.5882187775729588


In [36]:
# Obtain the receiver-operating characteristic as a dataframe and areaUnderROC.
trainingSummary.roc.show()
print("areaUnderROC: " + str(trainingSummary.areaUnderROC))

+---+--------------------+
|FPR|                 TPR|
+---+--------------------+
|0.0|                 0.0|
|0.0|0.017543859649122806|
|0.0| 0.03508771929824561|
|0.0| 0.05263157894736842|
|0.0| 0.07017543859649122|
|0.0| 0.08771929824561403|
|0.0| 0.10526315789473684|
|0.0| 0.12280701754385964|
|0.0| 0.14035087719298245|
|0.0| 0.15789473684210525|
|0.0| 0.17543859649122806|
|0.0| 0.19298245614035087|
|0.0| 0.21052631578947367|
|0.0| 0.22807017543859648|
|0.0| 0.24561403508771928|
|0.0|  0.2631578947368421|
|0.0|  0.2807017543859649|
|0.0|  0.2982456140350877|
|0.0|  0.3157894736842105|
|0.0|  0.3333333333333333|
+---+--------------------+
only showing top 20 rows

areaUnderROC: 1.0
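
areaUnderROC is the area under the (FPR, TPR) curve. In the table above the true positive rate climbs while the false positive rate stays at 0, which is the shape of a perfect classifier on the training data. A minimal sketch of the area computation with the trapezoidal rule, in plain Python and with two idealized curves rather than the actual Spark output (`area_under_curve` is a hypothetical helper):

```python
def area_under_curve(points):
    """Trapezoidal area under a curve given as (x, y) points sorted by x."""
    area = 0.0
    for (x0, y0), (x1, y1) in zip(points, points[1:]):
        area += (x1 - x0) * (y0 + y1) / 2.0
    return area


perfect = [(0.0, 0.0), (0.0, 1.0), (1.0, 1.0)]   # TPR reaches 1 while FPR is still 0
random_guess = [(0.0, 0.0), (1.0, 1.0)]          # the diagonal

print(area_under_curve(perfect))       # 1.0
print(area_under_curve(random_guess))  # 0.5
```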


In [37]:
# Set the model threshold to maximize F-Measure
fMeasure = trainingSummary.fMeasureByThreshold
maxFMeasure = fMeasure.groupBy().max('F-Measure').select('max(F-Measure)').head()
bestThreshold = fMeasure.where(fMeasure['F-Measure'] == maxFMeasure['max(F-Measure)']) \
    .select('threshold').head()['threshold']
lr.setThreshold(bestThreshold)

LogisticRegression_7c10b85de11b
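
The F-measure combines precision and recall into a single score, F1 = 2 * precision * recall / (precision + recall), and the cell above picks the threshold where it peaks. The same selection can be sketched without Spark. The labels and scores below are invented for illustration, and `f_measure_by_threshold` is a hypothetical helper rather than a Spark API.

```python
def f_measure_by_threshold(labels, scores):
    """Return (threshold, F1) pairs, one per distinct score used as threshold."""
    results = []
    for threshold in sorted(set(scores), reverse=True):
        predicted = [1 if s >= threshold else 0 for s in scores]
        tp = sum(1 for p, y in zip(predicted, labels) if p == 1 and y == 1)
        fp = sum(1 for p, y in zip(predicted, labels) if p == 1 and y == 0)
        fn = sum(1 for p, y in zip(predicted, labels) if p == 0 and y == 1)
        precision = tp / (tp + fp) if tp + fp else 0.0
        recall = tp / (tp + fn) if tp + fn else 0.0
        f1 = (2 * precision * recall / (precision + recall)
              if precision + recall else 0.0)
        results.append((threshold, f1))
    return results


example_labels = [1, 1, 0, 1, 0, 0]                 # made-up ground truth
example_scores = [0.9, 0.8, 0.7, 0.6, 0.3, 0.1]     # made-up predicted probabilities

best_threshold, best_f1 = max(
    f_measure_by_threshold(example_labels, example_scores),
    key=lambda pair: pair[1])
print(best_threshold, round(best_f1, 3))    # 0.6 0.857
```

Lowering the threshold trades precision for recall; the F-measure peaks where that trade is best balanced for the data at hand.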

# Clustering
https://spark.apache.org/docs/latest/ml-clustering.html#clustering

## K-means
k-means is one of the most commonly used clustering algorithms that clusters the data points into a predefined number of clusters. The MLlib implementation includes a parallelized variant of the k-means++ method called kmeans||.

KMeans is implemented as an Estimator and generates a KMeansModel as the base model.

**Input Columns**

|Param name|Type(s)|Default|Description|
|-----------|------|----------|----------|
|featuresCol|Vector|"features"|Feature vector|

**Output Columns**

|Param name|Type(s)|Default|Description|
|-----------|------|----------|----------|
|predictionCol|Int|"prediction"|Predicted cluster center|

In [41]:
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator

# Loads data.
dataset = spark.read.format("libsvm").load("../spark/dataset/sample_kmeans_data.txt")
#dataset.show(truncate=False)
dataset.count()

6

In [42]:
# Trains a k-means model.
kmeans = KMeans().setK(2).setSeed(1)
model = kmeans.fit(dataset)

# Make predictions
predictions = model.transform(dataset)
predictions.show()

+-----+--------------------+----------+
|label|            features|prediction|
+-----+--------------------+----------+
|  0.0|           (3,[],[])|         1|
|  1.0|(3,[0,1,2],[0.1,0...|         1|
|  2.0|(3,[0,1,2],[0.2,0...|         1|
|  3.0|(3,[0,1,2],[9.0,9...|         0|
|  4.0|(3,[0,1,2],[9.1,9...|         0|
|  5.0|(3,[0,1,2],[9.2,9...|         0|
+-----+--------------------+----------+



In [43]:
# Evaluate clustering by computing Silhouette score
evaluator = ClusteringEvaluator()

silhouette = evaluator.evaluate(predictions)
print("Silhouette with squared euclidean distance = " + str(silhouette))

Silhouette with squared euclidean distance = 0.9997530305375207


In [44]:
# Shows the result.
centers = model.clusterCenters()
print("Cluster Centers: ")
for center in centers:
    print(center)

Cluster Centers: 
[9.1 9.1 9.1]
[0.1 0.1 0.1]
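
The centers above can be reproduced with a few lines of plain Python implementing Lloyd's iterations: assign each point to its nearest center, then move each center to the mean of its points. This is a sketch under assumptions. The six points are assumed to be two tight groups around 0.1 and 9.1 (the notebook output truncates the actual values), the initial centers are simply the first and last point rather than the kmeans|| initialization MLlib uses, and `lloyd_kmeans` is a hypothetical helper.

```python
def squared_distance(p, q):
    return sum((a - b) ** 2 for a, b in zip(p, q))


def lloyd_kmeans(points, centers, iterations=10):
    """Plain Lloyd iterations; `centers` holds the initial guesses."""
    for _ in range(iterations):
        clusters = [[] for _ in centers]
        for p in points:
            nearest = min(range(len(centers)),
                          key=lambda i: squared_distance(p, centers[i]))
            clusters[nearest].append(p)
        # Move each center to the mean of its points (keep it if the cluster is empty).
        centers = [
            [sum(col) / len(cluster) for col in zip(*cluster)] if cluster else center
            for cluster, center in zip(clusters, centers)
        ]
    return centers


# Assumed data: three points near the origin and three points near (9, 9, 9).
points = [[0.0] * 3, [0.1] * 3, [0.2] * 3, [9.0] * 3, [9.1] * 3, [9.2] * 3]
final_centers = lloyd_kmeans(points, centers=[points[0], points[-1]])
print([[round(c, 3) for c in center] for center in final_centers])
# [[0.1, 0.1, 0.1], [9.1, 9.1, 9.1]]
```

The order of the centers differs from the Spark output because cluster numbering is arbitrary; only the grouping matters.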


# Collaborative Filtering 
https://spark.apache.org/docs/latest/ml-collaborative-filtering.html#collaborative-filtering-1

Example on demand

![](https://media1.tenor.com/images/257a13ee5e204efdca4bb135a8f75a2e/tenor.gif?itemid=16088629)

In [45]:
spark.stop()

# Biblio

* https://spark.apache.org/mllib/
* https://spark.apache.org/docs/latest/ml-guide.html
* https://blog.osservatori.net/it_it/machine-learning-come-funziona-apprendimento-automatico
* https://towardsdatascience.com/hands-on-big-data-streaming-apache-spark-at-scale-fd89c15fa6b0
* https://towardsdatascience.com/apache-spark-mllib-tutorial-ec6f1cb336a9
* https://www.guru99.com/pyspark-tutorial.html
* https://towardsdatascience.com/sentiment-analysis-simplified-ac30720a5827
* http://web.cs.ucla.edu/~mtgarip/statistics.html
* https://towardsdatascience.com/machine-learning-with-pyspark-and-mllib-solving-a-binary-classification-problem-96396065d2aa
* https://runawayhorse001.github.io/LearningApacheSpark/index.html