In [1]:
import pandas as pd
import numpy as np
import os
import tarfile
import urllib.request
import gzip
import shutil
import re
from tqdm import tqdm
from sklearn.preprocessing import StandardScaler

In [2]:
airline_icao_codes = [
    # Star Alliance
    'AEE', 'ACA', 'CCA', 'AIC', 'ANZ', 'ANA', 'AAR', 'AUA', 'AVA', 'BEL', 'CMP', 'CTN', 
    'MSR', 'ETH', 'EVA', 'LOT', 'DLH', 'CSZ', 'SIA', 'SAA', 'SWR', 'TAP', 'THA', 'THY',

    # Oneworld
    'BAW', 'CPA', 'FJI', 'FIN', 'IBE', 'JAL', 'MAS', 'QFA', 'QTR', 'RAM', 'RJA', 'ALK',

    # SkyTeam
    'ARG', 'AMX', 'AEA', 'AFR', 'CAL', 'CES', 'GIA', 'KQA', 'KLM', 'KAL', 'MEA', 'SVA',
    'SAS', 'ROT', 'HVN', 'VIR', 'CXA', 'AFL', 

    # Other flag carriers
    'BBC', 'TAM', 'EIN', 'ELY', 'BWA', 'PIA', 'ETD', 'UAE', 'TUA', 'UZB', 'VCV', 'PAL', 
    'MGL', 'KZR', 'GFA', 'AUI', 'TAR', 'DAH',

    # Low cost carriers
    'RYR', 'IGO', 'EZY', 'AXM', 'GLO', 'NOZ', 'VLG', 'WZZ', 'JST',

    # U.S. carriers
    'UAL', 'AAL', 'DAL', 'ENY'
]

In [None]:
#import pandas as pd
#import numpy as np
#import os
#import tarfile
#import urllib.request
#import gzip
#import shutil
#import re
#from sklearn.preprocessing import StandardScaler

def is_valid_callsign(callsign):
    if not isinstance(callsign, str):
        return False

    callsign = callsign.strip().upper()

    # must be at least 4 chrs (eg 'UAL1') and at most 8
    if not (4 <= len(callsign) <= 8):
        return False

    prefix = callsign[:3]
    suffix = callsign[3:]

    # prefix must be a major intl airline
    if prefix not in airline_icao_codes:
        return False   

    # suffix: 1–4 digits, possibly ending in 1 letter
    if not re.fullmatch(r'\d{1,4}[A-Z]?', suffix):
        return False

    return True

BASE_URL = "https://s3.opensky-network.org/data-samples/states/.2019-07-15"
HOURS = [f"{h:02d}" for h in range(24)]
MASTER_DF = []

for h in tqdm(HOURS, desc = 'Processing OpenSky data'): 
    # define download URL
    filename = f"states_2019-07-15-{h}.csv.tar"
    url = f"{BASE_URL}/{h}/{filename}"
    local_tar = f"./temp/{filename}"
    
    # download tarball from OpenSky
    os.makedirs("./temp", exist_ok=True)
    urllib.request.urlretrieve(url, local_tar)

    # extract .csv.gz
    with tarfile.open(local_tar, "r") as tar:
        tar.extractall("./temp")
        
    for name in os.listdir("./temp"):
        if name.endswith(".csv.gz") and name.startswith("states_2019-07-15"):
            gz_path = f"./temp/{name}"
            csv_path = gz_path[:-3]
    
            # decompress .gz
            with gzip.open(gz_path, 'rb') as f_in:
                with open(csv_path, 'wb') as f_out:
                    shutil.copyfileobj(f_in, f_out)
    
            # load to pandas and filter
            df = pd.read_csv(csv_path)
            df.dropna(subset=["time", "callsign", "lat", "lon", "velocity", "heading", "baroaltitude"], inplace=True)
            MASTER_DF.append(df)
            os.remove(gz_path)
            os.remove(csv_path)

    os.remove(local_tar)

master_df = pd.concat(MASTER_DF, ignore_index=True)

master_df["callsign"] = master_df["callsign"].str.strip()
    
# transform heading to sin/cos 
heading_rad = np.deg2rad(master_df["heading"].values)
sin_heading = np.sin(heading_rad)
cos_heading = np.cos(heading_rad)
master_df["sin_heading"] = sin_heading
master_df["cos_heading"] = cos_heading
    
# remove invalid callsigns
print("Filtering callsigns...")
valid_mask = master_df['callsign'].apply(is_valid_callsign)
cleaned_data = master_df[valid_mask].copy()

