# MLib

Reference:
https://spark.apache.org/docs/2.2.0/ml-guide.html


MLlib is Sparkâ€™s machine learning (ML) library. Its goal is to make practical machine learning scalable and easy. At a high level, it provides tools such as:

* ML Algorithms: common learning algorithms such as classification, regression, clustering, and collaborative filtering
* Featurization: feature extraction, transformation, dimensionality reduction, and selection
* Pipelines: tools for constructing, evaluating, and tuning ML Pipelines
* Persistence: saving and load algorithms, models, and Pipelines
* Utilities: linear algebra, statistics, data handling, etc.

## correlation



In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.types import *

from pyspark.ml.linalg import Vectors
from pyspark.ml.stat import Correlation

In [2]:
spark = SparkSession.builder.appName("MDs example of Spark MLib").config("spark.some.config.option", "some-value").getOrCreate()

In [4]:
data = [
        (Vectors.sparse(
            4, 
            [(0, 1.0), (3, -2.0)]),
        ),
        (Vectors.dense([4.0, 5.0, 0.0, 3.0]),),
        (Vectors.dense([6.0, 7.0, 0.0, 8.0]),),
        (Vectors.sparse(
            4, 
            [(0, 9.0), (3, 1.0)]),
        )]

In [5]:
df = spark.createDataFrame(data, ["features"])

In [6]:
r1 = Correlation.corr(df, "features").head()
print("Pearson correlation matrix:\n" + str(r1[0]))

Pearson correlation matrix:
DenseMatrix([[1.        , 0.05564149,        nan, 0.40047142],
             [0.05564149, 1.        ,        nan, 0.91359586],
             [       nan,        nan, 1.        ,        nan],
             [0.40047142, 0.91359586,        nan, 1.        ]])


In [7]:
r2 = Correlation.corr(df, "features", "spearman").head()
print("Spearman correlation matrix:\n" + str(r2[0]))

Spearman correlation matrix:
DenseMatrix([[1.        , 0.10540926,        nan, 0.4       ],
             [0.10540926, 1.        ,        nan, 0.9486833 ],
             [       nan,        nan, 1.        ,        nan],
             [0.4       , 0.9486833 ,        nan, 1.        ]])


## hypothesis testing

In [8]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.stat import ChiSquareTest

data = [(0.0, Vectors.dense(0.5, 10.0, 100.0)),
        (0.0, Vectors.dense(1.5, 20.0, 100.0)),
        (1.0, Vectors.dense(1.5, 30.0, 200.0)),
        (0.0, Vectors.dense(3.5, 30.0, 100.0)),
        (0.0, Vectors.dense(3.5, 40.0, 100.0)),
        (1.0, Vectors.dense(3.5, 40.0, 200.0))]

df = spark.createDataFrame(data, ["label", "features"])

r = ChiSquareTest.test(df, "features", "label").head()

In [9]:
print("pValues: " + str(r.pValues))

pValues: [0.6872892787909721,0.6822703303362126,0.01430587843542952]


In [10]:
print("degreesOfFreedom: " + str(r.degreesOfFreedom))

degreesOfFreedom: [2, 3, 1]


In [11]:
print("statistics: " + str(r.statistics))

statistics: [0.75,1.5,6.000000000000001]


## estimator, transformer and param on LogisticRegression example

In [12]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.classification import LogisticRegression

In [13]:
# Prepare training data from a list of (label, features) tuples.
training = spark.createDataFrame([
    (1.0, Vectors.dense([0.0, 1.1, 0.1])),
    (0.0, Vectors.dense([2.0, 1.0, -1.0])),
    (0.0, Vectors.dense([2.0, 1.3, 1.0])),
    (1.0, Vectors.dense([0.0, 1.2, -0.5]))], 
    ["label", "features"])

### estimator

In [14]:
# Create a LogisticRegression instance. This instance is an Estimator.
lr = LogisticRegression(maxIter=10, regParam=0.01)

In [16]:
# Print out the parameters, documentation, and any default values.
print("LogisticRegression parameters:\n" + lr.explainParams() + "\n")

LogisticRegression parameters:
aggregationDepth: suggested depth for treeAggregate (>= 2). (default: 2)
elasticNetParam: the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty. (default: 0.0)
family: The name of family which is a description of the label distribution to be used in the model. Supported options: auto, binomial, multinomial (default: auto)
featuresCol: features column name. (default: features)
fitIntercept: whether to fit an intercept term. (default: True)
labelCol: label column name. (default: label)
lowerBoundsOnCoefficients: The lower bounds on coefficients if fitting under bound constrained optimization. The bound matrix must be compatible with the shape (1, number of features) for binomial regression, or (number of classes, number of features) for multinomial regression. (undefined)
lowerBoundsOnIntercepts: The lower bounds on intercepts if fitting under bound constrained optimization. The bou

