### Air Quality Index (AQI) Trend Analysis – USA (2020–2024)

#### Data Preparation

In [0]:
# **Load and Merge Data for aqi_county_df**
# Read one annual-AQI-by-county CSV per year and stack them into a single DataFrame.
from functools import reduce

AQI_DATA_DIR = "/Volumes/workspace/2235_team2/aqi"
AQI_YEARS = range(2020, 2025)  # 2020–2024 inclusive

# One DataFrame per year, keyed by year, so a single year can still be inspected on its own.
aqi_county_by_year = {
    year: spark.read.csv(
        f"{AQI_DATA_DIR}/annual_aqi_by_county_{year}.csv", header=True, inferSchema=True
    )
    for year in AQI_YEARS
}

# unionByName matches columns by header name instead of position, so a column that is
# ordered differently in one year's file cannot silently land under the wrong name.
aqi_county_df = reduce(lambda left, right: left.unionByName(right), aqi_county_by_year.values())
aqi_county_df.show(2)
aqi_county_df.count()

In [0]:
# Inspect the column names and the types inferSchema assigned to the combined multi-year data.
aqi_county_df.printSchema()

In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import *

In [0]:
# Load the FIPS code lookup tables for counties and states (header row, inferred column types).
# NOTE(review): neither lookup is joined to the AQI data anywhere in this notebook.
fips_code_county = (
    spark.read.options(header=True, inferSchema=True)
    .csv("/Volumes/workspace/2235_team2/aqi/FIPS_Codes_County.csv")
)
fips_code_state = (
    spark.read.options(header=True, inferSchema=True)
    .csv("/Volumes/workspace/2235_team2/aqi/FIPS_Codes_State.csv")
)


In [0]:
# Strip leading/trailing whitespace from FIPS county names, then preview the result and schema.
fips_code_county = fips_code_county.withColumn('County', trim(col('County')))
fips_code_county.show(2)
fips_code_county.printSchema()

In [0]:
# Normalise FIPS state names: strip surrounding whitespace, then lowercase.
# NOTE(review): aqi_county_df's State column is trimmed but not lowercased — a join on State would need matching case.
fips_code_state = fips_code_state.withColumn('State', lower(trim(col('State'))))

In [0]:
# Strip surrounding whitespace from the FIPS 'State Code' values (trim yields a string column).
fips_code_state = fips_code_state.withColumn('State Code', trim(col('State Code')))

In [0]:
# Preview the state lookup after State was lowercased/trimmed and State Code trimmed.
fips_code_state.show(5)

In [0]:
# Trim whitespace from the 'County' and 'State' columns in the aqi_county_df DataFrame.
# Both trims are chained on the same frame; starting the second one from aqi_county_df
# again (as before) discarded the County trim.
aqi_county_df_trimmed = (
    aqi_county_df
    .withColumn('County', trim(col('County')))
    .withColumn('State', trim(col('State')))
)
aqi_county_df_trimmed.show(5)

In [0]:
# Adopt the trimmed frame as the working AQI DataFrame used by every later cell.
aqi_county_df = aqi_county_df_trimmed
aqi_county_df.show(2)

In [0]:
# Sanity check before the "Good" vs "Unhealthy" day trend: show any rows where either
# day count is missing. An empty table for each means there are no nulls to handle.
aqi_county_df.where(col("Good Days").isNull()).show()
aqi_county_df.where(col("Unhealthy Days").isNull()).show()

#### A. National Trends

In [0]:
# Year-over-year "Good" vs "Unhealthy" days: per (Year, State), the number of non-null
# County rows and the Good / Unhealthy day counts summed across them.
days_trend_details = (
    aqi_county_df
    .groupBy("Year", "State")
    .agg(
        count("County").alias("County_Count"),
        sum("Good Days").alias("Good Days"),
        sum("Unhealthy Days").alias("Unhealthy Days"),
    )
)
display(days_trend_details)

