In [0]:
import pandas as pd
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml import Pipeline
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.sql.types import DoubleType, IntegerType, StringType

In [0]:
project_path   = "/ames/data/"
train_path = project_path + "train"
test_path = project_path + "test"
LABEL_COLUMN = "SalePrice"

train_df = spark.read.format("delta").load(train_path)
test_df = spark.read.format("delta").load(test_path)

In [0]:
numeric_column_names = [field.name for field in train_df.schema.fields if (field.dataType == IntegerType() or field.dataType == DoubleType()) and field.name != LABEL_COLUMN]
categorical_column_names = [field.name for field in train_df.schema.fields if field.dataType == StringType()]

In [0]:
indexed_column_names = [c + "_index" for c in categorical_column_names]
feature_colums = numeric_column_names + indexed_column_names

string_indexer = StringIndexer(inputCols=categorical_column_names, outputCols=indexed_column_names, handleInvalid="skip")
assembler = VectorAssembler(inputCols=feature_colums, outputCol="features")
regressor = RandomForestRegressor(labelCol=LABEL_COLUMN, maxBins=40)

pipeline = Pipeline(stages=[string_indexer, assembler, regressor])

In [0]:
param_grid = (ParamGridBuilder()
             .addGrid(regressor.maxDepth, [9, 15])
             .addGrid(regressor.numTrees, [14, 20])
              .build()
             )

In [0]:
regression_evaluator = RegressionEvaluator(labelCol=LABEL_COLUMN, predictionCol="prediction")
cv = CrossValidator(estimator=pipeline, evaluator=regression_evaluator, estimatorParamMaps=param_grid, numFolds=3, seed=1, parallelism=4)
pipeline_model = cv.fit(train_df)

In [0]:
predictions_df = pipeline_model.transform(test_df)

print("RMSE = {0:.2f}, R2 = {1:.2f}".format(regression_evaluator.setMetricName("rmse").evaluate(predictions_df), regression_evaluator.setMetricName("r2").evaluate(predictions_df)))

In [0]:
feature_importances = pd.DataFrame(
  data = list(zip(assembler.getInputCols(), pipeline_model.bestModel.stages[-1].featureImportances)),         columns=["feature", "feature_importance"]).sort_values(by="feature_importance", ascending=False)
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
display(spark.createDataFrame(feature_importances[:15]))

feature,feature_importance
Overall_Qual,0.2505162725565271
Neighborhood_index,0.194663155408079
Gr_Liv_Area,0.0937489221817356
1st_Flr_SF,0.0759690871810148
Garage_Cars,0.0417181985534553
Exter_Qual_index,0.0416052300461049
Bsmt_Qual_index,0.0340726335451288
Year_Built,0.0297266794620859
Kitchen_Qual_index,0.0280732786403214
Total_Bsmt_SF,0.021991339339086
