In [0]:
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StringIndexer


# Snapshot the source table as a Delta dataset (overwriting any previous copy).
(spark.read.format('parquet')
      .table('training_dataset_numeric')
      .write.format("delta")
      .mode("overwrite")
      .save("/delta/events"))

# Reload from Delta, keep only the modelling columns, and drop rows with nulls.
df = (spark.read.format('delta')
      .load('/delta/events')
      .select('bathrooms_na', 'price', 'minimum_nights', 'property_type', 'beds')
      .na.drop())

In [0]:
# Restrict the data to three property types, then show how many rows each has.
df = df.where(
    df["property_type"].isin([
        'Private room in condominium (condo)',
        'Entire residential home',
        'Entire rental unit',
    ])
)

display(df.groupBy('property_type').count())

property_type,count
Private room in condominium (condo),99
Entire residential home,536
Entire rental unit,1260


In [0]:
# Baseline model: no hyperparameter tuning.

# Seeded 80/20 split into training and test sets.
(Train, Test) = df.randomSplit([.8,.2], seed=42)

# Categorical features: every string-typed column gets an index-encoded
# "<name>Index" counterpart; handleInvalid='skip' is passed to the indexer.
categoricalCols = [name for name, dtype in Train.dtypes if dtype == "string"]
indexOutputCols = [f"{name}Index" for name in categoricalCols]
stringIndexer = StringIndexer(
    inputCols=categoricalCols,
    outputCols=indexOutputCols,
    handleInvalid='skip',
)

# Numerical features: only columns typed exactly 'double', excluding the label 'price'.
numericalCols = [name for name, dtype in Train.dtypes if dtype == 'double' and name != 'price']

# Gather indexed categoricals and numericals into a single 'features' vector.
assemblerInputs = indexOutputCols + numericalCols
vecAssembler = VectorAssembler(inputCols=assemblerInputs, outputCol='features')

# Random forest regressor predicting 'price'.
rf = RandomForestRegressor(labelCol="price", maxBins=40)

stages = [stringIndexer, vecAssembler, rf]
pipeline = Pipeline(stages=stages)

# Fit on the training split and score the held-out split.
predDF = (pipeline
          .fit(Train)
          .transform(Test))


In [0]:
# Print the documentation string for the parameters of the `rf` estimator.
print(rf.explainParams())

In [0]:
from pyspark.ml.evaluation import RegressionEvaluator

# Score the baseline predictions against the true 'price' column.
evaluator = RegressionEvaluator(predictionCol='prediction', labelCol='price')

# First evaluation uses the evaluator's default metric (stored as rmse).
rmse = evaluator.evaluate(predDF)

# NOTE: setMetricName mutates `evaluator` in place; from here on it reports r2.
evaluator.setMetricName('r2')
r2 = evaluator.evaluate(predDF)

print(rmse, r2, sep="\n")

In [0]:
# With hyperparameter tuning => directly connected with MLflow in the background, which allows an interactive comparison of runs
from pyspark.ml.tuning import ParamGridBuilder

# Grid of 2 x 2 = 4 candidate settings for the random forest.
gridBuilder = ParamGridBuilder()
gridBuilder.addGrid(rf.maxDepth, [2, 5])
gridBuilder.addGrid(rf.numTrees, [5, 10])
paramGrid = gridBuilder.build()

# Display the generated parameter maps.
paramGrid

In [0]:
# 3-fold cross-validation
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import CrossValidator

# Fresh evaluator (default metric) so the cross-validator does not inherit
# a metric name set on an earlier evaluator object.
evaluator = RegressionEvaluator(predictionCol='prediction', labelCol='price')

# Cross-validate the whole pipeline over every parameter map in paramGrid.
cv = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=3,
    seed=42,
)

In [0]:
# Run the cross-validated search configured on `cv` against the training split.
cvModel = cv.fit(Train)

In [0]:
# Problem: a grid is a manually chosen search space, so it can miss the optimum, and every extra grid point adds cost.
# Hyperopt addresses this by searching over a range of values for each hyperparameter; it can be run serially or in parallel.
# Three search options:
# - Random search
# - TPE (Tree of Parzen Estimators)
# - Adaptive TPE - freeze parameter 1 after a result, then search for parameter 2