In [0]:
# Turn each state's summed day counts into a per-county average rounded to whole days,
# cast Year to an integer, and sort by Year then State.
days_trend = (
    days_trend_details
    .withColumn("Good Days", round(col("Good Days") / col("County_Count"), 0))
    .withColumn("Unhealthy Days", round(col("Unhealthy Days") / col("County_Count"), 0))
    .withColumn("Year", col("Year").cast(IntegerType()))
    .orderBy("Year", "State")
)
display(days_trend)

In [0]:
# Drop the helper County_Count column; Year, State and the averaged day counts remain.
days_trend_National = days_trend.drop("County_Count")
display(days_trend_National)

Databricks visualization. Run in Databricks to view.

Databricks visualization. Run in Databricks to view.

Databricks visualization. Run in Databricks to view.

In [0]:
# Replace the spaces in the day-count column names with underscores, then persist the
# frame as a table, overwriting any previous version.
days_trend_National = (
    days_trend_National
    .withColumnRenamed('Good Days', 'Good_Days')
    .withColumnRenamed('Unhealthy Days', 'Unhealthy_Days')
)
days_trend_National.write.saveAsTable('workspace.2235_team2.days_trend_National', mode="overwrite")

In [0]:
# Confirm the renamed columns (Good_Days, Unhealthy_Days) before aggregating further.
days_trend_National.printSchema()

In [0]:
# Aggregate the per-state averages by year: number of states plus the summed day counts.
# The day columns were renamed to Good_Days / Unhealthy_Days before the table was saved,
# so those are the names that exist on days_trend_National here.
days_trend_National_by_year = (
    days_trend_National
    .groupBy("Year")
    .agg(
        count("State").alias("State_Count"),
        sum("Good_Days").alias("Good_Days"),
        sum("Unhealthy_Days").alias("Unhealthy_Days"),
    )
    .orderBy("Year")
)
display(days_trend_National_by_year)

In [0]:
# National per-year figures: divide the summed state averages by the number of states and
# round to whole days. This must read days_trend_National_by_year — the frame that carries
# State_Count — not days_trend_National, which has no State_Count column.
# NOTE: this is an unweighted mean of state averages; every state counts equally
# regardless of how many counties it reports.
days_trend_Year = (
    days_trend_National_by_year
    .withColumn("Good_Days", round(col("Good_Days") / col("State_Count"), 0))
    .withColumn("Unhealthy_Days", round(col("Unhealthy_Days") / col("State_Count"), 0))
    .withColumn("Year", col("Year").cast(IntegerType()))
    .orderBy("Year")
)
display(days_trend_Year)

In [0]:
# Drop the helper State_Count column now that the per-state division is done.
days_trend_Year = days_trend_Year.drop("State_Count")
display(days_trend_Year)

In [0]:
# Annual 'Max AQI', '90th Percentile AQI' and 'Median AQI' comparison: average each metric
# over the county rows of every (Year, State), then round to 2 decimal places.
# The lower-case source names presumably resolve via Spark's default case-insensitive
# column matching — confirm if spark.sql.caseSensitive is ever enabled.
aqi_comparison_detail = aqi_county_df.groupBy("Year", "State").agg(
    avg("max aqi").alias("Max_AQI"),
    avg("90th percentile aqi").alias("90th_Percentile_AQI"),
    avg("median aqi").alias("Median_AQI"),
)
aqi_comparison = (
    aqi_comparison_detail
    .withColumn("Max_AQI", round("Max_AQI", 2))
    .withColumn("90th_Percentile_AQI", round("90th_Percentile_AQI", 2))
    .withColumn("Median_AQI", round("Median_AQI", 2))
    .withColumn("Year", col("Year").cast(IntegerType()))
    .orderBy("Year", "State")
)
# Show the rounded, sorted frame described above (the unrounded detail was shown before).
display(aqi_comparison)

Databricks visualization. Run in Databricks to view.

Databricks visualization. Run in Databricks to view.

In [0]:
# Persist the per-(Year, State) AQI averages as a table, overwriting any previous version.
# NOTE(review): this writes the unrounded aqi_comparison_detail, not the rounded/sorted
# aqi_comparison — confirm which one downstream consumers expect.
aqi_comparison_detail.write.saveAsTable('workspace.2235_team2.aqi_comparison_detail', mode="overwrite")

