In [1]:
# Activate Spark in our Colab notebook.
import os
# Find the latest version of spark 3.0  from http://www.apache.org/dist/spark/ and enter as the spark version
spark_version = "spark-3.3.1"
# spark_version = 'spark-3.<enter version>'
os.environ["SPARK_VERSION"]=spark_version
# Verify the name of the hadoop2 TGZ file into the spark version folder, eg: spark-3.3.1-bin-hadoop2.tgz
hadoop_version = "hadoop2"
os.environ["HADOOP_VERSION"] = "hadoop2"

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-$HADOOP_VERSION.tgz
!tar xf $SPARK_VERSION-bin-$HADOOP_VERSION.tgz
!pip install -q findspark

# Set Environment Variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-{hadoop_version}"

# Start a SparkSession
import findspark
findspark.init()

'apt-get' is not recognized as an internal or external command,
operable program or batch file.
The system cannot find the path specified.
'wget' is not recognized as an internal or external command,
operable program or batch file.
tar: Error opening archive: Failed to open '$SPARK_VERSION-bin-$HADOOP_VERSION.tgz'


Exception: Unable to find py4j in /content/spark-3.3.1-bin-hadoop2\python, your SPARK_HOME may not be configured correctly

In [2]:
# Import packages
from pyspark.sql import SparkSession
import time

# Create a SparkSession
spark = SparkSession.builder.appName("SparkSQL").getOrCreate()

In [3]:
# 1. Read in the AWS S3 bucket into a DataFrame.
from pyspark import SparkFiles
url = "https://2u-data-curriculum-team.s3.amazonaws.com/dataviz-classroom/v1.1.2/22-big-data/home_sales_revised.csv"



In [4]:
# 2. Create a temporary view of the DataFrame.

df.createOrReplaceTempView('sales')

In [5]:
# 3. What is the average price for a four bedroom house sold in each year rounded to two decimal places?
spark.sql("SELECT year, round(avg(price),2) as `Average 4-bedroom Sale Price` FROM sales WHERE bedrooms=4 GROUP BY year").show()


In [6]:
# 4. What is the average price of a home for each year the home was built that have 3 bedrooms and 3 bathrooms rounded to two decimal places?
spark.sql("SELECT date_built as Year, round(avg(price),2) as `Average 3-bed, 3-bath Sale Price` FROM sales WHERE bedrooms=3 and bathrooms=3 GROUP BY date_built").show()


In [7]:
# 5. What is the average price of a home for each year built that have 3 bedrooms, 3 bathrooms, with two floors,
# and are greater than or equal to 2,000 square feet rounded to two decimal places?

spark.sql("SELECT date_built as Year, round(avg(price),2) as `Average Sale Price` FROM sales WHERE bedrooms=3 and bathrooms=3 and floors=2 and sqft_living>2000 GROUP BY date_built").show()


In [8]:
# 6. What is the "view" rating for the average price of a home, rounded to two decimal places, where the homes are greater than
# or equal to $350,000? Although this is a small dataset, determine the run time for this query.

start_time = time.time()

spark.sql("SELECT round(avg(view),2) as `Average View Rating`, round(avg(price),2) as `Average Sale Price` FROM sales WHERE price>=350000 GROUP BY date_built").show()

print("--- %s seconds ---" % (time.time() - start_time))

--- 7.200241088867188e-05 seconds ---


In [9]:
# 7. Cache the the temporary table home_sales.
spark.sql("cache table sales")

In [10]:
# 8. Check if the table is cached.
spark.catalog.isCached('home_sales')

In [11]:
# 9. Using the cached data, run the query that filters out the view ratings with average price of greater than $350,000. 
# Determine the runtime and compare it to uncached runtime.

start_time = time.time()



print("--- %s seconds ---" % (time.time() - start_time))


--- 5.459785461425781e-05 seconds ---


In [12]:
# 10. Partition the home sales dataset by the date_built field.
df.write.partitionBy("date_built").mode("overwrite").parquet("sales_partitioned")

In [13]:
# 11. Read the formatted parquet data.
p_df=spark.read.parquet('sales_partitioned')

In [14]:
# 12. Create a temporary table for the parquet data.
p_df.createOrReplaceTempView('sales_partitioned')

In [15]:
# 13. Run the query that filters out the view ratings with average price of greater than $350,000 with the parquet DataFrame. 
# Round your average to two decimal places. 
# Determine the runtime and compare it to the cached version. 

start_time = time.time()



print("--- %s seconds ---" % (time.time() - start_time))

--- 7.104873657226562e-05 seconds ---


In [16]:
# 14. Uncache the home_sales temporary table.
spark.sql("uncache table sales")

In [17]:
# 15. Check if the home_sales is no longer cached

if spark.catalog.isCached("sales"):
  print("a table is till cached")
else:
  print("all clear")