In [0]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
import mlflow

def objective_function(params):
  """Hyperopt objective: 3-fold cross-validated score of the pipeline for one candidate.

  Relies on the notebook globals `rf`, `pipeline`, `evaluator` and `Train`.

  Args:
    params: dict sampled from the search space, with keys 'max_depth' and 'num_trees'.

  Returns:
    dict with 'loss' (the average cross-validation metric reported by `evaluator`
    for this candidate) and 'status' (hyperopt's STATUS_OK).
  """
  # Imported here so the function does not depend on a later cell's import.
  from hyperopt import STATUS_OK

  # Cast to plain Python ints: the search space may yield numpy integers,
  # which Spark's integer param conversion may not accept.
  max_depth = int(params['max_depth'])
  num_trees = int(params['num_trees'])
  print(max_depth)

  # Single-point grid holding exactly the sampled values (not the key names).
  grid = (ParamGridBuilder()
            .addGrid(rf.maxDepth, [max_depth])
            .addGrid(rf.numTrees, [num_trees])
            .build())

  # Use the local grid, not the global paramGrid, so each trial evaluates its own candidate.
  cv = CrossValidator(estimator = pipeline, evaluator = evaluator, estimatorParamMaps = grid, numFolds = 3, seed = 42)
  cvModel = cv.fit(Train)

  # The grid has one entry, so avgMetrics[0] is this candidate's average metric.
  # NOTE(review): assumes `evaluator` is configured for RMSE (lower is better) - confirm.
  rmse = cvModel.avgMetrics[0]

  # hyperopt expects a dict with at least 'loss' and 'status'.
  return {'loss': rmse, 'status': STATUS_OK}

In [0]:
from hyperopt import hp
import numpy as np  # needed here: np is otherwise only imported in a later cell

# Discrete candidates for each hyperparameter:
#   max_depth in 1..9, num_trees in 10..19 (np.arange excludes the upper bound).
# NOTE: for hp.choice parameters, fmin reports the *index* of the chosen option,
# not the option itself; map the result back with hyperopt.space_eval.
search_space = {
  "max_depth" : hp.choice('max_depth', np.arange(1, 10, dtype=int)),
  "num_trees" : hp.choice('num_trees', np.arange(10, 20, dtype=int))
}

In [0]:
from hyperopt import fmin, tpe, STATUS_OK, Trials, space_eval
import numpy as np 

# Search the hyperparameter space with TPE, refit the pipeline with the best
# setting, and record the parameters and test RMSE in a single MLflow run.
with mlflow.start_run():
  num_evals = 4
  trials = Trials()
  
  # NOTE(review): recent hyperopt releases expect np.random.default_rng(42) for
  # `rstate`; switch if the installed version rejects RandomState.
  best_hyperparam = fmin(fn = objective_function,
                         space = search_space,
                         algo = tpe.suggest,
                         max_evals = num_evals, 
                         trials = trials,
                         rstate = np.random.RandomState(42)
                        )
  
  # fmin reports hp.choice parameters as indices into their option lists;
  # space_eval maps the result back to the actual hyperparameter values.
  best_params = space_eval(search_space, best_hyperparam)
  best_max_depth = int(best_params['max_depth'])
  best_num_trees = int(best_params['num_trees'])
  
  # Refit the random forest pipeline with the best parameters.
  rf.setMaxDepth(best_max_depth)
  rf.setNumTrees(best_num_trees)
  
  pipelineModel = pipeline.fit(Train)
  
  predDF = pipelineModel.transform(Test)
  # Request RMSE explicitly so the logged value does not depend on whatever
  # metric name the shared `evaluator` currently carries.
  rmse = evaluator.evaluate(predDF, {evaluator.metricName: 'rmse'})
  
  mlflow.log_param('max_depth', best_max_depth)
  mlflow.log_param('num_trees', best_num_trees)
  mlflow.log_metric('rmse', rmse)