In [1]:
from pathlib import Path

def find_project_root(start: Path) -> Path:
    p = start.resolve()
    for parent in [p] + list(p.parents):
        if (parent / "data").exists():
            return parent
    return p

ROOT = find_project_root(Path.cwd())
RAW_DIR = ROOT / "data" / "raw_opensky"
# RAW_DIR.mkdir(parents=True, exist_ok=True)

# Flight data preprocessing

In [8]:
Path.cwd().resolve().parents

<WindowsPath.parents>

In [10]:
RAW_DIR

WindowsPath('C:/Users/HiWi/Desktop/Terril/01_nextcloud/Germany/DATA SCIENCE/Semesters/05/02 Sustainability in aviation/03 contrail-mvp/data/raw_opensky')

In [11]:

IN_PREFIX = "states_europe_winter"          # your downloaded files prefix
OUT_DIR = ROOT / "data" / "processed" / "opensky_points_60s_europe_winter"
OUT_DIR.mkdir(parents=True, exist_ok=True)

files = sorted(RAW_DIR.glob(f"{IN_PREFIX}_*.parquet"))
len(files), files[:2]


(24,
 [WindowsPath('C:/Users/HiWi/Desktop/Terril/01_nextcloud/Germany/DATA SCIENCE/Semesters/05/02 Sustainability in aviation/03 contrail-mvp/data/raw_opensky/states_europe_winter_20250113_000000Z.parquet'),
  WindowsPath('C:/Users/HiWi/Desktop/Terril/01_nextcloud/Germany/DATA SCIENCE/Semesters/05/02 Sustainability in aviation/03 contrail-mvp/data/raw_opensky/states_europe_winter_20250113_010000Z.parquet')])

In [12]:
import pandas as pd
from tqdm.auto import tqdm

# Choose what you keep (minimal set)
KEEP_COLS = ["time", "icao24", "callsign", "lat", "lon", "baroaltitude"]

# Optional cruise-band refilter (set to None to disable)
ALT_MIN_M = 8000
ALT_MAX_M = 13000

