In [49]:
import polars as pl

# Lazily scan the combined GTFS-RT actual-delay snapshots (path is relative
# to this notebook's directory) and preview the first rows.
df = pl.scan_parquet(["../../data/interim/combined_actual_delays.parquet"])
df.head(5).collect()

datetime,trip_id,stop_sequence,stop_id,actual_arrival_delay,arrival_uncertainty,actual_departure_delay,departure_uncertainty,timestamp
str,i64,i64,i64,f64,f64,f64,f64,i64
"""2022-06-29 21:59:54""",14010000607402196,1,9022001055075002,47.0,,47.0,0.0,1656539994
"""2022-06-29 21:59:54""",14010000612481735,1,9022001011725019,0.0,,7.0,0.0,1656539994
"""2022-06-29 21:59:54""",14010000611286946,10,9022001007531001,8.0,0.0,72.0,0.0,1656539994
"""2022-06-29 21:59:54""",14010000610964728,8,9022001051054001,162.0,0.0,162.0,0.0,1656539994
"""2022-06-29 21:59:54""",14010000610964619,12,9022001050020001,-17.0,,-17.0,,1656539994


In [27]:
# Total number of rows in the raw combined file; only the count is collected.
raw_row_count = df.select(pl.len()).collect()
raw_row_count

len
u32
433845292


In [23]:
import duckdb

# Count distinct (trip_id, stop_id) pairs in the raw combined file with DuckDB.
# NOTE(review): this runs on the raw file, so rows whose trip_id is NULL (if
# any) contribute (NULL, stop_id) pairs — SQL DISTINCT treats NULLs as equal.
result = duckdb.sql("""
SELECT COUNT(*) AS unique_pairs
FROM (
    SELECT DISTINCT trip_id, stop_id
    FROM read_parquet('../../data/interim/combined_actual_delays.parquet')
)
""").fetchone()[0]

print("Unique (trip_id, stop_id) pairs:", result)


Unique (trip_id, stop_id) pairs: 10006452


In [50]:
# Reduce the raw frame to identifiers + arrival delay, parse the timestamp
# string, derive the calendar date and hour of day (HOD), then rename the
# delay column. Resulting columns: trip_id, stop_id, delay, date, HOD.
# NOTE(review): the first sample row pairs datetime "2022-06-29 21:59:54" with
# timestamp 1656539994, which is exactly that wall-clock time in UTC — so
# `date` and `HOD` appear to be UTC-based, not local time. Confirm this
# matches the time zone of the weather data they are joined with.
df = (
    df.drop([
        "stop_sequence",
        "arrival_uncertainty",
        "actual_departure_delay",
        "departure_uncertainty",
        "timestamp",
    ])
    # Parse in place: the expression keeps the column name "datetime".
    .with_columns(pl.col("datetime").str.strptime(pl.Datetime, "%Y-%m-%d %H:%M:%S"))
    .with_columns(
        pl.col("datetime").cast(pl.Date).alias("date"),  # calendar Date
        pl.col("datetime").dt.hour().alias("HOD"),       # hour of day, 0-23
    )
    .drop("datetime")
    .rename({"actual_arrival_delay": "delay"})
)

In [51]:
# Count missing values in every column. A single select over all columns
# scans the (very large) file once, instead of once per column.
na_results = (
    df.select(pl.all().is_null().sum())
    .collect()
    .to_dicts()[0]  # {column_name: null_count}, in schema order
)

print(na_results)

{'trip_id': 552995, 'stop_id': 0, 'delay': 490858, 'date': 0, 'HOD': 0}


In [52]:
# Drop rows with missing trip_id as they cannot be combined
# with GTFS-Static data.
df = df.drop_nulls(subset=["trip_id"])

# Re-check missing values. A single select over all columns scans the file
# once, instead of once per column.
na_results = (
    df.select(pl.all().is_null().sum())
    .collect()
    .to_dicts()[0]  # {column_name: null_count}, in schema order
)

print(na_results)

{'trip_id': 0, 'stop_id': 0, 'delay': 483209, 'date': 0, 'HOD': 0}


In [53]:
# Persist the cleaned lazy query; sink_parquet streams it to disk rather than
# collecting the full frame in memory first.
cleaned_out = "../../data/interim/gtfs_rt_parquet/actual_delays_cleaned.parquet"
df.sink_parquet(cleaned_out)

In [69]:
# Restart from the persisted cleaned file and preview it.
cleaned_files = ["../../data/interim/gtfs_rt_parquet/actual_delays_cleaned.parquet"]
df = pl.scan_parquet(cleaned_files)
df.head(5).collect()

trip_id,stop_id,delay,date,HOD
i64,i64,f64,date,i8
14010000607402196,9022001055075002,47.0,2022-06-29,21
14010000612481735,9022001011725019,0.0,2022-06-29,21
14010000611286946,9022001007531001,8.0,2022-06-29,21
14010000610964728,9022001051054001,162.0,2022-06-29,21
14010000610964619,9022001050020001,-17.0,2022-06-29,21


