# Integrated Urban Data Pipeline: Linking Traffic, Weather, and Air Quality in Chicago

## Project Overview

The main goal of our project is to build a repeatable, end-to-end data pipeline that links the City of Chicago's traffic, weather, and air quality datasets. Rather than building a predictive model, we aim to show how large-scale urban data can be ethically gathered, processed, and combined to support environmental analysis. This aligns with the course's focus on workflow automation, quality assessment, metadata, and data lifecycle management.

The main objectives are:

- To create a single, cohesive dataset by combining information from several reliable, independent sources.
- To automate the data transformation, integration, and cleaning steps so that the pipeline can be rerun dependably.
- To assess the relationship between citywide air quality and weather and traffic patterns.
- To demonstrate ethical data handling, transparent processing steps, and full reproducibility through workflow automation and clear GitHub documentation.

### *Rishabh add: Data sources + acquisition details + ethical data handling and how you got the data ethically*

## Initial Storage Strategy

The air quality and weather datasets are small enough to manage directly. However, because the traffic dataset contains roughly 78 million rows, a careful storage plan is required. We are adopting a hybrid approach using:
- Chunked Parquet Storage (for large data)
- Direct CSV Storage (for smaller datasets)
- Metadata + Naming Conventions

We will finalize the naming conventions for the final submission; for now, the priority is to organize everything in one place. The plan is to pair each dataset with a short metadata file (.json) describing its units, schema, and source, and to use strict lowercase snake_case names.
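
As a minimal sketch of that plan (not the final convention), the helpers below normalize a name to lowercase snake_case and write a JSON metadata file next to a data file. The function names `to_snake_case` and `write_metadata`, the `.metadata.json` suffix, and the field layout are assumptions made for illustration.

```python
import json
import re
from pathlib import Path


def to_snake_case(name: str) -> str:
    """Lowercase a name and replace runs of non-alphanumerics with underscores."""
    # Split camelCase boundaries first: "AirChicago" -> "Air_Chicago"
    name = re.sub(r"(?<=[a-z0-9])(?=[A-Z])", "_", name)
    name = re.sub(r"[^0-9A-Za-z]+", "_", name)
    return name.strip("_").lower()


def write_metadata(data_path, source, schema, units):
    """Write <stem>.metadata.json next to the data file and return its path.

    The layout (file / source / schema / units) is a hypothetical example,
    not a standard.
    """
    data_path = Path(data_path)
    meta = {
        "file": data_path.name,
        "source": source,
        "schema": schema,  # column name -> dtype
        "units": units,    # column name -> unit string
    }
    meta_path = data_path.with_name(data_path.stem + ".metadata.json")
    meta_path.write_text(json.dumps(meta, indent=2))
    return meta_path
```

For example, `to_snake_case("AirChicago")` gives `air_chicago`, and calling `write_metadata("weather_2024.csv", ...)` would create `weather_2024.metadata.json` in the same folder. The units themselves still have to come from each source's documentation.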

In [29]:
import pandas as pd

This cell loads only the air quality and weather data. For air quality, I kept only the 2024 records for Chicago, since that is the scope I want to focus on.

*Data lifecycle: collect data*

In [63]:
dfAir = pd.read_csv("Air.csv")
# .copy() makes AirChicago an independent DataFrame, so the assignment below
# does not trigger pandas' SettingWithCopyWarning
AirChicago = dfAir[dfAir["City"] == "Chicago"].copy()
AirChicago["Date"] = pd.to_datetime(AirChicago["Date"], errors="coerce")
AirChicago = AirChicago[AirChicago["Date"].dt.year == 2024]
dfWeather = pd.read_csv("Weather.csv")
dfWeather = dfWeather[dfWeather["YEAR"] == 2024]


In [64]:
AirChicago

| | Pm2.5 | Pm10 | No2 | So2 | Co | Aqi | Date | City |
|---|---|---|---|---|---|---|---|---|
| 13881 | 156.82 | 211.81 | 14.55 | 18.75 | 4.77 | 357 | 2024-01-01 | Chicago |
| 13882 | 179.38 | 151.95 | 43.44 | 35.43 | 0.58 | 338 | 2024-01-02 | Chicago |
| 13883 | 158.78 | 270.43 | 96.99 | 7.85 | 0.22 | 115 | 2024-01-03 | Chicago |
| 13884 | 198.75 | 255.81 | 49.38 | 43.27 | 4.72 | 67 | 2024-01-04 | Chicago |
| 13885 | 195.22 | 139.75 | 97.97 | 42.26 | 5.44 | 127 | 2024-01-05 | Chicago |
| ... | ... | ... | ... | ... | ... | ... | ... | ... |
| 14242 | 101.51 | 256.44 | 85.75 | 3.18 | 9.65 | 337 | 2024-12-27 | Chicago |
| 14243 | 15.98 | 292.72 | 58.46 | 19.69 | 4.00 | 247 | 2024-12-28 | Chicago |
| 14244 | 117.67 | 244.84 | 76.69 | 12.44 | 6.63 | 101 | 2024-12-29 | Chicago |
| 14245 | 198.33 | 26.22 | 8.20 | 31.66 | 4.30 | 31 | 2024-12-30 | Chicago |


In [65]:
dfWeather

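| | YEAR | MO | DY | HR | TEMP | PRCP | HMDT | WND_SPD | ATM_PRESS | REF |
|---|---|---|---|---|---|---|---|---|---|---|
| 24108 | 2024 | 1 | 1 | 0 | 2.69 | 0.05 | 84.88 | 8.76 | 99.90 | 202401 |
| 24109 | 2024 | 1 | 1 | 1 | 2.63 | 0.05 | 84.19 | 8.50 | 99.96 | 202401 |
| 24110 | 2024 | 1 | 1 | 2 | 2.36 | 0.03 | 83.62 | 8.00 | 100.01 | 202401 |
| 24111 | 2024 | 1 | 1 | 3 | 2.14 | 0.03 | 82.94 | 7.51 | 100.05 | 202401 |
| 24112 | 2024 | 1 | 1 | 4 | 1.73 | 0.03 | 83.19 | 6.78 | 100.10 | 202401 |
| ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
| 32887 | 2024 | 12 | 31 | 19 | 1.96 | 0.73 | 87.12 | 8.10 | 99.07 | 202412 |
| 32888 | 2024 | 12 | 31 | 20 | 1.72 | 0.70 | 87.33 | 8.45 | 99.14 | 202412 |
| 32889 | 2024 | 12 | 31 | 21 | 1.36 | 0.59 | 87.07 | 8.52 | 99.19 | 202412 |
| 32890 | 2024 | 12 | 31 | 22 | 1.00 | 0.37 | 86.26 | 8.50 | 99.24 | 202412 |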


This is the traffic dataset. It is too large to handle as a single file, so it is read in chunks of 1,000,000 rows and each chunk is written to its own Parquet file, for a total of 78 files.

In [None]:
import pandas as pd

reader = pd.read_csv(
    "big2.csv",
    engine="c",
    dtype=str,
    chunksize=1_000_000,
    on_bad_lines="skip"
)

for i, chunk in enumerate(reader):
    chunk.to_parquet(f"big2_part_{i:03}.parquet", index=False)


In [7]:
import os

parquets = [f for f in os.listdir() if f.endswith(".parquet")]
print(len(parquets), "parquet files found")
print(parquets[:10])


78 parquet files found
['big2_part_033.parquet', 'big2_part_023.parquet', 'big2_part_051.parquet', 'big2_part_041.parquet', 'big2_part_048.parquet', 'big2_part_058.parquet', 'big2_part_077.parquet', 'big2_part_067.parquet', 'big2_part_015.parquet', 'big2_part_005.parquet']


In [80]:
dfBig = pd.read_parquet("big2_part_000.parquet")

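Next, I reduced the traffic data to one reading per street per day and combined the results into a single DataFrame. Each street has multiple readings per day, so I sorted them by time and kept the middle one: position n // 2 (counting from 0) out of n readings, which is the later of the two middle readings when n is even. The selection runs separately on each Parquet part, so a street-day that spans two parts can contribute more than one row.

*Data management* (storing data properly)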

In [35]:
from glob import glob

parts = sorted(glob("big2_part_*.parquet"))
out = []

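for p in parts:
    x = pd.read_parquet(p)
    # Parse 12-hour timestamps (month/day/year with AM/PM); unparseable values become NaT
    x["TIME"] = pd.to_datetime(
        x["TIME"],
        format="%m/%d/%Y %I:%M:%S %p",
        errors="coerce"
    )
    x = x.dropna(subset=["TIME"])
    x["DATE"] = x["TIME"].dt.date
    x = x.sort_values(["STREET", "DATE", "TIME"])

    g = x.groupby(["STREET", "DATE"])
    n = g["TIME"].transform("size")  # number of readings in each street-day
    pos = g.cumcount()               # 0-based position within the street-day
    k = n // 2                       # index of the middle reading

    # Keep only the middle reading of each street-day found in this part
    out.append(x[pos.eq(k)])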

one_per_street_day = pd.concat(out, ignore_index=True).drop(columns=["DATE"])
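
To make the middle-reading rule concrete, here is a small self-contained sketch that applies the same `groupby` / `cumcount` logic to a toy table. The street names and timestamps are made up for illustration and do not come from the traffic data.

```python
import pandas as pd

# Toy data: street "A" has 4 readings in one day, street "B" has 3 (made-up values)
toy = pd.DataFrame({
    "STREET": ["A"] * 4 + ["B"] * 3,
    "TIME": pd.to_datetime([
        "2024-01-01 06:00", "2024-01-01 09:00", "2024-01-01 12:00", "2024-01-01 18:00",
        "2024-01-01 07:00", "2024-01-01 13:00", "2024-01-01 22:00",
    ]),
})
toy["DATE"] = toy["TIME"].dt.date
toy = toy.sort_values(["STREET", "DATE", "TIME"])

g = toy.groupby(["STREET", "DATE"])
n = g["TIME"].transform("size")  # readings per street-day
pos = g.cumcount()               # 0-based rank within the street-day
picked = toy[pos.eq(n // 2)]     # keep the reading at index n // 2

print(picked[["STREET", "TIME"]])
```

Street A has four readings, so index 4 // 2 = 2 selects the 12:00 reading (the later of the two middle ones); street B has three, so index 1 selects the 13:00 reading.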


Looking at the selected rows, the timestamps are a mix of early-morning, midday, and night readings.

In [66]:
one_per_street_day

Unnamed: 0,TIME,SEGMENT_ID,SPEED,STREET,DIRECTION,FROM_STREET,TO_STREET,LENGTH,STREET_HEADING,COMMENTS,...,HOUR,DAY_OF_WEEK,MONTH,RECORD_ID,START_LATITUDE,START_LONGITUDE,END_LATITUDE,END_LONGITUDE,START_LOCATION,END_LOCATION
0,2024-03-05 09:20:54,484,34,111th,WB,Monterey,Western,0.54,W,,...,9,3,3,0484-202403051520,41.6920630313,-87.670578725,41.6919091649,-87.6811653381,POINT (-87.670578725 41.6920630313),POINT (-87.6811653381 41.6919091649)
1,2024-06-11 18:31:10,484,31,111th,WB,Monterey,Western,0.54,W,,...,18,3,6,0484-202406112331,41.6920630313,-87.670578725,41.6919091649,-87.6811653381,POINT (-87.670578725 41.6920630313),POINT (-87.6811653381 41.6919091649)
2,2024-06-12 12:01:20,484,27,111th,WB,Monterey,Western,0.54,W,,...,12,4,6,0484-202406121701,41.6920630313,-87.670578725,41.6919091649,-87.6811653381,POINT (-87.670578725 41.6920630313),POINT (-87.6811653381 41.6919091649)
3,2024-06-13 12:01:19,477,29,111th,EB,Western,Monterey,0.54,W,,...,12,5,6,0477-202406131701,41.6916347471,-87.6811681063,41.6917886141,-87.6705815418,POINT (-87.6811681063 41.6916347471),POINT (-87.6705815418 41.6917886141)
4,2024-06-14 12:01:19,477,30,111th,EB,Western,Monterey,0.54,W,,...,12,6,6,0477-202406141701,41.6916347471,-87.6811681063,41.6917886141,-87.6705815418,POINT (-87.6811681063 41.6916347471),POINT (-87.6705815418 41.6917886141)
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
51015,2024-12-27 12:01:16,466,29,95th,WB,Halsted,Racine,0.50,W,,...,12,6,12,0466-202412271801,41.7215954811,-87.6432218261,41.7214266197,-87.6529423406,POINT (-87.6432218261 41.7215954811),POINT (-87.6529423406 41.7214266197)
51016,2024-12-28 12:01:15,466,29,95th,WB,Halsted,Racine,0.50,W,,...,12,7,12,0466-202412281801,41.7215954811,-87.6432218261,41.7214266197,-87.6529423406,POINT (-87.6432218261 41.7215954811),POINT (-87.6529423406 41.7214266197)
51017,2024-12-29 12:01:13,466,33,95th,WB,Halsted,Racine,0.50,W,,...,12,1,12,0466-202412291801,41.7215954811,-87.6432218261,41.7214266197,-87.6529423406,POINT (-87.6432218261 41.7215954811),POINT (-87.6529423406 41.7214266197)
51018,2024-12-30 12:01:17,466,-1,95th,WB,Halsted,Racine,0.50,W,,...,12,2,12,0466-202412301801,41.7215954811,-87.6432218261,41.7214266197,-87.6529423406,POINT (-87.6432218261 41.7215954811),POINT (-87.6529423406 41.7214266197)


Now comes the merging of the data:

In [71]:
traffic = one_per_street_day.copy()
traffic["TIME"] = pd.to_datetime(traffic["TIME"], errors="coerce")
traffic = traffic.dropna(subset=["TIME"])
traffic["DATE"] = traffic["TIME"].dt.date
traffic["HOUR"] = traffic["TIME"].dt.hour

In [72]:
weather = dfWeather.copy()
weather = weather.rename(columns={"MO": "MONTH", "DY": "DAY", "HR": "HOUR"})
weather["DATE"] = pd.to_datetime(
    dict(year=weather["YEAR"], month=weather["MONTH"], day=weather["DAY"])
).dt.date
weather = weather.drop(columns=["REF", "YEAR", "MONTH", "DAY"], errors="ignore")
weather = weather.sort_values(["DATE", "HOUR"]).drop_duplicates(["DATE", "HOUR"], keep="first")

In [73]:
air = AirChicago.copy()
air["Date"] = pd.to_datetime(air["Date"], errors="coerce").dt.date
air = air.dropna(subset=["Date"])
air = air.rename(columns={"Date": "DATE"})
air = air.drop(columns=["City"], errors="ignore")

In [115]:
traffic_weather = pd.merge(traffic, weather, on=["DATE", "HOUR"], how="left")
final_df = pd.merge(traffic_weather, air, on="DATE", how="left")
final_df = final_df[['TIME', 'SEGMENT_ID', 'SPEED', 'STREET', 'DIRECTION', 'FROM_STREET',
       'TO_STREET', 'LENGTH', 'STREET_HEADING', 'COMMENTS', 'BUS_COUNT',
       'MESSAGE_COUNT', 'RECORD_ID', 'START_LATITUDE', 'START_LONGITUDE', 
       'END_LATITUDE', 'END_LONGITUDE', 'START_LOCATION', 'END_LOCATION', 
       'TEMP', 'PRCP', 'HMDT', 'WND_SPD', 'ATM_PRESS', 'Pm2.5', 
       'Pm10', 'No2', 'So2', 'Co', 'Aqi']]
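
Because both merges are left joins, traffic rows with no matching weather or air quality record are kept with missing values (the 2025 rows in the output below are an example). A hedged sketch of how such rows could be counted uses the `indicator` and `validate` options of `pd.merge`; the toy frames and the `TEMP` value here are made up for illustration.

```python
import pandas as pd

# Toy stand-ins for the traffic and weather tables (values are illustrative)
traffic_toy = pd.DataFrame({
    "DATE": ["2024-03-05", "2024-03-05", "2025-11-12"],
    "HOUR": [9, 9, 11],
    "SEGMENT_ID": ["154", "168", "1178"],
})
weather_toy = pd.DataFrame({
    "DATE": ["2024-03-05"],
    "HOUR": [9],
    "TEMP": [1.5],  # made-up value
})

merged = pd.merge(
    traffic_toy,
    weather_toy,
    on=["DATE", "HOUR"],
    how="left",
    validate="many_to_one",  # raises MergeError if weather has duplicate (DATE, HOUR) keys
    indicator=True,          # adds a _merge column: "both" or "left_only"
)

unmatched = merged[merged["_merge"] == "left_only"]
print(len(unmatched), "traffic rows without a weather match")
```

Reporting the unmatched count after each merge is one way to document how much of the traffic data falls outside the 2024 weather and air quality coverage.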

Now that the data is merged and the duplicate columns are removed, let's move on to data cleaning.

In [116]:
num_obj_cols = [
    "SPEED", "LENGTH", "BUS_COUNT", "MESSAGE_COUNT",
    "START_LATITUDE", "START_LONGITUDE",
    "END_LATITUDE", "END_LONGITUDE"
]

for c in num_obj_cols:
    final_df[c] = pd.to_numeric(final_df[c], errors="coerce")

str_cols = [
    "STREET", "DIRECTION", "FROM_STREET", "TO_STREET",
    "STREET_HEADING", "COMMENTS", "START_LOCATION", "END_LOCATION"
]

for c in str_cols:
    final_df[c] = (
        final_df[c]
        .astype("string")
        .str.strip()
    )

id_cols = ["SEGMENT_ID", "RECORD_ID"]
for c in id_cols:
    final_df[c] = final_df[c].astype("string").str.strip()

final_df["STREET"] = final_df["STREET"].str.upper()
final_df["FROM_STREET"] = final_df["FROM_STREET"].str.upper()
final_df["TO_STREET"] = final_df["TO_STREET"].str.upper()
final_df["DIRECTION"] = final_df["DIRECTION"].str.upper()

final_df = final_df[final_df["SPEED"].between(-2, 200) | final_df["SPEED"].isna()]
final_df = final_df[(final_df["Aqi"].isna()) | (final_df["Aqi"] >= 0)]

lat_ok = final_df["START_LATITUDE"].between(41.4, 42.1) | final_df["START_LATITUDE"].isna()
lon_ok = final_df["START_LONGITUDE"].between(-88.0, -87.3) | final_df["START_LONGITUDE"].isna()
final_df = final_df[lat_ok & lon_ok]

final_df = (
    final_df
    .drop_duplicates(subset=["TIME", "SEGMENT_ID"])
    .sort_values(["TIME", "SEGMENT_ID"])
    .reset_index(drop=True)
)

final_df

Unnamed: 0,TIME,SEGMENT_ID,SPEED,STREET,DIRECTION,FROM_STREET,TO_STREET,LENGTH,STREET_HEADING,COMMENTS,...,PRCP,HMDT,WND_SPD,ATM_PRESS,Pm2.5,Pm10,No2,So2,Co,Aqi
0,2024-03-05 09:20:35,154,-1,ASHLAND,SB,CHICAGO,GRAND,0.35,N,,...,0.0,88.31,6.43,99.35,174.02,274.85,30.09,12.75,2.37,21.0
1,2024-03-05 09:20:35,168,30,IRVING PARK,EB,CUMBERLAND,PACIFIC,0.50,W,IDOT Signals Possible,...,0.0,88.31,6.43,99.35,174.02,274.85,30.09,12.75,2.37,21.0
2,2024-03-05 09:20:35,172,17,55TH,WB,KEDZIE,CENTRAL PARK,0.50,W,,...,0.0,88.31,6.43,99.35,174.02,274.85,30.09,12.75,2.37,21.0
3,2024-03-05 09:20:35,178,23,GARFIELD,WB,HALSTED,RACINE,0.50,W,,...,0.0,88.31,6.43,99.35,174.02,274.85,30.09,12.75,2.37,21.0
4,2024-03-05 09:20:36,184,-1,87TH,EB,PULASKI,CENTRAL PARK,0.49,W,IDOT Signals Possible,...,0.0,88.31,6.43,99.35,174.02,274.85,30.09,12.75,2.37,21.0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
51015,2025-11-12 11:20:55,1178,20,57TH,EB,STONY ISLAND,LAKE SHORE DR,0.37,E,,...,,,,,,,,,,
51016,2025-11-12 11:20:55,1182,20,BROADWAY,NB,HOLLYWOOD,DEVON,0.88,N,,...,,,,,,,,,,
51017,2025-11-12 11:20:55,1183,27,SHERIDAN,NB,DEVON,PRATT,0.52,N,,...,,,,,,,,,,
51018,2025-11-12 11:20:55,1188,23,CLARK,NB,ASHLAND,RIDGE,0.16,N,,...,,,,,,,,,,


### The original dataset is 20 GB and could not be uploaded to Git, so I exported the filtered DataFrame instead

In [117]:
final_df.to_csv("final_dataset.csv", index=False)
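
One thing to keep in mind when rerunning from the exported file: CSV does not store column types, so ID and timestamp columns need to be declared again on load. The sketch below shows the idea on a toy frame written to an in-memory buffer; the zero-padded `SEGMENT_ID` value is made up to make the effect visible.

```python
import io

import pandas as pd

# Toy frame with a string ID and a timestamp (values are illustrative)
toy = pd.DataFrame({
    "TIME": pd.to_datetime(["2024-03-05 09:20:35"]),
    "SEGMENT_ID": pd.Series(["0154"], dtype="string"),
    "SPEED": [-1],
})

buf = io.StringIO()
toy.to_csv(buf, index=False)
buf.seek(0)

# Without dtype/parse_dates, SEGMENT_ID would be read as the integer 154
# and TIME as plain text
back = pd.read_csv(buf, dtype={"SEGMENT_ID": "string"}, parse_dates=["TIME"])
print(back.dtypes)
```

The same `dtype` and `parse_dates` arguments could be passed when reading `final_dataset.csv`, with `RECORD_ID` added to the `dtype` mapping.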

### Data Lifecycle Alignment
Our project follows the full data lifecycle described in the course. After collecting raw traffic, weather, and air quality data, we entered the Data Processing and Preparation phase. This is the intermediary step between data collection and data analysis, and it is responsible for transforming heterogeneous, messy datasets into a structured form that is ready for analytics.

In this phase, we performed:

- datatype standardization (converting object fields to numeric or string)
- timestamp parsing and normalization of DATE and HOUR
- removal of invalid or impossible values (e.g., negative AQI, unrealistic speeds)
- geospatial sanity checks on latitude and longitude
- string normalization (uppercase formatting, trimming whitespace)
- deduplication of segment-time records
- integration across the three datasets using consistent time-based keys

This processing layer is essential in the data lifecycle because analytics cannot begin until the data is clean, consistent, and integrated. Only after completing this stage can we move into descriptive analysis and exploratory work.
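
The cleaning rules listed above can also be turned into a small quality report that counts violations, which is useful to run before and after cleaning. This is a sketch only: `check_clean` is a hypothetical helper, the thresholds are copied from the cleaning cell, and the toy rows are made up.

```python
import pandas as pd


def check_clean(df: pd.DataFrame) -> dict:
    """Count rows that violate each cleaning rule (0 means the rule holds)."""
    speed_ok = df["SPEED"].between(-2, 200) | df["SPEED"].isna()
    aqi_ok = df["Aqi"].isna() | (df["Aqi"] >= 0)
    lat_ok = df["START_LATITUDE"].between(41.4, 42.1) | df["START_LATITUDE"].isna()
    lon_ok = df["START_LONGITUDE"].between(-88.0, -87.3) | df["START_LONGITUDE"].isna()
    return {
        "speed_out_of_range": int((~speed_ok).sum()),
        "negative_aqi": int((~aqi_ok).sum()),
        "outside_bounding_box": int((~(lat_ok & lon_ok)).sum()),
        "duplicate_time_segment": int(df.duplicated(subset=["TIME", "SEGMENT_ID"]).sum()),
    }


# Toy rows (made-up values): one duplicate, one row breaking three rules, one all-missing
toy = pd.DataFrame({
    "TIME": ["t1", "t1", "t2", "t2"],
    "SEGMENT_ID": ["154", "154", "168", "172"],
    "SPEED": [-1, 30, 250, None],
    "Aqi": [21.0, 21.0, -5.0, None],
    "START_LATITUDE": [41.89, 41.89, 40.0, None],
    "START_LONGITUDE": [-87.66, -87.66, -87.66, None],
})

report = check_clean(toy)
print(report)
```

Running `check_clean(final_df)` after the cleaning cell should return zero for every rule; non-zero counts on the pre-cleaning data show how many rows each rule removes.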