## Google Drive 연동

In [2]:
# Mount Google Drive into the Colab runtime at /content/drive; the dataset
# path used later in this notebook lives under /content/drive/MyDrive.
from google.colab import drive
drive.mount("/content/drive")

Mounted at /content/drive


## Spark 설치
- 주소 : https://spark.apache.org/downloads.html
- 주소 : https://www.apache.org/dyn/closer.lua/spark/spark-3.5.1/spark-3.5.1-bin-hadoop3.tgz

In [1]:
!apt-get install openjdk-8-jdk-headless
!wget -q https://dlcdn.apache.org/spark/spark-3.5.1/spark-3.5.1-bin-hadoop3.tgz
!tar -zxf spark-3.5.1-bin-hadoop3.tgz

Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
The following additional packages will be installed:
  libxtst6 openjdk-8-jre-headless
Suggested packages:
  openjdk-8-demo openjdk-8-source libnss-mdns fonts-dejavu-extra fonts-nanum fonts-ipafont-gothic
  fonts-ipafont-mincho fonts-wqy-microhei fonts-wqy-zenhei fonts-indic
The following NEW packages will be installed:
  libxtst6 openjdk-8-jdk-headless openjdk-8-jre-headless
0 upgraded, 3 newly installed, 0 to remove and 45 not upgraded.
Need to get 39.7 MB of archives.
After this operation, 144 MB of additional disk space will be used.
Get:1 http://archive.ubuntu.com/ubuntu jammy/main amd64 libxtst6 amd64 2:1.2.3-1build4 [13.4 kB]
Get:2 http://archive.ubuntu.com/ubuntu jammy-updates/universe amd64 openjdk-8-jre-headless amd64 8u402-ga-2ubuntu1~22.04 [30.8 MB]
Get:3 http://archive.ubuntu.com/ubuntu jammy-updates/universe amd64 openjdk-8-jdk-headless amd64 8u402-ga-2ubuntu1~22.04 [8,873 kB]

In [3]:
import os

# Tell findspark / PySpark where the JDK and the unpacked Spark distribution live.
os.environ.update({
    "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
    "SPARK_HOME": "/content/spark-3.5.1-bin-hadoop3",
})

In [4]:
!pip install findspark -q

In [5]:
# Initialise findspark so that `import pyspark` resolves in the next cell.
# NOTE(review): presumably this relies on the SPARK_HOME set above — confirm.
import findspark
findspark.init()

In [6]:
import pyspark

# Sanity check: report which PySpark version is importable.
spark_version = pyspark.__version__
print(f"Apache Spark 버전 확인: {spark_version}")

Apache Spark 버전 확인: 3.5.1


# 데이터 수집 및 탐색

In [8]:
from pyspark.sql import SparkSession
import seaborn as sns

# Create (or reuse) the Spark session for this notebook.
spark = (
    SparkSession.builder
    .appName('chapter10')
    .getOrCreate()
)

# Raw SF Airbnb listings CSV stored on the mounted Google Drive.
filePath = "/content/drive/MyDrive/Colab Notebooks/2024/멀티캠퍼스/PySpark/data/databricks-datasets/learning-spark-v2/sf-airbnb/sf-airbnb.csv"

# multiLine + escape='"' are passed so quoted fields that span lines or
# contain embedded quotes are read as a single value.
rawDF = (
    spark.read
    .options(header="true", inferSchema="true", multiLine="true", escape='"')
    .csv(filePath)
)
rawDF.show(1)

+---+--------------------+--------------+------------+--------------------+--------------------+--------------------+--------------------+-------------------+---------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-------------+----------+--------------------+--------------+-------+--------------------+---------+----------+--------------------+--------------------+------------------+------------------+--------------------+-----------------+--------------------+--------------------+------------------+-------------------+-------------------------+--------------------+--------------------+----------------------+--------------------+---------------+----------------------+----------------------------+-------------+-----+-------+-------------+-----------------+------------+-------------+--------+----------+-----------------+-------------+---------------+------------+---------+--------+----+--------+--------------------+--

In [10]:
# Number of columns in the raw dataset.
len(rawDF.columns)

106

## 데이터 특정 열 선택

In [11]:
# Subset of raw columns kept for modelling; "price" is the prediction target.
columnsToKeep = [
    # host / booking attributes
    "host_is_superhost",
    "cancellation_policy",
    "instant_bookable",
    "host_total_listings_count",
    # location
    "neighbourhood_cleansed",
    "latitude",
    "longitude",
    # property description
    "property_type",
    "room_type",
    "accommodates",
    "bathrooms",
    "bedrooms",
    "beds",
    "bed_type",
    "minimum_nights",
    # reviews
    "number_of_reviews",
    "review_scores_rating",
    "review_scores_accuracy",
    "review_scores_cleanliness",
    "review_scores_checkin",
    "review_scores_communication",
    "review_scores_location",
    "review_scores_value",
    # label
    "price",
]

