In [2]:
from dask_jobqueue import SLURMCluster

# Compose SLURM script
cluster = SLURMCluster(queue='caslake', cores=5, memory='80GB', 
                       processes=5, walltime='03:00:00', interface='ib0',
                       job_extra=['--account=macs30123']
                      )

# Request resources
cluster.scale(jobs=1)

In [86]:
! squeue -u mnghiem

             JOBID PARTITION     NAME     USER ST       TIME  NODES NODELIST(REASON)
          31328817   caslake dask-wor  mnghiem  R    1:29:38      1 midway3-0072


In [4]:
from dask.distributed import Client

client = Client(cluster)
client

0,1
Connection method: Cluster object,Cluster type: dask_jobqueue.SLURMCluster
Dashboard: http://172.25.0.65:8787/status,

0,1
Dashboard: http://172.25.0.65:8787/status,Workers: 5
Total threads: 5,Total memory: 74.50 GiB

0,1
Comm: tcp://172.25.0.65:42979,Workers: 5
Dashboard: http://172.25.0.65:8787/status,Total threads: 5
Started: Just now,Total memory: 74.50 GiB

0,1
Comm: tcp://172.25.2.72:36975,Total threads: 1
Dashboard: http://172.25.2.72:34991/status,Memory: 14.90 GiB
Nanny: tcp://172.25.2.72:39171,
Local directory: /scratch/local/jobs/31328817/dask-worker-space/worker-hgg484p8,Local directory: /scratch/local/jobs/31328817/dask-worker-space/worker-hgg484p8

0,1
Comm: tcp://172.25.2.72:42187,Total threads: 1
Dashboard: http://172.25.2.72:39801/status,Memory: 14.90 GiB
Nanny: tcp://172.25.2.72:35389,
Local directory: /scratch/local/jobs/31328817/dask-worker-space/worker-aeqbw_s4,Local directory: /scratch/local/jobs/31328817/dask-worker-space/worker-aeqbw_s4

0,1
Comm: tcp://172.25.2.72:41693,Total threads: 1
Dashboard: http://172.25.2.72:35775/status,Memory: 14.90 GiB
Nanny: tcp://172.25.2.72:36773,
Local directory: /scratch/local/jobs/31328817/dask-worker-space/worker-w_uu0j73,Local directory: /scratch/local/jobs/31328817/dask-worker-space/worker-w_uu0j73

0,1
Comm: tcp://172.25.2.72:34341,Total threads: 1
Dashboard: http://172.25.2.72:40529/status,Memory: 14.90 GiB
Nanny: tcp://172.25.2.72:37355,
Local directory: /scratch/local/jobs/31328817/dask-worker-space/worker-n_l7hwxq,Local directory: /scratch/local/jobs/31328817/dask-worker-space/worker-n_l7hwxq

0,1
Comm: tcp://172.25.2.72:39133,Total threads: 1
Dashboard: http://172.25.2.72:35961/status,Memory: 14.90 GiB
Nanny: tcp://172.25.2.72:45747,
Local directory: /scratch/local/jobs/31328817/dask-worker-space/worker-t_z180xn,Local directory: /scratch/local/jobs/31328817/dask-worker-space/worker-t_z180xn


In [10]:
import dask.dataframe as dd
import dask

daily_dtype = {
    "State Name": "string",
    "county Name": "string",
    "State Code": "string",
    "County Code": "string",
    "Date": "string",
    "AQI": "float64",
    "Defining Parameter": "string"
}    

hourly_dtype = {
    "State Name": "string",
    "County Name": "string",
    "State Code": "string",
    "County Code": "string",
    "Date Local": "string",
    "Time Local": "string",
    "Sample Measurement": "float64",
    "Units of Measure": "string"
}

asthma_dtype = {
    "CountyFIPS": "string",
    "Year": "string",
    "Value": "string"
}

daily_aqi = dd.read_csv('/scratch/midway3/mnghiem/project-aqi-data/daily_aqi_by_county/*.csv', 
                        usecols=["State Name", "county Name", "State Code", "County Code", "Date", "AQI", "Defining Parameter"],
                        dtype=daily_dtype)
