In [29]:
import datetime
import logging
from typing import Optional, Tuple

def date_string_to_day_range_epoch(date_string: str, date_format: str = "%Y-%m-%d") -> Tuple[int, int, str, str, str]:
    """
    Convert a date string into the UTC boundaries of that calendar day.

    The day is interpreted in UTC and returned both as Unix epoch seconds and
    as "%Y-%m-%dT%H:%M" strings, so it can feed APIs that take either form.

    The function computes:
    1. Start: midnight (00:00:00) UTC of the given day.
    2. Midday: 12:00 UTC of the same day.
    3. End epoch: 23:59:59 UTC, the last whole second of the day, so the
       epoch range [start_epoch, end_epoch] is inclusive on both ends.
    4. End string: 23:59 UTC of the same day.

    Args:
        date_string: The input date string (e.g., "2025-11-02").
        date_format: strptime format of date_string (default: YYYY-MM-DD).
            Any time-of-day fields it contains are discarded.

    Returns:
        A tuple (start_epoch, end_epoch, start_str, midday_str, end_str):
        two ints followed by three strings.

    Raises:
        ValueError: if date_string is empty/None or does not match date_format.
        Other exceptions raised while parsing (e.g. a non-str date_string) are
        logged and re-raised unchanged.
    """
    # Local import: a later notebook cell runs `from datetime import datetime`,
    # which rebinds the global name `datetime` to the class and would make
    # `datetime.datetime` fail inside this function.
    import datetime as dt

    if not date_string:
        logging.error("Input date string is empty or None.")
        raise ValueError("A date string must be provided for processing.")

    try:
        # Parse, then truncate to midnight UTC (drops any time fields in date_format).
        dt_start_of_day = dt.datetime.strptime(date_string, date_format).replace(
            hour=0, minute=0, second=0, microsecond=0, tzinfo=dt.timezone.utc
        )

        # Midnight of the following day; every other boundary is derived from it.
        dt_next_midnight = dt_start_of_day + dt.timedelta(days=1)

        # Last minute of the day (23:59), for minute-resolution strings.
        dt_last_minute = dt_next_midnight - dt.timedelta(minutes=1)

        # Midday (12:00) of the current day.
        dt_midday = dt_start_of_day + dt.timedelta(hours=12)

        start_epoch = int(dt_start_of_day.timestamp())
        # One second before next midnight, i.e. 23:59:59 (inclusive end).
        end_epoch = int(dt_next_midnight.timestamp()) - 1

        # Produce required strings (e.g., "2025-11-02T00:00")
        API_TIME_FORMAT = "%Y-%m-%dT%H:%M"
        start_str = dt_start_of_day.strftime(API_TIME_FORMAT)
        midday_str = dt_midday.strftime(API_TIME_FORMAT)
        end_str_inclusive = dt_last_minute.strftime(API_TIME_FORMAT)

        logging.debug(
            f"Date string '{date_string}' converted to range: Epoch=[{start_epoch}, {end_epoch}]; Strings=[{start_str}, {end_str_inclusive}]"
        )
        return start_epoch, end_epoch, start_str, midday_str, end_str_inclusive

    except ValueError as e:
        logging.error(f"Date conversion failed for string '{date_string}' with format '{date_format}': {e}")
        # Propagate instead of returning the exception class, which callers
        # would otherwise try to unpack as a tuple.
        raise
    except Exception as e:
        logging.error(f"An unexpected error occurred during day range conversion: {e}")
        raise

In [None]:
# The helper returns five values: start/end epochs plus start, midday and end strings.
start_epoch, end_epoch, start_str, midday_str, end_str = date_string_to_day_range_epoch("2025-01-02")
print(start_epoch, end_epoch, start_str, midday_str, end_str)

Obtaining an OpenSky Network access token to authenticate API requests

In [30]:
import requests
import json
import logging
from datetime import datetime

credentials_file_path = "credentials/opensky_credentials.json"

# Load the OpenSky client credentials. Failures are logged and then re-raised:
# continuing would crash on the credentials[...] lookups below with a NameError,
# or silently reuse a `credentials` dict bound by another cell.
try:
    with open(credentials_file_path, 'r', encoding='utf-8') as f:
        credentials = json.load(f)
    logging.info("Successfully loaded credentials attributes.")
except FileNotFoundError:
    logging.error(f"Error: The file '{credentials_file_path}' was not found. Please check the path.")
    raise
except json.JSONDecodeError:
    logging.error(f"Error: The file '{credentials_file_path}' is not valid JSON.")
    raise
except Exception as e:
    logging.error(f"An unexpected error occurred: {e}")
    raise

