In [7]:
import os
import subprocess

# Set Spark version
spark_version = 'spark-3.4.0'
os.environ['SPARK_VERSION'] = spark_version

# Download Spark and Java
spark_url = f"https://archive.apache.org/dist/spark/{spark_version}/{spark_version}-bin-hadoop3.tgz"
spark_tar = f"{spark_version}-bin-hadoop3.tgz"

# Download and extract Spark
if not os.path.exists(spark_tar):
    subprocess.run(["curl", "-O", spark_url])
if not os.path.exists(f"{spark_version}-bin-hadoop3"):
    subprocess.run(["tar", "xzf", spark_tar])

# Set Environment Variables
os.environ["JAVA_HOME"] = "/usr/libexec/java_home"
os.environ["SPARK_HOME"] = os.path.join(os.getcwd(), f"{spark_version}-bin-hadoop3")

# Install findspark
subprocess.run(["pip", "install", "findspark"], check=True)

# Start a SparkSession
import findspark
findspark.init()

from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .appName("Spark Setup Test") \
    .getOrCreate()

print("SparkSession started successfully!")



SparkSession started successfully!


In [8]:
# Import packages
from pyspark.sql import SparkSession
import time

# Create a SparkSession
spark = SparkSession.builder.appName("SparkSQL").getOrCreate()

24/12/09 20:11:11 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [9]:
# 1. Read in the AWS S3 bucket into a DataFrame.
from pyspark import SparkFiles
url = "https://2u-data-curriculum-team.s3.amazonaws.com/dataviz-classroom/v1.2/22-big-data/home_sales_revised.csv"



In [11]:
# Load the CSV file into a DataFrame
spark.sparkContext.addFile(url)
df = spark.read.csv(SparkFiles.get("home_sales_revised.csv"), header=True, inferSchema=True)

# Show the first few rows to confirm successful load
df.show(5)


                                                                                

+--------------------+----------+----------+------+--------+---------+-----------+--------+------+----------+----+
|                  id|      date|date_built| price|bedrooms|bathrooms|sqft_living|sqft_lot|floors|waterfront|view|
+--------------------+----------+----------+------+--------+---------+-----------+--------+------+----------+----+
|f8a53099-ba1c-47d...|2022-04-08|      2016|936923|       4|        3|       3167|   11733|     2|         1|  76|
|7530a2d8-1ae3-451...|2021-06-13|      2013|379628|       2|        2|       2235|   14384|     1|         0|  23|
|43de979c-0bf0-4c9...|2019-04-12|      2014|417866|       2|        2|       2127|   10575|     2|         0|   0|
|b672c137-b88c-48b...|2019-10-16|      2016|239895|       2|        2|       1631|   11149|     2|         0|   0|
|e0726d4d-d595-407...|2022-01-08|      2017|424418|       3|        2|       2249|   13878|     2|         0|   4|
+--------------------+----------+----------+------+--------+---------+----------

In [14]:
# Create a temporary view of the DataFrame
df.createOrReplaceTempView("home_sales")

In [15]:
# SQL query to calculate the average price for four-bedroom houses per year
avg_price_query = """
SELECT 
    YEAR(date) AS year, 
    ROUND(AVG(price), 2) AS avg_price
FROM 
    home_sales
WHERE 
    bedrooms = 4
GROUP BY 
    YEAR(date)
ORDER BY 
    year
"""
# Execute the query
avg_price_result = spark.sql(avg_price_query)

# Show the results
avg_price_result.show()


[Stage 10:>                                                         (0 + 1) / 1]

+----+---------+
|year|avg_price|
+----+---------+
|2019| 300263.7|
|2020|298353.78|
|2021|301819.44|
|2022|296363.88|
+----+---------+



                                                                                

In [18]:
df.printSchema()


