<a href="https://colab.research.google.com/github/njgeorge000158/Home_Sales/blob/main/Home_Sales.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
#*******************************************************************************************
 #
 #  File Name:  HomeSalesColab.ipynb
 #
 #  File Description:
 #      This interactive Python notebook, HomeSalesColab.ipynb, uses SparkSQL
 #      to determine key metrics about home sales data.
 #
 #
 #  Date            Description                             Programmer
 #  ----------      ------------------------------------    ------------------
 #  11/25/2023      Initial Development                     N. James George
 #
 #******************************************************************************************/

import os
import time

sparkVersionString \
    = 'spark-3.5.0'

os.environ['SPARK_VERSION'] \
    = sparkVersionString


# These commands install Apache Spark and Java.
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop3.tgz
!tar xf $SPARK_VERSION-bin-hadoop3.tgz
!pip install -q findspark


# These lines of code set the environmental variables.
os.environ['JAVA_HOME'] \
    = '/usr/lib/jvm/java-11-openjdk-amd64'

os.environ['SPARK_HOME'] \
    = f'/content/{sparkVersionString}-bin-hadoop3'

0% [Working]            Hit:1 http://security.ubuntu.com/ubuntu jammy-security InRelease
0% [Waiting for headers] [Connecting to cloud.r-project.org] [Connected to ppa.launchpadcontent.net                                                                                                     Hit:2 http://archive.ubuntu.com/ubuntu jammy InRelease
0% [Waiting for headers] [Connecting to cloud.r-project.org] [Connected to ppa.launchpadcontent.net                                                                                                     Hit:3 http://archive.ubuntu.com/ubuntu jammy-updates InRelease
0% [Waiting for headers] [Connecting to cloud.r-project.org] [Connected to ppa.launchpadcontent.net                                                                                                     Hit:4 http://archive.ubuntu.com/ubuntu jammy-backports InRelease
0% [Connecting to cloud.r-project.org] [Connected to ppa.launchpadcontent.net (185.125.190.80)] [Con              

In [2]:
import findspark
findspark.init()

from pyspark import SparkFiles
from pyspark.sql import SparkSession
from pyspark.sql.functions import round, avg

# This line of code creates a Spark Session.
currentSparkSession \
    = SparkSession.builder.appName('SparkSQL').getOrCreate()

In [3]:
# 1. Read in the AWS S3 bucket into a DataFrame.

urlString \
  = 'https://2u-data-curriculum-team.s3.amazonaws.com/dataviz-classroom/v1.2/22-big-data/home_sales_revised.csv'

currentSparkSession.sparkContext.addFile(urlString)

homeSalesDataFrame \
    = currentSparkSession.read.csv \
          (SparkFiles.get('home_sales_revised.csv'),
           sep = ',',
           header = True)

homeSalesDataFrame.show()

+--------------------+----------+----------+------+--------+---------+-----------+--------+------+----------+----+
|                  id|      date|date_built| price|bedrooms|bathrooms|sqft_living|sqft_lot|floors|waterfront|view|
+--------------------+----------+----------+------+--------+---------+-----------+--------+------+----------+----+
|f8a53099-ba1c-47d...|2022-04-08|      2016|936923|       4|        3|       3167|   11733|     2|         1|  76|
|7530a2d8-1ae3-451...|2021-06-13|      2013|379628|       2|        2|       2235|   14384|     1|         0|  23|
|43de979c-0bf0-4c9...|2019-04-12|      2014|417866|       2|        2|       2127|   10575|     2|         0|   0|
|b672c137-b88c-48b...|2019-10-16|      2016|239895|       2|        2|       1631|   11149|     2|         0|   0|
|e0726d4d-d595-407...|2022-01-08|      2017|424418|       3|        2|       2249|   13878|     2|         0|   4|
|5aa00529-0533-46b...|2019-01-30|      2017|218712|       2|        3|       196

In [4]:
# This line of code prints the table schema.
homeSalesDataFrame.printSchema()

root
 |-- id: string (nullable = true)
 |-- date: string (nullable = true)
 |-- date_built: string (nullable = true)
 |-- price: string (nullable = true)
 |-- bedrooms: string (nullable = true)
 |-- bathrooms: string (nullable = true)
 |-- sqft_living: string (nullable = true)
 |-- sqft_lot: string (nullable = true)
 |-- floors: string (nullable = true)
 |-- waterfront: string (nullable = true)
 |-- view: string (nullable = true)



In [5]:
# This line of code prints the column names and data types.
homeSalesDataFrame.toPandas().info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 33287 entries, 0 to 33286
Data columns (total 11 columns):
 #   Column       Non-Null Count  Dtype 
---  ------       --------------  ----- 
 0   id           33287 non-null  object
 1   date         33287 non-null  object
 2   date_built   33287 non-null  object
 3   price        33287 non-null  object
 4   bedrooms     33287 non-null  object
 5   bathrooms    33287 non-null  object
 6   sqft_living  33287 non-null  object
 7   sqft_lot     33287 non-null  object
 8   floors       33287 non-null  object
 9   waterfront   33287 non-null  object
 10  view         33287 non-null  object
dtypes: object(11)
memory usage: 2.8+ MB


In [6]:
# 2. Create a temporary view of the DataFrame.

homeSalesDataFrame.createOrReplaceTempView('home_sales')

currentSparkSession.sql('SELECT * FROM home_sales').show()

+--------------------+----------+----------+------+--------+---------+-----------+--------+------+----------+----+
|                  id|      date|date_built| price|bedrooms|bathrooms|sqft_living|sqft_lot|floors|waterfront|view|
+--------------------+----------+----------+------+--------+---------+-----------+--------+------+----------+----+
|f8a53099-ba1c-47d...|2022-04-08|      2016|936923|       4|        3|       3167|   11733|     2|         1|  76|
|7530a2d8-1ae3-451...|2021-06-13|      2013|379628|       2|        2|       2235|   14384|     1|         0|  23|
|43de979c-0bf0-4c9...|2019-04-12|      2014|417866|       2|        2|       2127|   10575|     2|         0|   0|
|b672c137-b88c-48b...|2019-10-16|      2016|239895|       2|        2|       1631|   11149|     2|         0|   0|
|e0726d4d-d595-407...|2022-01-08|      2017|424418|       3|        2|       2249|   13878|     2|         0|   4|
|5aa00529-0533-46b...|2019-01-30|      2017|218712|       2|        3|       196

In [7]:
# 3. What is the average price for a four bedroom house sold in each year
# rounded to two decimal places?

queryString \
  = """SELECT YEAR(date) AS year,
       ROUND(AVG(price), 2) AS average_price
       FROM home_sales
       WHERE bedrooms == 4
       GROUP BY year
       ORDER BY year DESC"""

currentSparkSession.sql(queryString).show()

+----+-------------+
|year|average_price|
+----+-------------+
|2022|    296363.88|
|2021|    301819.44|
|2020|    298353.78|
|2019|     300263.7|
+----+-------------+



In [8]:
# 4. What is the average price of a home for each year the home was built
# that have 3 bedrooms and 3 bathrooms rounded to two decimal places?

queryString \
  = """SELECT YEAR(date_built) AS year_built,
       ROUND(AVG(price), 2) AS average_price
       FROM home_sales
       WHERE bedrooms == 3
          AND bathrooms == 3
       GROUP BY year_built
       ORDER BY year_built DESC"""

currentSparkSession.sql(queryString).show()

+----------+-------------+
|year_built|average_price|
+----------+-------------+
|      2017|    292676.79|
|      2016|    290555.07|
|      2015|     288770.3|
|      2014|    290852.27|
|      2013|    295962.27|
|      2012|    293683.19|
|      2011|    291117.47|
|      2010|    292859.62|
+----------+-------------+



In [9]:
# 5. What is the average price of a home for each year built that has 3 bedrooms,
# 3 bathrooms, with two floors, and are greater than or equal to 2,000 square feet
# rounded to two decimal places?

queryString \
  = """SELECT YEAR(date_built) AS year_built,
       ROUND(AVG(price), 2) AS average_price
       FROM home_sales
       WHERE bedrooms == 3
          AND bathrooms == 3
          AND floors == 2
          AND sqft_living >= 2000
       GROUP BY year_built
       ORDER BY year_built DESC"""

currentSparkSession.sql(queryString).show()

+----------+-------------+
|year_built|average_price|
+----------+-------------+
|      2017|    280317.58|
|      2016|     293965.1|
|      2015|    297609.97|
|      2014|    298264.72|
|      2013|    303676.79|
|      2012|    307539.97|
|      2011|    276553.81|
|      2010|    285010.22|
+----------+-------------+



In [10]:
# 6. What is the "view" rating for the average price of a home, rounded to two
# decimal places, where the homes are greater than or equal to $350,000?
# Although this is a small dataset, determine the run time for this query.

startTimeFloat = time.time()

queryString \
  = """SELECT view, ROUND(AVG(price), 2) AS average_price
       FROM home_sales
       GROUP BY view
       HAVING AVG(price) >= 350000
       ORDER BY view DESC"""

currentSparkSession.sql(queryString).show()

print("--- %s seconds ---" % (time.time() - startTimeFloat))

+----+-------------+
|view|average_price|
+----+-------------+
|  99|   1061201.42|
|  98|   1053739.33|
|  97|   1129040.15|
|  96|   1017815.92|
|  95|    1054325.6|
|  94|    1033536.2|
|  93|   1026006.06|
|  92|    970402.55|
|  91|   1137372.73|
|  90|   1062654.16|
|  89|   1107839.15|
|  88|   1031719.35|
|  87|    1072285.2|
|  86|   1070444.25|
|  85|   1056336.74|
|  84|   1117233.13|
|  83|   1033965.93|
|  82|    1063498.0|
|  81|   1053472.79|
|  80|    991767.38|
+----+-------------+
only showing top 20 rows

--- 1.1227097511291504 seconds ---


In [11]:
# 7. Cache the the temporary table home_sales.

currentSparkSession.sql('cache table home_sales')

DataFrame[]

In [12]:
# 8. Check if the table is cached.

currentSparkSession.catalog.isCached('home_sales')

True

In [13]:
# 9. Using the cached data, run the query that filters out the view ratings
# with average price greater than or equal to $350,000. Determine the
# runtime and compare it to uncached runtime.

startTimeFloat = time.time()

queryString \
  = """SELECT view, ROUND(AVG(price), 2) AS average_price
       FROM home_sales
       GROUP BY view
       HAVING AVG(price) >= 350000
       ORDER BY view DESC"""

currentSparkSession.sql(queryString).show()

print("--- %s seconds ---" % (time.time() - startTimeFloat))

+----+-------------+
|view|average_price|
+----+-------------+
|  99|   1061201.42|
|  98|   1053739.33|
|  97|   1129040.15|
|  96|   1017815.92|
|  95|    1054325.6|
|  94|    1033536.2|
|  93|   1026006.06|
|  92|    970402.55|
|  91|   1137372.73|
|  90|   1062654.16|
|  89|   1107839.15|
|  88|   1031719.35|
|  87|    1072285.2|
|  86|   1070444.25|
|  85|   1056336.74|
|  84|   1117233.13|
|  83|   1033965.93|
|  82|    1063498.0|
|  81|   1053472.79|
|  80|    991767.38|
+----+-------------+
only showing top 20 rows

--- 0.7347908020019531 seconds ---


In [23]:
print('The cached runtime, 0.735 seconds, is shorter than the uncached runtime, 1.123 seconds')

The cached runtime, 0.735 seconds, is shorter than the uncached runtime, 1.123 seconds


In [15]:
# 10. Partition by the "date_built" field on the formatted parquet home sales data.

homeSalesDataFrame.write.partitionBy('date_built').mode('overwrite').parquet('p_home_sales')

In [16]:
# 11. Read the parquet formatted data.

parquetFormattedDataFrame = currentSparkSession.read.parquet('p_home_sales')

In [17]:
# 12. Create a temporary table for the parquet data.

parquetFormattedDataFrame.createOrReplaceTempView('parquet_home_sales')

In [18]:
# 13. Run the query that filters out the view ratings with average price
# of greater than or equal to $350,000 with the parquet DataFrame. Round
# your average to two decimal places. Determine the runtime and compare
# it to the cached version.
startTimeFloat = time.time()

queryString \
  = """SELECT view, ROUND(AVG(price), 2) AS average_price
       FROM parquet_home_sales
       GROUP BY view
       HAVING AVG(price) >= 350000
       ORDER BY view DESC"""

currentSparkSession.sql(queryString).show()

print("--- %s seconds ---" % (time.time() - startTimeFloat))

+----+-------------+
|view|average_price|
+----+-------------+
|  99|   1061201.42|
|  98|   1053739.33|
|  97|   1129040.15|
|  96|   1017815.92|
|  95|    1054325.6|
|  94|    1033536.2|
|  93|   1026006.06|
|  92|    970402.55|
|  91|   1137372.73|
|  90|   1062654.16|
|  89|   1107839.15|
|  88|   1031719.35|
|  87|    1072285.2|
|  86|   1070444.25|
|  85|   1056336.74|
|  84|   1117233.13|
|  83|   1033965.93|
|  82|    1063498.0|
|  81|   1053472.79|
|  80|    991767.38|
+----+-------------+
only showing top 20 rows

--- 1.6975457668304443 seconds ---


In [19]:
startTimeFloat = time.time()

filteredParquetHousesDataFrame \
    = parquetFormattedDataFrame.filter(parquetFormattedDataFrame.price >= 350000)

averagePricesDataFrame \
    = filteredParquetHousesDataFrame.groupBy('view') \
          .agg(round(avg('price'), 2) \
          .alias('average_price')) \
          .sort(filteredParquetHousesDataFrame.view.desc())

averagePricesDataFrame.show()

print("--- %s seconds ---" % (time.time() - startTimeFloat))

+----+-------------+
|view|average_price|
+----+-------------+
|  99|   1061201.42|
|  98|   1053739.33|
|  97|   1129040.15|
|  96|   1017815.92|
|  95|    1054325.6|
|  94|    1033536.2|
|  93|   1026006.06|
|  92|    970402.55|
|  91|   1137372.73|
|  90|   1062654.16|
|   9|    401393.34|
|  89|   1107839.15|
|  88|   1031719.35|
|  87|    1072285.2|
|  86|   1070444.25|
|  85|   1056336.74|
|  84|   1117233.13|
|  83|   1033965.93|
|  82|    1063498.0|
|  81|   1053472.79|
+----+-------------+
only showing top 20 rows

--- 1.8456358909606934 seconds ---


In [24]:
print('The uncached runtime, 1.123 seconds, is shorter than the runtime for the parquet data, 1.698 seconds')

The uncached runtime, 1.123 seconds, is shorter than the runtime for the parquet data, 1.698 seconds


In [21]:
# 14. Uncache the home_sales temporary table.

currentSparkSession.sql('uncache table home_sales')

DataFrame[]

In [22]:
# 15. Check if the home_sales is no longer cached.

if currentSparkSession.catalog.isCached('home_sales'):

  print('The table, home_sales, is cached.')

else:

  print('The table, home_sales, is NOT cached.')

The table, home_sales, is NOT cached.
