In [1]:
import dask.dataframe as dd
import numpy as np
import pyarrow
import seaborn as sns
import matplotlib.pyplot as plt

In [2]:
import dask
print("Dask version:", dask.__version__)

Dask version: 2025.7.0


In [298]:
ddf = dd.read_parquet('../data/us_accidents.parquet')
ddf.head()

Unnamed: 0,ID,Source,Severity,Start_Time,End_Time,Start_Lat,Start_Lng,End_Lat,End_Lng,Distance(mi),...,Roundabout,Station,Stop,Traffic_Calming,Traffic_Signal,Turning_Loop,Sunrise_Sunset,Civil_Twilight,Nautical_Twilight,Astronomical_Twilight
0,A-1,Source2,3,2016-02-08 05:46:00,2016-02-08 11:00:00,39.865147,-84.058723,,,0.01,...,False,False,False,False,False,False,Night,Night,Night,Night
1,A-2,Source2,2,2016-02-08 06:07:59,2016-02-08 06:37:59,39.928059,-82.831184,,,0.01,...,False,False,False,False,False,False,Night,Night,Night,Day
2,A-3,Source2,2,2016-02-08 06:49:27,2016-02-08 07:19:27,39.063148,-84.032608,,,0.01,...,False,False,False,False,True,False,Night,Night,Day,Day
3,A-4,Source2,3,2016-02-08 07:23:34,2016-02-08 07:53:34,39.747753,-84.205582,,,0.01,...,False,False,False,False,False,False,Night,Day,Day,Day
4,A-5,Source2,2,2016-02-08 07:39:07,2016-02-08 08:09:07,39.627781,-84.188354,,,0.01,...,False,False,False,False,True,False,Day,Day,Day,Day


In [299]:
ddf.info()

<class 'dask.dataframe.dask_expr.DataFrame'>
Columns: 46 entries, ID to Astronomical_Twilight
dtypes: bool(13), float64(12), int64(1), string(20)

In [300]:
ddf.columns

Index(['ID', 'Source', 'Severity', 'Start_Time', 'End_Time', 'Start_Lat',
       'Start_Lng', 'End_Lat', 'End_Lng', 'Distance(mi)', 'Description',
       'Street', 'City', 'County', 'State', 'Zipcode', 'Country', 'Timezone',
       'Airport_Code', 'Weather_Timestamp', 'Temperature(F)', 'Wind_Chill(F)',
       'Humidity(%)', 'Pressure(in)', 'Visibility(mi)', 'Wind_Direction',
       'Wind_Speed(mph)', 'Precipitation(in)', 'Weather_Condition', 'Amenity',
       'Bump', 'Crossing', 'Give_Way', 'Junction', 'No_Exit', 'Railway',
       'Roundabout', 'Station', 'Stop', 'Traffic_Calming', 'Traffic_Signal',
       'Turning_Loop', 'Sunrise_Sunset', 'Civil_Twilight', 'Nautical_Twilight',
       'Astronomical_Twilight'],
      dtype='object')

In [301]:
ddf = ddf.drop(columns=['ID', 'Source', 'Distance(mi)', 'End_Time', 'Duration', 'End_Lat', 'End_Lng', 'Description','Country', 'Street', 'Weather_Timestamp', 'Timezone', 'Zipcode'], errors='ignore')

In [302]:
print(ddf.dtypes)
print(ddf.shape)

Severity                           int64
Start_Time               string[pyarrow]
Start_Lat                        float64
Start_Lng                        float64
City                     string[pyarrow]
County                   string[pyarrow]
State                    string[pyarrow]
Airport_Code             string[pyarrow]
Temperature(F)                   float64
Wind_Chill(F)                    float64
Humidity(%)                      float64
Pressure(in)                     float64
Visibility(mi)                   float64
Wind_Direction           string[pyarrow]
Wind_Speed(mph)                  float64
Precipitation(in)                float64
Weather_Condition        string[pyarrow]
Amenity                             bool
Bump                                bool
Crossing                            bool
Give_Way                            bool
Junction                            bool
No_Exit                             bool
Railway                             bool
Roundabout      

