In [60]:
#!/usr/bin/env python3
import json
import math
import os
import sys

import psycopg2

DSN = os.environ.get("POSTGRES_DSN", "postgresql://postgres:Warqi4-sywsow-zozfyc@tracer-db.cb80eku2emy0.eu-north-1.rds.amazonaws.com:5432/tracer")
R = 6371.0

# Bounding box for filtering flight data
BBOX_NORTH = 34.597042
BBOX_SOUTH = 28.536275
BBOX_WEST = 32.299805
BBOX_EAST = 37.397461

def haversine_km(lat1, lon1, lat2, lon2):
    a = math.radians(lat1)
    b = math.radians(lat2)
    
    dlat = math.radians(lat2 - lat1)
    dlon = math.radians(lon2 - lon1)
    x = math.sin(dlat / 2) ** 2 + math.cos(a) * math.cos(b) * math.sin(dlon / 2) ** 2
    c = 2 * math.atan2(math.sqrt(x), math.sqrt(1 - x))
    return R * c

def bearing_deg(lat1, lon1, lat2, lon2):
    y = math.sin(math.radians(lon2 - lon1)) * math.cos(math.radians(lat2))
    x = math.cos(math.radians(lat1)) * math.sin(math.radians(lat2)) - math.sin(math.radians(lat1)) * math.cos(math.radians(lat2)) * math.cos(math.radians(lon2 - lon1))
    return (math.degrees(math.atan2(y, x)) + 360) % 360

def angle_diff(a, b):
    d = (a - b + 180) % 360 - 180
    return abs(d)

def get_conn():
    return psycopg2.connect(DSN)

SEC_FOR_6KM, KM_FOR_WINDOW_BASE = 30, 5

def window_sec(chunk_half_km):
    return SEC_FOR_6KM * (2 * chunk_half_km) / KM_FOR_WINDOW_BASE

def fetch_feedback_track(conn, flight_id):
    """Load track for one flight from feedback.flight_tracks. Returns list of (timestamp, lat, lon, alt)."""
    cur = conn.cursor()
    cur.execute(
        "SELECT timestamp, lat, lon, alt, gspeed, vspeed FROM research.normal_tracks WHERE flight_id = %s ORDER BY timestamp ASC",
        (flight_id,),
    )
    rows = cur.fetchall()
    out = []
    for r in rows:
        lat = float(r[1])
        lon = float(r[2])
        # Filter points within bounding box
        if not (BBOX_SOUTH <= lat <= BBOX_NORTH and BBOX_WEST <= lon <= BBOX_EAST):
            continue
        ts = float(r[0]) if isinstance(r[0], (int, float)) else r[0].timestamp()
        alt_val = float(r[3]) if r[3] is not None else 0.0
        gspeed = float(r[4]) if r[4] is not None else None
        vspeed = float(r[5]) if r[5] is not None else None
        out.append((flight_id, ts, lat, lon, alt_val, vspeed, gspeed))
    return out

def fetch_normal_track(conn, flight_id):
    cur = conn.cursor()
    cur.execute(
        "SELECT timestamp, lat, lon, alt, gspeed, vspeed FROM feedback.flight_tracks WHERE flight_id = %s ORDER BY timestamp ASC",
        (flight_id,),
    )
    rows = cur.fetchall()
    out = []
    for r in rows:
        lat = float(r[1])
        lon = float(r[2])
        # Filter points within bounding box
        if not (BBOX_SOUTH <= lat <= BBOX_NORTH and BBOX_WEST <= lon <= BBOX_EAST):
            continue
        ts = float(r[0]) if isinstance(r[0], (int, float)) else r[0].timestamp()
        alt_val = float(r[3]) if r[3] is not None else 0.0
        gspeed = float(r[4]) if r[4] is not None else None
        vspeed = float(r[5]) if r[5] is not None else None
        out.append((flight_id, ts, lat, lon, alt_val, vspeed, gspeed))
    return out

