In [1]:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("test") \
    .config("spark.some.config.option", "some-value").getOrCreate()
sc = spark.sparkContext

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/02/23 07:30:28 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
filePath = """/Users/chongbei/Music/LearningSparkV2/databricks-datasets/learning-spark-v2/sf-airbnb/sf-airbnb-clean.parquet/"""
airbnbDF = spark.read.parquet(filePath)
airbnbDF.select("neighbourhood_cleansed", "room_type", "bedrooms", "bathrooms",
                "number_of_reviews", "price").show(5)

+----------------------+---------------+--------+---------+-----------------+-----+
|neighbourhood_cleansed|      room_type|bedrooms|bathrooms|number_of_reviews|price|
+----------------------+---------------+--------+---------+-----------------+-----+
|      Western Addition|Entire home/apt|     1.0|      1.0|            180.0|170.0|
|        Bernal Heights|Entire home/apt|     2.0|      1.0|            111.0|235.0|
|        Haight Ashbury|   Private room|     1.0|      4.0|             17.0| 65.0|
|        Haight Ashbury|   Private room|     1.0|      4.0|              8.0| 65.0|
|      Western Addition|Entire home/apt|     2.0|      1.5|             27.0|785.0|
+----------------------+---------------+--------+---------+-----------------+-----+
only showing top 5 rows



In [3]:
trainDF, testDF = airbnbDF.randomSplit([.8, .2], seed=42) 
print(f"""There are {trainDF.count()} rows in the training set, and {testDF.count()} in the test set""")

24/02/23 07:30:39 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


There are 5780 rows in the training set, and 1366 in the test set


In [4]:
airbnbDF.printSchema()