baseDF = rawDF.select(*columnsToKeep)
# Mark the frame as cached, then run an action so the cache is materialised.
baseDF.cache()
baseDF.count()
baseDF.show(1)

+-----------------+-------------------+----------------+-------------------------+----------------------+--------+----------+-------------+---------------+------------+---------+--------+----+--------+--------------+-----------------+--------------------+----------------------+-------------------------+---------------------+---------------------------+----------------------+-------------------+-------+
|host_is_superhost|cancellation_policy|instant_bookable|host_total_listings_count|neighbourhood_cleansed|latitude| longitude|property_type|      room_type|accommodates|bathrooms|bedrooms|beds|bed_type|minimum_nights|number_of_reviews|review_scores_rating|review_scores_accuracy|review_scores_cleanliness|review_scores_checkin|review_scores_communication|review_scores_location|review_scores_value|  price|
+-----------------+-------------------+----------------+-------------------------+----------------------+--------+----------+-------------+---------------+------------+---------+--------+-

- 데이터타입 수정

In [12]:
from pyspark.sql.functions import col, translate

# Strip the "$" and "," characters from the price string, then cast it to double.
fixedPriceDF = baseDF.withColumn(
    "price",
    translate("price", "$,", "").cast("double"),
)
fixedPriceDF.show(1)

+-----------------+-------------------+----------------+-------------------------+----------------------+--------+----------+-------------+---------------+------------+---------+--------+----+--------+--------------+-----------------+--------------------+----------------------+-------------------------+---------------------+---------------------------+----------------------+-------------------+-----+
|host_is_superhost|cancellation_policy|instant_bookable|host_total_listings_count|neighbourhood_cleansed|latitude| longitude|property_type|      room_type|accommodates|bathrooms|bedrooms|beds|bed_type|minimum_nights|number_of_reviews|review_scores_rating|review_scores_accuracy|review_scores_cleanliness|review_scores_checkin|review_scores_communication|review_scores_location|review_scores_value|price|
+-----------------+-------------------+----------------+-------------------------+----------------------+--------+----------+-------------+---------------+------------+---------+--------+----+

In [15]:
# Basic per-column statistics for the selected columns.
fixedPriceDF.describe().show()

+-------+-----------------+-------------------+----------------+-------------------------+----------------------+--------------------+--------------------+-------------+---------------+------------------+------------------+------------------+------------------+--------+------------------+-----------------+--------------------+----------------------+-------------------------+---------------------+---------------------------+----------------------+-------------------+------------------+
|summary|host_is_superhost|cancellation_policy|instant_bookable|host_total_listings_count|neighbourhood_cleansed|            latitude|           longitude|property_type|      room_type|      accommodates|         bathrooms|          bedrooms|              beds|bed_type|    minimum_nights|number_of_reviews|review_scores_rating|review_scores_accuracy|review_scores_cleanliness|review_scores_checkin|review_scores_communication|review_scores_location|review_scores_value|             price|
+-------+-----------

- 데이터 요약

In [16]:
# Summary statistics for the selected columns via DataFrame.summary().
fixedPriceDF.summary().show()

+-------+-----------------+-------------------+----------------+-------------------------+----------------------+--------------------+--------------------+-------------+---------------+------------------+------------------+------------------+------------------+--------+------------------+-----------------+--------------------+----------------------+-------------------------+---------------------+---------------------------+----------------------+-------------------+------------------+
|summary|host_is_superhost|cancellation_policy|instant_bookable|host_total_listings_count|neighbourhood_cleansed|            latitude|           longitude|property_type|      room_type|      accommodates|         bathrooms|          bedrooms|              beds|bed_type|    minimum_nights|number_of_reviews|review_scores_rating|review_scores_accuracy|review_scores_cleanliness|review_scores_checkin|review_scores_communication|review_scores_location|review_scores_value|             price|
+-------+-----------

- 일부 결측치 데이터 삭제

In [17]:
# Discard rows where host_is_superhost is missing.
noNullsDF = fixedPriceDF.dropna(subset=["host_is_superhost"])
noNullsDF.show(1)

+-----------------+-------------------+----------------+-------------------------+----------------------+--------+----------+-------------+---------------+------------+---------+--------+----+--------+--------------+-----------------+--------------------+----------------------+-------------------------+---------------------+---------------------------+----------------------+-------------------+-----+
|host_is_superhost|cancellation_policy|instant_bookable|host_total_listings_count|neighbourhood_cleansed|latitude| longitude|property_type|      room_type|accommodates|bathrooms|bedrooms|beds|bed_type|minimum_nights|number_of_reviews|review_scores_rating|review_scores_accuracy|review_scores_cleanliness|review_scores_checkin|review_scores_communication|review_scores_location|review_scores_value|price|
+-----------------+-------------------+----------------+-------------------------+----------------------+--------+----------+-------------+---------------+------------+---------+--------+----+

