In [4]:
# Environment bootstrap: install Java and a Spark distribution with shell commands,
# then point JAVA_HOME / SPARK_HOME at them so pyspark can be imported.
import os
# Find the latest version of spark 3.x  from http://www.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.4.0'
spark_version = 'spark-3.5.1'
# Exported so the shell commands below can expand $SPARK_VERSION.
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
# NOTE(review): the download URL is built from SPARK_VERSION; if that release is no
# longer published under /dist/spark/, wget -q fails without printing anything and
# the tar step is what reports the error.
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop3.tgz
!tar xf $SPARK_VERSION-bin-hadoop3.tgz
!pip install -q findspark

# Set Environment Variables
# JAVA_HOME matches the openjdk-11 package installed above.
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
# Assumes the tarball was extracted under /content (the working directory) -- TODO confirm
# when running outside an environment where that holds.
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop3"

# Locate the Spark installation so that `pyspark` can be imported.
# This does not start a SparkSession; the session is created in a later cell.
import findspark
findspark.init()

0% [Working]            Hit:1 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease
0% [Connecting to archive.ubuntu.com (91.189.91.81)] [Waiting for headers] [Waiting for headers] [Co                                                                                                    Hit:2 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease
0% [Connecting to archive.ubuntu.com (91.189.91.81)] [Waiting for headers] [Connecting to ppa.launch                                                                                                    Hit:3 http://archive.ubuntu.com/ubuntu jammy InRelease
0% [Waiting for headers] [Waiting for headers] [Connecting to ppa.launchpadcontent.net (185.125.190.                                                                                                    Hit:4 http://archive.ubuntu.com/ubuntu jammy-updates InRelease
0% [Waiting for headers] [Waiting for headers] [Connecting to ppa.launchpadcon

In [5]:
# Imports: the stdlib timer used by the timing cells, and Spark's session entry point
import time
from pyspark.sql import SparkSession

# Build the session named "SparkSQL" (getOrCreate reuses an existing session if there is one)
spark = (
    SparkSession.builder
    .appName("SparkSQL")
    .getOrCreate()
)

In [6]:
# 1. Location of the home sales CSV; a later cell hands this URL to Spark and reads
# the file back through SparkFiles.
from pyspark import SparkFiles
url = "https://2u-data-curriculum-team.s3.amazonaws.com/dataviz-classroom/v1.2/22-big-data/home_sales_revised.csv"


In [25]:
# 1-2. Read the home sales CSV into a DataFrame, de-duplicate it, and register it
# as the temporary view 'home_table' that the SQL cells below query.
from pyspark import SparkFiles
# Register the remote CSV with Spark, then read it from the path SparkFiles reports.
spark.sparkContext.addFile(url)
# header=True takes the column names from the first row. No schema is supplied or
# inferred here, so column types are whatever the CSV reader defaults to.
home_df = spark.read.csv(SparkFiles.get("home_sales_revised.csv"), sep=",", header=True)
home_df = home_df.drop_duplicates()
# NOTE(review): the null-free frame is bound to `_home_df`, which nothing visible in
# this notebook reads. `home_df` (shown and registered below) therefore still contains
# any rows with nulls -- confirm whether dropna() was meant to apply to `home_df`.
_home_df = home_df.dropna()

# Show DataFrame
home_df.show()
home_df.createOrReplaceTempView('home_table')

+--------------------+----------+----------+------+--------+---------+-----------+--------+------+----------+----+
|                  id|      date|date_built| price|bedrooms|bathrooms|sqft_living|sqft_lot|floors|waterfront|view|
+--------------------+----------+----------+------+--------+---------+-----------+--------+------+----------+----+
|e81aacfe-17fe-46b...|2020-06-16|      2016|181925|       3|        3|       2137|   11709|     2|         0|  22|
|7c8448c9-d6a7-46a...|2021-01-25|      2016|137482|       2|        2|       2353|   11308|     2|         0|  49|
|0e73ff99-fa2d-4af...|2019-02-23|      2010|386063|       4|        2|       2083|   13164|     1|         0|  42|
|f884e205-b21f-42d...|2020-05-23|      2013|236907|       4|        3|       1726|    8960|     1|         0|  21|
|89e01d5c-5b2d-438...|2022-04-06|      2012|311927|       4|        2|       1644|   10745|     1|         0|   6|
|7f4360fb-77cc-4c6...|2021-11-01|      2017|290236|       4|        2|       156

In [26]:
# 3. Average sale price of four-bedroom homes for each sale year, rounded to two
# decimal places and listed from the most recent year backwards.
avg4bed = spark.sql(
    "SELECT ROUND(AVG(price),2), YEAR(date) "
    "from home_table "
    "where bedrooms == 4 "
    "group by YEAR(date) "
    "ORDER BY YEAR(date) desc"
)
avg4bed.show()

+--------------------+----------+
|round(avg(price), 2)|year(date)|
+--------------------+----------+
|           296363.88|      2022|
|           301819.44|      2021|
|           298353.78|      2020|
|            300263.7|      2019|
+--------------------+----------+



In [27]:
# 4. Average price, rounded to two decimal places, of homes with 3 bedrooms and
# 3 bathrooms, grouped by the year the home was built (newest build year first).
avg3bed3bath = spark.sql(
    "SELECT ROUND(AVG(price),2), date_built "
    "from home_table "
    "where bedrooms == 3 AND bathrooms == 3 "
    "group by date_built "
    "ORDER BY date_built desc"
)
avg3bed3bath.show()

+--------------------+----------+
|round(avg(price), 2)|date_built|
+--------------------+----------+
|           292676.79|      2017|
|           290555.07|      2016|
|            288770.3|      2015|
|           290852.27|      2014|
|           295962.27|      2013|
|           293683.19|      2012|
|           291117.47|      2011|
|           292859.62|      2010|
+--------------------+----------+



In [29]:
# 5. Average price, rounded to two decimal places, per build year for homes with
# 3 bedrooms, 3 bathrooms, two floors and at least 2,000 square feet of living
# space (oldest build year first).
avg3bed3bath2floor = spark.sql(
    "SELECT ROUND(AVG(price),2), date_built "
    "from home_table "
    "where bedrooms == 3 AND bathrooms == 3 AND floors == 2 AND sqft_living >= 2000 "
    "group by date_built "
    "ORDER BY date_built asc"
)
avg3bed3bath2floor.show()

+--------------------+----------+
|round(avg(price), 2)|date_built|
+--------------------+----------+
|           285010.22|      2010|
|           276553.81|      2011|
|           307539.97|      2012|
|           303676.79|      2013|
|           298264.72|      2014|
|           297609.97|      2015|
|            293965.1|      2016|
|           280317.58|      2017|
+--------------------+----------+



In [30]:
# 6. Average home price per "view" rating, rounded to two decimal places, keeping
# only the ratings whose average price is at least $350,000.
# The wall-clock time of building and showing the result is printed afterwards.

start_time = time.time()

spark.sql(
    "SELECT view, ROUND(AVG(price),2) "
    "from home_table "
    "group by view "
    "having ROUND(AVG(price),2) >= 350000"
).show()

elapsed_seconds = time.time() - start_time
print("--- %s seconds ---" % elapsed_seconds)

+----+--------------------+
|view|round(avg(price), 2)|
+----+--------------------+
|  87|           1072285.2|
|  52|           733780.26|
|  97|          1129040.15|
|  76|          1058802.78|
|  80|           991767.38|
|  57|            734340.5|
|  61|           746877.59|
|  75|          1114042.94|
|  53|            755214.8|
|  86|          1070444.25|
|  89|          1107839.15|
|  90|          1062654.16|
|  92|           970402.55|
|  84|          1117233.13|
|  85|          1056336.74|
|  98|          1053739.33|
|  70|           695865.58|
|  58|           759764.65|
|  94|           1033536.2|
|  59|            791453.0|
+----+--------------------+
only showing top 20 rows

--- 6.997702598571777 seconds ---


In [35]:
# 7. Cache the temporary view home_table (the name the home sales DataFrame was
# registered under).
spark.sql("cache table home_table")

DataFrame[]

In [36]:
# 8. Check if the table is cached.
# Left as the cell's last expression so the boolean is displayed as the cell output.
spark.catalog.isCached('home_table')

True

In [37]:
# 9. Using the cached data, run the per-"view" average price query and time it so
# the result can be compared with the uncached run.
# NOTE(review): this query keeps ratings whose average price is BELOW $350,000,
# whereas the uncached timing cell keeps those at or above it -- confirm the two
# runs are meant to use different predicates.
start_time = time.time()

spark.sql(
    "SELECT view, ROUND(AVG(price),2) "
    "from home_table "
    "group by view "
    "having ROUND(AVG(price),2) < 350000"
).show()

elapsed_seconds = time.time() - start_time
print("--- %s seconds ---" % elapsed_seconds)

+----+--------------------+
|view|round(avg(price), 2)|
+----+--------------------+
|   7|           288929.09|
|  15|           284907.04|
|  11|           280356.07|
|  29|           283881.72|
|  42|           289225.45|
|   3|           284314.53|
|  30|           281085.62|
|  34|           286124.07|
|   8|           279099.78|
|  22|           284908.42|
|  28|           285474.25|
|  35|           281767.41|
|  16|           291990.83|
|   0|           285069.21|
|  47|           292925.62|
|  43|           282606.92|
|   5|           278096.94|
|  31|           287988.84|
|  18|           287532.36|
|  27|            281702.6|
+----+--------------------+
only showing top 20 rows

--- 6.837559700012207 seconds ---


In [38]:
# Determine the runtime and compare to the original runtime.
# NOTE(review): both figures are typed in by hand from the two timing cells' recorded
# output, so they will not update when those cells are re-run. The two timed queries
# also use different HAVING predicates (>= vs <).
print("the cached runtime of 6.83 seconds was shorter than the original runtime of 6.99 seconds")

the cached runtime of 6.83 seconds was shorter than the original runtime of 6.99 seconds


In [39]:
# 10. Partition by the "date_built" field on the formatted parquet home sales data
# partitionBy() tells the writer to lay the output out by distinct date_built value;
# without it the data is written unpartitioned, which is not what this step asks for.
# mode='overwrite' replaces any existing 'home_parquet' output so the cell can be re-run.
home_df.write.partitionBy('date_built').parquet('home_parquet', mode='overwrite')

In [40]:
# 11. Read the parquet formatted data.
# Loads the 'home_parquet' output written by the previous step into a new DataFrame.
parquet_home_df = spark.read.parquet('home_parquet')

In [42]:
# 12. Create a temporary table for the parquet data.
# The parquet timing query below refers to this view by the name 'parquet_temp_homesales'.
parquet_home_df.createOrReplaceTempView('parquet_temp_homesales')

In [43]:
# 13. Run the per-"view" average price query (average rounded to two decimal
# places) against the parquet-backed view and time it.
# This query keeps the ratings whose average price is below $350,000.


start_time = time.time()

spark.sql(
    "SELECT view, ROUND(AVG(price),2) "
    "from parquet_temp_homesales "
    "group by view "
    "having ROUND(AVG(price),2) < 350000"
).show()

elapsed_seconds = time.time() - start_time
print("--- %s seconds ---" % elapsed_seconds)

+----+--------------------+
|view|round(avg(price), 2)|
+----+--------------------+
|   7|           288929.09|
|  15|           284907.04|
|  11|           280356.07|
|  29|           283881.72|
|  42|           289225.45|
|   3|           284314.53|
|  30|           281085.62|
|  34|           286124.07|
|   8|           279099.78|
|  22|           284908.42|
|  28|           285474.25|
|  35|           281767.41|
|  16|           291990.83|
|   0|           285069.21|
|  47|           292925.62|
|  43|           282606.92|
|   5|           278096.94|
|  31|           287988.84|
|  18|           287532.36|
|  27|            281702.6|
+----+--------------------+
only showing top 20 rows

--- 1.798896074295044 seconds ---


In [44]:
# Determine the runtime and compare it to the cached version.
# NOTE(review): both figures are typed in by hand from the recorded timing output,
# so they will not update when the timing cells are re-run.
print("the parquet runtime of 1.79 seconds is significantly shorter than the cached runtime of 6.83 seconds")

the parquet runtime of 1.79 seconds is significantly shorter than the cached runtime of 6.83 seconds


In [45]:
# 14. Uncache the home sales temporary view.
# The view was registered and cached under the name 'home_table'. No cell in this
# notebook creates a 'home_data' view, so targeting that name would leave
# 'home_table' cached (and fails on a fresh kernel where 'home_data' does not exist).
spark.sql("uncache table home_table")

DataFrame[]

In [46]:
# 15. Check whether the home sales view is still cached.
# Ask about 'home_table', the name the view was registered and cached under.
# Checking any other name says nothing about the view that was actually cached.
spark.catalog.isCached('home_table')

False