In [17]:
# Learn a LogisticRegression model. This uses the parameters stored in lr.
model1 = lr.fit(training)

### a transformer

In [18]:
# Since model1 is a Model (i.e., a transformer produced by an Estimator),
# we can view the parameters it used during fit().
# This prints the parameter (name: value) pairs, where names are unique IDs for this
# LogisticRegression instance.
print("Model 1 was fit using parameters: ")
print(model1.extractParamMap())

Model 1 was fit using parameters: 
{Param(parent='LogisticRegression_27bde4b6537d', name='aggregationDepth', doc='suggested depth for treeAggregate (>= 2)'): 2, Param(parent='LogisticRegression_27bde4b6537d', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty'): 0.0, Param(parent='LogisticRegression_27bde4b6537d', name='family', doc='The name of family which is a description of the label distribution to be used in the model. Supported options: auto, binomial, multinomial.'): 'auto', Param(parent='LogisticRegression_27bde4b6537d', name='featuresCol', doc='features column name'): 'features', Param(parent='LogisticRegression_27bde4b6537d', name='fitIntercept', doc='whether to fit an intercept term'): True, Param(parent='LogisticRegression_27bde4b6537d', name='labelCol', doc='label column name'): 'label', Param(parent='LogisticRegression_27bde4b6537d', name='maxIter', doc='maximum nu

In [19]:
# We may alternatively specify parameters using a Python dictionary as a paramMap
paramMap = {lr.maxIter: 20}
paramMap[lr.maxIter] = 30  # Specify 1 Param, overwriting the original maxIter.
paramMap.update({lr.regParam: 0.1, lr.threshold: 0.55})  # Specify multiple Params.

# You can combine paramMaps, which are python dictionaries.
paramMap2 = {lr.probabilityCol: "myProbability"}  # Change output column name
paramMapCombined = paramMap.copy()
paramMapCombined.update(paramMap2)

In [20]:
# Now learn a new model using the paramMapCombined parameters.
# paramMapCombined overrides all parameters set earlier via lr.set* methods.
model2 = lr.fit(training, paramMapCombined)
print("Model 2 was fit using parameters: ")
print(model2.extractParamMap())

Model 2 was fit using parameters: 
{Param(parent='LogisticRegression_27bde4b6537d', name='aggregationDepth', doc='suggested depth for treeAggregate (>= 2)'): 2, Param(parent='LogisticRegression_27bde4b6537d', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty'): 0.0, Param(parent='LogisticRegression_27bde4b6537d', name='family', doc='The name of family which is a description of the label distribution to be used in the model. Supported options: auto, binomial, multinomial.'): 'auto', Param(parent='LogisticRegression_27bde4b6537d', name='featuresCol', doc='features column name'): 'features', Param(parent='LogisticRegression_27bde4b6537d', name='fitIntercept', doc='whether to fit an intercept term'): True, Param(parent='LogisticRegression_27bde4b6537d', name='labelCol', doc='label column name'): 'label', Param(parent='LogisticRegression_27bde4b6537d', name='maxIter', doc='maximum nu

In [21]:
# Prepare test data
test = spark.createDataFrame([
    (1.0, Vectors.dense([-1.0, 1.5, 1.3])),
    (0.0, Vectors.dense([3.0, 2.0, -0.1])),
    (1.0, Vectors.dense([0.0, 2.2, -1.5]))], ["label", "features"])

In [22]:
# Make predictions on test data using the Transformer.transform() method.
# LogisticRegression.transform will only use the 'features' column.
# Note that model2.transform() outputs a "myProbability" column instead of the usual
# 'probability' column since we renamed the lr.probabilityCol parameter previously.
prediction = model2.transform(test)
result = prediction.select("features", "label", "myProbability", "prediction") \
    .collect()

for row in result:
    print("features=%s, label=%s -> prob=%s, prediction=%s"
          % (row.features, row.label, row.myProbability, row.prediction))

features=[-1.0,1.5,1.3], label=1.0 -> prob=[0.05707304171033977,0.9429269582896603], prediction=1.0
features=[3.0,2.0,-0.1], label=0.0 -> prob=[0.9238522311704088,0.07614776882959128], prediction=0.0
features=[0.0,2.2,-1.5], label=1.0 -> prob=[0.10972776114779119,0.8902722388522087], prediction=1.0


## using pipelines

In [23]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer

In [24]:
# Prepare training documents from a list of (id, text, label) tuples.
training = spark.createDataFrame([
    ( 0,     "a b c d e spark",       1.0),
    ( 1,     "b d",                   0.0),
    ( 2,     "spark f g h",           1.0),
    ( 3,     "hadoop mapreduce",      0.0)
], ["id",    "text",                  "label"])

In [25]:
# Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
tokenizer = Tokenizer(inputCol="text", outputCol="words")
tokenizer.getOutputCol()

'words'

In [26]:
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")

In [27]:
print(hashingTF.getOutputCol())
print(hashingTF.explainParams())

features
binary: If True, all non zero counts are set to 1. This is useful for discrete probabilistic models that model binary events rather than integer counts. Default False. (default: False)
inputCol: input column name. (current: words)
numFeatures: number of features. (default: 262144)
outputCol: output column name. (default: HashingTF_c6dbec469fbe__output, current: features)


In [28]:
lr = LogisticRegression(maxIter=10, regParam=0.001)

In [29]:
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])

In [30]:
# Fit the pipeline to training documents.
model = pipeline.fit(training)

In [31]:
# Prepare test documents, which are unlabeled (id, text) tuples.
test = spark.createDataFrame([
    (  4,        "spark i j k"),
    (  5,        "l m n"),
    (  6,        "spark hadoop spark"),
    (  7,        "apache hadoop")
], ["id",        "text"])

In [32]:
# Make predictions on test documents and print columns of interest.
prediction = model.transform(test)

selected = prediction.select("id", "text", "probability", "prediction")
for row in selected.collect():
    rid, text, prob, prediction = row
    print("(%d, %s) --> prob=%s, prediction=%f" % (rid, text, str(prob), prediction))

(4, spark i j k) --> prob=[0.1596407738787475,0.8403592261212525], prediction=1.000000
(5, l m n) --> prob=[0.8378325685476744,0.16216743145232562], prediction=0.000000
(6, spark hadoop spark) --> prob=[0.06926633132976037,0.9307336686702395], prediction=1.000000
(7, apache hadoop) --> prob=[0.9821575333444218,0.01784246665557808], prediction=0.000000


## feature extractors TD-IDF

Reference:<br>
https://spark.apache.org/docs/2.2.0/ml-features.html

In [33]:
from pyspark.ml.feature import HashingTF, IDF, Tokenizer

In [37]:

# each line represent a document
sentenceData = spark.createDataFrame([
    (0.0, "Hi I heard about Spark"),
    (0.0, "I wish Java could use case classes"),
    (1.0, "Logistic regression models are neat"),
    (1.0, "neat is regression with spark")
], ["label", "sentence"])

tokenizer = Tokenizer(inputCol="sentence", outputCol="words")
wordsData = tokenizer.transform(sentenceData)

hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=20)
featurizedData = hashingTF.transform(wordsData)
# alternatively, CountVectorizer can also be used to get term frequency vectors

idf = IDF(inputCol="rawFeatures", outputCol="features")

idfModel = idf.fit(featurizedData)

rescaledData = idfModel.transform(featurizedData)

In [38]:
rescaledData.select("label","words", "features").show(10, False)

+-----+------------------------------------------+---------------------------------------------------------------------------------------------------------------------+
|label|words                                     |features                                                                                                             |
+-----+------------------------------------------+---------------------------------------------------------------------------------------------------------------------+
|0.0  |[hi, i, heard, about, spark]              |(20,[0,5,9,17],[0.9162907318741551,0.5108256237659907,0.5108256237659907,1.8325814637483102])                        |
|0.0  |[i, wish, java, could, use, case, classes]|(20,[2,7,9,13,15],[0.9162907318741551,0.9162907318741551,1.5324768712979722,0.5108256237659907,0.22314355131420976]) |
|1.0  |[logistic, regression, models, are, neat] |(20,[4,6,13,15,18],[0.9162907318741551,0.5108256237659907,0.5108256237659907,0.22314355131420976,0.916290

## feature extractors Word2Vec

In [40]:
from pyspark.ml.feature import Word2Vec

# Input data: Each row is a bag of words from a sentence or document.
documentDF = spark.createDataFrame([
    ("Hi I heard about Spark".split(" "), ),
    ("I wish Java could use case classes".split(" "), ),
    ("Logistic regression models are neat".split(" "), )
], ["text"])

# Learn a mapping from words to Vectors.
# we choose a vector size of 3
word2Vec = Word2Vec(vectorSize=3, minCount=0, inputCol="text", outputCol="result")
model = word2Vec.fit(documentDF)

result = model.transform(documentDF)
for row in result.collect():
    text, vector = row
    print("Text: [%s] => \nVector: %s\n" % (", ".join(text), str(vector)))

Text: [Hi, I, heard, about, Spark] => 
Vector: [-0.02925943098962307,0.00024161161854863167,-0.0852911853697151]

Text: [I, wish, Java, could, use, case, classes] => 
Vector: [-0.027511230869484798,0.05495954630896449,-0.04135974070855549]

Text: [Logistic, regression, models, are, neat] => 
Vector: [-0.024284353759139777,0.002063886821269989,0.021670418605208398]



## tokenizer

In [41]:
from pyspark.ml.feature import Tokenizer, RegexTokenizer
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType

sentenceDataFrame = spark.createDataFrame([
    (0, "Hi I heard about Spark"),
    (1, "I wish Java could use case classes"),
    (2, "Logistic,regression,models,are,neat")
], ["id", "sentence"])

tokenizer = Tokenizer(inputCol="sentence", outputCol="words")

regexTokenizer = RegexTokenizer(inputCol="sentence", outputCol="words", pattern="\\W")
# alternatively, pattern="\\w+", gaps(False)

countTokens = udf(lambda words: len(words), IntegerType())

tokenized = tokenizer.transform(sentenceDataFrame)
tokenized.select("sentence", "words")\
    .withColumn("tokens", countTokens(col("words"))).show(truncate=False)

regexTokenized = regexTokenizer.transform(sentenceDataFrame)
regexTokenized.select("sentence", "words") \
    .withColumn("tokens", countTokens(col("words"))).show(truncate=False)

+-----------------------------------+------------------------------------------+------+
|sentence                           |words                                     |tokens|
+-----------------------------------+------------------------------------------+------+
|Hi I heard about Spark             |[hi, i, heard, about, spark]              |5     |
|I wish Java could use case classes |[i, wish, java, could, use, case, classes]|7     |
|Logistic,regression,models,are,neat|[logistic,regression,models,are,neat]     |1     |
+-----------------------------------+------------------------------------------+------+

+-----------------------------------+------------------------------------------+------+
|sentence                           |words                                     |tokens|
+-----------------------------------+------------------------------------------+------+
|Hi I heard about Spark             |[hi, i, heard, about, spark]              |5     |
|I wish Java could use case cla

## stop words

In [42]:
from pyspark.ml.feature import StopWordsRemover

sentenceData = spark.createDataFrame([
    (0, ["I", "saw", "the", "red", "balloon"]),
    (1, ["Mary", "had", "a", "little", "lamb"])
], ["id", "raw"])

remover = StopWordsRemover(inputCol="raw", outputCol="filtered")
remover.transform(sentenceData).show(truncate=False)

+---+----------------------------+--------------------+
|id |raw                         |filtered            |
+---+----------------------------+--------------------+
|0  |[I, saw, the, red, balloon] |[saw, red, balloon] |
|1  |[Mary, had, a, little, lamb]|[Mary, little, lamb]|
+---+----------------------------+--------------------+



## PCA

In [7]:
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.linalg.distributed import RowMatrix

sc = spark.sparkContext

rows = sc.parallelize([
    Vectors.dense(1,   2,   3,   4,   5  ),
    Vectors.dense(2,   0,   3,   4,   5  ),
    Vectors.dense(4,   0,   0,   6,   7  )
])

In [18]:
type(rows)

pyspark.rdd.RDD

#### *computePrincipalComponent()* returns the rotation matrix 

In [21]:
mat = RowMatrix(rows)
# Compute the top 4 principal components.
# Principal components are stored in a local dense matrix.
pc = mat.computePrincipalComponents(2)


In [22]:
print(rows.collect())

[DenseVector([1.0, 2.0, 3.0, 4.0, 5.0]), DenseVector([2.0, 0.0, 3.0, 4.0, 5.0]), DenseVector([4.0, 0.0, 0.0, 6.0, 7.0])]


In [56]:
print("mat type = ", type(mat))
print("pc type =", type(pc))
print(pc.numRows)
print(pc.numCols)
print(pc)

mat type =  <class 'pyspark.mllib.linalg.distributed.RowMatrix'>
pc type = <class 'pyspark.mllib.linalg.DenseMatrix'>
5
2
DenseMatrix([[-0.52342208,  0.22969979],
             [ 0.25947071, -0.87427577],
             [ 0.59053009,  0.31115713],
             [-0.39368673, -0.20743809],
             [-0.39368673, -0.20743809]])


In [53]:
# Project the rows to the linear space spanned by the top 2 principal components.
projected = mat.multiply(pc)

In [59]:
print(projected.numRows())
print(projected.numCols())

# at his point projected is a distributed data (RowMatrix)
print(type(projected))


3
2
<class 'pyspark.mllib.linalg.distributed.RowMatrix'>


NameError: name 'DistributedMatrix' is not defined

In [48]:
projected2 = RowMatrix(projected.rows)

In [49]:
print(type(projected2))

<class 'pyspark.mllib.linalg.distributed.RowMatrix'>


In [51]:
projected2.rows.collect()

[DenseVector([-1.7761, -2.4523]),
 DenseVector([-2.8184, -0.4741]),
 DenseVector([-7.2116, -1.7779])]

#### *computePrincipalComponentsAndExplainedVariance()*

In [57]:
pca = mat.computePrincipalComponentsAndExplainedVariance(2)

AttributeError: 'RowMatrix' object has no attribute 'computePrincipalComponentsAndExplainedVariance'