<a href="https://colab.research.google.com/github/njgeorge000158/Home-Sales-Analysis-with-Google-Colab-and-Apache-Spark/blob/main/home_sales_colab.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [26]:
#*******************************************************************************************
 #
 #  File Name:  home_sales_colab.ipynb
 #
 #  File Description:
 #      This Google Colab notebook, home_sales_colab.ipynb, uses PySpark and SparkSQL
 #      to determine key metrics about home sales data.
 #
 #
 #  Date            Description                             Programmer
 #  ----------      ------------------------------------    ------------------
 #  11/25/2023      Initial Development                     Nicholas J. George
 #
 #******************************************************************************************/

from google.colab import drive
drive.mount('/content/gdrive/')

import sys
sys.path.insert(0,'./gdrive/MyDrive/home_sales_analysis')

import os
import time


spark_version_string = 'spark-3.5.1'

os.environ['SPARK_VERSION'] = spark_version_string


# These commands install Apache Spark and Java.
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop3.tgz
!tar xf $SPARK_VERSION-bin-hadoop3.tgz
!pip install -q findspark
!pip install -U dataframe_image
!pip install -U hvplot


# These lines of code set the environmental variables.
os.environ['JAVA_HOME'] = '/usr/lib/jvm/java-11-openjdk-amd64'

os.environ['SPARK_HOME'] = f'/content/{spark_version_string}-bin-hadoop3'


import logx

logx.set_logs_directory_path('./gdrive/MyDrive/home_sales_analysis/logs')

logx.set_images_directory_path('./gdrive/MyDrive/home_sales_analysis/images')

Drive already mounted at /content/gdrive/; to attempt to forcibly remount, call drive.mount("/content/gdrive/", force_remount=True).
Hit:1 http://security.ubuntu.com/ubuntu jammy-security InRelease
Hit:2 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease
Hit:3 http://archive.ubuntu.com/ubuntu jammy InRelease
Hit:4 http://archive.ubuntu.com/ubuntu jammy-updates InRelease
Hit:5 http://archive.ubuntu.com/ubuntu jammy-backports InRelease
Hit:6 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease
Hit:7 https://ppa.launchpadcontent.net/c2d4u.team/c2d4u4.0+/ubuntu jammy InRelease
Hit:8 https://ppa.launchpadcontent.net/deadsnakes/ppa/ubuntu jammy InRelease
Hit:9 https://ppa.launchpadcontent.net/graphics-drivers/ppa/ubuntu jammy InRelease
Hit:10 https://ppa.launchpadcontent.net/ubuntugis/ppa/ubuntu jammy InRelease
Reading package lists... Done


In [27]:
import findspark
findspark.init()

from pyspark import SparkFiles
from pyspark.sql import SparkSession
from pyspark.sql.functions import round, avg

# This line of code creates a Spark Session.
current_spark_session = SparkSession.builder.appName('SparkSQL').getOrCreate()

In [28]:
CONSTANT_LOCAL_FILE_NAME = 'home_sales_colab.ipynb'


logx.set_log_mode(False)

logx.set_image_mode(False)


logx.begin_program('home_sales_colab')

Program execution begins...



In [29]:
# These lines of code read in the AWS S3 bucket into a DataFrame.
url_string \
  = 'https://2u-data-curriculum-team.s3.amazonaws.com/dataviz-classroom/v1.2/22-big-data/home_sales_revised.csv'

current_spark_session.sparkContext.addFile(url_string)

home_sales_pyspark_dataframe \
    = current_spark_session.read.csv(SparkFiles.get('home_sales_revised.csv'), sep = ',', header = True)

home_sales_pyspark_dataframe.show()

logx.log_write_object(home_sales_pyspark_dataframe.toPandas())

+--------------------+----------+----------+------+--------+---------+-----------+--------+------+----------+----+
|                  id|      date|date_built| price|bedrooms|bathrooms|sqft_living|sqft_lot|floors|waterfront|view|
+--------------------+----------+----------+------+--------+---------+-----------+--------+------+----------+----+
|f8a53099-ba1c-47d...|2022-04-08|      2016|936923|       4|        3|       3167|   11733|     2|         1|  76|
|7530a2d8-1ae3-451...|2021-06-13|      2013|379628|       2|        2|       2235|   14384|     1|         0|  23|
|43de979c-0bf0-4c9...|2019-04-12|      2014|417866|       2|        2|       2127|   10575|     2|         0|   0|
|b672c137-b88c-48b...|2019-10-16|      2016|239895|       2|        2|       1631|   11149|     2|         0|   0|
|e0726d4d-d595-407...|2022-01-08|      2017|424418|       3|        2|       2249|   13878|     2|         0|   4|
|5aa00529-0533-46b...|2019-01-30|      2017|218712|       2|        3|       196