def preprocess_one_file(in_path: Path, out_path: Path):
    df = pd.read_parquet(in_path, columns=KEEP_COLS)
    if df.empty:
        df.to_parquet(out_path, index=False)
        return 0

    # Clean callsign (strip spaces, convert empty -> NA)
    df["callsign"] = df["callsign"].astype("string").str.strip()
    df.loc[df["callsign"].isna() | (df["callsign"] == ""), "callsign"] = pd.NA

    # Drop missing essentials
    df = df.dropna(subset=["time", "icao24", "lat", "lon", "baroaltitude"])

    # Optional: cruise filter
    if ALT_MIN_M is not None and ALT_MAX_M is not None:
        df = df[(df["baroaltitude"] >= ALT_MIN_M) & (df["baroaltitude"] <= ALT_MAX_M)]

    if df.empty:
        df.to_parquet(out_path, index=False)
        return 0

    # Downsample: 1 point per minute per (icao24, callsign)
    df["minute"] = (df["time"] // 60).astype("int64")

    # Sort so we consistently keep the earliest point in each minute-bin
    df = df.sort_values(["icao24", "callsign", "minute", "time"], kind="mergesort")

    # Keep first row per minute bin
    df = df.drop_duplicates(subset=["icao24", "callsign", "minute"], keep="first")

    # Drop helper column
    df = df.drop(columns=["minute"])

    df.to_parquet(out_path, index=False)
    return len(df)

written_rows = 0
for f in tqdm(files):
    out_path = OUT_DIR / f.name.replace(IN_PREFIX, f"{IN_PREFIX}__60s")
    if out_path.exists():
        continue
    written_rows += preprocess_one_file(f, out_path)

print("Done. Output files:", len(list(OUT_DIR.glob("*.parquet"))))
print("Total rows written (approx):", f"{written_rows:,}")


  from .autonotebook import tqdm as notebook_tqdm
100%|██████████| 24/24 [03:06<00:00,  7.78s/it]

Done. Output files: 24
Total rows written (approx): 1,363,182





In [13]:
import pyarrow.parquet as pq
import numpy as np

out_files = sorted(OUT_DIR.glob("*.parquet"))
print("processed files:", len(out_files))

total = 0
lat_min, lat_max = None, None
lon_min, lon_max = None, None

for f in out_files[:10]:  # sample first 10 to keep it quick
    pf = pq.ParquetFile(f)
    total += pf.metadata.num_rows
    ll = pf.read(columns=["lat","lon"]).to_pandas()
    lat_min = ll["lat"].min() if lat_min is None else min(lat_min, ll["lat"].min())
    lat_max = ll["lat"].max() if lat_max is None else max(lat_max, ll["lat"].max())
    lon_min = ll["lon"].min() if lon_min is None else min(lon_min, ll["lon"].min())
    lon_max = ll["lon"].max() if lon_max is None else max(lon_max, ll["lon"].max())

print("sampled rows (10 files):", f"{total:,}")
print("lat_range sample:", (lat_min, lat_max))
print("lon_range sample:", (lon_min, lon_max))


processed files: 24
sampled rows (10 files): 400,771
lat_range sample: (np.float64(35.00001525878906), np.float64(71.39628264863612))
lon_range sample: (np.float64(-14.999892290900732), np.float64(34.999891008649556))


In [14]:
# from pathlib import Path
# import pandas as pd

IN_DIR = OUT_DIR
out_day = ROOT /"data" / "processed" / "opensky_points_60s_europe_winter_day.parquet"

files = sorted(IN_DIR.glob("*.parquet"))
dfs = [pd.read_parquet(f) for f in files]
points = pd.concat(dfs, ignore_index=True)

# Ensure types + sort for later sessionizing
points["callsign"] = points["callsign"].astype("string")
points = points.sort_values(["icao24", "callsign", "time"]).reset_index(drop=True)

points.to_parquet(out_day, index=False)
print("Wrote:", out_day, "| rows:", f"{len(points):,}")


Wrote: C:\Users\HiWi\Desktop\Terril\01_nextcloud\Germany\DATA SCIENCE\Semesters\05\02 Sustainability in aviation\03 contrail-mvp\data\processed\opensky_points_60s_europe_winter_day.parquet | rows: 1,363,182


In [16]:
import numpy as np

GAP_S = 30 * 60  # 30 minutes

# time difference per (icao24,callsign)
dt = points.groupby(["icao24", "callsign"])["time"].diff()

new_flight = dt.isna() | (dt > GAP_S)

# flight index per (icao24,callsign)
flight_idx = new_flight.groupby([points["icao24"], points["callsign"]]).cumsum()

# build a stable flight_id
points["flight_id"] = (
    points["icao24"].astype(str) + "_" +
    points["callsign"].fillna("NA").astype(str) + "_" +
    flight_idx.astype(str)
)

print("Estimated flights:", points["flight_id"].nunique())
points[["flight_id","icao24","callsign","time","lat","lon","baroaltitude"]].head()


Estimated flights: 22026


Unnamed: 0,flight_id,icao24,callsign,time,lat,lon,baroaltitude
0,000001_QSUAV_1.0,000001,QSUAV,1736767135,52.396126,5.899658,11643.36
1,008ba5_OPM007_1.0,008ba5,OPM007,1736754689,50.493805,-0.415023,8008.62
2,008ba5_OPM007_1.0,008ba5,OPM007,1736754720,50.43502,-0.371308,8321.04
3,008ba5_OPM007_1.0,008ba5,OPM007,1736754780,50.309981,-0.307914,9014.46
4,008ba5_OPM007_1.0,008ba5,OPM007,1736754840,50.180283,-0.242277,9540.24


In [18]:
from math import radians, sin, cos, sqrt, atan2

def haversine_km(lat1, lon1, lat2, lon2):
    R = 6371.0
    dlat = radians(lat2 - lat1)
    dlon = radians(lon2 - lon1)
    a = sin(dlat/2)**2 + cos(radians(lat1))*cos(radians(lat2))*sin(dlon/2)**2
    return 2 * R * atan2(sqrt(a), sqrt(1-a))

# shift within each flight
p = points
p["lat2"] = p.groupby("flight_id")["lat"].shift(-1)
p["lon2"] = p.groupby("flight_id")["lon"].shift(-1)
p["time2"] = p.groupby("flight_id")["time"].shift(-1)
p["alt2"] = p.groupby("flight_id")["baroaltitude"].shift(-1)

seg = p.dropna(subset=["lat2","lon2","time2","alt2"]).copy()
seg["dt_s"] = (seg["time2"] - seg["time"]).astype("int64")
seg = seg[(seg["dt_s"] > 0) & (seg["dt_s"] <= 5*60)]  # keep reasonable gaps (<=5 min)

# distance
seg["dist_km"] = [
    haversine_km(a,b,c,d) for a,b,c,d in zip(seg["lat"],seg["lon"],seg["lat2"],seg["lon2"])
]

# midpoint features (useful for met collocation)
seg["mid_time"] = ((seg["time"] + seg["time2"]) // 2).astype("int64")
seg["mid_lat"] = (seg["lat"] + seg["lat2"]) / 2
seg["mid_lon"] = (seg["lon"] + seg["lon2"]) / 2
seg["mid_alt_m"] = (seg["baroaltitude"] + seg["alt2"]) / 2

segments = seg[["flight_id","mid_time","mid_lat","mid_lon","mid_alt_m","dt_s","dist_km"]].reset_index(drop=True)
out_seg = ROOT /"data" / "processed" / "opensky_segments_europe_winter_day.parquet"
segments.to_parquet(out_seg, index=False)

print("Segments:", f"{len(segments):,}", "| Flights:", points["flight_id"].nunique())
print("Wrote:", out_seg)


Segments: 1,336,164 | Flights: 22026
Wrote: C:\Users\HiWi\Desktop\Terril\01_nextcloud\Germany\DATA SCIENCE\Semesters\05\02 Sustainability in aviation\03 contrail-mvp\data\processed\opensky_segments_europe_winter_day.parquet


In [19]:
print("callsign NA rows:", points["callsign"].isna().sum())
print("flight_idx NA count:", flight_idx.isna().sum())


callsign NA rows: 4548
flight_idx NA count: 4548


In [21]:
import numpy as np
import pandas as pd

# Haversine distance (vectorized-ish)
def haversine_km(lat1, lon1, lat2, lon2):
    R = 6371.0
    lat1 = np.radians(lat1); lon1 = np.radians(lon1)
    lat2 = np.radians(lat2); lon2 = np.radians(lon2)
    dlat = lat2 - lat1
    dlon = lon2 - lon1
    a = np.sin(dlat/2)**2 + np.cos(lat1)*np.cos(lat2)*np.sin(dlon/2)**2
    return 2 * R * np.arctan2(np.sqrt(a), np.sqrt(1 - a))

p = points.sort_values(["flight_id", "time"]).reset_index(drop=True)

# Shift next point within each flight
p["time2"] = p.groupby("flight_id")["time"].shift(-1)
p["lat2"]  = p.groupby("flight_id")["lat"].shift(-1)
p["lon2"]  = p.groupby("flight_id")["lon"].shift(-1)
p["alt2"]  = p.groupby("flight_id")["baroaltitude"].shift(-1)

seg = p.dropna(subset=["time2","lat2","lon2","alt2"]).copy()

# Segment duration
seg["dt_s"] = (seg["time2"] - seg["time"]).astype("int64")

# Keep reasonable consecutive segments (since you downsampled to 60s)
seg = seg[(seg["dt_s"] > 0) & (seg["dt_s"] <= 5*60)].copy()

# Distance and midpoints (used later for met colocation)
seg["dist_km"] = haversine_km(seg["lat"], seg["lon"], seg["lat2"], seg["lon2"])
seg["mid_time"] = ((seg["time"] + seg["time2"]) // 2).astype("int64")
seg["mid_lat"]  = (seg["lat"] + seg["lat2"]) / 2
seg["mid_lon"]  = (seg["lon"] + seg["lon2"]) / 2
seg["mid_alt_m"] = (seg["baroaltitude"] + seg["alt2"]) / 2

segments = seg[["flight_id","mid_time","mid_lat","mid_lon","mid_alt_m","dt_s","dist_km"]].reset_index(drop=True)

print("Flights:", p["flight_id"].nunique())
print("Segments:", len(segments))
print("Mean segment length (km):", segments["dist_km"].mean())

# Save
from pathlib import Path
# OUT_SEG = Path("data/processed/opensky_segments_europe_winter_day.parquet")
OUT_SEG = ROOT /"data" / "processed" / "opensky_segments_europe_winter_day.parquet"
OUT_SEG.parent.mkdir(parents=True, exist_ok=True)
segments.to_parquet(OUT_SEG, index=False)
print("Saved:", OUT_SEG)


Flights: 22026
Segments: 1336164
Mean segment length (km): 13.557601611162
Saved: C:\Users\HiWi\Desktop\Terril\01_nextcloud\Germany\DATA SCIENCE\Semesters\05\02 Sustainability in aviation\03 contrail-mvp\data\processed\opensky_segments_europe_winter_day.parquet


In [24]:
segments.head(20)

Unnamed: 0,flight_id,mid_time,mid_lat,mid_lon,mid_alt_m,dt_s,dist_km
0,008ba5_OPM007_1.0,1736754704,50.464413,-0.393166,8164.83,31,7.231921
1,008ba5_OPM007_1.0,1736754750,50.372501,-0.339611,8667.75,60,14.612554
2,008ba5_OPM007_1.0,1736754810,50.245132,-0.275096,9277.35,60,15.158267
3,008ba5_OPM007_1.0,1736754870,50.112259,-0.21192,9776.46,60,15.735089
4,008ba5_OPM007_1.0,1736754930,49.973442,-0.155471,10264.14,60,16.179957
5,008ba5_OPM007_1.0,1736754990,49.830734,-0.102925,10797.54,60,16.437154
6,008ba5_OPM007_1.0,1736755050,49.685794,-0.049853,11369.04,60,16.685654
7,008ba5_OPM007_1.0,1736755110,49.540356,0.003006,11883.39,60,16.542956
8,008ba5_OPM007_1.0,1736755170,49.395807,0.055365,12283.44,61,16.481961
9,008ba5_OPM007_1.0,1736755230,49.249077,0.108032,12729.21,59,17.030514


In [26]:
fid = "008ba5_OPM007_1.0"

pts = points[points["flight_id"] == fid].sort_values("time").head(5)
pts


Unnamed: 0,time,icao24,callsign,lat,lon,baroaltitude,flight_id,lat2,lon2,time2,alt2
1,1736754689,008ba5,OPM007,50.493805,-0.415023,8008.62,008ba5_OPM007_1.0,50.43502,-0.371308,1736755000.0,8321.04
2,1736754720,008ba5,OPM007,50.43502,-0.371308,8321.04,008ba5_OPM007_1.0,50.309981,-0.307914,1736755000.0,9014.46
3,1736754780,008ba5,OPM007,50.309981,-0.307914,9014.46,008ba5_OPM007_1.0,50.180283,-0.242277,1736755000.0,9540.24
4,1736754840,008ba5,OPM007,50.180283,-0.242277,9540.24,008ba5_OPM007_1.0,50.044235,-0.181564,1736755000.0,10012.68
5,1736754900,008ba5,OPM007,50.044235,-0.181564,10012.68,008ba5_OPM007_1.0,49.902649,-0.129378,1736755000.0,10515.6


# ERA5 preprocessing to parquet

In [2]:
from pathlib import Path
import xarray as xr
import pandas as pd
import numpy as np

# --- Input (your file) ---
ERA5_PATH = ROOT / "data/raw/weather_ERA5/states_europe/2025-01-13/era5_pl_europe_T_q.nc"
assert ERA5_PATH.exists(), f"Not found: {ERA5_PATH}"

# --- Outputs (same file copied to interim + processed) ---
out_interim = ROOT / "data/interim/weather_ERA5/states_europe/2025-01-13_2025-01-14/era5_pl_europe_20250113_T_q.parquet"
out_processed = ROOT / "data/processed/weather_ERA5/states_europe/2025-01-13_2025-01-14/era5_pl_europe_20250113_T_q.parquet"
out_interim.parent.mkdir(parents=True, exist_ok=True)
out_processed.parent.mkdir(parents=True, exist_ok=True)

# --- Load NetCDF ---
ds = xr.open_dataset(ERA5_PATH)

# Detect coordinate names robustly
lat_name  = "latitude" if "latitude" in ds.coords else "lat"
lon_name  = "longitude" if "longitude" in ds.coords else "lon"
time_name = "valid_time" if "valid_time" in ds.coords else ("time" if "time" in ds.coords else None)
if time_name is None:
    raise ValueError(f"Could not find time coord. coords={list(ds.coords)}")

# Keep only what we need
# ERA5 pressure-level netcdf usually uses t (K) and q (kg/kg)
need_vars = []
for v in ["t", "q"]:
    if v in ds.data_vars:
        need_vars.append(v)
    else:
        raise ValueError(f"Variable '{v}' not found. Available: {list(ds.data_vars)}")

ds_small = ds[need_vars].rename({lat_name: "lat", lon_name: "lon", time_name: "time"})

# Normalize longitude to [-180, 180] so it matches your OpenSky segments lon
lon = ds_small["lon"].values
if np.nanmax(lon) > 180:
    ds_small = ds_small.assign_coords(lon=(((ds_small["lon"] + 180) % 360) - 180)).sortby("lon")

# Flatten to a table
df = ds_small.to_dataframe().reset_index()

# Standardize column names
# pressure_level coord exists in ERA5 PL netcdf; keep as hPa
if "pressure_level" in df.columns:
    df = df.rename(columns={"pressure_level": "plev_hpa"})
elif "level" in df.columns:
    df = df.rename(columns={"level": "plev_hpa"})
else:
    raise ValueError(f"No pressure level column found. Columns: {df.columns.tolist()}")

df = df.rename(columns={"t": "T_K", "q": "q_kgkg"})

# Keep only required columns (compact)
df = df[["time", "plev_hpa", "lat", "lon", "T_K", "q_kgkg"]]

# Optional: small dtype optimizations (saves space)
df["plev_hpa"] = df["plev_hpa"].astype("int16")
df["lat"] = df["lat"].astype("float32")
df["lon"] = df["lon"].astype("float32")
df["T_K"] = df["T_K"].astype("float32")
df["q_kgkg"] = df["q_kgkg"].astype("float32")

# Save to both locations
df.to_parquet(out_interim, index=False)
df.to_parquet(out_processed, index=False)

print("Saved interim :", out_interim, "| rows:", f"{len(df):,}")
print("Saved processed:", out_processed, "| rows:", f"{len(df):,}")
df.head()


Saved interim : C:\Users\HiWi\Desktop\Terril\01_nextcloud\Germany\DATA SCIENCE\Semesters\05\02 Sustainability in aviation\03 contrail-mvp\data\interim\weather_ERA5\states_europe\2025-01-13_2025-01-14\era5_pl_europe_20250113_T_q.parquet | rows: 3,593,880
Saved processed: C:\Users\HiWi\Desktop\Terril\01_nextcloud\Germany\DATA SCIENCE\Semesters\05\02 Sustainability in aviation\03 contrail-mvp\data\processed\weather_ERA5\states_europe\2025-01-13_2025-01-14\era5_pl_europe_20250113_T_q.parquet | rows: 3,593,880


Unnamed: 0,time,plev_hpa,lat,lon,T_K,q_kgkg
0,2025-01-13,350,72.0,-15.0,229.437225,0.000145
1,2025-01-13,350,72.0,-14.75,229.500702,0.000147
2,2025-01-13,350,72.0,-14.5,229.565155,0.000149
3,2025-01-13,350,72.0,-14.25,229.633514,0.000151
4,2025-01-13,350,72.0,-14.0,229.706757,0.000152