In [303]:
ddf['Start_Time'] = dd.to_datetime(ddf['Start_Time'], format='mixed')

In [304]:
ddf.columns

Index(['Severity', 'Start_Time', 'Start_Lat', 'Start_Lng', 'City', 'County',
       'State', 'Airport_Code', 'Temperature(F)', 'Wind_Chill(F)',
       'Humidity(%)', 'Pressure(in)', 'Visibility(mi)', 'Wind_Direction',
       'Wind_Speed(mph)', 'Precipitation(in)', 'Weather_Condition', 'Amenity',
       'Bump', 'Crossing', 'Give_Way', 'Junction', 'No_Exit', 'Railway',
       'Roundabout', 'Station', 'Stop', 'Traffic_Calming', 'Traffic_Signal',
       'Turning_Loop', 'Sunrise_Sunset', 'Civil_Twilight', 'Nautical_Twilight',
       'Astronomical_Twilight'],
      dtype='object')

In [305]:
ddf_airports = dd.read_csv('../data/data-world-us-airports.csv',
                           dtype={'continent': 'object', 
                                   'elevation_ft': 'float64', 
                                   'iata_code': 'object', 
                                   'icao_code': 'object'}
                                   )

In [306]:
ddf_airports = ddf_airports.drop(columns=['type', 'elevation_ft', 'continent', 'iso_country', 'iso_region', 'municipality', 'icao_code', 'iata_code'], errors='ignore').compute()

In [307]:
ddf_airports.head()

Unnamed: 0,ident,name,gps_code,local_code,coordinates
0,00A,Total RF Heliport,K00A,00A,"40.070985, -74.933689"
1,00AA,Aero B Ranch Airport,00AA,00AA,"38.704022, -101.473911"
2,00AK,Lowell Field,00AK,00AK,"59.947733, -151.692524"
3,00AL,Epps Airpark,00AL,00AL,"34.86479949951172, -86.77030181884766"
4,00AN,Katmai Lodge Airport,00AN,00AN,"59.093287, -156.456699"


In [308]:
# Split coordinates
ddf_airports[['latitude', 'longitude']] = ddf_airports['coordinates'].str.split(',', expand=True)
ddf_airports['latitude'] = ddf_airports['latitude'].astype(float)
ddf_airports['longitude'] = ddf_airports['longitude'].astype(float)
ddf_airports = ddf_airports.drop(columns=['coordinates'], errors='ignore')

In [309]:
ddf_airports = ddf_airports.rename(columns={'ident': 'Airport_Code', 'name': 'airport_name'})

In [310]:
ddf_airports.head()

Unnamed: 0,Airport_Code,airport_name,gps_code,local_code,latitude,longitude
0,00A,Total RF Heliport,K00A,00A,40.070985,-74.933689
1,00AA,Aero B Ranch Airport,00AA,00AA,38.704022,-101.473911
2,00AK,Lowell Field,00AK,00AK,59.947733,-151.692524
3,00AL,Epps Airpark,00AL,00AL,34.864799,-86.770302
4,00AN,Katmai Lodge Airport,00AN,00AN,59.093287,-156.456699


In [311]:
ddf = ddf.merge(ddf_airports, how='left', left_on='Airport_Code', right_on='Airport_Code', suffixes=('', '_airport'))

In [312]:
ddf.head()

Unnamed: 0,Severity,Start_Time,Start_Lat,Start_Lng,City,County,State,Airport_Code,Temperature(F),Wind_Chill(F),...,Turning_Loop,Sunrise_Sunset,Civil_Twilight,Nautical_Twilight,Astronomical_Twilight,airport_name,gps_code,local_code,latitude,longitude
0,3,2016-02-08 05:46:00,39.865147,-84.058723,Dayton,Montgomery,OH,KFFO,36.9,,...,False,Night,Night,Night,Night,Wright-Patterson Air Force Base,KFFO,FFO,39.826099,-84.048302
1,2,2016-02-08 06:07:59,39.928059,-82.831184,Reynoldsburg,Franklin,OH,KCMH,37.9,,...,False,Night,Night,Night,Day,,,,,
2,2,2016-02-08 06:49:27,39.063148,-84.032608,Williamsburg,Clermont,OH,KI69,36.0,33.3,...,False,Night,Night,Day,Day,Clermont County Airport,KI69,I69,39.0784,-84.210197
3,3,2016-02-08 07:23:34,39.747753,-84.205582,Dayton,Montgomery,OH,KDAY,35.1,31.0,...,False,Night,Day,Day,Day,James M Cox Dayton International Airport,KDAY,DAY,39.902401,-84.219398
4,2,2016-02-08 07:39:07,39.627781,-84.188354,Dayton,Montgomery,OH,KMGY,36.0,33.3,...,False,Day,Day,Day,Day,Dayton-Wright Brothers Airport,KMGY,MGY,39.589001,-84.224899


