In [21]:
# Import findspark and initialize. 
import findspark
findspark.init()

In [22]:
# Import packages
from pyspark.sql import SparkSession
import time

# Create a SparkSession
spark = SparkSession.builder.appName("SparkSQL").getOrCreate()

In [41]:
# 1. Read in the AWS S3 bucket into a DataFrame.
from pyspark import SparkFiles
url = "https://2u-data-curriculum-team.s3.amazonaws.com/dataviz-classroom/v1.2/22-big-data/home_sales_revised.csv"

## NOTE: unable to load from URL, downloaded CSV file --> loaded CSV in next CELL

In [24]:
import pandas as pd

# Read the CSV file into a Pandas DataFrame
df = pd.read_csv("home_sales_revised.csv")

# Display the first 10 rows
df.head(10)

Unnamed: 0,id,date,date_built,price,bedrooms,bathrooms,sqft_living,sqft_lot,floors,waterfront,view
0,f8a53099-ba1c-47d6-9c31-7398aa8f6089,2022-04-08,2016,936923,4,3,3167,11733,2,1,76
1,7530a2d8-1ae3-4517-9f4a-befe060c4353,2021-06-13,2013,379628,2,2,2235,14384,1,0,23
2,43de979c-0bf0-4c9f-85ef-96dc27b258d5,2019-04-12,2014,417866,2,2,2127,10575,2,0,0
3,b672c137-b88c-48bf-9f18-d0a4ac62fb8b,2019-10-16,2016,239895,2,2,1631,11149,2,0,0
4,e0726d4d-d595-4074-8283-4139a54d0d63,2022-01-08,2017,424418,3,2,2249,13878,2,0,4
5,5aa00529-0533-46ba-870c-9e881580ef35,2019-01-30,2017,218712,2,3,1965,14375,2,0,7
6,131492a1-72e2-4a84-bf97-0db14973bfdb,2020-02-08,2017,419199,2,3,2062,8876,2,0,6
7,8d54a71b-c520-44e5-8ba1-5a84be03ad35,2019-07-21,2010,323956,2,3,1506,11816,1,0,25
8,e81aacfe-17fe-46b1-a52a-4753d1622b4a,2020-06-16,2016,181925,3,3,2137,11709,2,0,22
9,2ed8d509-7372-46d5-a9dd-9281a95467d4,2021-08-06,2015,258710,3,3,1918,9666,1,0,25


In [29]:
# Read the CSV file into a Spark DataFrame
spark_df = spark.read.csv("home_sales_revised.csv", header=True, inferSchema=True)

# Create a temporary view for the Spark DataFrame
spark_df.createOrReplaceTempView("home_sales")

# Show the first 10 rows of the Spark DataFrame
spark_df.show(10)

+--------------------+----------+----------+------+--------+---------+-----------+--------+------+----------+----+
|                  id|      date|date_built| price|bedrooms|bathrooms|sqft_living|sqft_lot|floors|waterfront|view|
+--------------------+----------+----------+------+--------+---------+-----------+--------+------+----------+----+
|f8a53099-ba1c-47d...|2022-04-08|      2016|936923|       4|        3|       3167|   11733|     2|         1|  76|
|7530a2d8-1ae3-451...|2021-06-13|      2013|379628|       2|        2|       2235|   14384|     1|         0|  23|
|43de979c-0bf0-4c9...|2019-04-12|      2014|417866|       2|        2|       2127|   10575|     2|         0|   0|
|b672c137-b88c-48b...|2019-10-16|      2016|239895|       2|        2|       1631|   11149|     2|         0|   0|
|e0726d4d-d595-407...|2022-01-08|      2017|424418|       3|        2|       2249|   13878|     2|         0|   4|
|5aa00529-0533-46b...|2019-01-30|      2017|218712|       2|        3|       196

In [31]:
# 3. What is the average price for a four bedroom house sold in each year rounded to two decimal places?
result = spark.sql("""
    SELECT date_built, bedrooms, ROUND(AVG(price), 2) AS avg_price
    FROM home_sales
    WHERE bedrooms = 4
    GROUP BY date_built, bedrooms
    ORDER BY date_built
""")

result.show()

+----------+--------+---------+
|date_built|bedrooms|avg_price|
+----------+--------+---------+
|      2010|       4|296800.75|
|      2011|       4| 302141.9|
|      2012|       4|298233.42|
|      2013|       4|299999.39|
|      2014|       4|299073.89|
|      2015|       4|307908.86|
|      2016|       4|296050.24|
|      2017|       4|296576.69|
+----------+--------+---------+



In [33]:
# 4. What is the average price of a home for each year the home was built that have 3 bedrooms and 3 bathrooms rounded to two decimal places?
result = spark.sql("""
    SELECT date_built, bedrooms, bathrooms, ROUND(AVG(price), 2) AS avg_price
    FROM home_sales
    WHERE bedrooms = 3 AND bathrooms = 3
    GROUP BY date_built, bedrooms, bathrooms
    ORDER BY date_built
    """)

result.show()

+----------+--------+---------+---------+
|date_built|bedrooms|bathrooms|avg_price|
+----------+--------+---------+---------+
|      2010|       3|        3|292859.62|
|      2011|       3|        3|291117.47|
|      2012|       3|        3|293683.19|
|      2013|       3|        3|295962.27|
|      2014|       3|        3|290852.27|
|      2015|       3|        3| 288770.3|
|      2016|       3|        3|290555.07|
|      2017|       3|        3|292676.79|
+----------+--------+---------+---------+