hourly_pm2 = dd.read_csv('/scratch/midway3/mnghiem/project-aqi-data/hourly_pm25_by_county/*.csv',
                         usecols=["State Name", "County Name", "State Code", "County Code", "Date Local", "Time Local", "Sample Measurement", "Units of Measure"],
                        dtype=hourly_dtype)
tract_daily_pm25 = dd.read_csv('/scratch/midway3/mnghiem/project-aqi-data/daily_pm25_by_tract.csv')
asthma = dd.read_csv('/scratch/midway3/mnghiem/project-aqi-data/asthma_prevalence.csv',
                    usecols=["CountyFIPS", "Year", "Value"],
                    dtype=asthma_dtype)

In [13]:
asthma["CountyFIPS"] = asthma["CountyFIPS"].str.zfill(5)
asthma["Value"] = asthma["Value"].str[:-1].astype("float64")
asthma["Year"] = asthma["Year"].astype("int")

In [16]:
# Sampling for test
# daily_aqi = daily_aqi.sample(frac=0.001)
# hourly_pm2 = hourly_pm2.sample(frac=0.001)
# tract_daily_pm25 = tract_daily_pm25.sample(frac=0.001)

In [17]:
import re

# Define snake_case conversion
def to_snake_case(colname):
    colname = re.sub(r'[\s\-]+', '_', colname)
    colname = re.sub(r'([a-z])([A-Z])', r'\1_\2', colname)
    return colname.lower()

# Rename columns to snake_case
hourly_pm2.columns = [to_snake_case(col) for col in hourly_pm2.columns]
daily_aqi.columns = [to_snake_case(col) for col in daily_aqi.columns]
asthma.columns = [to_snake_case(col) for col in asthma.columns]

In [18]:
tract_daily_pm25["date"] = dd.to_datetime(tract_daily_pm25["date"])
daily_aqi["date"] = dd.to_datetime(daily_aqi["date"])
daily_aqi["year"] = daily_aqi["date"].dt.year
daily_aqi["fips"] = daily_aqi["state_code"] + daily_aqi["county_code"]
hourly_pm2["date_local"] = dd.to_datetime(hourly_pm2["date_local"])
hourly_pm2["hour"] = hourly_pm2["time_local"].str.slice(0, 2).astype("int64")
hourly_pm2["time"] = hourly_pm2["date_local"].dt.year
hourly_pm2["fips"] = hourly_pm2["state_code"] + hourly_pm2["county_code"]
hourly_pm2 = hourly_pm2.drop(columns="time_local")

In [59]:
daily_aqi["fips"] = daily_aqi["state_code"] + daily_aqi["county_code"]
hourly_pm2["fips"] = hourly_pm2["state_code"] + hourly_pm2["county_code"]
asthma = asthma.rename(columns={
    "county_fips": "fips",
    "value": "percent"
})

In [60]:
asthma.dtypes

fips        string
year         int64
percent    float64
dtype: object

In [43]:
daily_aqi.head(5)

Unnamed: 0,State Name,county Name,State Code,County Code,Date,AQI,Defining Parameter
73418,New Jersey,Bergen,34,3,1980-09-07,56.0,NO2
18003,California,Ventura,6,111,1980-06-27,245.0,Ozone
120487,Utah,Utah,49,49,1980-03-19,32.0,Ozone
118879,Texas,Travis,48,453,1980-01-01,43.0,Ozone
48225,Kentucky,Muhlenberg,21,177,1980-05-14,58.0,Ozone


In [44]:
tract_daily_pm25.head(5)

Unnamed: 0,year,date,statefips,countyfips,ctfips,latitude,longitude,DS_PM_pred,DS_PM_stdd
887536,2020,11JAN2020,36,21,36021001200,-73.7825,42.25676,8.285,4.6723
869314,2020,11JAN2020,21,95,21095970500,-83.29163,36.93443,2.017,1.147
570524,2020,07JAN2020,46,47,46047964100,-103.52949,43.23404,2.674,1.8148
373185,2020,05JAN2020,26,77,26077002801,-85.61531,42.37519,5.62,3.4496
292200,2020,04JAN2020,27,53,27053026616,-93.50103,45.05047,5.667,1.9203


