In [1]:
#"F:\DATA\SPARK\practice spark\Real estate.csv"

In [2]:
import findspark
import pyspark
findspark.find()
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
import os
import sys
os.environ['SPARK_HOME']= r'C:\SPARK\spark-3.0.0-bin-hadoop2.7'

In [3]:
if __name__ == "__main__":
    spark = SparkSession\
        .builder\
        .appName("LinearReg_spark")\
        .getOrCreate()

In [4]:
dataset = spark.read.csv('Real estate.csv',header= True)

In [5]:
dataset.show()

+---+-------------------+------------+--------------------------------------+-------------------------------+-----------+------------+--------------------------+
| No|X1 transaction date|X2 house age|X3 distance to the nearest MRT station|X4 number of convenience stores|X5 latitude|X6 longitude|Y house price of unit area|
+---+-------------------+------------+--------------------------------------+-------------------------------+-----------+------------+--------------------------+
|  1|           2012.917|          32|                              84.87882|                             10|   24.98298|   121.54024|                      37.9|
|  2|           2012.917|        19.5|                              306.5947|                              9|   24.98034|   121.53951|                      42.2|
|  3|           2013.583|        13.3|                              561.9845|                              5|   24.98746|   121.54391|                      47.3|
|  4|           2013.500|   

In [8]:
colm = ['No','X1 transaction date']
df = dataset.select([column for column in dataset.columns if column not in colm])

In [10]:
df.printSchema()

root
 |-- X2 house age: string (nullable = true)
 |-- X3 distance to the nearest MRT station: string (nullable = true)
 |-- X4 number of convenience stores: string (nullable = true)
 |-- X5 latitude: string (nullable = true)
 |-- X6 longitude: string (nullable = true)
 |-- Y house price of unit area: string (nullable = true)



In [11]:
from pyspark.sql.functions import col
df = df.select(*(col(c).cast('float').alias(c) for c in df.columns))

In [12]:
df.printSchema()

root
 |-- X2 house age: float (nullable = true)
 |-- X3 distance to the nearest MRT station: float (nullable = true)
 |-- X4 number of convenience stores: float (nullable = true)
 |-- X5 latitude: float (nullable = true)
 |-- X6 longitude: float (nullable = true)
 |-- Y house price of unit area: float (nullable = true)



In [14]:
from pyspark.sql.functions import col, count, isnan, when
df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns]).show()

+------------+--------------------------------------+-------------------------------+-----------+------------+--------------------------+
|X2 house age|X3 distance to the nearest MRT station|X4 number of convenience stores|X5 latitude|X6 longitude|Y house price of unit area|
+------------+--------------------------------------+-------------------------------+-----------+------------+--------------------------+
|           0|                                     0|                              0|          0|           0|                         0|
+------------+--------------------------------------+-------------------------------+-----------+------------+--------------------------+



In [22]:
from functools import reduce

oldColumns = df.schema.names
newColumns = ['Age','Distance_2_MRT','Stores','Latitude','Longitude','Price']

df = reduce(lambda df, idx: df.withColumnRenamed(oldColumns[idx], newColumns[idx]),range(len(oldColumns)), df)


In [23]:
df.printSchema()

root
 |-- Age: float (nullable = true)
 |-- Distance_2_MRT: float (nullable = true)
 |-- Stores: float (nullable = true)
 |-- Latitude: float (nullable = true)
 |-- Longitude: float (nullable = true)
 |-- Price: float (nullable = true)



In [24]:
df.show()

+----+--------------+------+--------+---------+-----+
| Age|Distance_2_MRT|Stores|Latitude|Longitude|Price|
+----+--------------+------+--------+---------+-----+
|32.0|      84.87882|  10.0|24.98298|121.54024| 37.9|
|19.5|      306.5947|   9.0|24.98034|121.53951| 42.2|
|13.3|      561.9845|   5.0|24.98746|121.54391| 47.3|
|13.3|      561.9845|   5.0|24.98746|121.54391| 54.8|
| 5.0|      390.5684|   5.0|24.97937|121.54245| 43.1|
| 7.1|       2175.03|   3.0|24.96305|121.51254| 32.1|
|34.5|      623.4731|   7.0|24.97933|121.53642| 40.3|
|20.3|      287.6025|   6.0|24.98042|121.54228| 46.7|
|31.7|      5512.038|   1.0|24.95095|121.48458| 18.8|
|17.9|       1783.18|   3.0|24.96731|121.51486| 22.1|
|34.8|      405.2134|   1.0|24.97349|121.53372| 41.4|
| 6.3|      90.45606|   9.0|24.97433| 121.5431| 58.1|
|13.0|      492.2313|   5.0|24.96515|121.53737| 39.3|
|20.4|      2469.645|   4.0|24.96108|121.51046| 23.8|
|13.2|      1164.838|   4.0|24.99156|121.53406| 34.3|
|35.7|      579.2083|   2.0|