def fetch_normal_tracks_between_origin_dest(conn, origin, dest):
    query = """
    select n.timestamp,
        n.lat,
        n.lon,
        n.alt,
        n.gspeed,
        n.vspeed,
        n.flight_id
    FROM research.normal_tracks n
    JOIN research.flight_metadata fm
    on n.flight_id = fm.flight_id
    where origin_airport = %s
    and destination_airport = %s
    and n.lat between %s and %s
    and n.lon between %s and %s
    """

    cur = conn.cursor()
    cur.execute(
        query,
        (origin, dest, BBOX_SOUTH, BBOX_NORTH, BBOX_WEST, BBOX_EAST)
    )
    rows = cur.fetchall()
    out = []
    for r in rows:
        ts = float(r[0]) if isinstance(r[0], (int, float)) else r[0].timestamp()
        alt_val = float(r[3]) if r[3] is not None else 0.0
        gspeed = float(r[4]) if r[4] is not None else None
        vspeed = float(r[5]) if r[5] is not None else None
        flight_id = r[6]
        out.append((flight_id, ts, float(r[1]), float(r[2]), alt_val, vspeed, gspeed))
    return out




In [None]:
!apt-get install libsqlite3-mod-spatialite # for sqlite geographic functions
!brew install libsqlite3-mod-spatialite # for sqlite geographic functions