In [45]:
hourly_pm2.head(5)

Unnamed: 0,State Code,County Code,Date Local,Time Local,Sample Measurement,Units of Measure,State Name,County Name
139979,6,19,2010-09-03,06:00,35.0,Micrograms/cubic meter (LC),California,Fresno
2293,1,33,2010-04-15,08:00,22.6,Micrograms/cubic meter (LC),Alabama,Colbert
143604,6,27,2010-02-05,11:00,8.4,Micrograms/cubic meter (LC),California,Inyo
137371,6,19,2010-05-15,18:00,12.0,Micrograms/cubic meter (LC),California,Fresno
146800,6,27,2010-06-19,17:00,-6.9,Micrograms/cubic meter (LC),California,Inyo


In [21]:
# Load median household income from 5-year ACS 2023
from census import Census
from us import states
import pandas as pd

API_KEY = "9df5183d7032ec5b9690b0ca901ef955922d1fa1"

c = Census(API_KEY)

ACS_YEAR = 2023
ACS_SOURCE = 'acs5'
variable = "B19013_001E"

# Fetch data from ACS
results = c.acs5.state_county(
    fields=(variable,),
    state_fips="*",
    county_fips="*",
    year=ACS_YEAR
)

# Convert to DataFrame
county_income = pd.DataFrame(results)

# Format and enrich columns
county_income["state_fips"] = county_income["state"].str.zfill(2)
county_income["county_fips"] = county_income["county"].str.zfill(3)
county_income["fips"] = county_income["state_fips"] + county_income["county_fips"]
county_income["median_income"] = county_income[variable].astype(float)

# Optional: Add state abbreviation for reference
state_fips_to_code = {s.fips.zfill(2): s.abbr for s in states.STATES}
county_income["state_code"] = county_income["state_fips"].map(state_fips_to_code)

# Final column order
county_income = county_income[["fips", "median_income"]]

In [23]:
# Step 2: Define race variables + total population
race_vars = {
    "white": "B02001_002E",
    "black": "B02001_003E",
    "native": "B02001_004E",
    "asian": "B02001_005E",
    "pacific": "B02001_006E",
    "other": "B02001_007E",
    "2plus": "B02001_008E"
}
total_var = "B02001_001E"  # Total population

# Step 3: Define fields to query (race + total pop)
fields = list(race_vars.values()) + [total_var]

# Step 4: Query Census API for all counties
data = c.acs5.state_county(
    fields=fields,
    state_fips="*",
    county_fips="*",
    year=ACS_YEAR
)

# Load into DataFrame
majority_race = pd.DataFrame(data)

# Add and format FIPS columns
majority_race["state_fips"] = majority_race["state"].str.zfill(2)
majority_race["county_fips"] = majority_race["county"].str.zfill(3)
majority_race["fips"] = majority_race["state_fips"] + majority_race["county_fips"]

# Rename and cast columns
majority_race = majority_race.rename(columns={v: k for k, v in race_vars.items()})
majority_race = majority_race.rename(columns={total_var: "total_pop"})

# Convert race columns to float
race_cols = list(race_vars.keys())
majority_race[race_cols + ["total_pop"]] = majority_race[race_cols + ["total_pop"]].astype(float)

# Determine majority race and population count
majority_race["race"] = majority_race[race_cols].idxmax(axis=1)
majority_race["count"] = majority_race.lookup(majority_race.index, majority_race["race"])
majority_race["percentage"] = (majority_race["count"] / majority_race["total_pop"]) * 100

majority_race = majority_race[["fips", "race", "percentage"]]

  majority_race["count"] = majority_race.lookup(majority_race.index, majority_race["race"])


In [68]:
majority_race