# scale altitude and velocity globally
print("Scaling features...")
scaler = StandardScaler()
scaler.fit(cleaned_data[["velocity", "baroaltitude"]].values)
velocity = cleaned_data["velocity"].values.reshape(-1, 1)
altitude = cleaned_data["baroaltitude"].values.reshape(-1, 1)
scaled = scaler.transform(np.hstack([velocity, altitude]))
velocity_z = scaled[:, 0]
altitude_z = scaled[:, 1]
cleaned_data.loc[:, "velocity"] = velocity_z
cleaned_data.loc[:, "altitude"] = altitude_z

cleaned_filtered_data = cleaned_data[["time", "callsign", "lat", "lon", "velocity", "sin_heading", "cos_heading", "altitude"]]

# save locally as parquet (~11M rows, ~450MB)
cleaned_filtered_data.to_parquet("data/opensky_2019-07-15.parquet")
print("All hourly files processed and combined.")

In [3]:
master_data = pd.read_parquet("data/opensky_2019-07-15.parquet")
print(master_data.shape)

(17276605, 8)


In [6]:
MAX_FLIGHT_DURATION = 8 * 3600  # 8 hours in seconds
RESAMPLE_INTERVAL = 60  # seconds
TARGET_LENGTH = 200 # target vector dimension for similarity

def preprocess_flight(df_flight):
    df_flight = df_flight.sort_values("time")
    duration = df_flight["time"].iloc[-1] - df_flight["time"].iloc[0]
    
    if duration > MAX_FLIGHT_DURATION:
        return None
    
    start_time = df_flight["time"].iloc[0]
    df_flight["elapsed"] = df_flight["time"] - start_time
    # resample to 200 steps
    idxs = np.linspace(0, len(df_flight) - 1, TARGET_LENGTH).astype(int)
    df_flight = df_flight.iloc[idxs]
    
    origin_lat = df_flight.iloc[0]["lat"]
    origin_lon = df_flight.iloc[0]["lon"]
    df = df_flight.copy()
    df["delta_lat"] = df["lat"] - origin_lat
    df["delta_lon"] = df["lon"] - origin_lon

    return df[["delta_lat", "delta_lon", "velocity", "sin_heading", "cos_heading", "altitude"]]

def extract_flight(data, callsign):
    return data[data["callsign"] == callsign]

In [None]:
sfo_lax = ['UAL2757', 'DAL409', 'UAL1200', 'UAL257', 'DAL2861', 'UAL613', 'DAL664', 'UAL256', 'DAL2600', 'AAL1851']
den_ord = ['UAL1938', 'UAL781', 'UAL301', 'AAL2771', 'UAL532', 'UAL682', 'AAL2780', 'UAL336', 'AAL2470', 'AAL773']
dfw_atl = ['DAL2010', 'AAL2749', 'DAL1890', 'DAL1966', 'AAL1309', 'DAL2310', 'DAL2269', 'AAL333', 'DAL1513', 'AAL2403']
dca_bos = ['AAL2150', 'AAL2148', 'AAL2160', 'AAL2169', 'AAL2139', 'AAL2170', 'AAL2119', 'AAL2149', 'AAL2120', 'AAL2134']
ord_lga = ['UAL1823', 'UAL1606', 'AAL129', 'AAL398', 'AAL527', 'DAL379', 'DAL585', 'UAL509', 'DAL2775', 'UAL639'] 
lax_jfk = ['AAL10', 'DAL1908', 'DAL1436', 'AAL118', 'DAL1258', 'AAL2', 'DAL2164', 'AAL238', 'AAL4', 'DAL816']
iah_ord = ['UAL2131', 'UAL1854', 'UAL2246', 'ENY3331', 'UAL1835', 'ENY3621', 'UAL1403', 'UAL1160', 'AAL869', 'UAL1899']
sfo_sea = ['UAL800', 'DAL2787', 'UAL2161', 'DAL0856', 'UAL1074', 'UAL351', 'UAL618', 'DAL2490', 'DAL2429', 'DAL1470']
atl_mco = ['DAL1418', 'DAL863', 'DAL804', 'DAL1883', 'DAL1905', 'DAL2428', 'DAL897', 'DAL768', 'DAL1118', 'DAL186']
lax_atl = ['DAL1901', 'AAL1071', 'DAL2213', 'DAL2270', 'DAL954', 'DAL1592', 'DAL2714', 'DAL1954', 'DAL1140', 'DAL516']
routes = [sfo_lax, den_ord, dfw_atl, dca_bos, ord_lga, lax_jfk, iah_ord, sfo_sea, atl_mco, lax_atl]