[34m==>[0m [1mAuto-updating Homebrew...[0m
Adjust how often this is run with `$HOMEBREW_AUTO_UPDATE_SECS` or disable with
`$HOMEBREW_NO_AUTO_UPDATE=1`. Hide these hints with `$HOMEBREW_NO_ENV_HINTS=1` (see `man brew`).
[34m==>[0m [1mAuto-updated Homebrew![0m
Updated 2 taps (homebrew/core and homebrew/cask).
[34m==>[0m [1mNew Formulae[0m
actions-up: Tool to update GitHub Actions to latest versions with SHA pinning
agent-browser: Browser automation CLI for AI agents
arcadedb: Multi-Model DBMS: Graph, Document, Key/Value, Search, Time Series, Vector
cargo-features-manager: TUI like cli tool to manage the features of your rust-project dependencies
codex-acp: Use Codex from ACP-compatible clients such as Zed!
cozyhr: Cozy wrapper around Helm and Flux CD for local development
dbcsr: Distributed Block Compressed Sparse Row matrix library
go-air: Live reload for Go apps
ic-wasm: CLI tool for performing Wasm transformations specific to ICP canisters
icp-cli: Development tool for bui

In [61]:
import sqlite3

def setup_flight_db(db_name="flights.db"):
    """
    Initializes the SQLite DB and returns the connection object.
    """
    conn = sqlite3.connect(db_name)
    cursor = conn.cursor()
    # Enable WAL mode for better performance with concurrent reads/writes
    cursor.execute("PRAGMA journal_mode=WAL;")

    cursor.execute("""
        CREATE TABLE IF NOT EXISTS flight_tracks (
            flight_id TEXT,
            timestamp INTEGER,
            latitude REAL,
            longitude REAL,
            altitude REAL,
            gspeed REAL,
            vspeed REAL
        )
    """)

    # Composite index for spatial bounding box queries
    cursor.execute("CREATE INDEX IF NOT EXISTS idx_lat_lon ON flight_tracks (latitude, longitude)")

    # Composite index for time-series analysis per flight
    # cursor.execute("CREATE INDEX IF NOT EXISTS idx_flight_time ON flight_tracks (flight_id, timestamp)")

    conn.commit()
    return conn

def insert_flights(conn, data_list):
  """
  Inserts a list of flight records using an existing connection.
  data_list should be a list of tuples: (id, ts, lat, lon, alt, gspd, vspd)
  """
  query = "INSERT INTO flight_tracks VALUES (?, ?, ?, ?, ?, ?, ?)"

  try:
      cursor = conn.cursor()
      cursor.executemany(query, data_list)
      conn.commit()
  except sqlite3.Error as e:
      print(f"An error occurred during insertion: {e}")
      conn.rollback()

def delete_records_by_flight_id(conn, flight_id):
    """
    Permanently removes all rows matching the given flight_id.
    """
    query = "DELETE FROM flight_tracks WHERE flight_id = ?"

    try:
        cursor = conn.cursor()

        # Execute the deletion
        cursor.execute(query, (flight_id,))

        # Get the count of deleted rows (optional but helpful)
        deleted_count = cursor.rowcount

        # Commit the changes to the database
        conn.commit()

        print(f"Successfully deleted {deleted_count} rows for flight_id: {flight_id}")
        return deleted_count

    except sqlite3.Error as e:
        print(f"An error occurred during deletion: {e}")
        conn.rollback()
        return 0

def clear_flight_data(conn):
    """
    Removes all rows from the flight_tracks table.
    The table structure and indexes remain intact.
    """
    try:
        cursor = conn.cursor()

        # This removes all data rows
        cursor.execute("DELETE FROM flight_tracks")

        conn.commit()

        # Optional: Reset the file size
        # cursor.execute("VACUUM")

        print("All rows deleted. Table structure and indexes preserved.")

    except sqlite3.Error as e:
        print(f"An error occurred while clearing data: {e}")
        conn.rollback()

sqlite_conn = setup_flight_db(db_name="flights.db")

sqlite_conn.enable_load_extension(True)
sqlite_conn.load_extension("/opt/homebrew/lib/mod_spatialite.dylib")
# Initialize the spatial metadata (only needed once)
sqlite_conn.execute("SELECT InitSpatialMetadata(1);")

InitSpatiaMetaData() error:"table spatial_ref_sys already exists"


<sqlite3.Cursor at 0x140e5d7c0>

In [62]:
#postgres_conn.close()
postgres_conn = get_conn()

In [63]:
from collections import deque
import numpy as np
from scipy import stats
from enum import Enum
import math
from dataclasses import dataclass

class EventState(Enum):
    NORMAL = 1
    WARNING = 2
    STANDBY = 3

@dataclass
class FlightDataPoint:
  flight_id: str
  timestamp: int
  latitude: float
  longitude: float
  altitude: float
  gspeed: float
  vspeed: float

class WarningQueue:
  def __init__(self, max_size=15, warning_percentage_threshold=0.8):
    self._queue = deque()
    self._max_size = max_size
    self._warning_percentage_threshold = warning_percentage_threshold

  def add_event_state(self, event_state):
    if event_state is None:
      return
    self._queue.append(event_state)
    if len(self._queue) > self._max_size:
      self._queue.popleft()
    return self.queue_state()

  def queue_state(self):
    if len(self._queue) == self._max_size and sum(1 for q in self._queue if q == EventState.WARNING) >= self._warning_percentage_threshold * len(self._queue):
      return EventState.WARNING
    return EventState.NORMAL

def probability_of_data_point(lst, x):
  mean = np.mean(lst)
  std = np.std(lst)

  if std <= 0:
        raise ValueError("Standard deviation must be positive.")

  # 1. Find the absolute distance from the mean in units of standard deviation
  z_score = np.abs(x - mean) / std

  # 2. Use the survival function (sf), which is 1 - cdf.
  # This gives the area of the tail further away than our z-score.
  one_tail_prob = stats.norm.sf(z_score)

  # 3. Double it because the distribution is symmetric (the other tail is equally unlikely)
  total_prob = 2 * one_tail_prob

  return total_prob



class FlightTracker:
  def __init__(self, postgres_conn, sqlite_conn, max_track_proximity_threshold=5, minimal_close_tracks_for_decision=3):
    self._postgres_conn = postgres_conn
    self._sqlite_conn = sqlite_conn
    self._max_track_proximity_threshold = max_track_proximity_threshold
    self._minimal_close_tracks_for_decision = minimal_close_tracks_for_decision
    self.gspeed_queue = WarningQueue()
    self.vspeed_queue = WarningQueue()

  def setup_sqlite(self, origin, dest):
    clear_flight_data(self._sqlite_conn)
    normal_tracks = fetch_normal_tracks_between_origin_dest(self._postgres_conn, origin, dest)
    insert_flights(self._sqlite_conn, normal_tracks)

 

  def retrieve_closest_tracks_points(self, x: FlightDataPoint):
    query = """
    WITH DistanceAnalysis AS (
      SELECT
          *,
          -- 4326 is the standard WGS84 (GPS) coordinate system
          ST_Distance(
              MakePoint(longitude, latitude, 4326),
              MakePoint(:my_lon, :my_lat, 4326),
              1 -- The '1' tells SpatiaLite to use exact Geodesic (ellipsoid) distance
          ) AS exact_dist_meters,
          -- Rank rows for each flight. #1 is the shortest distance.
          ROW_NUMBER() OVER (
              PARTITION BY flight_id
              ORDER BY ST_Distance(
                  MakePoint(longitude, latitude, 4326),
                  MakePoint(:my_lon, :my_lat, 4326)
              ) ASC
          ) as rank
      FROM flight_tracks ft

      WHERE latitude between :lat_low and :lat_high
      AND longitude between :lon_low and :lon_high
    )
    SELECT
        flight_id,
        timestamp,
        latitude,
        longitude,
        altitude,
        gspeed,
        vspeed,
        exact_dist_meters / 1000.0 AS dist_km
    FROM DistanceAnalysis
    WHERE rank = 1;
  """
    km_per_deg = 111.32
    lat_m = self._max_track_proximity_threshold * 1.2 / km_per_deg
    lon_m = lat_m / max(0.01, math.cos(math.radians(x.latitude)))

    lat_low = x.latitude - lat_m
    lat_high = x.latitude + lat_m
    lon_low = x.longitude - lat_m
    lon_high = x.longitude + lat_m

    cursor = self._sqlite_conn.cursor()
    params = {'my_lon': x.longitude,
              'my_lat': x.latitude,
              'lat_low': lat_low,
              'lat_high': lat_high,
              'lon_low': lon_low,
              'lon_high': lon_high}

    cursor.execute(query, params)
    rows = cursor.fetchall()
    return [(FlightDataPoint(*row[:-1]), row[-1]) for row in rows] # list[tuple[FlightDataPoint, float(distance in km)]]

  # returns:
  #  dictionary of test results
  # dictionary of metadata
  def recieve_single_flight_data(self, x: FlightDataPoint):
    metadata = {}
    gspeed_state = EventState.NORMAL
    vspeed_state = EventState.NORMAL

    closest_points = self.retrieve_closest_tracks_points(x)
    closest_points = [f[0] for f in closest_points if f[1] <= self._max_track_proximity_threshold]

    closest_tracks_count = len(closest_points)
    metadata['closest_tracks_count'] = closest_tracks_count

    if closest_tracks_count < self._minimal_close_tracks_for_decision:
      return None , metadata # not enough data for data driven decision

    gspeeds = [t.gspeed for t in closest_points]
    vspeeds = [t.vspeed for t in closest_points]

    if np.std(gspeeds) > 1.0 and probability_of_data_point(gspeeds, x.gspeed) < 0.01:
      gspeed_state = EventState.WARNING

    if np.std(vspeeds) > 1.0 and probability_of_data_point(vspeeds, x.vspeed) < 0.01:
      vspeed_state = EventState.WARNING

    gspeed_state = self.gspeed_queue.add_event_state(gspeed_state)
    vspeed_state = self.vspeed_queue.add_event_state(vspeed_state)

    return dict(
        gspeed_state = gspeed_state,
        vspeed_state = vspeed_state
    ), metadata


In [64]:
from tqdm.auto import tqdm

def monitor_flight(postgres_conn,
                   sqlite_conn,
                   origin,
                   destination,
                   flight_iterable,
                   id_to_delete=None,
                   to_tqdm=False,
                   minimum_flight_duration_for_warning = 5.0,
                   minimum_initial_tracks_for_immediate_warning=10, flight_start_time=None):
  tracker = FlightTracker(postgres_conn, sqlite_conn)
  tracker.setup_sqlite(origin, destination)

  if id_to_delete:
    delete_records_by_flight_id(sqlite_conn, id_to_delete)

  flight_enumerable = enumerate(flight_iterable)
  if to_tqdm:
    flight_enumerable = tqdm(flight_enumerable)

  initial_closest_tracks_count = None
  has_warning_been_issued = False

  for i, flight_data in flight_enumerable:
    flight_data: FlightDataPoint
    tests, metadata = tracker.recieve_single_flight_data(flight_data)

    if tests is None:
      final_state = EventState.WARNING if has_warning_been_issued else EventState.STANDBY
      final_state_reason = "did not meet minimum conditions for data driven test"
      yield final_state, final_state_reason
      continue

    closest_tracks_count = metadata['closest_tracks_count']
    gspeed_state = tests['gspeed_state']
    vspeed_state = tests['vspeed_state']
    print(i)
    if i == 0:
      
      initial_closest_tracks_count = closest_tracks_count
      flight_start_time = flight_data.timestamp

    current_flight_duration_minutes = (flight_data.timestamp - flight_start_time) / 60.0

    if closest_tracks_count == 0 and current_flight_duration_minutes >= minimum_flight_duration_for_warning:
      if has_warning_been_issued:
        final_state = EventState.WARNING
        final_state_reason = "initial tracks: {}, closest tracks count {}".formal(initial_closest_tracks_count, closest_tracks_count)
        has_warning_been_issued = True
        yield final_state, final_state_reason
        continue

      if initial_closest_tracks_count >= minimum_initial_tracks_for_immediate_warning and current_flight_duration_minutes > minimum_flight_duration_for_warning:
        final_state = EventState.WARNING
        final_state_reason = "initial tracks: {}, closest tracks count {}".formal(initial_closest_tracks_count, closest_tracks_count)
        has_warning_been_issued = True
        yield final_state, final_state_reason
        continue

    elif closest_tracks_count == 0:
      final_state = EventState.STANDBY
      final_state_reason = "initial tracks: {}, closest tracks count {} but too close to takeoff".formal(initial_closest_tracks_count, closest_tracks_count)
      yield final_state, final_state_reason
      continue

    if gspeed_state == EventState.NORMAL and vspeed_state == EventState.NORMAL:
      final_state = EventState.NORMAL
      final_state_reason = "all tests passed"
      yield final_state, final_state_reason
      continue

    if current_flight_duration_minutes <= minimum_flight_duration_for_warning:
      final_state = EventState.STANDBY
      final_state_reason = "gspeed_state: {}, vspeed_state: {} but too close to takeoff".format(gspeed_state, vspeed_state)
      yield final_state, final_state_reason
      continue

    final_state = EventState.WARNING
    final_state_reason = "gspeed_state: {}, vspeed_state: {}".format(gspeed_state, vspeed_state)
    has_warning_been_issued = True
    yield final_state, final_state_reason




In [65]:
origin = "OMDW"
destination = "OLBA"
flight_id = "3d735e94"

# Bounding box filter
BBOX_NORTH = 34.597042
BBOX_SOUTH = 28.536275
BBOX_WEST = 32.299805
BBOX_EAST = 37.397461

flight_iterable = [
    FlightDataPoint(*row) for row in fetch_normal_track(postgres_conn, flight_id)
    if BBOX_SOUTH <= row[2] <= BBOX_NORTH and BBOX_WEST <= row[3] <= BBOX_EAST
]
to_tqdm = True
states, reasons = [], []
warnings = []
for i, (state, reason) in enumerate(monitor_flight(postgres_conn,
                   sqlite_conn,
                   origin,
                   destination,
                   flight_iterable,
                   id_to_delete=None,
                   to_tqdm=to_tqdm,
                   minimum_flight_duration_for_warning = 5.0,
                   minimum_initial_tracks_for_immediate_warning=10)):
  states.append(state)
  reasons.append(reason)
  
  if state == EventState.WARNING:
    warnings.append(flight_iterable[i])

All rows deleted. Table structure and indexes preserved.


31it [00:00, 6959.82it/s]

31





TypeError: unsupported operand type(s) for -: 'float' and 'NoneType'

In [None]:
first_warning_point

FlightDataPoint(flight_id='3af2006c', timestamp=1750750291.0, latitude=25.78976, longitude=52.83544, altitude=30700.0, gspeed=896.0, vspeed=496.0)

In [None]:
origin = "HECA"
destination = "OJAI"
flight_id = "3cdd7a5f"

# Bounding box filter
BBOX_NORTH = 34.597042
BBOX_SOUTH = 28.536275
BBOX_WEST = 32.299805
BBOX_EAST = 37.397461

flight_iterable = [
    FlightDataPoint(*row) for row in fetch_feedback_track(postgres_conn, flight_id)
    if BBOX_SOUTH <= row[2] <= BBOX_NORTH and BBOX_WEST <= row[3] <= BBOX_EAST
]
to_tqdm = True
states, reasons = [], []
first_warning_point = None
for i, (state, reason) in enumerate(monitor_flight(postgres_conn,
                   sqlite_conn,
                   origin,
                   destination,
                   flight_iterable,
                   id_to_delete=None,
                   to_tqdm=to_tqdm,
                   minimum_flight_duration_for_warning = 5.0,
                   minimum_initial_tracks_for_immediate_warning=10)):
  states.append(state)
  reasons.append(reason)
  
  if state == EventState.WARNING and first_warning_point is None:
    first_warning_point = flight_iterable[i]
    first_warning_index = i



All rows deleted. Table structure and indexes preserved.


368it [00:14, 24.95it/s] 


In [None]:
[emo for emo in reasons if "WARNING" in emo]

[]