# ML Pipelines

The content in this notebook is adapted from the Apache Spark ML Pipelines guide: https://spark.apache.org/docs/latest/ml-pipeline.html#main-concepts-in-pipelines

## Setting up Spark Session

In [4]:
from pyspark.sql import SparkSession

# Create (or reuse) the SparkSession bound to the name `spark`.
spark = (
    SparkSession.builder
    .appName("MLPipelines")
    .getOrCreate()
)

22/01/06 22:04:49 WARN Utils: Your hostname, Winsons-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 192.168.1.77 instead (on interface en0)
22/01/06 22:04:49 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/01/06 22:04:49 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## Example: Estimator, Transformer, and Param

In [5]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.classification import LogisticRegression

In [6]:
# Build the training DataFrame from a list of (label, features) tuples.
training = spark.createDataFrame(
    data=[
        (1.0, Vectors.dense([0.0, 1.1, 0.1])),
        (0.0, Vectors.dense([2.0, 1.0, -1.0])),
        (0.0, Vectors.dense([2.0, 1.3, 1.0])),
        (1.0, Vectors.dense([0.0, 1.2, -0.5])),
    ],
    schema=['label', 'features'],
)
training.show()

[Stage 0:>                                                          (0 + 1) / 1]

+-----+--------------+
|label|      features|
+-----+--------------+
|  1.0| [0.0,1.1,0.1]|
|  0.0|[2.0,1.0,-1.0]|
|  0.0| [2.0,1.3,1.0]|
|  1.0|[0.0,1.2,-0.5]|
+-----+--------------+



                                                                                

In [8]:
# Instantiate LogisticRegression; the resulting object is an Estimator.
lr = LogisticRegression(
    maxIter=10,
    regParam=0.01,
)

# Show each parameter with its documentation and default value (if any).
print(f"LogisticRegression parameters:\n{lr.explainParams()}")

LogisticRegression parameters:
aggregationDepth: suggested depth for treeAggregate (>= 2). (default: 2)
elasticNetParam: the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty. (default: 0.0)
family: The name of family which is a description of the label distribution to be used in the model. Supported options: auto, binomial, multinomial (default: auto)
featuresCol: features column name. (default: features)
fitIntercept: whether to fit an intercept term. (default: True)
labelCol: label column name. (default: label)
lowerBoundsOnCoefficients: The lower bounds on coefficients if fitting under bound constrained optimization. The bound matrix must be compatible with the shape (1, number of features) for binomial regression, or (number of classes, number of features) for multinomial regression. (undefined)
lowerBoundsOnIntercepts: The lower bounds on intercepts if fitting under bound constrained optimization. The bou

In [10]:
# Train a LogisticRegression model with the parameters currently stored in lr.
model_1 = lr.fit(dataset=training)

22/01/06 22:05:43 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
22/01/06 22:05:43 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.ForeignLinkerBLAS


In [11]:
# model_1 is a Model, i.e. a Transformer produced by an Estimator, so the
# parameters used during fit() can be inspected on it.
# The output is a map of (name: value) pairs, where names are unique IDs for
# this LogisticRegression instance.
print(
    "model_1 was fit using these parameters:",
    model_1.extractParamMap(),
    sep="\n",
)

