In [23]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.ml.feature import StringIndexer, VectorAssembler, MinMaxScaler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

In [24]:
spark = SparkSession.builder.appName("housing").getOrCreate()

In [25]:
file_path = "/content/housing.csv"
data = spark.read.csv(file_path, header=True, inferSchema=True)
data.show()

+----------+-----+------------------+-------------------+-----------+--------+----------------+------------------+---------------+----------------------+------------------+-------------------------------------+--------------------+----------+---------------+-----------+---------+---------+-----------------+--------------+------------------------+-------------------------+-------+
|        id| Date|number of bedrooms|number of bathrooms|living area|lot area|number of floors|waterfront present|number of views|condition of the house|grade of the house|Area of the house(excluding basement)|Area of the basement|Built Year|Renovation Year|Postal Code|Lattitude|Longitude|living_area_renov|lot_area_renov|Number of schools nearby|Distance from the airport|  Price|
+----------+-----+------------------+-------------------+-----------+--------+----------------+------------------+---------------+----------------------+------------------+-------------------------------------+--------------------+---

In [26]:
# Remove null values
data.dropna()

DataFrame[id: bigint, Date: int, number of bedrooms: int, number of bathrooms: double, living area: int, lot area: int, number of floors: double, waterfront present: string, number of views: int, condition of the house: int, grade of the house: int, Area of the house(excluding basement): int, Area of the basement: int, Built Year: int, Renovation Year: int, Postal Code: int, Lattitude: double, Longitude: double, living_area_renov: int, lot_area_renov: int, Number of schools nearby: int, Distance from the airport: int, Price: int]

In [27]:
# Perform one hot encoding of columns with categorical values (yes/no) using StringIndexer
indexer = StringIndexer(inputCol = "waterfront present", outputCol = "WaterFrontIndex")
data = indexer.fit(data).transform(data)
data = data.drop('waterfront present')
data.show()

+----------+-----+------------------+-------------------+-----------+--------+----------------+---------------+----------------------+------------------+-------------------------------------+--------------------+----------+---------------+-----------+---------+---------+-----------------+--------------+------------------------+-------------------------+-------+---------------+
|        id| Date|number of bedrooms|number of bathrooms|living area|lot area|number of floors|number of views|condition of the house|grade of the house|Area of the house(excluding basement)|Area of the basement|Built Year|Renovation Year|Postal Code|Lattitude|Longitude|living_area_renov|lot_area_renov|Number of schools nearby|Distance from the airport|  Price|WaterFrontIndex|
+----------+-----+------------------+-------------------+-----------+--------+----------------+---------------+----------------------+------------------+-------------------------------------+--------------------+----------+---------------+-

In [28]:
model_features = ['number of bedrooms',	'number of bathrooms',	'living area',	'WaterFrontIndex',	'number of floors',	'number of views',	'condition of the house',	'grade of the house',	'Area of the house(excluding basement)',	'Area of the basement']

In [29]:
# Create a new column called "FeatureVector" that contains all the model_features columns as a single one dimensional vector
assembler = VectorAssembler(inputCols=model_features, outputCol="FeatureVector")
data = assembler.transform(data)
data.show()

+----------+-----+------------------+-------------------+-----------+--------+----------------+---------------+----------------------+------------------+-------------------------------------+--------------------+----------+---------------+-----------+---------+---------+-----------------+--------------+------------------------+-------------------------+-------+---------------+--------------------+
|        id| Date|number of bedrooms|number of bathrooms|living area|lot area|number of floors|number of views|condition of the house|grade of the house|Area of the house(excluding basement)|Area of the basement|Built Year|Renovation Year|Postal Code|Lattitude|Longitude|living_area_renov|lot_area_renov|Number of schools nearby|Distance from the airport|  Price|WaterFrontIndex|       FeatureVector|
+----------+-----+------------------+-------------------+-----------+--------+----------------+---------------+----------------------+------------------+-------------------------------------+-------

In [31]:
# Normalise the numerical features using MinMaxScaler

scaler = MinMaxScaler(inputCol="FeatureVector", outputCol="ScaledFeatureVector")
data = scaler.fit(data).transform(data)
data.show()

+----------+-----+------------------+-------------------+-----------+--------+----------------+---------------+----------------------+------------------+-------------------------------------+--------------------+----------+---------------+-----------+---------+---------+-----------------+--------------+------------------------+-------------------------+-------+---------------+--------------------+--------------------+
|        id| Date|number of bedrooms|number of bathrooms|living area|lot area|number of floors|number of views|condition of the house|grade of the house|Area of the house(excluding basement)|Area of the basement|Built Year|Renovation Year|Postal Code|Lattitude|Longitude|living_area_renov|lot_area_renov|Number of schools nearby|Distance from the airport|  Price|WaterFrontIndex|       FeatureVector| ScaledFeatureVector|
+----------+-----+------------------+-------------------+-----------+--------+----------------+---------------+----------------------+------------------+---