In [0]:
# Aggregate the per-state AQI metrics by year and round them to 2 decimal places.
# aqi_comparison's columns are Max_AQI / 90th_Percentile_AQI / Median_AQI; the old
# space-separated names ("Max AQI", ...) do not exist on it and failed to resolve.
# NOTE: like the day trend, this averages state averages (unweighted by county count).
aqi_comparison_annual = (
    aqi_comparison
    .groupBy("Year")
    .agg(
        avg("Max_AQI").alias("Max_AQI"),
        avg("90th_Percentile_AQI").alias("90th_Percentile_AQI"),
        avg("Median_AQI").alias("Median_AQI"),
    )
    .withColumn("Max_AQI", round("Max_AQI", 2))
    .withColumn("90th_Percentile_AQI", round("90th_Percentile_AQI", 2))
    .withColumn("Median_AQI", round("Median_AQI", 2))
    .withColumn("Year", col("Year").cast(IntegerType()))
    .orderBy("Year")
)
display(aqi_comparison_annual)

In [0]:
# Per (Year, State): count the non-null County rows and total each "Days <pollutant>"
# column across those rows.
pollutant = (
    aqi_county_df
    .groupBy("Year", "State")
    .agg(
        count("County").alias("County_Count"),
        sum("Days CO").alias("CO"),
        sum("Days NO2").alias("NO2"),
        sum("Days Ozone").alias("Ozone"),
        sum("`Days PM2.5`").alias("PM2.5"),  # backticks: the name contains a dot
        sum("Days PM10").alias("PM10"),
    )
)
display(pollutant)

In [0]:
# Turn each pollutant's state total into a per-county average rounded to whole days,
# cast Year to an integer, and sort by Year then State.
from pyspark.sql.functions import col, round
from pyspark.sql.types import IntegerType

pollutant_by_state = (
    pollutant
    .withColumn("CO", round(col("CO") / col("County_Count"), 0))
    .withColumn("NO2", round(col("NO2") / col("County_Count"), 0))
    .withColumn("Ozone", round(col("Ozone") / col("County_Count"), 0))
    # "PM2.5" has a dot, so reading it needs backticks; withColumn takes the literal name.
    .withColumn("PM2.5", round(col("`PM2.5`") / col("County_Count"), 0))
    .withColumn("PM10", round(col("PM10") / col("County_Count"), 0))
    .withColumn("Year", col("Year").cast(IntegerType()))
    .orderBy("Year", "State")
)
display(pollutant_by_state)

In [0]:
# Drop the helper County_Count column; it was only needed for the per-county division.
pollutant_by_state = pollutant_by_state.drop("County_Count")
display(pollutant_by_state)

In [0]:
from pyspark.sql.functions import expr

# Pollutant columns to unpivot; "PM2.5" is backtick-quoted because it contains a dot.
pollutant_columns = ["CO", "NO2", "Ozone", "`PM2.5`", "PM10"]

# Build the stack() arguments from pollutant_columns so the list above is the single source
# of truth (it was previously defined but unused, duplicating the hard-coded expression).
# Each pollutant contributes a label literal ('PM2.5') followed by its column (`PM2.5`).
pollutant_names = [column.strip("`") for column in pollutant_columns]
stack_args = ", ".join(f"'{name}', `{name}`" for name in pollutant_names)

# Transform the DataFrame from wide format to long format: one row per (Year, State, Pollutant).
pollutant_melted = pollutant_by_state.selectExpr(
    "Year", "State",
    f"stack({len(pollutant_names)}, {stack_args}) as (Pollutant, Values)",
)
display(pollutant_melted)

Databricks visualization. Run in Databricks to view.

In [0]:
# Persist the long-format (Year, State, Pollutant, Values) data, overwriting any previous version.
pollutant_melted.write.saveAsTable('workspace.2235_team2.pollutant_melted', mode="overwrite")

