# Home Sales SparkSQL Challenge

## Setup

### Spark for Colab

In [1]:
import os

# Find the latest version of spark 3.x  from http://www.apache.org/dist/spark/ and enter as the spark version
spark_version = 'spark-3.5.5'
os.environ['SPARK_VERSION'] = spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/{spark_version}/{spark_version}-bin-hadoop3.tgz
!tar xf {spark_version}-bin-hadoop3.tgz
%pip install -q findspark

# Set Environment Variables
os.environ['JAVA_HOME'] = '/usr/lib/jvm/java-11-openjdk-amd64'
os.environ['SPARK_HOME'] = f'/content/spark-3.5.5-bin-hadoop3'

# Start a SparkSession
import findspark
findspark.init()

'apt-get' is not recognized as an internal or external command,
operable program or batch file.
The system cannot find the path specified.
'wget' is not recognized as an internal or external command,
operable program or batch file.
tar: Error opening archive: Failed to open 'spark-3.5.5-bin-hadoop3.tgz'


Note: you may need to restart the kernel to use updated packages.


Exception: Unable to find py4j in /content/spark-3.5.5-bin-hadoop3\python, your SPARK_HOME may not be configured correctly

### Import dependencies

In [None]:
# Import packages
from pyspark import SparkFiles
from pyspark.sql import SparkSession
import time

# Create a SparkSession
spark = SparkSession.builder.appName('SparkSQL').getOrCreate()

## Main Challenge

### Extract data from AWS S3 Bucket

In [None]:
# 1. Read the home_sales_revised.csv from the provided AWS S3 bucket location into a PySpark DataFrame.
url = 'https://2u-data-curriculum-team.s3.amazonaws.com/dataviz-classroom/v1.2/22-big-data/home_sales_revised.csv'

spark.sparkContext.addFile(url)
df = spark.read.csv(SparkFiles.get('home_sales_revised.csv'), sep = ',', header = True, inferSchema = True)

### Examine the the data

In [None]:
# (check to make sure schema was read in properly)
df.printSchema()

NameError: name 'df' is not defined

In [None]:
# Examining summary of dataset
df.show(5)

+--------------------+----------+----------+------+--------+---------+-----------+--------+------+----------+----+
|                  id|      date|date_built| price|bedrooms|bathrooms|sqft_living|sqft_lot|floors|waterfront|view|
+--------------------+----------+----------+------+--------+---------+-----------+--------+------+----------+----+
|f8a53099-ba1c-47d...|2022-04-08|      2016|936923|       4|        3|       3167|   11733|     2|         1|  76|
|7530a2d8-1ae3-451...|2021-06-13|      2013|379628|       2|        2|       2235|   14384|     1|         0|  23|
|43de979c-0bf0-4c9...|2019-04-12|      2014|417866|       2|        2|       2127|   10575|     2|         0|   0|
|b672c137-b88c-48b...|2019-10-16|      2016|239895|       2|        2|       1631|   11149|     2|         0|   0|
|e0726d4d-d595-407...|2022-01-08|      2017|424418|       3|        2|       2249|   13878|     2|         0|   4|
+--------------------+----------+----------+------+--------+---------+----------

### Initiate SparkSQL

In [None]:
# 2. Create a temporary view of the DataFrame.
df.createOrReplaceTempView('home_sales')

In [None]:
# 3. What is the average price for a four bedroom house sold per year, rounded to two decimal places?
query = (
    '''
    SELECT
        YEAR(date) AS year,
        ROUND(AVG(price), 2) AS avg_price
    FROM
        home_sales
    WHERE
        bedrooms = 4
    GROUP BY
        YEAR(date)
    '''
)
spark.sql(query).show()

+----+---------+
|year|avg_price|
+----+---------+
|2022|296363.88|
|2019| 300263.7|
|2020|298353.78|
|2021|301819.44|
+----+---------+



In [None]:
# 4. What is the average price of a home for each year the home was built,
# that have 3 bedrooms and 3 bathrooms, rounded to two decimal places?
query = (
    '''
    SELECT
        date_built AS year_built,
        ROUND(AVG(price), 2) AS avg_price
    FROM
        home_sales
    WHERE
        bedrooms = 3 AND
        bathrooms = 3
    GROUP BY
        date_built
    '''
)

spark.sql(query).show()

+----------+---------+
|year_built|avg_price|
+----------+---------+
|      2015| 288770.3|
|      2013|295962.27|
|      2014|290852.27|
|      2012|293683.19|
|      2016|290555.07|
|      2010|292859.62|
|      2011|291117.47|
|      2017|292676.79|
+----------+---------+



In [None]:
# 5. What is the average price of a home for each year the home was built,
# that have 3 bedrooms, 3 bathrooms, with two floors,
# and are greater than or equal to 2,000 square feet, rounded to two decimal places?
query = (
    '''
    SELECT
        date_built AS year_built,
        ROUND(AVG(price), 2) AS avg_price
    FROM
        home_sales
    WHERE
        bedrooms = 3 AND
        bathrooms = 3 AND
        floors = 2 AND
        sqft_living >= 2000
    GROUP BY
        date_built
    '''
)

spark.sql(query).show()

