In [1]:
# Sets up the Apache Spark environment by importing the necessary libraries and installations:

import os                                                                                                         # Enables interaction with the operating system


# Defines the latest version of Apache Spark version (currently 3.4.0) and sets it up in the operating system's environment variables
# The latest version  of Apache Spark can be found on http://www.apache.org/dist/spark/
spark_version = 'spark-3.4.0'
os.environ['SPARK_VERSION']=spark_version

# Installs Apache Spark and Java (Apache Spark requires Java to run)
# The exclamation mark '!' indicates that the line is ran in the terminal environment
!apt-get update                                                                                                   # Ensures the package manager has the latest information about the available software packages
!apt-get install openjdk-11-jdk-headless -qq > /dev/null                                                          # Installs the OpenJDK 11 which is a Java Development kit
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop3.tgz                           # Visits the Apache website and gets the tgz file that contains the compressed Spark packaged for Hadoop
!tar xf $SPARK_VERSION-bin-hadoop3.tgz                                                                            # Unpacks Spark into the current directory
!pip install -q findspark                                                                                         # Installs findspark which enables the Python environment to work with the installed Spark version (3.4.0)

# Sets up environment Variables for Java and Apache Spark installations (pointing to where the installation is located)
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop3"

# Starts a SparkSession
import findspark
findspark.init()

