In [1]:
import findspark
findspark.init('/home/tarik/spark')

In [2]:
from pyspark.sql import SparkSession

In [3]:
spark = SparkSession.builder.appName('lrex').getOrCreate()

In [4]:
from pyspark.ml.regression import LinearRegression

In [5]:
import pandas as pd

In [6]:
df = spark.read.csv(path="hdfs://localhost:9000/Melbourne_housing_FULL.csv",inferSchema=True,header=True)

In [7]:
df.show()

+----------+-------------------+-----+----+-------+------+-------+---------+--------+--------+--------+--------+----+--------+------------+---------+------------------+---------+----------+--------------------+-------------+
|    Suburb|            Address|Rooms|Type|  Price|Method|SellerG|     Date|Distance|Postcode|Bedroom2|Bathroom| Car|Landsize|BuildingArea|YearBuilt|       CouncilArea|Lattitude|Longtitude|          Regionname|Propertycount|
+----------+-------------------+-----+----+-------+------+-------+---------+--------+--------+--------+--------+----+--------+------------+---------+------------------+---------+----------+--------------------+-------------+
|Abbotsford|      68 Studley St|    2|   h|   null|    SS| Jellis|3/09/2016|     2.5|    3067|       2|       1|   1|     126|        null|     null|Yarra City Council| -37.8014|  144.9958|Northern Metropol...|         4019|
|Abbotsford|       85 Turner St|    2|   h|1480000|     S| Biggin|3/12/2016|     2.5|    3067|      

In [8]:
from pyspark.sql.functions import isnan, when, count, col
df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df.columns]).show()

+------+-------+-----+----+-----+------+-------+----+--------+--------+--------+--------+----+--------+------------+---------+-----------+---------+----------+----------+-------------+
|Suburb|Address|Rooms|Type|Price|Method|SellerG|Date|Distance|Postcode|Bedroom2|Bathroom| Car|Landsize|BuildingArea|YearBuilt|CouncilArea|Lattitude|Longtitude|Regionname|Propertycount|
+------+-------+-----+----+-----+------+-------+----+--------+--------+--------+--------+----+--------+------------+---------+-----------+---------+----------+----------+-------------+
|     0|      0|    0|   0| 7610|     0|      0|   0|       0|       0|    8217|    8226|8728|   11810|       21115|    19306|          0|     7976|      7976|         0|            0|
+------+-------+-----+----+-----+------+-------+----+--------+--------+--------+--------+----+--------+------------+---------+-----------+---------+----------+----------+-------------+



In [9]:
df.describe().show()

+-------+----------+----------------+------------------+-----+-----------------+------+-----------+---------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+--------------------+-------------------+-------------------+----------------+------------------+
|summary|    Suburb|         Address|             Rooms| Type|            Price|Method|    SellerG|     Date|          Distance|          Postcode|          Bedroom2|          Bathroom|               Car|          Landsize|      BuildingArea|         YearBuilt|         CouncilArea|          Lattitude|         Longtitude|      Regionname|     Propertycount|
+-------+----------+----------------+------------------+-----+-----------------+------+-----------+---------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+--------------------+

In [10]:
#count the number of original data rows
n1 = df.count()
print("number of original data rows: ", n1)
#count the number of data rows after deleting duplicated data
n2 = df.dropDuplicates().count()
print("number of data rows after deleting duplicated data: ", n2)
n3 = n1 - n2
print("number of duplicated data: ", n3)

number of original data rows:  34857
number of data rows after deleting duplicated data:  34856
number of duplicated data:  1


In [11]:
dfNoMissingValue = df.dropDuplicates().dropna(
    how="any")# use how="all" for all column missing data
numberOfMissingValueAny = n1 - dfNoMissingValue.count()
print("number of missing value rows: ", numberOfMissingValueAny)

number of missing value rows:  25970


In [12]:
df.printSchema()