model_1 was fit using these parameters:
{Param(parent='LogisticRegression_38b38b744430', name='aggregationDepth', doc='suggested depth for treeAggregate (>= 2).'): 2, Param(parent='LogisticRegression_38b38b744430', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty.'): 0.0, Param(parent='LogisticRegression_38b38b744430', name='family', doc='The name of family which is a description of the label distribution to be used in the model. Supported options: auto, binomial, multinomial'): 'auto', Param(parent='LogisticRegression_38b38b744430', name='featuresCol', doc='features column name.'): 'features', Param(parent='LogisticRegression_38b38b744430', name='fitIntercept', doc='whether to fit an intercept term.'): True, Param(parent='LogisticRegression_38b38b744430', name='labelCol', doc='label column name.'): 'label', Param(parent='LogisticRegression_38b38b744430', name='maxBlockSizeInMB

In [12]:
# Parameters can also be supplied as a Python dictionary (a paramMap).
paramMap = {lr.maxIter: 20}
paramMap[lr.maxIter] = 30  # Assigning the same key replaces the earlier maxIter

# Add further Params one at a time
paramMap[lr.regParam] = 0.1  # type: ignore
paramMap[lr.threshold] = 0.55  # type: ignore

# paramMaps are plain Python dictionaries, so they can be merged.
# The second map changes the name of the probability output column.
paramMap2 = {lr.probabilityCol: 'myProbability'}
paramMapCombined = {**paramMap, **paramMap2}

# Learn a new model with the merged map.
# paramMapCombined overrides all parameters set earlier via lr.set* methods.
model_2 = lr.fit(training, paramMapCombined)
print("model_2 was fit using these parameters:")
print(model_2.extractParamMap())

model_2 was fit using these parameters:
{Param(parent='LogisticRegression_38b38b744430', name='aggregationDepth', doc='suggested depth for treeAggregate (>= 2).'): 2, Param(parent='LogisticRegression_38b38b744430', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty.'): 0.0, Param(parent='LogisticRegression_38b38b744430', name='family', doc='The name of family which is a description of the label distribution to be used in the model. Supported options: auto, binomial, multinomial'): 'auto', Param(parent='LogisticRegression_38b38b744430', name='featuresCol', doc='features column name.'): 'features', Param(parent='LogisticRegression_38b38b744430', name='fitIntercept', doc='whether to fit an intercept term.'): True, Param(parent='LogisticRegression_38b38b744430', name='labelCol', doc='label column name.'): 'label', Param(parent='LogisticRegression_38b38b744430', name='maxBlockSizeInMB

In [14]:
# Build the test DataFrame from (label, features) tuples.
test = spark.createDataFrame(
    data=[
        (1.0, Vectors.dense([-1.0, 1.5, 1.3])),
        (0.0, Vectors.dense([3.0, 2.0, -0.1])),
        (1.0, Vectors.dense([0.0, 2.2, -1.5])),
    ],
    schema=['label', 'features'],
)
test.show()

+-----+--------------+
|label|      features|
+-----+--------------+
|  1.0|[-1.0,1.5,1.3]|
|  0.0|[3.0,2.0,-0.1]|
|  1.0|[0.0,2.2,-1.5]|
+-----+--------------+



In [16]:
# Score the test data with the Transformer.transform() method.
# LogisticRegression.transform will only use the 'features' column.
# Because lr.probabilityCol was renamed earlier, model_2.transform() emits a
# 'myProbability' column in place of the usual 'probability' column.
prediction = model_2.transform(test)
result = (
    prediction
    .select('features', 'label', 'myProbability', 'prediction')
    .collect()
)

# First the raw list of collected Row objects ...
print("result looks like this:")
print(result)

# ... then one formatted line per Row.
print("After tidying up, it looks like this:")
for row in result:
    print(f"features={row.features}, label={row.label}, myProbability={row.myProbability}, prediction={row.prediction}")

result looks like this:
[Row(features=DenseVector([-1.0, 1.5, 1.3]), label=1.0, myProbability=DenseVector([0.0571, 0.9429]), prediction=1.0), Row(features=DenseVector([3.0, 2.0, -0.1]), label=0.0, myProbability=DenseVector([0.9239, 0.0761]), prediction=0.0), Row(features=DenseVector([0.0, 2.2, -1.5]), label=1.0, myProbability=DenseVector([0.1097, 0.8903]), prediction=1.0)]
After tidying up, it looks like this:
features=[-1.0,1.5,1.3], label=1.0, myProbability=[0.0570730499357254,0.9429269500642746], prediction=1.0
features=[3.0,2.0,-0.1], label=0.0, myProbability=[0.9238521956443227,0.07614780435567725], prediction=0.0
features=[0.0,2.2,-1.5], label=1.0, myProbability=[0.10972780286187782,0.8902721971381222], prediction=1.0


## Example: Pipeline

In [17]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer

In [18]:
# Build the training documents from a list of (id, text, label) tuples.
training = spark.createDataFrame(
    data=[
        (0, "a b c d e spark", 1.0),
        (1, "b d", 0.0),
        (2, "spark f g h", 1.0),
        (3, "hadoop mapreduce", 0.0),
    ],
    schema=["id", "text", "label"],
)
training.show()

+---+----------------+-----+
| id|            text|label|
+---+----------------+-----+
|  0| a b c d e spark|  1.0|
|  1|             b d|  0.0|
|  2|     spark f g h|  1.0|
|  3|hadoop mapreduce|  0.0|
+---+----------------+-----+



In [19]:
# Assemble an ML pipeline of three stages: tokenizer -> hashingTF -> lr.
tokenizer = Tokenizer(inputCol='text', outputCol='words')
hashingTF = HashingTF(
    inputCol=tokenizer.getOutputCol(),  # read whatever column the tokenizer writes
    outputCol='features',
)
lr = LogisticRegression(
    maxIter=10,
    regParam=0.001,
)

pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])

# Fit the whole pipeline to the training documents.
model = pipeline.fit(dataset=training)

22/01/06 22:16:31 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
22/01/06 22:16:31 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS


In [20]:
# Build the test documents: unlabeled (id, text) tuples.
test = spark.createDataFrame(
    data=[
        (4, "spark i j k"),
        (5, "l m n"),
        (6, "spark hadoop spark"),
        (7, "apache hadoop"),
    ],
    schema=['id', 'text'],
)
test.show()

+---+------------------+
| id|              text|
+---+------------------+
|  4|       spark i j k|
|  5|             l m n|
|  6|spark hadoop spark|
|  7|     apache hadoop|
+---+------------------+



In [25]:
# Run the fitted pipeline on the test documents and keep the columns of interest.
prediction = model.transform(test)
result = prediction.select('id', 'text', 'probability', 'prediction')

# Each collected Row is unpacked positionally in the order selected above:
# (id, text, probability, prediction).
# The predicted label is bound to `predicted_label` rather than `prediction`,
# so the loop no longer rebinds `prediction` from the DataFrame to a float;
# the printed text is unchanged.
for rid, text, prob, predicted_label in result.collect():
    print(f"({rid}, {text}) --> prob={prob}, prediction={predicted_label}")

(4, spark i j k) --> prob=[0.6292098489668488,0.37079015103315116], prediction=0.0
(5, l m n) --> prob=[0.984770006762304,0.015229993237696027], prediction=0.0
(6, spark hadoop spark) --> prob=[0.13412348342566147,0.8658765165743385], prediction=1.0
(7, apache hadoop) --> prob=[0.9955732114398529,0.00442678856014711], prediction=0.0


## Stop the Spark Session

In [26]:
# Stop the SparkSession that was created at the top of the notebook.
spark.stop()