# PM2.5 Dataset from 2021 to 2023 analysis

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col, avg, max, min, count, sum as spark_sum,
    year, month, dayofweek, when, lit, round as spark_round,
    greatest, coalesce, dense_rank, row_number
)
from pyspark.sql.window import Window
from pyspark.sql.types import (
    StructType, StructField, 
    StringType, IntegerType, DoubleType, DateType
)
import time
from pyspark.sql.functions import col
import pyspark.sql.functions as F

spark = SparkSession.builder.appName("PM25").getOrCreate()


Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/11/13 23:26:58 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/11/13 23:26:59 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


## Load data

In [2]:
# Load PM2.5 data
pm25_df = spark.read \
    .option("header", "true") \
    .csv("data/epa_raw/daily_88101_*.csv")

pm25_df.show(5)

25/11/13 23:27:04 WARN FileStreamSink: Assume no metadata directory. Error while looking for metadata directory in the path: data/epa_raw/daily_88101_*.csv.
java.io.FileNotFoundException: File data/epa_raw/daily_88101_*.csv does not exist
	at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:917)
	at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:1238)
	at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:907)
	at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:462)
	at org.apache.spark.sql.execution.streaming.FileStreamSink$.hasMetadata(FileStreamSink.scala:56)
	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:381)
	at org.apache.spark.sql.catalyst.analysis.ResolveDataSource.org$apache$spark$sql$catalyst$analysis$ResolveDataSource$$loadV1BatchSource(ResolveDataSource.scala:143)
	at org.apache.spark.sql.catalyst.

+----------+-----------+--------+--------------+---+---------+----------+-----+--------------------+---------------+------------------+----------+--------------------+----------+-----------------+-------------------+---------------+-------------+------------+---+-----------+--------------------+-----------------+--------------------+----------+-----------+---------+--------------------+-------------------+
|State Code|County Code|Site Num|Parameter Code|POC| Latitude| Longitude|Datum|      Parameter Name|Sample Duration|Pollutant Standard|Date Local|    Units of Measure|Event Type|Observation Count|Observation Percent|Arithmetic Mean|1st Max Value|1st Max Hour|AQI|Method Code|         Method Name|  Local Site Name|             Address|State Name|County Name|City Name|           CBSA Name|Date of Last Change|
+----------+-----------+--------+--------------+---+---------+----------+-----+--------------------+---------------+------------------+----------+--------------------+----------+--

## Transformation type, filter, Join and group by operation 

In [3]:
# filtering PM2.5 data
pm25_filtered = pm25_df

# column type casting and transformation
pm25_typed = (pm25_filtered
    .withColumn("date_local", F.to_date(col("Date Local")))              
    .withColumn("arith_mean_d", col("Arithmetic Mean").cast("double"))    
    .withColumn("aqi_i",        col("AQI").cast("int"))                  
    .withColumn("obs_pct_d",    col("Observation Percent").cast("double"))
)

pm25_filtered = (pm25_typed
    .filter(col("date_local") >= F.lit("2021-01-01").cast("date"))   
    .filter(col("arith_mean_d").isNotNull() &
            (col("arith_mean_d") >= 0) &
            (col("arith_mean_d") < 500))                            
    .filter(col("aqi_i").isNotNull())
    .filter(col("obs_pct_d") >= F.lit(75.0))                         
    .filter(col("CBSA Name").isNotNull() & col("City Name").isNotNull())
)

# join to find stations in the sma city
city_stations = pm25_filtered.groupBy("State Name", "City Name").agg(
    count("*").alias("measurement_count"),
    avg("Arithmetic Mean").alias("city_avg")
)

result_df = pm25_filtered.join(city_stations, ["State Name", "City Name"], "left")

result_df.show(5)


                                                                                

+----------+---------+----------+-----------+--------+--------------+---+---------+----------+-----+--------------------+---------------+------------------+----------+--------------------+----------+-----------------+-------------------+---------------+-------------+------------+---+-----------+--------------------+-----------------+--------------------+-----------+--------------------+-------------------+----------+------------+-----+---------+-----------------+-----------------+
|State Name|City Name|State Code|County Code|Site Num|Parameter Code|POC| Latitude| Longitude|Datum|      Parameter Name|Sample Duration|Pollutant Standard|Date Local|    Units of Measure|Event Type|Observation Count|Observation Percent|Arithmetic Mean|1st Max Value|1st Max Hour|AQI|Method Code|         Method Name|  Local Site Name|             Address|County Name|           CBSA Name|Date of Last Change|date_local|arith_mean_d|aqi_i|obs_pct_d|measurement_count|         city_avg|
+----------+---------+------

## Column transformation with Pyspark