Unnamed: 0,fips,state_fips,county_fips,race,count,percentage
0,01001,01,001,white,43616.0,73.570043
1,01003,01,003,white,198721.0,82.819396
2,01005,01,005,black,11616.0,46.920063
3,01007,01,007,white,16634.0,75.090285
4,01009,01,009,white,53062.0,89.492680
...,...,...,...,...,...,...
3217,72145,72,145,2plus,26581.0,49.171260
3218,72147,72,147,other,6252.0,76.739904
3219,72149,72,149,2plus,9236.0,42.409771
3220,72151,72,151,other,20950.0,70.141958


In [69]:
county_income

Unnamed: 0,fips,state_fips,state_code,county_fips,median_income
0,01001,01,AL,001,69841.0
1,01003,01,AL,003,75019.0
2,01005,01,AL,005,44290.0
3,01007,01,AL,007,51215.0
4,01009,01,AL,009,61096.0
...,...,...,...,...,...
3217,72145,72,,145,23877.0
3218,72147,72,,147,17531.0
3219,72149,72,,149,24882.0
3220,72151,72,,151,21279.0


In [24]:
# Average daily AQI time series (US)
us_yearly_avg_aqi = daily_aqi.groupby(["year"]).agg({"aqi": "mean"}).reset_index()
us_yearly_avg_aqi = us_yearly_avg_aqi.compute()

In [25]:
import altair as alt
line = alt.Chart(us_yearly_avg_aqi).mark_line().encode(
    x=alt.X("year:N"),
    y="aqi:Q"
).properties(
    width=600,
    height=300,
    title="Yearly Average AQI in the US"
)

regression = line.transform_regression(
    "year", "aqi", method="linear"
).mark_line(color="orange", strokeDash=[5, 5]).encode(
    tooltip=["year:Q", "aqi:Q"]
)

line + regression

In [26]:
# Daily highest AQI time series (US)
us_highest_daily_aqi = daily_aqi.groupby("year")["aqi"].max().reset_index()
us_highest_daily_aqi = us_highest_daily_aqi.compute()

In [29]:
bar = alt.Chart(us_highest_daily_aqi).mark_bar().encode(
    x="year:N",
    y="aqi:Q"
)

regression = bar.transform_regression(
    "year", "aqi", method="linear"
).mark_line(color="orange", strokeDash=[5, 5]).encode(
    tooltip=["year:Q", "aqi:Q"]
)

bar + regression

In [59]:
# # Get county-level max AQI per year
# county_year_max = daily_aqi.groupby(["year", "fips"])["aqi"].max().reset_index()

# # Compute yearly average of those county max AQIs
# yearly_avg_of_max = county_year_max.groupby("year")["aqi"].mean().reset_index()

# yearly_avg_of_max = yearly_avg_of_max.compute()
# yearly_avg_of_max

Unnamed: 0,year,aqi
0,1980,53.333333
1,1981,47.625
2,1982,51.943089
3,1983,55.427419
4,1984,49.024194
5,1985,52.679389
6,1986,48.689922
7,1987,56.535714
8,1988,52.72
9,1989,51.793548


In [34]:
# Counties that see a worsening trend in AQI (how many improved? any patterns?)
from sklearn.linear_model import LinearRegression

def compute_slope(df, val):
    if df.shape[0] < 2:
        return pd.Series({"slope": float("nan")})
    X = df["year"].values.reshape(-1, 1)
    y = df[val].values
    model = LinearRegression().fit(X, y)
    return pd.Series({"slope": model.coef_[0]})

county_aqi_trend = daily_aqi.groupby("fips").apply(compute_slope, val="aqi", meta={"slope": "f8"}).reset_index().compute()

county_negative_trend = county_aqi_trend[county_aqi_trend["slope"] < 0].sort_values("slope")
county_negative_trend.shape[0]/county_aqi_trend.shape[0]

0.4459770114942529

In [37]:
# Days over unhealthy AQI level (US)
daily_aqi["unhealthy"] = (daily_aqi["aqi"] >= 101).astype(int)
county_days_unhealthy = daily_aqi.groupby(["fips", "year"])["unhealthy"].sum().reset_index()
us_days_unhealthy = county_days_unhealthy.groupby(["year"])["unhealthy"].mean().reset_index()
us_days_unhealthy = us_days_unhealthy.compute()

