In [1]:
import os
# Find the latest version of spark 3.x  from http://www.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.5.2'
spark_version = 'spark-3.5.2'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop3.tgz
!tar xf $SPARK_VERSION-bin-hadoop3.tgz
!pip install -q findspark

# Set Environment Variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop3"

# Start a SparkSession
import findspark
findspark.init()

Hit:1 http://archive.ubuntu.com/ubuntu jammy InRelease
Get:2 http://archive.ubuntu.com/ubuntu jammy-updates InRelease [128 kB]
Hit:3 http://archive.ubuntu.com/ubuntu jammy-backports InRelease
Get:4 http://security.ubuntu.com/ubuntu jammy-security InRelease [129 kB]
Get:5 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease [3,626 B]
Get:6 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease [1,581 B]
Ign:7 https://r2u.stat.illinois.edu/ubuntu jammy InRelease
Get:8 https://r2u.stat.illinois.edu/ubuntu jammy Release [5,713 B]
Get:9 https://r2u.stat.illinois.edu/ubuntu jammy Release.gpg [793 B]
Get:10 http://archive.ubuntu.com/ubuntu jammy-updates/restricted amd64 Packages [3,045 kB]
Get:11 http://archive.ubuntu.com/ubuntu jammy-updates/universe amd64 Packages [1,438 kB]
Get:12 http://archive.ubuntu.com/ubuntu jammy-updates/main amd64 Packages [2,499 kB]
Get:13 http://archive.ubuntu.com/ubuntu jammy-updates/multiverse amd64 Packages [51.8 k

In [2]:
# Import packages
from pyspark.sql import SparkSession
import time

# Create a SparkSession
spark = SparkSession.builder.appName("SparkSQL").getOrCreate()

In [3]:
# 1. Read in the AWS S3 bucket into a DataFrame.
from pyspark import SparkFiles
url = "https://2u-data-curriculum-team.s3.amazonaws.com/dataviz-classroom/v1.2/22-big-data/home_sales_revised.csv"



In [4]:
# 2. Create a temporary view of the DataFrame.

spark.sparkContext.addFile(url)
df = spark.read.csv(SparkFiles.get("home_sales_revised.csv"), sep = ",", header = True)
#df.show()

df.createOrReplaceTempView('home_sales')
df.printSchema()


root
 |-- id: string (nullable = true)
 |-- date: string (nullable = true)
 |-- date_built: string (nullable = true)
 |-- price: string (nullable = true)
 |-- bedrooms: string (nullable = true)
 |-- bathrooms: string (nullable = true)
 |-- sqft_living: string (nullable = true)
 |-- sqft_lot: string (nullable = true)
 |-- floors: string (nullable = true)
 |-- waterfront: string (nullable = true)
 |-- view: string (nullable = true)



In [6]:
# 3. What is the average price for a four bedroom house sold per year, rounded to two decimal places?

#df.createOrReplaceTempView('home_sales')
#df.printSchema()

spark.sql("""
select date,cast(AVG(price) as numeric (9,2)) as avgPrice
from home_sales
where bedrooms == 4
group by date
order by date DESC
"""
).show()

+----------+---------+
|      date| avgPrice|
+----------+---------+
|2022-06-14|222403.00|
|2022-06-13|329998.88|
|2022-06-12|211683.20|
|2022-06-11|301498.07|
|2022-06-10|329435.69|
|2022-06-09|270900.92|
|2022-06-08|337155.00|
|2022-06-07|299127.36|
|2022-06-06|224655.83|
|2022-06-05|201773.38|
|2022-06-04|260888.60|
|2022-06-03|303992.00|
|2022-06-02|385372.67|
|2022-06-01|313151.00|
|2022-05-31|326754.33|
|2022-05-30|285152.17|
|2022-05-29|268591.00|
|2022-05-28|358086.18|
|2022-05-27|324787.00|
|2022-05-26|289457.25|
+----------+---------+
only showing top 20 rows