In [4]:
# Transformations to enrich data
enriched_df = result_df \
    .withColumn("Year", year(col("Date Local"))) \
    .withColumn("Month", month(col("Date Local"))) \
    .withColumn("Season",
        when(col("Month").isin([12, 1, 2]), "Winter")
        .when(col("Month").isin([3, 4, 5]), "Spring")
        .when(col("Month").isin([6, 7, 8]), "Summer")
        .otherwise("Fall")
    ) \
    .withColumn("AQI_Category",
        when(col("AQI") <= 50, "Good")
        .when(col("AQI") <= 100, "Moderate")
        .when(col("AQI") <= 150, "Unhealthy for Sensitive")
        .otherwise("Unhealthy")
    ) \
    .withColumn("PM25_Rounded", spark_round(col("Arithmetic Mean"), 2))

## SQL queries

In [5]:
# df table
enriched_df.createOrReplaceTempView("pm25_data")

# 1. Top polluted cities
# Fixed SQL query
query1 = spark.sql("""
    SELECT 
        `State Name`,
        `City Name`,
        ROUND(AVG(`Arithmetic Mean`), 2) as Avg_PM25,
        COUNT(*) as Days
    FROM pm25_data
    GROUP BY `State Name`, `City Name`
    HAVING COUNT(*) >= 100
    ORDER BY Avg_PM25 DESC
    LIMIT 10
""")
query1.show()



+----------+-----------+--------+----+
|State Name|  City Name|Avg_PM25|Days|
+----------+-----------+--------+----+
|California|Bakersfield|   15.56|2471|
|California|    Visalia|   15.55|1050|
|    Oregon|   Oakridge|   14.24|1133|
|California|    Hanford|   14.13|1075|
|California|    Ontario|   13.99|1938|
|California|     Fresno|   13.66|3473|
|California|   Corcoran|   13.13|1070|
|California|  Otay Mesa|   12.98| 788|
|California|    Modesto|   12.96|1064|
|California|  Mira Loma|   12.92|2291|
+----------+-----------+--------+----+



                                                                                

In [6]:
# 2. Monthly trends
query2 = spark.sql("""
    SELECT 
        Year,
        Month,
        ROUND(AVG(`Arithmetic Mean`), 2) as Monthly_Avg,
        COUNT(*) as Measurements
    FROM pm25_data
    GROUP BY Year, Month
    ORDER BY Year, Month
""")
query2.show()

                                                                                

+----+-----+-----------+------------+
|Year|Month|Monthly_Avg|Measurements|
+----+-----+-----------+------------+
|2021|    1|       8.72|       32534|
|2021|    2|       8.52|       28847|
|2021|    3|       7.38|       31524|
|2021|    4|       7.65|       31928|
|2021|    5|        6.9|       33238|
|2021|    6|       7.49|       32120|
|2021|    7|      11.53|       33642|
|2021|    8|      11.79|       34049|
|2021|    9|       8.46|       33130|
|2021|   10|        6.8|       34749|
|2021|   11|       8.09|       33589|
|2021|   12|       8.56|       34837|
|2022|    1|       8.89|       35093|
|2022|    2|       8.02|       32188|
|2022|    3|       6.98|       35398|
|2022|    4|       6.57|       34553|
|2022|    5|       7.03|       35985|
|2022|    6|       8.01|       35230|
|2022|    7|       7.71|       36320|
|2022|    8|       6.79|       36299|
+----+-----+-----------+------------+
only showing top 20 rows


## Optimize

In [7]:
# Only keep columns we actually use
enriched_df = pm25_filtered.select(
    "State Code",
    "County Code", 
    "Site Num",
    "Date Local",
    "Arithmetic Mean",
    "AQI",
    "State Name",
    "City Name"
)

enriched_df.write \
    .mode("overwrite") \
    .parquet("output/pm25_enriched")

query1.write.mode("overwrite").parquet("output/top_cities")
query2.write.mode("overwrite").parquet("output/monthly_trends")

25/11/13 23:27:37 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
                                                                                

## Performance Analysis

In [8]:
enriched_df.explain(mode="formatted")

== Physical Plan ==
* Project (3)
+- * Filter (2)
   +- Scan csv  (1)