In [38]:
alt.Chart(us_days_unhealthy).mark_line().encode(
    x=alt.X("year:N"),
    y="aqi:Q"
).properties(
    width=600,
    height=300,
    title="Average days with unhealthy AQI level (over 101)"
)

In [39]:
county_days_unhealthy_trend = daily_aqi.groupby("fips").apply(compute_slope, val="unhealthy", meta={"slope": "f8"}).reset_index().compute()

county_positive_unhealthy_days_trend = county_days_unhealthy_trend[county_days_unhealthy_trend["slope"] > 0].sort_values("slope")
county_positive_unhealthy_days_trend

Unnamed: 0,fips,slope
18,40085,1.088663e-17
3,11001,4.188306e-05
28,55027,7.907014e-05
5,06009,1.176747e-04
1,01089,1.937984e-04
...,...,...
5,16059,1.661264e-02
17,37135,2.040816e-02
27,80002,2.305543e-02
16,36083,2.406417e-02


In [41]:
us_unhealthy_param = daily_aqi.groupby(["year", "defining_parameter"])["unhealthy"].count().reset_index()
us_unhealthy_param = us_unhealthy_param.compute()
us_unhealthy_param

Unnamed: 0,year,defining_parameter,unhealthy
0,1980,CO,39
1,1980,NO2,19
2,1980,Ozone,78
3,1981,CO,38
4,1981,NO2,22
...,...,...,...
200,2023,PM10,9
201,2023,PM2.5,162
202,2024,Ozone,108
203,2024,PM10,9


In [44]:
us_unhealthy_param

Unnamed: 0,year,defining_parameter,unhealthy
0,1980,CO,39
1,1980,NO2,19
2,1980,Ozone,78
3,1981,CO,38
4,1981,NO2,22
...,...,...,...
200,2023,PM10,9
201,2023,PM2.5,162
202,2024,Ozone,108
203,2024,PM10,9


In [46]:
us_total_unhealthy_param = us_unhealthy_param.groupby(["year"])["unhealthy"].sum().reset_index()
us_unhealthy_param = us_unhealthy_param.merge(us_total_unhealthy_param, on="year", how="left")
us_unhealthy_param

Unnamed: 0,year,defining_parameter,unhealthy_x,unhealthy_y
0,1980,CO,39,136
1,1980,NO2,19,136
2,1980,Ozone,78,136
3,1981,CO,38,145
4,1981,NO2,22,145
...,...,...,...,...
200,2023,PM10,9,325
201,2023,PM2.5,162,325
202,2024,Ozone,108,207
203,2024,PM10,9,207


In [48]:
us_unhealthy_param["percentage"] = (us_unhealthy_param["unhealthy_x"] / us_unhealthy_param["unhealthy_y"]).astype(float)

alt.Chart(us_unhealthy_param).mark_bar().encode(
    x="percentage:Q",
    y="year:N",
    color="defining_parameter:N"
)

In [50]:
# PM2.5 by time of day (national average)
hourly_pm2["year"] = hourly_pm2["date_local"].dt.year
us_avg_pm2_by_hour = hourly_pm2.groupby(["hour"])["sample_measurement"].mean().reset_index()
us_avg_pm2_by_hour = us_avg_pm2_by_hour.compute()

In [54]:
alt.Chart(us_avg_pm2_by_hour).mark_bar().encode(
    x=alt.X("hour:N"),
    y="sample_measurement:Q"
).properties(
    title="PM2.5 by time of day (national average)"
)

In [55]:
# Yearly average concentration for PM2.5
us_avg_pm2_by_year = hourly_pm2.groupby(["year"])["sample_measurement"].mean().reset_index()
us_avg_pm2_by_year = us_avg_pm2_by_year.compute()

In [56]:
alt.Chart(us_avg_pm2_by_year).mark_bar().encode(
    x=alt.X("year:N"),
    y="sample_measurement:Q"
).properties(
    title="Yearly average concentration for PM2.5"
)

