# Linear Regression  with pyspark

In [1]:
from pyspark.sql import SparkSession

In [2]:
saprk = SparkSession.builder.appName('lrex').getOrCreate()

### Reading csv file in pyspark

In [3]:
data = spark.read.csv('/home/jashuva/Downloads/real-estate-price-prediction/Real estate.csv',inferSchema=True,
                         header=True)

In [4]:
data.head()

Row(No=1, X1 transaction date=2012.917, X2 house age=32.0, X3 distance to the nearest MRT station=84.87882, X4 number of convenience stores=10, X5 latitude=24.98298, X6 longitude=121.54024, Y house price of unit area=37.9)

### Schema of Dataset

In [5]:
data.printSchema()

root
 |-- No: integer (nullable = true)
 |-- X1 transaction date: double (nullable = true)
 |-- X2 house age: double (nullable = true)
 |-- X3 distance to the nearest MRT station: double (nullable = true)
 |-- X4 number of convenience stores: integer (nullable = true)
 |-- X5 latitude: double (nullable = true)
 |-- X6 longitude: double (nullable = true)
 |-- Y house price of unit area: double (nullable = true)



In [6]:
for i in  data.head(2)[1]:
    print(i)

2
2012.917
19.5
306.5947
9
24.98034
121.53951
42.2


In [7]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

In [8]:
data.columns

['No',
 'X1 transaction date',
 'X2 house age',
 'X3 distance to the nearest MRT station',
 'X4 number of convenience stores',
 'X5 latitude',
 'X6 longitude',
 'Y house price of unit area']

### Create pyspark Dataframe  

In [9]:
assembler = VectorAssembler(inputCols=['No','X1 transaction date','X2 house age','X3 distance to the nearest MRT station','X4 number of convenience stores','X5 latitude','X6 longitude'],
                           outputCol='features')

In [10]:
data_frame = assembler.transform(data)

In [11]:
data_frame.show()

