# Preprocessing

Goal: Link the flight data to fuel segments and chunk them for batching.

In [1]:
from pathlib import Path
from typing import Literal

# The data folder is a sibling of the notebook's working directory.
DATA_PATH = Path.cwd().parent.joinpath("data")
# Dataset split to preprocess; one of "train", "rank" or "final".
DATA_TYPE: Literal["train", "rank", "final"] = "train"
DATA_PATH

WindowsPath('c:/Users/rayte/Work/prc2025dspy/data')

## 1. Augment segments with select features

Refer to DATA.md for explanations of the data.

Let's link the features to each fuel segment.

Since we are working with LLMs, we can save tokens by:
- excluding features that are not used in their reasoning
- summarizing features that have many data points, such as track_points and timestamps
- rounding (and converting) numbers, especially those with high decimal precision
- possibly excluding missing values ("NaN" --> "")

In [2]:
import os

# Only the training split uses the bare "fuel_train" name; every other split
# is stored as "fuel_<split>_submission".
fuel_file_name = "fuel_train" if DATA_TYPE == "train" else f"fuel_{DATA_TYPE}_submission"
fuel_file_path = os.path.join(DATA_PATH, f"{fuel_file_name}.parquet")

# Per-split flight list and the folder holding one trajectory file per flight.
flightlist_path = os.path.join(DATA_PATH, f"flightlist_{DATA_TYPE}.parquet")
flights_folder_path = os.path.join(DATA_PATH, f"flights_{DATA_TYPE}")

# Airport reference table (not split-specific).
airports_file_path = os.path.join(DATA_PATH, "apt.parquet")

In [3]:
import os

# --- Configuration ---
# Number of flights whose trajectories are processed (and written) together.
BATCH_SIZE = 100
# Folder containing one "<flight_id>.parquet" trajectory file per flight.
TRAJECTORY_FOLDER = flights_folder_path
# One sub-folder of batch files per data split.
OUTPUT_DIR = os.path.join(DATA_PATH, "output_batches", DATA_TYPE)

# Create the output folder up front; a pre-existing folder is not an error.
os.makedirs(OUTPUT_DIR, exist_ok=True)


In [4]:
import polars as pl
import gc
from tqdm import tqdm

# --- 1. Load Metadata & Sanitize ---
print("Loading metadata...")
fuel_df = pl.read_parquet(fuel_file_path)
flightlist_df = pl.read_parquet(flightlist_path)

# Normalised join key. It is applied to BOTH frames *before* the join so that a
# dtype difference or stray whitespace cannot silently drop the flight metadata.
flight_id_clean_expr = pl.col("flight_id").cast(pl.String).str.strip_chars()

# Segment length in minutes, as a float.
duration_minutes_expr = ((pl.col("end") - pl.col("start")).dt.total_seconds() / 60).cast(pl.Float64)

# Human-readable duration: whole minutes when >= 1 min, otherwise two decimals.
# Each branch is rendered to String on its own: when/then/otherwise yields a
# single dtype, so mixing an Int64 branch with a Float64 branch would promote
# the whole minutes back to floats ("5.0min" instead of "5min").
duration_display_expr = (
    pl.when(duration_minutes_expr >= 1)
    .then(duration_minutes_expr.floor().cast(pl.Int64).cast(pl.String))
    .otherwise(duration_minutes_expr.round(2).cast(pl.String))
)

# One row per fuel segment, enriched with the flight-level metadata.
skeleton_df = (
    fuel_df
    .with_columns(flight_id_clean_expr)
    .join(flightlist_df.with_columns(flight_id_clean_expr), on="flight_id", how="left")
    .with_columns([
        pl.col("fuel_kg").alias("fuel_burnt"),
        pl.format("{}min", duration_display_expr).alias("duration_min"),
        (pl.col("origin_icao") + "-" + pl.col("destination_icao")).alias("route_icao"),
        # Position of the segment start within the flight (takeoff -> landed).
        ((pl.col("start") - pl.col("takeoff")) / (pl.col("landed") - pl.col("takeoff"))).alias("progress_pct"),
    ])
)

# Sorted so that the flight -> batch assignment is reproducible across runs.
# The batch loop skips batch files that already exist, so an unstable order
# (plain `unique()` gives no ordering guarantee) could map different flights to
# an already-written batch index after a restart.
unique_flight_ids = skeleton_df["flight_id"].unique().sort().to_list()
total_flights = len(unique_flight_ids)
print(f"Found {total_flights} flights.")


Loading metadata...
Found 11037 flights.