- 데이터 타입 변형

In [18]:
from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType

# Names of every column that Spark inferred as integer.
integerColumns = [
    field.name
    for field in baseDF.schema.fields
    if field.dataType == IntegerType()
]

# Cast all integer columns to double in one pass; column order is unchanged.
doublesDF = noNullsDF
doublesDF = doublesDF.withColumns(
    {name: col(name).cast("double") for name in integerColumns}
)

columns = "\n - ".join(integerColumns)
print(f"Columns converted from Integer to Double:\n - {columns}")

Columns converted from Integer to Double:
 - host_total_listings_count
 - accommodates
 - bedrooms
 - beds
 - minimum_nights
 - number_of_reviews
 - review_scores_rating
 - review_scores_accuracy
 - review_scores_cleanliness
 - review_scores_checkin
 - review_scores_communication
 - review_scores_location
 - review_scores_value


- 결측치 여부를 나타내는 지표 컬럼(`_na`) 추가

In [19]:
from pyspark.sql.functions import when

# Columns whose missing values will be imputed in the next step.
imputeCols = [
    "bedrooms",
    "bathrooms",
    "beds",
    "review_scores_rating",
    "review_scores_accuracy",
    "review_scores_cleanliness",
    "review_scores_checkin",
    "review_scores_communication",
    "review_scores_location",
    "review_scores_value",
]

# For each of them add a "<name>_na" flag: 1.0 where the value is null, else 0.0,
# so the information "this value was missing" survives the imputation.
doublesDF = doublesDF.withColumns(
    {
        name + "_na": when(col(name).isNull(), 1.0).otherwise(0.0)
        for name in imputeCols
    }
)

doublesDF.describe().show()

+-------+-----------------+-------------------+----------------+-------------------------+----------------------+--------------------+--------------------+-------------+---------------+------------------+------------------+------------------+------------------+--------+------------------+-----------------+--------------------+----------------------+-------------------------+---------------------+---------------------------+----------------------+-------------------+------------------+--------------------+--------------------+--------------------+-----------------------+-------------------------+----------------------------+------------------------+------------------------------+-------------------------+----------------------+
|summary|host_is_superhost|cancellation_policy|instant_bookable|host_total_listings_count|neighbourhood_cleansed|            latitude|           longitude|property_type|      room_type|      accommodates|         bathrooms|          bedrooms|              beds|be

- 결측치가 있는 일부 컬럼은 중앙값(median)으로 채우기

In [20]:
from pyspark.ml.feature import Imputer

# Replace nulls in imputeCols with each column's median, writing the result
# back into the same column names.
imputer = (
    Imputer()
    .setStrategy("median")
    .setInputCols(imputeCols)
    .setOutputCols(imputeCols)
)
imputedDF = imputer.fit(doublesDF).transform(doublesDF)

imputedDF.select("price").describe().show()

+-------+------------------+
|summary|             price|
+-------+------------------+
|  count|              7151|
|   mean| 213.6540344007831|
| stddev|313.28222046853125|
|    min|               0.0|
|    max|           10000.0|
+-------+------------------+



In [23]:
# Count listings with a zero price. The result is printed explicitly: a bare
# expression in the middle of a cell is evaluated but never displayed.
zeroPriceCount = imputedDF.filter(col("price") == 0).count()
print(f"Listings with a price of 0: {zeroPriceCount}")

# Keep only listings with a strictly positive price.
posPricesDF = imputedDF.filter(col("price") > 0)
posPricesDF.select("minimum_nights").describe().show()

# Frequency of each minimum_nights value, most common first
# (ties broken by ascending minimum_nights).
(posPricesDF
  .groupBy("minimum_nights")
  .count()
  .orderBy(col("count").desc(), col("minimum_nights"))
  .show())

+-------+------------------+
|summary|    minimum_nights|
+-------+------------------+
|  count|              7150|
|   mean| 14002.25986013986|
| stddev|1182624.6002248244|
|    min|               1.0|
|    max|             1.0E8|
+-------+------------------+

+--------------+-----+
|minimum_nights|count|
+--------------+-----+
|          30.0| 2757|
|           2.0| 1455|
|           1.0| 1251|
|           3.0|  822|
|           4.0|  270|
|           5.0|  176|
|          31.0|  133|
|           7.0|   72|
|          60.0|   32|
|           6.0|   31|
|          32.0|   31|
|          90.0|   28|
|         180.0|   28|
|          45.0|    7|
|         365.0|    7|
|         120.0|    6|
|          14.0|    4|
|          10.0|    3|
|          40.0|    3|
|          28.0|    2|
+--------------+-----+
only showing top 20 rows