In [25]:
features = df.drop('Price')

In [26]:
from pyspark.ml.feature import VectorAssembler
#let's assemble our features together using vectorAssembler
assembler = VectorAssembler(
    inputCols=features.columns,
    outputCol="features")

In [27]:
output = assembler.transform(df).select('features','Price')


In [28]:
train,test = output.randomSplit([0.75, 0.25])

In [29]:
train.show()

+--------------------+-----+
|            features|Price|
+--------------------+-----+
|[0.0,185.42959594...| 37.9|
|[0.0,185.42959594...| 45.5|
|[0.0,185.42959594...| 52.2|
|[0.0,185.42959594...| 55.2|
|[0.0,208.39050292...| 44.0|
|[0.0,208.39050292...| 45.7|
|[0.0,274.01440429...| 43.5|
|[0.0,274.01440429...| 45.4|
|[0.0,274.01440429...| 52.2|
|[0.0,292.99780273...| 63.3|
|[0.0,292.99780273...| 69.7|
|[0.0,292.99780273...| 70.1|
|[0.0,292.99780273...| 73.6|
|[0.0,338.96789550...| 44.9|
|[0.0,338.96789550...| 50.8|
|[1.0,193.58450317...| 50.7|
|[1.10000002384185...| 45.1|
|[1.10000002384185...| 48.6|
|[1.10000002384185...| 49.0|
|[1.10000002384185...| 54.4|
+--------------------+-----+
only showing top 20 rows



In [30]:
test.show()

+--------------------+-----+
|            features|Price|
+--------------------+-----+
|[0.0,185.42959594...| 55.3|
|[0.0,292.99780273...| 71.0|
|[2.09999990463256...| 45.5|
|[3.70000004768371...| 41.6|
|[4.09999990463256...| 31.3|
|[5.09999990463256...| 28.9|
|[5.09999990463256...| 35.6|
|[6.19999980926513...| 31.3|
|[6.30000019073486...| 58.1|
|[6.40000009536743...| 59.5|
|[6.59999990463256...| 58.1|
|[6.80000019073486...| 54.4|
|[7.59999990463256...| 27.7|
|[8.0,132.54690551...| 47.3|
|[8.0,2216.6120605...| 23.9|
|[8.10000038146972...| 51.6|
|[8.5,104.81009674...| 56.8|
|[9.0,1402.0159912...| 38.5|
|[10.0,942.4663696...| 43.5|
|[10.1000003814697...| 47.9|
+--------------------+-----+
only showing top 20 rows



In [31]:
from pyspark.ml.regression import LinearRegression
lin_reg = LinearRegression(featuresCol = 'features', labelCol='Price')
linear_model = lin_reg.fit(train)

In [32]:
print("Coefficients: " + str(linear_model.coefficients))
print("\nIntercept: " + str(linear_model.intercept))

Coefficients: [-0.2845380180805475,-0.004727311005402087,1.187968326885585,201.55230488460887,-43.50846789357342]

Intercept: 298.6774040798928


In [33]:
trainSummary = linear_model.summary
print("RMSE: %f" % trainSummary.rootMeanSquaredError)
print("\nr2: %f" % trainSummary.r2)

RMSE: 9.110080

r2: 0.554706


In [66]:
from  pyspark.sql.functions import abs
predictions = linear_model.transform(test)
x =((predictions['Price']-predictions['prediction'])/predictions['Price'])*100
predictions = predictions.withColumn('Accuracy',abs(x))
predictions.select("prediction","Price","Accuracy","features").show()

+------------------+-----+-------------------+--------------------+
|        prediction|Price|           Accuracy|            features|
+------------------+-----+-------------------+--------------------+
| 43.12547834643925| 55.3|  22.01540878586896|[0.0,185.42959594...|
| 50.46230673101314| 71.0| 28.926328547868813|[0.0,292.99780273...|
| 47.46100355578261| 45.5|  4.309897924796947|[2.09999990463256...|
|  46.8530780492884| 41.6|  12.62759559579116|[3.70000004768371...|
|35.434011977095054| 31.3| 13.207708756553762|[4.09999990463256...|
|39.619817126403234| 28.9|  37.09279463450084|[5.09999990463256...|
| 39.33273298109731| 35.6| 10.485209738673646|[5.09999990463256...|
|29.351057585089677| 31.3|  6.226650797049346|[6.19999980926513...|
| 52.62887519936618| 58.1|  9.416735659970566|[6.30000019073486...|
| 52.60042142469416| 59.5| 11.595930378665274|[6.40000009536743...|
| 52.54351387534922| 58.1|  9.563657047679346|[6.59999990463256...|
| 54.38035537626644| 54.4|0.03611424459818044|[6

In [67]:
from pyspark.ml.evaluation import RegressionEvaluator
pred_evaluator = RegressionEvaluator(predictionCol="prediction", \
                 labelCol="Price",metricName="r2")
print("R Squared (R2) on test data = %g" % pred_evaluator.evaluate(predictions))

R Squared (R2) on test data = 0.610204


In [69]:
def adj_r2(x):
    r2 = trainSummary.r2
    n = df.count()
    p = len(df.columns)
    adjusted_r2 = 1-(1-r2)*(n-1)/(n-p-1)
    return adjusted_r2
adj_r2(train)

0.548141210028837

In [70]:
adj_r2(test)

0.548141210028837

In [80]:
lin_reg = LinearRegression(featuresCol = 'features', labelCol='Price',maxIter=50, regParam=0.12, elasticNetParam=0.2)
linear_model = lin_reg.fit(train)

In [79]:
linear_model.summary.rootMeanSquaredError

9.110906766525105

In [107]:
features_rdd = features.rdd.map(lambda row: row[0:])

In [108]:
features_rdd.collect()

[(32.0, 84.87882232666016, 10.0, 24.982980728149414, 121.54023742675781),
 (19.5, 306.5946960449219, 9.0, 24.98033905029297, 121.53951263427734),
 (13.300000190734863,
  561.9844970703125,
  5.0,
  24.987459182739258,
  121.54390716552734),
 (13.300000190734863,
  561.9844970703125,
  5.0,
  24.987459182739258,
  121.54390716552734),
 (5.0, 390.5683898925781, 5.0, 24.9793701171875, 121.54244995117188),
 (7.099999904632568,
  2175.030029296875,
  3.0,
  24.963050842285156,
  121.51254272460938),
 (34.5, 623.4730834960938, 7.0, 24.97933006286621, 121.53642272949219),
 (20.299999237060547,
  287.6025085449219,
  6.0,
  24.980419158935547,
  121.54228210449219),
 (31.700000762939453,
  5512.0380859375,
  1.0,
  24.950950622558594,
  121.48458099365234),
 (17.899999618530273,
  1783.1800537109375,
  3.0,
  24.967309951782227,
  121.51486206054688),
 (34.79999923706055,
  405.2134094238281,
  1.0,
  24.97348976135254,
  121.53372192382812),
 (6.300000190734863,
  90.45606231689453,
  9.0,
  

In [109]:
from pyspark.mllib.feature import StandardScaler
scaler1 = StandardScaler().fit(features_rdd)
scaled_features=scaler1.transform(features_rdd)

In [110]:
for data in scaled_features.collect():
    print(data)

[2.808869299771829,0.06725154690948568,3.3949381000162235,2013.0949590447945,7919.393034999268]
[1.7116547295484583,0.24292240417652702,3.0554442900146013,2012.8820961989197,7919.345808528075]
[1.1674363194598323,0.4452739297168213,1.6974690500081118,2013.4558268874762,7919.63215007973]
[1.1674363194598323,0.4452739297168213,1.6974690500081118,2013.4558268874762,7919.63215007973]
[0.4388858280893483,0.3094567958675935,1.6974690500081118,2012.8040208735013,7919.537200016594]
[0.6232178675157918,1.723328976948524,1.0184814300048672,2011.4890356682267,7917.58848667894]
[3.0283122138165033,0.49399282615128015,2.3764566700113567,2012.8007933502065,7919.144474624567]
[1.7818763950740915,0.227874433981389,2.0369628600097345,2012.8885512455092,7919.526263360108]
[2.782536217055131,4.367321291012587,0.3394938100016224,2010.5140162500102,7915.7665391323835]
[1.5712112310755355,1.4128567496928301,1.0184814300048672,2011.8322289785795,7917.739611386758]
[3.0546452965332014,0.3210603995816714,0.339

In [111]:
df =scaled_features.map(lambda x: (x, )).toDF(["Scaled_data"])

In [113]:
df.show()

+--------------------+
|         Scaled_data|
+--------------------+
|[2.80886929977182...|
|[1.71165472954845...|
|[1.16743631945983...|
|[1.16743631945983...|
|[0.43888582808934...|
|[0.62321786751579...|
|[3.02831221381650...|
|[1.78187639507409...|
|[2.78253621705513...|
|[1.57121123107553...|
|[3.05464529653320...|
|[0.55299616013474...|
|[1.14110315303230...|
|[1.79065414512020...|
|[1.15865856941371...|
|[3.13364487952660...|
|[0.0,0.2321492562...|
|[1.55365589840495...|
|[1.48343406545766...|
|[0.13166574842680...|
+--------------------+
only showing top 20 rows