root
 |-- Suburb: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- Rooms: integer (nullable = true)
 |-- Type: string (nullable = true)
 |-- Price: integer (nullable = true)
 |-- Method: string (nullable = true)
 |-- SellerG: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- Distance: string (nullable = true)
 |-- Postcode: string (nullable = true)
 |-- Bedroom2: integer (nullable = true)
 |-- Bathroom: integer (nullable = true)
 |-- Car: integer (nullable = true)
 |-- Landsize: integer (nullable = true)
 |-- BuildingArea: double (nullable = true)
 |-- YearBuilt: integer (nullable = true)
 |-- CouncilArea: string (nullable = true)
 |-- Lattitude: double (nullable = true)
 |-- Longtitude: double (nullable = true)
 |-- Regionname: string (nullable = true)
 |-- Propertycount: string (nullable = true)



In [13]:
from pyspark.sql import functions as f
df = df.withColumn('year', f.substring('Date', -4,4))

In [14]:
from pyspark.sql.types import *

df = df.withColumn("Rooms", df["Rooms"].cast(FloatType())) \
   .withColumn("Bedroom2", df["Bedroom2"].cast(FloatType())) \
   .withColumn("Bathroom",df["Bathroom"].cast(FloatType())) \
   .withColumn("Car", df["Car"].cast(FloatType())) \
   .withColumn("Landsize", df["Landsize"].cast(FloatType())) \
   .withColumn("BuildingArea", df["BuildingArea"].cast(FloatType())) \
   .withColumn("YearBuilt", df["YearBuilt"].cast(FloatType())) \
   .withColumn("Lattitude", df["Lattitude"].cast(FloatType())) \
   .withColumn("Longtitude", df["Longtitude"].cast(FloatType()))

In [15]:
df.printSchema()

root
 |-- Suburb: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- Rooms: float (nullable = true)
 |-- Type: string (nullable = true)
 |-- Price: integer (nullable = true)
 |-- Method: string (nullable = true)
 |-- SellerG: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- Distance: string (nullable = true)
 |-- Postcode: string (nullable = true)
 |-- Bedroom2: float (nullable = true)
 |-- Bathroom: float (nullable = true)
 |-- Car: float (nullable = true)
 |-- Landsize: float (nullable = true)
 |-- BuildingArea: float (nullable = true)
 |-- YearBuilt: float (nullable = true)
 |-- CouncilArea: string (nullable = true)
 |-- Lattitude: float (nullable = true)
 |-- Longtitude: float (nullable = true)
 |-- Regionname: string (nullable = true)
 |-- Propertycount: string (nullable = true)
 |-- year: string (nullable = true)



In [16]:
meanPrice = df.groupBy().avg("Price").take(1)[0][0]
print("mean Price: ", meanPrice)
meanRooms = df.groupBy().avg("Rooms").take(1)[0][0]
print("mean Rooms: ", meanRooms)
MeanBedroom2 = df.groupBy().avg("Bedroom2").take(1)[0][0]
print("mean Bedroom2: ", MeanBedroom2)
MeanBathroom = df.groupBy().avg("Bathroom").take(1)[0][0]
print("mean Bathroom: ", MeanBathroom)
MeanCar = df.groupBy().avg("Car").take(1)[0][0]
print("mean Car: ", MeanCar)
MeanLandsize = df.groupBy().avg("Landsize").take(1)[0][0]
print("mean Landsize: ", MeanLandsize)
MeanBuildingArea = df.groupBy().avg("BuildingArea").take(1)[0][0]
print("mean BuildingArea: ", MeanBuildingArea)
MeanYearBuilt = df.groupBy().avg("YearBuilt").take(1)[0][0]
print("mean YearBuilt: ", MeanYearBuilt)

mean Price:  1050173.344955408
mean Rooms:  3.0310124221820582
mean Bedroom2:  3.0846471471471473
mean Bathroom:  1.624798167549097
mean Car:  1.7288453442535114
mean Landsize:  593.598993361392
mean BuildingArea:  160.2564003607925
mean YearBuilt:  1965.289884894862


In [17]:
df=df.fillna(
    {
        'Car': MeanCar,
        'Rooms': meanRooms, 'Landsize': MeanLandsize,
        'Bedroom2': MeanBedroom2, 'BuildingArea': MeanBuildingArea,
        'Bathroom': MeanBathroom, 'YearBuilt': MeanYearBuilt
    })