0% [Working]            Get:1 http://security.ubuntu.com/ubuntu jammy-security InRelease [110 kB]
0% [Connecting to archive.ubuntu.com] [1 InRelease 14.2 kB/110 kB 13%] [Connect                                                                               Get:2 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease [3,626 B]
0% [Waiting for headers] [1 InRelease 67.7 kB/110 kB 61%] [2 InRelease 0 B/3,620% [Waiting for headers] [1 InRelease 90.9 kB/110 kB 82%] [Connected to ppa.lau                                                                               Hit:3 http://archive.ubuntu.com/ubuntu jammy InRelease
                                                                               Hit:4 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease
Get:5 http://archive.ubuntu.com/ubuntu jammy-updates InRelease [119 kB]
Get:6 http://archive.ubuntu.com/ubuntu jammy-backports InRelease [109 kB]
Hit:7 https://ppa.launchpadcontent.net

In [2]:
# Imports libraries and dependencies required
from pyspark.sql import SparkSession                                                                              # Enables the creation and management of Spark sessions where the session acts as a end point to work with the Apache Spark API
import time                                                                                                       # Enables time related operations in Python
from pyspark import SparkFiles                                                                                    # Enables accessability to distributed files

# Sets up a SparkSession in a PySpark application
# Configurations include showing up to 2000 fields and increasing the default driver memory from 1g to 2g to avoid crashing due to the dataset's size
spark = SparkSession.builder \
    .appName("SparkSQL") \
    .config("spark.sql.debug.maxToStringFields", 2000) \
    .config("spark.driver.memory", "2g") \
    .getOrCreate()

In [3]:
# 1. Reads in the AWS S3 bucket into a home sales DataFrame.

url = "https://2u-data-curriculum-team.s3.amazonaws.com/dataviz-classroom/v1.2/22-big-data/home_sales_revised.csv" # Location of home sales revised CSV file which is within an AWS S3 bucket in the cloud

spark.sparkContext.addFile(url)                                                                                    # Adds the file locally to all the nodes in the Spark cluster used to run the script

home_sales_df = spark.read.csv(SparkFiles.get("home_sales_revised.csv"), sep= ",", header=True,                    # Reads the home sales revised CSV and loads it into a DataFrame; seperates values by a comma delimiter and recognises that the data has a header record included
                                                                                   ignoreLeadingWhiteSpace=True,   # Removes white space before values
                                                                                   inferSchema=True,               # Observes data and interprets their data types
                                                                                   timestampFormat="yyyy/MM/dd")   # Expected format of dates in the file

# The date_built column is inferred to be an integer data type which is not correct in this context, if this field is converted to a date format 'yyyy' the appearance of this column changes to a format of "20XX-01-01"
# Therefore to avoid misinterpreation of the data in this field, it is changed back to a string data type
home_sales_df = home_sales_df.withColumn("date_built", home_sales_df["date_built"].cast("string"))


# Renames the date and date_built columns to more applicable names
home_sales_df = home_sales_df.withColumnRenamed("date", "date_sold")
home_sales_df = home_sales_df.withColumnRenamed("date_built", "year_built")


# Shows the home sales DataFrame ensuring that all data is visible within each field
home_sales_df.show(truncate=False)

# Prints out a list of the data types in the home sales DataFrame
print("Data types within home_sales_df:\n", home_sales_df.dtypes)

+------------------------------------+----------+----------+------+--------+---------+-----------+--------+------+----------+----+
|id                                  |date_sold |year_built|price |bedrooms|bathrooms|sqft_living|sqft_lot|floors|waterfront|view|
+------------------------------------+----------+----------+------+--------+---------+-----------+--------+------+----------+----+
|f8a53099-ba1c-47d6-9c31-7398aa8f6089|2022-04-08|2016      |936923|4       |3        |3167       |11733   |2     |1         |76  |
|7530a2d8-1ae3-4517-9f4a-befe060c4353|2021-06-13|2013      |379628|2       |2        |2235       |14384   |1     |0         |23  |
|43de979c-0bf0-4c9f-85ef-96dc27b258d5|2019-04-12|2014      |417866|2       |2        |2127       |10575   |2     |0         |0   |
|b672c137-b88c-48bf-9f18-d0a4ac62fb8b|2019-10-16|2016      |239895|2       |2        |1631       |11149   |2     |0         |0   |
|e0726d4d-d595-4074-8283-4139a54d0d63|2022-01-08|2017      |424418|3       |2      

In [4]:
# 2. Creates a local temporary view of the DataFrame.
# This lazily loads the spark table into memory and enables us to query the data in Apache Spark using SQL languages

home_sales_df.createOrReplaceTempView('home_sales')

In this script, Apache SparkSQL queries will be used to achieve the desired outputs. It should also be acknowledged that it is also possible to use the Apache PySpark DataFrame operations such as filter, groupBy, and agg to achieve the same results however, SparkSQL is preferred as it will utilise faster performance due to being queried against a temporary table which is lazily loaded.

In [5]:
# 3. Returns the average price for a four bedroom house sold in each year rounded to two decimal places and formatted as a currency

query = """
SELECT
  CONCAT('$', FORMAT_NUMBER(AVG(price), 2)) AS `Average Price`,
  YEAR(date_sold) AS `Year Sold`
FROM
  home_sales
WHERE
  bedrooms = 4
GROUP BY
  `Year Sold`
ORDER BY
  `Year Sold`
"""

spark.sql(query).show()

+-------------+---------+
|Average Price|Year Sold|
+-------------+---------+
|  $300,263.70|     2019|
|  $298,353.78|     2020|
|  $301,819.44|     2021|
|  $296,363.88|     2022|
+-------------+---------+



In [6]:
# 4. Returns the average price of a home for each year the home was built that has 3 bedrooms and 3 bathrooms rounded to two decimal places and formatted as a currency

query = """
SELECT
  CONCAT('$', FORMAT_NUMBER(AVG(price), 2)) AS `Average Price`,
  year_built AS `Year Built`
FROM
  home_sales
WHERE
  bedrooms = 3 AND bathrooms = 3
GROUP BY
  `Year Built`
ORDER BY
  `Year Built`
"""
spark.sql(query).show()


+-------------+----------+
|Average Price|Year Built|
+-------------+----------+
|  $292,859.62|      2010|
|  $291,117.47|      2011|
|  $293,683.19|      2012|
|  $295,962.27|      2013|
|  $290,852.27|      2014|
|  $288,770.30|      2015|
|  $290,555.07|      2016|
|  $292,676.79|      2017|
+-------------+----------+



In [7]:
# 5. Returns the average price of a home for each year built that have 3 bedrooms, 3 bathrooms, with two floors, and are greater than or equal to 2,000 square feet rounded to two decimal places and formatted as a currency

query = """
SELECT
  CONCAT('$', FORMAT_NUMBER(AVG(price), 2)) AS `Average Price`,
  year_built AS `Year Built`
FROM
  home_sales
WHERE
  bedrooms = 3 AND
  bathrooms = 3 AND
  floors = 2 AND
  sqft_living >= 2000
GROUP BY
  `Year Built`
ORDER BY
  `Year Built`
"""
spark.sql(query).show()

+-------------+----------+
|Average Price|Year Built|
+-------------+----------+
|  $285,010.22|      2010|
|  $276,553.81|      2011|
|  $307,539.97|      2012|
|  $303,676.79|      2013|
|  $298,264.72|      2014|
|  $297,609.97|      2015|
|  $293,965.10|      2016|
|  $280,317.58|      2017|
+-------------+----------+



In [8]:
# 6. Returns the "view" rating for the average price of a home which is rounded to two decimal places and formatted as a currency, where the homes are greater than or equal to $350,000
# The runtime for the query is also output

start_time = time.time()

query = """
SELECT
  CONCAT('$', FORMAT_NUMBER(AVG(price), 2)) AS `Average Price`,
  View AS `View`
FROM
  home_sales
WHERE
  price >= 350000
GROUP BY
  `View`
ORDER BY
  `View`
"""
spark.sql(query).show()


print("--- %s seconds ---" % (time.time() - start_time))

+-------------+----+
|Average Price|View|
+-------------+----+
|  $403,848.51|   0|
|  $401,044.25|   1|
|  $397,389.25|   2|
|  $398,867.60|   3|
|  $399,631.89|   4|
|  $401,471.82|   5|
|  $395,655.38|   6|
|  $403,005.77|   7|
|  $398,592.71|   8|
|  $401,393.34|   9|
|  $401,868.43|  10|
|  $399,548.12|  11|
|  $401,501.32|  12|
|  $398,917.98|  13|
|  $398,570.03|  14|
|  $404,673.30|  15|
|  $399,586.53|  16|
|  $398,474.49|  17|
|  $399,332.91|  18|
|  $398,953.17|  19|
+-------------+----+
only showing top 20 rows

--- 0.9378397464752197 seconds ---


In [9]:
# 7. Caches the the temporary table home_sales
spark.sql("cache table home_sales")

DataFrame[]

In [10]:
# 8. Checks if the temporary table home_sales is cached
spark.catalog.isCached('home_sales')

True

In [11]:
# 9. Using the cached data, returns the "view" rating for the average price of a home which is rounded to two decimal places and formatted as a currency, where the homes are greater than or equal to $350,000
# The runtime for the query is also output

start_time = time.time()

query = """
SELECT
  CONCAT('$', FORMAT_NUMBER(AVG(price), 2)) AS `Average Price`,
  view AS `View`
FROM
  home_sales
WHERE
  price >= 350000
GROUP BY
  `View`
ORDER BY
  `View`
"""
spark.sql(query).show()

print("--- %s seconds ---" % (time.time() - start_time))
print("\nWhen the temporary table home sales is cached, the run time of the same query executes faster compared to when uncached in step 6. This is because the data is small enough to fit into memory and will be called frequently.")


+-------------+----+
|Average Price|View|
+-------------+----+
|  $403,848.51|   0|
|  $401,044.25|   1|
|  $397,389.25|   2|
|  $398,867.60|   3|
|  $399,631.89|   4|
|  $401,471.82|   5|
|  $395,655.38|   6|
|  $403,005.77|   7|
|  $398,592.71|   8|
|  $401,393.34|   9|
|  $401,868.43|  10|
|  $399,548.12|  11|
|  $401,501.32|  12|
|  $398,917.98|  13|
|  $398,570.03|  14|
|  $404,673.30|  15|
|  $399,586.53|  16|
|  $398,474.49|  17|
|  $399,332.91|  18|
|  $398,953.17|  19|
+-------------+----+
only showing top 20 rows

--- 0.5915303230285645 seconds ---

When the temporary table home sales is cached, the run time of the same query executes faster compared to when uncached in step 6. This is because the data is small enough to fit into memory and will be called frequently.


In [12]:
# 10. Partitions by the "year_built" field on the formatted parquet home sales data
home_sales_df.write.partitionBy('year_built').mode("overwrite").parquet("home_sales_partitioned")

In [13]:
# 11. Reads the parquet formatted data
p_home_sales_df_p = spark.read.parquet('home_sales_partitioned')

In [14]:
# 12. Creates a local temporary table for the partitioned parquet data
# This lazily loads the spark table into memory and enables us to query the data in Apache Spark using SQL languages
p_home_sales_df_p.createOrReplaceTempView('p_home_sales_p')

In [15]:
# 13. Using the parquet partioned p_home_sales_p temporary view, a query is ran which  returns the "view" rating for the average price of a home which is rounded to two decimal places and formatted as a currency, where the homes are greater than or equal to $350,000
# The runtime for the query is also output

start_time = time.time()

query = """
SELECT
  CONCAT('$', FORMAT_NUMBER(AVG(price), 2)) AS `Average Price`,
  view AS `View`
FROM
  p_home_sales_p
WHERE
  price >= 350000
GROUP BY
  `View`
ORDER BY
  `View`
"""
spark.sql(query).show()

print("--- %s seconds ---" % (time.time() - start_time))
print("\nWhen the home sales data is formatted to parquet and partioned by the year built, the run time of the same query executes slower compared to when ran on the cached, unpartioned, and unformatted data in step 9. \nThis is because parquet works best with large datasets such as terabytes/ petabytes and the partioning column, date built could have caused more data shuffling due to this field not being aggregated by in the query or it could have beeen distributed unequally.\nAs this data set is smaller, caching seems to optimise queries the best.")


+-------------+----+
|Average Price|View|
+-------------+----+
|  $403,848.51|   0|
|  $401,044.25|   1|
|  $397,389.25|   2|
|  $398,867.60|   3|
|  $399,631.89|   4|
|  $401,471.82|   5|
|  $395,655.38|   6|
|  $403,005.77|   7|
|  $398,592.71|   8|
|  $401,393.34|   9|
|  $401,868.43|  10|
|  $399,548.12|  11|
|  $401,501.32|  12|
|  $398,917.98|  13|
|  $398,570.03|  14|
|  $404,673.30|  15|
|  $399,586.53|  16|
|  $398,474.49|  17|
|  $399,332.91|  18|
|  $398,953.17|  19|
+-------------+----+
only showing top 20 rows

--- 1.2631134986877441 seconds ---

When the home sales data is formatted to parquet and partioned by the year built, the run time of the same query executes slower compared to when ran on the cached, unpartioned, and unformatted data in step 9. 
This is because parquet works best with large datasets such as terabytes/ petabytes and the partioning column, date built could have caused more data shuffling due to this field not being aggregated by in the query or it co

In [16]:
# 14. Uncache the home_sales temporary table.
spark.sql("uncache table home_sales")

DataFrame[]

In [17]:
# 15. Check if the home_sales temporary table is no longer cached
spark.catalog.isCached("home_sales")

False