In [30]:
import os
# Find the latest version of spark 3.x  from http://www.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.4.0'
spark_version = 'spark-3.5.1'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop3.tgz
!tar xf $SPARK_VERSION-bin-hadoop3.tgz
!pip install -q findspark

# Set Environment Variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop3"

# Start a SparkSession
import findspark
findspark.init()

0% [Working]            Hit:1 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease
0% [Connecting to archive.ubuntu.com] [Waiting for headers] [Connected to r2u.stat.illinois.edu (192                                                                                                    Hit:2 http://security.ubuntu.com/ubuntu jammy-security InRelease
0% [Connecting to archive.ubuntu.com] [Connected to r2u.stat.illinois.edu (192.17.190.167)] [Waiting                                                                                                    Hit:3 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease
0% [Connecting to archive.ubuntu.com (185.125.190.83)] [Waiting for headers] [Connecting to ppa.laun                                                                                                    Ign:4 https://r2u.stat.illinois.edu/ubuntu jammy InRelease
Hit:5 https://r2u.stat.illinois.edu/ubuntu jammy Release
Hit:6 http://arc

In [123]:
# Import packages
from pyspark.sql import SparkSession
import time

# Build the notebook-wide SparkSession (or reuse an existing one) under the
# application name "SparkSQL"; every later cell talks to Spark through `spark`.
spark = (
    SparkSession.builder
    .appName("SparkSQL")
    .getOrCreate()
)

In [124]:
# 1. Read in the AWS S3 bucket into a DataFrame.
# The file is registered with the SparkContext via addFile, then read from the
# local copy that SparkFiles.get resolves.
# inferSchema=True lets Spark type the numeric columns (price, bedrooms,
# bathrooms, sqft_living, floors, view, ...). Without it the CSV reader leaves
# every column as a string, so the later SQL filters/aggregates depend on
# implicit string casts and the Parquet output is written as all-string columns.
from pyspark import SparkFiles
url = "https://2u-data-curriculum-team.s3.amazonaws.com/dataviz-classroom/v1.2/22-big-data/home_sales_revised.csv"
spark.sparkContext.addFile(url)
home_df = spark.read.csv(
    SparkFiles.get("home_sales_revised.csv"),
    sep=",",
    header=True,
    ignoreLeadingWhiteSpace=True,
    inferSchema=True,
)
home_df.show()


+--------------------+----------+----------+------+--------+---------+-----------+--------+------+----------+----+
|                  id|      date|date_built| price|bedrooms|bathrooms|sqft_living|sqft_lot|floors|waterfront|view|
+--------------------+----------+----------+------+--------+---------+-----------+--------+------+----------+----+
|f8a53099-ba1c-47d...|2022-04-08|      2016|936923|       4|        3|       3167|   11733|     2|         1|  76|
|7530a2d8-1ae3-451...|2021-06-13|      2013|379628|       2|        2|       2235|   14384|     1|         0|  23|
|43de979c-0bf0-4c9...|2019-04-12|      2014|417866|       2|        2|       2127|   10575|     2|         0|   0|
|b672c137-b88c-48b...|2019-10-16|      2016|239895|       2|        2|       1631|   11149|     2|         0|   0|
|e0726d4d-d595-407...|2022-01-08|      2017|424418|       3|        2|       2249|   13878|     2|         0|   4|
|5aa00529-0533-46b...|2019-01-30|      2017|218712|       2|        3|       196

In [125]:
# 2. Create a temporary view of the DataFrame.
# Registers home_df under the SQL name "home_data" so the queries below can
# reference it with spark.sql(...).
home_df.createOrReplaceTempView(name="home_data")

In [126]:
# 3. What is the average price for a four bedroom house sold per year, rounded to two decimal places?
# Four-bedroom sales grouped by the year of the sale date, most recent year first.
# The adjacent string literals concatenate into a single SQL statement.
avg_4_bedroom = spark.sql(
    "SELECT ROUND(AVG(price),2) AS AVG_Price, YEAR(date) "
    "from home_data "
    "where bedrooms == 4 "
    "group by YEAR(date) "
    "ORDER BY YEAR(date) desc"
)
avg_4_bedroom.show()

+---------+----------+
|AVG_Price|year(date)|
+---------+----------+
|296363.88|      2022|
|301819.44|      2021|
|298353.78|      2020|
| 300263.7|      2019|
+---------+----------+



In [127]:
# 4. What is the average price of a home for each year the home was built,
# that have 3 bedrooms and 3 bathrooms, rounded to two decimal places?
# Homes with exactly 3 bedrooms and 3 bathrooms, grouped by build year, newest first.
avg_3_bed_3_bath = spark.sql(
    "SELECT ROUND(AVG(price),2) AS AVG_Price, date_built "
    "from home_data "
    "where bedrooms == 3 AND bathrooms == 3 "
    "group by date_built "
    "ORDER BY date_built desc"
)
avg_3_bed_3_bath.show()


+---------+----------+
|AVG_Price|date_built|
+---------+----------+
|292676.79|      2017|
|290555.07|      2016|
| 288770.3|      2015|
|290852.27|      2014|
|295962.27|      2013|
|293683.19|      2012|
|291117.47|      2011|
|292859.62|      2010|
+---------+----------+



In [128]:
# 5. What is the average price of a home for each year the home was built,
# that have 3 bedrooms, 3 bathrooms, with two floors,
# and are greater than or equal to 2,000 square feet, rounded to two decimal places?
# Same grouping as question 4, additionally restricted to two-floor homes
# with at least 2,000 sqft of living space.
avg_3_bed_3_bath_2_floors = spark.sql(
    "SELECT ROUND(AVG(price),2) AS AVG_Price, date_built "
    "from home_data "
    "where bedrooms == 3 "
    "AND bathrooms == 3 "
    "AND floors == 2 "
    "AND sqft_living >= 2000 "
    "group by date_built "
    "ORDER BY date_built desc"
)
avg_3_bed_3_bath_2_floors.show()

+---------+----------+
|AVG_Price|date_built|
+---------+----------+
|280317.58|      2017|
| 293965.1|      2016|
|297609.97|      2015|
|298264.72|      2014|
|303676.79|      2013|
|307539.97|      2012|
|276553.81|      2011|
|285010.22|      2010|
+---------+----------+



In [129]:
# 6. What is the average price of a home per "view" rating, rounded to two decimal places,
# having an average home price greater than or equal to $350,000? Order by descending view rating.
# Although this is a small dataset, determine the run time for this query.

# perf_counter() is a monotonic, high-resolution clock intended for measuring
# elapsed time; time.time() is wall-clock time and can jump if the system clock
# is adjusted during the measurement.
start_time = time.perf_counter()

# The measured time includes .show(), which is what forces Spark to execute the query.
spark.sql(
    """SELECT view, ROUND(AVG(price),2) AS AVG_Price from home_data group by view having ROUND(AVG(price),2) >= 350000 ORDER BY view desc""").show()


print("--- %s seconds ---" % (time.perf_counter() - start_time))

+----+----------+
|view| AVG_Price|
+----+----------+
|  99|1061201.42|
|  98|1053739.33|
|  97|1129040.15|
|  96|1017815.92|
|  95| 1054325.6|
|  94| 1033536.2|
|  93|1026006.06|
|  92| 970402.55|
|  91|1137372.73|
|  90|1062654.16|
|  89|1107839.15|
|  88|1031719.35|
|  87| 1072285.2|
|  86|1070444.25|
|  85|1056336.74|
|  84|1117233.13|
|  83|1033965.93|
|  82| 1063498.0|
|  81|1053472.79|
|  80| 991767.38|
+----+----------+
only showing top 20 rows

--- 1.0339107513427734 seconds ---


In [130]:
# 7. Cache the temporary table (registered in this notebook as home_data).
# CACHE TABLE returns an empty DataFrame; the trailing ";" stops the notebook
# from echoing its meaningless "DataFrame[]" repr.
spark.sql("cache table home_data");

DataFrame[]

In [131]:
# 8. Check if the table is cached.
# Ask the catalog about the home_data view; the cell's value is the boolean answer.
spark.catalog.isCached(tableName="home_data")

True

In [132]:
# 9. Using the cached data, run the last query above, that calculates
# the average price of a home per "view" rating, rounded to two decimal places,
# having an average home price greater than or equal to $350,000.
# Determine the runtime and compare it to the uncached runtime.

# perf_counter() is a monotonic, high-resolution clock intended for measuring
# elapsed time; time.time() is wall-clock time and can jump if the system clock
# is adjusted during the measurement.
start_time = time.perf_counter()

# home_data was cached in step 7, so this run reads from the in-memory cache.
spark.sql(
    """SELECT view, ROUND(AVG(price),2) AS AVG_Price from home_data group by view having ROUND(AVG(price),2) >= 350000 ORDER BY view desc""").show()


print("--- %s seconds ---" % (time.perf_counter() - start_time))


+----+----------+
|view| AVG_Price|
+----+----------+
|  99|1061201.42|
|  98|1053739.33|
|  97|1129040.15|
|  96|1017815.92|
|  95| 1054325.6|
|  94| 1033536.2|
|  93|1026006.06|
|  92| 970402.55|
|  91|1137372.73|
|  90|1062654.16|
|  89|1107839.15|
|  88|1031719.35|
|  87| 1072285.2|
|  86|1070444.25|
|  85|1056336.74|
|  84|1117233.13|
|  83|1033965.93|
|  82| 1063498.0|
|  81|1053472.79|
|  80| 991767.38|
+----+----------+
only showing top 20 rows

--- 0.2525360584259033 seconds ---


In [133]:
print("the cached runtime of 0.25 seconds was shorter than the original runtime of 1.03 seconds")

the cached runtime of 0.25 seconds was shorter than the original runtime of 1.03 seconds


In [134]:
# 10. Partition by the "date_built" field on the formatted parquet home sales data
# Writes one sub-directory per date_built value under ./p_home_sales,
# replacing any output left over from a previous run.
home_df.write.mode("overwrite").partitionBy("date_built").parquet("p_home_sales")

In [135]:
# 11. Read the parquet formatted data.
# Load the partitioned Parquet directory written in step 10 back into a DataFrame.
p_homes_df = spark.read.format("parquet").load("p_home_sales")

In [136]:
# 12. Create a temporary table for the parquet data.
# Exposes the Parquet-backed DataFrame to SQL under the name "parquet_sales".
p_homes_df.createOrReplaceTempView(name="parquet_sales")

In [137]:
# 13. Using the parquet DataFrame, run the last query above, that calculates
# the average price of a home per "view" rating, rounded to two decimal places,
# having an average home price greater than or equal to $350,000.
# Determine the runtime and compare it to the cached runtime.

# perf_counter() is a monotonic, high-resolution clock intended for measuring
# elapsed time; time.time() is wall-clock time and can jump if the system clock
# is adjusted during the measurement.
start_time = time.perf_counter()

# Query the Parquet-backed view (parquet_sales), not home_data: home_data is
# still cached at this point (it is only uncached in step 14), so querying it
# would time the in-memory cache instead of reading the Parquet files.
spark.sql(
    """SELECT view, ROUND(AVG(price),2) AS AVG_Price from parquet_sales group by view having ROUND(AVG(price),2) >= 350000 ORDER BY view desc""").show()


print("--- %s seconds ---" % (time.perf_counter() - start_time))

+----+----------+
|view| AVG_Price|
+----+----------+
|  99|1061201.42|
|  98|1053739.33|
|  97|1129040.15|
|  96|1017815.92|
|  95| 1054325.6|
|  94| 1033536.2|
|  93|1026006.06|
|  92| 970402.55|
|  91|1137372.73|
|  90|1062654.16|
|  89|1107839.15|
|  88|1031719.35|
|  87| 1072285.2|
|  86|1070444.25|
|  85|1056336.74|
|  84|1117233.13|
|  83|1033965.93|
|  82| 1063498.0|
|  81|1053472.79|
|  80| 991767.38|
+----+----------+
only showing top 20 rows

--- 0.41431617736816406 seconds ---


In [138]:
# NOTE: the runtimes in this message are hard-coded from a previous run of the
# timing cells; update them by hand after re-running those cells.
print("the parquet runtime of 0.41 seconds was shorter than the original runtime of 1.03 seconds, but slightly longer than the cached version of 0.25 seconds")

the parquet runtime of 0.41 seconds was shorter than the original runtime of 1.03 seconds, but slightly longer than the cahced version of 0.25 seconds


In [139]:
# 14. Uncache the temporary table (registered in this notebook as home_data).
# UNCACHE TABLE returns an empty DataFrame; the trailing ";" stops the notebook
# from echoing its meaningless "DataFrame[]" repr.
spark.sql("uncache table home_data");

DataFrame[]

In [140]:
# 15. Check that the temporary table (registered as home_data) is no longer cached.
# After step 14 the catalog should report False for this view.
spark.catalog.isCached(tableName="home_data")

False