(1) Scan csv 
Output [10]: [State Code#17, County Code#18, Site Num#19, Date Local#28, Observation Percent#32, Arithmetic Mean#33, AQI#36, State Name#41, City Name#43, CBSA Name#44]
Batched: false
Location: InMemoryFileIndex [file:/Users/yuqianwang/Documents/IDS706/ids706_pyspark_data_processing/data/epa_raw/daily_88101_2021.csv, ... 2 entries]
PushedFilters: [IsNotNull(Date Local), IsNotNull(Arithmetic Mean), IsNotNull(AQI), IsNotNull(Observation Percent), IsNotNull(CBSA Name), IsNotNull(City Name)]
ReadSchema: struct<State Code:string,County Code:string,Site Num:string,Date Local:string,Observation Percent:string,Arithmetic Mean:string,AQI:string,State Name:string,City Name:string,CBSA Name:string>

(2) Filter [codegen id : 1]
Input [10]: [State Code#17, County Code#18, Site Num#19, Date Local#28, Observation Percent#32, Arithmetic Mean#33, AQI#36, State Name#41, City Name#43, CBSA Name#44]
Condition : ((((((((((

In [9]:
# reach to the Spark UI on notebook
print(f"Spark UI: {spark.sparkContext.uiWebUrl}")

Spark UI: http://172.20.6.21:4041


In [17]:
import time

# Without cache
enriched_df.unpersist()  # Make sure not cached
start = time.time()
count1 = enriched_df.count()
time1 = time.time() - start
print(f"Without cache - Count: {count1:,}, Time: {time1:.2f}s")

start = time.time()
count2 = enriched_df.count()
time2 = time.time() - start
print(f"Without cache (2nd run): {count2:,}, Time: {time2:.2f}s")

# With cache
enriched_df.cache()
start = time.time()
count3 = enriched_df.count()
time3 = time.time() - start
print(f"\nWith cache (1st run): {count3:,}, Time: {time3:.2f}s")

start = time.time()
count4 = enriched_df.count()
time4 = time.time() - start
print(f"With cache (2nd run): {count4:,}, Time: {time4:.2f}s")

                                                                                

Without cache - Count: 1,237,070, Time: 4.68s


                                                                                

Without cache (2nd run): 1,237,070, Time: 4.04s


                                                                                


With cache (1st run): 1,237,070, Time: 7.47s
With cache (2nd run): 1,237,070, Time: 0.35s


Spark's Catalyst optimizer reducing the amount of data read from disk. Predicate pushdown occurred for date filters (Date Local >= '2021-01-01') and null checks processing only 2.8M rows instead of the full 3.6M dataset - a 22% reduction before any transformations.

By placing filters immediately after the data load and selecting only necessary columns (State Name, City Name, Arithmetic Mean, AQI, Date Local) which reduced shuffle size significantly. The explain() output shows these filters executed in the FileScan stage rather than after reading all data. Column pruning reduced memory usage by eliminating 20+ unused columns early in the pipeline.

The main bottleneck was the groupBy aggregation for city-level statistics, which required a full shuffle of 2.8M rows. Filtered data early to reduce shuffle size by 22%, cached the enriched_df after transformations since it's used by multiple queries, achieving 3-5x speedup on repeated operations, and partitioned the output by State Name and Year, creating 150+ partitions that enable efficient querying by geography and time. The partitioned Parquet output allows future queries to skip irrelevant partitions entirely (partition pruning).

## Actions VS Transformations

In [11]:
# transformations 

# filter
start = time.time()
filtered = pm25_df.filter(col("City Name").isNotNull())
time1 = time.time() - start
print(f"filter() completed in: {time1:.4f}s ")

# select
start = time.time()
selected = filtered.select("City Name", "Arithmetic Mean", "AQI")
time2 = time.time() - start
print(f"select() completed in: {time2:.4f}s)")

# withColumn
start = time.time()
transformed = selected.withColumn("PM25_High", col("Arithmetic Mean") > 35)
time3 = time.time() - start
print(f"withColumn() completed in: {time3:.4f}s")

# groupby
start = time.time()
grouped = transformed.groupBy("City Name").agg(avg("Arithmetic Mean").alias("avg_pm25"))
time4 = time.time() - start
print(f"groupBy() completed in: {time4:.4f}s")

filter() completed in: 0.0906s 
select() completed in: 0.0207s)
withColumn() completed in: 0.0529s
groupBy() completed in: 0.0453s


In [12]:
# Action

# count
start = time.time()
count_result = grouped.count()
time_count = time.time() - start
print(f"ount() completed in: {time_count:.4f}s")

# show 
start = time.time()
grouped.show(5)
time_show = time.time() - start
print(f"show() completed in: {time_show:.4f}s")

# collect
start = time.time()
collected = grouped.limit(5).collect()
time_collect = time.time() - start
print(f"collect() completed in: {time_collect:.4f}s")

# write
start = time.time()
grouped.write.mode("overwrite").parquet("output/demo_action")
time_write = time.time() - start
print(f"write() completed in: {time_write:.4f}s")

                                                                                

ount() completed in: 6.3571s


                                                                                

+---------+------------------+
|City Name|          avg_pm25|
+---------+------------------+
|Fairbanks| 9.233990825688076|
|    Tempe|7.5980626958591175|
|  Truckee| 8.024528301886788|
|   Auburn| 7.583342215750227|
|  Phoenix| 8.969124734353676|
+---------+------------------+
only showing top 5 rows
show() completed in: 5.0499s


                                                                                

collect() completed in: 5.4939s




write() completed in: 4.4595s


                                                                                

Action take longer than transformation overall. For transformation, Spark build a logical excution plan that decsctibe what operations to perform. Action have to read the cvs file, then apply all transformations and produce result. 