## ML Pipeline Example

In [1]:
#Refer: https://spark.apache.org/docs/latest/ml-pipeline.html

from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import CountVectorizer, Tokenizer
from pyspark.sql import SparkSession

# Reuse the active SparkSession when one exists; otherwise start a new one.
spark = SparkSession.builder.getOrCreate()

# Training documents: one (id, text, label) tuple per row.
training = spark.createDataFrame(
    data=[
        (0, "a b c d e spark", 1.0),
        (1, "b d", 0.0),
        (2, "spark f g h", 1.0),
        (3, "hadoop mapreduce", 0.0),
    ],
    schema=["id", "text", "label"],
)



In [2]:
# Echo the DataFrame: its repr lists only the column names and types, not the rows.
training

DataFrame[id: bigint, text: string, label: double]

In [3]:
# Configure an ML pipeline with three stages: tokenizer, CountVectorizer (cv), and lr.

# Refer: https://spark.apache.org/docs/latest/ml-features.html#tokenizer
# Splits the "text" column into a "words" column.
tokenizer = Tokenizer(inputCol="text", outputCol="words")

# Refer: https://spark.apache.org/docs/latest/ml-features.html#countvectorizer
# Turns the tokenizer output into term-count vectors. minDF=2.0 keeps only terms
# found in at least two training documents (values >= 1 are document counts per
# the CountVectorizer docs).
cv = CountVectorizer(
    inputCol=tokenizer.getOutputCol(),
    outputCol="features",
    minDF=2.0,
)

# Classifier stage; it consumes the default "features" and "label" columns.
lr = LogisticRegression(maxIter=10, regParam=0.001)

pipeline = Pipeline(stages=[tokenizer, cv, lr])

# Fitting runs the stages in order on the training documents and yields a PipelineModel.
model = pipeline.fit(training)

In [4]:
# Unlabeled documents to score: one (id, text) tuple per row.
test = spark.createDataFrame(
    data=[
        (4, "spark i j k"),
        (5, "l m n"),
        (6, "spark hadoop spark"),
        (7, "apache hadoop"),
    ],
    schema=["id", "text"],
)


In [5]:
# Make predictions on test documents and print columns of interest.
prediction = model.transform(test)
selected = prediction.select("id", "text", "probability", "prediction")
for row in selected.collect():
    # Unpack into `predicted_label`, not `prediction`: reusing that name would
    # overwrite the `prediction` DataFrame above with a float on the first iteration.
    rid, text, prob, predicted_label = row
    print("(%d, %s) --> prob=%s, prediction=%f" % (rid, text, str(prob), predicted_label))

(4, spark i j k) --> prob=[0.0066510916073173445,0.9933489083926826], prediction=1.000000
(5, l m n) --> prob=[0.9932186374602783,0.006781362539721725], prediction=0.000000
(6, spark hadoop spark) --> prob=[3.060935589437575e-07,0.9999996939064411], prediction=1.000000
(7, apache hadoop) --> prob=[0.9932186374602783,0.006781362539721725], prediction=0.000000


## Hyperparameter tuning

In [6]:
#Download dataset from github
! wget https://raw.githubusercontent.com/apache/spark/master/data/mllib/sample_linear_regression_data.txt

--2021-04-03 17:32:23--  https://raw.githubusercontent.com/apache/spark/master/data/mllib/sample_linear_regression_data.txt
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.109.133, 185.199.111.133, 185.199.110.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.109.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 119069 (116K) [text/plain]
Saving to: ‘sample_linear_regression_data.txt’


2021-04-03 17:32:27 (180 KB/s) - ‘sample_linear_regression_data.txt’ saved [119069/119069]



In [7]:
# Refer: https://spark.apache.org/docs/latest/ml-tuning.html

from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.regression import LinearRegression
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit

# Load the LIBSVM-formatted sample file from the working directory.
data = (
    spark.read
    .format("libsvm")
    .load("./sample_linear_regression_data.txt")
)

# 90/10 train/test split with a fixed seed.
# Note: this rebinds `test`, which earlier held the unlabeled pipeline documents.
train, test = data.randomSplit(weights=[0.9, 0.1], seed=12345)



In [8]:
# Linear regression estimator to be tuned below.
# Note: this rebinds `lr`, which earlier held the LogisticRegression pipeline stage.
lr = LinearRegression(maxIter=10)


In [9]:
# Build the search grid with ParamGridBuilder: 2 regParam x 2 fitIntercept x
# 3 elasticNetParam values = 12 parameter combinations. TrainValidationSplit
# tries every combination and picks the best model according to the evaluator.
paramGrid = (
    ParamGridBuilder()
    .addGrid(lr.regParam, [0.1, 0.01])
    .addGrid(lr.fitIntercept, [False, True])
    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
    .build()
)


In [10]:
# In this case the estimator is simply the linear regression.
# A TrainValidationSplit requires an Estimator, a set of Estimator ParamMaps, and an Evaluator.
tvs = TrainValidationSplit(
    estimator=lr,
    estimatorParamMaps=paramGrid,
    # Default RegressionEvaluator (metric: rmse per the PySpark docs).
    evaluator=RegressionEvaluator(),
    # 80% of the data will be used for training, 20% for validation.
    trainRatio=0.8,
    # Fixed seed so the internal train/validation split, and therefore the
    # selected model, is repeatable across runs (same seed as randomSplit).
    seed=12345,
)

In [11]:
# Fit TrainValidationSplit on the training rows; it selects the best parameter set.
model = tvs.fit(train)

# Score the held-out test rows. `model` is the model with the combination of
# parameters that performed best. show() prints a preview of the result.
(
    model.transform(test)
    .select("features", "label", "prediction")
    .show()
)

+--------------------+--------------------+--------------------+
|            features|               label|          prediction|
+--------------------+--------------------+--------------------+
|(10,[0,1,2,3,4,5,...| -17.026492264209548| -1.6265106840933026|
|(10,[0,1,2,3,4,5,...|  -16.71909683360509|-0.01129960392982...|
|(10,[0,1,2,3,4,5,...| -15.375857723312297|  0.9008270143746643|
|(10,[0,1,2,3,4,5,...| -13.772441561702871|   3.435609049373433|
|(10,[0,1,2,3,4,5,...| -13.039928064104615|  0.3670260850771136|
|(10,[0,1,2,3,4,5,...|   -9.42898793151394|   -3.26399994121536|
|(10,[0,1,2,3,4,5,...|    -9.2679651250406| -0.1762581278405398|
|(10,[0,1,2,3,4,5,...|  -9.173693798406978| -0.2824541263038875|
|(10,[0,1,2,3,4,5,...| -7.1500991588127265|   3.087239142258043|
|(10,[0,1,2,3,4,5,...|  -6.930603551528371| 0.12389571117374062|
|(10,[0,1,2,3,4,5,...|  -6.456944198081549| -0.7275144195427645|
|(10,[0,1,2,3,4,5,...| -3.2843694575334834| -0.9048235164747517|
|(10,[0,1,2,3,4,5,...|   