In [1]:
import os
# Find the latest version of spark 3.x  from http://www.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.5.3'
spark_version = 'spark-3.5.3'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop3.tgz
!tar xf $SPARK_VERSION-bin-hadoop3.tgz
!pip install -q findspark

# Set Environment Variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop3"

# Start a SparkSession
import findspark
findspark.init()

'apt-get' is not recognized as an internal or external command,
operable program or batch file.
The system cannot find the path specified.
'wget' is not recognized as an internal or external command,
operable program or batch file.
tar: Error opening archive: Failed to open '$SPARK_VERSION-bin-hadoop3.tgz'


Exception: Unable to find py4j in /content/spark-3.5.3-bin-hadoop3\python, your SPARK_HOME may not be configured correctly

In [None]:
# Bring in the timer used by the benchmark cells and the Spark entry point
import time
from pyspark.sql import SparkSession

# Obtain the session named "SparkSQL" that every query in this notebook runs through
spark = (
    SparkSession.builder
    .appName("SparkSQL")
    .getOrCreate()
)

In [None]:
# 1. Load home_sales_revised.csv from the provided AWS S3 bucket location into a PySpark DataFrame.
from pyspark import SparkFiles

url = "https://2u-data-curriculum-team.s3.amazonaws.com/dataviz-classroom/v1.2/22-big-data/home_sales_revised.csv"

# Hand the remote file to Spark, then look up the path of the fetched copy
spark.sparkContext.addFile(url)
local_csv = SparkFiles.get("home_sales_revised.csv")

# Treat the first row as column names and let Spark infer the column types
df = spark.read.csv(local_csv, header=True, inferSchema=True)

In [None]:
# 2. Create a temporary view of the DataFrame.
# Registers df under the name "home_sales", which is the table the SQL queries
# in the following cells select from.
df.createOrReplaceTempView("home_sales")

In [None]:
# 3. What is the average price for a four bedroom house sold per year, rounded to two decimal places?
# NOTE(review): the sale date is read from the `date` column of home_sales_revised.csv
# (the dataset is not expected to have a `sold_date` column) - confirm with df.printSchema().
# The name is back-quoted so it is always parsed as a column rather than the DATE keyword,
# and the GROUP BY repeats the expression instead of relying on alias resolution.
query = """
SELECT
    YEAR(`date`) AS year,
    ROUND(AVG(price), 2) AS avg_price
FROM home_sales
WHERE bedrooms = 4
GROUP BY YEAR(`date`)
ORDER BY year;
"""

# Execute the query
result = spark.sql(query)

# Show results
result.show()

In [None]:
# 4. What is the average price of a home for each year the home was built,
# that have 3 bedrooms and 3 bathrooms, rounded to two decimal places?
# The build-year column is `date_built` (the same column the parquet export in this
# notebook is partitioned by), not `build_date`.
# NOTE(review): assumes date_built already stores the build year as a number, so it is
# grouped on directly without YEAR() - confirm with df.printSchema().
query = """
SELECT
    date_built AS year_built,
    ROUND(AVG(price), 2) AS avg_price
FROM home_sales
WHERE bedrooms = 3
    AND bathrooms = 3
GROUP BY date_built
ORDER BY date_built;
"""

# Execute the query
result = spark.sql(query)

# Show results
result.show()

In [None]:
# 5. What is the average price of a home for each year the home was built,
# that have 3 bedrooms, 3 bathrooms, with two floors,
# and are greater than or equal to 2,000 square feet, rounded to two decimal places?
# The build-year column is `date_built` (the same column the parquet export in this
# notebook is partitioned by), not `build_date`.
# NOTE(review): assumes date_built already stores the build year as a number, so it is
# grouped on directly without YEAR() - confirm with df.printSchema().
query = """
SELECT
    date_built AS year_built,
    ROUND(AVG(price), 2) AS avg_price
FROM home_sales
WHERE bedrooms = 3
    AND bathrooms = 3
    AND floors = 2
    AND sqft_living >= 2000
GROUP BY date_built
ORDER BY date_built;
"""