In [24]:
# Drop listings that require a minimum stay longer than 365 nights.
cleanDF = posPricesDF.where(col("minimum_nights") <= 365)
cleanDF.show()

+-----------------+--------------------+----------------+-------------------------+----------------------+--------+----------+-------------+---------------+------------+---------+--------+----+--------+--------------+-----------------+--------------------+----------------------+-------------------------+---------------------+---------------------------+----------------------+-------------------+-----+-----------+------------+-------+-----------------------+-------------------------+----------------------------+------------------------+------------------------------+-------------------------+----------------------+
|host_is_superhost| cancellation_policy|instant_bookable|host_total_listings_count|neighbourhood_cleansed|latitude| longitude|property_type|      room_type|accommodates|bathrooms|bedrooms|beds|bed_type|minimum_nights|number_of_reviews|review_scores_rating|review_scores_accuracy|review_scores_cleanliness|review_scores_checkin|review_scores_communication|review_scores_location|

## 데이터 내보내기

In [25]:
# Persist the cleaned data as Parquet, replacing any previous export.
outputPath = "sf-airbnb-clean.parquet"
cleanDF.write.parquet(outputPath, mode="overwrite")

# Linear Regression

## parquet 데이터 가져오기

In [27]:
# Reload the cleaned dataset written by the export step.
filePath = "sf-airbnb-clean.parquet"
airbnbDF = spark.read.format("parquet").load(filePath)
airbnbDF.show(1)

+-----------------+-------------------+----------------+-------------------------+----------------------+--------+----------+-------------+---------------+------------+---------+--------+----+--------+--------------+-----------------+--------------------+----------------------+-------------------------+---------------------+---------------------------+----------------------+-------------------+-----+-----------+------------+-------+-----------------------+-------------------------+----------------------------+------------------------+------------------------------+-------------------------+----------------------+
|host_is_superhost|cancellation_policy|instant_bookable|host_total_listings_count|neighbourhood_cleansed|latitude| longitude|property_type|      room_type|accommodates|bathrooms|bedrooms|beds|bed_type|minimum_nights|number_of_reviews|review_scores_rating|review_scores_accuracy|review_scores_cleanliness|review_scores_checkin|review_scores_communication|review_scores_location|re

## 데이터셋 분리

In [28]:
# 80/20 split with a fixed seed.
(trainDF, testDF) = airbnbDF.randomSplit([.8, .2], seed=42)
trainDF.cache()
testDF.cache()
print(f"There are {trainDF.count()} rows in the training set, and {testDF.count()} in the test set")

# Same split and seed after repartitioning into 24 partitions; the count
# printed below can be compared with the training-set size above.
trainRepartitionDF, testRepartitionDF = (
    airbnbDF.repartition(24).randomSplit([.8, .2], seed=42)
)

print(trainRepartitionDF.count())

There are 5780 rows in the training set, and 1366 in the test set
5738


- 가격 예측전 데이터 확인

In [29]:
# Check for outliers in the label (price) and the bedrooms feature.
trainDF.select("price", "bedrooms").summary().show()

+-------+------------------+------------------+
|summary|             price|          bedrooms|
+-------+------------------+------------------+
|  count|              5780|              5780|
|   mean|214.47249134948098|              1.35|
| stddev| 325.8499109968376|0.9396893597086263|
|    min|              10.0|               0.0|
|    25%|             100.0|               1.0|
|    50%|             150.0|               1.0|
|    75%|             240.0|               2.0|
|    max|           10000.0|              14.0|
+-------+------------------+------------------+



In [30]:
# Summary statistics of price and bedrooms on the training set.
# NOTE(review): this repeats the statement of the preceding cell verbatim.
trainDF.select("price", "bedrooms").summary().show()

+-------+------------------+------------------+
|summary|             price|          bedrooms|
+-------+------------------+------------------+
|  count|              5780|              5780|
|   mean|214.47249134948098|              1.35|
| stddev| 325.8499109968376|0.9396893597086263|
|    min|              10.0|               0.0|
|    25%|             100.0|               1.0|
|    50%|             150.0|               1.0|
|    75%|             240.0|               2.0|
|    max|           10000.0|              14.0|
+-------+------------------+------------------+



## 모형 학습

In [31]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression

# Pack the single predictor (bedrooms) into the vector column Spark ML expects.
vecAssembler = (
    VectorAssembler()
    .setInputCols(["bedrooms"])
    .setOutputCol("features")
)
vecTrainDF = vecAssembler.transform(trainDF)
vecTrainDF.select("bedrooms", "features", "price").show(10)