In [5]:
# --- 2. Batch Processing ---
# Altitudes are snapped to multiples of this value (name suggests metres).
ALTITUDE_SLICE_METERS = 25


def round_up_to_slice(expr, slice_size=ALTITUDE_SLICE_METERS):
    """Round ``expr`` up to the next multiple of ``slice_size``.

    ``expr`` must expose ``truediv``, ``ceil`` and ``mul`` (as polars
    expressions do); the result is ``ceil(expr / slice_size) * slice_size``.
    """
    slice_count = expr.truediv(slice_size)
    whole_slices = slice_count.ceil()
    return whole_slices.mul(slice_size)

# Columns written to every batch file, in output order.
batch_select_columns = [
    pl.col("idx"),
    pl.col("fuel_burnt"),
    pl.col("flight_id"),
    pl.col("aircraft_type").alias("aircraft"),
    pl.col("duration_min"),
    pl.col("route_icao"),
    pl.col("alt_start"),
    pl.col("alt_end"),
    pl.col("peak_altitude"),
    pl.col("ground_speed_start"),
    pl.col("ground_speed_end"),
    pl.col("vertical_rate_start"),
    pl.col("vertical_rate_end"),
]

# Trajectory-derived columns that are missing when a batch has no trajectory file.
metric_column_names = [
    "alt_start",
    "alt_end",
    "peak_altitude",
    "ground_speed_start",
    "ground_speed_end",
    "vertical_rate_start",
    "vertical_rate_end",
]

# Typed null placeholders for those columns. An untyped `pl.lit(None)` produces
# a Null-dtype column, so a fallback batch would be written with a different
# schema than batches that do have trajectories, which can break the multi-file
# scan used to combine the batches.
# NOTE(review): Float64 matches the altitude columns (float division in
# round_up_to_slice); groundspeed / vertical_rate are assumed to be Float64 in
# the trajectory files as well - TODO confirm.
metric_column_defaults = [
    pl.lit(None, dtype=pl.Float64).alias(column_name)
    for column_name in metric_column_names
]

def _segment_trajectory_stats(batch_skeleton, trajectory_files):
    """Aggregate trajectory points into per-segment statistics.

    Args:
        batch_skeleton: polars DataFrame with one row per fuel segment; must
            contain ``idx``, ``flight_id``, ``start`` and ``end``.
        trajectory_files: paths of the trajectory parquet files to scan.

    Returns:
        A polars DataFrame keyed by ``idx`` with the start/end/peak altitude
        and the start/end ground speed and vertical rate of each segment.
        Segments without any trajectory point in their time window are absent.
    """
    # Load trajectories with normalized types.
    trajectories = (
        pl.scan_parquet(trajectory_files)
        .with_columns([
            pl.col("flight_id").cast(pl.String).str.strip_chars(),
            # Naive microsecond timestamps, same normalisation as start/end below.
            pl.col("timestamp").dt.cast_time_unit("us").dt.replace_time_zone(None),
        ])
    )

    # Apply the same timestamp normalisation to the segment boundaries.
    segments = (
        batch_skeleton.lazy()
        .with_columns([
            pl.col("start").dt.cast_time_unit("us").dt.replace_time_zone(None),
            pl.col("end").dt.cast_time_unit("us").dt.replace_time_zone(None),
        ])
    )

    return (
        segments
        .join(trajectories, on="flight_id", how="inner")
        .filter(
            # 30s buffer on both sides to catch points for short/zero-duration segments.
            (pl.col("timestamp") >= pl.col("start").dt.offset_by("-30s"))
            & (pl.col("timestamp") <= pl.col("end").dt.offset_by("30s"))
        )
        .group_by("idx")
        .agg([
            round_up_to_slice(pl.col("altitude").sort_by("timestamp").first()).alias("alt_start"),
            round_up_to_slice(pl.col("altitude").sort_by("timestamp").last()).alias("alt_end"),
            round_up_to_slice(pl.col("altitude").max()).alias("peak_altitude"),
            pl.col("groundspeed").sort_by("timestamp").first().alias("ground_speed_start"),
            pl.col("groundspeed").sort_by("timestamp").last().alias("ground_speed_end"),
            pl.col("vertical_rate").sort_by("timestamp").first().alias("vertical_rate_start"),
            pl.col("vertical_rate").sort_by("timestamp").last().alias("vertical_rate_end"),
        ])
        .collect()
    )


# Batch numbers whose processing raised; reported once the loop is done so a
# missing batch file does not go unnoticed in the tqdm output.
failed_batches = []