In [0]:
# Per year: number of states plus the sum of each pollutant's per-state average.
pollutant_by_year = (
    pollutant_by_state
    .groupBy("Year")
    .agg(
        count("State").alias("State_Count"),
        sum("CO").alias("CO"),
        sum("NO2").alias("NO2"),
        sum("Ozone").alias("Ozone"),
        sum("`PM2.5`").alias("PM2.5"),
        sum("PM10").alias("PM10"),
    )
    .orderBy("Year")
)
display(pollutant_by_year)

In [0]:
# National per-year pollutant figures: divide each summed state value by the number of
# states, round to whole days, cast Year to an integer and sort by Year ascending.
from pyspark.sql.functions import col, round
from pyspark.sql.types import IntegerType

pollutant_aqi = (
    pollutant_by_year
    .withColumn("CO", round(col("CO") / col("State_Count"), 0))
    .withColumn("NO2", round(col("NO2") / col("State_Count"), 0))
    .withColumn("Ozone", round(col("Ozone") / col("State_Count"), 0))
    .withColumn("PM2.5", round(col("`PM2.5`") / col("State_Count"), 0))
    .withColumn("PM10", round(col("PM10") / col("State_Count"), 0))
    .withColumn("Year", col("Year").cast(IntegerType()))
    .orderBy("Year")
)
display(pollutant_aqi)

In [0]:
# Drop the helper State_Count column; it was only needed for the per-state division.
pollutant_aqi = pollutant_aqi.drop("State_Count")
display(pollutant_aqi)

#### B. County-Level Analysis

In [0]:
# Top 10 (State, County) pairs by "Unhealthy for Sensitive Groups Days", "Unhealthy Days"
# and "Hazardous Days", each summed over all years.
# NOTE: the sort is lexicographic — the second and third columns only break ties in the
# first — so this is not a true combined top 10. The cells below rank each measure on its own.
county_total = (
    aqi_county_df
    .groupBy("State", "County")
    .agg(
        sum("Unhealthy for Sensitive Groups Days").alias("Unhealthy for Sensitive Groups Days"),
        sum("Unhealthy Days").alias("Unhealthy Days"),
        sum("Hazardous Days").alias("Hazardous Days"),
    )
    .orderBy(
        desc("Unhealthy for Sensitive Groups Days"),
        desc("Unhealthy Days"),
        desc("Hazardous Days"),
    )
    .limit(10)
)
display(county_total)


In [0]:
# Top 10 (State, County) pairs by total "Unhealthy for Sensitive Groups Days" over all years.
county_total_usgd = (
    aqi_county_df
    .groupBy("State", "County")
    .agg(sum("Unhealthy for Sensitive Groups Days").alias("Unhealthy_for_Sensitive_Groups_Days"))
    .orderBy(desc("Unhealthy_for_Sensitive_Groups_Days"))
    .limit(10)
)
display(county_total_usgd)

Databricks visualization. Run in Databricks to view.

In [0]:
# Persist the top-10 "Unhealthy for Sensitive Groups Days" counties, overwriting any previous version.
county_total_usgd.write.saveAsTable('workspace.2235_team2.county_total_usgd', mode="overwrite")

In [0]:
# Top 10 (State, County) pairs by total "Unhealthy Days" over all years.
county_total_ud = (
    aqi_county_df
    .groupBy("State", "County")
    .agg(sum("Unhealthy Days").alias("Unhealthy_Days"))
    .orderBy(desc("Unhealthy_Days"))
    .limit(10)
)
display(county_total_ud)

Databricks visualization. Run in Databricks to view.

In [0]:
# Persist the top-10 "Unhealthy Days" counties, overwriting any previous version.
county_total_ud.write.saveAsTable('workspace.2235_team2.county_total_ud', mode="overwrite")

In [0]:
# Top 10 (State, County) pairs by total "Hazardous Days" over all years.
county_total_hd = (
    aqi_county_df
    .groupBy("State", "County")
    .agg(sum("Hazardous Days").alias("Hazardous_Days"))
    .orderBy(desc("Hazardous_Days"))
    .limit(10)
)
display(county_total_hd)

