Work as a group, try to improve the performance (measured by rmse and r2) of the airbnb rental price prediction model.  As a reference: current rmse: 215, r2: 0.20
####Ways to improve the model:
1. remove outliers, especailly for the target label
2. find better ways to deal with missing values
3. add/delete/modify features, create additional features based on existing features
4. conduct hyper-parameters tuning and cross-validation
5. try different models/algorithms.
6. use more data or anything else you find helpful

### please document your steps clearly, and discuss your best rmse and r2 score. 

### Due Date: Tuesday 11/16 at midnight

### Deliverables: submit the completed notebook with results/visualization in html format.

## Data cleaning/pre-processing

In [0]:
from pyspark.sql.functions import col, translate, when
from pyspark.sql.types import IntegerType
from pyspark.ml.feature import Imputer

filePath = "/databricks-datasets/learning-spark-v2/sf-airbnb/sf-airbnb.csv"
 
rawDF = spark.read.csv(filePath, header="true", inferSchema="true", multiLine="true", escape='"')

columnsToKeep = [
  "host_is_superhost",
  "cancellation_policy",
  "instant_bookable",
  "host_total_listings_count",
  "neighbourhood_cleansed",
  "latitude",
  "longitude",
  "property_type",
  "room_type",
  "accommodates",
  "bathrooms",
  "bedrooms",
  "beds",
  "bed_type",
  "minimum_nights",
  "number_of_reviews",
  "review_scores_rating",
  "review_scores_accuracy",
  "review_scores_cleanliness",
  "review_scores_checkin",
  "review_scores_communication",
  "review_scores_location",
  "review_scores_value",
  "price"]
 
baseDF = rawDF.select(columnsToKeep)

# fix data types
 
fixedPriceDF = baseDF.withColumn("price", translate(col("price"), "$,", "").cast("double"))
 
# remove nulls from host_is_superhost
noNullsDF = fixedPriceDF.na.drop(subset=["host_is_superhost"])

# cast integer to Double

integerColumns = [x.name for x in baseDF.schema.fields if x.dataType == IntegerType()]

doublesDF = noNullsDF

for c in integerColumns:
  doublesDF = doublesDF.withColumn(c, col(c).cast("double"))
  
# select the columns to imputer
 
imputeCols = [
  "bedrooms",
  "bathrooms",
  "beds", 
  "review_scores_rating",
  "review_scores_accuracy",
  "review_scores_cleanliness",
  "review_scores_checkin",
  "review_scores_communication",
  "review_scores_location",
  "review_scores_value"
]

imputer = Imputer(strategy="median", inputCols=imputeCols, outputCols=imputeCols)
 
imputedDF = imputer.fit(doublesDF).transform(doublesDF)

# deal with outliers
#only keep rows with a strictly positive price

posPricesDF = imputedDF.filter(col("price") > 0)

# Filter out those records where the minimum_nights is greater then 365:
cleanDF = posPricesDF.filter(col("minimum_nights") <= 365)

# save cleaned data for future analysis
outputPath = "/tmp/sf-airbnb/sf-airbnb-clean.parquet"
 
cleanDF.write.mode("overwrite").parquet(outputPath)

## Regression Analysis

In [0]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.regression import GBTRegressor
from pyspark.ml import Pipeline
from pyspark.ml.tuning import ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import CrossValidator
 
# load the cleaned data
filePath = "/tmp/sf-airbnb/sf-airbnb-clean.parquet"
airbnbDF = spark.read.parquet(filePath)
 
# split into train and test dataset
(trainDF, testDF) = airbnbDF.randomSplit([.8, .2], seed=42)
 
# Feature engineering
categoricalCols = [field for (field, dataType) in trainDF.dtypes if dataType == "string"]
 
indexOutputCols = [x + "Index" for x in categoricalCols]
 
stringIndexer = StringIndexer(inputCols=categoricalCols, outputCols=indexOutputCols, handleInvalid="skip")

oheOutputCols = [x + "OHE" for x in categoricalCols]

oheEncoder = OneHotEncoder(inputCols=indexOutputCols, 
                           outputCols=oheOutputCols)
 
numericCols = [field for (field, dataType) in trainDF.dtypes if ((dataType == "double") & (field != "price"))]

assemblerInputs = oheOutputCols + numericCols

vecAssembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
 
# initiate linear regression

lr = LinearRegression(labelCol="price", featuresCol="features")
 
# define hyper-parameter. Please be aware each algorithm has different hyper-parameters to tune.
paramGrid = (ParamGridBuilder()
            .addGrid(lr.regParam, [0.01, 1, 2.0])
            .addGrid(lr.elasticNetParam, [0.01,0.1, 0.5, 1.0])
            .build())
 
#define evaluator
evaluator = RegressionEvaluator(labelCol="price", 
                                predictionCol="prediction", 
                                metricName="rmse")
# cross-validation
cv = CrossValidator( estimator=lr, 
                    evaluator=evaluator, 
                    estimatorParamMaps=paramGrid, 
                    numFolds=3, 
                    parallelism=10, 
                    seed=42)
 
#create pipeline
pipeline = Pipeline(stages = [stringIndexer, oheEncoder, vecAssembler, cv])
 
pipelineModel_lr = pipeline.fit(trainDF)

In [0]:
# evaluate the model
predDF = pipelineModel_lr.transform(testDF)
 