In [30]:
# This line of code prints the table schema.
home_sales_pyspark_dataframe.printSchema()

root
 |-- id: string (nullable = true)
 |-- date: string (nullable = true)
 |-- date_built: string (nullable = true)
 |-- price: string (nullable = true)
 |-- bedrooms: string (nullable = true)
 |-- bathrooms: string (nullable = true)
 |-- sqft_living: string (nullable = true)
 |-- sqft_lot: string (nullable = true)
 |-- floors: string (nullable = true)
 |-- waterfront: string (nullable = true)
 |-- view: string (nullable = true)



In [31]:
# This line of code prints the column names and data types.
home_sales_pyspark_dataframe.toPandas().info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 33287 entries, 0 to 33286
Data columns (total 11 columns):
 #   Column       Non-Null Count  Dtype 
---  ------       --------------  ----- 
 0   id           33287 non-null  object
 1   date         33287 non-null  object
 2   date_built   33287 non-null  object
 3   price        33287 non-null  object
 4   bedrooms     33287 non-null  object
 5   bathrooms    33287 non-null  object
 6   sqft_living  33287 non-null  object
 7   sqft_lot     33287 non-null  object
 8   floors       33287 non-null  object
 9   waterfront   33287 non-null  object
 10  view         33287 non-null  object
dtypes: object(11)
memory usage: 2.8+ MB


In [32]:
# This line of code prints the column statistics.
home_sales_pyspark_dataframe.toPandas().describe()

Unnamed: 0,id,date,date_built,price,bedrooms,bathrooms,sqft_living,sqft_lot,floors,waterfront,view
count,33287,33287,33287,33287,33287,33287,33287,33287,33287,33287,33287
unique,33287,1261,8,31796,7,5,2143,6937,3,2,101
top,f8a53099-ba1c-47d6-9c31-7398aa8f6089,2020-08-16,2015,398982,3,3,2402,12788,2,0,41
freq,1,44,4248,4,11016,16230,49,17,16493,32533,667


In [33]:
# These lines of code create a temporary view of the dataframe.
home_sales_pyspark_dataframe.createOrReplaceTempView('home_sales')

current_spark_session.sql('SELECT * FROM home_sales').show()

+--------------------+----------+----------+------+--------+---------+-----------+--------+------+----------+----+
|                  id|      date|date_built| price|bedrooms|bathrooms|sqft_living|sqft_lot|floors|waterfront|view|
+--------------------+----------+----------+------+--------+---------+-----------+--------+------+----------+----+
|f8a53099-ba1c-47d...|2022-04-08|      2016|936923|       4|        3|       3167|   11733|     2|         1|  76|
|7530a2d8-1ae3-451...|2021-06-13|      2013|379628|       2|        2|       2235|   14384|     1|         0|  23|
|43de979c-0bf0-4c9...|2019-04-12|      2014|417866|       2|        2|       2127|   10575|     2|         0|   0|
|b672c137-b88c-48b...|2019-10-16|      2016|239895|       2|        2|       1631|   11149|     2|         0|   0|
|e0726d4d-d595-407...|2022-01-08|      2017|424418|       3|        2|       2249|   13878|     2|         0|   4|
|5aa00529-0533-46b...|2019-01-30|      2017|218712|       2|        3|       196

In [34]:
# These lines of code find and display the average price for a four bedroom house sold in each year
# rounded to two decimal places.
query_string \
  = """SELECT YEAR(date) AS year,
       ROUND(AVG(price), 2) AS average_price
       FROM home_sales
       WHERE bedrooms == 4
       GROUP BY year
       ORDER BY year DESC"""

current_spark_session.sql(query_string).show()

+----+-------------+
|year|average_price|
+----+-------------+
|2022|    296363.88|
|2021|    301819.44|
|2020|    298353.78|
|2019|     300263.7|
+----+-------------+



