In [1]:
# Import findspark and initialise. 
import findspark
findspark.init()

In [15]:
# Import packages
from pyspark.sql import SparkSession
import time

# Create a SparkSession
spark = SparkSession.builder.appName("SparkSQL").getOrCreate()

In [29]:
# 1a. Read in the AWS S3 bucket into a DataFrame.
from pyspark import SparkFiles
url = "https://2u-data-curriculum-team.s3.amazonaws.com/dataviz-classroom/v1.2/22-big-data/home_sales_revised.csv"
spark.sparkContext.addFile(url)
df = spark.read.csv(SparkFiles.get("home_sales_revised.csv"), header=True, inferSchema=True, quote="\"", escape="\"")

# 1b. Create a new DataFrame with the column year.
from pyspark.sql.functions import year
df = df.withColumn("year_sold", year(df["date"]))

# 1c. Show DataFrame.
df.show()


23/05/07 08:15:11 WARN SparkContext: The path https://2u-data-curriculum-team.s3.amazonaws.com/dataviz-classroom/v1.2/22-big-data/home_sales_revised.csv has been added already. Overwriting of added paths is not supported in the current version.
+--------------------+-------------------+----------+------+--------+---------+-----------+--------+------+----------+----+---------+
|                  id|               date|date_built| price|bedrooms|bathrooms|sqft_living|sqft_lot|floors|waterfront|view|year_sold|
+--------------------+-------------------+----------+------+--------+---------+-----------+--------+------+----------+----+---------+
|f8a53099-ba1c-47d...|2022-04-08 00:00:00|      2016|936923|       4|        3|       3167|   11733|     2|         1|  76|     2022|
|7530a2d8-1ae3-451...|2021-06-13 00:00:00|      2013|379628|       2|        2|       2235|   14384|     1|         0|  23|     2021|
|43de979c-0bf0-4c9...|2019-04-12 00:00:00|      2014|417866|       2|        2|      

In [32]:
# 1d. Describe DataFrame.
df.describe().show()

+-------+--------------------+------------------+------------------+------------------+-----------------+------------------+------------------+------------------+-------------------+------------------+------------------+
|summary|                  id|        date_built|             price|          bedrooms|        bathrooms|       sqft_living|          sqft_lot|            floors|         waterfront|              view|         year_sold|
+-------+--------------------+------------------+------------------+------------------+-----------------+------------------+------------------+------------------+-------------------+------------------+------------------+
|  count|               33287|             33287|             33287|             33287|            33287|             33287|             33287|             33287|              33287|             33287|             33287|
|   mean|                null|2013.5174091987863|314907.81797698804| 3.082224291765554|2.588758374140055|2116.505752

In [53]:
# 2. Create a temporary view of the DataFrame.
df.createOrReplaceTempView('home_sales')


In [35]:
# 3a. What is the average price for a four bedroom house sold in each year rounded to two decimal places?
avg_prices = spark.sql("""
SELECT 
    year_sold AS year,
    AVG(price)
FROM home_sales 
WHERE bedrooms = 4
GROUP BY year 
ORDER BY year
""").collect()

# 3b. Convert the Spark SQL query results to a list.
avg_prices_list = [row for row in avg_prices]

# 3c. Show the list rounded to two decimal places.
for row in avg_prices_list:
    print(f"Year Sold: {row[0]} Average Price: {round(row[1], 2)}")


Year Sold: 2019 Average Price: 300263.7
Year Sold: 2020 Average Price: 298353.78
Year Sold: 2021 Average Price: 301819.44
Year Sold: 2022 Average Price: 296363.88


In [36]:
# 4a. What is the average price of a home for each year the home was built that have 3 bedrooms and 3 bathrooms rounded to two decimal places?
avg_prices = spark.sql("""
SELECT
    date_built,
    AVG(price)
FROM home_sales
WHERE bedrooms = 3 AND bathrooms = 3
GROUP BY date_built
ORDER BY date_built
""").collect()

# 4b. Convert the Spark SQL query results to a list.
avg_prices_list = [row for row in avg_prices]

# 4c. Show the list rounded to two decimal places.
for row in avg_prices_list:
    print(f"Year Built: {row[0]} Average Price: {round(row[1], 2)}")