In [32]:
# Create a new DataFrame with input column as ScaledFeatureColumn and output column as Price

df = data.select(col("ScaledFeatureVector").alias("features"), col("Price").alias("label"))
df.show()

+--------------------+-------+
|            features|  label|
+--------------------+-------+
|[0.09375,0.266666...|1400000|
|[0.125,0.3,0.1928...|1200000|
|[0.09375,0.266666...| 838000|
|[0.0625,0.2,0.177...| 805000|
|[0.0625,0.2666666...| 790000|
|[0.125,0.36666666...| 785000|
|[0.0625,0.1666666...| 750000|
|[0.0625,0.2666666...| 750000|
|[0.09375,0.233333...| 698000|
|[0.125,0.26666666...| 675000|
|[0.09375,0.2,0.11...| 650000|
|[0.09375,0.2,0.08...| 640000|
|[0.09375,0.3,0.17...| 630000|
|[0.0625,0.2333333...| 626000|
|[0.09375,0.266666...| 625000|
|[0.09375,0.366666...| 625000|
|[0.0625,0.1666666...| 615000|
|[0.09375,0.266666...| 612500|
|[0.0625,0.2666666...| 604000|
|[0.0625,0.1666666...| 588500|
+--------------------+-------+
only showing top 20 rows



In [34]:
# Split the dataset into training and test sets

train_data, test_data = df.randomSplit([0.8, 0.2])

In [35]:
# Create and train the LinearRegression Model

model = LinearRegression(featuresCol="features", labelCol="label")
model = model.fit(train_data)

In [36]:
# Predict using the model on the test data

predictions = model.transform(test_data)
predictions.show()

+--------------------+------+-------------------+
|            features| label|         prediction|
+--------------------+------+-------------------+
|(10,[0,1,2,7,8],[...|255000| 16269.438387553644|
|(10,[0,1,2,7,8],[...|235000|    33734.745750751|
|(10,[1,2,5,6,8],[...|355000|  76727.96187848778|
|(10,[1,2,6,7,8],[...|245000|-10526.740356611816|
|(10,[1,2,6,7,8],[...|280000|   -2764.3815285241|
|(10,[1,2,6,7,8],[...|202000| 127535.01620461157|
|(10,[1,2,6,7,8],[...|275000| 14280.268752407894|
|(10,[1,2,6,7,8],[...|129000|  97724.88663046039|
|(10,[1,2,6,7,8],[...|350000|  243829.6289063283|
|(10,[1,2,6,7,8],[...|199900| 150401.43560660933|
|(10,[1,2,6,7,8],[...|335000| 150401.43560660933|
|(10,[1,2,6,7,8],[...|185000| 208618.38729337673|
|(10,[1,2,6,7,8],[...|395000| 249651.39802739403|
|(10,[1,2,6,7,8],[...|439000| 261294.93626952567|
|(10,[1,2,6,7,8],[...|525000|  269057.2950976134|
|(10,[1,2,6,7,8],[...|165000| 117412.15011112706|
|(10,[1,2,6,7,8],[...|210000| 191153.81945406995|


In [37]:
# Evaluate the predictions using RegressionEvaluator

evaluator = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("RMSE Value is ", rmse)

RMSE Value is  222317.62214104968


In [38]:
# Display model's coefficients and intercept
print("Model Coefficients = ", model.coefficients)
print("Model Intercept = ", model.intercept)

Model Coefficients =  [-1125228.8386789863,-70837.4036786206,1348240.2268363484,634148.3520632961,3881.1680609771997,194476.555313532,232867.80674706955,875784.3544238652,828849.5377749648,578345.7448001707]
Model Intercept =  -223849.35936164396


In [40]:
# Display the feature importance

feature_importance = list(zip(model_features, model.coefficients))
for feature, importance in feature_importance:
  print(f"{feature} = {importance}")

number of bedrooms = -1125228.8386789863
number of bathrooms = -70837.4036786206
living area = 1348240.2268363484
WaterFrontIndex = 634148.3520632961
number of floors = 3881.1680609771997
number of views = 194476.555313532
condition of the house = 232867.80674706955
grade of the house = 875784.3544238652
Area of the house(excluding basement) = 828849.5377749648
Area of the basement = 578345.7448001707


In [None]:
spark.stop()