In [1]:
import os
# Find the latest version of spark 3.x  from http://www.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.4.0'
spark_version = 'spark-3.4.0'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop3.tgz
!tar xf $SPARK_VERSION-bin-hadoop3.tgz
!pip install -q findspark

# Set Environment Variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop3"

# Start a SparkSession
import findspark
findspark.init()

0% [Working]            Hit:1 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu focal InRelease
0% [Connecting to archive.ubuntu.com (91.189.91.39)] [Connecting to security.ub                                                                               Hit:2 http://ppa.launchpad.net/cran/libgit2/ubuntu focal InRelease
                                                                               Hit:3 http://ppa.launchpad.net/deadsnakes/ppa/ubuntu focal InRelease
                                                                               Hit:4 http://ppa.launchpad.net/graphics-drivers/ppa/ubuntu focal InRelease
                                                                               Hit:5 http://ppa.launchpad.net/ubuntugis/ppa/ubuntu focal InRelease
                                                                               Hit:6 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2004/x86_64  InRelease
Get:7 https://cloud.r-project.org/bin/linu

In [2]:
# Import packages
from pyspark.sql import SparkSession
import time

# Create a SparkSession
spark = SparkSession.builder.appName("SparkSQL").getOrCreate()

In [3]:
# 1. Read in the AWS S3 bucket into a DataFrame.
from pyspark import SparkFiles
url = "https://2u-data-curriculum-team.s3.amazonaws.com/dataviz-classroom/v1.1.2/22-big-data/home_sales_revised.csv"
spark.sparkContext.addFile(url)
df = spark.read.csv(SparkFiles.get("home_sales_revised.csv"), header=True, inferSchema=True)


In [4]:
df.show(5)

+--------------------+----------+----------+------+--------+---------+-----------+--------+------+----------+----+
|                  id|      date|date_built| price|bedrooms|bathrooms|sqft_living|sqft_lot|floors|waterfront|view|
+--------------------+----------+----------+------+--------+---------+-----------+--------+------+----------+----+
|f8a53099-ba1c-47d...|2022-04-08|      2016|936923|       4|        3|       3167|   11733|     2|         1|  76|
|7530a2d8-1ae3-451...|2021-06-13|      2013|379628|       2|        2|       2235|   14384|     1|         0|  23|
|43de979c-0bf0-4c9...|2019-04-12|      2014|417866|       2|        2|       2127|   10575|     2|         0|   0|
|b672c137-b88c-48b...|2019-10-16|      2016|239895|       2|        2|       1631|   11149|     2|         0|   0|
|e0726d4d-d595-407...|2022-01-08|      2017|424418|       3|        2|       2249|   13878|     2|         0|   4|
+--------------------+----------+----------+------+--------+---------+----------

In [5]:
df.printSchema()

root
 |-- id: string (nullable = true)
 |-- date: date (nullable = true)
 |-- date_built: integer (nullable = true)
 |-- price: integer (nullable = true)
 |-- bedrooms: integer (nullable = true)
 |-- bathrooms: integer (nullable = true)
 |-- sqft_living: integer (nullable = true)
 |-- sqft_lot: integer (nullable = true)
 |-- floors: integer (nullable = true)
 |-- waterfront: integer (nullable = true)
 |-- view: integer (nullable = true)



In [23]:
# 2. Create a temporary view of the DataFrame.
df.createOrReplaceTempView('home_sales')


In [None]:
help(round)

Help on built-in function round in module builtins:

round(number, ndigits=None)
    Round a number to a given precision in decimal digits.
    
    The return value is an integer if ndigits is omitted or None.  Otherwise
    the return value has the same type as the number.  ndigits may be negative.



In [24]:
# 3. What is the average price for a four bedroom house sold in each year rounded to two decimal places?
from pyspark.sql.functions import avg
df.filter("bedrooms==4").select(avg("price")).show()




+-----------------+
|       avg(price)|
+-----------------+
|299661.0056283447|
+-----------------+



In [25]:
# 4. What is the average price of a home for each year the home was built that have 3 bedrooms and 3 bathrooms rounded to two decimal places?
df2=df.filter((df.bedrooms==3) & (df.bathrooms==3)).groupBy(df.date_built)
df2.avg('price').show()



+----------+------------------+
|date_built|        avg(price)|
+----------+------------------+
|      2015| 288770.2966101695|
|      2013|295962.27145085804|
|      2014| 290852.2661870504|
|      2012| 293683.1872074883|
|      2016|  290555.073964497|
|      2010|  292859.615942029|
|      2011|291117.46706586826|
|      2017| 292676.7887740029|
+----------+------------------+



In [26]:
# 5. What is the average price of a home for each year built that have 3 bedrooms, 3 bathrooms, with two floors,
# and are greater than or equal to 2,000 square feet rounded to two decimal places?
df3 = df.filter((df.floors==2) & (df.sqft_living >= 2000) & (df.bedrooms==3) & (df.bathrooms==3)).groupBy(df.date_built)
df3.avg('price').show()


+----------+------------------+
|date_built|        avg(price)|
+----------+------------------+
|      2015| 297609.9679144385|
|      2013|      303676.79375|
|      2014| 298264.7183908046|
|      2012|307539.97402597405|
|      2016| 293965.1046511628|
|      2010| 285010.2215909091|
|      2011| 276553.8128654971|
|      2017|280317.57692307694|
+----------+------------------+



In [48]:
# 6. What is the "view" rating for the average price of a home, rounded to two decimal places, where the homes are greater than
# or equal to $350,000? Although this is a small dataset, determine the run time for this query.

df.createOrReplaceTempView('houses')

start_time = time.time()

spark.sql("""SELECT view, AVG(price) FROM houses WHERE price >= 350000 GROUP BY 1""").show()


print("--- %s seconds ---" % (time.time() - start_time))

+----+------------------+
|view|        avg(price)|
+----+------------------+
|  31|399856.95135135134|
|  85|1056336.7435897435|
|  65| 736679.9333333333|
|  53|          755214.8|
|  78|1080649.3666666667|
|  34| 401419.7451923077|
|  81|1053472.7878787878|
|  28|402124.62176165805|
|  76|1058802.7777777778|
|  26|401506.96774193546|
|  27| 399537.6586826347|
|  44|400598.04761904763|
|  12| 401501.3243243243|
|  91| 1137372.731707317|
|  22| 402022.6847826087|
|  93|1026006.0606060605|
|  47| 398447.4976744186|
|   1| 401044.2513368984|
|  52| 733780.2608695652|
|  13|         398917.98|
+----+------------------+
only showing top 20 rows

--- 0.7880640029907227 seconds ---


In [None]:
# 6. What is the "view" rating for the average price of a home, rounded to two decimal places, where the homes are greater than
# or equal to $350,000? Although this is a small dataset, determine the run time for this query.


start_time = time.time()


print("--- %s seconds ---" % (time.time() - start_time))

--- 7.200241088867188e-05 seconds ---


In [28]:
# 7. Cache the the temporary table home_sales.
spark.sql("cache table home_sales")

DataFrame[]

In [29]:
# 8. Check if the table is cached.
spark.catalog.isCached('home_sales')

True

In [49]:
# 9. Using the cached data, run the query that filters out the view ratings with average price of greater than $350,000. 
# Determine the runtime and compare it to uncached runtime.

start_time = time.time()

spark.sql("""SELECT view, AVG(price) FROM houses WHERE price >= 350000 GROUP BY 1""").show()

print("--- %s seconds ---" % (time.time() - start_time))


+----+------------------+
|view|        avg(price)|
+----+------------------+
|  31|399856.95135135134|
|  85|1056336.7435897435|
|  65| 736679.9333333333|
|  53|          755214.8|
|  78|1080649.3666666667|
|  34| 401419.7451923077|
|  81|1053472.7878787878|
|  28|402124.62176165805|
|  76|1058802.7777777778|
|  26|401506.96774193546|
|  27| 399537.6586826347|
|  44|400598.04761904763|
|  12| 401501.3243243243|
|  91| 1137372.731707317|
|  22| 402022.6847826087|
|  93|1026006.0606060605|
|  47| 398447.4976744186|
|   1| 401044.2513368984|
|  52| 733780.2608695652|
|  13|         398917.98|
+----+------------------+
only showing top 20 rows

--- 0.6079213619232178 seconds ---


In [None]:
# 9. Using the cached data, run the query that filters out the view ratings with average price of greater than $350,000. 
# Determine the runtime and compare it to uncached runtime.

start_time = time.time()


print("--- %s seconds ---" % (time.time() - start_time))


--- 5.459785461425781e-05 seconds ---


In [30]:
# 10. Partition the home sales dataset by the date_built field.
df.write.partitionBy("date_built").mode("overwrite").parquet("houses_partitioned")

In [31]:
# 11. Read the formatted parquet data.
h_df_p=spark.read.parquet('houses_partitioned')

In [32]:
# 12. Create a temporary table for the parquet data.
h_df_p.createOrReplaceTempView('houses_date_built')

In [None]:
# 13. Run the query that filters out the view ratings with average price of greater than $350,000 with the parquet DataFrame. 
# Round your average to two decimal places. 
# Determine the runtime and compare it to the cached version. 

start_time = time.time()

spark.sql("""SELECT view, AVG(price) FROM houses WHERE price >= 350000 GROUP BY 1""").show()

print("--- %s seconds ---" % (time.time() - start_time))

--- 7.104873657226562e-05 seconds ---


In [33]:
# 14. Uncache the home_sales temporary table.
spark.sql("uncache table home_sales")

DataFrame[]

In [34]:
# 15. Check if the home_sales is no longer cached
spark.catalog.isCached("home_sales")


False