for i in tqdm(range(0, total_flights, BATCH_SIZE), desc="Processing Batches"):

    batch_number = i // BATCH_SIZE
    batch_ids = unique_flight_ids[i : i + BATCH_SIZE]
    batch_filename = os.path.join(OUTPUT_DIR, f"batch_{batch_number}.parquet")

    # Resume support: a batch that was already written is not recomputed.
    if os.path.exists(batch_filename):
        continue

    # Trajectory files that actually exist for the flights of this batch.
    candidate_paths = (os.path.join(TRAJECTORY_FOLDER, f"{fid}.parquet") for fid in batch_ids)
    valid_files = [path for path in candidate_paths if os.path.exists(path)]

    # Prepare the skeleton for this batch
    current_skeleton = skeleton_df.filter(pl.col("flight_id").is_in(batch_ids))

    if valid_files:
        try:
            stats_df = _segment_trajectory_stats(current_skeleton, valid_files)

            # Left join back onto the skeleton so segments without stats are kept.
            final_batch = (
                current_skeleton
                .join(stats_df, on="idx", how="left")
                .select(batch_select_columns)
            )
            final_batch.write_parquet(batch_filename)

        except Exception as e:
            # Best effort: keep going, but remember the batch. Report the batch
            # number (as used in the file name), not the flight offset.
            failed_batches.append(batch_number)
            print(f"Error in batch {batch_number}: {e}")

    else:
        # Fallback for batches with no files
        final_batch = (
            current_skeleton
            .with_columns(metric_column_defaults)
            .select(batch_select_columns)
        )
        final_batch.write_parquet(batch_filename)

    gc.collect()

if failed_batches:
    print(
        f"Processing finished with {len(failed_batches)} failed batch(es): {failed_batches}. "
        "Their files were not written; re-run this cell to retry them."
    )
else:
    print("Processing complete.")


Processing Batches: 100%|██████████| 111/111 [00:23<00:00,  4.64it/s]

Processing complete.





In [6]:
# --- 3. Combine Batches Utility ---
BATCH_PATTERN_DEFAULT = "batch_*.parquet"

def combine_batches(output_name="complete.parquet", pattern=BATCH_PATTERN_DEFAULT, overwrite=False, dry_run=False):
    """Concatenate the batch parquet files in ``OUTPUT_DIR`` into one file.

    Args:
        output_name: file name of the combined parquet, created in ``OUTPUT_DIR``.
        pattern: glob pattern selecting the batch files inside ``OUTPUT_DIR``.
        overwrite: replace an existing output file instead of raising.
        dry_run: only print which files would be combined; write nothing.

    Returns:
        None. Prints a message and returns early when no file matches.

    Raises:
        FileExistsError: the output file exists and ``overwrite`` is False.
    """
    import re

    def _natural_key(path):
        # Split the name into text / digit runs so numbered batches sort
        # numerically (batch_2 before batch_10) instead of lexicographically.
        # re.split with a capture group alternates text (even index) and
        # digits (odd index), so ints are only ever compared with ints.
        parts = re.split(r"(\d+)", path.name)
        natural = [int(part) if index % 2 else part for index, part in enumerate(parts)]
        # The raw name breaks ties such as "batch_01" vs "batch_1".
        return (natural, path.name)

    output_dir = Path(OUTPUT_DIR)
    batch_files = sorted(
        (
            path for path in output_dir.glob(pattern)
            # Never feed a previous combined output back into itself.
            if path.is_file() and path.name != output_name
        ),
        key=_natural_key,
    )
    if not batch_files:
        print(f"No files matching pattern '{pattern}' in {output_dir}.")
        return
    output_path = output_dir / output_name
    if output_path.exists() and not overwrite:
        raise FileExistsError(
            f"Output {output_path} exists. Pass overwrite=True to replace it."
        )
    if dry_run:
        print(f"[dry-run] Would combine {len(batch_files)} files into {output_path}")
        for path in batch_files:
            print(f" - {path.name}")
        return
    # Lazy scan + sink streams the batches into the output file.
    pl.scan_parquet([str(path) for path in batch_files]).sink_parquet(str(output_path))
    print(f"Combined {len(batch_files)} files into {output_path}")


# overwrite=True so that re-running this cell replaces an existing combined file.
combine_batches(output_name="complete.parquet", overwrite=True)


Combined 111 files into c:\Users\rayte\Work\prc2025dspy\data\output_batches\train\complete.parquet


## DSPy

Note: I use my own fork of DSPy because I had to implement asynchronous batching: https://github.com/rayanehmi/dspy/tree/feat/async_batching

In [None]:
import pandas as pd

idx = 17909
fuel_df = pd.read_parquet(fuel_file_path)