root
 |-- id: string (nullable = true)
 |-- date: date (nullable = true)
 |-- date_built: integer (nullable = true)
 |-- price: integer (nullable = true)
 |-- bedrooms: integer (nullable = true)
 |-- bathrooms: integer (nullable = true)
 |-- sqft_living: integer (nullable = true)
 |-- sqft_lot: integer (nullable = true)
 |-- floors: integer (nullable = true)
 |-- waterfront: integer (nullable = true)
 |-- view: integer (nullable = true)



In [19]:
spark.sql("SHOW TABLES").show()


+---------+----------+-----------+
|namespace| tableName|isTemporary|
+---------+----------+-----------+
|         |home_sales|       true|
+---------+----------+-----------+



In [20]:
avg_price_built_year_query = """
SELECT 
    your_column_for_year_built AS year_built, 
    ROUND(AVG(your_column_for_price), 2) AS avg_price
FROM 
    home_sales
WHERE 
    your_column_for_bedrooms = 3 AND your_column_for_bathrooms = 3
GROUP BY 
    your_column_for_year_built
ORDER BY 
    your_column_for_year_built
"""



In [22]:
# SQL query to calculate the average price of homes for each year they were built,
# filtered for 3 bedrooms, 3 bathrooms, two floors, and size >= 2,000 square feet
avg_price_built_year_query = """
SELECT 
    date_built AS year_built, 
    ROUND(AVG(price), 2) AS avg_price
FROM 
    home_sales
WHERE 
    bedrooms = 3 
    AND bathrooms = 3 
    AND floors = 2 
    AND sqft_living >= 2000
GROUP BY 
    date_built
ORDER BY 
    date_built
"""

# Execute the query
avg_price_built_year_result = spark.sql(avg_price_built_year_query)

# Show the results
avg_price_built_year_result.show()





+----------+---------+
|year_built|avg_price|
+----------+---------+
|      2010|285010.22|
|      2011|276553.81|
|      2012|307539.97|
|      2013|303676.79|
|      2014|298264.72|
|      2015|297609.97|
|      2016| 293965.1|
|      2017|280317.58|
+----------+---------+



In [23]:
import time

# Start the timer
start_time = time.time()

# SQL query to calculate the average price of homes per "view" rating,
# filtered for average price >= $350,000, rounded to two decimal places
avg_price_view_query = """
SELECT 
    view, 
    ROUND(AVG(price), 2) AS avg_price
FROM 
    home_sales
GROUP BY 
    view
HAVING 
    AVG(price) >= 350000
ORDER BY 
    view DESC
"""

# Execute the query
avg_price_view_result = spark.sql(avg_price_view_query)

# Show the results
avg_price_view_result.show()

# Stop the timer and display the runtime
print("--- %s seconds ---" % (time.time() - start_time))


+----+----------+
|view| avg_price|
+----+----------+
| 100| 1026669.5|
|  99|1061201.42|
|  98|1053739.33|
|  97|1129040.15|
|  96|1017815.92|
|  95| 1054325.6|
|  94| 1033536.2|
|  93|1026006.06|
|  92| 970402.55|
|  91|1137372.73|
|  90|1062654.16|
|  89|1107839.15|
|  88|1031719.35|
|  87| 1072285.2|
|  86|1070444.25|
|  85|1056336.74|
|  84|1117233.13|
|  83|1033965.93|
|  82| 1063498.0|
|  81|1053472.79|
+----+----------+
only showing top 20 rows

--- 0.8571460247039795 seconds ---


In [24]:
# Cache the temporary table home_sales
spark.sql("CACHE TABLE home_sales")

# Verify caching by running a query
spark.sql("SELECT COUNT(*) FROM home_sales").show()



[Stage 20:>                                                         (0 + 1) / 1]

+--------+
|count(1)|
+--------+
|   33287|
+--------+



                                                                                

In [25]:
# 8. Check if the table is cached.
spark.catalog.isCached('home_sales')

True

In [26]:
# 9. Using the cached data, run the last query above, that calculates 
# the average price of a home per "view" rating, rounded to two decimal places,
# having an average home price greater than or equal to $350,000. 
# Determine the runtime and compare it to the uncached runtime.

start_time = time.time()



print("--- %s seconds ---" % (time.time() - start_time))