In [313]:
ddf[ddf['airport_name'].isna() & (~ddf['Airport_Code'].isna())].compute()

Unnamed: 0,Severity,Start_Time,Start_Lat,Start_Lng,City,County,State,Airport_Code,Temperature(F),Wind_Chill(F),...,Turning_Loop,Sunrise_Sunset,Civil_Twilight,Nautical_Twilight,Astronomical_Twilight,airport_name,gps_code,local_code,latitude,longitude
1,2,2016-02-08 06:07:59,39.928059,-82.831184,Reynoldsburg,Franklin,OH,KCMH,37.9,,...,False,Night,Night,Night,Day,,,,,
6096,2,2016-12-07 21:30:53,38.642143,-121.162064,Rancho Cordova,Sacramento,CA,KMHR,42.8,40.1,...,False,Night,Night,Night,Night,,,,,
6722,2,2016-12-10 20:10:47,37.563786,-122.306152,San Mateo,San Mateo,CA,KSQL,57.2,,...,False,Night,Night,Night,Night,,,,,
8415,2,2016-12-23 17:08:55,37.848099,-122.488701,Sausalito,Marin,CA,KSFO,50.0,,...,False,Night,Day,Day,Day,,,,,
15746,2,2016-11-08 11:54:51,38.590389,-121.785675,Davis,Yolo,CA,KEDU,69.8,,...,False,Day,Day,Day,Day,,,,,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
1709292,4,2019-08-23 17:01:58,40.374860,-120.399344,Susanville,Lassen,CA,KRTS,90.0,90.0,...,False,Day,Day,Day,Day,,,,,
1709293,4,2019-08-23 17:01:58,40.370829,-120.400948,Susanville,Lassen,CA,KRTS,90.0,90.0,...,False,Day,Day,Day,Day,,,,,
1709297,3,2019-08-23 04:04:48,34.075790,-118.276680,Los Angeles,Los Angeles,CA,KCQT,67.0,67.0,...,False,Night,Night,Night,Night,,,,,
1709301,2,2019-08-23 12:52:31,34.023790,-118.276390,Los Angeles,Los Angeles,CA,KCQT,81.0,81.0,...,False,Day,Day,Day,Day,,,,,


In [314]:
print(type(ddf['Airport_Code']))

<class 'dask.dataframe.dask_expr._collection.Series'>


In [315]:
# Manual corrections
airport_corrections = {
    'KCQT': {
        'airport_name': 'Whiteman Airport',
        'latitude': 34.2598,
        'longitude': -118.4119
    },
    'KMCJ': {
        'airport_name': 'William P Hobby Airport',
        'latitude': 29.6459,
        'longitude': -95.2769
    },
    'KATT': {
        'airport_name': 'Austin–Bergstrom International Airport',
        'latitude': 30.197535,
        'longitude': -97.662015
    },
    'KJRB': {
        'airport_name': 'Downtown Manhattan Heliport',
        'latitude': 40.700711,
        'longitude': -74.008573
    },
    'KDMH': {
        'airport_name': 'Baltimore/Washington International Airport',
        'latitude': 39.1718,
        'longitude': -76.6677
    }
}

In [316]:
def apply_corrections(df):
    for code, info in airport_corrections.items():
        mask = df['Airport_Code'] == code
        for col, value in info.items():
            df.loc[mask, col] = value
    return df

In [317]:
ddf = ddf.map_partitions(apply_corrections)