Year Built: 2010 Average Price: 292859.62
Year Built: 2011 Average Price: 291117.47
Year Built: 2012 Average Price: 293683.19
Year Built: 2013 Average Price: 295962.27
Year Built: 2014 Average Price: 290852.27
Year Built: 2015 Average Price: 288770.3
Year Built: 2016 Average Price: 290555.07
Year Built: 2017 Average Price: 292676.79


In [37]:
#  5a. What is the average price of a home for each year built that have 3 bedrooms, 3 bathrooms, 
# with two floors, and are greater than or equal to 2,000 square feet rounded to two decimal places?
avg_prices = spark.sql("""
SELECT
    date_built,
    AVG(price)
FROM home_sales
WHERE bedrooms = 3 AND bathrooms = 3 AND floors = 2 AND sqft_living >= 2000
GROUP BY date_built
ORDER BY date_built
""").collect()

# 5b. Convert the Spark SQL query results to a list.
avg_prices_list = [row for row in avg_prices]

# 5c. Show the list rounded to two decimal places.
for row in avg_prices_list:
    print(f"Year Built: {row[0]} Average Price: {round(row[1], 2)}")


Year Built: 2010 Average Price: 285010.22
Year Built: 2011 Average Price: 276553.81
Year Built: 2012 Average Price: 307539.97
Year Built: 2013 Average Price: 303676.79
Year Built: 2014 Average Price: 298264.72
Year Built: 2015 Average Price: 297609.97
Year Built: 2016 Average Price: 293965.1
Year Built: 2017 Average Price: 280317.58


In [65]:
# 6a. What is the "view" rating for the average price of a home, rounded to two decimal places, where the homes are greater than
# or equal to $350,000? Although this is a small dataset, determine the run time for this query.
start_time_nc = time.time()

spark.sql("""
SELECT
    AVG(view),
    AVG(price)
FROM home_sales
WHERE price >= 350000
""").show()

print("--- %s seconds ---" % (time.time() - start_time_nc))

avg_prices = spark.sql("""
SELECT
    AVG(view),
    AVG(price)
FROM home_sales
WHERE price >= 350000
""").collect()

#6b. Convert the Spark SQL query results to a list.
avg_prices_list = [row for row in avg_prices]

# 6c. Show the list rounded to two decimal places.
print()
for row in avg_prices_list:
    print(f"Average View Rating: {round(row[0], 2)} Average Price: {round(row[1], 2)}")


+---------+------------------+
|avg(view)|        avg(price)|
+---------+------------------+
|   32.264|473796.26220224716|
+---------+------------------+

--- 0.042181968688964844 seconds ---

Average View Rating: 32.26 Average Price: 473796.26


In [54]:
# 7. Cache the the temporary table home_sales.
spark.catalog.cacheTable('home_sales')


In [55]:
# 8. Check if the table is cached.
spark.catalog.isCached('home_sales')


True

In [66]:
# 9a. Using the cached data, run the query that filters out the view ratings with average price 
#  greater than or equal to $350,000. Determine the runtime and compare it to uncached runtime.
start_time_c = time.time()

spark.sql("""
SELECT
    AVG(view),
    AVG(price)
FROM home_sales
WHERE price >= 350000
""").show()

print("--- %s seconds ---" % (time.time() - start_time_c))

# 9b. Dsplay the difference between the run times.
print()
print(start_time_nc - start_time_c)


+---------+------------------+
|avg(view)|        avg(price)|
+---------+------------------+
|   32.264|473796.26220224716|
+---------+------------------+

--- 0.042514801025390625 seconds ---
-2.951399087905884


In [None]:
# 10. Partition by the "date_built" field on the formatted parquet home sales data 


In [None]:
# 11. Read the formatted parquet data.


In [None]:
# 12. Create a temporary table for the parquet data.


In [None]:
# 13. Run the query that filters out the view ratings with average price of greater than or eqaul to $350,000 
# with the parquet DataFrame. Round your average to two decimal places. 
# Determine the runtime and compare it to the cached version. 

start_time = time.time()



print("--- %s seconds ---" % (time.time() - start_time))

In [None]:
# 14. Uncache the home_sales temporary table.


In [None]:
# 15. Check if the home_sales is no longer cached