In [34]:
#  5. What is the average price of a home for each year built that have 3 bedrooms, 3 bathrooms, with two floors,
# and are greater than or equal to 2,000 square feet rounded to two decimal places?
result = spark.sql("""
    SELECT date_built, bedrooms, bathrooms, floors, sqft_living, ROUND(AVG(price), 2) AS avg_price
    FROM home_sales
    WHERE bedrooms = 3 AND bathrooms = 3 AND floors = 2 AND sqft_living >= 2000
    GROUP BY date_built, bedrooms, bathrooms, floors, sqft_living
    ORDER BY date_built
""")

result.show()

+----------+--------+---------+------+-----------+---------+
|date_built|bedrooms|bathrooms|floors|sqft_living|avg_price|
+----------+--------+---------+------+-----------+---------+
|      2010|       3|        3|     2|       2131| 279680.0|
|      2010|       3|        3|     2|       2111| 259382.0|
|      2010|       3|        3|     2|       2172| 266280.0|
|      2010|       3|        3|     2|       2055| 177217.0|
|      2010|       3|        3|     2|       2395| 123326.0|
|      2010|       3|        3|     2|       2313| 130941.0|
|      2010|       3|        3|     2|       2149| 228717.0|
|      2010|       3|        3|     2|       2215| 145507.0|
|      2010|       3|        3|     2|       2387| 383197.0|
|      2010|       3|        3|     2|       3945| 890532.0|
|      2010|       3|        3|     2|       2483| 229909.0|
|      2010|       3|        3|     2|       2148|282430.33|
|      2010|       3|        3|     2|       2341| 384378.0|
|      2010|       3|   

In [36]:
# 6. What is the "view" rating for the average price of a home, rounded to two decimal places, where the homes are greater than
# or equal to $350,000? Although this is a small dataset, determine the run time for this query.

import time

# Record the start time
start_time = time.time()

# Run a SparkSQL query to calculate the "view" rating for the average price of homes greater than or equal to $350,000
result = spark.sql("""
    SELECT ROUND(AVG(view), 2) AS avg_view
    FROM home_sales
    WHERE price >= 350000
""")

# Show the result
result.show()

# Calculate and print the query runtime
end_time = time.time()
print("--- %s seconds ---" % (end_time - start_time))

+--------+
|avg_view|
+--------+
|   32.26|
+--------+

--- 0.1634998321533203 seconds ---


In [37]:
# 7. Cache the the temporary table home_sales.
spark.sql("CACHE TABLE home_sales")

DataFrame[]

In [38]:
# 8. Check if the table is cached.
spark.catalog.isCached('home_sales')

True

In [40]:
# 9. Using the cached data, run the query that filters out the view ratings with average price 
#  greater than or equal to $350,000. Determine the runtime and compare it to uncached runtime.

import time

# Record the start time
start_time = time.time()

# Run a SparkSQL query to filter out view ratings with average price greater than or equal to $350,000
result = spark.sql("""
    SELECT view
    FROM home_sales
    WHERE price >= 350000
""")

# Show the result
result.show()

# Calculate and print the query runtime
end_time = time.time()
print("--- %s seconds ---" % (end_time - start_time))

+----+
|view|
+----+
|  76|
|  23|
|   0|
|   4|
|   6|
|   7|
|  26|
|  34|
|  23|
|  48|
|  16|
|  10|
|  24|
|  91|
|  49|
|  37|
|  23|
|  25|
|  23|
|   1|
+----+
only showing top 20 rows

--- 0.06642603874206543 seconds ---


In [42]:
# 10. Partition by the "date_built" field on the formatted parquet home sales data 
# Create Parquet partitioned by 'date_built'
spark_df.write.partitionBy("date_built").mode("overwrite").parquet("home_sales_partitioned")

                                                                                

In [44]:
# 11. Read the formatted parquet data.
parquet_df = spark.read.option("basePath", "home_sales_partitioned").parquet("home_sales_partitioned")

In [45]:
# 12. Create a temporary table for the parquet data.
parquet_df.createOrReplaceTempView("home_sales_parquet")

In [46]:
# 13. Run the query that filters out the view ratings with average price of greater than or eqaul to $350,000 
# with the parquet DataFrame. Round your average to two decimal places. 
# Determine the runtime and compare it to the cached version. 

start_time = time.time()

parquet_result = spark.sql("""
    SELECT view
    FROM home_sales_parquet
    WHERE price >= 350000
""")

# Show the result
parquet_result.show()

end_time = time.time()
print("--- %s seconds ---" % (end_time - start_time))

+----+
|view|
+----+
|  10|
|  24|
|   1|
|   4|
|  39|
|  90|
|  50|
|   6|
|   0|
|  26|
|  17|
|  10|
|  58|
|  82|
|   2|
|  44|
|  45|
|  88|
|  49|
|  56|
+----+
only showing top 20 rows

--- 0.21191191673278809 seconds ---


In [47]:
# Analysis: SPARK runs faster

# RUNTIME SPARK = 0.06642603874206543 seconds 
# RUNTIME PARQUET = 0.21191191673278809 seconds

In [48]:
# 14. Uncache the home_sales temporary table.
spark.catalog.uncacheTable("home_sales")

In [49]:
# 15. Check if the home_sales is no longer cached
is_cached = spark.catalog.isCached("home_sales")

if is_cached:
    print("The home_sales table is still cached.")
else:
    print("The home_sales table is no longer cached.")

The home_sales table is no longer cached.
