In [1]:
!pip install pandas polars numpy scipy matplotlib seaborn cupy-cuda12x



In [2]:
import kagglehub

# Download latest version
path = kagglehub.dataset_download("usdot/flight-delays")

print("Path to dataset files:", path)

Path to dataset files: /home/nemo/.cache/kagglehub/datasets/usdot/flight-delays/versions/1


In [3]:
import cupy as cp
import pandas as pd
import numpy as np
import polars as pl
from skimpy import skim
import os
from matplotlib import pyplot as pyplot
import seaborn as sns

In [4]:
file_path_airlines = os.path.join(path, "airlines.csv")
file_path_airports = os.path.join(path, "airports.csv")
file_path_flights = os.path.join(path, "flights.csv")
airlines_df = pl.read_csv(file_path_airlines)
airports_df = pl.read_csv(file_path_airports)
flights = pl.scan_csv(file_path_flights)

print(flights.collect_schema())
print(airports_df.schema)
print(airlines_df.schema)

Schema({'YEAR': Int64, 'MONTH': Int64, 'DAY': Int64, 'DAY_OF_WEEK': Int64, 'AIRLINE': String, 'FLIGHT_NUMBER': Int64, 'TAIL_NUMBER': String, 'ORIGIN_AIRPORT': String, 'DESTINATION_AIRPORT': String, 'SCHEDULED_DEPARTURE': Int64, 'DEPARTURE_TIME': Int64, 'DEPARTURE_DELAY': Int64, 'TAXI_OUT': Int64, 'WHEELS_OFF': Int64, 'SCHEDULED_TIME': Int64, 'ELAPSED_TIME': Int64, 'AIR_TIME': Int64, 'DISTANCE': Int64, 'WHEELS_ON': Int64, 'TAXI_IN': Int64, 'SCHEDULED_ARRIVAL': Int64, 'ARRIVAL_TIME': Int64, 'ARRIVAL_DELAY': Int64, 'DIVERTED': Int64, 'CANCELLED': Int64, 'CANCELLATION_REASON': String, 'AIR_SYSTEM_DELAY': Int64, 'SECURITY_DELAY': Int64, 'AIRLINE_DELAY': Int64, 'LATE_AIRCRAFT_DELAY': Int64, 'WEATHER_DELAY': Int64})
Schema({'IATA_CODE': String, 'AIRPORT': String, 'CITY': String, 'STATE': String, 'COUNTRY': String, 'LATITUDE': Float64, 'LONGITUDE': Float64})
Schema({'IATA_CODE': String, 'AIRLINE': String})


In [5]:
skim(airlines_df)

In [6]:
skim(airports_df)

In [7]:
null_counts = (
    flights.select([
        pl.col(c).is_null().sum().alias(c) for c in flights.collect_schema().names()
    ]).collect()
)
null_counts

YEAR,MONTH,DAY,DAY_OF_WEEK,AIRLINE,FLIGHT_NUMBER,TAIL_NUMBER,ORIGIN_AIRPORT,DESTINATION_AIRPORT,SCHEDULED_DEPARTURE,DEPARTURE_TIME,DEPARTURE_DELAY,TAXI_OUT,WHEELS_OFF,SCHEDULED_TIME,ELAPSED_TIME,AIR_TIME,DISTANCE,WHEELS_ON,TAXI_IN,SCHEDULED_ARRIVAL,ARRIVAL_TIME,ARRIVAL_DELAY,DIVERTED,CANCELLED,CANCELLATION_REASON,AIR_SYSTEM_DELAY,SECURITY_DELAY,AIRLINE_DELAY,LATE_AIRCRAFT_DELAY,WEATHER_DELAY
u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32
0,0,0,0,0,0,14721,0,0,0,86153,86153,89047,89047,6,105071,105071,0,92513,92513,0,92513,105071,0,0,5729195,4755640,4755640,4755640,4755640,4755640


In [8]:
airlines_df.head(10)

IATA_CODE,AIRLINE
str,str
"""UA""","""United Air Lines Inc."""
"""AA""","""American Airlines Inc."""
"""US""","""US Airways Inc."""
"""F9""","""Frontier Airlines Inc."""
"""B6""","""JetBlue Airways"""
"""OO""","""Skywest Airlines Inc."""
"""AS""","""Alaska Airlines Inc."""
"""NK""","""Spirit Air Lines"""
"""WN""","""Southwest Airlines Co."""
"""DL""","""Delta Air Lines Inc."""


In [9]:
airports_df.head(10)