In [318]:
print(ddf[ddf['airport_name'].isna()]['Airport_Code'].value_counts(sort=True).compute())

Airport_Code
KNYC    24649
K3A6    19046
KAUD    17034
KMWS    12349
KNKA    11155
        ...  
KWVL        1
KWYS        1
KY19        1
KY22        1
KY50        1
Name: count, Length: 1354, dtype: int64[pyarrow]


In [319]:
conditional_airport_data = {
    'KNYC': {
        'Kings': {
            'airport_name': 'John F. Kennedy International Airport (JFK)',
            'latitude': 40.64013,
            'longitude': -73.77651
        },
        'New York': {
            'airport_name': 'LaGuardia Airport (LGA)',
            'latitude': 40.7769,
            'longitude': -73.8740
        },
        'Hudson': {
            'airport_name': 'Newark Liberty International Airport (EWR)',
            'latitude': 40.6925,
            'longitude': -74.1687
        },
        'Queens': {
            'airport_name': 'John F. Kennedy International Airport (JFK)',
            'latitude': 40.64013,
            'longitude': -73.77651
        },
        'Bergen': {
            'airport_name': 'Teterboro Airport (TEB)',
            'latitude': 40.8502,
            'longitude': -74.0609
        }
    }
}

In [320]:
def apply_conditional_corrections(df):
    for code, county_map in conditional_airport_data.items():
        for county, info in county_map.items():
            mask = (df['Airport_Code'] == code) & (df['County'] == county)
            for col, value in info.items():
                df.loc[mask, col] = value
    return df

In [321]:
ddf = ddf.map_partitions(apply_conditional_corrections)

In [322]:
ddf[ddf['airport_name'].isna() & (~ddf['Airport_Code'].isna())].compute()

Unnamed: 0,Severity,Start_Time,Start_Lat,Start_Lng,City,County,State,Airport_Code,Temperature(F),Wind_Chill(F),...,Turning_Loop,Sunrise_Sunset,Civil_Twilight,Nautical_Twilight,Astronomical_Twilight,airport_name,gps_code,local_code,latitude,longitude
1,2,2016-02-08 06:07:59,39.928059,-82.831184,Reynoldsburg,Franklin,OH,KCMH,37.9,,...,False,Night,Night,Night,Day,,,,,
6096,2,2016-12-07 21:30:53,38.642143,-121.162064,Rancho Cordova,Sacramento,CA,KMHR,42.8,40.1,...,False,Night,Night,Night,Night,,,,,
6722,2,2016-12-10 20:10:47,37.563786,-122.306152,San Mateo,San Mateo,CA,KSQL,57.2,,...,False,Night,Night,Night,Night,,,,,
8415,2,2016-12-23 17:08:55,37.848099,-122.488701,Sausalito,Marin,CA,KSFO,50.0,,...,False,Night,Day,Day,Day,,,,,
15746,2,2016-11-08 11:54:51,38.590389,-121.785675,Davis,Yolo,CA,KEDU,69.8,,...,False,Day,Day,Day,Day,,,,,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
1709151,2,2019-08-23 01:59:56,42.430325,-123.252602,Grants Pass,Josephine,OR,KSXT,58.0,58.0,...,False,Night,Night,Night,Night,,,,,
1709202,2,2019-08-23 12:20:38,42.437790,-123.285592,Grants Pass,Josephine,OR,KSXT,70.0,70.0,...,False,Day,Day,Day,Day,,,,,
1709213,2,2019-08-23 12:52:07,45.460548,-123.803174,Tillamook,Tillamook,OR,KS47,70.0,70.0,...,False,Day,Day,Day,Day,,,,,
1709292,4,2019-08-23 17:01:58,40.374860,-120.399344,Susanville,Lassen,CA,KRTS,90.0,90.0,...,False,Day,Day,Day,Day,,,,,


In [323]:
ddf[ddf['Temperature(F)'].isna()].compute()