# Fit price ~ bedrooms.
lr = LinearRegression(labelCol="price", featuresCol="features")
lrModel = lr.fit(vecTrainDF)

# Slope and intercept, rounded to two decimals for display.
m = round(lrModel.coefficients[0], 2)
b = round(lrModel.intercept, 2)

print(f"The formula for the linear regression line is price = {m}*bedrooms + {b}")

+--------+--------+-----+
|bedrooms|features|price|
+--------+--------+-----+
|     1.0|   [1.0]|200.0|
|     1.0|   [1.0]|130.0|
|     1.0|   [1.0]| 95.0|
|     1.0|   [1.0]|250.0|
|     3.0|   [3.0]|250.0|
|     1.0|   [1.0]|115.0|
|     1.0|   [1.0]|105.0|
|     1.0|   [1.0]| 86.0|
|     1.0|   [1.0]|100.0|
|     2.0|   [2.0]|220.0|
+--------+--------+-----+
only showing top 10 rows

The formula for the linear regression line is price = 123.68*bedrooms + 47.51


## 모형 테스트

In [32]:
from pyspark.ml import Pipeline

# Chain assembling and regression so the same steps are applied to the test set.
pipeline = Pipeline().setStages([vecAssembler, lr])
pipelineModel = pipeline.fit(trainDF)

predDF = pipelineModel.transform(testDF)
predDF.select("bedrooms", "features", "price", "prediction").show(10)

+--------+--------+------+------------------+
|bedrooms|features| price|        prediction|
+--------+--------+------+------------------+
|     1.0|   [1.0]|  85.0|171.18598011578285|
|     1.0|   [1.0]|  45.0|171.18598011578285|
|     1.0|   [1.0]|  70.0|171.18598011578285|
|     1.0|   [1.0]| 128.0|171.18598011578285|
|     1.0|   [1.0]| 159.0|171.18598011578285|
|     2.0|   [2.0]| 250.0|294.86172649777757|
|     1.0|   [1.0]|  99.0|171.18598011578285|
|     1.0|   [1.0]|  95.0|171.18598011578285|
|     1.0|   [1.0]| 100.0|171.18598011578285|
|     1.0|   [1.0]|2010.0|171.18598011578285|
+--------+--------+------+------------------+
only showing top 10 rows



# Baseline 모델

In [33]:
from pyspark.sql.functions import avg, lit
from pyspark.ml.evaluation import RegressionEvaluator

filePath = "sf-airbnb-clean.parquet"
airbnbDF = spark.read.parquet(filePath)
(trainDF, testDF) = airbnbDF.randomSplit([.8, .2], seed=42)

# Baseline: predict the training-set mean price for every test row.
avgPrice = float(trainDF.agg(avg("price")).first()[0])
predDF = testDF.withColumn("avgPrediction", lit(avgPrice))

regressionMeanEvaluator = RegressionEvaluator(
    labelCol="price",
    predictionCol="avgPrediction",
    metricName="rmse",
)
print(f"The RMSE for predicting the average price is: {regressionMeanEvaluator.evaluate(predDF):.2f}")

The RMSE for predicting the average price is: 240.71


## Feature Engineering

### Option 1

In [36]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer
from pyspark.ml.feature import VectorAssembler

filePath = "sf-airbnb-clean.parquet"
airbnbDF = spark.read.parquet(filePath)
(trainDF, testDF) = airbnbDF.randomSplit([.8, .2], seed=42)

# String columns are indexed and then one-hot encoded.
categoricalCols = [name for name, dtype in trainDF.dtypes if dtype == "string"]
indexOutputCols = [f"{name}Index" for name in categoricalCols]
oheOutputCols = [f"{name}OHE" for name in categoricalCols]

# handleInvalid="skip" is set on the indexer (rows it treats as invalid are dropped).
stringIndexer = StringIndexer(
    inputCols=categoricalCols,
    outputCols=indexOutputCols,
    handleInvalid="skip",
)
oheEncoder = OneHotEncoder(
    inputCols=indexOutputCols,
    outputCols=oheOutputCols,
)

# Every double column except the label is used as a numeric feature.
numericCols = [
    name
    for name, dtype in trainDF.dtypes
    if dtype == "double" and name != "price"
]
assemblerInputs = oheOutputCols + numericCols
vecAssembler = VectorAssembler(
    inputCols=assemblerInputs,
    outputCol="features",
)

In [37]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml import Pipeline

# Index -> one-hot encode -> assemble -> linear regression.
lr = LinearRegression(featuresCol="features", labelCol="price")
stages = [stringIndexer, oheEncoder, vecAssembler, lr]
pipeline = Pipeline().setStages(stages)

pipelineModel = pipeline.fit(trainDF)
predDF = pipelineModel.transform(testDF)
predDF.select("features", "price", "prediction").show(5)