In [83]:
# Asthma prevalence prediction: AQI (annual avg, #days unhealthy), pm2.5 (annual avg, #days > 15), #ozone, county, year
county_avg_aqi = daily_aqi.groupby(["year", "fips"])["aqi"].mean().reset_index()
df = county_days_unhealthy.merge(county_avg_aqi, on=["year", "fips"], how="left")
df = df.rename(columns={
    "unhealthy": "aqi_days_unhealthy"})

county_avg_pm25 = hourly_pm2.groupby(["year", "fips"])["sample_measurement"].mean().reset_index()
county_days_pm25_unhealthy = filtered_hourly.groupby(["date_local", "year", "fips"])["sample_measurement"].mean().reset_index()
county_days_pm25_unhealthy["unhealthy"] = (county_days_pm25_unhealthy["sample_measurement"] >= 15).astype(float)
county_days_pm25_unhealthy = county_days_pm25_unhealthy.groupby(["year", "fips"])["unhealthy"].count().reset_index()

df = df.merge(county_avg_pm25, on=["year", "fips"], how="inner")
df = df.merge(county_days_pm25_unhealthy, on=["year", "fips"], how="inner")
df = df.rename(columns={
    "unhealthy": "pm25_days_unhealthy",
    "sample_measurement": "avg_pm25"
})

df = df.merge(asthma, on=["fips", "year"], how="inner")

In [64]:
df.head(5)

Unnamed: 0,fips,year,aqi_days_unhealthy,aqi,avg_pm25,pm25_days_unhealthy,percent
0,1113,2018,0,52.0,8.625,8,11.3
1,4021,2018,1,107.5,7.409091,20,10.7
2,6009,2018,0,49.0,14.5,4,9.9
3,6015,2018,0,17.0,6.6,2,10.5
4,6019,2018,0,63.0,12.245833,23,10.2


In [84]:
df.compute()

Unnamed: 0,fips,year,aqi_days_unhealthy,aqi,avg_pm25,pm25_days_unhealthy,percent
0,01113,2018,0,52.0,8.625000,8,11.3
1,04021,2018,1,107.5,7.409091,20,10.7
2,06009,2018,0,49.0,14.500000,4,9.9
3,06015,2018,0,17.0,6.600000,2,10.5
4,06019,2018,0,63.0,12.245833,23,10.2
...,...,...,...,...,...,...,...
543,55111,2021,0,47.0,6.220000,15,10.4
544,55119,2021,0,27.0,5.615789,18,10.7
545,56005,2021,0,27.0,7.940000,5,9.5
546,56037,2021,0,50.0,3.618182,11,9.5


In [72]:
# Environmental justice (race+income)
county_stats = majority_race.merge(county_income, on="fips", how="inner")
county_aqi_ri = county_avg_aqi.merge(county_stats, on="fips", how="inner").compute()
aqi_by_race = county_aqi_ri.groupby("race")["aqi"].mean().reset_index()
aqi_by_race

Unnamed: 0,race,aqi
0,2plus,37.571429
1,asian,51.7
2,black,43.028571
3,native,16.428571
4,other,59.944444
5,white,41.74165


In [93]:
# AQI by race
alt.Chart(county_aqi_ri).mark_boxplot(extent='min-max').encode(
    y='race:N',
    x='aqi:Q'
)

In [99]:
aqi_by_income = county_aqi_ri.groupby("median_income")["aqi"].mean().reset_index()
aqi_by_income["income_quantile"] = pd.qcut(aqi_by_income["median_income"], q=10, labels=range(1, 11))
aqi_by_income

Unnamed: 0,median_income,aqi,income_quantile
0,18827.0,40.000000,1
1,24307.0,19.000000,1
2,30113.0,31.500000,1
3,30729.0,63.000000,1
4,32403.0,40.000000,1
...,...,...,...
703,146982.0,42.833333,10
704,150113.0,22.000000,10
705,156000.0,73.250000,10
706,159674.0,39.000000,10


In [100]:
# AQI by income
alt.Chart(aqi_by_income).mark_boxplot(extent='min-max').encode(
    y='income_quantile:N',
    x='aqi:Q'
)