rmse = evaluator.evaluate(predDF)
r2 = evaluator.setMetricName("r2").evaluate(predDF)
print(f"RMSE is {rmse}")
print(f"R2 is {r2}")

In [0]:
def lr_feature_names(df):
  featureIndex=df.schema["features"].metadata["ml_attr"]["attrs"]
 
  feature_names=[]
  # print numeric feature
  for x in range(len(df.schema["features"].metadata["ml_attr"]["attrs"]['numeric'])):
    try:
      feature_names.append(featureIndex["numeric"][x]['name'])
    except:
      continue
 # print binary feature   
  for x in range(len(df.schema["features"].metadata["ml_attr"]["attrs"]['binary'])):
    try:
       feature_names.append(featureIndex["binary"][x]['name'])
    except:
      continue
  return feature_names

In [0]:
# feature importance
import pandas as pd

lrModel = pipelineModel_lr.stages[-1]

coefficients =lrModel.bestModel.coefficients
 
feature_names=lr_feature_names(predDF)
 
weightsDF = pd.DataFrame(zip(feature_names, coefficients), columns=['feature', 'coefficients'])

# get absoluate value of weight
weightsDF['abs_coefficients']=weightsDF['coefficients'].abs()
 
#weightsDF
 
display(weightsDF.sort_values('abs_coefficients', ascending=False).head(20))

feature,coefficients,abs_coefficients
property_typeOHE_Villa,1790.8590233897642,1790.8590233897642
property_typeOHE_Hotel,261.9309546431089,261.9309546431089
neighbourhood_cleansedOHE_Inner Sunset,107.80549120430356,107.80549120430356
neighbourhood_cleansedOHE_Castro/Upper Market,86.04002233577792,86.04002233577792
neighbourhood_cleansedOHE_Lakeshore,-83.37207437108485,83.37207437108485
neighbourhood_cleansedOHE_Visitacion Valley,-70.40199890526016,70.40199890526016
neighbourhood_cleansedOHE_Downtown/Civic Center,-64.52617190308993,64.52617190308993
cancellation_policyOHE_moderate,-61.86444964072768,61.86444964072768
property_typeOHE_Tiny house,61.61110620029868,61.61110620029868
property_typeOHE_House,56.625342755077575,56.625342755077575


## Random Forest (put everything together)

In [0]:
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.regression import GBTRegressor
from pyspark.ml import Pipeline
from pyspark.ml.tuning import ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import CrossValidator
 
# load the cleaned data
filePath = "/tmp/sf-airbnb/sf-airbnb-clean.parquet"
airbnbDF = spark.read.parquet(filePath)
 
# split into train and test dataset
(trainDF, testDF) = airbnbDF.randomSplit([.8, .2], seed=42)
 
# Feature engineering
categoricalCols = [field for (field, dataType) in trainDF.dtypes if dataType == "string"]
 
indexOutputCols = [x + "Index" for x in categoricalCols]
 
stringIndexer = StringIndexer(inputCols=categoricalCols, outputCols=indexOutputCols, handleInvalid="skip")
 
numericCols = [field for (field, dataType) in trainDF.dtypes if ((dataType == "double") & (field != "price"))]

assemblerInputs = indexOutputCols + numericCols

vecAssembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
 
# iniatiate random forest

rf = RandomForestRegressor(labelCol="price", maxBins=40, seed=42)
 
# define hyper-parameter. Please be aware each algorithm has different hyper-parameter to tune.
paramGrid = (ParamGridBuilder()
            .addGrid(rf.maxDepth, [4, 6, 8])
            .addGrid(rf.maxBins, [40, 50])
            .addGrid(rf.numTrees, [50, 100])
            .build())
 
#define evaluator
evaluator = RegressionEvaluator(labelCol="price", 
                                predictionCol="prediction", 
                                metricName="rmse")
# cross-validation
cv = CrossValidator(estimator=rf, 
                    evaluator=evaluator, 
                    estimatorParamMaps=paramGrid, 
                    numFolds=3, 
                    parallelism=10, 
                    seed=42)
#create pipeline 
 
pipeline = Pipeline(stages=[stringIndexer, vecAssembler, cv])

# fit the model 
pipelineModel_rf = pipeline.fit(trainDF)

### Evalaute the model

In [0]:
predDF = pipelineModel_rf.transform(testDF)
 
rmse = evaluator.evaluate(predDF)
r2 = evaluator.setMetricName("r2").evaluate(predDF)
print(f"RMSE is {rmse}")
print(f"R2 is {r2}")

### Get the feature importance

In [0]:
import pandas as pd
cvModel = pipelineModel_rf.stages[-1]
featureImp = pd.DataFrame(
  list(zip(vecAssembler.getInputCols(), cvModel.bestModel.featureImportances)),
  columns=["feature", "importance"])
display(featureImp.sort_values(by="importance", ascending=False).head(10))

feature,importance
accommodates,0.137902280787741
bedrooms,0.1258226830311767
cancellation_policyIndex,0.1038725960906299
neighbourhood_cleansedIndex,0.0983087335245836
beds,0.0875291420244688
minimum_nights,0.0617573761505163
latitude,0.0602366982067059
property_typeIndex,0.0595509691510626
bathrooms,0.054711357716748
number_of_reviews,0.0411582368105692