#just for experiment
df.groupBy().avg("Price").show()

+-----------------+
|       avg(Price)|
+-----------------+
|1050173.344955408|
+-----------------+



In [18]:
df.select('Price').describe().show()

+-------+-----------------+
|summary|            Price|
+-------+-----------------+
|  count|            27247|
|   mean|1050173.344955408|
| stddev|641467.1301045999|
|    min|            85000|
|    max|         11200000|
+-------+-----------------+



In [19]:
df.filter(df['Price'] < 300000).select('Price').count()

208

In [20]:
df = df.filter(df['Price'] > 300000)

In [21]:
df.filter(df['Price'] > 4000000).select('Price').count()

136

In [22]:
df = df.filter(df['Price'] < 4000000)

In [23]:
df.select('Rooms').describe().show()

+-------+------------------+
|summary|             Rooms|
+-------+------------------+
|  count|             26861|
|   mean| 2.999776627824727|
| stddev|0.9331329517945214|
|    min|               1.0|
|    max|              12.0|
+-------+------------------+



In [24]:
df.filter(df['Rooms'] > 8).select('Rooms').show()

+-----+
|Rooms|
+-----+
| 10.0|
|  9.0|
| 10.0|
| 10.0|
| 10.0|
| 10.0|
| 12.0|
+-----+



In [25]:
df = df.filter(df['Rooms'] < 8)

In [26]:
from pyspark.ml.feature import StringIndexer

Suburb_indexer=StringIndexer(inputCol="Suburb",outputCol="SuburbIndex")
df=Suburb_indexer.fit(df).transform(df)

Postcode_indexer=StringIndexer(inputCol="Postcode",outputCol="PostcodeIndex")
df=Postcode_indexer.fit(df).transform(df)

Regionname_indexer=StringIndexer(inputCol="Regionname",outputCol="RegionnameIndex")
df=Regionname_indexer.fit(df).transform(df)

Distance_indexer=StringIndexer(inputCol="Distance",outputCol="DistanceIndex")
df=Distance_indexer.fit(df).transform(df)

CouncilArea_indexer=StringIndexer(inputCol="CouncilArea",outputCol="CouncilAreaIndex")
df=CouncilArea_indexer.fit(df).transform(df)

Propertycount_indexer=StringIndexer(inputCol="Propertycount",outputCol="PropertycountIndex")
df=Propertycount_indexer.fit(df).transform(df)

Year_indexer=StringIndexer(inputCol="year",outputCol="YearIndex")
df=Year_indexer.fit(df).transform(df)

Type_indexer=StringIndexer(inputCol="Type",outputCol="TypeIndex")
df=Type_indexer.fit(df).transform(df)

In [27]:
df.printSchema()

