# Project 13 -- Ritvik Indupuri

**TA Help:** John Smith, Alice Jones

- Help with figuring out how to write a function.
    
**Collaboration:** Friend1, Friend2
    
- Helped figuring out how to load the dataset.
- Helped debug error with my plot.

## Question 1

In [8]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
import time

# 1.1: Initialize a SparkSession and load the data
spark = SparkSession.builder.appName("TDM_S").config("spark.driver.memory", "2g").getOrCreate()

df = spark.read.parquet("/anvil/projects/tdm/data/whin/weather.parquet")


In [7]:
# 1.2: Show the first 5 rows
df.show(5)

+----------+--------+---------+---------------+--------------------+-----------+----------------+---------------+--------+---------------+--------------------+----+---------------------+--------------+----------------------+-------------------+---------------------------+--------+-----------+-----------+-----------+-----------+------------+------------+------------+------------+
|station_id|latitude|longitude|           name|    observation_time|temperature|temperature_high|temperature_low|humidity|solar_radiation|solar_radiation_high|rain|rain_inches_last_hour|wind_speed_mph|wind_direction_degrees|wind_gust_speed_mph|wind_gust_direction_degrees|pressure|soil_temp_1|soil_temp_2|soil_temp_3|soil_temp_4|soil_moist_1|soil_moist_2|soil_moist_3|soil_moist_4|
+----------+--------+---------+---------------+--------------------+-----------+----------------+---------------+--------+---------------+--------------------+----+---------------------+--------------+----------------------+------------

The process began by creating a SparkSession, which serves as the main entry point for working with PySpark. I set it up using the builder pattern (SparkSession.builder...getOrCreate()) and configured the session to allocate 2 GB of driver memory through the .config() method.

Once the session was ready, I loaded the dataset with spark.read.parquet(). The resulting DataFrame (df) is a distributed structure: unlike pandas, Spark does not pull the rows into memory at this point. It reads only the Parquet metadata it needs to determine the schema, and df otherwise represents a plan for how the data will be accessed and processed.

The first actual computation happens when calling .show(5), since that command triggers Spark to execute the necessary operations and return the first five rows.
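The difference between eager and lazy loading can be illustrated without Spark. The sketch below is a plain-Python analogy, not Spark's implementation: `lazy_rows` is a hypothetical generator and the CSV text is made-up sample data. No row is parsed until something asks for it, much as df does no row-level work until .show(5) is called.

```python
import csv
import io
from itertools import islice

# Made-up sample data standing in for the weather file (assumption).
SAMPLE = "station_id,temperature\n" + "\n".join(f"{i},{60 + i}" for i in range(100))

parsed = []  # records which rows were actually parsed


def lazy_rows(text):
    """Yield one parsed row at a time; nothing runs until iteration starts."""
    for row in csv.DictReader(io.StringIO(text)):
        parsed.append(row["station_id"])
        yield row


rows = lazy_rows(SAMPLE)            # like spark.read.parquet: no rows read yet
rows_parsed_before = len(parsed)    # still zero at this point

first_five = list(islice(rows, 5))  # like df.show(5): pulls only what is needed
rows_parsed_after = len(parsed)     # only five of the 100 rows were touched
```

The remaining 95 rows are never parsed, which is the same reason a small .show(5) can be cheap relative to a full scan.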




## Question 2

In [9]:
# Deliverable 2.1

df.printSchema()

root
 |-- station_id: long (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- name: string (nullable = true)
 |-- observation_time: string (nullable = true)
 |-- temperature: double (nullable = true)
 |-- temperature_high: double (nullable = true)
 |-- temperature_low: double (nullable = true)
 |-- humidity: double (nullable = true)
 |-- solar_radiation: double (nullable = true)
 |-- solar_radiation_high: double (nullable = true)
 |-- rain: double (nullable = true)
 |-- rain_inches_last_hour: double (nullable = true)
 |-- wind_speed_mph: double (nullable = true)
 |-- wind_direction_degrees: double (nullable = true)
 |-- wind_gust_speed_mph: double (nullable = true)
 |-- wind_gust_direction_degrees: double (nullable = true)
 |-- pressure: double (nullable = true)
 |-- soil_temp_1: double (nullable = true)
 |-- soil_temp_2: double (nullable = true)
 |-- soil_temp_3: double (nullable = true)
 |-- soil_temp_4: double (nullable = true)
 |

In [10]:
# Deliverable 2.2

start_time = time.time()

# Define the transformation
result_df = df.groupBy("name") \
              .agg(avg("humidity").alias("average_humidity"),
                   avg("solar_radiation").alias("average_solar_radiation"),
                   avg("pressure").alias("average_pressure")) \
              .filter((col("average_humidity") > 50) & (col("average_solar_radiation") > 100)) \
              .sort(desc("average_pressure"))

end_time = time.time()
print(f"Time taken to define operations: {end_time - start_time} seconds")

Time taken to define operations: 0.1461491584777832 seconds


In [11]:
# Deliverable 2.3

start_time = time.time()

# triggers the actual computation
result_df.show(5)

end_time = time.time()
print(f"Time taken to show first 5 rows: {end_time - start_time} seconds")



+----------------+-----------------+-----------------------+------------------+
|            name| average_humidity|average_solar_radiation|  average_pressure|
+----------------+-----------------+-----------------------+------------------+
|WHIN089E-BENT009|74.77029578351164|     257.34197608558844|30.216278791692908|
|      Bringhurst|80.17502311929556|     168.08889779559118|30.190432274525733|
| WHIN020-FOUN001|76.72776597425857|     146.12622262805766|30.159863727349002|
|        Idaville|77.37444567283207|     166.06921923967843|30.151303970200168|
|         Wolcott|67.00495091507085|     104.93433155080214| 30.11409279198035|
+----------------+-----------------+-----------------------+------------------+
only showing top 5 rows
Time taken to show first 5 rows: 1.8336148262023926 seconds



This question demonstrates Spark's lazy evaluation model. In Deliverable 2.2, defining result_df took only about 0.15 seconds because Spark did not run the query. It only recorded the transformations (groupBy, agg, filter, sort) as a logical plan, from which the Directed Acyclic Graph (DAG) of execution stages is derived later.

The actual computation occurred in Deliverable 2.3 with the .show(5) action. That call forced Spark to execute the plan: the Catalyst optimizer refined the logical plan, and the resulting physical plan ran on the available resources (in this case, the local machine). Only then were the first five rows produced, which is why the second measurement was much longer, about 1.83 seconds.
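The split between defining a query and running it can be sketched in a few lines of plain Python. This is a toy analogy under stated assumptions, not how Spark is implemented: `LazyFrame` is a hypothetical class, and the rows are made-up (name, average_humidity) pairs. Transformations only append to a recorded list of steps, and the `take` method plays the role of an action such as .show(5).

```python
class LazyFrame:
    """Toy stand-in for a DataFrame: transformations are recorded, not run."""

    def __init__(self, rows, steps=()):
        self._rows = rows
        self._steps = tuple(steps)

    def filter(self, predicate):
        # Return a new object with one more recorded step (no work done here).
        return LazyFrame(self._rows, self._steps + (("filter", predicate),))

    def sort(self, key, reverse=False):
        return LazyFrame(self._rows, self._steps + (("sort", (key, reverse)),))

    def plan(self):
        # Names of the recorded steps, in the order they will run.
        return [name for name, _ in self._steps]

    def take(self, n):
        # The "action": only now are the recorded steps executed.
        rows = list(self._rows)
        for name, arg in self._steps:
            if name == "filter":
                rows = [r for r in rows if arg(r)]
            elif name == "sort":
                key, reverse = arg
                rows = sorted(rows, key=key, reverse=reverse)
        return rows[:n]


# Made-up (name, average_humidity) pairs, for illustration only.
data = [("A", 40.0), ("B", 75.0), ("C", 62.0)]
query = LazyFrame(data).filter(lambda r: r[1] > 50).sort(lambda r: r[1], reverse=True)

recorded = query.plan()  # the steps are known, but nothing has executed yet
top = query.take(1)      # execution happens here
```

Because each transformation returns a new object and leaves the old one untouched, the recorded plan can be inspected (or, in Spark's case, optimized) before any data is processed.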



## Question 3

In [15]:
# Deliverable 3.1

df.createOrReplaceTempView("weather_view")

In [14]:
start_time = time.time()

# Deliverable 3.2
# Define the SQL query 
result_df_sql = spark.sql("""
    SELECT 
        name,
        AVG(humidity) AS average_humidity,
        AVG(solar_radiation) AS average_solar_radiation,
        AVG(pressure) AS average_pressure
    FROM weather_view
    GROUP BY name
    HAVING average_humidity > 50 AND average_solar_radiation > 100
    ORDER BY average_pressure DESC
""")

end_time = time.time()
print(f"Time taken to define SQL query: {end_time - start_time} seconds")

Time taken to define SQL query: 0.11014533042907715 seconds


In [16]:
# Deliverable 3.3

start_time = time.time()

# This 'action' triggers the computation
result_df_sql.show(5)

end_time = time.time()
print(f"Time taken to show first 5 rows (SQL): {end_time - start_time} seconds")

+----------------+-----------------+-----------------------+------------------+
|            name| average_humidity|average_solar_radiation|  average_pressure|
+----------------+-----------------+-----------------------+------------------+
|WHIN089E-BENT009|74.77029578351164|     257.34197608558844|30.216278791692908|
|      Bringhurst|80.17502311929556|     168.08889779559118|30.190432274525733|
| WHIN020-FOUN001|76.72776597425857|     146.12622262805766|30.159863727349002|
|        Idaville|77.37444567283207|     166.06921923967843|30.151303970200168|
|         Wolcott|67.00495091507085|     104.93433155080214| 30.11409279198035|
+----------------+-----------------+-----------------------+------------------+
only showing top 5 rows
Time taken to show first 5 rows (SQL): 0.840869665145874 seconds



In [None]:
#Deliverable 3.4

#Showing the first five rows took about 0.84 seconds in this run. The exact time depends on how busy the machine is at that moment. The show() call is what actually starts the computation: Spark has to run the full set of operations (loading, grouping, filtering, and sorting) before it can display the sample output.


In [None]:
# Deliverable 3.5

#In this run the SQL version took about 0.84 seconds, compared with about 1.83 seconds for the DataFrame API version in Deliverable 2.3. That gap most likely reflects warm-up effects, since the DataFrame query ran first, rather than a difference between the two interfaces. Whether the query is written with the DataFrame API or with Spark SQL, the Catalyst optimizer translates it into the same optimized execution plan, so the underlying work is the same and no systematic difference in the time to show the first five rows should be expected.


This question demonstrated that Spark SQL is fully integrated within PySpark. By creating a temporary view with createOrReplaceTempView, I was able to run a standard SQL query that replicated the logic from Question 2 and returned the same five rows. The key takeaway from Deliverable 3.5 was that the two approaches perform comparably: the single timed runs differed (about 0.84 seconds for SQL versus about 1.83 seconds for the DataFrame API), but that gap most likely reflects warm-up effects rather than the interface used. Spark's strength lies not in one interface over the other but in the Catalyst optimizer, which translates both SQL queries and DataFrame operations into the same optimized execution plan. As a result, developers can choose whichever syntax they prefer without sacrificing performance.
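A single timing per query is noisy, so a fairer comparison would repeat each action and discard the first run. The helper below is a minimal sketch of that idea: `time_action` is a hypothetical function (not part of PySpark), and the workload is a stand-in so the code runs anywhere. In the notebook one would pass something like `lambda: result_df.show(5)` and `lambda: result_df_sql.show(5)` instead.

```python
import statistics
import time


def time_action(action, repeats=5, warmup=1):
    """Return the median wall-clock time in seconds of calling `action`."""
    for _ in range(warmup):
        action()  # warm-up runs are executed but not measured
    samples = []
    for _ in range(repeats):
        start = time.perf_counter()
        action()
        samples.append(time.perf_counter() - start)
    return statistics.median(samples)


# Stand-in workload (assumption): replace with the Spark actions to compare.
calls = []
median_seconds = time_action(lambda: calls.append(sum(range(10_000))), repeats=5, warmup=1)
```

The median is used rather than the mean so that one unusually slow run does not dominate the result.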


## Question 4

In [20]:
# Deliverable 4.1
newdf = df.withColumn("observation_time", to_timestamp("observation_time"))

In [19]:
#Deliverable 4.2

newdf = newdf.withColumn("year", year("observation_time"))

In [21]:
# Deliverable 4.3

newdf = newdf.withColumn("month", month("observation_time"))

In [23]:


# Deliverable 4.4: Group by month and aggregate
result_monthly_df = newdf.groupBy("month") \
                         .agg(avg("temperature").alias("avg_temperature"),
                              avg("solar_radiation").alias("avg_solar_radiation"))


In [24]:
# Deliverable 4.5

# Sort the results
result_monthly_sorted = result_monthly_df.sort(desc("avg_temperature"))

In [25]:
# Deliverable 4.6: Print the top 5 results
result_monthly_sorted.show(5)



+-----+-----------------+-------------------+
|month|  avg_temperature|avg_solar_radiation|
+-----+-----------------+-------------------+
|    7| 73.9766158271431| 253.92342957199992|
|    6|73.41862618893836| 267.88055625964927|
|    8|71.00368368101219| 253.58956703962878|
|    9|66.70186356073211|  179.6384488924556|
|    5|60.18404719420269|  232.1310546777555|
+-----+-----------------+-------------------+
only showing top 5 rows



A key concept in this question was that Spark DataFrames are immutable. Unlike pandas, you can't alter a column in place. Instead, withColumn() returns a new DataFrame that includes the updated or additional column, which is why each step reassigns newdf. For the transformations, I applied to_timestamp, year, and month from pyspark.sql.functions to engineer the time-based features. The final step was an aggregation using the same groupBy and agg pattern as in Question 2, this time grouped on the newly created month column.
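To make the three steps concrete, here is what the month-level aggregation computes, written in plain Python on a few made-up rows. This is only an illustration of the logic, not Spark code: the timestamps and temperatures are invented, and the ISO-8601 string format is an assumption about how observation_time might look.

```python
from collections import defaultdict
from datetime import datetime

# Made-up rows: (observation_time as an ISO-8601 string, temperature).
rows = [
    ("2023-07-01T12:00:00", 80.0),
    ("2023-07-02T12:00:00", 70.0),
    ("2023-01-15T12:00:00", 30.0),
]

# Step 1 (like to_timestamp + month): derive a new column without mutating `rows`.
with_month = [(datetime.fromisoformat(ts).month, temp) for ts, temp in rows]

# Step 2 (like groupBy("month").agg(avg(...))): collect values per month, then average.
by_month = defaultdict(list)
for month, temp in with_month:
    by_month[month].append(temp)
avg_by_month = {m: sum(v) / len(v) for m, v in by_month.items()}

# Step 3 (like sort(desc("avg_temperature"))): hottest month first.
ranked = sorted(avg_by_month.items(), key=lambda kv: kv[1], reverse=True)
```

As with withColumn(), each step builds a new collection from the previous one, and the original `rows` list is left unchanged.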


## Question 5

In [26]:
# Deliverable 5.1
result_monthly_sorted.explain(mode="cost")

== Optimized Logical Plan ==
Sort [avg_temperature#651 DESC NULLS LAST], true, Statistics(sizeInBytes=9.9 MiB)
+- Aggregate [month#619], [month#619, avg(temperature#481) AS avg_temperature#651, avg(solar_radiation#485) AS avg_solar_radiation#652], Statistics(sizeInBytes=9.9 MiB)
   +- Project [temperature#481, solar_radiation#485, month(cast(cast(observation_time#480 as timestamp) as date)) AS month#619], Statistics(sizeInBytes=9.9 MiB)
      +- Relation [station_id#476L,latitude#477,longitude#478,name#479,observation_time#480,temperature#481,temperature_high#482,temperature_low#483,humidity#484,solar_radiation#485,solar_radiation_high#486,rain#487,rain_inches_last_hour#488,wind_speed_mph#489,wind_direction_degrees#490,wind_gust_speed_mph#491,wind_gust_direction_degrees#492,pressure#493,soil_temp_1#494,soil_temp_2#495,soil_temp_3#496,soil_temp_4#497,soil_moist_1#498,soil_moist_2#499,soil_moist_3#500,... 1 more fields] parquet, Statistics(sizeInBytes=85.1 MiB)

== Physical Plan ==
Adapt

In [None]:
# Deliverable 5.2

#The counts come from the output of Deliverable 5.1, under the headings == Physical Plan == and == Optimized Logical Plan ==. The totals can differ slightly depending on the Spark release.
#- Physical Plan: Typically contains the larger number of operations, often in the range of 7 to 10. These are low-level execution details such as HashAggregate, Exchange, and FileScan.
#- Optimized Logical Plan: 4 operators in the output above (Sort, Aggregate, Project, and Relation). It represents the computation at a higher level, using abstract operations.
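Counting operators by eye is error-prone, so the count can also be taken from the plan text itself. The sketch below is a hypothetical helper, `plan_operators`, applied to an abbreviated copy of the Optimized Logical Plan from Deliverable 5.1 (column lists and statistics are shortened with "..."). It assumes one operator per line, as in the logical plan printed above. Physical plans often carry extra prefixes such as codegen stage markers and would need additional handling.

```python
# Abbreviated copy of the Optimized Logical Plan from Deliverable 5.1.
PLAN = """\
Sort [avg_temperature#651 DESC NULLS LAST], true
+- Aggregate [month#619], [...]
   +- Project [temperature#481, solar_radiation#485, ...]
      +- Relation [...] parquet
"""


def plan_operators(plan_text):
    """Return the operator name at the start of each plan-tree line."""
    names = []
    for line in plan_text.splitlines():
        # Drop the tree-drawing prefix (spaces, ':', '+', '-') before the name.
        stripped = line.lstrip(" :+-")
        if stripped:
            names.append(stripped.split()[0])
    return names


operators = plan_operators(PLAN)
operator_count = len(operators)
```

For this plan the helper finds Sort, Aggregate, Project, and Relation, in that order.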




In [37]:


#Deliverable 5.3

# This query performs a self-join on the weather dataset to pair each temperature
# measurement with the one taken exactly one hour later at the same station. It then
# computes the one-hour temperature change, aggregates the results by station name, and
# keeps only "busy" stations (those with more than 1000 matched pairs of observations).
# Finally, it sorts the stations by their average temperature change in descending order.


df_with_time = df.withColumn("observation_time", to_timestamp("observation_time"))

df_t1 = df_with_time.alias("t1")
df_t2 = df_with_time.alias("t2")

complex_query = (
    df_t1.join(
        df_t2,
        (col("t1.station_id") == col("t2.station_id")) &
        (col("t1.observation_time") == expr("t2.observation_time - INTERVAL 1 HOUR"))
    )
    .select(
        col("t1.station_id"),
        col("t1.name").alias("station_name"),
        (col("t2.temperature") - col("t1.temperature")).alias("temp_change_1hr")
    )
    .groupBy("station_name")
    .agg(
        count("*").alias("num_observations"),
        avg("temp_change_1hr").alias("avg_temp_change")
    )
    .filter(col("num_observations") > 1000)
    .sort(desc("avg_temp_change"))
)
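The join condition is easier to follow on a tiny example. The following plain-Python sketch mirrors the logic of the query above on made-up readings. It is an illustration only: it keys on station_id rather than the station name for brevity, and it assumes at most one reading per station and timestamp, whereas the Spark join would emit every matching pair.

```python
from datetime import datetime, timedelta

# Made-up readings: (station_id, observation_time, temperature).
readings = [
    (1, datetime(2023, 7, 1, 12), 70.0),
    (1, datetime(2023, 7, 1, 13), 74.0),
    (1, datetime(2023, 7, 1, 14), 73.0),
    (2, datetime(2023, 7, 1, 12), 65.0),
]

# Index by the join key (station_id, observation_time) for direct lookup.
by_key = {(sid, ts): temp for sid, ts, temp in readings}

# For each reading t1, look for t2 at the same station exactly one hour later,
# mirroring t1.observation_time == t2.observation_time - INTERVAL 1 HOUR.
changes = {}
for sid, ts, temp in readings:
    later = by_key.get((sid, ts + timedelta(hours=1)))
    if later is not None:
        changes.setdefault(sid, []).append(later - temp)

# Per station: number of matched pairs and average one-hour change.
summary = {sid: (len(v), sum(v) / len(v)) for sid, v in changes.items()}
```

Station 2 has no reading one hour later, so it drops out entirely, just as an inner join discards unmatched rows. The count per station is a count of matched pairs, not of raw observations.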



In [38]:
# Deliverable 5.4

complex_query.explain(mode="cost")

== Optimized Logical Plan ==
Sort [avg_temp_change#836 DESC NULLS LAST], true, Statistics(sizeInBytes=121.3 TiB)
+- Filter (num_observations#835L > 1000), Statistics(sizeInBytes=121.3 TiB)
   +- Aggregate [station_name#833], [station_name#833, count(1) AS num_observations#835L, avg(temp_change_1hr#834) AS avg_temp_change#836], Statistics(sizeInBytes=121.3 TiB)
      +- Project [name#479 AS station_name#833, (temperature#810 - temperature#481) AS temp_change_1hr#834], Statistics(sizeInBytes=99.2 TiB)
         +- Join Inner, ((station_id#476L = station_id#805L) AND (observation_time#804 = observation_time#831 + INTERVAL '-01' HOUR)), Statistics(sizeInBytes=209.4 TiB)
            :- Project [station_id#476L, name#479, cast(observation_time#480 as timestamp) AS observation_time#804, temperature#481], Statistics(sizeInBytes=18.4 MiB)
            :  +- Filter (isnotnull(station_id#476L) AND isnotnull(cast(observation_time#480 as timestamp))), Statistics(sizeInBytes=85.1 MiB)
            :   

In [None]:
# Deliverable 5.5

#For the complex query I ran, the Physical Plan contained 18 steps, while the Optimized Logical Plan had 7 steps.




In [None]:
# Deliverable 5.6

#The Optimized Logical Plan was far easier to interpret. It laid out the high-level operations (Sort, Filter, Aggregate, and Join) almost like a recipe that directly reflected the query I wrote.
#In contrast, the Physical Plan was much more detailed, showing how the query is actually executed with operations like SortMergeJoin, HashAggregate, and Exchange (which handles data shuffling). The Physical Plan is essential for performance tuning, but the Logical Plan is what makes the query's intent clear to a human reader.


The .explain() method is invaluable for both debugging and optimization because it exposes two complementary perspectives. The Optimized Logical Plan provides a concise, human‑readable outline of the query logic (joins, filters, aggregations, etc.), while the Physical Plan dives into the low‑level execution details, including how Spark performs joins, aggregates, and data shuffles. As seen in my complex query from 5.3, the Physical Plan can grow lengthy and intricate, whereas the Logical Plan remains compact and focused on the business logic. This makes the Logical Plan the better tool when the goal is simply to understand what the query is doing.


## Pledge

By submitting this work I hereby pledge that this is my own, personal work. I've acknowledged in the designated place at the top of this file all sources that I used to complete said work, including but not limited to: online resources, books, and electronic communications. I've noted all collaboration with fellow students and/or TA's. I did not copy or plagiarize another's work.

> As a Boilermaker pursuing academic excellence, I pledge to be honest and true in all that I do. Accountable together – We are Purdue.