# 1. Your OpenSky API Client Credentials
CLIENT_ID = credentials['clientId']
CLIENT_SECRET = credentials['clientSecret']

# OAuth2 token endpoint used by get_access_token.
AUTH_URL = "https://auth.opensky-network.org/auth/realms/opensky-network/protocol/openid-connect/token"

def get_access_token(client_id, client_secret, timeout=30):
    """Request a new access token from the OpenSky auth server.

    Performs an OAuth2 client-credentials grant against ``AUTH_URL``.

    Args:
        client_id: OpenSky API client id.
        client_secret: OpenSky API client secret.
        timeout: Seconds to wait for the auth server before giving up
            (without it, a stalled server would block the notebook forever).

    Returns:
        The ``access_token`` string from the token response.

    Raises:
        requests.exceptions.RequestException: if the request fails or the
            server returns an HTTP error status (via raise_for_status()).
        ValueError: if the response has no ``access_token``.
    """
    logging.info("Requesting new Access Token...")

    headers = {
        "Content-Type": "application/x-www-form-urlencoded"
    }

    data = {
        "grant_type": "client_credentials",
        "client_id": client_id,
        "client_secret": client_secret
    }

    try:
        response = requests.post(AUTH_URL, headers=headers, data=data, timeout=timeout)
        response.raise_for_status()
        token_data = response.json()
    except requests.exceptions.RequestException as e:
        logging.error(f"Error requesting token: {e}")
        # Re-raise the original exception so its type and message reach the caller.
        raise

    # The token is valid for 'expires_in' seconds (usually 1800 seconds or 30 minutes)
    access_token = token_data.get("access_token")
    if not access_token:
        # Fail here rather than hand a None token to later API calls.
        logging.error("Token response did not contain an access_token.")
        raise ValueError("OpenSky auth response did not include an access_token.")

    logging.info("Successfully retrieved Access Token.")
    return access_token

Function to request arrivals or departures for a single airport over a fixed time window on 2025-01-02 (the `date` argument is not used yet)

In [31]:
def make_OpenSky_request(API_BASE_URL, endpoint, airport_icao, date, token,
                         begin_ts=1735794325, end_ts=1735801525, timeout=30):
    """Make an authenticated GET request to an OpenSky airport-flights endpoint.

    Args:
        API_BASE_URL: Base URL of the OpenSky REST API; concatenated directly
            with ``endpoint``.
        endpoint: Endpoint path, e.g. "/flights/arrival".
        airport_icao: Airport ICAO code, sent as the "airport" query parameter.
        date: Currently unused; the query window comes from begin_ts/end_ts.
        token: Bearer token from the OpenSky auth server.
        begin_ts: Window start in Unix seconds. The default, 1735794325,
            is 2025-01-02 05:05:25 UTC.
        end_ts: Window end in Unix seconds. The default, 1735801525,
            is 2025-01-02 07:05:25 UTC.
        timeout: Seconds to wait for the server before giving up.

    Returns:
        The successful ``requests.Response``.

    Raises:
        ValueError: if ``token`` is empty or None.
        requests.exceptions.RequestException: on network failure or an HTTP
            error status (raise_for_status() turns 4xx/5xx into HTTPError).
    """
    if not token:
        logging.error("Error: No valid token available.")
        # Raising a plain string is a TypeError in Python 3; raise a real exception.
        raise ValueError("No valid token available.")

    url = f"{API_BASE_URL}{endpoint}"
    logging.info(f"\nMaking API request to {url}...")

    params = {
        "airport": airport_icao,
        "begin": begin_ts,
        "end": end_ts
    }

    logging.info(f"params: {params}")

    headers = {
        "Authorization": f"Bearer {token}"
    }

    try:
        response = requests.get(url, params=params, headers=headers, timeout=timeout)
        response.raise_for_status()
        return response
    except requests.exceptions.RequestException as e:
        logging.error(f"Error making API request: {e}")
        raise

Function to request arrivals or departures at an airport, or the flights of an individual aircraft (by icao24 address), over a full UTC day