+---+-------------------+------------+--------------------------------------+-------------------------------+-----------+------------+--------------------------+--------------------+
| No|X1 transaction date|X2 house age|X3 distance to the nearest MRT station|X4 number of convenience stores|X5 latitude|X6 longitude|Y house price of unit area|            features|
+---+-------------------+------------+--------------------------------------+-------------------------------+-----------+------------+--------------------------+--------------------+
|  1|           2012.917|        32.0|                              84.87882|                             10|   24.98298|   121.54024|                      37.9|[1.0,2012.917,32....|
|  2|           2012.917|        19.5|                              306.5947|                              9|   24.98034|   121.53951|                      42.2|[2.0,2012.917,19....|
|  3|           2013.583|        13.3|                              561.9845|        

In [12]:
spark_data_frame = data_frame.select('features','Y house price of unit area')

In [13]:
spark_data_frame.show()

+--------------------+--------------------------+
|            features|Y house price of unit area|
+--------------------+--------------------------+
|[1.0,2012.917,32....|                      37.9|
|[2.0,2012.917,19....|                      42.2|
|[3.0,2013.583,13....|                      47.3|
|[4.0,2013.5,13.3,...|                      54.8|
|[5.0,2012.833,5.0...|                      43.1|
|[6.0,2012.667,7.1...|                      32.1|
|[7.0,2012.667,34....|                      40.3|
|[8.0,2013.417,20....|                      46.7|
|[9.0,2013.5,31.7,...|                      18.8|
|[10.0,2013.417,17...|                      22.1|
|[11.0,2013.083,34...|                      41.4|
|[12.0,2013.333,6....|                      58.1|
|[13.0,2012.917,13...|                      39.3|
|[14.0,2012.667,20...|                      23.8|
|[15.0,2013.5,13.2...|                      34.3|
|[16.0,2013.583,35...|                      50.5|
|[17.0,2013.25,0.0...|                      70.1|


### Normalization In Preprocessing 

In [14]:
from pyspark.ml.feature import Normalizer
from pyspark.ml.linalg import Vectors

In [15]:
normalizer = Normalizer().setInputCol("features").setOutputCol("nor_features").setP(1.0)

In [16]:
l1NormData = normalizer.transform(spark_data_frame)
print("Normalized using L^1 norm")
l1NormData.show()

Normalized using L^1 norm
+--------------------+--------------------------+--------------------+
|            features|Y house price of unit area|        nor_features|
+--------------------+--------------------------+--------------------+
|[1.0,2012.917,32....|                      37.9|[4.37193055499594...|
|[2.0,2012.917,19....|                      42.2|[8.01111445997948...|
|[3.0,2013.583,13....|                      47.3|[0.00109353402190...|
|[4.0,2013.5,13.3,...|                      54.8|[0.00145755816366...|
|[5.0,2012.833,5.0...|                      43.1|[0.00194937609087...|
|[6.0,2012.667,7.1...|                      32.1|[0.00137922391663...|
|[7.0,2012.667,34....|                      40.3|[0.00247248840080...|
|[8.0,2013.417,20....|                      46.7|[0.00322341202837...|
|[9.0,2013.5,31.7,...|                      18.8|[0.00116675925744...|
|[10.0,2013.417,17...|                      22.1|[0.00251636950578...|
|[11.0,2013.083,34...|                      41.4|[0

In [17]:
spark_data_frame_norm = l1NormData.select('nor_features','Y house price of unit area')

In [18]:
spark_data_frame_norm.show()

+--------------------+--------------------------+
|        nor_features|Y house price of unit area|
+--------------------+--------------------------+
|[4.37193055499594...|                      37.9|
|[8.01111445997948...|                      42.2|
|[0.00109353402190...|                      47.3|
|[0.00145755816366...|                      54.8|
|[0.00194937609087...|                      43.1|
|[0.00137922391663...|                      32.1|
|[0.00247248840080...|                      40.3|
|[0.00322341202837...|                      46.7|
|[0.00116675925744...|                      18.8|
|[0.00251636950578...|                      22.1|
|[0.00421197150971...|                      41.4|
|[0.00526868888576...|                      58.1|
|[0.00484595307860...|                      39.3|
|[0.00299966776108...|                      23.8|
|[0.00446819056708...|                      34.3|
|[0.00572856643709...|                      50.5|
|[0.00686655110772...|                      70.1|


In [19]:
spark_data_frame_norm.describe().show()

+-------+--------------------------+
|summary|Y house price of unit area|
+-------+--------------------------+
|  count|                       414|
|   mean|         37.98019323671498|
| stddev|        13.606487697735316|
|    min|                       7.6|
|    max|                     117.5|
+-------+--------------------------+



### Splitting Data Into Training and Testing

In [20]:
train_data, test_data = spark_data_frame_norm.randomSplit([0.7,0.3])

In [21]:
train_data.show()

+--------------------+--------------------------+
|        nor_features|Y house price of unit area|
+--------------------+--------------------------+
|[4.37193055499594...|                      37.9|
|[8.01111445997948...|                      42.2|
|[0.00109353402190...|                      47.3|
|[0.00116675925744...|                      18.8|
|[0.00137922391663...|                      32.1|
|[0.00145755816366...|                      54.8|
|[0.00194937609087...|                      43.1|
|[0.00251636950578...|                      22.1|
|[0.00299966776108...|                      23.8|
|[0.00322341202837...|                      46.7|
|[0.00446819056708...|                      34.3|
|[0.00470401798063...|                      29.3|
|[0.00484595307860...|                      39.3|
|[0.00526868888576...|                      58.1|
|[0.00646383987735...|                      24.6|
|[0.00668222310479...|                      18.2|
|[0.00686655110772...|                      70.1|


In [22]:
test_data.show()

+--------------------+--------------------------+
|        nor_features|Y house price of unit area|
+--------------------+--------------------------+
|[0.00247248840080...|                      40.3|
|[0.00421197150971...|                      41.4|
|[0.00460177287040...|                      22.1|
|[0.00572396125307...|                      27.3|
|[0.00572856643709...|                      50.5|
|[0.00651200872540...|                      15.9|
|[0.00740754933605...|                      13.2|
|[0.00872703132440...|                      22.6|
|[0.00887594152608...|                      51.6|
|[0.00891973684249...|                      22.9|
|[0.00904544103033...|                      47.7|
|[0.01047048526178...|                      47.0|
|[0.01169142100716...|                      20.0|
|[0.01338621560432...|                      49.3|
|[0.01445060679790...|                      55.1|
|[0.01556009467239...|                      34.7|
|[0.01959283336627...|                      21.8|


### Calling Linear Regression Algorithm from 'pyspark.ml.regression'  

In [23]:
from pyspark.ml.regression import LinearRegression

In [24]:
reg = LinearRegression(labelCol='Y house price of unit area',featuresCol='nor_features')

### Fittig Train Data to the Model 

In [25]:
model =  reg.fit(train_data)

In [26]:
unlable_data = test_data.select("nor_features")

In [27]:
unlable_data.show()

+--------------------+
|        nor_features|
+--------------------+
|[0.00247248840080...|
|[0.00421197150971...|
|[0.00460177287040...|
|[0.00572396125307...|
|[0.00572856643709...|
|[0.00651200872540...|
|[0.00740754933605...|
|[0.00872703132440...|
|[0.00887594152608...|
|[0.00891973684249...|
|[0.00904544103033...|
|[0.01047048526178...|
|[0.01169142100716...|
|[0.01338621560432...|
|[0.01445060679790...|
|[0.01556009467239...|
|[0.01959283336627...|
|[0.02177855856209...|
|[0.02348221742957...|
|[0.02429198213569...|
+--------------------+
only showing top 20 rows



### Predicting The Test Data 

In [28]:
predictions = model.transform(unlable_data)
predictions.show()

+--------------------+------------------+
|        nor_features|        prediction|
+--------------------+------------------+
|[0.00247248840080...| 37.15339180369847|
|[0.00421197150971...| 37.12366622522677|
|[0.00460177287040...| 15.16310926182814|
|[0.00572396125307...|26.056492064653867|
|[0.00572856643709...|39.182447237039014|
|[0.00651200872540...| 14.76161146533741|
|[0.00740754933605...|13.194796443653104|
|[0.00872703132440...| 15.26208299924292|
|[0.00887594152608...| 51.78590859740507|
|[0.00891973684249...| 26.81525280337337|
|[0.00904544103033...| 52.88409327393674|
|[0.01047048526178...| 43.52262614967731|
|[0.01169142100716...|15.345543167303731|
|[0.01338621560432...|48.801329897240066|
|[0.01445060679790...| 51.10493097794824|
|[0.01556009467239...| 35.74360438300755|
|[0.01959283336627...|23.639897136475156|
|[0.02177855856209...|22.462885620521774|
|[0.02348221742957...|24.602652130572096|
|[0.02429198213569...| 35.17239173834059|
+--------------------+------------

### Evaluating Algorithm Performance

In [29]:
trainingSummary = model.summary
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("r2: %f" % trainingSummary.r2)

RMSE: 8.496913
r2: 0.644609
