In [None]:
# Import findspark and initialize. 
import findspark
findspark.init()

In [2]:
# Import packages
from pyspark.sql import SparkSession
import time

# Create a SparkSession
spark = SparkSession.builder.appName("SparkSQL").getOrCreate()

<class 'ModuleNotFoundError'>: No module named 'pyspark'

In [3]:
# 1. Read in the AWS S3 bucket into a DataFrame.
from pyspark import SparkFiles
url = "https://2u-data-curriculum-team.s3.amazonaws.com/dataviz-classroom/v1.2/22-big-data/home_sales_revised.csv"
spark.sparkContext.addFile(url)
df_sales = spark.read.csv(SparkFiles.get("home_sales_revised.csv"), sep=",", header=True)
df_sales.show(10)

<class 'ModuleNotFoundError'>: No module named 'pyspark'

In [4]:
# 2. Create a temporary view of the DataFrame.
df_sales.createOrReplaceTempView('home_sales')


<class 'NameError'>: name 'df_sales' is not defined

In [5]:
# 3. What is the average price for a four bedroom house sold per year, rounded to two decimal places?
q_3 = """
SELECT 
    YEAR(date) AS year,
    FORMAT_NUMBER(ROUND(AVG(price), 2), '$#,##0.00') AS average_price
FROM home_sales
WHERE bedrooms = 4
GROUP BY year
ORDER BY year DESC
"""
spark.sql(q_3).show()

+----+-------------+
|year|average_price|
+----+-------------+
|2022|  $296,363.88|
|2021|  $301,819.44|
|2020|  $298,353.78|
|2019|  $300,263.70|
+----+-------------+



In [6]:
# 4. What is the average price of a home for each year the home was built,
# that have 3 bedrooms and 3 bathrooms, rounded to two decimal places?
q_4 = """
SELECT 
    YEAR(date_built) AS year,
    FORMAT_NUMBER(ROUND(AVG(price), 2), '$#,##0.00') AS average_price
FROM home_sales
WHERE bedrooms = 3 AND bathrooms = 3
GROUP BY year
ORDER BY year 
"""
spark.sql(q_4).show()

+----+-------------+
|year|average_price|
+----+-------------+
|2010|  $292,859.62|
|2011|  $291,117.47|
|2012|  $293,683.19|
|2013|  $295,962.27|
|2014|  $290,852.27|
|2015|  $288,770.30|
|2016|  $290,555.07|
|2017|  $292,676.79|
+----+-------------+



In [7]:
# 5. What is the average price of a home for each year the home was built,
# that have 3 bedrooms, 3 bathrooms, with two floors,
# and are greater than or equal to 2,000 square feet, rounded to two decimal places?
q_5 = """
SELECT 
    YEAR(date_built) AS year,
    FORMAT_NUMBER(ROUND(AVG(price), 2), '$#,##0.00') AS average_price
FROM home_sales
WHERE bedrooms = 3 AND bathrooms = 3
AND floors = 2 AND sqft_living >= 2000
GROUP BY year
ORDER BY year 
"""
spark.sql(q_5).show()


+----+-------------+
|year|average_price|
+----+-------------+
|2010|  $285,010.22|
|2011|  $276,553.81|
|2012|  $307,539.97|
|2013|  $303,676.79|
|2014|  $298,264.72|
|2015|  $297,609.97|
|2016|  $293,965.10|
|2017|  $280,317.58|
+----+-------------+



In [8]:
# 6. What is the average price of a home per "view" rating, rounded to two decimal places,
# having an average home price greater than or equal to $350,000? Order by descending view rating. 
# Although this is a small dataset, determine the run time for this query.

start_time = time.time()
q_6 = """
SELECT 
    view,
    FORMAT_NUMBER(ROUND(AVG(price), 2), '$#,##0.00') AS average_price
FROM home_sales
GROUP BY view
HAVING AVG(price) >= 350000
ORDER BY view DESC 
"""
spark.sql(q_6).show(10)

time_uncached = (time.time() - start_time)