root
 |-- host_is_superhost: string (nullable = true)
 |-- cancellation_policy: string (nullable = true)
 |-- instant_bookable: string (nullable = true)
 |-- host_total_listings_count: double (nullable = true)
 |-- neighbourhood_cleansed: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- property_type: string (nullable = true)
 |-- room_type: string (nullable = true)
 |-- accommodates: double (nullable = true)
 |-- bathrooms: double (nullable = true)
 |-- bedrooms: double (nullable = true)
 |-- beds: double (nullable = true)
 |-- bed_type: string (nullable = true)
 |-- minimum_nights: double (nullable = true)
 |-- number_of_reviews: double (nullable = true)
 |-- review_scores_rating: double (nullable = true)
 |-- review_scores_accuracy: double (nullable = true)
 |-- review_scores_cleanliness: double (nullable = true)
 |-- review_scores_checkin: double (nullable = true)
 |-- review_scores_communication: double (nullable = true

In [5]:
from pyspark.ml.feature import VectorAssembler
vecAssembler = VectorAssembler(inputCols=["bedrooms"], outputCol="features") 
vecTrainDF = vecAssembler.transform(trainDF)
vecTrainDF.select("bedrooms", "features", "price").show(10)

+--------+--------+-----+
|bedrooms|features|price|
+--------+--------+-----+
|     1.0|   [1.0]|200.0|
|     1.0|   [1.0]|130.0|
|     1.0|   [1.0]| 95.0|
|     1.0|   [1.0]|250.0|
|     3.0|   [3.0]|250.0|
|     1.0|   [1.0]|115.0|
|     1.0|   [1.0]|105.0|
|     1.0|   [1.0]| 86.0|
|     1.0|   [1.0]|100.0|
|     2.0|   [2.0]|220.0|
+--------+--------+-----+
only showing top 10 rows



In [6]:
from pyspark.ml.regression import LinearRegression
lr = LinearRegression(featuresCol="features", labelCol="price")
lrModel = lr.fit(vecTrainDF)

24/02/23 07:30:48 WARN Instrumentation: [466f728b] regParam is zero, which might cause numerical instability and overfitting.
24/02/23 07:30:48 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
24/02/23 07:30:48 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK


In [7]:
m = round(lrModel.coefficients[0], 2)
b = round(lrModel.intercept, 2)
print(f"""The formula for the linear regression line is price = {m}*bedrooms + {b}""")

The formula for the linear regression line is price = 123.68*bedrooms + 47.51


In [8]:
from pyspark.ml import Pipeline
pipeline = Pipeline(stages=[vecAssembler, lr]) 
pipelineModel = pipeline.fit(trainDF)

24/02/23 07:30:53 WARN Instrumentation: [9767276e] regParam is zero, which might cause numerical instability and overfitting.


In [9]:
predDF = pipelineModel.transform(testDF)
predDF.select("bedrooms", "features", "price", "prediction").show(10)

+--------+--------+------+------------------+
|bedrooms|features| price|        prediction|
+--------+--------+------+------------------+
|     1.0|   [1.0]|  85.0|171.18598011578285|
|     1.0|   [1.0]|  45.0|171.18598011578285|
|     1.0|   [1.0]|  70.0|171.18598011578285|
|     1.0|   [1.0]| 128.0|171.18598011578285|
|     1.0|   [1.0]| 159.0|171.18598011578285|
|     2.0|   [2.0]| 250.0|294.86172649777757|
|     1.0|   [1.0]|  99.0|171.18598011578285|
|     1.0|   [1.0]|  95.0|171.18598011578285|
|     1.0|   [1.0]| 100.0|171.18598011578285|
|     1.0|   [1.0]|2010.0|171.18598011578285|
+--------+--------+------+------------------+
only showing top 10 rows



In [10]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer

categoricalCols = [field for (field, dataType) in trainDF.dtypes if dataType == "string"]
indexOutputCols = [x + "Index" for x in categoricalCols]
oheOutputCols = [x + "OHE" for x in categoricalCols]
stringIndexer = StringIndexer(inputCols=categoricalCols,
                              outputCols=indexOutputCols,
                              handleInvalid="skip")
oheEncoder = OneHotEncoder(inputCols=indexOutputCols,
                           outputCols=oheOutputCols)
numericCols = [field for (field, dataType) in trainDF.dtypes
               if ((dataType == "double") & (field != "price"))]
assemblerInputs = oheOutputCols + numericCols
vecAssembler = VectorAssembler(inputCols=assemblerInputs,
                               outputCol="features")

In [11]:
from pyspark.ml.feature import RFormula

rFormula = RFormula(formula="price ~ .",
                    featuresCol="features",
                    labelCol="price",
                    handleInvalid="skip")


In [12]:
lr = LinearRegression(labelCol="price", featuresCol="features")
pipeline = Pipeline(stages=[stringIndexer, oheEncoder, vecAssembler, lr])  # Or use RFormula
# pipeline = Pipeline(stages = [rFormula, lr])
pipelineModel = pipeline.fit(trainDF)
predDF = pipelineModel.transform(testDF)
predDF.select("features", "price", "prediction").show(5)


24/02/23 07:31:03 WARN Instrumentation: [9234e413] regParam is zero, which might cause numerical instability and overfitting.


+--------------------+-----+------------------+
|            features|price|        prediction|
+--------------------+-----+------------------+
|(98,[0,3,6,22,43,...| 85.0| 55.24365707389188|
|(98,[0,3,6,22,43,...| 45.0|23.357685914717877|
|(98,[0,3,6,22,43,...| 70.0|28.474464479034395|
|(98,[0,3,6,12,42,...|128.0| -91.6079079594947|
|(98,[0,3,6,12,43,...|159.0| 95.05688229945372|
+--------------------+-----+------------------+
only showing top 5 rows



In [13]:
from pyspark.ml.evaluation import RegressionEvaluator

regressionEvaluator = RegressionEvaluator(
    predictionCol="prediction",
    labelCol="price",
    metricName="rmse")
rmse = regressionEvaluator.evaluate(predDF)
print(f"RMSE is {rmse:.1f}")


RMSE is 220.6


In [32]:
r2 = regressionEvaluator.setMetricName("r2").evaluate(predDF) 
print(f"R2 is {r2}")

R2 is 0.16043316698848087


In [14]:
pipelinePath = "/Users/chongbei/Music/test"
pipelineModel.write().overwrite().save(pipelinePath)

In [17]:
from pyspark.ml import PipelineModel 
savedPipelineModel = PipelineModel.load(pipelinePath)

In [16]:
from pyspark.ml.regression import DecisionTreeRegressor
dt = DecisionTreeRegressor(labelCol="price")
# Filter for just numeric columns (and exclude price, our label)
numericCols = [field for (field, dataType) in trainDF.dtypes
           if ((dataType == "double") & (field != "price"))]
# Combine output of StringIndexer defined above and numeric columns
assemblerInputs = indexOutputCols + numericCols
vecAssembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")

# Combine stages into pipeline
stages = [stringIndexer, vecAssembler, dt]
pipeline = Pipeline(stages=stages)
#pipelineModel = pipeline.fit(trainDF) # This line should error
dt.setMaxBins(40)
pipelineModel = pipeline.fit(trainDF)

In [18]:
dtModel = pipelineModel.stages[-1] 
print(dtModel.toDebugString)

DecisionTreeRegressionModel: uid=DecisionTreeRegressor_8135be4d67bd, depth=5, numNodes=47, numFeatures=33
  If (feature 12 <= 2.5)
   If (feature 12 <= 1.5)
    If (feature 5 in {1.0,2.0})
     If (feature 4 in {0.0,1.0,3.0,5.0,9.0,10.0,11.0,13.0,14.0,16.0,18.0,24.0})
      If (feature 3 in {0.0,1.0,2.0,3.0,4.0,5.0,6.0,7.0,8.0,9.0,10.0,11.0,12.0,13.0,14.0,15.0,16.0,17.0,18.0,19.0,20.0,21.0,23.0,24.0,25.0,26.0,27.0,28.0,29.0,30.0,31.0,32.0,33.0,34.0})
       Predict: 104.23992784125075
      Else (feature 3 not in {0.0,1.0,2.0,3.0,4.0,5.0,6.0,7.0,8.0,9.0,10.0,11.0,12.0,13.0,14.0,15.0,16.0,17.0,18.0,19.0,20.0,21.0,23.0,24.0,25.0,26.0,27.0,28.0,29.0,30.0,31.0,32.0,33.0,34.0})
       Predict: 250.7111111111111
     Else (feature 4 not in {0.0,1.0,3.0,5.0,9.0,10.0,11.0,13.0,14.0,16.0,18.0,24.0})
      If (feature 3 in {0.0,2.0,3.0,4.0,5.0,6.0,7.0,8.0,9.0,10.0,11.0,12.0,13.0,14.0,15.0,16.0,17.0,18.0,19.0,20.0,21.0,22.0,23.0,27.0,33.0,35.0})
       Predict: 151.94179894179894
      Else (feat

In [21]:
import pandas as pd
featureImp = pd.DataFrame(
      list(zip(vecAssembler.getInputCols(), dtModel.featureImportances)),
columns=["feature", "importance"])
featureImp.sort_values(by="importance", ascending=False)

Unnamed: 0,feature,importance
12,bedrooms,0.283406
1,cancellation_policyIndex,0.167893
2,instant_bookableIndex,0.140081
4,property_typeIndex,0.128179
15,number_of_reviews,0.126233
3,neighbourhood_cleansedIndex,0.0562
9,longitude,0.03881
14,minimum_nights,0.029473
13,beds,0.015218
5,room_typeIndex,0.010905


In [22]:
from pyspark.ml.regression import RandomForestRegressor
rf = RandomForestRegressor(labelCol="price", maxBins=40, seed=42)

In [23]:
pipeline = Pipeline(stages = [stringIndexer, vecAssembler, rf])

In [24]:
from pyspark.ml.tuning import ParamGridBuilder 
paramGrid = (ParamGridBuilder()
            .addGrid(rf.maxDepth, [2, 4, 6])
            .addGrid(rf.numTrees, [10, 100])
            .build())

In [25]:
evaluator = RegressionEvaluator(labelCol="price",
                                    predictionCol="prediction",
                                    metricName="rmse")

In [26]:
from pyspark.ml.tuning import CrossValidator

In [27]:
cv = CrossValidator(estimator=pipeline,
                        evaluator=evaluator,
                        estimatorParamMaps=paramGrid,
                        numFolds=3,
                        seed=42)
cvModel = cv.fit(trainDF)

24/02/23 08:41:27 WARN DAGScheduler: Broadcasting large task binary with size 1323.1 KiB
24/02/23 08:41:33 WARN DAGScheduler: Broadcasting large task binary with size 1156.6 KiB
24/02/23 08:41:39 WARN DAGScheduler: Broadcasting large task binary with size 1197.5 KiB
24/02/23 08:41:41 WARN DAGScheduler: Broadcasting large task binary with size 1223.7 KiB


In [28]:
list(zip(cvModel.getEstimatorParamMaps(), cvModel.avgMetrics))

[({Param(parent='RandomForestRegressor_2249db9d114f', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes. Must be in range [0, 30].'): 2,
   Param(parent='RandomForestRegressor_2249db9d114f', name='numTrees', doc='Number of trees to train (>= 1).'): 10},
  291.1822640924783),
 ({Param(parent='RandomForestRegressor_2249db9d114f', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes. Must be in range [0, 30].'): 2,
   Param(parent='RandomForestRegressor_2249db9d114f', name='numTrees', doc='Number of trees to train (>= 1).'): 100},
  286.7714750274078),
 ({Param(parent='RandomForestRegressor_2249db9d114f', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes. Must be in range [0, 30].'): 4,
   Param(parent='RandomForestRegressor_2249db9d114f', name

In [29]:
cvModel = cv.setParallelism(4).fit(trainDF)

24/02/23 08:44:25 WARN BlockManager: Block rdd_1161_0 already exists on this machine; not re-adding it
24/02/23 08:44:28 WARN DAGScheduler: Broadcasting large task binary with size 1323.1 KiB
24/02/23 08:44:30 WARN DAGScheduler: Broadcasting large task binary with size 1156.6 KiB
24/02/23 08:44:33 WARN DAGScheduler: Broadcasting large task binary with size 1197.5 KiB
24/02/23 08:44:35 WARN DAGScheduler: Broadcasting large task binary with size 1223.7 KiB


In [30]:
cv = CrossValidator(estimator=rf,
                    evaluator=evaluator,
                    estimatorParamMaps=paramGrid,
                    numFolds=3,
                    parallelism=4,
                    seed=42)
pipeline = Pipeline(stages=[stringIndexer, vecAssembler, cv])
pipelineModel = pipeline.fit(trainDF)

24/02/23 08:46:07 WARN DAGScheduler: Broadcasting large task binary with size 1322.5 KiB
24/02/23 08:46:09 WARN DAGScheduler: Broadcasting large task binary with size 1159.2 KiB
24/02/23 08:46:11 WARN DAGScheduler: Broadcasting large task binary with size 1196.9 KiB
24/02/23 08:46:13 WARN DAGScheduler: Broadcasting large task binary with size 1223.7 KiB


In [31]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler 
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator

In [42]:
airbnbDF = spark.read.parquet(filePath)
(trainDF, testDF) = airbnbDF.randomSplit([.8, .2], seed=42)
categoricalCols = [field for (field, dataType) in trainDF.dtypes if dataType == "string"]
indexOutputCols = [x + "Index" for x in categoricalCols]
stringIndexer = StringIndexer(inputCols=categoricalCols,
                                  outputCols=indexOutputCols,
                                  handleInvalid="skip")
numericCols = [field for (field, dataType) in trainDF.dtypes
if ((dataType == "double") & (field != "price"))]
assemblerInputs = indexOutputCols + numericCols
vecAssembler = VectorAssembler(inputCols=assemblerInputs,
                                   outputCol="features")
rf = RandomForestRegressor(labelCol="price", maxBins=40, maxDepth=5,
                               numTrees=100, seed=42)
pipeline = Pipeline(stages=[stringIndexer, vecAssembler, rf])

In [44]:
import mlflow
import mlflow.spark
import pandas as pd

In [46]:
with mlflow.start_run(run_name="random-forest") as run: 
    # Log params: num_trees and max_depth 
    mlflow.log_param("num_trees", rf.getNumTrees()) 
    mlflow.log_param("max_depth", rf.getMaxDepth())
    # Log model
    pipelineModel = pipeline.fit(trainDF)
    mlflow.spark.log_model(pipelineModel, "model")
    # Log metrics: RMSE and R2
    predDF = pipelineModel.transform(testDF)
    regressionEvaluator = RegressionEvaluator(predictionCol="prediction",
                                              labelCol="price")
    rmse = regressionEvaluator.setMetricName("rmse").evaluate(predDF)
    r2 = regressionEvaluator.setMetricName("r2").evaluate(predDF)
    mlflow.log_metrics({"rmse": rmse, "r2": r2})
    # Log artifact: feature importance scores
    rfModel = pipelineModel.stages[-1]
    pandasDF = (pd.DataFrame(list(zip(vecAssembler.getInputCols(),
                                        rfModel.featureImportances)),
                               columns=["feature", "importance"])
                  .sort_values(by="importance", ascending=False))
# First write to local filesystem, then tell MLflow where to find that file
pandasDF.to_csv("feature-importance.csv", index=False)
mlflow.log_artifact("feature-importance.csv")



In [47]:
from mlflow.tracking import MlflowClient

In [49]:
client = MlflowClient()
runs = client.search_runs(run.info.experiment_id,
                              order_by=["attributes.start_time desc"],
                              max_results=1)
run_id = runs[0].info.run_id
runs[0].data.metrics

{}

In [50]:
 mlflow.run(
      "https://github.com/databricks/LearningSparkV2/#mlflow-project-example",
      parameters={"max_depth": 5, "num_trees": 100})


2024/02/23 14:27:59 INFO mlflow.projects.utils: === Fetching project from https://github.com/databricks/LearningSparkV2/#mlflow-project-example into /var/folders/4z/t2r1mqq10s59nz21h0y_16900000gr/T/tmpbb4dfdac ===
2024/02/23 14:28:12 INFO mlflow.projects.utils: Fetched 'master' branch
2024/02/23 14:28:15 INFO mlflow.utils.conda: === Creating conda environment mlflow-a91eb9b529409372aa4585b19c73952959a7a296 ===


Retrieving notices: ...working... done
Collecting package metadata (repodata.json): ...working... done
Solving environment: ...working... failed



ResolvePackageNotFound: 
  - python=3.7
  - pandas=0.24
  - pip=19.0.3



ShellCommandException: Non-zero exit code: 1
Command: ['/Users/chongbei/anaconda3/bin/conda', 'env', 'create', '-n', 'mlflow-a91eb9b529409372aa4585b19c73952959a7a296', '--file', '/var/folders/4z/t2r1mqq10s59nz21h0y_16900000gr/T/tmpbb4dfdac/mlflow-project-example/conda.yaml', '--quiet']