In [None]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.4.1.tar.gz (310.8 MB)
  Preparing metadata (setup.py) ... done
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.4.1-py2.py3-none-any.whl size=311285388 sha256=2e772a6c1e1174ad73e3184bc132cf3193977a5119eb6512e7c8ca5cc2edf91b
  Stored in directory: /root/.cache/pip/wheels/0d/77/a3/ff2f74cc9ab41f8f594dabf0579c2a7c6de920d584206e0834
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.1


In [None]:
from pyspark.sql import SparkSession

# Create a Spark Session
spark = SparkSession.builder\
        .master("local[*]")\
        .appName("ML Pipelines Tutorial")\
        .getOrCreate()

spark


# **ML Pipelines API**
* [ML Pipelines](https://spark.apache.org/docs/latest/api/python/reference/pyspark.ml.html#pipeline-apis) is a set of high-level APIs built on top of [Spark DataFrames](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.html) to help you organize machine learning pipelines.
* An ML Pipeline is composed of a sequence of stages, each of which is either a Transformer or an Estimator.
* This tutorial is based on [Learning Spark, 2nd Edition Book](https://www.oreilly.com/library/view/learning-spark-2nd/9781492050032/) (Chapter 10), and [Spark Programming Guide: ML Pipelines](https://spark.apache.org/docs/latest/ml-pipeline.html).

# Dataset
* You will use the San Francisco housing data set from [Inside Airbnb](http://insideairbnb.com/get-the-data/).

* A cleaned version of this data is provided in databricks-datasets: `sf-airbnb-clean.parquet`

* The goal is to build a model to predict the price per night for a rental property.

* This is a regression problem, because price is a continuous variable.



In [None]:
# set the file path depending on your environment
filePath = "/content/drive/MyDrive/sample_data/learning-spark-v2/sf-airbnb/sf-airbnb-clean.parquet"

In [None]:
airbnbDF = spark.read.parquet(filePath)

In [None]:
airbnbDF.printSchema()

root
 |-- host_is_superhost: string (nullable = true)
 |-- cancellation_policy: string (nullable = true)
 |-- instant_bookable: string (nullable = true)
 |-- host_total_listings_count: double (nullable = true)
 |-- neighbourhood_cleansed: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- property_type: string (nullable = true)
 |-- room_type: string (nullable = true)
 |-- accommodates: double (nullable = true)
 |-- bathrooms: double (nullable = true)
 |-- bedrooms: double (nullable = true)
 |-- beds: double (nullable = true)
 |-- bed_type: string (nullable = true)
 |-- minimum_nights: double (nullable = true)
 |-- number_of_reviews: double (nullable = true)
 |-- review_scores_rating: double (nullable = true)
 |-- review_scores_accuracy: double (nullable = true)
 |-- review_scores_cleanliness: double (nullable = true)
 |-- review_scores_checkin: double (nullable = true)
 |-- review_scores_communication: double (nullable = true

In [None]:
airbnbDF.show(5, truncate=False)

+-----------------+---------------------------+----------------+-------------------------+----------------------+--------+----------+-------------+---------------+------------+---------+--------+----+--------+--------------+-----------------+--------------------+----------------------+-------------------------+---------------------+---------------------------+----------------------+-------------------+-----+-----------+------------+-------+-----------------------+-------------------------+----------------------------+------------------------+------------------------------+-------------------------+----------------------+
|host_is_superhost|cancellation_policy        |instant_bookable|host_total_listings_count|neighbourhood_cleansed|latitude|longitude |property_type|room_type      |accommodates|bathrooms|bedrooms|beds|bed_type|minimum_nights|number_of_reviews|review_scores_rating|review_scores_accuracy|review_scores_cleanliness|review_scores_checkin|review_scores_communication|review_sc

# Creating Training and Test Data Sets

* [randomSplit](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.randomSplit.html): randomly splits a DataFrame into sets according to the given weights, for instance training (80%) and test (20%).

In [None]:
trainDF, testDF = airbnbDF.randomSplit([.8, .2], seed=42)

print(f"""There are {trainDF.count()} rows in the training set, and {testDF.count()} in the test set""")

There are 5780 rows in the training set, and 1366 in the test set


* Would the split be the same if you repartitioned your DataFrame?
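A toy, plain-Python way to think about this question. It assumes, loosely following Spark's approach, that each partition draws its random numbers from a generator seeded with `seed + partition_index`; `toy_random_split` and `make_partitions` are hypothetical helpers, not Spark APIs. Under that assumption, moving the same rows into a different number of partitions changes which random number each row receives, so the split can change even though the seed is fixed.

```python
import random

def toy_random_split(partitions, weights, seed):
    """Split rows (given as a list of partitions) into len(weights) groups.

    Hypothetical sketch: each partition gets its own RNG seeded with
    seed + partition_index, and each row draws one uniform number that is
    compared against the normalized cumulative weights.
    """
    total = sum(weights)
    # Cumulative upper bounds, e.g. [0.8, 1.0] for weights [0.8, 0.2]
    bounds = [sum(weights[: i + 1]) / total for i in range(len(weights))]
    splits = [[] for _ in weights]
    for index, partition in enumerate(partitions):
        rng = random.Random(seed + index)
        for row in partition:
            draw = rng.random()  # uniform in [0, 1)
            for j, upper in enumerate(bounds):
                if draw < upper:
                    splits[j].append(row)
                    break
    return splits

def make_partitions(rows, num_partitions):
    """Distribute rows round-robin into num_partitions partitions."""
    return [rows[i::num_partitions] for i in range(num_partitions)]

rows = list(range(20))
train2, test2 = toy_random_split(make_partitions(rows, 2), [0.8, 0.2], seed=42)
train4, test4 = toy_random_split(make_partitions(rows, 4), [0.8, 0.2], seed=42)
print(sorted(test2), sorted(test4))  # same seed, different partition layouts
```

Comparing `test2` and `test4` shows whether the two layouts put the same rows in the test set; with partition-based sampling there is no guarantee that they do, while rerunning with the same layout always reproduces the same split.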



# Preparing Features with Transformers
* Linear regression requires that all the input features are contained within a single vector in your DataFrame.

* [VectorAssembler](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.feature.VectorAssembler.html): a Transformer that merges multiple columns into a vector column. Use it to put all of the features into a single vector.
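Conceptually, VectorAssembler concatenates the values of its input columns, in order, into one vector per row. The plain-Python sketch below models a row as a dict and a vector as a list; `assemble` is a hypothetical helper for illustration, not Spark's implementation.

```python
def assemble(row, input_cols):
    """Concatenate the values of input_cols into one flat list of floats.

    Hypothetical helper: scalar columns contribute one entry; list-valued
    columns (standing in for vector columns) are flattened in order.
    """
    features = []
    for col in input_cols:
        value = row[col]
        if isinstance(value, (list, tuple)):
            features.extend(float(v) for v in value)
        else:
            features.append(float(value))
    return features

row = {"accommodates": 2.0, "bathrooms": 1.0, "bedrooms": 1.0,
       "beds": 1.0, "minimum_nights": 1.0, "price": 200.0}
cols = ["accommodates", "bathrooms", "bedrooms", "beds", "minimum_nights"]
print(assemble(row, cols))  # [2.0, 1.0, 1.0, 1.0, 1.0]
```

The label column (`price`) is deliberately left out of `cols`, just as it is left out of `inputCols` in the cell below.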




In [None]:
from pyspark.ml.feature import VectorAssembler

#create the transformer
#inputCols: as an example, we only add some columns (should be numeric)
vecAssembler = VectorAssembler(inputCols=["accommodates","bathrooms","bedrooms","beds","minimum_nights"],
                               outputCol="features")

#call the transform method, and select only the features column & the target column (price)
vecTrainDF = vecAssembler.transform(trainDF).select("features", "price")

vecTrainDF.show(5, False)

+-----------------------+-----+
|features               |price|
+-----------------------+-----+
|[2.0,1.0,1.0,1.0,1.0]  |200.0|
|[3.0,1.0,1.0,1.0,90.0] |130.0|
|[4.0,1.0,1.0,3.0,1.0]  |95.0 |
|[2.0,1.0,1.0,1.0,180.0]|250.0|
|[6.0,3.0,3.0,3.0,30.0] |250.0|
+-----------------------+-----+
only showing top 5 rows



In [None]:
#you can see the features column is represented as a vector
vecTrainDF.take(5)

[Row(features=DenseVector([2.0, 1.0, 1.0, 1.0, 1.0]), price=200.0),
 Row(features=DenseVector([3.0, 1.0, 1.0, 1.0, 90.0]), price=130.0),
 Row(features=DenseVector([4.0, 1.0, 1.0, 3.0, 1.0]), price=95.0),
 Row(features=DenseVector([2.0, 1.0, 1.0, 1.0, 180.0]), price=250.0),
 Row(features=DenseVector([6.0, 3.0, 3.0, 3.0, 30.0]), price=250.0)]

# Using Estimators to Build Models

* [LinearRegression](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.regression.LinearRegression.html) is a type of estimator: it takes in a `DataFrame` and returns a [LinearRegressionModel](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.regression.LinearRegressionModel.html).

In [None]:
from pyspark.ml.regression import LinearRegression

lr1 = LinearRegression(featuresCol="features", labelCol="price")

lrModel1 = lr1.fit(vecTrainDF)
lrModel1

LinearRegressionModel: uid=LinearRegression_ad66f3cd1a8b, numFeatures=5

* The output of an estimator's fit() method is a transformer. Once the
estimator has learned the parameters, the transformer can apply these parameters to
new data points to generate predictions.

In [None]:
# the learned parameters
print("coefficients:", lrModel1.coefficients)
print("intercept:", lrModel1.intercept)

coefficients: [41.08785094623204,12.413821017644553,64.34316258394297,-10.377255092373604,-0.7445442235199358]
intercept: 9.076028961598917
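To see how the fitted transformer turns these learned parameters into predictions, the sketch below recomputes a linear-regression prediction by hand as `intercept + sum(coefficient_i * feature_i)`, using the values printed above. For the feature vectors `[2.0, 1.0, 1.0, 1.0, 2.0]` and `[1.0, 1.0, 1.0, 1.0, 31.0]` it reproduces the `prediction` values reported for the first two test rows in the pipeline section. `predict` is a hypothetical helper, not a Spark method.

```python
# Learned parameters printed above for lrModel1
coefficients = [41.08785094623204, 12.413821017644553, 64.34316258394297,
                -10.377255092373604, -0.7445442235199358]
intercept = 9.076028961598917

def predict(features):
    """Linear regression prediction: intercept + sum(coef_i * x_i)."""
    return intercept + sum(c * x for c, x in zip(coefficients, features))

# Feature order: accommodates, bathrooms, bedrooms, beds, minimum_nights
print(predict([2.0, 1.0, 1.0, 1.0, 2.0]))   # approximately 156.142
print(predict([1.0, 1.0, 1.0, 1.0, 31.0]))  # approximately 93.463
```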


# Creating a Pipeline

* A [`Pipeline`](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.Pipeline.html?highlight=pipeline#pyspark.ml.Pipeline) is an estimator, whereas a [`PipelineModel`](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.PipelineModel.html?highlight=pipelinemodel#pyspark.ml.PipelineModel) (a fitted Pipeline) is a transformer.

* You specify the stages you want your data to
pass through, in order, and Spark takes care of the processing for you.
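The fit/transform contract can be sketched in plain Python. This is a simplified, hypothetical model (the class names are made up, not Spark's): stages run in order, each estimator is fit on the data produced by the earlier stages and replaced by the transformer it returns, and the resulting pipeline model just chains those transformers.

```python
class Transformer:
    def transform(self, data):
        raise NotImplementedError

class Estimator:
    def fit(self, data):
        raise NotImplementedError

class AddOne(Transformer):
    """Stateless transformer: adds 1 to every value."""
    def transform(self, data):
        return [x + 1 for x in data]

class MeanCentererModel(Transformer):
    """Transformer produced by MeanCenterer.fit: subtracts the learned mean."""
    def __init__(self, mean):
        self.mean = mean

    def transform(self, data):
        return [x - self.mean for x in data]

class MeanCenterer(Estimator):
    """Estimator: learns the mean during fit and returns a transformer."""
    def fit(self, data):
        return MeanCentererModel(sum(data) / len(data))

class ToyPipelineModel(Transformer):
    def __init__(self, stages):
        self.stages = stages

    def transform(self, data):
        for stage in self.stages:
            data = stage.transform(data)
        return data

class ToyPipeline(Estimator):
    def __init__(self, stages):
        self.stages = stages

    def fit(self, data):
        fitted = []
        for stage in self.stages:
            # Estimators are fit on the output of the previous stages
            model = stage.fit(data) if isinstance(stage, Estimator) else stage
            data = model.transform(data)
            fitted.append(model)
        return ToyPipelineModel(fitted)

model = ToyPipeline([AddOne(), MeanCenterer()]).fit([1.0, 2.0, 3.0])
print(model.transform([10.0]))  # [8.0]: (10 + 1) minus the learned mean 3
```

Note that the mean is learned once, during `fit`, and then reused unchanged by `transform`, which is why a fitted pipeline can be applied to test data without leaking test statistics into the stages.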


In [None]:
from pyspark.ml import Pipeline

lrPipeline1 = Pipeline(stages=[vecAssembler, lr1])

lrPipelineModel1 = lrPipeline1.fit(trainDF)

* `lrPipelineModel1` is a transformer. You can now apply it to your test data:

In [None]:
lrPredDF1 = lrPipelineModel1.transform(testDF)

In [None]:
# two new columns appear: features (the assembled vector) and prediction
lrPredDF1.show(5, truncate=False)

+-----------------+-------------------+----------------+-------------------------+----------------------+--------+----------+-------------+---------------+------------+---------+--------+----+--------+--------------+-----------------+--------------------+----------------------+-------------------------+---------------------+---------------------------+----------------------+-------------------+-----+-----------+------------+-------+-----------------------+-------------------------+----------------------------+------------------------+------------------------------+-------------------------+----------------------+----------------------+------------------+
|host_is_superhost|cancellation_policy|instant_bookable|host_total_listings_count|neighbourhood_cleansed|latitude|longitude |property_type|room_type      |accommodates|bathrooms|bedrooms|beds|bed_type|minimum_nights|number_of_reviews|review_scores_rating|review_scores_accuracy|review_scores_cleanliness|review_scores_checkin|review_scor

In [None]:
#you may select only the columns related to your case
lrPredDF1.select("features", "price", "prediction").show(5, truncate=False)

+----------------------+-----+------------------+
|features              |price|prediction        |
+----------------------+-----+------------------+
|[2.0,1.0,1.0,1.0,2.0] |85.0 |156.14237091623704|
|[1.0,1.0,1.0,1.0,31.0]|45.0 |93.46273748792687 |
|[1.0,1.0,1.0,1.0,30.0]|70.0 |94.20728171144681 |
|[1.0,1.0,1.0,1.0,30.0]|128.0|94.20728171144681 |
|[1.0,1.0,1.0,1.0,1.0] |159.0|115.79906419352494|
+----------------------+-----+------------------+
only showing top 5 rows



# One-hot encoding

* To convert categorical values into numeric values, you can use a technique called one-hot encoding (OHE).

* A common approach to do one-hot encoding in Spark is
to use [StringIndexer](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.feature.StringIndexer.html) and [OneHotEncoder](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.feature.OneHotEncoder.html).
* The first step is to
apply the StringIndexer estimator to convert categorical values into category indices.
These category indices are ordered by label frequencies, so the most frequent
label gets index 0, which provides us with reproducible results across various runs of
the same data.
* Then, you pass those category indices as input to the
OneHotEncoder which maps a column of category indices to a column of binary vectors.
* Spark internally uses a [SparseVector](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.linalg.SparseVector.html) when
the majority of the entries are 0, as is often the case after OHE, so it does not waste
space storing 0 values.
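The two steps above can be sketched in plain Python. `fit_string_indexer` orders labels by descending frequency, breaking ties alphabetically (an assumption about Spark's default `stringOrderType="frequencyDesc"` tie-breaking), and `one_hot` returns a sparse vector as `(size, indices, values)`, dropping the last category by default to mirror `OneHotEncoder`'s `dropLast=True`. Both helpers and the sample values are hypothetical illustrations.

```python
from collections import Counter

def fit_string_indexer(values):
    """Map each label to an index ordered by descending frequency."""
    counts = Counter(values)
    # Most frequent first; equal counts sorted alphabetically (assumption)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: i for i, label in enumerate(ordered)}

def one_hot(index, num_labels, drop_last=True):
    """Return a sparse vector (size, indices, values) for a category index.

    With drop_last=True the last category is encoded as all zeros,
    so the vector size is num_labels - 1.
    """
    size = num_labels - 1 if drop_last else num_labels
    if index < size:
        return (size, [index], [1.0])
    return (size, [], [])

room_types = ["Entire home/apt", "Private room", "Entire home/apt",
              "Shared room", "Entire home/apt", "Private room"]
label_to_index = fit_string_indexer(room_types)
print(label_to_index)
# {'Entire home/apt': 0, 'Private room': 1, 'Shared room': 2}
for label, index in label_to_index.items():
    print(label, one_hot(index, len(label_to_index)))
```

Each encoded vector holds at most one nonzero entry, which is why a sparse representation is the natural fit after OHE.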

In [None]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder

#get list of columns with string data type
categoricalCols = [field for (field, dataType) in trainDF.dtypes
                  if dataType == "string"]
categoricalCols

['host_is_superhost',
 'cancellation_policy',
 'instant_bookable',
 'neighbourhood_cleansed',
 'property_type',
 'room_type',
 'bed_type']

In [None]:
#names for StringIndexer's output columns
indexOutputCols = [x + "Index" for x in categoricalCols]

#create your StringIndexer
stringIndexer = StringIndexer(inputCols=categoricalCols, outputCols=indexOutputCols, handleInvalid="skip")

* `handleInvalid`: specifies how to handle invalid data, such as categories in the test data set that were not seen when the StringIndexer was fit, or null values.

* The options are `skip` (filter out rows with invalid data), `error` (throw an error; the default), or `keep` (put invalid data in a special additional bucket, at index `numLabels`).
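A plain-Python sketch of the three options, applied to a label-to-index map that was already fit. `apply_indexer` and the unseen label `"Hotel room"` are hypothetical; the point is only how each option treats a value missing from the map.

```python
def apply_indexer(values, label_to_index, handle_invalid="error"):
    """Index values with a fitted label->index map.

    "skip" drops values with unseen labels, "error" raises, and
    "keep" assigns the extra index numLabels (= len(label_to_index)).
    """
    num_labels = len(label_to_index)
    indexed = []
    for value in values:
        if value in label_to_index:
            indexed.append(label_to_index[value])
        elif handle_invalid == "skip":
            continue
        elif handle_invalid == "keep":
            indexed.append(num_labels)
        else:
            raise ValueError(f"Unseen label: {value!r}")
    return indexed

fitted = {"Entire home/apt": 0, "Private room": 1, "Shared room": 2}
test_values = ["Private room", "Hotel room", "Entire home/apt"]
print(apply_indexer(test_values, fitted, "skip"))  # [1, 0]
print(apply_indexer(test_values, fitted, "keep"))  # [1, 3, 0]
```

With `skip`, the row count of the transformed test set can shrink, which matters when you compare predictions against the original test data.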

In [None]:
#names for OneHotEncoder's output columns
oheOutputCols = [x + "OHE" for x in categoricalCols]

#create your OneHotEncoder
oheEncoder = OneHotEncoder(inputCols=indexOutputCols, outputCols=oheOutputCols)

In [None]:
#get list of numeric columns
numericCols = [field for (field, dataType) in trainDF.dtypes
              if dataType == "double" and field != "price"]

assemblerInputs = oheOutputCols + numericCols

assemblerInputs

['host_is_superhostOHE',
 'cancellation_policyOHE',
 'instant_bookableOHE',
 'neighbourhood_cleansedOHE',
 'property_typeOHE',
 'room_typeOHE',
 'bed_typeOHE',
 'host_total_listings_count',
 'latitude',
 'longitude',
 'accommodates',
 'bathrooms',
 'bedrooms',
 'beds',
 'minimum_nights',
 'number_of_reviews',
 'review_scores_rating',
 'review_scores_accuracy',
 'review_scores_cleanliness',
 'review_scores_checkin',
 'review_scores_communication',
 'review_scores_location',
 'review_scores_value',
 'bedrooms_na',
 'bathrooms_na',
 'beds_na',
 'review_scores_rating_na',
 'review_scores_accuracy_na',
 'review_scores_cleanliness_na',
 'review_scores_checkin_na',
 'review_scores_communication_na',
 'review_scores_location_na',
 'review_scores_value_na']

In [None]:
#create your vectorAssembler with the one-hot encoded columns plus the numeric columns
vecAssembler = VectorAssembler(inputCols = assemblerInputs, outputCol = "features")

#Now you can add a linear regression model using all of the features as input.
lr2 = LinearRegression(labelCol="price", featuresCol="features")

#create a pipeline with the four stages
lrPipeline2 = Pipeline(stages = [stringIndexer, oheEncoder, vecAssembler, lr2])

lrPipelineModel2 = lrPipeline2.fit(trainDF)

predDF2 = lrPipelineModel2.transform(testDF)
predDF2.select("features", "price", "prediction").show(5, False)

+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----+------------------+
|features                                                                                                                                                                                                                            |price|prediction        |
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----+------------------+
|(98,[0,3,6,22,43,66,68,72,73,74,75,76,77,78,79,80,81,82,83,84,85,86,87],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,37.72001,-122.39249,2.0,1.0,1.0,1.0,2.0,128.0,97.0,10.0,10.0,10.0,10.0,9.0,10.0])                                          |85

* As you can see, the features column is represented as a SparseVector. Its printed form `(98, [indices], [values])` gives the vector size (98 features after one-hot encoding), then the indices of the nonzero entries, and then the values at those indices.
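The sketch below expands the `features` value of the first test row (copied from the output above) into a dense list. Because `assemblerInputs` lists the seven one-hot columns first and then the 26 numeric columns, positions 72 to 97 hold the numeric features in the order printed earlier; for example, positions 73 and 74 hold latitude and longitude. `to_dense` is a hypothetical helper.

```python
# features of the first test row, copied from the output above
size = 98
indices = [0, 3, 6, 22, 43, 66, 68, 72, 73, 74, 75, 76, 77, 78, 79, 80,
           81, 82, 83, 84, 85, 86, 87]
values = [1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 37.72001, -122.39249,
          2.0, 1.0, 1.0, 1.0, 2.0, 128.0, 97.0, 10.0, 10.0, 10.0, 10.0,
          9.0, 10.0]

def to_dense(size, indices, values):
    """Expand a (size, indices, values) sparse vector into a dense list."""
    dense = [0.0] * size
    for i, v in zip(indices, values):
        dense[i] = v
    return dense

dense = to_dense(size, indices, values)
print(len(dense), sum(1 for x in dense if x != 0.0))  # 98 23
print(dense[73], dense[74])  # 37.72001 -122.39249
```

Only 23 of the 98 entries are nonzero for this row, which is the saving the sparse format provides.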

# Evaluating Models
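* [RegressionEvaluator](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.evaluation.RegressionEvaluator.html): computes a regression metric from a prediction column and a label column. Supported metrics include `rmse` (the default), `mse`, `r2`, `mae`, and `var`.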

In [None]:
from pyspark.ml.evaluation import RegressionEvaluator

regressionEvaluator = RegressionEvaluator(predictionCol="prediction",
                                          labelCol="price",
                                          metricName="rmse")

rmse = regressionEvaluator.evaluate(predDF2)
rmse

220.6659096945873

In [None]:
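# use another metric with the same evaluator: R-squared
# note: setMetricName modifies regressionEvaluator in place, so later uses of it (such as the CrossValidator below) also compute R-squared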
r2_metric = regressionEvaluator.setMetricName("r2").evaluate(predDF2)
r2_metric

0.15965119209930734
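For reference, the two metrics can be computed by hand: RMSE is the square root of the mean squared error (in the label's units, here dollars per night; lower is better), and R-squared is `1 - SS_res / SS_tot` (higher is better). The R-squared of about 0.16 above means the model explains roughly 16% of the variance in price on the test set. The sketch uses small made-up labels and predictions, not the Airbnb data.

```python
import math

def rmse(labels, predictions):
    """Root mean squared error."""
    n = len(labels)
    return math.sqrt(sum((y - p) ** 2 for y, p in zip(labels, predictions)) / n)

def r2(labels, predictions):
    """Coefficient of determination: 1 - SS_res / SS_tot."""
    mean = sum(labels) / len(labels)
    ss_res = sum((y - p) ** 2 for y, p in zip(labels, predictions))
    ss_tot = sum((y - mean) ** 2 for y in labels)
    return 1 - ss_res / ss_tot

labels = [3.0, -0.5, 2.0, 7.0]
predictions = [2.5, 0.0, 2.0, 8.0]
print(rmse(labels, predictions))  # about 0.612
print(r2(labels, predictions))    # about 0.949
```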

# Saving and Loading Pipeline Models


In [None]:
pipelinePath = "/tmp/lrPipelineModel2"
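# write().overwrite() lets you rerun this cell without failing because the path already exists
lrPipelineModel2.write().overwrite().save(pipelinePath)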

* You can later load this model and use it to predict on new data again.

In [None]:
from pyspark.ml import PipelineModel
savedPipelineModel = PipelineModel.load(pipelinePath)

In [None]:
savedPipelineModel.stages

[StringIndexerModel: uid=StringIndexer_6189856236c8, handleInvalid=skip, numInputCols=7, numOutputCols=7,
 OneHotEncoderModel: uid=OneHotEncoder_57c14dabf4fb, dropLast=true, handleInvalid=error, numInputCols=7, numOutputCols=7,
 VectorAssembler_f5b4b547a894,
 LinearRegressionModel: uid=LinearRegression_c90678311099, numFeatures=98]

# Pipeline with Decision Trees
* [DecisionTreeRegressor](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.regression.DecisionTreeRegressor.html)
* [DecisionTreeRegressionModel](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.regression.DecisionTreeRegressionModel.html)


In [None]:
from pyspark.ml.regression import DecisionTreeRegressor

dt = DecisionTreeRegressor(labelCol="price")

# Filter for just numeric columns (and exclude price, our label)
numericCols = [field for (field, dataType) in trainDF.dtypes
              if dataType == "double" and field != "price"]

# Combine output of StringIndexer defined above and numeric columns
# (trees can split on category indices directly, so no one-hot encoding is needed)
assemblerInputs = indexOutputCols + numericCols
vecAssembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")

In [None]:
# Create pipeline
pipeline = Pipeline(stages=[stringIndexer, vecAssembler, dt])

#fit the pipeline
pipelineModel = pipeline.fit(trainDF)  # expected to fail: the default maxBins (32) is smaller than the number of categories in neighbourhood_cleansed

In [None]:
# maxBins must be at least the number of categories of every categorical feature, so raise it from 32 to 40
dt.setMaxBins(40)
pipelineModel = pipeline.fit(trainDF)
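The constraint behind the failed fit can be sketched as a simple check: every categorical feature needs at least as many bins as it has distinct values. In the debug output below, feature 3 (the `neighbourhood_cleansed` index) shows category indices up to 35.0, i.e. 36 categories, which exceeds the default of 32. `check_max_bins` and the generated neighbourhood names are hypothetical.

```python
def check_max_bins(categorical_values, max_bins=32):
    """Raise if any categorical feature has more distinct values than max_bins.

    Hypothetical helper illustrating the tree learner's requirement;
    32 is the default maxBins of Spark's tree-based regressors.
    """
    for name, values in categorical_values.items():
        num_categories = len(set(values))
        if num_categories > max_bins:
            raise ValueError(
                f"maxBins={max_bins} < {num_categories} categories in {name}")
    return True

# 36 made-up neighbourhood names standing in for the real categories
neighbourhoods = [f"neighbourhood_{i}" for i in range(36)]
print(check_max_bins({"neighbourhood_cleansed": neighbourhoods}, max_bins=40))
```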

In [None]:
#now, we can extract the if-then-else rules learned by the decision tree
dtModel = pipelineModel.stages[-1]
print(dtModel.toDebugString)

'DecisionTreeRegressionModel: uid=DecisionTreeRegressor_6ea45801baec, depth=5, numNodes=47, numFeatures=33\n  If (feature 12 <= 2.5)\n   If (feature 12 <= 1.5)\n    If (feature 5 in {1.0,2.0})\n     If (feature 4 in {0.0,1.0,3.0,5.0,9.0,10.0,11.0,13.0,14.0,16.0,18.0,24.0})\n      If (feature 3 in {0.0,1.0,2.0,3.0,4.0,5.0,6.0,7.0,8.0,9.0,10.0,11.0,12.0,13.0,14.0,15.0,16.0,17.0,18.0,19.0,20.0,21.0,23.0,24.0,25.0,26.0,27.0,28.0,29.0,30.0,31.0,32.0,33.0,34.0})\n       Predict: 104.23992784125075\n      Else (feature 3 not in {0.0,1.0,2.0,3.0,4.0,5.0,6.0,7.0,8.0,9.0,10.0,11.0,12.0,13.0,14.0,15.0,16.0,17.0,18.0,19.0,20.0,21.0,23.0,24.0,25.0,26.0,27.0,28.0,29.0,30.0,31.0,32.0,33.0,34.0})\n       Predict: 250.7111111111111\n     Else (feature 4 not in {0.0,1.0,3.0,5.0,9.0,10.0,11.0,13.0,14.0,16.0,18.0,24.0})\n      If (feature 3 in {0.0,2.0,3.0,4.0,5.0,6.0,7.0,8.0,9.0,10.0,11.0,12.0,13.0,14.0,15.0,16.0,17.0,18.0,19.0,20.0,21.0,22.0,23.0,27.0,33.0,35.0})\n       Predict: 151.94179894179894\n   

In [None]:
#feature importance scores
import pandas as pd

featureImp = pd.DataFrame(
    list(zip(vecAssembler.getInputCols(), dtModel.featureImportances)),
    columns=["feature", "importance"])

featureImp.sort_values(by="importance", ascending=False)
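A sketch of how gain-based importances for a single tree can be computed, following the description in Spark's API docs for `featureImportances`: sum each feature's split gains, weighting each gain by the number of instances reaching that node, then normalize so the importances sum to 1. The `(feature_index, gain, num_instances)` input format and the numbers are hypothetical.

```python
from collections import defaultdict

def tree_feature_importances(splits, num_features):
    """Gain-based importances for one tree, normalized to sum to 1.

    splits: list of (feature_index, impurity_gain, num_instances) tuples,
    one per internal node (hypothetical input format).
    """
    totals = defaultdict(float)
    for feature, gain, count in splits:
        totals[feature] += gain * count  # gain scaled by instances at node
    norm = sum(totals.values())
    return [totals[j] / norm if norm else 0.0 for j in range(num_features)]

splits = [(12, 0.5, 100), (12, 0.2, 60), (5, 0.3, 40)]
importances = tree_feature_importances(splits, 13)
print(importances[12], importances[5])  # 62/74 and 12/74
```

Features that never appear in a split get an importance of 0, which is why many rows of the sorted table above can be zero.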

# Pipeline with Random Forests
* [RandomForestRegressor](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.regression.RandomForestRegressor.html)
* [RandomForestRegressionModel](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.regression.RandomForestRegressionModel.html)

In [None]:
from pyspark.ml.regression import RandomForestRegressor

rf = RandomForestRegressor(labelCol="price", maxBins=40, seed=42)

rfPipeline = Pipeline(stages = [stringIndexer, vecAssembler, rf])

rfPipelineModel = rfPipeline.fit(trainDF)

rfModel = rfPipelineModel.stages[-1]

In [None]:
rfModel.getNumTrees

20

In [None]:
print(rfModel.trees)

DecisionTreeRegressionModel: uid=dtr_e06d844f60ca, depth=5, numNodes=61, numFeatures=33


In [None]:
print(rfModel.toDebugString)
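A random forest regressor predicts by averaging the predictions of its trees (20 here, as `getNumTrees` shows), each trained on a different random sample of rows and features. The sketch below uses three hypothetical one-split "trees" written as plain functions in place of fitted `DecisionTreeRegressionModel`s.

```python
def forest_predict(trees, features):
    """Random forest regression: average the predictions of all trees."""
    return sum(tree(features) for tree in trees) / len(trees)

# Three made-up stumps standing in for fitted trees
trees = [
    lambda x: 100.0 if x[0] <= 2.5 else 250.0,
    lambda x: 120.0 if x[1] <= 1.5 else 200.0,
    lambda x: 90.0 if x[0] <= 3.5 else 300.0,
]
print(forest_predict(trees, [2.0, 1.0]))  # (100 + 120 + 90) / 3
print(forest_predict(trees, [4.0, 2.0]))  # 250.0
```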

# Tuning
* [pyspark.ml.tuning](https://spark.apache.org/docs/latest/api/python/reference/pyspark.ml.html#tuning): tuning ML algorithms and Pipelines.

* This includes cross-validation and other tools for hyperparameter tuning and model selection, as described in the [ML Tuning Official Guide](https://spark.apache.org/docs/latest/ml-tuning.html).

* To perform a hyperparameter search in Spark, you need to: (1) define the estimator you want to evaluate, (2) specify which hyperparameters you want to vary, as well as their respective values, and (3) define an evaluator to specify which metric to use to compare the various models and determine which one performed best.
* In this example, we take the RandomForest pipeline defined before as the estimator and vary two parameters: maxDepth (2, 4, or 6) and [numTrees](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.regression.RandomForestRegressor.html) (10 or 100).

* This gives us a grid of 6 (3 x 2) different hyperparameter configurations in total:
  * (maxDepth=2, numTrees=10)
  * (maxDepth=2, numTrees=100)
  * (maxDepth=4, numTrees=10)
  * (maxDepth=4, numTrees=100)
  * (maxDepth=6, numTrees=10)
  * (maxDepth=6, numTrees=100)


## ParamGridBuilder
*  [ParamGridBuilder](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.tuning.ParamGridBuilder.html): Specify which hyperparameters to tune, as well as their respective values.

In [None]:
from pyspark.ml.tuning import ParamGridBuilder

paramGrid = (ParamGridBuilder()
            .addGrid(rf.maxDepth, [2, 4, 6])
            .addGrid(rf.numTrees, [10, 100])
            .build())
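`build()` returns a list of parameter maps, one per combination of the values passed to `addGrid`. A plain-Python equivalent is the Cartesian product of the value lists; here string keys stand in for Spark's `Param` objects (an assumption for illustration), and the first key varies slowest, matching the order of the six configurations listed above.

```python
from itertools import product

grid = {"maxDepth": [2, 4, 6], "numTrees": [10, 100]}
# Cartesian product of the value lists, with the first key varying slowest
param_maps = [dict(zip(grid, values)) for values in product(*grid.values())]
for p in param_maps:
    print(p)
```

Adding a third parameter with k values would multiply the grid size, and therefore the number of models to train, by k.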

## CrossValidator


* [CrossValidator](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.tuning.CrossValidator.html): performs k-fold cross-validation, evaluating each of the candidate models.
* [CrossValidatorModel](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.tuning.CrossValidatorModel.html)
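A plain-Python sketch of k-fold cross-validation over a parameter grid. It assumes each row is assigned to a fold at random with a seeded generator (loosely modeled on Spark's seeded fold assignment, so folds are only approximately equal in size); for every parameter map the model would be fit on the other folds and scored on the held-out fold, and the scores averaged. `k_fold_splits`, `average_metrics`, and the dummy scorer are hypothetical.

```python
import random

def k_fold_splits(n_rows, num_folds, seed):
    """Randomly assign each row to a fold, then return one
    (train_indices, validation_indices) pair per fold."""
    rng = random.Random(seed)
    fold_of = [rng.randrange(num_folds) for _ in range(n_rows)]
    return [([i for i in range(n_rows) if fold_of[i] != k],
             [i for i in range(n_rows) if fold_of[i] == k])
            for k in range(num_folds)]

def average_metrics(param_maps, splits, fit_and_score):
    """One average validation metric per parameter map, in param_maps order."""
    return [sum(fit_and_score(p, train, val) for train, val in splits) / len(splits)
            for p in param_maps]

def dummy_score(params, train, val):
    # A real scorer would fit on `train` and evaluate on `val`
    return params["maxDepth"]

splits = k_fold_splits(n_rows=12, num_folds=3, seed=42)
print(average_metrics([{"maxDepth": 2}, {"maxDepth": 6}], splits, dummy_score))
```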

In [None]:
from pyspark.ml.tuning import CrossValidator

cv = CrossValidator(estimator=rfPipeline,
                    evaluator=regressionEvaluator,
                    estimatorParamMaps=paramGrid,
                    numFolds=3,
                    seed=42)

cvModel = cv.fit(trainDF)


* Once it has identified the optimal hyperparameter configuration, Spark retrains the model on the entire training data set; the result is available as `cvModel.bestModel`.



In [None]:
# To inspect the results of the cross-validator, look at avgMetrics: one average metric per parameter map, in paramGrid order
cvModel.avgMetrics

[0.1558541951669324,
 0.18705504489990407,
 0.1793619183973377,
 0.22691576438617903,
 0.1320026456261747,
 0.2489339449458354]
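Because `setMetricName("r2")` was called on `regressionEvaluator` earlier, these averages are R-squared values, where larger is better (an evaluator reports this via `isLargerBetter()`). Picking the best configuration by hand from the printed values, with parameter maps in the grid order listed earlier:

```python
# avgMetrics copied from the output above, in paramGrid order
avg_metrics = [0.1558541951669324, 0.18705504489990407, 0.1793619183973377,
               0.22691576438617903, 0.1320026456261747, 0.2489339449458354]
param_maps = [{"maxDepth": d, "numTrees": t} for d in (2, 4, 6) for t in (10, 100)]

# R-squared is larger-is-better; for RMSE you would take the minimum instead
best_index = max(range(len(avg_metrics)), key=avg_metrics.__getitem__)
print(best_index, param_maps[best_index])  # 5 {'maxDepth': 6, 'numTrees': 100}
```

In this run, 100 trees beats 10 trees at every depth, and the deepest setting with 100 trees scores best.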


## Optimizing Pipelines
* [CrossValidator.parallelism](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.tuning.CrossValidator.html#pyspark.ml.tuning.CrossValidator.parallelism): determine the number of models to train in parallel.


In [None]:
cvModel = cv.setParallelism(4).fit(trainDF)

* There's another trick you can use to speed up model training: put the cross-validator inside the pipeline (e.g., `Pipeline(stages=[..., cv])`) instead of putting the pipeline inside the cross-validator (e.g., `CrossValidator(estimator=pipeline, ...)`). As a result, the StringIndexer (or any other stage that does not change across the grid) is fit only once, rather than being refit each time a different model is trained during cross-validation.
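A quick count makes the saving concrete. It assumes that, with the pipeline inside the cross-validator, the whole pipeline is refit for every (fold, parameter map) pair and once more on the full training set for the best model; `string_indexer_fits` is a hypothetical helper.

```python
def string_indexer_fits(num_folds, num_param_maps, cv_inside_pipeline):
    """Count how many times the StringIndexer stage is fit (sketch)."""
    if cv_inside_pipeline:
        return 1  # fit once, before the cross-validator stage runs
    # every fold x param map, plus the final refit of the best model
    return num_folds * num_param_maps + 1

print(string_indexer_fits(3, 6, cv_inside_pipeline=False))  # 19
print(string_indexer_fits(3, 6, cv_inside_pipeline=True))   # 1
```

The random forest itself is still trained the same number of times in both arrangements; only the stages outside the cross-validator are spared.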

In [None]:
cv = CrossValidator(estimator=rf,
                    evaluator=regressionEvaluator,
                    estimatorParamMaps=paramGrid,
                    numFolds=3,
                    parallelism=4,
                    seed=42)

pipeline = Pipeline(stages=[stringIndexer, vecAssembler, cv])

pipelineModel = pipeline.fit(trainDF)