+--------------------+-----+------------------+
|            features|price|        prediction|
+--------------------+-----+------------------+
|(98,[0,3,6,22,43,...| 85.0| 55.86406437022288|
|(98,[0,3,6,22,43,...| 45.0| 22.87558895398388|
|(98,[0,3,6,22,43,...| 70.0|27.382602888721067|
|(98,[0,3,6,12,42,...|128.0|-91.50712171182795|
|(98,[0,3,6,12,43,...|159.0| 94.66621817641771|
+--------------------+-----+------------------+
only showing top 5 rows



### Option 2

In [38]:
from pyspark.ml.feature import RFormula

# "price ~ ." : use price as the label and every other column as a feature.
rFormula = RFormula(
    formula="price ~ .",
    labelCol="price",
    featuresCol="features",
    handleInvalid="skip",
)

In [39]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml import Pipeline

# RFormula builds the feature vector, so the pipeline only needs two stages.
lr = LinearRegression(featuresCol="features", labelCol="price")
pipeline = Pipeline().setStages([rFormula, lr])

pipelineModel = pipeline.fit(trainDF)
predDF = pipelineModel.transform(testDF)
predDF.select("features", "price", "prediction").show(5)



+--------------------+-----+------------------+
|            features|price|        prediction|
+--------------------+-----+------------------+
|(98,[0,3,6,7,23,4...| 85.0| 55.40518338251741|
|(98,[0,3,6,7,23,4...| 45.0|22.558643930734434|
|(98,[0,3,6,7,23,4...| 70.0|27.035891181432817|
|(98,[0,3,6,7,13,4...|128.0|-91.29310091873367|
|(98,[0,3,6,7,13,4...|159.0| 94.66473870534765|
+--------------------+-----+------------------+
only showing top 5 rows



### 모형 평가

In [40]:
from pyspark.ml.evaluation import RegressionEvaluator

# One evaluator is reused for both metrics; it starts out configured for RMSE.
regressionEvaluator = RegressionEvaluator(
    labelCol="price",
    predictionCol="prediction",
    metricName="rmse",
)

rmse = round(regressionEvaluator.evaluate(predDF), 2)
print(f"RMSE is {rmse}")

# Switch the same evaluator to R2 (it stays on "r2" after this cell).
regressionEvaluator.setMetricName("r2")
r2 = round(regressionEvaluator.evaluate(predDF), 2)
print(f"R2 is {r2}")

RMSE is 220.69
R2 is 0.16


### 모형 저장

In [41]:
from pyspark.ml import PipelineModel

# Save the fitted pipeline (replacing any earlier copy) and load it back.
pipelinePath = "lr-pipeline-model"
modelWriter = pipelineModel.write().overwrite()
modelWriter.save(pipelinePath)
savedPipelineModel = PipelineModel.read().load(pipelinePath)

# Log Scale

In [42]:
from pyspark.sql.functions import col, exp, log
from pyspark.ml import Pipeline
from pyspark.ml.feature import RFormula
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

filePath = "sf-airbnb-clean.parquet"
airbnbDF = spark.read.parquet(filePath)
trainDF, testDF = airbnbDF.randomSplit([.8, .2], seed=42)

# Apply the log transform to the label on both splits.
logTrainDF = trainDF.withColumn("log_price", log(col("price")))
logTestDF = testDF.withColumn("log_price", log(col("price")))

# Predict log_price from every column except the raw price itself.
rFormula = RFormula(
    formula="log_price ~ . - price",
    labelCol="log_price",
    featuresCol="features",
    handleInvalid="skip",
)

lr = LinearRegression(predictionCol="log_pred", labelCol="log_price")
pipeline = Pipeline().setStages([rFormula, lr])
pipelineModel = pipeline.fit(logTrainDF)
predDF = pipelineModel.transform(logTestDF)

# Exponentiate so the prediction is back on the price scale before scoring.
expDF = predDF.withColumn("prediction", exp(col("log_pred")))

regressionEvaluator = RegressionEvaluator(predictionCol="prediction", labelCol="price")
regressionEvaluator.setMetricName("rmse")
rmse = regressionEvaluator.evaluate(expDF)
regressionEvaluator.setMetricName("r2")
r2 = regressionEvaluator.evaluate(expDF)
print(f"RMSE is {rmse}")
print(f"R2 is {r2}")

RMSE is 208.41093081345025
R2 is 0.25039912467770375


# Decision Tree

In [48]:
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml import Pipeline

filePath = "sf-airbnb-clean.parquet"
airbnbDF = spark.read.parquet(filePath)
(trainDF, testDF) = airbnbDF.randomSplit([.8, .2], seed=42)

# Categorical columns are index-encoded only (no one-hot encoding in this pipeline).
categoricalCols = [name for name, dtype in trainDF.dtypes if dtype == "string"]
indexOutputCols = [f"{name}Index" for name in categoricalCols]
stringIndexer = StringIndexer(
    inputCols=categoricalCols,
    outputCols=indexOutputCols,
    handleInvalid="skip",
)

numericCols = [
    name
    for name, dtype in trainDF.dtypes
    if dtype == "double" and name != "price"
]

assemblerInputs = indexOutputCols + numericCols
vecAssembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")

# maxBins is set to 40 before fitting.
# NOTE(review): presumably needed because one categorical feature has more
# levels than the default bin count — confirm against the Spark docs.
dt = DecisionTreeRegressor(labelCol="price", maxBins=40)

stages = [stringIndexer, vecAssembler, dt]
pipeline = Pipeline().setStages(stages)

pipelineModel = pipeline.fit(trainDF)
# The fitted tree is the last stage of the pipeline model.
dtModel = pipelineModel.stages[-1]
print(dtModel.toDebugString)

DecisionTreeRegressionModel: uid=DecisionTreeRegressor_41511a609bfc, depth=5, numNodes=47, numFeatures=33
  If (feature 12 <= 2.5)
   If (feature 12 <= 1.5)
    If (feature 5 in {1.0,2.0})
     If (feature 4 in {0.0,1.0,3.0,5.0,9.0,10.0,11.0,13.0,14.0,16.0,18.0,24.0})
      If (feature 3 in {0.0,1.0,2.0,3.0,4.0,5.0,6.0,7.0,8.0,9.0,10.0,11.0,12.0,13.0,14.0,15.0,16.0,17.0,18.0,19.0,20.0,21.0,23.0,24.0,25.0,26.0,27.0,28.0,29.0,30.0,31.0,32.0,33.0,34.0})
       Predict: 104.23992784125075
      Else (feature 3 not in {0.0,1.0,2.0,3.0,4.0,5.0,6.0,7.0,8.0,9.0,10.0,11.0,12.0,13.0,14.0,15.0,16.0,17.0,18.0,19.0,20.0,21.0,23.0,24.0,25.0,26.0,27.0,28.0,29.0,30.0,31.0,32.0,33.0,34.0})
       Predict: 250.7111111111111
     Else (feature 4 not in {0.0,1.0,3.0,5.0,9.0,10.0,11.0,13.0,14.0,16.0,18.0,24.0})
      If (feature 3 in {0.0,2.0,3.0,4.0,5.0,6.0,7.0,8.0,9.0,10.0,11.0,12.0,13.0,14.0,15.0,16.0,17.0,18.0,19.0,20.0,21.0,22.0,23.0,27.0,33.0,35.0})
       Predict: 151.94179894179894
      Else (feat

In [49]:
# Take the fitted tree (last pipeline stage) and display its feature importances.
dtModel = pipelineModel.stages[-1]
dtModel.featureImportances

SparseVector(33, {1: 0.1679, 2: 0.1401, 3: 0.0562, 4: 0.1282, 5: 0.0109, 9: 0.0388, 10: 0.0036, 12: 0.2834, 13: 0.0152, 14: 0.0295, 15: 0.1262})

In [50]:
import pandas as pd

dtModel = pipelineModel.stages[-1]

# Pair each assembler input column with its importance score, in assembler order.
importancePairs = list(zip(vecAssembler.getInputCols(), dtModel.featureImportances))
featureImp = pd.DataFrame(data=importancePairs, columns=["feature", "importance"])

# Most important features first.
featureImp.sort_values("importance", ascending=False)

Unnamed: 0,feature,importance
12,bedrooms,0.283406
1,cancellation_policyIndex,0.167893
2,instant_bookableIndex,0.140081
4,property_typeIndex,0.128179
15,number_of_reviews,0.126233
3,neighbourhood_cleansedIndex,0.0562
9,longitude,0.03881
14,minimum_nights,0.029473
13,beds,0.015218
5,room_typeIndex,0.010905


In [51]:
# Score the test set and list the most expensive listings first.
predDF = pipelineModel.transform(testDF)
(predDF
  .select("features", "price", "prediction")
  .sort("price", ascending=False)
  .show())

+--------------------+------+------------------+
|            features| price|        prediction|
+--------------------+------+------------------+
|(33,[1,2,3,4,7,8,...|4500.0| 205.5814889336016|
|[0.0,2.0,1.0,16.0...|3800.0|214.04819277108433|
|(33,[0,2,3,4,7,8,...|2282.0|            8000.0|
|[0.0,2.0,0.0,21.0...|2010.0| 205.5814889336016|
|[0.0,0.0,1.0,21.0...|1599.0|309.03921568627453|
|(33,[1,3,4,5,7,8,...|1500.0|104.23992784125075|
|(33,[1,3,4,7,8,9,...|1252.0|            8000.0|
|(33,[0,1,3,4,7,8,...|1250.0|         722.96875|
|[0.0,2.0,1.0,2.0,...|1200.0|104.23992784125075|
|(33,[1,3,4,7,8,9,...|1200.0| 493.3795620437956|
|[1.0,0.0,1.0,3.0,...|1200.0| 205.5814889336016|
|(33,[0,3,4,7,8,9,...|1195.0|         722.96875|
|(33,[2,3,4,5,7,8,...|1099.0|            741.64|
|(33,[3,4,7,8,9,10...|1075.0| 493.3795620437956|
|[0.0,0.0,0.0,26.0...|1000.0| 493.3795620437956|
|(33,[3,4,7,8,9,10...|1000.0|296.76666666666665|
|(33,[3,4,7,8,9,10...|1000.0| 493.3795620437956|
|[1.0,1.0,1.0,20.0..

In [47]:
from pyspark.ml.evaluation import RegressionEvaluator

# Evaluate whatever predDF currently holds; the evaluator starts on RMSE.
regressionEvaluator = RegressionEvaluator(
    labelCol="price",
    predictionCol="prediction",
    metricName="rmse",
)

rmse = regressionEvaluator.evaluate(predDF)
# Reuse the same evaluator for R2 (it stays on "r2" afterwards).
regressionEvaluator.setMetricName("r2")
r2 = regressionEvaluator.evaluate(predDF)
print(f"RMSE is {rmse}")
print(f"R2 is {r2}")

RMSE is 385.8704264527981
R2 is -1.5696388432265533


# RandomForest & Hyperparameter Tuning

In [52]:
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml import Pipeline
from pyspark.ml.tuning import ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import CrossValidator

filePath = "sf-airbnb-clean.parquet"
airbnbDF = spark.read.parquet(filePath)
(trainDF, testDF) = airbnbDF.randomSplit([.8, .2], seed=42)

# Feature preparation: index the string columns, keep every double except the label.
categoricalCols = [field for (field, dataType) in trainDF.dtypes if dataType == "string"]
indexOutputCols = [x + "Index" for x in categoricalCols]

stringIndexer = StringIndexer(inputCols=categoricalCols, outputCols=indexOutputCols, handleInvalid="skip")

numericCols = [field for (field, dataType) in trainDF.dtypes if dataType == "double" and field != "price"]
assemblerInputs = indexOutputCols + numericCols
vecAssembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")

rf = RandomForestRegressor(labelCol="price", maxBins=40, seed=42)
pipeline = Pipeline(stages=[stringIndexer, vecAssembler, rf])

# 3 depths x 2 forest sizes = 6 candidate models.
paramGrid = (ParamGridBuilder()
            .addGrid(rf.maxDepth, [2, 4, 6])
            .addGrid(rf.numTrees, [10, 100])
            .build())

evaluator = RegressionEvaluator(labelCol="price",
                                predictionCol="prediction",
                                metricName="rmse")

# Approach 1: cross-validate the whole pipeline.
# The validator is fit exactly once, with parallelism=4. Previously it was fit
# sequentially and then again in parallel, and the first result was immediately
# overwritten, doubling the most expensive step for no benefit.
cv = CrossValidator(estimator=pipeline,
                    evaluator=evaluator,
                    estimatorParamMaps=paramGrid,
                    numFolds=3,
                    parallelism=4,
                    seed=42)
cvModel = cv.fit(trainDF)

# Print the average cross-validated RMSE per parameter combination. A bare
# expression in the middle of a cell is never displayed, so print explicitly.
for paramMap, avgMetric in zip(cvModel.getEstimatorParamMaps(), cvModel.avgMetrics):
    settings = {param.name: value for param, value in paramMap.items()}
    print(f"{settings} -> avg RMSE = {avgMetric}")

# Approach 2: put the cross-validator inside the pipeline so that only the
# random forest is cross-validated, after the indexer and assembler stages.
cv = CrossValidator(estimator=rf,
                    evaluator=evaluator,
                    estimatorParamMaps=paramGrid,
                    numFolds=3,
                    parallelism=4,
                    seed=42)

pipeline = Pipeline(stages=[stringIndexer, vecAssembler, cv])
pipelineModel = pipeline.fit(trainDF)

# Evaluate the model from approach 2 on the held-out test set.
predDF = pipelineModel.transform(testDF)
regressionEvaluator = RegressionEvaluator(predictionCol="prediction", labelCol="price", metricName="rmse")

rmse = regressionEvaluator.evaluate(predDF)
r2 = regressionEvaluator.setMetricName("r2").evaluate(predDF)
print(f"RMSE is {rmse}")
print(f"R2 is {r2}")

RMSE is 211.70370310223277
R2 is 0.2265254865671944