In [None]:
def make_OpenSky_request(API_BASE_URL, endpoint, param, airport_or_icao24_value, date, token, timeout=30):
    """Make an authenticated GET request to an OpenSky flights endpoint for one UTC day.

    This redefinition shadows the earlier 5-argument ``make_OpenSky_request``
    in this notebook, so callers must pass ``param``.

    Args:
        API_BASE_URL: Base URL of the OpenSky REST API; concatenated directly
            with ``endpoint``.
        endpoint: Endpoint path, e.g. "/flights/arrival".
        param: "airport" to query by airport; any other value queries by
            aircraft, sending the value as "icao24".
        airport_or_icao24_value: Airport ICAO code or aircraft icao24 address.
        date: Day to query (format "%Y-%m-%d"). The window runs from 00:00:00
            to 23:59:59 UTC via date_string_to_day_range_epoch.
        token: Bearer token from the OpenSky auth server.
        timeout: Seconds to wait for the server before giving up.

    Returns:
        The successful ``requests.Response``.

    Raises:
        ValueError: if ``token`` is empty/None or ``date`` cannot be parsed.
        requests.exceptions.RequestException: on network failure or an HTTP
            error status (raise_for_status() turns 4xx/5xx into HTTPError).
    """
    if not token:
        logging.error("Error: No valid token available.")
        # Raising a plain string is a TypeError in Python 3; raise a real exception.
        raise ValueError("No valid token available.")

    url = f"{API_BASE_URL}{endpoint}"
    logging.info(f"\nMaking API request to {url}...")

    begin_ts, end_ts, _, _, _ = date_string_to_day_range_epoch(date)

    # The function serves two REST endpoints: by airport, or by aircraft (icao24).
    query_key = "airport" if param == "airport" else "icao24"
    params = {
        query_key: airport_or_icao24_value,
        "begin": begin_ts,
        "end": end_ts
    }

    logging.info(f"Param:'{param}' is selected, therefore params: {params}")

    headers = {
        "Authorization": f"Bearer {token}"
    }

    try:
        response = requests.get(url, params=params, headers=headers, timeout=timeout)
        response.raise_for_status()
        return response
    except requests.exceptions.RequestException as e:
        logging.error(f"Error making API request: {e}")
        raise

TODO: read the airport ICAO codes from the airports table and loop over them in the Airflow DAG. For now, the list of airports below is hard-coded.

In [None]:

import datetime as _dt  # alias: other cells rebind the bare name `datetime` (module vs. class)

token = get_access_token(CLIENT_ID, CLIENT_SECRET)

API_BASE_URL = "https://opensky-network.org/api"

# Fields read from each flight object returned by /flights/arrival.
response_columns = [
    'icao24', 'firstSeen', 'estDepartureAirport', 'lastSeen',
    'estArrivalAirport', 'callsign',
    'estDepartureAirportHorizDistance', 'estDepartureAirportVertDistance',
    'estArrivalAirportHorizDistance', 'estArrivalAirportVertDistance',
    'departureAirportCandidatesCount', 'arrivalAirportCandidatesCount'
]
# Full record layout: the response fields plus two columns added here (the
# airport and ingestion time are not in the response). This is built once,
# outside the loop, so it does not grow with every airport processed.
columns = response_columns + ['airport_icao', 'ingestion_timestamp']

airports_icao = ['EDDN']
FLIGHT_DATE = "2025-01-02"
# Attempts per airport; a 401 between attempts triggers a token refresh.
MAX_RETRIES = 2

all_records = []

for icao in airports_icao:
    for attempt in range(1, MAX_RETRIES + 1):
        try:
            # The 6-argument make_OpenSky_request (the later definition) is the one in effect.
            response = make_OpenSky_request(
                API_BASE_URL, "/flights/arrival", "airport", icao, FLIGHT_DATE, token
            )
        except requests.exceptions.HTTPError as err:
            # make_OpenSky_request calls raise_for_status(), so error statuses
            # surface here as HTTPError rather than as a returned response.
            status_code = err.response.status_code if err.response is not None else None
            if status_code != 401:
                logging.error(f"Error while retrieving the data. Status Code: {status_code}")
                raise
            if attempt == MAX_RETRIES:
                logging.error(f"Still unauthorized after {MAX_RETRIES} attempts for this icao: {icao} request.")
                raise
            logging.warning("Token might have expired. Sending request to get new token...")
            token = get_access_token(CLIENT_ID, CLIENT_SECRET)
            continue

        data = response.json()
        logging.info("Successfully retrieved Aircraft vector records.")
        ingestion_timestamp = _dt.datetime.now(_dt.timezone.utc).strftime("%Y-%m-%d %H:%M:%S")

        # icao and timestamp are not in the response, so append them to each row.
        records = [
            tuple(item.get(col) for col in response_columns) + (icao, ingestion_timestamp)
            for item in data
        ]
        all_records.extend(records)
        break

print(all_records)

Extracting arrivals and departures from the AeroDataBox API

In [None]:
import requests
import urllib.parse
import json
import logging
import datetime

aerodatabox_api_key_file_path = "credentials/aerodatabox_api_key.json"