# Execute the query
result = spark.sql(query)

# Show results
result.show()

In [None]:
# 6. Average home price per "view" rating, rounded to two decimal places, keeping only
# ratings whose average price is at least $350,000, ordered by descending view rating.
# The run is timed even though the dataset is small.
import time

start_time = time.time()
query = """
SELECT 
    view,
    ROUND(AVG(price), 2) AS avg_price
FROM home_sales
GROUP BY view
HAVING AVG(price) >= 350000
ORDER BY view DESC;
"""

# Submit the statement and print the resulting rows
result = spark.sql(query)
result.show()

# Seconds elapsed since start_time, covering the query submission and the display
elapsed = time.time() - start_time
print("--- %s seconds ---" % elapsed)

--- 7.200241088867188e-05 seconds ---


In [None]:
# 7. Cache the temporary table home_sales.
# Issued as a SQL statement through the active SparkSession.
spark.sql("CACHE TABLE home_sales")

In [None]:
# 8. Check if the table is cached.
# Being the last expression in the cell, the value returned by isCached is shown as the cell output.
spark.catalog.isCached('home_sales')

In [None]:
# 9. Re-run the "view" rating query (average price rounded to two decimal places,
# averages of at least $350,000 only) against the cached home_sales table.
# Time it so the result can be compared with the uncached runtime.

start_time = time.time()
query = """
SELECT 
    view,
    ROUND(AVG(price), 2) AS avg_price
FROM home_sales
GROUP BY view
HAVING AVG(price) >= 350000
ORDER BY view DESC;
"""

# Submit the statement and print the resulting rows
result = spark.sql(query)
result.show()

# Seconds elapsed since start_time, covering the query submission and the display
elapsed = time.time() - start_time
print("--- %s seconds ---" % elapsed)


--- 5.459785461425781e-05 seconds ---


In [None]:
# 10. Partition by the "date_built" field on the formatted parquet home sales data.
# mode("overwrite") replaces output left behind by an earlier run; without an explicit
# mode the writer raises an error when the target path already exists, so the cell
# could only ever be executed once.
df.write.mode("overwrite").partitionBy("date_built").parquet("path_to_output/home_sales_partitioned.parquet")

In [None]:
# 11. Load the partitioned parquet output back into a DataFrame.
parquet_path = "path_to_output/home_sales_partitioned.parquet"
df_parquet = spark.read.parquet(parquet_path)

# Preview the first five rows
df_parquet.show(n=5)

In [None]:
# 12. Create a temporary table for the parquet data.
# Registers df_parquet under the name "home_sales_parquet", which is the table the
# timed parquet query selects from.
df_parquet.createOrReplaceTempView("home_sales_parquet")

In [None]:
# 13. Run the "view" rating query (average price rounded to two decimal places,
# averages of at least $350,000 only) against the parquet-backed view.
# Time it so the result can be compared with the cached runtime.

start_time = time.time()
query = """
SELECT 
    view,
    ROUND(AVG(price), 2) AS avg_price
FROM home_sales_parquet
GROUP BY view
HAVING AVG(price) >= 350000
ORDER BY view DESC;
"""

# Submit the statement and print the resulting rows
result_parquet = spark.sql(query)
result_parquet.show()

# Seconds elapsed since start_time, covering the query submission and the display
elapsed = time.time() - start_time
print("--- %s seconds ---" % elapsed)

--- 7.104873657226562e-05 seconds ---


In [None]:
# 14. Uncache the home_sales temporary table.
# Removes from the cache the view that the earlier CACHE TABLE statement cached.
spark.catalog.uncacheTable("home_sales")

In [None]:
# 15. Confirm that home_sales is no longer held in the cache
cached_status = spark.catalog.isCached(tableName="home_sales")
status_line = f"Is 'home_sales' cached? {cached_status}"
print(status_line)