<h4 style="font-variant-caps: small-caps;font-size:35pt;font-family:'Courier New'">MLlib Grid search</h4>

# 1. test

In [0]:
import pandas as pd
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator



<h4 style="font-variant-caps: small-caps;font-weight:700">Load dataset</h4>

In [0]:
# Listings CSV for Amsterdam hosted on data.insideairbnb.com; the snapshot date
# (2023-09-03) is hard-coded in the URL path.
amsterdam_airbnb_df_url = "http://data.insideairbnb.com/the-netherlands/north-holland/amsterdam/2023-09-03/visualisations/listings.csv"
# pandas downloads the file and parses it into an in-memory pandas DataFrame.
amsterdam_airbnb_pandas_df = pd.read_csv(amsterdam_airbnb_df_url)

<h4 style="font-weight:700; font-variant-caps: small-caps;">Drop some columns</h4>

In [0]:
# Columns of the raw listings file that are not part of the Spark schema used
# further down and are therefore removed before the conversion.
columns_to_exclude = [
    "id",
    "name",
    "host_id",
    "host_name",
    "neighbourhood_group",
    "license",
    "last_review",
    "reviews_per_month",
]

# The frame is reassigned to the same name, so without errors="ignore" a second
# run of this cell would raise KeyError because the columns are already gone.
# errors="ignore" makes the cell safe to re-run.
amsterdam_airbnb_pandas_df = amsterdam_airbnb_pandas_df.drop(
    columns=columns_to_exclude,
    errors="ignore",
)

<h4 style="font-weight:700; font-variant-caps: small-caps;">Convert to Spark dataframe</h4>

In [0]:
# Explicit Spark schema for the remaining listing columns. Every field is
# nullable; the (name, type) pairs are listed in the column order expected by
# the pandas -> Spark conversion.
schema = StructType([
    StructField(field_name, field_type(), nullable=True)
    for field_name, field_type in [
        ("neighbourhood", StringType),
        ("latitude", DoubleType),
        ("longitude", DoubleType),
        ("room_type", StringType),
        ("price", IntegerType),
        ("minimum_nights", IntegerType),
        ("number_of_reviews", IntegerType),
        ("calculated_host_listings_count", IntegerType),
        ("availability_365", IntegerType),
        ("number_of_reviews_ltm", IntegerType),
    ]
])

In [0]:
# Convert the pandas DataFrame into a Spark DataFrame typed by the explicit schema.
amsterdam_airbnb_df = spark.createDataFrame(
    amsterdam_airbnb_pandas_df,
    schema=schema,
)

<h4 style="font-weight:700; font-variant-caps: small-caps;">Optional: write to delta table</h4>

In [0]:
# Save the Spark DataFrame as a Delta table named "amsterdam_airbnb_df",
# overwriting the data and the schema of any previous version of the table.
(
    amsterdam_airbnb_df.write
    .format("delta")
    .option("overwriteSchema", "True")
    .mode("overwrite")
    .saveAsTable("amsterdam_airbnb_df")
)

# From here on the notebook works on the table that was just written.
amsterdam_airbnb_df = spark.table("amsterdam_airbnb_df")

<h4 style="font-weight:700; font-variant-caps: small-caps;">Prepare data for ML</h4>
<div>For this binary-classification exercise, an artificial label column is derived from the <b>price</b> column. It takes two values - <i>true</i> or <i>false</i> - depending on whether the price is at least 150 or below 150.</div>
<div>Note that casting the boolean column to integer with <code>.cast("int")</code> automatically converts <i>true</i> to <code>1</code> and <i>false</i> to <code>0</code>.</div>

In [0]:
# Binary label: 1 when price >= 150, 0 otherwise. The original price column is
# dropped so that it cannot be used as a feature.
airbnb_df = (amsterdam_airbnb_df.withColumn("priceClass", (col("price") >= 150).cast("int"))
                                .drop("price"))

# 80/20 train/test split with a fixed seed.
train_df, test_df = airbnb_df.randomSplit([.8, .2], seed=42)

# String columns are index-encoded; each output column is named "<input>Index".
categorical_cols = [field for (field, dataType) in train_df.dtypes if dataType == "string"]
index_output_cols = [x + "Index" for x in categorical_cols]

# handleInvalid="skip" tells the indexer to skip rows with invalid values
# instead of raising an error.
string_indexer = StringIndexer(inputCols=categorical_cols, outputCols=index_output_cols, handleInvalid="skip")

# Numeric feature columns: every double/int column except the label itself.
# Both conditions are plain Python booleans, so the logical `and` is used
# rather than the bitwise `&`.
numeric_cols = [
    field
    for (field, dataType) in train_df.dtypes
    if dataType in ("double", "int") and field != "priceClass"
]

# Indexed categorical columns come first, then the numeric columns; this order
# defines the position of each feature in the assembled vector.
assembler_inputs = index_output_cols + numeric_cols
vec_assembler = VectorAssembler(inputCols=assembler_inputs, outputCol="features")

<h4 style="font-weight:700; font-variant-caps: small-caps;">Instantiate ML model: Random Forest classifier</h4>
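<div>The following command lets us verify that the minimum number of bins needed for RandomForestClassifier is <code>22</code>. It corresponds to the largest number of distinct values found in any column of type string (here <b>neighbourhood</b>), because maxBins must be at least as large as the number of categories of every categorical feature.</div>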

In [0]:
# One (column name, number of distinct values, Spark type) tuple per column.
# Each distinct().count() triggers its own Spark job.
count_distinct = [
    (
        column_name,
        amsterdam_airbnb_df.select(column_name).distinct().count(),
        column_type,
    )
    for column_name, column_type in amsterdam_airbnb_df.dtypes
]

# Sort by type in descending order (string columns first), then by ascending
# distinct count within each type.
display(
    spark.createDataFrame(count_distinct, ['Column', 'Distinct values', 'type'])
         .orderBy(['type', 'Distinct values'], ascending=[False, True])
)

Column,Distinct values,type
room_type,4,string
neighbourhood,22,string
calculated_host_listings_count,17,int
minimum_nights,50,int
number_of_reviews_ltm,141,int
availability_365,366,int
number_of_reviews,485,int
price,631,int
latitude,5865,double
longitude,6845,double


<div>For this exercise, let's therefore use <code>22</code> as the maxBins parameter.</div>

In [0]:
# Random forest predicting the priceClass label; the seed is fixed so that
# repeated runs use the same seed.
rf = RandomForestClassifier(
    labelCol="priceClass",
    maxBins=22,
    seed=42,
)

<h4 style="font-weight:700; font-variant-caps: small-caps;">Prepare grid for Grid Search</h4>
<div>Defining the grid as shown in the next cell will result in the training of 9 models. There are 3 x 3 parameter combinations.</div>
<a id="gridsearch">test</a>

In [0]:
# 3 values of maxDepth x 3 values of numTrees = 9 parameter combinations.
grid = (
    ParamGridBuilder()
    .addGrid(rf.maxDepth, [2, 5, 10])
    .addGrid(rf.numTrees, [10, 20, 100])
    .build()
)

<h4 style="font-weight:700; font-variant-caps: small-caps;">Prepare evaluator</h4>

In [0]:
# Evaluation metric name; reused by the evaluator and as the name of the score
# column in the grid-search results table.
metric = "areaUnderROC"

In [0]:
# Scores each candidate model against the priceClass label with the chosen metric.
evaluator = BinaryClassificationEvaluator(
    metricName=metric,
    labelCol="priceClass",
)

<h4 style="font-weight:700; font-variant-caps: small-caps;">Definition of the cross validation</h4>
<div>Setting the numFolds parameter to 3 in the CrossValidator, as shown in the next cell, means that for each parameter combination of the Grid Search, 3 different models are trained, each on a different subset of rows of the dataset.</div>
<div>Consequently, with this parameterization, calling the fit method will result in the training of 27 models: 3 x 3 x 3. Once the best parameter combination is known, CrossValidator fits one additional model with it on the whole training set; this is the model returned as the best model.</div>

In [0]:
# 3-fold cross validation of the random forest over every combination in the grid.
cv = CrossValidator(
    estimator=rf,
    estimatorParamMaps=grid,
    evaluator=evaluator,
    numFolds=3,
    seed=42,
)

<h4 style="font-weight:700; font-variant-caps: small-caps;">Definition of the pipeline and fit the pipeline</h4>

In [0]:
# Stage order: index the string columns, assemble the feature vector, then run
# the cross-validated grid search on the assembled features.
stages = [
    string_indexer,
    vec_assembler,
    cv,
]

pipeline = Pipeline(stages=stages)

# Fitting the pipeline fits every stage in turn on the training split.
pipeline_model = pipeline.fit(train_df)

<h4 style="font-weight:700; font-variant-caps: small-caps;">Get Grid Search parameters value of the best model:</h4>
<div>Note that there are 9 results, one for each parameter combination in the Grid Search. For each parameter combination, three models are trained and evaluated - according to the numFolds parameter of the cross validation. Then the average of the three scores of the cross validation is calculated and represents the final result for a given parameter combination. That's why 27 models are trained but at the end, there are only 9 scores.</div>

In [0]:
# Header of the results table: the names of the tuned parameters (taken from the
# first parameter map of the fitted cross validator) followed by the metric name.
columns_name = [
    *(param.name for param in pipeline_model.stages[-1].getEstimatorParamMaps()[0]),
    metric,
]

<div>The best model is the one with the highest value for the area under the ROC curve. It is obtained with the Grid Search parameters shown in the first row of the table below.</div>

In [0]:
# One row per parameter combination: the parameter values followed by the
# average cross-validation metric for that combination.
# The metric is converted with float() rather than str() so that the column is
# numeric: ordering a string column is lexicographic and would misplace values
# such as "9.5e-05". float() also turns a NumPy scalar into a plain Python float.
sets = [
    (*param_map.values(), float(avg_metric))
    for param_map, avg_metric in zip(
        pipeline_model.stages[-1].getEstimatorParamMaps(),
        pipeline_model.stages[-1].avgMetrics,
    )
]

# Best combination first (highest metric value).
display(spark.createDataFrame(sets, columns_name).orderBy(desc(metric)))

maxDepth,numTrees,areaUnderROC
10,100,0.8407864668331569
10,20,0.8327511187299091
10,10,0.8264245347627136
5,100,0.8207445767017671
5,20,0.8170598033231653
5,10,0.8132904263828932
2,100,0.7509357579175512
2,10,0.7364821016113127
2,20,0.7038806386896654


<h4 style="font-weight:700; font-variant-caps: small-caps;">Get more detailed information related to the best model:</h4>

In [0]:
# The cross validator is the last stage of the fitted pipeline.
cv_model = pipeline_model.stages[-1]

# bestModel is the model kept by the cross validator after the grid search.
rf_model = cv_model.bestModel

# Print every parameter of the best model with its documentation and value.
print(rf_model.explainParams())

bootstrap: Whether bootstrap samples are used when building trees. (default: True)
cacheNodeIds: If false, the algorithm will pass trees to executors to match instances with nodes. If true, the algorithm will cache node IDs for each instance. Caching can speed up training of deeper trees. Users can set how often should the cache be checkpointed or disable it by setting checkpointInterval. (default: False)
checkpointInterval: set checkpoint interval (>= 1) or disable checkpoint (-1). E.g. 10 means that the cache will get checkpointed every 10 iterations. Note: this setting will be ignored if the checkpoint directory is not set in the SparkContext. (default: 10)
featureSubsetStrategy: The number of features to consider for splits at each tree node. Supported options: 'auto' (choose automatically for task: If numTrees == 1, set to 'all'. If numTrees > 1 (forest), set to 'sqrt' for classification and to 'onethird' for regression), 'all' (use all features), 'onethird' (use 1/3 of the featur

<h4 style="font-weight:700; font-variant-caps: small-caps;">Get features by order of importance</h4>

In [0]:
# Pair each assembler input column with its importance in the best model; the
# importance vector is read in the same order as the assembler inputs.
pandas_df = pd.DataFrame(
    list(zip(vec_assembler.getInputCols(), rf_model.featureImportances)),
    columns=["feature", "importance"],
)

# Most important feature first.
top_features = pandas_df.sort_values(by=["importance"], ascending=False)
top_features

Unnamed: 0,feature,importance
1,room_typeIndex,0.207192
0,neighbourhoodIndex,0.141898
5,number_of_reviews,0.135982
7,availability_365,0.127502
8,number_of_reviews_ltm,0.107315
3,longitude,0.091208
2,latitude,0.075542
4,minimum_nights,0.068298
6,calculated_host_listings_count,0.045064