# Load the AeroDataBox API key. Failures are logged and then re-raised:
# otherwise credentials['key'] in the request cell would fail with a NameError,
# or a KeyError on the OpenSky credentials dict still bound to `credentials`.
try:
    with open(aerodatabox_api_key_file_path, 'r', encoding='utf-8') as f:
        credentials = json.load(f)
    logging.info("Successfully loaded credentials attributes.")
except FileNotFoundError:
    logging.error(f"Error: The file '{aerodatabox_api_key_file_path}' was not found. Please check the path.")
    raise
except json.JSONDecodeError:
    logging.error(f"Error: The file '{aerodatabox_api_key_file_path}' is not valid JSON.")
    raise
except Exception as e:
    logging.error(f"An unexpected error occurred: {e}")
    raise


delays_base_url = "https://prod.api.market/api/v1/aedbx/aerodatabox"
codetype = "icao"
code = "KJFK"
endpoint = f"flights/airports/{codetype}/{code}"
TIME_FROM = "2025-11-02T00:00"
TIME_TO = "2025-11-02T00:15"
# Seconds to wait for the API before giving up (no timeout would hang the cell).
REQUEST_TIMEOUT_S = 30

# 1. URL-encode the time strings (":" becomes "%3A" in the path)
encoded_from = urllib.parse.quote(TIME_FROM)
encoded_to = urllib.parse.quote(TIME_TO)

headers = {
    "accept": "application/json",
    "x-api-market-key": credentials['key'],
}

params = {
    "withLeg" : True
}

# Combine the base URL and the endpoint for the final request URL
full_url = f"{delays_base_url}/{endpoint}/{encoded_from}/{encoded_to}"

# `response` is bound only on success. On failure it is None, so later cells
# cannot parse an error body or a stale `response` left by an earlier cell.
response = None
try:
    api_response = requests.get(full_url, params=params, headers=headers, timeout=REQUEST_TIMEOUT_S)

    # Check for HTTP errors before trying to parse JSON
    api_response.raise_for_status()

    print(api_response.json())
    response = api_response

except requests.exceptions.HTTPError as errh:
    logging.error(f"Http Error: {errh}")
except requests.exceptions.ConnectionError as errc:
    logging.error(f"Error Connecting: {errc}")
except requests.exceptions.Timeout as errt:
    logging.error(f"Timeout Error: {errt}")
except requests.exceptions.RequestException as e:
    # This catches the original exception and others not caught above
    logging.error(f"An unexpected API request error occurred: {e}")
except Exception as e:
    # Catches non-request errors, like JSON decoding failure
    logging.error(f"An unexpected error occurred: {e}")

In [4]:
# The request cell only logs failures, so `response` may be None, an HTTP error
# response, or an object left over from an earlier cell. Refuse to parse
# anything that is not a successful response.
if response is None or not response.ok:
    raise RuntimeError("AeroDataBox request did not succeed; re-run the request cell before parsing.")
data = response.json()

In [7]:
departures = data["departures"]  # departure flight objects (dicts), iterated by the cells below
arrivals = data["arrivals"]      # arrival flight objects from the same payload

The process below flattens the nested JSON so that each nested field becomes a top-level key (e.g. `{"a": {"b": 1}}` becomes `{"a_b": 1}`). An alternative approach is a schema-mapping config defined in YAML.

In [13]:
def flatten_json(nested_json, parent_key='', sep='_'):
    """Flatten nested dictionaries into a single-level dict.

    Keys of nested dicts are joined to their parent key with ``sep``, e.g.
    ``{"a": {"b": 1}}`` -> ``{"a_b": 1}``. Non-dict values, including lists,
    are copied as-is. Key order follows a depth-first walk of the input.

    Args:
        nested_json: The dict to flatten.
        parent_key: Prefix applied to every key (empty for the top level).
        sep: Separator placed between a parent key and its child key.

    Returns:
        A new flat dict.
    """
    flat = {}
    for key, value in nested_json.items():
        compound_key = f"{parent_key}{sep}{key}" if parent_key else key
        if not isinstance(value, dict):
            flat[compound_key] = value
            continue
        # Recurse into sub-dicts, carrying the accumulated prefix down.
        flat.update(flatten_json(value, compound_key, sep=sep))
    return flat

In [14]:
# Flatten every departure so nested objects become prefixed top-level keys.
flattened_departures = list(map(flatten_json, departures))

# Union of keys across all records, sorted, gives a stable column list.
all_keys = sorted(set().union(*(rec.keys() for rec in flattened_departures)))

# One tuple per record in all_keys order; keys a record lacks become None.
aligned_records = [tuple(map(rec.get, all_keys)) for rec in flattened_departures]

print(aligned_records)