Unnamed: 0,Severity,Start_Time,Start_Lat,Start_Lng,City,County,State,Airport_Code,Temperature(F),Wind_Chill(F),...,Turning_Loop,Sunrise_Sunset,Civil_Twilight,Nautical_Twilight,Astronomical_Twilight,airport_name,gps_code,local_code,latitude,longitude
601,3,2016-03-11 07:28:40,41.323063,-82.616463,Milan,Erie,OH,KPCW,,,...,False,Day,Day,Day,Day,Carl R Keller Field,KPCW,PCW,41.515647,-82.868328
1734,2,2016-06-30 08:00:04,38.386948,-121.242165,Wilton,Sacramento,CA,KMHR,,,...,False,Day,Day,Day,Day,Sacramento Mather Airport,KMHR,MHR,38.554744,-121.297989
1957,2,2016-07-03 03:54:45,37.902588,-122.514198,Mill Valley,Marin,CA,KDVO,,,...,False,Night,Night,Night,Night,Marin County Airport - Gnoss Field,KDVO,DVO,38.143600,-122.556000
1968,2,2016-07-03 08:41:05,37.994217,-122.532051,San Rafael,Marin,CA,KDVO,,,...,False,Day,Day,Day,Day,Marin County Airport - Gnoss Field,KDVO,DVO,38.143600,-122.556000
1973,2,2016-07-03 11:59:28,37.993839,-122.532051,San Rafael,Marin,CA,KDVO,,,...,False,Day,Day,Day,Day,Marin County Airport - Gnoss Field,KDVO,DVO,38.143600,-122.556000
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
1709096,2,2019-08-23 12:23:00,40.705620,-111.546730,Park City,Summit,UT,K36U,,,...,False,Day,Day,Day,Day,,,,,
1709102,2,2019-08-23 13:23:00,40.792697,-111.450383,Coalville,Summit,UT,K36U,,,...,False,Day,Day,Day,Day,,,,,
1709157,2,2019-08-23 07:01:53,47.244171,-122.335366,Milton,Pierce,WA,KPLU,,,...,False,Day,Day,Day,Day,Pierce County-Thun Field,KPLU,PLU,47.103901,-122.287003
1709190,2,2019-08-23 17:19:55,44.119763,-121.317222,Bend,Deschutes,OR,,,,...,False,Day,Day,Day,Day,Los Alamitos Army Air Field,KSLI,SLI,33.790001,-118.052002


In [324]:
ddf = ddf.dropna(subset=['Airport_Code'])

In [325]:
ddf['airport_name'] = ddf['airport_name'].fillna('Unknown')

In [326]:
ddf.head()

Unnamed: 0,Severity,Start_Time,Start_Lat,Start_Lng,City,County,State,Airport_Code,Temperature(F),Wind_Chill(F),...,Turning_Loop,Sunrise_Sunset,Civil_Twilight,Nautical_Twilight,Astronomical_Twilight,airport_name,gps_code,local_code,latitude,longitude
0,3,2016-02-08 05:46:00,39.865147,-84.058723,Dayton,Montgomery,OH,KFFO,36.9,,...,False,Night,Night,Night,Night,Wright-Patterson Air Force Base,KFFO,FFO,39.826099,-84.048302
1,2,2016-02-08 06:07:59,39.928059,-82.831184,Reynoldsburg,Franklin,OH,KCMH,37.9,,...,False,Night,Night,Night,Day,Unknown,,,,
2,2,2016-02-08 06:49:27,39.063148,-84.032608,Williamsburg,Clermont,OH,KI69,36.0,33.3,...,False,Night,Night,Day,Day,Clermont County Airport,KI69,I69,39.0784,-84.210197
3,3,2016-02-08 07:23:34,39.747753,-84.205582,Dayton,Montgomery,OH,KDAY,35.1,31.0,...,False,Night,Day,Day,Day,James M Cox Dayton International Airport,KDAY,DAY,39.902401,-84.219398
4,2,2016-02-08 07:39:07,39.627781,-84.188354,Dayton,Montgomery,OH,KMGY,36.0,33.3,...,False,Day,Day,Day,Day,Dayton-Wright Brothers Airport,KMGY,MGY,39.589001,-84.224899


In [327]:
ddf.info()

<class 'dask.dataframe.dask_expr.DataFrame'>
Columns: 39 entries, Severity to longitude
dtypes: datetime64[ns](1), object(13), float64(12), string(13)

