In [1]:
import json
import asyncio

from aiohttp import ClientSession, BasicAuth
from python_opensky import OpenSky, BoundingBox
from pyensign.ensign import authenticate, publisher

# Configuration

In [8]:
# Credential file locations: the OpenSky file is read by load_opensky_creds(),
# the Ensign file is handed to the `authenticate` decorator on poll_flights().
OPENSKY_CREDS_PATH = "opensky-creds.json"
ENSIGN_CREDS_PATH = "flight-tracker-creds.json"

def load_opensky_creds():
    """Build the OpenSky login from the JSON file at ``OPENSKY_CREDS_PATH``.

    The file must contain a JSON object with ``"username"`` and
    ``"password"`` entries; a missing key raises ``KeyError``.

    Returns:
        A ``BasicAuth`` instance constructed from those two values.
    """
    with open(OPENSKY_CREDS_PATH) as creds_file:
        secrets = json.load(creds_file)
    # The parsed dict outlives the file handle, so the auth object is built
    # once the file has been closed.
    return BasicAuth(login=secrets["username"], password=secrets["password"])

# Loaded once when this cell runs; get_flights() passes it to
# opensky.authenticate() on every call.
OPENSKY_CREDS = load_opensky_creds()

In [3]:
# Region of interest handed to the OpenSky state query in get_flights():
# latitude 43 to 49, longitude -97 to -89.
# NOTE(review): presumably decimal degrees — confirm against the
# python_opensky BoundingBox documentation.
BOUNDING_BOX = BoundingBox(
    min_latitude=43,
    max_latitude=49,
    min_longitude=-97,
    max_longitude=-89,
)

# Pause, in seconds, that poll_flights() sleeps after each polling round.
INTERVAL_SEC = 5

# Publish Flight Updates

In [10]:
@publisher("flight-positions")
async def get_flights():
    """Yield one dict per aircraft state reported inside ``BOUNDING_BOX``.

    Every call opens a new HTTP session, authenticates against OpenSky with
    ``OPENSKY_CREDS`` and issues a single state query. The function is wrapped
    by ``publisher("flight-positions")``; presumably that publishes each
    yielded dict to the Ensign topic of that name (confirm in the pyensign
    docs).

    Yields:
        dict: the attributes of one state vector, keyed by attribute name.
    """
    # Attribute names copied from each state vector, in output key order.
    state_fields = (
        "icao24",
        "callsign",
        "origin_country",
        "time_position",
        "last_contact",
        "longitude",
        "latitude",
        "geo_altitude",
        "on_ground",
        "velocity",
        "true_track",
        "vertical_rate",
        "sensors",
        "barometric_altitude",
        "transponder_code",
        "special_purpose_indicator",
        "position_source",
        "category",
    )

    async with ClientSession() as session, OpenSky(session=session) as opensky:
        await opensky.authenticate(OPENSKY_CREDS)

        snapshot = await opensky.get_states(bounding_box=BOUNDING_BOX)
        for state in snapshot.states:
            yield {name: getattr(state, name) for name in state_fields}
                
@authenticate(cred_path=ENSIGN_CREDS_PATH)
async def poll_flights():
    """Run ten polling rounds, printing every flight update received.

    Each round iterates ``get_flights()`` to exhaustion, printing each item,
    and then sleeps for ``INTERVAL_SEC`` seconds. After the last round the
    coroutine sleeps one more second before returning.
    """
    for _ in range(10):
        async for flight in get_flights():
            print(flight)
        await asyncio.sleep(INTERVAL_SEC)

    # Give any still-pending events a moment to be published before returning.
    await asyncio.sleep(1)

In [5]:
# Top-level await: this relies on the notebook kernel's already-running event
# loop, so it will not work as-is in a plain Python script.
await poll_flights()

{'icao24': 'acdfab', 'callsign': 'DAL1591 ', 'origin_country': 'United States', 'time_position': 1696271978, 'last_contact': 1696271978, 'longitude': -92.8017, 'latitude': 44.8641, 'geo_altitude': 4930.14, 'on_ground': False, 'velocity': 155.47, 'true_track': 264.87, 'vertical_rate': -7.15, 'sensors': None, 'barometric_altitude': 4693.92, 'transponder_code': '3535', 'special_purpose_indicator': False, 'position_source': <PositionSource.ADSB: 0>, 'category': <AircraftCategory.NO_ADS_INFORMATION: 1>}
{'icao24': 'a2355d', 'callsign': 'SKW3851 ', 'origin_country': 'United States', 'time_position': 1696271978, 'last_contact': 1696271978, 'longitude': -96.7387, 'latitude': 44.5379, 'geo_altitude': 11125.2, 'on_ground': False, 'velocity': 252.83, 'true_track': 85.57, 'vertical_rate': 0.65, 'sensors': None, 'barometric_altitude': 10668.0, 'transponder_code': '3345', 'special_purpose_indicator': False, 'position_source': <PositionSource.ADSB: 0>, 'category': <AircraftCategory.LARGE: 4>}
{'icao2