root
 |-- Suburb: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- Rooms: float (nullable = false)
 |-- Type: string (nullable = true)
 |-- Price: integer (nullable = true)
 |-- Method: string (nullable = true)
 |-- SellerG: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- Distance: string (nullable = true)
 |-- Postcode: string (nullable = true)
 |-- Bedroom2: float (nullable = false)
 |-- Bathroom: float (nullable = false)
 |-- Car: float (nullable = false)
 |-- Landsize: float (nullable = false)
 |-- BuildingArea: float (nullable = false)
 |-- YearBuilt: float (nullable = false)
 |-- CouncilArea: string (nullable = true)
 |-- Lattitude: float (nullable = true)
 |-- Longtitude: float (nullable = true)
 |-- Regionname: string (nullable = true)
 |-- Propertycount: string (nullable = true)
 |-- year: string (nullable = true)
 |-- SuburbIndex: double (nullable = false)
 |-- PostcodeIndex: double (nullable = false)
 |-- RegionnameIndex: double (nullabl

In [28]:
df.select('Landsize').describe().show()

+-------+-----------------+
|summary|         Landsize|
+-------+-----------------+
|  count|            26840|
|   mean|593.8671487367633|
| stddev| 3074.28424873174|
|    min|              0.0|
|    max|         433014.0|
+-------+-----------------+



In [29]:
df.select('Price').count()
df = df.na.drop(subset = "Price")

In [30]:
from pyspark.ml.feature import VectorAssembler
vectorAssembler = VectorAssembler(inputCols = ['Lattitude', 'Longtitude', 'SuburbIndex', 'RegionnameIndex', 'CouncilAreaIndex', 'TypeIndex', 'DistanceIndex', 'PostcodeIndex', 'Rooms', 'Bedroom2', 'Bathroom', 'Car', 'Landsize', 'BuildingArea', 'YearBuilt'], outputCol = 'features',handleInvalid='skip')
vhouse_df = vectorAssembler.transform(df)
vhouse_df = vhouse_df.select(['features', 'Price'])
vhouse_df.show(3)

+--------------------+-------+
|            features|  Price|
+--------------------+-------+
|[-37.799598693847...|1480000|
|[-37.807899475097...|1035000|
|[-37.809299468994...|1465000|
+--------------------+-------+
only showing top 3 rows



In [31]:
vhouse_df = vhouse_df.withColumn("Price", vhouse_df["Price"].cast(FloatType()))
vhouse_df.show(3)

+--------------------+---------+
|            features|    Price|
+--------------------+---------+
|[-37.799598693847...|1480000.0|
|[-37.807899475097...|1035000.0|
|[-37.809299468994...|1465000.0|
+--------------------+---------+
only showing top 3 rows



In [32]:
from pyspark.ml.feature import StandardScaler
# Initialize the `standardScaler`
standardScaler = StandardScaler(inputCol="features", outputCol="features_scaled")

# Fit the DataFrame to the scaler
scaler = standardScaler.fit(vhouse_df)

# Transform the data in `df` with the scaler
scaled_df = scaler.transform(vhouse_df)

# Inspect the result
scaled_df.show(2)

+--------------------+---------+--------------------+
|            features|    Price|     features_scaled|
+--------------------+---------+--------------------+
|[-37.799598693847...|1480000.0|[-411.06892262741...|
|[-37.807899475097...|1035000.0|[-411.15919324729...|
+--------------------+---------+--------------------+
only showing top 2 rows



In [33]:
scaled_df = scaled_df.dropna()

In [34]:
scaled_df = scaled_df.withColumn("features", scaled_df["features_scaled"])
scaled_df= scaled_df.drop('features_scaled')
scaled_df.show(3)

+--------------------+---------+
|            features|    Price|
+--------------------+---------+
|[-411.06892262741...|1480000.0|
|[-411.15919324729...|1035000.0|
|[-411.17441811746...|1465000.0|
+--------------------+---------+
only showing top 3 rows



In [35]:
splits = scaled_df.randomSplit([0.7, 0.3])
train_df = splits[0]
test_df = splits[1]

In [36]:
from pyspark.ml.regression import LinearRegression
lr = LinearRegression(featuresCol = 'features', labelCol='Price', maxIter=10, regParam=0.3, elasticNetParam=0.8)
lr_model = lr.fit(train_df)
print("Coefficients: " + str(lr_model.coefficients))
print("Intercept: " + str(lr_model.intercept))

Coefficients: [-127990.19041411101,68270.13797250677,-55792.89349964961,-119917.04296540517,-96738.83845678493,-97535.73970382372,1639.661817415845,24806.639553540586,97089.67032013868,53781.27030730667,149082.89967226054,13362.967854799948,39964.024606101346,144538.6005687227,-112519.62632407655]
Intercept: -126066559.13476793


In [37]:
#predict testing data using our model
prediction = lr_model.transform(test_df)
#show some prediction results
prediction.show(3)

+--------------------+--------+------------------+
|            features|   Price|        prediction|
+--------------------+--------+------------------+
|[-415.23348466268...|860000.0| 1350710.387042299|
|[-415.14441709794...|670000.0|1489734.3064708859|
|[-415.14412670532...|563000.0|1018820.5972778797|
+--------------------+--------+------------------+
only showing top 3 rows



In [38]:
trainingSummary = lr_model.summary
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("r2: %f" % trainingSummary.r2)

RMSE: 405800.242794
r2: 0.509443


In [39]:
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.evaluation import RegressionEvaluator
dt = DecisionTreeRegressor(featuresCol ='features', labelCol = 'Price')
dt_model = dt.fit(train_df)
dt_predictions = dt_model.transform(test_df)
dt_evaluator = RegressionEvaluator(labelCol="Price", predictionCol="prediction", metricName="rmse")
dt_evaluator_r2 = RegressionEvaluator(labelCol="Price", predictionCol="prediction", metricName="r2")
rmse = dt_evaluator.evaluate(dt_predictions)
r2 = dt_evaluator_r2.evaluate(dt_predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)
print("R2 (R2) on test data = %g" % r2)

Root Mean Squared Error (RMSE) on test data = 357273
R2 (R2) on test data = 0.622282


In [40]:
from pyspark.ml.regression import GBTRegressor
gbt = GBTRegressor(featuresCol = 'features', labelCol = 'Price', maxIter=10)
gbt_model = gbt.fit(train_df)
gbt_predictions = gbt_model.transform(test_df)
gbt_predictions.select('prediction', 'Price', 'features').show(5)

+-----------------+--------+--------------------+
|       prediction|   Price|            features|
+-----------------+--------+--------------------+
|961398.1461729957|860000.0|[-415.23348466268...|
|799920.4110186616|670000.0|[-415.14441709794...|
| 633331.938134118|563000.0|[-415.14412670532...|
|822124.5958361379|737500.0|[-415.12856995787...|
|807014.8820903505|650000.0|[-415.07546959323...|
+-----------------+--------+--------------------+
only showing top 5 rows



In [41]:
gbt_evaluator = RegressionEvaluator(labelCol="Price", predictionCol="prediction", metricName="rmse")
gbt_evaluator_r2 = RegressionEvaluator(labelCol="Price", predictionCol="prediction", metricName="r2")
rmse = gbt_evaluator.evaluate(gbt_predictions)
r2 = gbt_evaluator_r2.evaluate(gbt_predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)
print("R2 Error (R2) on test data = %g" % r2)

Root Mean Squared Error (RMSE) on test data = 305639
R2 Error (R2) on test data = 0.72357


In [42]:
from pyspark.ml import Pipeline
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import RegressionEvaluator


# Set maxCategories so features with > 4 distinct values are treated as continuous.
featureIndexer =VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(scaled_df)

# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = scaled_df.randomSplit([0.7, 0.3])

# Train a RandomForest model.
rf = RandomForestRegressor(featuresCol='indexedFeatures', labelCol='Price')

# Chain indexer and forest in a Pipeline
pipeline = Pipeline(stages=[featureIndexer, rf])

# Train model.  This also runs the indexer.
model = pipeline.fit(trainingData)

# Make predictions.
predictions = model.transform(testData)

# Select example rows to display.
predictions.select("prediction", "Price", "features").show(5)

# Select (prediction, true label) and compute test error
evaluator = RegressionEvaluator(labelCol="Price", predictionCol="prediction", metricName="rmse")
evaluator_r2 = RegressionEvaluator(labelCol="Price", predictionCol="prediction", metricName="r2")
rmse = evaluator.evaluate(predictions)
r2 = evaluator_r2.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)
print("R2 Error (R2) on test data = %g" % r2)

rfModel = model.stages[1]
print(rfModel)  # summary only

+------------------+---------+--------------------+
|        prediction|    Price|            features|
+------------------+---------+--------------------+
|1238358.0367740605| 975000.0|[-415.18718778227...|
|1181854.5576047963|1350000.0|[-415.15010049634...|
|1181854.5576047963| 737500.0|[-415.12856995787...|
|1181854.5576047963| 890000.0|[-415.11550229001...|
|1181854.5576047963| 750000.0|[-415.10737129667...|
+------------------+---------+--------------------+
only showing top 5 rows

Root Mean Squared Error (RMSE) on test data = 351103
R2 Error (R2) on test data = 0.647568
RandomForestRegressionModel: uid=RandomForestRegressor_5fa816f7a655, numTrees=20, numFeatures=15