In [328]:
ddf = ddf.reset_index(drop=True)

In [329]:
ddf.head()

Unnamed: 0,Severity,Start_Time,Start_Lat,Start_Lng,City,County,State,Airport_Code,Temperature(F),Wind_Chill(F),...,Turning_Loop,Sunrise_Sunset,Civil_Twilight,Nautical_Twilight,Astronomical_Twilight,airport_name,gps_code,local_code,latitude,longitude
0,3,2016-02-08 05:46:00,39.865147,-84.058723,Dayton,Montgomery,OH,KFFO,36.9,,...,False,Night,Night,Night,Night,Wright-Patterson Air Force Base,KFFO,FFO,39.826099,-84.048302
1,2,2016-02-08 06:07:59,39.928059,-82.831184,Reynoldsburg,Franklin,OH,KCMH,37.9,,...,False,Night,Night,Night,Day,Unknown,,,,
2,2,2016-02-08 06:49:27,39.063148,-84.032608,Williamsburg,Clermont,OH,KI69,36.0,33.3,...,False,Night,Night,Day,Day,Clermont County Airport,KI69,I69,39.0784,-84.210197
3,3,2016-02-08 07:23:34,39.747753,-84.205582,Dayton,Montgomery,OH,KDAY,35.1,31.0,...,False,Night,Day,Day,Day,James M Cox Dayton International Airport,KDAY,DAY,39.902401,-84.219398
4,2,2016-02-08 07:39:07,39.627781,-84.188354,Dayton,Montgomery,OH,KMGY,36.0,33.3,...,False,Day,Day,Day,Day,Dayton-Wright Brothers Airport,KMGY,MGY,39.589001,-84.224899


In [330]:
ddf.info()

<class 'dask.dataframe.dask_expr.DataFrame'>
Columns: 39 entries, Severity to longitude
dtypes: datetime64[ns](1), object(13), float64(12), string(13)

In [331]:
weather_cols = [
    'Temperature(F)','Wind_Chill(F)', 'Humidity(%)', 'Pressure(in)', 
    'Visibility(mi)', 'Wind_Speed(mph)', 'Precipitation(in)'
]

In [333]:
ddf.columns

Index(['Severity', 'Start_Time', 'Start_Lat', 'Start_Lng', 'City', 'County',
       'State', 'Airport_Code', 'Temperature(F)', 'Wind_Chill(F)',
       'Humidity(%)', 'Pressure(in)', 'Visibility(mi)', 'Wind_Direction',
       'Wind_Speed(mph)', 'Precipitation(in)', 'Weather_Condition', 'Amenity',
       'Bump', 'Crossing', 'Give_Way', 'Junction', 'No_Exit', 'Railway',
       'Roundabout', 'Station', 'Stop', 'Traffic_Calming', 'Traffic_Signal',
       'Turning_Loop', 'Sunrise_Sunset', 'Civil_Twilight', 'Nautical_Twilight',
       'Astronomical_Twilight', 'airport_name', 'gps_code', 'local_code',
       'latitude', 'longitude'],
      dtype='object')

In [332]:
global_medians = {}
for col in weather_cols:
    try:
        global_medians[col] = ddf[col].quantile(0.5).compute()
    except:
        global_medians[col] = ddf[col].dropna().compute().median()

# Apply both group-wise and global median filling in sequence
for col in weather_cols:
    # First: group-wise median filling
    ddf[col] = ddf.groupby('Airport_Code')[col].transform(
        lambda x: x.fillna(x.median()),
        meta=(col, 'f8')
    )
    
    # Second: global median for any remaining NaNs
    ddf[col] = ddf[col].fillna(global_medians[col])

# Check if any missing values are left
print("Missing values after filling:")
print(ddf[weather_cols].isnull().sum().compute())

Missing values after filling:


ValueError: cannot reindex on an axis with duplicate labels

In [295]:
ddf_f = ddf.reset_index(drop=True)

In [296]:
ddf_f.head()

ValueError: cannot reindex on an axis with duplicate labels

In [None]:
print("Missing values after filling:")
print(ddf[weather_cols].isnull().sum().compute())