In [2]:
import os
# Using the correct spark version

spark_version = 'spark-3.5.1'
os.environ['SPARK_VERSION']=spark_version

# Installs Spark and Java

!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop3.tgz
!tar xf $SPARK_VERSION-bin-hadoop3.tgz
!pip install -q findspark

# Sets Environment Variables

os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop3"

# Starts a SparkSession

import findspark
findspark.init()

0% [Working]            Hit:1 http://security.ubuntu.com/ubuntu jammy-security InRelease
0% [Connecting to archive.ubuntu.com] [Connected to cloud.r-project.org (52.85.151.8)] [Connecting t                                                                                                    Hit:2 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease
0% [Connecting to archive.ubuntu.com] [Connecting to ppa.launchpadcontent.net (185.125.190.80)] [Wai                                                                                                    Hit:3 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease
Hit:4 http://archive.ubuntu.com/ubuntu jammy InRelease
Hit:5 http://archive.ubuntu.com/ubuntu jammy-updates InRelease
Hit:6 http://archive.ubuntu.com/ubuntu jammy-backports InRelease
Hit:7 https://ppa.launchpadcontent.net/c2d4u.team/c2d4u4.0+/ubuntu jammy InRelease
Hit:8 https://ppa.launchpadcontent.net/deadsnakes/ppa/ubuntu jammy In

In [3]:
# Imports packages

from pyspark.sql import SparkSession
import time

# Creates a SparkSession

spark = SparkSession.builder.appName("SparkSQL").getOrCreate()

In [14]:
# Reads in the AWS S3 bucket into a DataFrame

from pyspark import SparkFiles

url = "https://2u-data-curriculum-team.s3.amazonaws.com/dataviz-classroom/v1.2/22-big-data/home_sales_revised.csv"

spark.sparkContext.addFile(url)

df = spark.read.csv(SparkFiles.get("home_sales_revised.csv"), header = True)

In [21]:
# Creates a temporary view of the DataFrame

df.createOrReplaceTempView('home_sales')

In [30]:
# Average price for a four bedroom house sold per year, rounded to two decimal places

spark.sql("""
          SELECT EXTRACT(year FROM date), ROUND(AVG(price),2) FROM home_sales WHERE bedrooms == 4 GROUP BY EXTRACT(year FROM date)
          ORDER BY EXTRACT(year FROM date)
          """).show()


+-----------------------+--------------------+
|extract(year FROM date)|round(avg(price), 2)|
+-----------------------+--------------------+
|                   2019|            300263.7|
|                   2020|           298353.78|
|                   2021|           301819.44|
|                   2022|           296363.88|
+-----------------------+--------------------+



In [33]:
# Average price of a home per year the home was built that have 3 bedrooms and 3 bathrooms, rounded to two decimal places

spark.sql("""
          SELECT EXTRACT(year FROM date), ROUND(AVG(price),2) FROM home_sales WHERE bedrooms == 3 AND bathrooms == 3 GROUP BY EXTRACT(year FROM date)
          ORDER BY EXTRACT(year FROM date)
          """).show()



+-----------------------+--------------------+
|extract(year FROM date)|round(avg(price), 2)|
+-----------------------+--------------------+
|                   2019|           287287.82|
|                   2020|           294204.16|
|                   2021|           294211.46|
|                   2022|           292725.69|
+-----------------------+--------------------+



In [35]:
# Average price of a home for each year the home was built, that have 3 bedrooms/bathrooms, with two floors,and are >=2,000 square feet

spark.sql("""
          SELECT EXTRACT(year FROM date), ROUND(AVG(price),2) FROM home_sales
          WHERE bedrooms == 3 AND bathrooms == 3 AND floors = 2 AND sqft_living >= 2000
          GROUP BY EXTRACT(year FROM date)
          ORDER BY EXTRACT(year FROM date)
          """).show()




+-----------------------+--------------------+
|extract(year FROM date)|round(avg(price), 2)|
+-----------------------+--------------------+
|                   2019|           289859.14|
|                   2020|           292289.09|
|                   2021|           296330.96|
|                   2022|           290242.99|
+-----------------------+--------------------+



In [42]:
# Average price of a home per "view" rating, having an average home price >= $350,000 and ordered by descending view rating, and its run time

start_time = time.time()

spark.sql("""
          SELECT view, ROUND(AVG(price),2) FROM home_sales
          GROUP BY view
          HAVING AVG(price) >= 350000
          ORDER BY view DESC
          """).show()



print("--- %s seconds ---" % (time.time() - start_time))

+----+--------------------+
|view|round(avg(price), 2)|
+----+--------------------+
|  99|          1061201.42|
|  98|          1053739.33|
|  97|          1129040.15|
|  96|          1017815.92|
|  95|           1054325.6|
|  94|           1033536.2|
|  93|          1026006.06|
|  92|           970402.55|
|  91|          1137372.73|
|  90|          1062654.16|
|  89|          1107839.15|
|  88|          1031719.35|
|  87|           1072285.2|
|  86|          1070444.25|
|  85|          1056336.74|
|  84|          1117233.13|
|  83|          1033965.93|
|  82|           1063498.0|
|  81|          1053472.79|
|  80|           991767.38|
+----+--------------------+
only showing top 20 rows

--- 1.1588048934936523 seconds ---


In [43]:
# Caches the the temporary table home_sales

spark.sql("CACHE table home_sales")


DataFrame[]

In [44]:
# Checks if the table is cached

spark.catalog.isCached('home_sales')

True

In [45]:
# Stores inital time

start_time = time.time()


# Runs the last query above using cached data that calculates the average price of a home per "view" rating having an average home price >= $350,000

spark.sql("""
          SELECT view, ROUND(AVG(price),2) FROM home_sales
          GROUP BY view
          HAVING AVG(price) >= 350000
          ORDER BY view DESC
          """).show()

# Calculates the runtime

print("--- %s seconds ---" % (time.time() - start_time))


+----+--------------------+
|view|round(avg(price), 2)|
+----+--------------------+
|  99|          1061201.42|
|  98|          1053739.33|
|  97|          1129040.15|
|  96|          1017815.92|
|  95|           1054325.6|
|  94|           1033536.2|
|  93|          1026006.06|
|  92|           970402.55|
|  91|          1137372.73|
|  90|          1062654.16|
|  89|          1107839.15|
|  88|          1031719.35|
|  87|           1072285.2|
|  86|          1070444.25|
|  85|          1056336.74|
|  84|          1117233.13|
|  83|          1033965.93|
|  82|           1063498.0|
|  81|          1053472.79|
|  80|           991767.38|
+----+--------------------+
only showing top 20 rows

--- 0.6465854644775391 seconds ---


In [46]:
# Partitions by the "date_built" field on the formatted parquet home sales data

df.write.partitionBy("date_built").parquet("home_sales_partition")


In [51]:
# Reads the parquet formatted data

parquet_df = spark.read.parquet('home_sales_partition')


In [52]:
# Creates a temporary table for the parquet data

parquet_df.createOrReplaceTempView('parquet_table')


In [53]:
# Stores initial time

start_time = time.time()


# Uses the parquet DataFrame to run the previous query(average price of a home per "view" rating having an average home pricer >= $350,000)

spark.sql("""
          SELECT view, ROUND(AVG(price),2) FROM parquet_table
          GROUP BY view
          HAVING AVG(price) >= 350000
          ORDER BY view DESC
          """).show()


# Calculates total time

print("--- %s seconds ---" % (time.time() - start_time))

+----+--------------------+
|view|round(avg(price), 2)|
+----+--------------------+
|  99|          1061201.42|
|  98|          1053739.33|
|  97|          1129040.15|
|  96|          1017815.92|
|  95|           1054325.6|
|  94|           1033536.2|
|  93|          1026006.06|
|  92|           970402.55|
|  91|          1137372.73|
|  90|          1062654.16|
|  89|          1107839.15|
|  88|          1031719.35|
|  87|           1072285.2|
|  86|          1070444.25|
|  85|          1056336.74|
|  84|          1117233.13|
|  83|          1033965.93|
|  82|           1063498.0|
|  81|          1053472.79|
|  80|           991767.38|
+----+--------------------+
only showing top 20 rows

--- 0.9401202201843262 seconds ---


In [54]:
# Uncaches the home_sales temporary table

spark.sql("UNCACHE table home_sales")




DataFrame[]

In [55]:
# Checks if the home_sales is no longer cached

spark.catalog.isCached('home_sales')

False