In [20]:
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import OneHotEncoder, StringIndexer
from pyspark.ml.regression import LinearRegression
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.feature import RFormula
import pandas as pd
import click

In [4]:
spark = SparkSession.builder.appName("App").getOrCreate()

In [6]:
filePath = """/content/ML_Data.parquet"""

airbnbDF = spark.read.parquet(filePath)

airbnbDF.select("neighbourhood_cleansed", "room_type", "price", "bedrooms", "bathrooms", "number_of_reviews", "price").show()

+----------------------+---------------+-----+--------+---------+-----------------+-----+
|neighbourhood_cleansed|      room_type|price|bedrooms|bathrooms|number_of_reviews|price|
+----------------------+---------------+-----+--------+---------+-----------------+-----+
|      Western Addition|Entire home/apt|170.0|     1.0|      1.0|            180.0|170.0|
|        Bernal Heights|Entire home/apt|235.0|     2.0|      1.0|            111.0|235.0|
|        Haight Ashbury|   Private room| 65.0|     1.0|      4.0|             17.0| 65.0|
|        Haight Ashbury|   Private room| 65.0|     1.0|      4.0|              8.0| 65.0|
|      Western Addition|Entire home/apt|785.0|     2.0|      1.5|             27.0|785.0|
|      Western Addition|Entire home/apt|255.0|     2.0|      1.0|             31.0|255.0|
|               Mission|   Private room|139.0|     1.0|      1.0|            647.0|139.0|
|          Potrero Hill|   Private room|135.0|     1.0|      1.0|            453.0|135.0|
|         

In [7]:
trainDF, testDF = airbnbDF.randomSplit([.8, .2], seed=42)

print(f"""There are {trainDF.count()} rows in the training set, and {testDF.count()} in the test set""")

There are 5780 rows in the training set, and 1366 in the test set


In [11]:
vecAssembler = VectorAssembler(inputCols = ["bedrooms"], outputCol = "features")

vecTrainDF = vecAssembler.transform(trainDF)

vecTrainDF.select("bedrooms", "features", "price").show(10)

+--------+--------+-----+
|bedrooms|features|price|
+--------+--------+-----+
|     1.0|   [1.0]|200.0|
|     1.0|   [1.0]|130.0|
|     1.0|   [1.0]| 95.0|
|     1.0|   [1.0]|250.0|
|     3.0|   [3.0]|250.0|
|     1.0|   [1.0]|115.0|
|     1.0|   [1.0]|105.0|
|     1.0|   [1.0]| 86.0|
|     1.0|   [1.0]|100.0|
|     2.0|   [2.0]|220.0|
+--------+--------+-----+
only showing top 10 rows



In [12]:
lr = LinearRegression(featuresCol="features", labelCol="price")

lrModel = lr.fit(vecTrainDF)

In [15]:
m = round(lrModel.coefficients[0], 2)

b = round(lrModel.intercept, 2)

print(f"""The formula for the linear regression line is price = {m}*bedrooms + {b}""")

The formula for the linear regression line is price = 123.68*bedrooms + 47.51


In [21]:
pipeline = Pipeline(stages=[vecAssembler, lr])

pipelineModel = pipeline.fit(trainDF)

In [22]:
predDF = pipelineModel.transform(testDF)

predDF.select("bedrooms", "features", "price", "prediction").show(10)

+--------+--------+------+------------------+
|bedrooms|features| price|        prediction|
+--------+--------+------+------------------+
|     1.0|   [1.0]|  85.0|171.18598011578285|
|     1.0|   [1.0]|  45.0|171.18598011578285|
|     1.0|   [1.0]|  70.0|171.18598011578285|
|     1.0|   [1.0]| 128.0|171.18598011578285|
|     1.0|   [1.0]| 159.0|171.18598011578285|
|     2.0|   [2.0]| 250.0|294.86172649777757|
|     1.0|   [1.0]|  99.0|171.18598011578285|
|     1.0|   [1.0]|  95.0|171.18598011578285|
|     1.0|   [1.0]| 100.0|171.18598011578285|
|     1.0|   [1.0]|2010.0|171.18598011578285|
+--------+--------+------+------------------+
only showing top 10 rows

