# Creating the pyspark session

In [1]:
from pyspark.sql import SparkSession

In [2]:
# Build (or reuse) the Spark session for this notebook, running on a local master.
spark = (
    SparkSession.builder
    .master('local')
    .appName('linear_regression')
    .getOrCreate()
)

22/06/04 19:10:58 WARN Utils: Your hostname, Yashs-MacBook-Air.local resolves to a loopback address: 127.0.0.1; using 192.168.29.55 instead (on interface en0)
22/06/04 19:10:58 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/06/04 19:10:59 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


# Load the data

In [3]:
# Read the real-estate CSV: column names come from the header row and
# Spark infers each column's type from the values.
data = spark.read.csv(
    'data/real_estate.csv',
    header=True,
    inferSchema=True,
)

# Preview the first five rows.
data.show(n=5)

+---+-------------------+------------+--------------------------------------+-------------------------------+-----------+------------+--------------------------+
| No|X1 transaction date|X2 house age|X3 distance to the nearest MRT station|X4 number of convenience stores|X5 latitude|X6 longitude|Y house price of unit area|
+---+-------------------+------------+--------------------------------------+-------------------------------+-----------+------------+--------------------------+
|  1|           2012.917|        32.0|                              84.87882|                             10|   24.98298|   121.54024|                      37.9|
|  2|           2012.917|        19.5|                              306.5947|                              9|   24.98034|   121.53951|                      42.2|
|  3|           2013.583|        13.3|                              561.9845|                              5|   24.98746|   121.54391|                      47.3|
|  4|             2013.5|   

In [4]:
# Print each column's name, inferred type and nullability to check the schema inference.
data.printSchema()

root
 |-- No: integer (nullable = true)
 |-- X1 transaction date: double (nullable = true)
 |-- X2 house age: double (nullable = true)
 |-- X3 distance to the nearest MRT station: double (nullable = true)
 |-- X4 number of convenience stores: integer (nullable = true)
 |-- X5 latitude: double (nullable = true)
 |-- X6 longitude: double (nullable = true)
 |-- Y house price of unit area: double (nullable = true)



In [5]:
# Summary statistics (count, mean, stddev, min, max) for every column,
# converted to a pandas DataFrame so the notebook renders it as a table.
data.describe().toPandas()

Unnamed: 0,summary,No,X1 transaction date,X2 house age,X3 distance to the nearest MRT station,X4 number of convenience stores,X5 latitude,X6 longitude,Y house price of unit area
0,count,414.0,414.0,414.0,414.0,414.0,414.0,414.0,414.0
1,mean,207.5,2013.148971014493,17.71256038647343,1083.8856889130436,4.094202898550725,24.969030072463745,121.53336108695667,37.98019323671498
2,stddev,119.6557562342907,0.2819672402629999,11.392484533242524,1262.109595407851,2.9455618056636177,0.0124101965904502,0.0153471830045923,13.606487697735316
3,min,1.0,2012.667,0.0,23.38284,0.0,24.93207,121.47353,7.6
4,max,414.0,2013.583,43.8,6488.021,10.0,25.01459,121.56627,117.5


In [6]:
# List the column names; the X1..X6 columns are used as model inputs below.
data.columns

['No',
 'X1 transaction date',
 'X2 house age',
 'X3 distance to the nearest MRT station',
 'X4 number of convenience stores',
 'X5 latitude',
 'X6 longitude',
 'Y house price of unit area']

# Combine the feature columns into a single vector column named `features`

In [7]:
from pyspark.ml.feature import VectorAssembler

In [8]:
# Predictor columns. The row index 'No' and the target price column are
# deliberately left out of the feature vector.
feature_columns = [
    'X1 transaction date',
    'X2 house age',
    'X3 distance to the nearest MRT station',
    'X4 number of convenience stores',
    'X5 latitude',
    'X6 longitude',
]

# Pack the predictor columns into one vector column called 'features'.
assembler = VectorAssembler(inputCols=feature_columns, outputCol='features')
data_set = assembler.transform(data)

# Preview the assembled feature vector next to the target column.
data_set.select('features', 'Y house price of unit area').show(5)

+--------------------+--------------------------+
|            features|Y house price of unit area|
+--------------------+--------------------------+
|[2012.917,32.0,84...|                      37.9|
|[2012.917,19.5,30...|                      42.2|
|[2013.583,13.3,56...|                      47.3|
|[2013.5,13.3,561....|                      54.8|
|[2012.833,5.0,390...|                      43.1|
+--------------------+--------------------------+
only showing top 5 rows



# Split the data into train and test

In [9]:
# 70/30 train/test split. The seed is fixed so the split is repeatable across
# runs; without it every execution trains and evaluates on different rows and
# the reported metrics cannot be reproduced.
train_data, test_data = data_set.randomSplit([0.7, 0.3], seed=42)

In [10]:
# Preview two rows of each split. show() prints the table and returns None, so
# the calls are written as separate statements; joining them with a comma
# builds a tuple that the notebook echoes as a stray "(None, None)" result.
train_data.show(2)
test_data.show(2)

+---+-------------------+------------+--------------------------------------+-------------------------------+-----------+------------+--------------------------+--------------------+
| No|X1 transaction date|X2 house age|X3 distance to the nearest MRT station|X4 number of convenience stores|X5 latitude|X6 longitude|Y house price of unit area|            features|
+---+-------------------+------------+--------------------------------------+-------------------------------+-----------+------------+--------------------------+--------------------+
|  1|           2012.917|        32.0|                              84.87882|                             10|   24.98298|   121.54024|                      37.9|[2012.917,32.0,84...|
|  2|           2012.917|        19.5|                              306.5947|                              9|   24.98034|   121.53951|                      42.2|[2012.917,19.5,30...|
+---+-------------------+------------+--------------------------------------+--------

(None, None)

# Training the Linear Regression model

In [11]:
from pyspark.ml.regression import LinearRegression

# Linear regression on the assembled 'features' vector (the estimator's default
# input column) against the unit-area price. No regularisation is configured.
lr = LinearRegression(
    featuresCol='features',
    labelCol='Y house price of unit area',
)

# Fit on the training split only; the test split is held out for evaluation.
lrModel = lr.fit(train_data)

22/06/04 19:11:03 WARN Instrumentation: [2ee7d0b0] regParam is zero, which might cause numerical instability and overfitting.
22/06/04 19:11:03 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
22/06/04 19:11:03 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.ForeignLinkerBLAS
22/06/04 19:11:03 WARN InstanceBuilder$NativeLAPACK: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK


# Predict and evaluate on the test data

In [12]:
# Evaluate the fitted model on the held-out test split. The returned summary
# object exposes the metrics read in the next cell (rootMeanSquaredError, r2,
# meanSquaredError).
test_stats = lrModel.evaluate(test_data)

# Performance measurement

In [13]:
# Report the test-set error metrics, one per line.
print(
    f"RMSE: {test_stats.rootMeanSquaredError}",
    f"R2: {test_stats.r2}",
    f"MSE: {test_stats.meanSquaredError}",
    sep="\n",
)

RMSE: 8.555293620953073
R2: 0.5967683479771075
MSE: 73.19304894072033