In [35]:
# These lines of code find and display the average price of a home for each year the home was built
# that have 3 bedrooms and 3 bathrooms rounded to two decimal places?
query_string \
  = """SELECT YEAR(date_built) AS year_built,
       ROUND(AVG(price), 2) AS average_price
       FROM home_sales
       WHERE bedrooms == 3
          AND bathrooms == 3
       GROUP BY year_built
       ORDER BY year_built DESC"""

current_spark_session.sql(query_string).show()

+----------+-------------+
|year_built|average_price|
+----------+-------------+
|      2017|    292676.79|
|      2016|    290555.07|
|      2015|     288770.3|
|      2014|    290852.27|
|      2013|    295962.27|
|      2012|    293683.19|
|      2011|    291117.47|
|      2010|    292859.62|
+----------+-------------+



In [36]:
# These lines of code find and display the average price of a home for each year built that has 3 bedrooms,
# 3 bathrooms, with two floors, and are greater than or equal to 2,000 square feet rounded to two decimal
# places?
query_string \
  = """SELECT YEAR(date_built) AS year_built,
       ROUND(AVG(price), 2) AS average_price
       FROM home_sales
       WHERE bedrooms == 3
          AND bathrooms == 3
          AND floors == 2
          AND sqft_living >= 2000
       GROUP BY year_built
       ORDER BY year_built DESC"""

current_spark_session.sql(query_string).show()

+----------+-------------+
|year_built|average_price|
+----------+-------------+
|      2017|    280317.58|
|      2016|     293965.1|
|      2015|    297609.97|
|      2014|    298264.72|
|      2013|    303676.79|
|      2012|    307539.97|
|      2011|    276553.81|
|      2010|    285010.22|
+----------+-------------+



In [37]:
# These lines of code find and display the "view" rating for the average price of a home, rounded to two
# decimal places, where the homes are greater than or equal to $350,000.
start_time_float = time.time()

query_string \
  = """SELECT view, ROUND(AVG(price), 2) AS average_price
       FROM home_sales
       GROUP BY view
       HAVING AVG(price) >= 350000
       ORDER BY view DESC"""

current_spark_session.sql(query_string).show()

logx.print_and_log_text("--- %s seconds ---" % (time.time() - start_time_float))

+----+-------------+
|view|average_price|
+----+-------------+
|  99|   1061201.42|
|  98|   1053739.33|
|  97|   1129040.15|
|  96|   1017815.92|
|  95|    1054325.6|
|  94|    1033536.2|
|  93|   1026006.06|
|  92|    970402.55|
|  91|   1137372.73|
|  90|   1062654.16|
|  89|   1107839.15|
|  88|   1031719.35|
|  87|    1072285.2|
|  86|   1070444.25|
|  85|   1056336.74|
|  84|   1117233.13|
|  83|   1033965.93|
|  82|    1063498.0|
|  81|   1053472.79|
|  80|    991767.38|
+----+-------------+
only showing top 20 rows

--- 0.6290180683135986 seconds ---


In [38]:
# This line of code caches the the temporary table, home_sales.
current_spark_session.sql('cache table home_sales')

DataFrame[]

In [39]:
# This line of code checks if the table is cached.
current_spark_session.catalog.isCached('home_sales')

True

In [40]:
# These lines of code, using the cached data, runs the query that filters out the view ratings with
# average price greater than or equal to $350,000.
start_time_float = time.time()

query_string \
  = """SELECT view, ROUND(AVG(price), 2) AS average_price
       FROM home_sales
       GROUP BY view
       HAVING AVG(price) >= 350000
       ORDER BY view DESC"""

current_spark_session.sql(query_string).show()

logx.print_and_log_text("--- %s seconds ---" % (time.time() - start_time_float))

+----+-------------+
|view|average_price|
+----+-------------+
|  99|   1061201.42|
|  98|   1053739.33|
|  97|   1129040.15|
|  96|   1017815.92|
|  95|    1054325.6|
|  94|    1033536.2|
|  93|   1026006.06|
|  92|    970402.55|
|  91|   1137372.73|
|  90|   1062654.16|
|  89|   1107839.15|
|  88|   1031719.35|
|  87|    1072285.2|
|  86|   1070444.25|
|  85|   1056336.74|
|  84|   1117233.13|
|  83|   1033965.93|
|  82|    1063498.0|
|  81|   1053472.79|
|  80|    991767.38|
+----+-------------+
only showing top 20 rows

--- 0.5580081939697266 seconds ---