--- 6.4849853515625e-05 seconds ---


In [27]:
# Write the home sales data to Parquet format, partitioned by the "date_built" field
output_path = "home_sales_partitioned"  # Specify your desired output directory
df.write.partitionBy("date_built").parquet(output_path)

print(f"Data has been partitioned by 'date_built' and saved to {output_path}")


[Stage 26:>                                                         (0 + 1) / 1]

Data has been partitioned by 'date_built' and saved to home_sales_partitioned


                                                                                

In [28]:
# Path to the Parquet-formatted data
parquet_path = "home_sales_partitioned"  # Replace with your actual Parquet directory path

# Read the Parquet data into a DataFrame
parquet_df = spark.read.parquet(parquet_path)

# Show the first few rows to verify successful read
parquet_df.show(5)


+--------------------+----------+------+--------+---------+-----------+--------+------+----------+----+----------+
|                  id|      date| price|bedrooms|bathrooms|sqft_living|sqft_lot|floors|waterfront|view|date_built|
+--------------------+----------+------+--------+---------+-----------+--------+------+----------+----+----------+
|2ed8d509-7372-46d...|2021-08-06|258710|       3|        3|       1918|    9666|     1|         0|  25|      2015|
|941bad30-eb49-4a7...|2020-05-09|229896|       3|        3|       2197|    8641|     1|         0|   3|      2015|
|c797ca12-52cd-4b1...|2019-06-08|288650|       2|        3|       2100|   10419|     2|         0|   7|      2015|
|0cfe57f3-28c2-472...|2019-10-04|308313|       3|        3|       1960|    9453|     2|         0|   2|      2015|
|d715f295-2fbf-4e9...|2021-05-17|391574|       3|        2|       1635|    8040|     2|         0|  10|      2015|
+--------------------+----------+------+--------+---------+-----------+--------+

In [29]:
# Create a temporary view for the Parquet DataFrame
parquet_df.createOrReplaceTempView("home_sales_parquet")

# Verify the temporary table by running a query
spark.sql("SELECT * FROM home_sales_parquet LIMIT 5").show()

+--------------------+----------+------+--------+---------+-----------+--------+------+----------+----+----------+
|                  id|      date| price|bedrooms|bathrooms|sqft_living|sqft_lot|floors|waterfront|view|date_built|
+--------------------+----------+------+--------+---------+-----------+--------+------+----------+----+----------+
|2ed8d509-7372-46d...|2021-08-06|258710|       3|        3|       1918|    9666|     1|         0|  25|      2015|
|941bad30-eb49-4a7...|2020-05-09|229896|       3|        3|       2197|    8641|     1|         0|   3|      2015|
|c797ca12-52cd-4b1...|2019-06-08|288650|       2|        3|       2100|   10419|     2|         0|   7|      2015|
|0cfe57f3-28c2-472...|2019-10-04|308313|       3|        3|       1960|    9453|     2|         0|   2|      2015|
|d715f295-2fbf-4e9...|2021-05-17|391574|       3|        2|       1635|    8040|     2|         0|  10|      2015|
+--------------------+----------+------+--------+---------+-----------+--------+

In [30]:
# 13. Using the parquet DataFrame, run the last query above, that calculates 
# the average price of a home per "view" rating, rounded to two decimal places,
# having an average home price greater than or equal to $350,000. 
# Determine the runtime and compare it to the cached runtime.

start_time = time.time()



print("--- %s seconds ---" % (time.time() - start_time))

--- 8.416175842285156e-05 seconds ---


In [31]:
# Uncache the temporary table home_sales
spark.sql("UNCACHE TABLE home_sales")

# Confirm the table is uncached
print("The home_sales table has been uncached.")



The home_sales table has been uncached.


In [32]:
# Check if the home_sales table is cached
is_cached = spark.catalog.isCached("home_sales")

# Print the result
if is_cached:
    print("The home_sales table is still cached.")
else:
    print("The home_sales table is no longer cached.")



The home_sales table is no longer cached.