print("--- %s seconds ---" % time_uncached)

+----+-------------+
|view|average_price|
+----+-------------+
|  99|$1,061,201.42|
|  98|$1,053,739.33|
|  97|$1,129,040.15|
|  96|$1,017,815.92|
|  95|$1,054,325.60|
|  94|$1,033,536.20|
|  93|$1,026,006.06|
|  92|  $970,402.55|
|  91|$1,137,372.73|
|  90|$1,062,654.16|
+----+-------------+
only showing top 10 rows

--- 0.42646312713623047 seconds ---


In [9]:
# 7. Cache the the temporary table home_sales.
spark.sql('cache table home_sales')

DataFrame[]

In [10]:
# 8. Check if the table is cached.
spark.catalog.isCached('home_sales')

True

In [11]:
# 9. Using the cached data, run the last query above, that calculates 
# the average price of a home per "view" rating, rounded to two decimal places,
# having an average home price greater than or equal to $350,000. 
# Determine the runtime and compare it to the uncached runtime.

start_time = time.time()

spark.sql(q_6).show(10)

time_cached = (time.time() - start_time)

print("unCached--- %s seconds ---" % time_uncached)
print("Cached--- %s seconds ---" % time_cached)
print("Delta between--- %s seconds ---" % (time_uncached - time_cached))


+----+-------------+
|view|average_price|
+----+-------------+
|  99|$1,061,201.42|
|  98|$1,053,739.33|
|  97|$1,129,040.15|
|  96|$1,017,815.92|
|  95|$1,054,325.60|
|  94|$1,033,536.20|
|  93|$1,026,006.06|
|  92|  $970,402.55|
|  91|$1,137,372.73|
|  90|$1,062,654.16|
+----+-------------+
only showing top 10 rows

unCached--- 0.42646312713623047 seconds ---
Cached--- 0.26602983474731445 seconds ---
Delta between--- 0.16043329238891602 seconds ---


In [14]:
# 12. Create a temporary table for the parquet data.
p_df_sales.createOrReplaceTempView('p_sales')

NameError: name 'p_df_sales' is not defined

In [15]:
# 13. Using the parquet DataFrame, run the last query above, that calculates 
# the average price of a home per "view" rating, rounded to two decimal places,
# having an average home price greater than or equal to $350,000. 
# Determine the runtime and compare it to the cached runtime.

start_time = time.time()

q_13 = """
SELECT 
    view,
    FORMAT_NUMBER(ROUND(AVG(price), 2), '$#,##0.00') AS average_price
FROM home_sales
GROUP BY view
HAVING AVG(price) >= 350000
ORDER BY view DESC 
"""
##############################################################################
################################### ALERT* ###################################
### would use FROM p_sales if I wasn't running into a hadoop parquet issue ###
##############################################################################
spark.sql(q_13).show(10)

time_parqed = (time.time() - start_time)

print("unCached--- %s seconds ---" % time_uncached)
print("Cached--- %s seconds ---" % time_cached)
print("Parqed*--- %s seconds ---" % time_parqed)
print("Delta between C&P*--- %s seconds ---" % (time_cached - time_parqed))

+----+-------------+
|view|average_price|
+----+-------------+
|  99|$1,061,201.42|
|  98|$1,053,739.33|
|  97|$1,129,040.15|
|  96|$1,017,815.92|
|  95|$1,054,325.60|
|  94|$1,033,536.20|
|  93|$1,026,006.06|
|  92|  $970,402.55|
|  91|$1,137,372.73|
|  90|$1,062,654.16|
+----+-------------+
only showing top 10 rows

unCached--- 0.42646312713623047 seconds ---
Cached--- 0.26602983474731445 seconds ---
Parqed*--- 0.2599978446960449 seconds ---
Delta between C&P*--- 0.006031990051269531 seconds ---


In [16]:
# 14. Uncache the home_sales temporary table.
spark.sql('uncache table home_sales')

DataFrame[]

In [17]:
# 15. Check if the home_sales is no longer cached

spark.catalog.isCached("home_sales")

False