Databricks visualization. Run in Databricks to view.

In [0]:
# Persist the top-10 "Hazardous Days" counties, overwriting any previous version.
county_total_hd.write.saveAsTable('workspace.2235_team2.county_total_hd', mode="overwrite")

In [0]:
# State-wise comparison of every county on "Unhealthy for Sensitive Groups", "Unhealthy"
# and "Hazardous" days, each summed over all years in the data.
from pyspark.sql.window import Window

state_county = aqi_county_df.groupBy("State", "County").agg(
    sum("Unhealthy for Sensitive Groups Days").alias("Total_Unhealthy_Sensitive"),
    sum("Unhealthy Days").alias("Total_Unhealthy"),
    sum("Hazardous Days").alias("Total_Hazardous"),
)

# One window per category: counties ranked within their own state, highest total first.
# rank() gives tied counties the same rank and leaves a gap after the tie.
window_spec = Window.partitionBy("State").orderBy(desc("Total_Unhealthy_Sensitive"))
window_spec2 = Window.partitionBy("State").orderBy(desc("Total_Unhealthy"))
window_spec3 = Window.partitionBy("State").orderBy(desc("Total_Hazardous"))

# Attach the three ranks, then order by State followed by each category's rank.
state_county = (
    state_county
    .withColumn("Rank_Unhealthy_Sensitive", rank().over(window_spec))
    .withColumn("Rank_Unhealthy", rank().over(window_spec2))
    .withColumn("Rank_Hazardous", rank().over(window_spec3))
    .orderBy("State", "Rank_Unhealthy_Sensitive", "Rank_Unhealthy", "Rank_Hazardous")
)
display(state_county)


Databricks visualization. Run in Databricks to view.

In [0]:
# Persist the state-wise county ranking on "Unhealthy for Sensitive Groups", "Unhealthy"
# and "Hazardous" days, overwriting any previous version.
state_county.write.saveAsTable('workspace.2235_team2.state_county', mode="overwrite")

In [0]:
# Display the county ranking for one state. The filter compares against the full state name
# stored in the State column (e.g. "California"), not a postal code such as 'CA'.
selected_state = "California"
state_county.filter(col("State") == selected_state).display()

In [0]:
# Aggregate pollutant days by year and county
pollutant_county_year = aqi_county_df.groupBy("Year","County").agg(sum("Days CO").alias("CO"), sum("Days NO2").alias("NO2"), sum("Days Ozone").alias("Ozone"), sum("`Days PM2.5`").alias("PM2.5"), sum("Days PM10").alias("PM10"))

# Rank counties within each year for each pollutant category
window_spec4 = Window.partitionBy("Year").orderBy(desc("CO"))
pollutant_county_year = pollutant_county_year.withColumn("Rank_CO",rank().over(window_spec4))

window_spec5 = Window.partitionBy("Year").orderBy(desc("NO2"))
pollutant_county_year = pollutant_county_year.withColumn("Rank_NO2",rank().over(window_spec5))

window_spec6 = Window.partitionBy("Year").orderBy(desc("Ozone"))
pollutant_county_year = pollutant_county_year.withColumn("Rank_Ozone",rank().over(window_spec6))

window_spec7 = Window.partitionBy("Year").orderBy(desc("`PM2.5`"))
pollutant_county_year = pollutant_county_year.withColumn("Rank_PM2.5",rank().over(window_spec7))

window_spec8 = Window.partitionBy("Year").orderBy(desc("PM10"))
pollutant_county_year = pollutant_county_year.withColumn("Rank_PM10",rank().over(window_spec8))

# Order pollutant_county by Year followed by Rank of each category
pollutant_county_year = pollutant_county_year.orderBy("Year", "Rank_CO", "Rank_NO2", "Rank_Ozone", "Rank_PM10")
display(pollutant_county_year)

Databricks visualization. Run in Databricks to view.

In [0]:
pollutant_county_year.write.mode("overwrite").saveAsTable('workspace.2235_team2.pollutant_county_year')