In [7]:
# 4. What is the average price of a home for each year the home was built,
# that have 3 bedrooms and 3 bathrooms, rounded to two decimal places?
spark.sql("""
select date_built, cast(AVG(price) as numeric (9,2)) as avgPrice
from home_sales
where bedrooms==3 and bathrooms==3
group by date_built
order by date_built DESC
"""
).show()


+----------+---------+
|date_built| avgPrice|
+----------+---------+
|      2017|292676.79|
|      2016|290555.07|
|      2015|288770.30|
|      2014|290852.27|
|      2013|295962.27|
|      2012|293683.19|
|      2011|291117.47|
|      2010|292859.62|
+----------+---------+



In [8]:
# 5. What is the average price of a home for each year the home was built,
# that have 3 bedrooms, 3 bathrooms, with two floors,
# and are greater than or equal to 2,000 square feet, rounded to two decimal places?
spark.sql("""
select date_built as year_built, cast(AVG(price) as numeric (9,2)) as avgPrice
from home_sales
where bedrooms==3 and bathrooms==3 and floors==2 and sqft_living>= 2000
group by date_built
order by date_built DESC
"""
).show()


+----------+---------+
|year_built| avgPrice|
+----------+---------+
|      2017|280317.58|
|      2016|293965.10|
|      2015|297609.97|
|      2014|298264.72|
|      2013|303676.79|
|      2012|307539.97|
|      2011|276553.81|
|      2010|285010.22|
+----------+---------+



In [9]:
# 6. What is the average price of a home per "view" rating, rounded to two decimal places,
# having an average home price greater than or equal to $350,000? Order by descending view rating.
# Although this is a small dataset, determine the run time for this query.

start_time = time.time()

spark.sql("""
select DISTINCT view, cast(AVG(price) as numeric (9,2)) as avgPrice
from home_sales
group by view
having avgPrice >= 350000
order by view DESC
"""
).show(100)

print("--- %s seconds ---" % (time.time() - start_time))

+----+----------+
|view|  avgPrice|
+----+----------+
|  99|1061201.42|
|  98|1053739.33|
|  97|1129040.15|
|  96|1017815.92|
|  95|1054325.60|
|  94|1033536.20|
|  93|1026006.06|
|  92| 970402.55|
|  91|1137372.73|
|  90|1062654.16|
|  89|1107839.15|
|  88|1031719.35|
|  87|1072285.20|
|  86|1070444.25|
|  85|1056336.74|
|  84|1117233.13|
|  83|1033965.93|
|  82|1063498.00|
|  81|1053472.79|
|  80| 991767.38|
|  79|1009565.08|
|  78|1080649.37|
|  77|1076205.56|
|  76|1058802.78|
|  75|1114042.94|
|  74| 745077.00|
|  73| 752861.18|
|  72| 780914.67|
|  71| 775651.10|
|  70| 695865.58|
|  69| 750537.94|
|  68| 716785.44|
|  67| 737970.96|
|  66| 712475.00|
|  65| 736679.93|
|  64| 767036.67|
|  63| 711614.55|
|  62| 759150.14|
|  61| 746877.59|
|  60| 754939.65|
|  59| 791453.00|
|  58| 759764.65|
|  57| 734340.50|
|  56| 718176.40|
|  55| 771153.32|
|  54| 798684.82|
|  53| 755214.80|
|  52| 733780.26|
|  51| 788128.21|
| 100|1026669.50|
+----+----------+

--- 1.119396686553955 secon

In [10]:
# 7. Cache the the temporary table home_sales.
spark.sql('cache table home_sales')

DataFrame[]

In [11]:
# 8. Check if the table is cached.
spark.catalog.isCached('home_sales')

True

In [12]:
# 9. Using the cached data, run the last query above, that calculates
# the average price of a home per "view" rating, rounded to two decimal places,
# having an average home price greater than or equal to $350,000.
# Determine the runtime and compare it to the uncached runtime.

start_time = time.time()

spark.sql("""
select DISTINCT view, cast(AVG(price) as numeric (9,2)) as avgPrice
from home_sales
group by view
having avgPrice >= 350000
order by view DESC
"""
).show(100)

print("--- %s seconds ---" % (time.time() - start_time))