+----------+---------+
|year_built|avg_price|
+----------+---------+
|      2015|297609.97|
|      2013|303676.79|
|      2014|298264.72|
|      2012|307539.97|
|      2016| 293965.1|
|      2010|285010.22|
|      2011|276553.81|
|      2017|280317.58|
+----------+---------+



In [None]:
# 6. What is the average price of a home per "view" rating, rounded to two decimal places,
# having an average home price greater than or equal to $350,000? Order by descending view rating.
# Although this is a small dataset, determine the run time for this query.

start_time = time.time()

main_query = (
    '''
    SELECT
        view,
        ROUND(AVG(price), 2) AS avg_price
    FROM
        home_sales
    GROUP BY
        view
    HAVING
        avg_price >= 350000
    ORDER BY
        view DESC
    '''
)

spark.sql(main_query).show()

uncached_time = time.time() - start_time
print("--- %s seconds ---" % uncached_time)

+----+----------+
|view| avg_price|
+----+----------+
| 100| 1026669.5|
|  99|1061201.42|
|  98|1053739.33|
|  97|1129040.15|
|  96|1017815.92|
|  95| 1054325.6|
|  94| 1033536.2|
|  93|1026006.06|
|  92| 970402.55|
|  91|1137372.73|
|  90|1062654.16|
|  89|1107839.15|
|  88|1031719.35|
|  87| 1072285.2|
|  86|1070444.25|
|  85|1056336.74|
|  84|1117233.13|
|  83|1033965.93|
|  82| 1063498.0|
|  81|1053472.79|
+----+----------+
only showing top 20 rows

--- 1.3643617630004883 seconds ---


### Caching & Optimization

#### Create a cache and verify

In [None]:
# 7. Cache the the temporary table home_sales.
spark.sql('CACHE TABLE home_sales')

DataFrame[]

In [None]:
# 8. Check if the table is cached.
spark.catalog.isCached('home_sales')

True

In [None]:
# 9. Using the cached data, run the last query above, that calculates
# the average price of a home per "view" rating, rounded to two decimal places,
# having an average home price greater than or equal to $350,000.
# Determine the runtime and compare it to the uncached runtime.

start_time = time.time()

spark.sql(main_query).show()

cached_time = time.time() - start_time
print("--- %s seconds ---" % cached_time)

+----+----------+
|view| avg_price|
+----+----------+
| 100| 1026669.5|
|  99|1061201.42|
|  98|1053739.33|
|  97|1129040.15|
|  96|1017815.92|
|  95| 1054325.6|
|  94| 1033536.2|
|  93|1026006.06|
|  92| 970402.55|
|  91|1137372.73|
|  90|1062654.16|
|  89|1107839.15|
|  88|1031719.35|
|  87| 1072285.2|
|  86|1070444.25|
|  85|1056336.74|
|  84|1117233.13|
|  83|1033965.93|
|  82| 1063498.0|
|  81|1053472.79|
+----+----------+
only showing top 20 rows

--- 1.0009794235229492 seconds ---


In [None]:
diff = uncached_time - cached_time
print(f'Time difference between cached run and uncached run was {diff} seconds.')

Time difference between cached run and uncached run was 0.36338233947753906 seconds.


#### Partitioning for speed

In [None]:
# 10. Partition by the "date_built" field on the formatted parquet home sales data
df.write.partitionBy('date_built').mode('overwrite').parquet('partitioned_home_sales')

In [None]:
# 11. Read the parquet formatted data.
p_df = spark.read.parquet('partitioned_home_sales')

In [None]:
# 12. Create a temporary table for the parquet data.
p_df.createOrReplaceTempView('p_home_sales')

In [None]:
# 13. Using the parquet DataFrame, run the last query above, that calculates
# the average price of a home per "view" rating, rounded to two decimal places,
# having an average home price greater than or equal to $350,000.
# Determine the runtime and compare it to the cached runtime.

start_time = time.time()

spark.sql(main_query).show()

partitioned_time = time.time() - start_time

print("--- %s seconds ---" % partitioned_time)

+----+----------+
|view| avg_price|
+----+----------+
| 100| 1026669.5|
|  99|1061201.42|
|  98|1053739.33|
|  97|1129040.15|
|  96|1017815.92|
|  95| 1054325.6|
|  94| 1033536.2|
|  93|1026006.06|
|  92| 970402.55|
|  91|1137372.73|
|  90|1062654.16|
|  89|1107839.15|
|  88|1031719.35|
|  87| 1072285.2|
|  86|1070444.25|
|  85|1056336.74|
|  84|1117233.13|
|  83|1033965.93|
|  82| 1063498.0|
|  81|1053472.79|
+----+----------+
only showing top 20 rows

--- 0.604501485824585 seconds ---


#### Checking results and uncaching

In [None]:
part_against_cached_time = cached_time - partitioned_time
part_against_uncached_time = uncached_time - partitioned_time

print('Cached time - partitioned time: %s' % part_against_cached_time)
print('Uncached time - partitioned time: %s' % part_against_uncached_time)

Cached time - partitioned time: 0.39647793769836426
Uncached time - partitioned time: 0.7598602771759033


In [None]:
# 14. Uncache the home_sales temporary table.
spark.sql('UNCACHE TABLE home_sales')

DataFrame[]

In [None]:
# 15. Check if the home_sales is no longer cached
spark.catalog.isCached('home_sales')

False