# [SparkSQL] Home Sales Dataset - Key Metrics
---
## Step #1 - Import Relevant Dependencies & Create Spark Session

In [1]:
import findspark

# Initialise findspark so Python can locate the local Spark installation and import pyspark
findspark.init()

In [2]:
from pyspark.sql import SparkSession
import time

# Instantiate my Spark Session
spark = SparkSession.builder.appName("SparkSQL").getOrCreate()

## Step #2: Read `home_sales_revised.csv` into a Spark DataFrame
---

In [3]:
# Read the CSV file hosted in an AWS S3 bucket into a DataFrame.
from pyspark import SparkFiles

# Store the URL of the AWS S3 bucket containing the CSV Dataset
csv_url = "https://2u-data-curriculum-team.s3.amazonaws.com/dataviz-classroom/v1.2/22-big-data/home_sales_revised.csv"

# Download the CSV file from the AWS S3 bucket and distribute it to all nodes in the Spark cluster
spark.sparkContext.addFile(csv_url)

# Read the downloaded CSV into a Spark DataFrame; the file is comma-separated and its first row holds the column headers
home_df = spark.read.csv(SparkFiles.get("home_sales_revised.csv"), sep = ",", header = True)
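
Because the read above does not set `inferSchema`, Spark loads every CSV column as a string, and the later queries rely on implicit or explicit casts (for example `CAST(view AS INT)`). A minimal sketch of an alternative read, assuming a hypothetical helper named `read_home_sales` that is not part of the original notebook:

```python
def read_home_sales(spark, path):
    """Read the CSV with a header row and ask Spark to infer column types.

    `read_home_sales` is a hypothetical helper used only for illustration.
    `spark` is expected to be an active SparkSession.
    """
    return spark.read.csv(path, sep=",", header=True, inferSchema=True)


# Possible usage in this notebook (assumes `spark` and `SparkFiles` from the cells above):
# home_df = read_home_sales(spark, SparkFiles.get("home_sales_revised.csv"))
# home_df.printSchema()  # inspect which types Spark inferred
```

Type inference costs an extra pass over the file, so whether it is worthwhile here is a judgment call; the outputs shown in this notebook were produced without it.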

## Step #3: Create a Temporary Table Called `home_sales`
---

In [4]:
# Create a temporary table or view
home_df.createOrReplaceTempView("home_sales")

# Now the temporary view can be used in SparkSQL queries
query_all = spark.sql("SELECT * FROM home_sales")

# Preview the Temporary Table containing the Home Sales Dataset
query_all.show()

+--------------------+----------+----------+------+--------+---------+-----------+--------+------+----------+----+
|                  id|      date|date_built| price|bedrooms|bathrooms|sqft_living|sqft_lot|floors|waterfront|view|
+--------------------+----------+----------+------+--------+---------+-----------+--------+------+----------+----+
|f8a53099-ba1c-47d...|2022-04-08|      2016|936923|       4|        3|       3167|   11733|     2|         1|  76|
|7530a2d8-1ae3-451...|2021-06-13|      2013|379628|       2|        2|       2235|   14384|     1|         0|  23|
|43de979c-0bf0-4c9...|2019-04-12|      2014|417866|       2|        2|       2127|   10575|     2|         0|   0|
|b672c137-b88c-48b...|2019-10-16|      2016|239895|       2|        2|       1631|   11149|     2|         0|   0|
|e0726d4d-d595-407...|2022-01-08|      2017|424418|       3|        2|       2249|   13878|     2|         0|   4|
|5aa00529-0533-46b...|2019-01-30|      2017|218712|       2|        3|       196

## Step #4: Perform SparkSQL Queries
---
### Query #1: What is the average price of a four-bedroom house sold in each year? (Round to 2 d.p)

In [5]:
start_time = time.time()

query_one = spark.sql("""
                    SELECT YEAR(date) AS year_sold, ROUND(AVG(price), 2) AS avg_price
                    FROM home_sales
                    WHERE bedrooms = 4
                    GROUP BY year_sold
                    ORDER BY year_sold
                    """
                    )

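# Note: spark.sql() is lazy, so this timing covers building the query only; the job itself runs in show()
print("Query #1 Runtime: %s seconds" % (time.time() - start_time))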
query_one.show()

Query #1 Runtime: 0.1874985694885254 seconds
+---------+---------+
|year_sold|avg_price|
+---------+---------+
|     2019| 300263.7|
|     2020|298353.78|
|     2021|301819.44|
|     2022|296363.88|
+---------+---------+
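
The runtime above is taken before `show()` is called. Since `spark.sql()` returns a lazily evaluated DataFrame, that figure most likely reflects parsing and planning rather than the scan and aggregation. A minimal sketch of a timer that wraps the action itself, assuming a hypothetical helper named `time_action`:

```python
import time


def time_action(action):
    """Run a zero-argument callable and return (result, elapsed_seconds).

    `time_action` is a hypothetical helper used only for illustration.
    """
    start = time.perf_counter()
    result = action()  # for Spark, this should trigger an action such as collect()
    elapsed = time.perf_counter() - start
    return result, elapsed


# Possible usage in this notebook (assumes `spark` and the `home_sales` view exist):
# rows, seconds = time_action(
#     lambda: spark.sql("SELECT COUNT(*) FROM home_sales WHERE bedrooms = 4").collect()
# )
# print("Runtime including execution: %s seconds" % seconds)
```

`time.perf_counter()` is used here because it is intended for measuring short intervals; `time.time()` would also work.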



### Query #2: What is the average price of a home with three bedrooms and three bathrooms, for each year it was built? (Round to 2 d.p)

In [6]:
start_time = time.time()

query_two = spark.sql("""
                    SELECT date_built AS year_built, ROUND(AVG(price), 2) AS avg_price
                    FROM home_sales
                    WHERE bedrooms = 3 AND bathrooms = 3
                    GROUP BY date_built
                    ORDER BY date_built
                    """
                    )

print("Query #2 Runtime: %s seconds" % (time.time() - start_time))
query_two.show()

Query #2 Runtime: 0.13862943649291992 seconds
+----------+---------+
|year_built|avg_price|
+----------+---------+
|      2010|292859.62|
|      2011|291117.47|
|      2012|293683.19|
|      2013|295962.27|
|      2014|290852.27|
|      2015| 288770.3|
|      2016|290555.07|
|      2017|292676.79|
+----------+---------+



### Query #3: What is the average price of a home, for each year it was built, that has three bedrooms, three bathrooms, two floors, and at least 2,000 square feet of living space? (Round to 2 d.p)

In [7]:
start_time = time.time()

query_three = spark.sql("""
                        SELECT date_built AS year_built, ROUND(AVG(price), 2) AS avg_price
                        FROM home_sales
                        WHERE bedrooms = 3 AND bathrooms = 3 AND floors = 2 AND sqft_living >= 2000
                        GROUP BY date_built
                        ORDER BY date_built
                        """
                        )

print("Query #3 Runtime: %s seconds" % (time.time() - start_time))
query_three.show()

Query #3 Runtime: 0.039893388748168945 seconds
+----------+---------+
|year_built|avg_price|
+----------+---------+
|      2010|285010.22|
|      2011|276553.81|
|      2012|307539.97|
|      2013|303676.79|
|      2014|298264.72|
|      2015|297609.97|
|      2016| 293965.1|
|      2017|280317.58|
+----------+---------+



### Query #4: What is the average price per "view" rating for homes priced at or above $350,000? (Round to 2 d.p)

In [8]:
start_time = time.time()

query_four = spark.sql("""
                       SELECT CAST(view AS INT) AS view_rating, ROUND(AVG(price), 2) AS avg_price
                       FROM home_sales
                       WHERE price >= 350000
                       GROUP BY view_rating
                       ORDER BY view_rating ASC
                       """
                       )

runtime_query_four = time.time() - start_time

print("Query #4 (Uncached) Runtime: %s seconds" % (runtime_query_four))
query_four.show()

Query #4 (Uncached) Runtime: 0.039893150329589844 seconds
+-----------+---------+
|view_rating|avg_price|
+-----------+---------+
|          0|403848.51|
|          1|401044.25|
|          2|397389.25|
|          3| 398867.6|
|          4|399631.89|
|          5|401471.82|
|          6|395655.38|
|          7|403005.77|
|          8|398592.71|
|          9|401393.34|
|         10|401868.43|
|         11|399548.12|
|         12|401501.32|
|         13|398917.98|
|         14|398570.03|
|         15| 404673.3|
|         16|399586.53|
|         17|398474.49|
|         18|399332.91|
|         19|398953.17|
+-----------+---------+
only showing top 20 rows



## Step #5: Cache the Temporary Table `home_sales`
---

In [9]:
spark.catalog.cacheTable("home_sales")

## Step #6: Confirm that the Temporary Table `home_sales` is Cached
---

In [10]:
spark.catalog.isCached('home_sales')

True
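
`isCached` returning `True` means the table is marked for caching. `spark.catalog.cacheTable` is lazy, so the data is only stored in memory once an action scans the table. A minimal sketch that forces this up front, assuming a hypothetical helper named `cache_and_materialize`:

```python
def cache_and_materialize(spark, table_name):
    """Mark a table as cached, then run a full scan so the cache is populated.

    `cache_and_materialize` is a hypothetical helper used only for illustration.
    Returns the row count produced by the scan.
    """
    spark.catalog.cacheTable(table_name)
    # count() is an action: it reads every row, which fills the in-memory cache
    return spark.table(table_name).count()


# Possible usage in this notebook (assumes `spark` and the `home_sales` view exist):
# cache_and_materialize(spark, "home_sales")
```

With the cache populated before timing starts, a later cached-versus-uncached comparison measures the reads rather than the one-off cost of building the cache.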

## Step #7: Using the Cached Data, Run the SparkSQL Query for the Average Price per View Rating of Homes Priced at or Above $350,000. Determine the Runtime and Compare It to the Uncached Runtime
---

In [11]:
start_time = time.time()

query_five = spark.sql("""
                       SELECT CAST(view AS INT) AS view_rating, ROUND(AVG(price), 2) AS avg_price
                       FROM home_sales
                       WHERE price >= 350000
                       GROUP BY view_rating
                       ORDER BY view_rating ASC
                       """
                       )

runtime_query_five = time.time() - start_time

print("Query #5 (Cached) Runtime: %s seconds" % (runtime_query_five))
print()

if runtime_query_five < runtime_query_four:
    print("The Runtime for the Cached Query is FASTER than the Uncached Query")
else:
    print("The Runtime for the Uncached Query is FASTER than the Cached Query")
print()

query_five.show()

Query #5 (Cached) Runtime: 0.030916929244995117 seconds

The Runtime for the Cached Query is FASTER than the Uncached Query

+-----------+---------+
|view_rating|avg_price|
+-----------+---------+
|          0|403848.51|
|          1|401044.25|
|          2|397389.25|
|          3| 398867.6|
|          4|399631.89|
|          5|401471.82|
|          6|395655.38|
|          7|403005.77|
|          8|398592.71|
|          9|401393.34|
|         10|401868.43|
|         11|399548.12|
|         12|401501.32|
|         13|398917.98|
|         14|398570.03|
|         15| 404673.3|
|         16|399586.53|
|         17|398474.49|
|         18|399332.91|
|         19|398953.17|
+-----------+---------+
only showing top 20 rows
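
The two runtimes compared above differ by roughly 9 milliseconds and each comes from a single run, so the difference may be within normal run-to-run noise. A minimal sketch of a steadier comparison, assuming a hypothetical helper named `median_runtime` that repeats the measurement:

```python
import statistics
import time


def median_runtime(action, repeats=5):
    """Call `action` several times and return the median elapsed time in seconds.

    `median_runtime` is a hypothetical helper used only for illustration.
    """
    timings = []
    for _ in range(repeats):
        start = time.perf_counter()
        action()
        timings.append(time.perf_counter() - start)
    return statistics.median(timings)


# Possible usage in this notebook (assumes `query_four` and `query_five` from the cells above):
# uncached = median_runtime(lambda: query_four.collect())
# cached = median_runtime(lambda: query_five.collect())
```

The median is less sensitive than the mean to a single slow run, such as the first execution after start-up.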



## Step #8: Partition by the `date_built` Field on the Formatted Parquet Home Sales Data (`parquet_df`)
---

In [12]:
parquet_output_directory = "data_partitioned_by_date_built"

home_df.write.partitionBy("date_built").mode("overwrite").parquet(parquet_output_directory)

# Read the Formatted Parquet Data
parquet_df = spark.read.parquet(parquet_output_directory)

## Step #9: Create a Temporary Table for the Parquet Data (`parquet_home_sales`)
---

In [13]:
parquet_df.createOrReplaceTempView("parquet_home_sales")
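
`partitionBy("date_built")` writes one subdirectory per distinct value, named in the `column=value` style (for example `date_built=2010`). A minimal sketch for inspecting that layout on the local filesystem, assuming a hypothetical helper named `list_partitions` and that the output directory is a local path:

```python
from pathlib import Path


def list_partitions(output_dir, column="date_built"):
    """Return the sorted partition values found under a partitioned output directory.

    `list_partitions` is a hypothetical helper used only for illustration.
    It looks for subdirectories named `<column>=<value>` and ignores other entries.
    """
    prefix = f"{column}="
    return sorted(
        entry.name[len(prefix):]
        for entry in Path(output_dir).iterdir()
        if entry.is_dir() and entry.name.startswith(prefix)
    )


# Possible usage in this notebook (after the write in the previous step):
# print(list_partitions("data_partitioned_by_date_built"))
```

This layout lets Spark skip whole directories when a query filters on `date_built`. A query that filters only on `price` and groups by `view` does not filter on the partition column, so any speed-up it shows more likely comes from the columnar Parquet format than from partition pruning.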

## Step #10: Using the Parquet Temporary Table (`parquet_home_sales`), Run the SparkSQL Query for the Average Price per View Rating of Homes Priced at or Above $350,000. Determine the Runtime and Compare It to the Uncached Runtime
---

In [14]:
start_time = time.time()

query_six = spark.sql("""
                      SELECT CAST(view AS INT) AS view_rating, ROUND(AVG(price), 2) AS avg_price
                      FROM parquet_home_sales
                      WHERE price >= 350000
                      GROUP BY view_rating
                      ORDER BY view_rating ASC
                      """
                      )

runtime_query_six = time.time() - start_time

print("Query #6 (Parquet) Runtime: %s seconds" % (runtime_query_six))
print()

if runtime_query_six < runtime_query_four:
    print("The Runtime for the Parquet Query is FASTER than the Uncached Query")
else:
    print("The Runtime for the Uncached Query is FASTER than the Parquet Query")
print()

query_six.show()

Query #6 (Parquet) Runtime: 0.01795196533203125 seconds

The Runtime for the Parquet Query is FASTER than the Uncached Query

+-----------+---------+
|view_rating|avg_price|
+-----------+---------+
|          0|403848.51|
|          1|401044.25|
|          2|397389.25|
|          3| 398867.6|
|          4|399631.89|
|          5|401471.82|
|          6|395655.38|
|          7|403005.77|
|          8|398592.71|
|          9|401393.34|
|         10|401868.43|
|         11|399548.12|
|         12|401501.32|
|         13|398917.98|
|         14|398570.03|
|         15| 404673.3|
|         16|399586.53|
|         17|398474.49|
|         18|399332.91|
|         19|398953.17|
+-----------+---------+
only showing top 20 rows



## Step #11: Uncache the `home_sales` Temporary Table
---

In [15]:
spark.catalog.uncacheTable('home_sales')

## Step #12: Verify the `home_sales` Temporary Table is Uncached
---

In [16]:
spark.catalog.isCached('home_sales')

False

In [17]:
# Terminate the Spark session
spark.stop()