In [None]:
def aggregate_route(data, callsigns):
    matrices = []
    for call in callsigns:
        df_flight = extract_flight(data, call)
        traj = preprocess_flight(df_flight)
        if traj is None:
            continue
        matrices.append(traj)

    if len(matrices) == 0:
        return None

    # shape: (num_flights, 200, 6)
    stacked = np.stack(matrices, axis=0)
    aggregate = stacked.mean(axis=0) 

    return aggregate

# vectors for 10 trunk routes
trunk_vectors = []
for r in tqdm(routes, desc = 'Aggregating Trunk Routes'):
    agg = aggregate_route(master_data, r)
    if agg is not None:
        trunk_vectors.append(agg)

np.save('data/trunk_vectors', trunk_vectors)

# remove flights by US carriers after aggregation
def is_non_us_callsign(callsign):
    prefix = callsign[:3]
    if prefix in ['UAL', 'DAL', 'AAL', 'ENY']:
        return False
    return True

us_mask = master_data['callsign'].apply(is_non_us_callsign)
master_data = master_data[us_mask].copy()
print(master_data.shape)

In [None]:
# vectors for ~22k flights in dataset
import pickle

flight_vectors = {}
all_callsigns = np.unique(master_data['callsign'])

for c in tqdm(all_callsigns):
    df = extract_flight(master_data, c)
    vec = preprocess_flight(df)
    if vec is not None:
        flight_vectors[c] = vec

# Save clean dictionary
with open('data/flight_vectors.pkl', 'wb') as f:
    pickle.dump(flight_vectors, f)

print(f"Saved {len(flight_vectors)} valid vectors to data/flight_vectors.pkl")

'''
for c in tqdm(range(len(all_callsigns))):
    vec = preprocess_flight(extract_flight(master_data, all_callsigns[c]))
    flight_vectors[all_callsigns[c]] = vec

np.savez('data/flight_vectors', flight_vectors)
'''

  0%|▏                                    | 109/22618 [01:06<3:45:23,  1.66it/s]

In [None]:
trunk_vectors = np.load('data/trunk_vectors.npy')
with open("flight_vectors.pkl", "rb") as f:
    flight_vectors = pickle.load(f)

In [None]:
raw = np.load('data/flight_vectors.npz', allow_pickle = True)
flight_vectors = raw['arr_0'].item()

In [None]:
from fastdtw import fastdtw
from scipy.spatial.distance import euclidean
import heapq
from collections import defaultdict

def compute_dtw(vec1, vec2):
    score, _ = fastdtw(vec1, vec2, dist=euclidean)
    return score

def moved_enough(vec, min_displacement_km=100):
    # roughly scaled for lat/lon in degrees (~111 km per deg)
    start = vec.iloc[0, :2]
    end = vec.iloc[-1, :2]
    displacement_deg = np.linalg.norm(end - start)
    displacement_km = displacement_deg * 111  # approximation
    return displacement_km >= min_displacement_km

def top_k_dtw(trunk_vectors, flight_vectors, k=5):
    topk = defaultdict(list)

    for callsign, vec in tqdm(flight_vectors.items(), desc="Flight vectors"):
        alt_start = vec.iloc[0,5]
        alt_end = vec.iloc[-1,5]

        # valid flights must have low starting/ending altitude and traveled at least 100 km
        if alt_start >= 0 or alt_end >= 0 or not moved_enough(vec):
            continue
        
        for i, trunk_vec in enumerate(trunk_vectors):
            score = compute_dtw(vec, trunk_vec)
            # use minheap with negative score to simulate maxheap
            if len(topk[i]) < k:
                heapq.heappush(topk[i], (-score, callsign))
            else:
                heapq.heappushpop(topk[i], (-score, callsign))

    for i in topk:
        topk[i].sort()

    return topk
    

In [None]:
flight_vectors = {k: v for k, v in flight_vectors.items() if v is not None}

In [None]:
topk = top_k_dtw(trunk_vectors, flight_vectors)

In [None]:
from pprint import pp

In [None]:
pp(topk)

In [None]:
import json

def serialize_json(topk_dict, trunk_labels, out_path):
    serializable = {
        trunk_labels[i]: [
            {"callsign": callsign, "dtw_score": -score}
            for score, callsign in sorted(results)
        ]
        for i, results in topk_dict.items()
    }

    with open(out_path, "w") as f:
        json.dump(serializable, f, indent = 2)

    print(f"saved top-k results to {out_path}")

trunk_labels = {
  0: "sfo_lax",
  1: "den_ord",
  2: "lax_jfk",
  3: "atl_dfw",
  4: "bos_dca",
  5: "jfk_ord",
  6: "iah_ord",
  7: "sfo_sea",
  8: "mco_atl",
  9: "lax_atl"
}


In [None]:
serialize_topk(topk, trunk_labels, out_path="data/results/topk_dtw.json")