In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Hyper Parameter Tuning

A short revisit on Hyper Parameter Tuning (first described in Section 4) with an example applied to a LogisticRegression Pipeline.


In [2]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# Prepare training documents, which are labeled.
training_data = [
    (0, "a b c d e spark", 1.0),
    (1, "b d", 0.0),
    (2, "spark f g h", 1.0),
    (3, "hadoop mapreduce", 0.0),
    (4, "b spark who", 1.0),
    (5, "g d a y", 0.0),
    (6, "spark fly", 1.0),
    (7, "was mapreduce", 0.0),
    (8, "e spark program", 1.0),
    (9, "a e c l", 0.0),
    (10, "spark compile", 1.0),
    (11, "hadoop software", 0.0),
]
training = spark.createDataFrame(training_data, ["id", "text", "label"]).cache()
print("Dataset used for training (labeled):")
training.show()

# Prepare test documents, which are unlabeled.
test_data = [
    (4, "spark i j k"),
    (5, "l m n"),
    (6, "mapreduce spark"),
    (7, "apache hadoop"),
]
test = spark.createDataFrame(test_data, ["id", "text"],).cache()
print("Dataset used for testing (unlabeled):")
test.show()

Dataset used for training (labeled):
+---+----------------+-----+
| id|            text|label|
+---+----------------+-----+
|  0| a b c d e spark|  1.0|
|  1|             b d|  0.0|
|  2|     spark f g h|  1.0|
|  3|hadoop mapreduce|  0.0|
|  4|     b spark who|  1.0|
|  5|         g d a y|  0.0|
|  6|       spark fly|  1.0|
|  7|   was mapreduce|  0.0|
|  8| e spark program|  1.0|
|  9|         a e c l|  0.0|
| 10|   spark compile|  1.0|
| 11| hadoop software|  0.0|
+---+----------------+-----+

Dataset used for testing (unlabeled):
+---+---------------+
| id|           text|
+---+---------------+
|  4|    spark i j k|
|  5|          l m n|
|  6|mapreduce spark|
|  7|  apache hadoop|
+---+---------------+



In [3]:
# Configure an ML pipeline, which consists of tree stages: tokenizer, hashingTF, and lr.
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10)
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])

# We now treat the Pipeline as an Estimator, wrapping it in a CrossValidator instance.
# This will allow us to jointly choose parameters for all Pipeline stages.
# A CrossValidator requires an Estimator, a set of Estimator ParamMaps, and an Evaluator.
# We use a ParamGridBuilder to construct a grid of parameters to search over.
# With 3 values for hashingTF.numFeatures and 2 values for lr.regParam,
# this grid will have 3 x 2 = 6 parameter settings for CrossValidator to choose from.
paramGrid = (
    ParamGridBuilder()
    .addGrid(hashingTF.numFeatures, [10, 100, 1000])
    .addGrid(lr.regParam, [0.1, 0.01])
    .build()
)

crossval = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=paramGrid,
    evaluator=BinaryClassificationEvaluator(),
    numFolds=2,
)  # use 3+ folds in practice

# Run cross-validation, and choose the best set of parameters.
cvModel = crossval.fit(training)

# Make predictions on test documents. cvModel uses the best model found (lrModel).
prediction = cvModel.transform(test)
selected = prediction.select("id", "text", "probability", "prediction")
selected.show(100, False)

----------------------------------------
Exception happened during processing of request from ('127.0.0.1', 52498)
ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1159, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 985, in send_command
    response = connection.send_command(command)
  File "/usr/local/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1164, in send_command
    "Error while receiving", e, proto.ERROR_ON_RECEIVE)
py4j.protocol.Py4JNetworkError: Error while receiving
Traceback (most recent call last):
  File "/opt/conda/lib/python3.7/socketserver.py", line 316, in _ha

Py4JError: An error occurred while calling o172.fit

In [19]:
training.show()

+---+----------------+-----+
| id|            text|label|
+---+----------------+-----+
|  0| a b c d e spark|  1.0|
|  1|             b d|  0.0|
|  2|     spark f g h|  1.0|
|  3|hadoop mapreduce|  0.0|
|  4|     b spark who|  1.0|
|  5|         g d a y|  0.0|
|  6|       spark fly|  1.0|
|  7|   was mapreduce|  0.0|
|  8| e spark program|  1.0|
|  9|         a e c l|  0.0|
| 10|   spark compile|  1.0|
| 11| hadoop software|  0.0|
+---+----------------+-----+