IATA_CODE,AIRPORT,CITY,STATE,COUNTRY,LATITUDE,LONGITUDE
str,str,str,str,str,f64,f64
"""ABE""","""Lehigh Valley International Ai…","""Allentown""","""PA""","""USA""",40.65236,-75.4404
"""ABI""","""Abilene Regional Airport""","""Abilene""","""TX""","""USA""",32.41132,-99.6819
"""ABQ""","""Albuquerque International Sunp…","""Albuquerque""","""NM""","""USA""",35.04022,-106.60919
"""ABR""","""Aberdeen Regional Airport""","""Aberdeen""","""SD""","""USA""",45.44906,-98.42183
"""ABY""","""Southwest Georgia Regional Air…","""Albany""","""GA""","""USA""",31.53552,-84.19447
"""ACK""","""Nantucket Memorial Airport""","""Nantucket""","""MA""","""USA""",41.25305,-70.06018
"""ACT""","""Waco Regional Airport""","""Waco""","""TX""","""USA""",31.61129,-97.23052
"""ACV""","""Arcata Airport""","""Arcata/Eureka""","""CA""","""USA""",40.97812,-124.10862
"""ACY""","""Atlantic City International Ai…","""Atlantic City""","""NJ""","""USA""",39.45758,-74.57717
"""ADK""","""Adak Airport""","""Adak""","""AK""","""USA""",51.87796,-176.64603


In [10]:
cancelled_uniques = flights.select(
    pl.col('CANCELLED').unique()
).collect()
cancelled_uniques

CANCELLED
i64
0
1


In [11]:
cancelled_uniques_counts = flights.select(
    pl.col('CANCELLED').unique_counts()
).collect()
cancelled_uniques_counts

CANCELLED
u32
5729195
89884


In [12]:
diverted_uniques_counts = flights.select(
    pl.col('DIVERTED').unique_counts()
).collect()
diverted_uniques_counts

DIVERTED
u32
5803892
15187


In [13]:
cancelled_reasons_uniques = flights.select(
    pl.col('CANCELLATION_REASON').unique()
).collect()
cancelled_reasons_uniques

CANCELLATION_REASON
str
"""B"""
"""C"""
"""A"""
"""D"""
""


In [14]:
DELAY_COLS = ["DEPARTURE_DELAY", "ARRIVAL_DELAY"]
DELAY_COMP_COLS = [
    "AIR_SYSTEM_DELAY", "SECURITY_DELAY", "AIRLINE_DELAY",
    "LATE_AIRCRAFT_DELAY", "WEATHER_DELAY"
]
TIME_COLS = [
    "DEPARTURE_TIME", "ARRIVAL_TIME", "AIR_TIME",
    "WHEELS_OFF", "WHEELS_ON", "TAXI_IN", "TAXI_OUT"
]

In [15]:
def null_summary(lf: pl.LazyFrame, cols: list[str]) -> pl.DataFrame:
    total = lf.select(
        pl.len().alias("n")
        ).collect()[0, 0]
    counts = lf.select(
        [pl.col(c).is_null().sum().alias(c) for c in cols]
        ).collect()
    pct = counts.select([(pl.col(c) / total * 100).alias(f"{c}_null_pct") for c in cols])
    return counts.hstack(pct)

In [16]:
flown = flights.filter(
    pl.col('CANCELLED') == 0
)

In [17]:
flown_counts = flown.select(pl.len().alias("n_flown")).collect()
flown_counts


n_flown
u32
5729195


In [18]:
core_delay_nulls = null_summary(flown, DELAY_COLS)
core_delay_nulls

DEPARTURE_DELAY,ARRIVAL_DELAY,DEPARTURE_DELAY_null_pct,ARRIVAL_DELAY_null_pct
u32,u32,f64,f64
0,15187,0.0,0.265081


In [19]:
# Count how many null ARRIVAL_DELAY rows are cancelled/diverted
null_analysis = flights.filter(pl.col("ARRIVAL_DELAY").is_null()).group_by(["DIVERTED"]).len()

print(null_analysis.collect())

shape: (2, 2)
┌──────────┬───────┐
│ DIVERTED ┆ len   │
│ ---      ┆ ---   │
│ i64      ┆ u32   │
╞══════════╪═══════╡
│ 1        ┆ 15187 │
│ 0        ┆ 89884 │
└──────────┴───────┘


In [20]:
flights.filter(
    pl.col("DIVERTED") == 1
).group_by(["DIVERTED"]).len().collect()

DIVERTED,len
i64,u32
1,15187


In [21]:
flights = flights.with_columns(
    pl.col("ARRIVAL_DELAY").is_null().alias("ARRIVAL_DELAY_MISSING")
)

missing_summary = flights.group_by(["ARRIVAL_DELAY_MISSING"]).agg(
    pl.col("CANCELLED").mean().alias("cancel_rate"),
    pl.col("DIVERTED").mean().alias("diverted_rate"),
    pl.len().alias("row_count"),
).collect()
missing_summary

ARRIVAL_DELAY_MISSING,cancel_rate,diverted_rate,row_count
bool,f64,f64,u32
True,0.85546,0.14454,105071
False,0.0,0.0,5714008


In [22]:
flown_not_div = flown.filter(
    pl.col("DIVERTED") == 0,
)
flown_not_div_counts = flown_not_div.select(pl.len().alias("n_flown_not_diverted")).collect()
flown_not_div_counts

n_flown_not_diverted
u32
5714008


In [23]:
core_delay_nulls = null_summary(flown_not_div, DELAY_COLS)
core_delay_nulls

DEPARTURE_DELAY,ARRIVAL_DELAY,DEPARTURE_DELAY_null_pct,ARRIVAL_DELAY_null_pct
u32,u32,f64,f64
0,0,0.0,0.0


In [24]:
component_null_analysis = flown_not_div.with_columns(
    (pl.col("ARRIVAL_DELAY") > 0).alias("is_delayed"),
).group_by("is_delayed").agg([
     pl.col(c).is_null().sum().alias(f"{c}_nulls")
        for c in DELAY_COMP_COLS
]).collect()
component_null_analysis

is_delayed,AIR_SYSTEM_DELAY_nulls,SECURITY_DELAY_nulls,AIRLINE_DELAY_nulls,LATE_AIRCRAFT_DELAY_nulls,WEATHER_DELAY_nulls
bool,u32,u32,u32,u32,u32
False,3627112,3627112,3627112,3627112,3627112
True,1023457,1023457,1023457,1023457,1023457


In [25]:
component_null_summary_pct = (
    flown_not_div
    .with_columns((pl.col("ARRIVAL_DELAY") > 0).alias("delayed"))
    .group_by("delayed")
    .agg([
        (pl.col(c).is_null().sum() / pl.len() * 100).alias(f"{c}_null_pct")
        for c in DELAY_COMP_COLS
    ])
).collect()

print(component_null_summary_pct)


shape: (2, 6)
┌─────────┬─────────────────┬─────────────────┬─────────────────┬─────────────────┬────────────────┐
│ delayed ┆ AIR_SYSTEM_DELA ┆ SECURITY_DELAY_ ┆ AIRLINE_DELAY_n ┆ LATE_AIRCRAFT_D ┆ WEATHER_DELAY_ │
│ ---     ┆ Y_null_pct      ┆ null_pct        ┆ ull_pct         ┆ ELAY_null_pct   ┆ null_pct       │
│ bool    ┆ ---             ┆ ---             ┆ ---             ┆ ---             ┆ ---            │
│         ┆ f64             ┆ f64             ┆ f64             ┆ f64             ┆ f64            │
╞═════════╪═════════════════╪═════════════════╪═════════════════╪═════════════════╪════════════════╡
│ false   ┆ 100.0           ┆ 100.0           ┆ 100.0           ┆ 100.0           ┆ 100.0          │
│ true    ┆ 49.04207        ┆ 49.04207        ┆ 49.04207        ┆ 49.04207        ┆ 49.04207       │
└─────────┴─────────────────┴─────────────────┴─────────────────┴─────────────────┴────────────────┘


In [26]:
delayed_only = flown_not_div.filter(pl.col("ARRIVAL_DELAY") > 0)
no_reason = delayed_only.filter(
    sum([(pl.col(c).fill_null(0) == 0) for c in DELAY_COMP_COLS]) == len(DELAY_COMP_COLS)
).collect()
print("Number of delayed flights with no cause recorded:", no_reason.height)
no_reason.head()

Number of delayed flights with no cause recorded: 1023457


YEAR,MONTH,DAY,DAY_OF_WEEK,AIRLINE,FLIGHT_NUMBER,TAIL_NUMBER,ORIGIN_AIRPORT,DESTINATION_AIRPORT,SCHEDULED_DEPARTURE,DEPARTURE_TIME,DEPARTURE_DELAY,TAXI_OUT,WHEELS_OFF,SCHEDULED_TIME,ELAPSED_TIME,AIR_TIME,DISTANCE,WHEELS_ON,TAXI_IN,SCHEDULED_ARRIVAL,ARRIVAL_TIME,ARRIVAL_DELAY,DIVERTED,CANCELLED,CANCELLATION_REASON,AIR_SYSTEM_DELAY,SECURITY_DELAY,AIRLINE_DELAY,LATE_AIRCRAFT_DELAY,WEATHER_DELAY
i64,i64,i64,i64,str,i64,str,str,str,i64,i64,i64,i64,i64,i64,i64,i64,i64,i64,i64,i64,i64,i64,i64,i64,str,i64,i64,i64,i64,i64
2015,1,1,4,"""US""",840,"""N171US""","""SFO""","""CLT""",20,18,-2,16,34,286,293,266,2296,800,11,806,811,5,0,0,,,,,,
2015,1,1,4,"""DL""",806,"""N3730B""","""SFO""","""MSP""",25,20,-5,18,38,217,230,206,1589,604,6,602,610,8,0,0,,,,,,
2015,1,1,4,"""DL""",2440,"""N651DL""","""SEA""","""MSP""",40,39,-1,28,107,189,198,166,1399,553,4,549,557,8,0,0,,,,,,
2015,1,1,4,"""NK""",520,"""N525NK""","""LAS""","""MCI""",55,120,25,11,131,162,143,128,1139,539,4,537,543,6,0,0,,,,,,
2015,1,1,4,"""AA""",371,"""N3GXAA""","""SEA""","""MIA""",100,52,-8,30,122,338,347,311,2724,933,6,938,939,1,0,0,,,,,,


In [27]:
flown_not_div.select(
    pl.col("WEATHER_DELAY").n_unique()
).collect()

WEATHER_DELAY
u32
633


In [28]:
flown_not_div.filter(
    pl.col("WEATHER_DELAY").is_not_null(),
).collect()["ARRIVAL_DELAY", "WEATHER_DELAY"]

ARRIVAL_DELAY,WEATHER_DELAY
i64,i64
25,0
43,0
15,0
20,0
85,0
…,…
20,0
28,0
28,0
159,0


In [29]:
df = flown_not_div.with_columns(
    pl.sum_horizontal([pl.col(c).fill_null(0) for c in DELAY_COMP_COLS]).alias("COMP_SUM")
).collect()

# 2. Check equality vs less-than vs greater-than
consistency = df.select([
    (pl.col("COMP_SUM") == pl.col("ARRIVAL_DELAY")).mean().alias("pct_equal_arr_delay"),
    (pl.col("COMP_SUM") < pl.col("ARRIVAL_DELAY")).mean().alias("pct_less_arr_delay"),
    (pl.col("COMP_SUM") > pl.col("ARRIVAL_DELAY")).mean().alias("pct_more_arr_delay"),

    (pl.col("COMP_SUM") == pl.col("DEPARTURE_DELAY")).mean().alias("pct_equal_dep_delay"),
    (pl.col("COMP_SUM") < pl.col("DEPARTURE_DELAY")).mean().alias("pct_less_dep_delay"),
    (pl.col("COMP_SUM") > pl.col("DEPARTURE_DELAY")).mean().alias("pct_more_dep_delay"),
])
print("Consistency check:")
print(consistency)

# 3. Distributions of each component
dist_stats = df.select([
    pl.col(c) for c in DELAY_COMP_COLS
]).describe()
print("Distribution stats for components:")
print(dist_stats)

# 4. Correlations of components with arrival/departure delay
corr_exprs = []
for c in DELAY_COMP_COLS:
    corr_exprs.append(pl.corr(c, "ARRIVAL_DELAY").alias(f"{c}_corr_arr"))
    corr_exprs.append(pl.corr(c, "DEPARTURE_DELAY").alias(f"{c}_corr_dep"))

correlations = df.select(corr_exprs)
print("Correlations:")
print(correlations)

# 5. Columns that never contribute (all null or zero)
contrib_check = df.select([
    ((pl.col(c).fill_null(0) > 0).any()).alias(f"{c}_has_contrib")
    for c in DELAY_COMP_COLS
])
print("Columns with any positive contribution:")
print(contrib_check)

Consistency check:
shape: (1, 6)
┌────────────────┬────────────────┬────────────────┬───────────────┬───────────────┬───────────────┐
│ pct_equal_arr_ ┆ pct_less_arr_d ┆ pct_more_arr_d ┆ pct_equal_dep ┆ pct_less_dep_ ┆ pct_more_dep_ │
│ delay          ┆ elay           ┆ elay           ┆ _delay        ┆ delay         ┆ delay         │
│ ---            ┆ ---            ┆ ---            ┆ ---           ┆ ---           ┆ ---           │
│ f64            ┆ f64            ┆ f64            ┆ f64           ┆ f64           ┆ f64           │
╞════════════════╪════════════════╪════════════════╪═══════════════╪═══════════════╪═══════════════╡
│ 0.208199       ┆ 0.179114       ┆ 0.612687       ┆ 0.059436      ┆ 0.293697      ┆ 0.646867      │
└────────────────┴────────────────┴────────────────┴───────────────┴───────────────┴───────────────┘
Distribution stats for components:
shape: (9, 6)
┌────────────┬──────────────────┬────────────────┬───────────────┬─────────────────┬───────────────┐
│ statist

In [30]:
correlations

AIR_SYSTEM_DELAY_corr_arr,AIR_SYSTEM_DELAY_corr_dep,SECURITY_DELAY_corr_arr,SECURITY_DELAY_corr_dep,AIRLINE_DELAY_corr_arr,AIRLINE_DELAY_corr_dep,LATE_AIRCRAFT_DELAY_corr_arr,LATE_AIRCRAFT_DELAY_corr_dep,WEATHER_DELAY_corr_arr,WEATHER_DELAY_corr_dep
f64,f64,f64,f64,f64,f64,f64,f64,f64,f64
0.247187,0.095917,0.009655,0.011877,0.609351,0.621296,0.522013,0.554802,0.264799,0.243532


In [31]:
flights_clean = flown_not_div.with_columns(
    [pl.col(c).fill_null(0).alias(c) for c in DELAY_COMP_COLS]
)

# ✅ Verify that nulls are gone
flights_clean.select(
    [pl.col(c).is_null().sum().alias(f"{c}_nulls") for c in DELAY_COMP_COLS]
).collect()

AIR_SYSTEM_DELAY_nulls,SECURITY_DELAY_nulls,AIRLINE_DELAY_nulls,LATE_AIRCRAFT_DELAY_nulls,WEATHER_DELAY_nulls
u32,u32,u32,u32,u32
0,0,0,0,0


In [32]:
null_summary(flights_clean, TIME_COLS)

DEPARTURE_TIME,ARRIVAL_TIME,AIR_TIME,WHEELS_OFF,WHEELS_ON,TAXI_IN,TAXI_OUT,DEPARTURE_TIME_null_pct,ARRIVAL_TIME_null_pct,AIR_TIME_null_pct,WHEELS_OFF_null_pct,WHEELS_ON_null_pct,TAXI_IN_null_pct,TAXI_OUT_null_pct
u32,u32,u32,u32,u32,u32,u32,f64,f64,f64,f64,f64,f64,f64
0,0,0,0,0,0,0,0.0,0.0,0.0,0.0,0.0,0.0,0.0


In [33]:
flights_clean.null_count().collect()

YEAR,MONTH,DAY,DAY_OF_WEEK,AIRLINE,FLIGHT_NUMBER,TAIL_NUMBER,ORIGIN_AIRPORT,DESTINATION_AIRPORT,SCHEDULED_DEPARTURE,DEPARTURE_TIME,DEPARTURE_DELAY,TAXI_OUT,WHEELS_OFF,SCHEDULED_TIME,ELAPSED_TIME,AIR_TIME,DISTANCE,WHEELS_ON,TAXI_IN,SCHEDULED_ARRIVAL,ARRIVAL_TIME,ARRIVAL_DELAY,DIVERTED,CANCELLED,CANCELLATION_REASON,AIR_SYSTEM_DELAY,SECURITY_DELAY,AIRLINE_DELAY,LATE_AIRCRAFT_DELAY,WEATHER_DELAY
u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32
0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,5714008,0,0,0,0,0


In [34]:
flights_clean = flights_clean.drop(["DIVERTED", "CANCELLED", "CANCELLATION_REASON"])
flights_clean_df = flights_clean.collect(engine="streaming")
flights_clean_df.head()

YEAR,MONTH,DAY,DAY_OF_WEEK,AIRLINE,FLIGHT_NUMBER,TAIL_NUMBER,ORIGIN_AIRPORT,DESTINATION_AIRPORT,SCHEDULED_DEPARTURE,DEPARTURE_TIME,DEPARTURE_DELAY,TAXI_OUT,WHEELS_OFF,SCHEDULED_TIME,ELAPSED_TIME,AIR_TIME,DISTANCE,WHEELS_ON,TAXI_IN,SCHEDULED_ARRIVAL,ARRIVAL_TIME,ARRIVAL_DELAY,AIR_SYSTEM_DELAY,SECURITY_DELAY,AIRLINE_DELAY,LATE_AIRCRAFT_DELAY,WEATHER_DELAY
i64,i64,i64,i64,str,i64,str,str,str,i64,i64,i64,i64,i64,i64,i64,i64,i64,i64,i64,i64,i64,i64,i64,i64,i64,i64,i64
2015,1,1,4,"""AS""",98,"""N407AS""","""ANC""","""SEA""",5,2354,-11,21,15,205,194,169,1448,404,4,430,408,-22,0,0,0,0,0
2015,1,1,4,"""AA""",2336,"""N3KUAA""","""LAX""","""PBI""",10,2,-8,12,14,280,279,263,2330,737,4,750,741,-9,0,0,0,0,0
2015,1,1,4,"""US""",840,"""N171US""","""SFO""","""CLT""",20,18,-2,16,34,286,293,266,2296,800,11,806,811,5,0,0,0,0,0
2015,1,1,4,"""AA""",258,"""N3HYAA""","""LAX""","""MIA""",20,15,-5,15,30,285,281,258,2342,748,8,805,756,-9,0,0,0,0,0
2015,1,1,4,"""AS""",135,"""N527AS""","""SEA""","""ANC""",25,24,-1,11,35,235,215,199,1448,254,5,320,259,-21,0,0,0,0,0


In [35]:
airlines_df.null_count()

IATA_CODE,AIRLINE
u32,u32
0,0


In [36]:
airports_df.null_count()

IATA_CODE,AIRPORT,CITY,STATE,COUNTRY,LATITUDE,LONGITUDE
u32,u32,u32,u32,u32,u32,u32
0,0,0,0,0,3,3


In [37]:
airports_df = airports_df.with_columns(
    pl.when(pl.col(['IATA_CODE']) == 'ECP')
    .then(pl.lit(30.3549))
    .otherwise(pl.col("LATITUDE"))
    .alias("LATITUDE"),
    
    pl.when(pl.col("IATA_CODE") == "ECP")
    .then(pl.lit(-85.7995))
    .otherwise(pl.col("LONGITUDE"))
    .alias("LONGITUDE")
    )
airports_df = airports_df.with_columns(
    pl.when(pl.col(['IATA_CODE']) == 'PBG')
    .then(pl.lit(44.6575))
    .otherwise(pl.col("LATITUDE"))
    .alias("LATITUDE"),
    
    pl.when(pl.col("IATA_CODE") == "PBG")
    .then(pl.lit(-73.4670))
    .otherwise(pl.col("LONGITUDE"))
    .alias("LONGITUDE")
    )
airports_df = airports_df.with_columns(
    pl.when(pl.col(['IATA_CODE']) == 'UST')
    .then(pl.lit(29.9314))
    .otherwise(pl.col("LATITUDE"))
    .alias("LATITUDE"),
    
    pl.when(pl.col("IATA_CODE") == "UST")
    .then(pl.lit(-81.2961))
    .otherwise(pl.col("LONGITUDE"))
    .alias("LONGITUDE")
    )

airports_df.null_count()

IATA_CODE,AIRPORT,CITY,STATE,COUNTRY,LATITUDE,LONGITUDE
u32,u32,u32,u32,u32,u32,u32
0,0,0,0,0,0,0


In [38]:
print(airports_df.schema)
print(flights_clean_df.schema)

Schema({'IATA_CODE': String, 'AIRPORT': String, 'CITY': String, 'STATE': String, 'COUNTRY': String, 'LATITUDE': Float64, 'LONGITUDE': Float64})
Schema({'YEAR': Int64, 'MONTH': Int64, 'DAY': Int64, 'DAY_OF_WEEK': Int64, 'AIRLINE': String, 'FLIGHT_NUMBER': Int64, 'TAIL_NUMBER': String, 'ORIGIN_AIRPORT': String, 'DESTINATION_AIRPORT': String, 'SCHEDULED_DEPARTURE': Int64, 'DEPARTURE_TIME': Int64, 'DEPARTURE_DELAY': Int64, 'TAXI_OUT': Int64, 'WHEELS_OFF': Int64, 'SCHEDULED_TIME': Int64, 'ELAPSED_TIME': Int64, 'AIR_TIME': Int64, 'DISTANCE': Int64, 'WHEELS_ON': Int64, 'TAXI_IN': Int64, 'SCHEDULED_ARRIVAL': Int64, 'ARRIVAL_TIME': Int64, 'ARRIVAL_DELAY': Int64, 'AIR_SYSTEM_DELAY': Int64, 'SECURITY_DELAY': Int64, 'AIRLINE_DELAY': Int64, 'LATE_AIRCRAFT_DELAY': Int64, 'WEATHER_DELAY': Int64})


In [39]:
airlines = airlines_df.lazy()
airports = airports_df.lazy()

In [40]:
lf = (
    flights_clean
    .join(airlines, left_on="AIRLINE", right_on="IATA_CODE", how="left")
    .rename({"AIRLINE_right": "AIRLINE_NAME"})
)

In [41]:
lf = (
    lf.join(
        airports.rename({
            "IATA_CODE": "ORIGIN_AIRPORT_CODE",
            "AIRPORT": "ORIGIN_AIRPORT",
            "CITY": "ORIGIN_CITY",
            "STATE": "ORIGIN_STATE",
            "COUNTRY": "ORIGIN_COUNTRY",
            "LATITUDE": "ORIGIN_LAT",
            "LONGITUDE": "ORIGIN_LON"
        }),
        left_on="ORIGIN_AIRPORT",
        right_on="ORIGIN_AIRPORT_CODE",
        how="left"
        )
)

In [42]:
lf.null_count().collect()

YEAR,MONTH,DAY,DAY_OF_WEEK,AIRLINE,FLIGHT_NUMBER,TAIL_NUMBER,ORIGIN_AIRPORT,DESTINATION_AIRPORT,SCHEDULED_DEPARTURE,DEPARTURE_TIME,DEPARTURE_DELAY,TAXI_OUT,WHEELS_OFF,SCHEDULED_TIME,ELAPSED_TIME,AIR_TIME,DISTANCE,WHEELS_ON,TAXI_IN,SCHEDULED_ARRIVAL,ARRIVAL_TIME,ARRIVAL_DELAY,AIR_SYSTEM_DELAY,SECURITY_DELAY,AIRLINE_DELAY,LATE_AIRCRAFT_DELAY,WEATHER_DELAY,AIRLINE_NAME,ORIGIN_AIRPORT_right,ORIGIN_CITY,ORIGIN_STATE,ORIGIN_COUNTRY,ORIGIN_LAT,ORIGIN_LON
u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32
0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,482878,482878,482878,482878,482878,482878


In [43]:
lf = (
    lf.join(
        airports.rename({
            "IATA_CODE": "DEST_AIRPORT_CODE",
            "AIRPORT": "DEST_AIRPORT",
            "CITY": "DEST_CITY",
            "STATE": "DEST_STATE",
            "COUNTRY": "DEST_COUNTRY",
            "LATITUDE": "DEST_LAT",
            "LONGITUDE": "DEST_LON"
        }),
        left_on="DESTINATION_AIRPORT",
        right_on="DEST_AIRPORT_CODE",
        how="left"
        )
)

In [44]:
lf.null_count().collect()

YEAR,MONTH,DAY,DAY_OF_WEEK,AIRLINE,FLIGHT_NUMBER,TAIL_NUMBER,ORIGIN_AIRPORT,DESTINATION_AIRPORT,SCHEDULED_DEPARTURE,DEPARTURE_TIME,DEPARTURE_DELAY,TAXI_OUT,WHEELS_OFF,SCHEDULED_TIME,ELAPSED_TIME,AIR_TIME,DISTANCE,WHEELS_ON,TAXI_IN,SCHEDULED_ARRIVAL,ARRIVAL_TIME,ARRIVAL_DELAY,AIR_SYSTEM_DELAY,SECURITY_DELAY,AIRLINE_DELAY,LATE_AIRCRAFT_DELAY,WEATHER_DELAY,AIRLINE_NAME,ORIGIN_AIRPORT_right,ORIGIN_CITY,ORIGIN_STATE,ORIGIN_COUNTRY,ORIGIN_LAT,ORIGIN_LON,DEST_AIRPORT,DEST_CITY,DEST_STATE,DEST_COUNTRY,DEST_LAT,DEST_LON
u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32
0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,482878,482878,482878,482878,482878,482878,482878,482878,482878,482878,482878,482878


In [45]:
# Which destination airport codes did not join?
missing_dest_codes = (
    lf
    .filter(pl.col("DEST_CITY").is_null())   # null join
    .select("DESTINATION_AIRPORT")
    .unique()
    .collect()
)

missing_dest_codes


DESTINATION_AIRPORT
str
"""12898"""
"""15841"""
"""11624"""
"""15380"""
"""10874"""
…
"""10372"""
"""11049"""
"""10157"""
"""15249"""


In [46]:
dest_null_counts = (
    lf
    .filter(pl.col("DEST_CITY").is_null())
    .group_by("DESTINATION_AIRPORT")
    .len()
    .sort("len", descending=True)
    .collect()
)

dest_null_counts


DESTINATION_AIRPORT,len
str,u32
"""10397""",32546
"""13930""",27506
"""11298""",20461
"""11292""",18065
"""12892""",17647
…,…
"""14222""",9
"""10165""",9
"""13502""",7
"""11503""",3


In [47]:
lf.collect_schema()

Schema([('YEAR', Int64),
        ('MONTH', Int64),
        ('DAY', Int64),
        ('DAY_OF_WEEK', Int64),
        ('AIRLINE', String),
        ('FLIGHT_NUMBER', Int64),
        ('TAIL_NUMBER', String),
        ('ORIGIN_AIRPORT', String),
        ('DESTINATION_AIRPORT', String),
        ('SCHEDULED_DEPARTURE', Int64),
        ('DEPARTURE_TIME', Int64),
        ('DEPARTURE_DELAY', Int64),
        ('TAXI_OUT', Int64),
        ('WHEELS_OFF', Int64),
        ('SCHEDULED_TIME', Int64),
        ('ELAPSED_TIME', Int64),
        ('AIR_TIME', Int64),
        ('DISTANCE', Int64),
        ('WHEELS_ON', Int64),
        ('TAXI_IN', Int64),
        ('SCHEDULED_ARRIVAL', Int64),
        ('ARRIVAL_TIME', Int64),
        ('ARRIVAL_DELAY', Int64),
        ('AIR_SYSTEM_DELAY', Int64),
        ('SECURITY_DELAY', Int64),
        ('AIRLINE_DELAY', Int64),
        ('LATE_AIRCRAFT_DELAY', Int64),
        ('WEATHER_DELAY', Int64),
        ('AIRLINE_NAME', String),
        ('ORIGIN_AIRPORT_right', String),
    

In [48]:
cols_to_drop = ["ORIGIN_AIRPORT_right", "DEST_AIRPORT", "AIRLINE_NAME"]
lf = lf.drop(cols_to_drop)

In [49]:
origin_check = (
    lf
    .select(["ORIGIN_AIRPORT", "ORIGIN_CITY", "ORIGIN_STATE", "ORIGIN_COUNTRY"])
    .unique()
    .group_by("ORIGIN_AIRPORT")
    .len()
    .filter(pl.col("len") > 1)
)

dest_check = (
    lf
    .select(["DESTINATION_AIRPORT", "DEST_CITY", "DEST_STATE", "DEST_COUNTRY"])
    .unique()
    .group_by("DESTINATION_AIRPORT")
    .len()
    .filter(pl.col("len") > 1)
)

In [50]:
origin_check.collect()

ORIGIN_AIRPORT,len
str,u32


In [51]:
dest_check.collect()

DESTINATION_AIRPORT,len
str,u32


In [52]:
lf = lf.drop(["ORIGIN_CITY", "ORIGIN_STATE", "ORIGIN_COUNTRY", "DEST_CITY", "DEST_STATE", "DEST_COUNTRY"])

In [53]:
print(lf.collect_schema())
lf.null_count().collect()

Schema({'YEAR': Int64, 'MONTH': Int64, 'DAY': Int64, 'DAY_OF_WEEK': Int64, 'AIRLINE': String, 'FLIGHT_NUMBER': Int64, 'TAIL_NUMBER': String, 'ORIGIN_AIRPORT': String, 'DESTINATION_AIRPORT': String, 'SCHEDULED_DEPARTURE': Int64, 'DEPARTURE_TIME': Int64, 'DEPARTURE_DELAY': Int64, 'TAXI_OUT': Int64, 'WHEELS_OFF': Int64, 'SCHEDULED_TIME': Int64, 'ELAPSED_TIME': Int64, 'AIR_TIME': Int64, 'DISTANCE': Int64, 'WHEELS_ON': Int64, 'TAXI_IN': Int64, 'SCHEDULED_ARRIVAL': Int64, 'ARRIVAL_TIME': Int64, 'ARRIVAL_DELAY': Int64, 'AIR_SYSTEM_DELAY': Int64, 'SECURITY_DELAY': Int64, 'AIRLINE_DELAY': Int64, 'LATE_AIRCRAFT_DELAY': Int64, 'WEATHER_DELAY': Int64, 'ORIGIN_LAT': Float64, 'ORIGIN_LON': Float64, 'DEST_LAT': Float64, 'DEST_LON': Float64})


YEAR,MONTH,DAY,DAY_OF_WEEK,AIRLINE,FLIGHT_NUMBER,TAIL_NUMBER,ORIGIN_AIRPORT,DESTINATION_AIRPORT,SCHEDULED_DEPARTURE,DEPARTURE_TIME,DEPARTURE_DELAY,TAXI_OUT,WHEELS_OFF,SCHEDULED_TIME,ELAPSED_TIME,AIR_TIME,DISTANCE,WHEELS_ON,TAXI_IN,SCHEDULED_ARRIVAL,ARRIVAL_TIME,ARRIVAL_DELAY,AIR_SYSTEM_DELAY,SECURITY_DELAY,AIRLINE_DELAY,LATE_AIRCRAFT_DELAY,WEATHER_DELAY,ORIGIN_LAT,ORIGIN_LON,DEST_LAT,DEST_LON
u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32
0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,482878,482878,482878,482878


In [54]:
null_rows_count_lf = lf.filter(
    pl.any_horizontal(
        pl.col(["ORIGIN_LAT"]).is_null(),
        pl.col(["ORIGIN_LON"]).is_null(),
        pl.col(["DEST_LAT"]).is_null(),
        pl.col(["DEST_LON"]).is_null(),
    )
).select(
    pl.len().alias("count_of_null_rows")
).collect()
null_rows_count_lf

count_of_null_rows
u32
482878


In [55]:
total_rows = lf.collect().shape[0]
null_rows_count_lf[0, 0] / total_rows * 100

8.450775707699394

In [56]:
missing_airports = lf.filter(
    pl.col("ORIGIN_LAT").is_null()
)
missing_airports.collect().head()

YEAR,MONTH,DAY,DAY_OF_WEEK,AIRLINE,FLIGHT_NUMBER,TAIL_NUMBER,ORIGIN_AIRPORT,DESTINATION_AIRPORT,SCHEDULED_DEPARTURE,DEPARTURE_TIME,DEPARTURE_DELAY,TAXI_OUT,WHEELS_OFF,SCHEDULED_TIME,ELAPSED_TIME,AIR_TIME,DISTANCE,WHEELS_ON,TAXI_IN,SCHEDULED_ARRIVAL,ARRIVAL_TIME,ARRIVAL_DELAY,AIR_SYSTEM_DELAY,SECURITY_DELAY,AIRLINE_DELAY,LATE_AIRCRAFT_DELAY,WEATHER_DELAY,ORIGIN_LAT,ORIGIN_LON,DEST_LAT,DEST_LON
i64,i64,i64,i64,str,i64,str,str,str,i64,i64,i64,i64,i64,i64,i64,i64,i64,i64,i64,i64,i64,i64,i64,i64,i64,i64,i64,f64,f64,f64,f64
2015,10,1,4,"""AA""",1230,"""N3DBAA""","""14747""","""11298""",5,15,10,15,30,237,208,189,1660,539,4,602,543,-19,0,0,0,0,0,,,,
2015,10,1,4,"""DL""",1805,"""N696DL""","""14771""","""13487""",5,16,11,14,30,213,192,171,1589,521,7,538,528,-10,0,0,0,0,0,,,,
2015,10,1,4,"""NK""",612,"""N602NK""","""12889""","""13487""",5,2400,-5,15,15,177,168,149,1299,444,4,502,448,-14,0,0,0,0,0,,,,
2015,10,1,4,"""AA""",260,"""N3GNAA""","""12892""","""13303""",10,7,-3,28,35,296,306,271,2342,806,7,806,813,7,0,0,0,0,0,,,,
2015,10,1,4,"""AA""",1982,"""N914UY""","""14771""","""11057""",10,8,-2,12,20,291,282,257,2296,737,13,801,750,-11,0,0,0,0,0,,,,


In [57]:
lf_no_geo = lf.drop(["ORIGIN_LAT", "ORIGIN_LON", "DEST_LAT", "DEST_LON"])
lf_no_geo.null_count().collect()

YEAR,MONTH,DAY,DAY_OF_WEEK,AIRLINE,FLIGHT_NUMBER,TAIL_NUMBER,ORIGIN_AIRPORT,DESTINATION_AIRPORT,SCHEDULED_DEPARTURE,DEPARTURE_TIME,DEPARTURE_DELAY,TAXI_OUT,WHEELS_OFF,SCHEDULED_TIME,ELAPSED_TIME,AIR_TIME,DISTANCE,WHEELS_ON,TAXI_IN,SCHEDULED_ARRIVAL,ARRIVAL_TIME,ARRIVAL_DELAY,AIR_SYSTEM_DELAY,SECURITY_DELAY,AIRLINE_DELAY,LATE_AIRCRAFT_DELAY,WEATHER_DELAY
u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32
0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0


In [58]:
lf = lf.drop_nulls(subset=["ORIGIN_LAT"])
lf.null_count().collect()

YEAR,MONTH,DAY,DAY_OF_WEEK,AIRLINE,FLIGHT_NUMBER,TAIL_NUMBER,ORIGIN_AIRPORT,DESTINATION_AIRPORT,SCHEDULED_DEPARTURE,DEPARTURE_TIME,DEPARTURE_DELAY,TAXI_OUT,WHEELS_OFF,SCHEDULED_TIME,ELAPSED_TIME,AIR_TIME,DISTANCE,WHEELS_ON,TAXI_IN,SCHEDULED_ARRIVAL,ARRIVAL_TIME,ARRIVAL_DELAY,AIR_SYSTEM_DELAY,SECURITY_DELAY,AIRLINE_DELAY,LATE_AIRCRAFT_DELAY,WEATHER_DELAY,ORIGIN_LAT,ORIGIN_LON,DEST_LAT,DEST_LON
u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32
0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0


In [59]:
lf_no_delay = lf_no_geo.drop(["WHEELS_ON", "WHEELS_OFF", "AIR_SYSTEM_DELAY", "SECURITY_DELAY", "AIRLINE_DELAY", "LATE_AIRCRAFT_DELAY", "WEATHER_DELAY", "ARRIVAL_TIME", "ELAPSED_TIME", "AIR_TIME", "TAXI_IN"])

In [60]:
"""df = lf.collect()
missing_airports_df = missing_airports.collect()
df_no_geo = lf_no_geo.collect()
df_no_geo.write_parquet("data/flights_processed_no_geo.parquet")
airlines_df.write_parquet("data/airlines.parquet")
airports_df.write_parquet("data/airports.parquet")
missing_airports_df.write_parquet("data/missing_airports.parquet")
df.write_parquet("data/flights_processed.parquet")"""

'df = lf.collect()\nmissing_airports_df = missing_airports.collect()\ndf_no_geo = lf_no_geo.collect()\ndf_no_geo.write_parquet("data/flights_processed_no_geo.parquet")\nairlines_df.write_parquet("data/airlines.parquet")\nairports_df.write_parquet("data/airports.parquet")\nmissing_airports_df.write_parquet("data/missing_airports.parquet")\ndf.write_parquet("data/flights_processed.parquet")'

In [61]:
df_no_delay = lf_no_delay.collect()
df_no_delay.write_parquet("data/flights_preprocessed_no_delay.parquet")

In [62]:
lf.select(pl.col(["ARRIVAL_TIME"]).value_counts()).collect()

ARRIVAL_TIME
struct[2]
"{1140,5010}"
"{1507,4892}"
"{1452,4989}"
"{2201,4480}"
"{2015,5188}"
…
"{1106,5251}"
"{1348,5273}"
"{2136,4871}"
"{108,677}"


In [63]:
lf.select(pl.col(["WHEELS_OFF"]).value_counts()).collect()

WHEELS_OFF
struct[2]
"{632,5315}"
"{2131,3037}"
"{1304,5250}"
"{216,77}"
"{1230,5360}"
…
"{531,1100}"
"{218,69}"
"{1342,4991}"
"{645,4431}"


In [64]:
lf.select(pl.col("WHEELS_ON").value_counts()).collect()

WHEELS_ON
struct[2]
"{746,3856}"
"{1132,4859}"
"{1932,4792}"
"{826,4266}"
"{2020,5258}"
…
"{248,86}"
"{718,2921}"
"{502,439}"
"{1724,5197}"