In [64]:
# Compare row counts for May–Oct vs Nov–Apr. Both counts are computed in a
# single pass (conditional sums) instead of two separate full scans.
tdf = df.with_columns(pl.col("date").dt.month().alias("month"))

season_counts = tdf.select(
    pl.col("month").is_in([5, 6, 7, 8, 9, 10]).sum().alias("may_oct"),
    pl.col("month").is_in([11, 12, 1, 2, 3, 4]).sum().alias("nov_apr"),
).collect()

may_oct_count = season_counts["may_oct"][0]
nov_apr_count = season_counts["nov_apr"][0]

print(f"Rows in May–Oct: {may_oct_count}")
print(f"Rows in Nov–Apr: {nov_apr_count}")

Rows in May–Oct: 221585785
Rows in Nov–Apr: 211706512


In [71]:
# Keep only November–April rows (the months treated here as the snow
# season); every May–October row is excluded.
df_filtered = (
    df.lazy()  # no-op for a LazyFrame; keeps sink_parquet valid if df is eager
      .filter(
          ~pl.col("date")
            .dt.month()
            .is_in([5, 6, 7, 8, 9, 10])
      )
)

# Stream the filtered rows to Parquet without collecting them in memory.
df_filtered.sink_parquet("../../data/interim/gtfs_rt_parquet/gtfs_rt_snow_filtered.parquet")


In [72]:
# Reload the snow-season (Nov–Apr) file and count its rows.
snow_files = ["../../data/interim/gtfs_rt_parquet/gtfs_rt_snow_filtered.parquet"]
df = pl.scan_parquet(snow_files)
snow_row_count = df.select(pl.len()).collect()
snow_row_count

len
u32
211706512


In [73]:
# Verify the snow-season filter: no May–Oct rows should remain. Both counts are
# computed in a single pass (conditional sums) instead of two full scans.
tdf = df.with_columns(pl.col("date").dt.month().alias("month"))

season_counts = tdf.select(
    pl.col("month").is_in([5, 6, 7, 8, 9, 10]).sum().alias("may_oct"),
    pl.col("month").is_in([11, 12, 1, 2, 3, 4]).sum().alias("nov_apr"),
).collect()

may_oct_count = season_counts["may_oct"][0]
nov_apr_count = season_counts["nov_apr"][0]

print(f"Rows in May–Oct: {may_oct_count}")
print(f"Rows in Nov–Apr: {nov_apr_count}")

# Fail loudly on a fresh run if the filter ever lets summer rows through.
assert may_oct_count == 0, "snow-season filter left May–Oct rows in the file"

Rows in May–Oct: 0
Rows in Nov–Apr: 211706512


In [86]:
# Load the per-day joined files (delays + route/shape attributes + weather
# columns, per the schema shown below) and preview them.
# NOTE(review): route_id, route_type, direction_id and shape_dist_traveled
# load as str; consider casting the numeric ones before modelling.
joined_files = ["../../data/interim/joined_daily/*.parquet"]
df = pl.scan_parquet(joined_files)
df.head().collect()

date,HOD,trip_id,stop_id,actual_arrival_delay,route_id,route_type,direction_id,shape_dist_traveled,Temperature,Precipitation,SnowDepth,WindSpeed
date,i8,i64,i64,f64,str,str,str,str,f64,f64,f64,f64
2022-11-01,6,14010000611464612,9022001010649002,-60.0,"""9011001000400000""","""700""","""0""","""0""",0.3,0.0,0.0,
2022-11-01,6,14010000611464612,9022001010645002,56.0,"""9011001000400000""","""700""","""0""","""1529.96""",0.3,0.0,0.0,
2022-11-01,6,14010000611464612,9022001010369001,-24.0,"""9011001000400000""","""700""","""0""","""2256.26""",0.3,0.0,0.0,
2022-11-01,6,14010000611464612,9022001010367002,63.0,"""9011001000400000""","""700""","""0""","""2697.96""",0.3,0.0,0.0,
2022-11-01,6,14010000611464612,9022001010363004,90.0,"""9011001000400000""","""700""","""0""","""3344.81""",0.3,0.0,0.0,


In [77]:
# Row count after the join.
# NOTE(review): the snow-filtered input had 211,706,512 rows and this reports
# 211,701,735 — 4,777 rows are lost in the join; confirm this is expected.
joined_row_count = df.select(pl.len()).collect()
joined_row_count

len
u32
211701735


In [None]:
# Normalise the joined frame's column names to snake_case.
snake_case_names = dict(
    HOD="hod",
    actual_arrival_delay="delay",
    Temperature="temperature",
    Precipitation="precipitation",
    SnowDepth="snow_depth",
    WindSpeed="wind_speed",
)
df = df.rename(snake_case_names)