In [41]:
logx.print_and_log_text('The cached runtime, 0.601 seconds, is shorter than the uncached runtime, 2.059 seconds')

The cached runtime, 0.601 seconds, is shorter than the uncached runtime, 2.059 seconds


In [42]:
# This line of code partitions the formatted parquet home sales data by the "date_built" field.
home_sales_pyspark_dataframe.write.partitionBy('date_built').mode('overwrite').parquet('p_home_sales')

In [43]:
# This line of code reads the parquet formatted data.
home_sales_parquet_pyspark_dataframe = current_spark_session.read.parquet('p_home_sales')

logx.log_write_object(home_sales_parquet_pyspark_dataframe.toPandas())

In [44]:
# This line of code creates a temporary table for the parquet data.
home_sales_parquet_pyspark_dataframe.createOrReplaceTempView('parquet_home_sales')

In [45]:
# These lines of code run the query that filters out the view ratings with average price of greater
# than or equal to $350,000 in the parquet dataframe and round the0 average to two decimal places.
start_time_float = time.time()

query_string \
  = """SELECT view, ROUND(AVG(price), 2) AS average_price
       FROM parquet_home_sales
       GROUP BY view
       HAVING AVG(price) >= 350000
       ORDER BY view DESC"""

current_spark_session.sql(query_string).show()

logx.print_and_log_text("--- %s seconds ---" % (time.time() - start_time_float))

+----+-------------+
|view|average_price|
+----+-------------+
|  99|   1061201.42|
|  98|   1053739.33|
|  97|   1129040.15|
|  96|   1017815.92|
|  95|    1054325.6|
|  94|    1033536.2|
|  93|   1026006.06|
|  92|    970402.55|
|  91|   1137372.73|
|  90|   1062654.16|
|  89|   1107839.15|
|  88|   1031719.35|
|  87|    1072285.2|
|  86|   1070444.25|
|  85|   1056336.74|
|  84|   1117233.13|
|  83|   1033965.93|
|  82|    1063498.0|
|  81|   1053472.79|
|  80|    991767.38|
+----+-------------+
only showing top 20 rows

--- 0.49675464630126953 seconds ---


In [46]:
start_time_float = time.time()

filtered_houses_parquet_pyspark_dataframe \
    = home_sales_parquet_pyspark_dataframe \
        .filter(home_sales_parquet_pyspark_dataframe.price >= 350000)

average_prices_dataframe \
    = filtered_houses_parquet_pyspark_dataframe.groupBy('view') \
          .agg(round(avg('price'), 2) \
          .alias('average_price')) \
          .sort(filtered_houses_parquet_pyspark_dataframe.view.desc())

average_prices_dataframe.show()

logx.print_and_log_text("--- %s seconds ---" % (time.time() - start_time_float))

logx.log_write_object(filtered_houses_parquet_pyspark_dataframe.toPandas())

+----+-------------+
|view|average_price|
+----+-------------+
|  99|   1061201.42|
|  98|   1053739.33|
|  97|   1129040.15|
|  96|   1017815.92|
|  95|    1054325.6|
|  94|    1033536.2|
|  93|   1026006.06|
|  92|    970402.55|
|  91|   1137372.73|
|  90|   1062654.16|
|   9|    401393.34|
|  89|   1107839.15|
|  88|   1031719.35|
|  87|    1072285.2|
|  86|   1070444.25|
|  85|   1056336.74|
|  84|   1117233.13|
|  83|   1033965.93|
|  82|    1063498.0|
|  81|   1053472.79|
+----+-------------+
only showing top 20 rows

--- 0.7498946189880371 seconds ---


In [47]:
logx.print_and_log_text('The uncached runtime, 2.059 seconds, is longer than the runtime for the parquet data, 0.929 seconds')

The uncached runtime, 2.059 seconds, is longer than the runtime for the parquet data, 0.929 seconds


In [48]:
# This line of code uncaches the home_sales temporary table.
current_spark_session.sql('uncache table home_sales')

DataFrame[]

In [49]:
# These lines of code check if the home_sales is no longer cached.
if current_spark_session.catalog.isCached('home_sales'):

  logx.print_and_log_text('The table, home_sales, is cached.')

else:

  logx.print_and_log_text('The table, home_sales, is NOT cached.')

The table, home_sales, is NOT cached.


In [50]:
# logx.end_program()