In [1]:
import os
from datetime import datetime, timedelta
import pandas as pd
from pyopensky.schema import (
    IdentificationData4,
    PositionData4,
    RollcallRepliesData4,
    VelocityData4,
)
from traffic.core import Traffic
# from traffic.data.adsb import OpenSky
from traffic.data import opensky

  from .autonotebook import tqdm as notebook_tqdm


# Download trajectories

In [3]:
import os
import re
from datetime import datetime, timedelta

filenames = os.listdir("./data_EPWA")
times = []

for filename in filenames:
    match = re.search(r'([A-Za-z]+)-(\d{1,2})-(\d{4})-(\d{4})Z', filename)
    if match:
        month_str, day, year, time_str = match.groups()
        dt = datetime.strptime(f"{day} {month_str} {year} {time_str}", "%d %b %Y %H%M")
        times.append([dt, dt + timedelta(minutes=30)]) # A recording is 30 mins
        print(f"start:{dt}, stop:{dt + timedelta(minutes=30)}")
    else:
        print("No datetime found.")

start:2025-06-15 10:00:00, stop:2025-06-15 10:30:00


In [21]:
print(opensky.rawdata.__doc__)

Get raw message from the OpenSky Trino database.

You may pass requests based on time ranges, callsigns, aircraft, areas,
serial numbers for receivers, or airports of departure or arrival.

The method builds appropriate SQL requests, caches results and formats
data into a proper pandas DataFrame. Requests are split by hour (by
default) in case the connection fails.


:param start: a string (default to UTC), epoch or datetime (native
    Python or pandas)
:param stop: a string (default to UTC), epoch or datetime (native Python
    or pandas), *by default, one day after start*
:param date_delta: a timedelta representing how to split the requests,
    *by default: per hour*

More arguments to filter resulting data:

:param callsign: a string or a list of strings (wildcards
    accepted, _ for any character, % for any sequence of characters);
:param icao24: a string or a list of strings identifying the transponder
    code of the aircraft;
:param serials: an integer or a list of integers i

In [14]:
icao_EPWA = opensky.rawdata(
                times[0][0],
                times[0][1],
                arrival_airport="EPWA",
                Table=PositionData4,
            ).data.icao24.unique()

In [5]:
icao_EPWA

<ArrowExtensionArray>
['48af0d', '495278', '4bb4e7', '48ac80', '48ad83', '48ad10', '484fdf',
 '4d200d', '48ae20', '489788', '471fa1', '471f32', '4d24d6', '48ada1',
 '48ae80', '48ad80', '896382', '5082ef', '48ae82', '48ae01', '48ad0b',
 '48cb15', '48ae26', '48ad0d', '48ad00', '48af11', '4891b4', '06a0b1',
 '48ada8', '48af0f', '48af01', '896112', '4a8199', '45216a', '48c10e',
 '489789', '489786', '43ea72', '48978a', '48ac81', '48af12', '48ae85',
 '48ad86', '3c65d6']
Length: 44, dtype: string[pyarrow]

In [8]:
t_epwa = opensky.history(
    times[0][0],
    times[0][1],
    arrival_airport = "EPWA"
)

RUNNING: : 42.7% [00:13, 3.26%/s]
DOWNLOAD: 62.5klines [00:02, 22.9klines/s]


In [12]:
t_epwa.data.callsign.unique()

<ArrowExtensionArray>
['LOT6568', 'LOT6362',  'LOT2PA',  'LOT8WZ',  'LOT196',  'ABY858',   'LOT6E',
  'LOT2CH',  'LOT454',  'LOT266', 'LOT6380',  'LOT6KG',  'LOT3JA',  'LOT2EG',
 'WZZ8241',  'MGH871',  'LOT3YM',  'LOT318',  'LOT3EJ',  'LOT48F',  'LOT7RN',
    'LOT4',  'KLM75C', 'WZZ17JZ', 'UTN2226', 'LOT6648', 'ENT4862', 'TVP7707',
 'WZZ51AH',   'LOT5N',  'WZZ456', 'SEU7834',  'LOT282',  'QTR51V',  'SVF634',
  'UAE98X',  'JDI06X',  'LOT3AB',  'LOT6MJ',   'MOCNY',  'LOT3LN',  'LOT672',
 'LOT3908',  'DLH2RL']
Length: 44, dtype: string[pyarrow]

# Testing EPWA TMA

In [None]:
from traffic.data import eurofirs

eurofirs["EPWW"]

In [None]:
from traffic.data import opensky, airports
from geopy.distance import distance
from geopy.point import Point

center = Point(airports["EPWA"].latitude, airports["EPWA"].longitude)

# Half side of 50 NM box = 25 NM
half_side_nm = 50

# Move in each cardinal direction
north = distance(nautical=half_side_nm).destination(center, bearing=0)
south = distance(nautical=half_side_nm).destination(center, bearing=180)
east = distance(nautical=half_side_nm).destination(center, bearing=90)
west = distance(nautical=half_side_nm).destination(center, bearing=270)

# Rectangle bounds
lat_min = south.latitude
lat_max = north.latitude
lon_min = west.longitude
lon_max = east.longitude

print(f"Bounding Box (50 NM):\nLat: {lat_min:.4f} to {lat_max:.4f}\nLon: {lon_min:.4f} to {lon_max:.4f}")

In [None]:
test = opensky.history(
    start=datetime(2025, 6, 23, 10, 0),
    stop=datetime(2025, 6, 23, 10, 30),
    bounds = (19.6139, 51.3334, 22.3203, 52.9978), # (west, south, east, north)
    # bounds = eurofirs["EPWW"].bounds
)

In [None]:
test.map_leaflet()

In [None]:
test.data.callsign.unique()