+----+----------+
|view|  avgPrice|
+----+----------+
|  99|1061201.42|
|  98|1053739.33|
|  97|1129040.15|
|  96|1017815.92|
|  95|1054325.60|
|  94|1033536.20|
|  93|1026006.06|
|  92| 970402.55|
|  91|1137372.73|
|  90|1062654.16|
|  89|1107839.15|
|  88|1031719.35|
|  87|1072285.20|
|  86|1070444.25|
|  85|1056336.74|
|  84|1117233.13|
|  83|1033965.93|
|  82|1063498.00|
|  81|1053472.79|
|  80| 991767.38|
|  79|1009565.08|
|  78|1080649.37|
|  77|1076205.56|
|  76|1058802.78|
|  75|1114042.94|
|  74| 745077.00|
|  73| 752861.18|
|  72| 780914.67|
|  71| 775651.10|
|  70| 695865.58|
|  69| 750537.94|
|  68| 716785.44|
|  67| 737970.96|
|  66| 712475.00|
|  65| 736679.93|
|  64| 767036.67|
|  63| 711614.55|
|  62| 759150.14|
|  61| 746877.59|
|  60| 754939.65|
|  59| 791453.00|
|  58| 759764.65|
|  57| 734340.50|
|  56| 718176.40|
|  55| 771153.32|
|  54| 798684.82|
|  53| 755214.80|
|  52| 733780.26|
|  51| 788128.21|
| 100|1026669.50|
+----+----------+

--- 0.5448038578033447 seco

In [13]:
# 10. Partition by the "date_built" field on the formatted parquet home sales data
df.write.partitionBy('date_built').parquet('home_sales',mode='overwrite')

In [14]:
# 11. Read the parquet formatted data.
p_df=spark.read.parquet('home_sales')

In [15]:
# 12. Create a temporary table for the parquet data.
p_df.createOrReplaceTempView('p_home_sales')

In [16]:
# 13. Using the parquet DataFrame, run the last query above, that calculates
# the average price of a home per "view" rating, rounded to two decimal places,
# having an average home price greater than or equal to $350,000.
# Determine the runtime and compare it to the cached runtime.

start_time = time.time()
spark.sql("""
select DISTINCT view, cast(AVG(price) as numeric (9,2)) as avgPrice
from p_home_sales
group by view
having avgPrice >= 350000
order by view DESC
"""
).show(100)

print("--- %s seconds ---" % (time.time() - start_time))

+----+----------+
|view|  avgPrice|
+----+----------+
|  99|1061201.42|
|  98|1053739.33|
|  97|1129040.15|
|  96|1017815.92|
|  95|1054325.60|
|  94|1033536.20|
|  93|1026006.06|
|  92| 970402.55|
|  91|1137372.73|
|  90|1062654.16|
|  89|1107839.15|
|  88|1031719.35|
|  87|1072285.20|
|  86|1070444.25|
|  85|1056336.74|
|  84|1117233.13|
|  83|1033965.93|
|  82|1063498.00|
|  81|1053472.79|
|  80| 991767.38|
|  79|1009565.08|
|  78|1080649.37|
|  77|1076205.56|
|  76|1058802.78|
|  75|1114042.94|
|  74| 745077.00|
|  73| 752861.18|
|  72| 780914.67|
|  71| 775651.10|
|  70| 695865.58|
|  69| 750537.94|
|  68| 716785.44|
|  67| 737970.96|
|  66| 712475.00|
|  65| 736679.93|
|  64| 767036.67|
|  63| 711614.55|
|  62| 759150.14|
|  61| 746877.59|
|  60| 754939.65|
|  59| 791453.00|
|  58| 759764.65|
|  57| 734340.50|
|  56| 718176.40|
|  55| 771153.32|
|  54| 798684.82|
|  53| 755214.80|
|  52| 733780.26|
|  51| 788128.21|
| 100|1026669.50|
+----+----------+

--- 1.3462038040161133 seco

In [17]:
# 14. Uncache the home_sales temporary table.
spark.sql("uncache table home_sales")

DataFrame[]

In [18]:
# 15. Check if the home_sales is no longer cached
spark.catalog.isCached("home_sales")


False