In [1]:
import os
# Find the latest version of spark 3.x  from http://www.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.3.1'
spark_version = 'spark-3.3.2'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop3.tgz
!tar xf $SPARK_VERSION-bin-hadoop3.tgz
!pip install -q findspark

# Set Environment Variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop3"

# Start a SparkSession
import findspark
findspark.init()

0% [Working]            Hit:1 http://archive.ubuntu.com/ubuntu focal InRelease
0% [Waiting for headers] [Connecting to security.ubuntu.com (185.125.190.36)] [                                                                               Get:2 http://archive.ubuntu.com/ubuntu focal-updates InRelease [114 kB]
                                                                               Get:3 https://cloud.r-project.org/bin/linux/ubuntu focal-cran40/ InRelease [3,622 B]
                                                                               Hit:4 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2004/x86_64  InRelease
0% [2 InRelease 21.4 kB/114 kB 19%] [Waiting for headers] [Waiting for headers]                                                                               Get:5 http://security.ubuntu.com/ubuntu focal-security InRelease [114 kB]
0% [2 InRelease 59.1 kB/114 kB 52%] [5 InRelease 14.2 kB/114 kB 12%] [Waiting f                              

In [101]:
# Enable file access to Google Drive by mounting it at /content/drive
from google.colab import drive
drive.mount("/content/drive")

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [2]:
# Import packages
from pyspark.sql import SparkSession
import time

# Configure the application name first, then obtain the session.
# getOrCreate() returns the already-running session when one exists.
session_builder = SparkSession.builder.appName("SparkSQL")
spark = session_builder.getOrCreate()

In [6]:
# Fetch the Perth housing CSV from GitHub through Spark's file distribution
# and load it into a DataFrame.
from pyspark import SparkFiles
url = "https://raw.githubusercontent.com/wingylui/housing_price_anaylsis/main/Dataset/perth_housing_price_2021.csv"
spark.sparkContext.addFile(url)

# Path of the copy that addFile() made available under the file's own name.
local_csv_path = SparkFiles.get("perth_housing_price_2021.csv")
# The first row holds the column names, column types are inferred, and the
# double quote serves as both the quote and the escape character.
csv_read_options = {"header": True, "inferSchema": True, "quote": "\"", "escape": "\""}
df = spark.read.csv(local_csv_path, **csv_read_options)

df.show()

+--------------------+------------+------+--------+---------+------+---------+----------+----------+--------+--------------------+----------------+----------+---------+--------+------------+-----------+--------------------+----------------+----------------+
|             ADDRESS|      SUBURB| PRICE|BEDROOMS|BATHROOMS|GARAGE|LAND_AREA|FLOOR_AREA|BUILD_YEAR|CBD_DIST|         NEAREST_STN|NEAREST_STN_DIST|MONTH_SOLD|YEAR_SOLD|POSTCODE|    LATITUDE|  LONGITUDE|         NEAREST_SCH|NEAREST_SCH_DIST|NEAREST_SCH_RANK|
+--------------------+------------+------+--------+---------+------+---------+----------+----------+--------+--------------------+----------------+----------+---------+--------+------------+-----------+--------------------+----------------+----------------+
|       1 Acorn Place|  South Lake|565000|       4|        2|     2|      600|       160|      2003|   18300|Cockburn Central ...|            1800|         9|     2018|    6164|    -32.1159|  115.84245|Lakeland Senior H...|   

In [8]:
# Print every column's name, data type and nullability to check that each column was read with the expected type
df.printSchema()

root
 |-- ADDRESS: string (nullable = true)
 |-- SUBURB: string (nullable = true)
 |-- PRICE: integer (nullable = true)
 |-- BEDROOMS: integer (nullable = true)
 |-- BATHROOMS: integer (nullable = true)
 |-- GARAGE: integer (nullable = true)
 |-- LAND_AREA: integer (nullable = true)
 |-- FLOOR_AREA: integer (nullable = true)
 |-- BUILD_YEAR: integer (nullable = true)
 |-- CBD_DIST: integer (nullable = true)
 |-- NEAREST_STN: string (nullable = true)
 |-- NEAREST_STN_DIST: integer (nullable = true)
 |-- MONTH_SOLD: integer (nullable = true)
 |-- YEAR_SOLD: integer (nullable = true)
 |-- POSTCODE: integer (nullable = true)
 |-- LATITUDE: double (nullable = true)
 |-- LONGITUDE: double (nullable = true)
 |-- NEAREST_SCH: string (nullable = true)
 |-- NEAREST_SCH_DIST: double (nullable = true)
 |-- NEAREST_SCH_RANK: integer (nullable = true)



In [42]:
# Register the DataFrame as the temporary view "perthHousingPrice" so it can be queried by name through spark.sql
df.createOrReplaceTempView("perthHousingPrice")

In [76]:
# Sanity check: display 3 rows read through the temporary view to confirm it exposes the expected DataFrame
spark.sql("SELECT * FROM perthHousingPrice LIMIT 3").show()

+---------------+----------+------+--------+---------+------+---------+----------+----------+--------+--------------------+----------------+----------+---------+--------+------------+-----------+--------------------+----------------+----------------+
|        ADDRESS|    SUBURB| PRICE|BEDROOMS|BATHROOMS|GARAGE|LAND_AREA|FLOOR_AREA|BUILD_YEAR|CBD_DIST|         NEAREST_STN|NEAREST_STN_DIST|MONTH_SOLD|YEAR_SOLD|POSTCODE|    LATITUDE|  LONGITUDE|         NEAREST_SCH|NEAREST_SCH_DIST|NEAREST_SCH_RANK|
+---------------+----------+------+--------+---------+------+---------+----------+----------+--------+--------------------+----------------+----------+---------+--------+------------+-----------+--------------------+----------------+----------------+
|  1 Acorn Place|South Lake|565000|       4|        2|     2|      600|       160|      2003|   18300|Cockburn Central ...|            1800|         9|     2018|    6164|    -32.1159|  115.84245|Lakeland Senior H...|    0.8283385518|            nu

## Collect data for Machine Learning (Kmeans)
---

In [52]:
# Percentage of rows (with a YEAR_SOLD value) that fall in each sale year,
# rounded to 2 decimals and listed with the largest share first.
year_share_sql = "SELECT YEAR_SOLD, ROUND(COUNT(YEAR_SOLD)/(SELECT COUNT(YEAR_SOLD) FROM perthHousingPrice)*100, 2) AS percentOfHouseSold\
          FROM perthHousingPrice\
          GROUP BY YEAR_SOLD\
          ORDER BY percentOfHouseSold DESC"
year_share_df = spark.sql(year_share_sql)
# show() prints at most 20 rows by default.
year_share_df.show()

+---------+------------------+
|YEAR_SOLD|percentOfHouseSold|
+---------+------------------+
|     2018|             16.69|
|     2019|             15.71|
|     2020|             15.63|
|     2017|             14.39|
|     2016|             11.59|
|     2015|              9.06|
|     2014|              5.79|
|     2013|               3.5|
|     2012|              1.83|
|     2011|              1.08|
|     2010|              0.81|
|     2009|              0.72|
|     2007|              0.42|
|     2008|              0.42|
|     2006|               0.4|
|     2005|              0.34|
|     2003|              0.26|
|     2002|              0.25|
|     2004|              0.22|
|     2001|              0.15|
+---------+------------------+
only showing top 20 rows



In [79]:
# Columns needed for the K-means step, limited to sales from 2013 onwards.
ml_columns_sql = "SELECT SUBURB, PRICE, LAND_AREA, FLOOR_AREA, BUILD_YEAR, YEAR_SOLD, CBD_DIST, NEAREST_STN_DIST, NEAREST_SCH_DIST\
                  FROM perthHousingPrice\
                  WHERE YEAR_SOLD >= 2013"
ML_df = spark.sql(ml_columns_sql)
# Preview the first 10 rows of the selection.
ML_df.show(10)

+------------+------+---------+----------+----------+---------+--------+----------------+----------------+
|      SUBURB| PRICE|LAND_AREA|FLOOR_AREA|BUILD_YEAR|YEAR_SOLD|CBD_DIST|NEAREST_STN_DIST|NEAREST_SCH_DIST|
+------------+------+---------+----------+----------+---------+--------+----------------+----------------+
|  South Lake|565000|      600|       160|      2003|     2018|   18300|            1800|    0.8283385518|
|       Wandi|365000|      351|       139|      2013|     2019|   26900|            4900|     5.524324354|
|     Camillo|287000|      719|        86|      1979|     2015|   22600|            1900|     1.649178183|
|    Bellevue|255000|      651|        59|      1953|     2018|   17900|            3600|     1.571400937|
|   Lockridge|325000|      466|       131|      1998|     2016|   11200|            2000|     1.514921629|
|Mount Richon|409000|      759|       118|      1991|     2013|   27300|            1000|     1.227219194|
|     Hilbert|400000|      386|      

In [80]:
# PROPERTY_AGE = YEAR_SOLD - BUILD_YEAR (age of the property when it was sold).
# Both source columns are dropped afterwards, so this cell cannot be re-run on
# its own: the query cell that defines ML_df has to be executed again first.
age_at_sale = ML_df["YEAR_SOLD"] - ML_df["BUILD_YEAR"]
ML_df = ML_df.withColumn("PROPERTY_AGE", age_at_sale)
ML_df = ML_df.drop("BUILD_YEAR", "YEAR_SOLD")

ML_df.show(10)

+------------+------+---------+----------+--------+----------------+----------------+------------+
|      SUBURB| PRICE|LAND_AREA|FLOOR_AREA|CBD_DIST|NEAREST_STN_DIST|NEAREST_SCH_DIST|PROPERTY_AGE|
+------------+------+---------+----------+--------+----------------+----------------+------------+
|  South Lake|565000|      600|       160|   18300|            1800|    0.8283385518|          15|
|       Wandi|365000|      351|       139|   26900|            4900|     5.524324354|           6|
|     Camillo|287000|      719|        86|   22600|            1900|     1.649178183|          36|
|    Bellevue|255000|      651|        59|   17900|            3600|     1.571400937|          65|
|   Lockridge|325000|      466|       131|   11200|            2000|     1.514921629|          18|
|Mount Richon|409000|      759|       118|   27300|            1000|     1.227219194|          22|
|     Hilbert|400000|      386|       132|   28200|            3700|     2.485730857|           2|
|     Waik

In [103]:
# Save the machine-learning dataset to Google Drive as CSV with a header row.
# Spark creates a directory at this path that holds the CSV part file(s),
# not a single file named Perth_housing_ML.csv.
# mode("overwrite") replaces the output of an earlier run; with the default
# save mode the write raises an error when the path already exists, which
# makes the notebook fail on every re-run.
ML_df.write.mode("overwrite").option("header", True).csv("/content/drive/MyDrive/Colab Notebooks/output_csv/Perth_housing_ML.csv")

## Collect data for Tableau Data Visualisation
---

In [83]:
# Columns needed for the Tableau visualisation, limited to sales from 2013 onwards.
visual_columns_sql = "SELECT SUBURB, PRICE, LAND_AREA, FLOOR_AREA, BUILD_YEAR, YEAR_SOLD\
                      FROM perthHousingPrice\
                      WHERE YEAR_SOLD >= 2013"
visual_df = spark.sql(visual_columns_sql)
# Preview the first 10 rows of the selection.
visual_df.show(10)

+------------+------+---------+----------+----------+---------+
|      SUBURB| PRICE|LAND_AREA|FLOOR_AREA|BUILD_YEAR|YEAR_SOLD|
+------------+------+---------+----------+----------+---------+
|  South Lake|565000|      600|       160|      2003|     2018|
|       Wandi|365000|      351|       139|      2013|     2019|
|     Camillo|287000|      719|        86|      1979|     2015|
|    Bellevue|255000|      651|        59|      1953|     2018|
|   Lockridge|325000|      466|       131|      1998|     2016|
|Mount Richon|409000|      759|       118|      1991|     2013|
|     Hilbert|400000|      386|       132|      2014|     2016|
|     Waikiki|370000|      468|       158|      2013|     2019|
|   Hamersley|565000|      875|       168|      1983|     2020|
|   Bayswater|685000|      552|       126|      1999|     2019|
+------------+------+---------+----------+----------+---------+
only showing top 10 rows



In [84]:
# PROPERTY_AGE = YEAR_SOLD - BUILD_YEAR (age of the property when it was sold).
# Both source columns are dropped afterwards, so this cell cannot be re-run on
# its own: the query cell that defines visual_df has to be executed again first.
visual_age_at_sale = visual_df["YEAR_SOLD"] - visual_df["BUILD_YEAR"]
visual_df = visual_df.withColumn("PROPERTY_AGE", visual_age_at_sale)
visual_df = visual_df.drop("BUILD_YEAR", "YEAR_SOLD")

visual_df.show(10)

+------------+------+---------+----------+------------+
|      SUBURB| PRICE|LAND_AREA|FLOOR_AREA|PROPERTY_AGE|
+------------+------+---------+----------+------------+
|  South Lake|565000|      600|       160|          15|
|       Wandi|365000|      351|       139|           6|
|     Camillo|287000|      719|        86|          36|
|    Bellevue|255000|      651|        59|          65|
|   Lockridge|325000|      466|       131|          18|
|Mount Richon|409000|      759|       118|          22|
|     Hilbert|400000|      386|       132|           2|
|     Waikiki|370000|      468|       158|           6|
|   Hamersley|565000|      875|       168|          37|
|   Bayswater|685000|      552|       126|          20|
+------------+------+---------+----------+------------+
only showing top 10 rows



In [85]:
# Derived ratio columns, appended in this order:
#   PRICE/FLOOR($/m2) - PRICE divided by FLOOR_AREA (price per square metre of floor)
#   PRICE/LAND($/m2)  - PRICE divided by LAND_AREA (price per square metre of land)
#   Land/Floor        - LAND_AREA divided by FLOOR_AREA
ratio_columns = {
    "PRICE/FLOOR($/m2)": ("PRICE", "FLOOR_AREA"),
    "PRICE/LAND($/m2)": ("PRICE", "LAND_AREA"),
    "Land/Floor": ("LAND_AREA", "FLOOR_AREA"),
}
for ratio_name, (numerator, denominator) in ratio_columns.items():
    visual_df = visual_df.withColumn(ratio_name, visual_df[numerator] / visual_df[denominator])

visual_df.show(10)

+------------+------+---------+----------+------------+------------------+------------------+------------------+
|      SUBURB| PRICE|LAND_AREA|FLOOR_AREA|PROPERTY_AGE| PRICE/FLOOR($/m2)|  PRICE/LAND($/m2)|        Land/Floor|
+------------+------+---------+----------+------------+------------------+------------------+------------------+
|  South Lake|565000|      600|       160|          15|           3531.25| 941.6666666666666|              3.75|
|       Wandi|365000|      351|       139|           6|2625.8992805755397|  1039.88603988604|2.5251798561151078|
|     Camillo|287000|      719|        86|          36|3337.2093023255816| 399.1655076495132|  8.36046511627907|
|    Bellevue|255000|      651|        59|          65| 4322.033898305085|391.70506912442397|11.033898305084746|
|   Lockridge|325000|      466|       131|          18| 2480.916030534351| 697.4248927038626|3.5572519083969465|
|Mount Richon|409000|      759|       118|          22| 3466.101694915254|  538.866930171278| 6.

In [102]:
# Save the visualisation dataset to Google Drive as CSV with a header row.
# Spark creates a directory at this path that holds the CSV part file(s),
# not a single file named Perth_housing_visual.csv.
# mode("overwrite") replaces the output of an earlier run; with the default
# save mode the write raises an error when the path already exists, which
# makes the notebook fail on every re-run.
visual_df.write.mode("overwrite").option("header", True).csv("/content/drive/MyDrive/Colab Notebooks/output_csv/Perth_housing_visual.csv")