For now we use the manual field mapping below, which gives us finer control over which attributes are extracted and how they are named.

In [16]:
def get_value(data, path, default=None, sep='.'):
    """Safely get a nested value from a dict using a separator-delimited path.

    Walks ``data`` one key at a time (e.g. ``"aircraft.reg"`` reads
    ``data["aircraft"]["reg"]``). It returns ``default`` as soon as a key is
    missing or an intermediate value is not a dict.

    Args:
        data: The (possibly nested) dict to read from.
        path: Keys joined by ``sep``.
        default: Value returned when the path cannot be resolved.
        sep: Separator between keys in ``path`` (default ".").

    Returns:
        The value stored at ``path`` (None if that is what is stored there),
        or ``default`` if the path does not resolve.
    """
    current = data
    for key in path.split(sep):
        # Stop at the first missing key. Continuing would walk into ``default``
        # and could return a value that is not present in ``data``.
        if not isinstance(current, dict) or key not in current:
            return default
        current = current[key]
    return current

In [None]:
from datetime import datetime, timezone

# Values shared by every record in this batch.
# The request cell queried TIME_FROM..TIME_TO, so the flight date is TIME_FROM's date part.
flight_date = TIME_FROM[:10]
# Airport whose departures were requested (an ICAO code, since codetype == "icao").
airport_icao = code
# One timestamp per batch; naive UTC ISO string, same shape as datetime.utcnow().isoformat().
departures_ingestion_timestamp = datetime.now(timezone.utc).replace(tzinfo=None).isoformat()

departure_records = []
for departure in departures:
    record = {
        "flight_number": get_value(departure, "number"),
        "flight_date": flight_date,
        "callsign": get_value(departure, "callSign"),
        # NOTE(review): `status` is filled from codeshareStatus; confirm this is intended.
        "status": get_value(departure, "codeshareStatus"),
        "iscargo": get_value(departure, "isCargo"),
        "aircraft_reg": get_value(departure, "aircraft.reg"),
        "aircraft_modeS": get_value(departure, "aircraft.modeS"),
        "aircraft_model": get_value(departure, "aircraft.model"),
        "airline_name": get_value(departure, "airline.name"),
        "airline_iata": get_value(departure, "airline.iata"),
        "airline_icao": get_value(departure, "airline.icao"),
        "airport_icao": airport_icao,
        "departure_scheduledtime_utc": get_value(departure, "departure.scheduledTime.utc"),
        "departure_scheduledtime_local": get_value(departure, "departure.scheduledTime.local"),
        "departure_revisedtime_utc": get_value(departure, "departure.revisedTime.utc"),
        "departure_revisedtime_local": get_value(departure, "departure.revisedTime.local"),
        "departure_runwaytime_utc": get_value(departure, "departure.runwayTime.utc"),
        "departure_runwaytime_local": get_value(departure, "departure.runwayTime.local"),
        "departure_terminal": get_value(departure, "departure.terminal"),
        "departure_quality": get_value(departure, "departure.quality"),
        "arrival_airport_icao": get_value(departure, "arrival.airport.icao"),
        "arrival_airport_iata": get_value(departure, "arrival.airport.iata"),
        "arrival_airport_name": get_value(departure, "arrival.airport.name"),
        "arrival_airport_timezone": get_value(departure, "arrival.airport.timeZone"),  # note: correct key name
        "arrival_scheduledtime_utc": get_value(departure, "arrival.scheduledTime.utc"),
        "arrival_scheduledtime_local": get_value(departure, "arrival.scheduledTime.local"),
        "arrival_revisedtime_utc": get_value(departure, "arrival.revisedTime.utc"),
        "arrival_revisedtime_local": get_value(departure, "arrival.revisedTime.local"),
        "arrival_runwaytime_utc": get_value(departure, "arrival.runwayTime.utc"),
        "arrival_runwaytime_local": get_value(departure, "arrival.runwayTime.local"),
        "arrival_terminal": get_value(departure, "arrival.terminal"),
        "arrival_gate": get_value(departure, "arrival.gate"),
        "arrival_baggagebelt": get_value(departure, "arrival.baggageBelt"),
        "arrival_quality": get_value(departure, "arrival.quality"),
        "ingestion_timestamp": departures_ingestion_timestamp,
        "data_source": "AeroDataBox",
    }

    departure_records.append(record)

# Column order follows the record dict's key order. An empty response yields no
# columns, since there is then no record to read keys from.
columns = list(departure_records[0].keys()) if departure_records else []

In [26]:
# One tuple per departure record, ordered by `columns`; missing keys become None.
all_departures = [tuple(map(rec.get, columns)) for rec in departure_records]