# ATS Routing Graph Builder

In this version (0.2), we use
- ATS graph from ATS.txt (the traditional routing options)
- DCT edges from EUROCONTROL Appendix B3 - DCT
- FRA entry, intermediate and exit points from EUROCONTROL Appendix C3 - FRA LIM.

The DCT links usage is under conditions. We **do not consider** them in this version. Perhaps a future version will implement a more advanced search algorithm that takes into account these constraints.

In [1]:
%load_ext autoreload
%autoreload 2

from dotenv import load_dotenv
import os
import sys
load_dotenv()

PROJECT_ROOT = os.getenv('PROJECT_ROOT')

# Add PROJECT_ROOT to the sys.path
sys.path.append(PROJECT_ROOT)


In [2]:
import networkx as nx
from tqdm import tqdm
import os

def is_in_europe(lat, lon):
    """
    Checks if a given latitude and longitude fall within an approximate bounding
    box for the European continent.
    
    Europe is approximated as:
      Latitude: 30°N to 72°N
      Longitude: -25°E to 45°E
    """
    return 30 <= lat <= 72 and -25 <= lon <= 45

In [3]:
import random

def generate_random_two_digits():
    return str(random.randint(0, 99)).zfill(2)



In [4]:
def build_graph_from_ats(filename, bidirectional=False):
    """
    Builds a graph from an ATS file containing airway segments.
    
    The ATS file is expected to have header lines for each airway (starting with "A")
    and segment lines (starting with "S"). For example:
    
      A,A1,46
      S,KEC,33.447742,135.794494,ALBAT,33.364503,135.441514,0,262,18.37
      S,ALBAT,33.364503,135.441514,HALON,33.248769,134.997222,260,262,23.34
      ...
      
    For each "S" (segment) line, this function:
      - Parses the start and end fixes and their coordinates.
      - Filters out segments if either endpoint is not within Europe.
      - Adds fixes (nodes) to the graph.
      - Adds an edge for the segment with attributes for minimum altitude,
        maximum altitude, distance, and the airway name.
    
    Parameters:
      filename (str): Path to the ATS data file.
      bidirectional (bool): If True, add edges in both directions.
    
    Returns:
      networkx.DiGraph: The directed graph representing ATS fixes and segments.
    """
    # Create a directed graph; convert later to undirected if needed.
    G = nx.DiGraph()
    current_airway = "Unknown"  # Default airway name if header hasn't been seen

    with open(filename, 'r') as file:
        for line in file:
            line = line.strip()
            if not line or line.startswith("#"):
                continue  # Skip empty or comment lines

            parts = line.split(',')
            record_type = parts[0].strip().upper()

            # If the line is an airway header, update the current airway name.
            if record_type == "A":
                if len(parts) >= 2:
                    current_airway = parts[1].strip()
                    # Append two random digits to the airway name to make it unique
                    current_airway = current_airway
                continue


            # Process only segment lines.
            if record_type != "S":
                continue

            if len(parts) < 10:
                print(f"Skipping malformed line: {line}")
                continue

            # Unpack fields from the segment line.
            # Format: S, start_fix, start_lat, start_lon, end_fix, end_lat, end_lon, min_alt, max_alt, distance
            (_, start_fix, start_lat, start_lon, end_fix, end_lat, end_lon,
             min_alt, max_alt, distance) = parts

            # Convert coordinate and numeric fields.
            try:
                start_lat = float(start_lat)
                start_lon = float(start_lon)
                end_lat   = float(end_lat)
                end_lon   = float(end_lon)
                min_alt   = float(min_alt)
                max_alt   = float(max_alt)
                distance  = float(distance)
            except ValueError:
                print(f"Skipping line due to conversion error: {line}")
                continue

            # Filter: Only include segments with both endpoints in Europe.
            if not (is_in_europe(start_lat, start_lon) and is_in_europe(end_lat, end_lon)):
                # Uncomment the next line to see which segments are skipped.
                # print(f"Skipping non-European segment: {start_fix} -> {end_fix}")
                continue

            # Add the nodes (fixes) with their coordinates.
            if start_fix not in G:
                G.add_node(start_fix, lat=start_lat, lon=start_lon)
            else:
                # Start fix already exists, check if coordinates match
                if abs(G.nodes[start_fix]['lat'] - start_lat) > 1e-4 or abs(G.nodes[start_fix]['lon'] - start_lon) > 1e-4:
                    print(f"Start fix {start_fix} has different coordinates: {G.nodes[start_fix]['lat']}, {G.nodes[start_fix]['lon']} != {start_lat}, {start_lon}")
                    start_fix = start_fix + '_' + generate_random_two_digits()
                    G.add_node(start_fix, lat=start_lat, lon=start_lon)
            if end_fix not in G:
                G.add_node(end_fix, lat=end_lat, lon=end_lon)
            else:
                # End fix already exists, check if coordinates match
                if abs(G.nodes[end_fix]['lat'] - end_lat) > 1e-4 or abs(G.nodes[end_fix]['lon'] - end_lon) > 1e-4:
                    print(f"End fix {end_fix} has different coordinates: {G.nodes[end_fix]['lat']}, {G.nodes[end_fix]['lon']} != {end_lat}, {end_lon}")
                    end_fix = end_fix + '_' + generate_random_two_digits()
                    G.add_node(end_fix, lat=end_lat, lon=end_lon)

            # Add the edge (segment) with its attributes, including the airway name.
            edge_attrs = {
                'min_alt': min_alt,
                'max_alt': max_alt,
                'distance': distance,
                'airway': current_airway,
                'edge_type': 'airway'
            }
            G.add_edge(start_fix, end_fix, **edge_attrs)

            # Optionally, add the reverse edge if the segment is bidirectional.
            if bidirectional:
                G.add_edge(end_fix, start_fix, **edge_attrs)

    return G

In [5]:
ats_file = os.path.join(PROJECT_ROOT, "data", "airac", "ATS.txt")

# Set bidirectional=True if segments are used in both directions.
graph_ats = build_graph_from_ats(ats_file, bidirectional=False)

print(f"Number of fixes (nodes): {graph_ats.number_of_nodes()}")
print(f"Number of route segments (edges): {graph_ats.number_of_edges()}")

End fix BNA has different coordinates: 36.651297, 3.591522 != 32.124444, 20.253611
Start fix BNA has different coordinates: 36.651297, 3.591522 != 32.124444, 20.253611
End fix BNA has different coordinates: 36.651297, 3.591522 != 32.124444, 20.253611
Start fix BNA has different coordinates: 36.651297, 3.591522 != 32.124444, 20.253611
End fix BNA has different coordinates: 36.651297, 3.591522 != 32.124444, 20.253611
Start fix BNA has different coordinates: 36.651297, 3.591522 != 32.124444, 20.253611
End fix BNA has different coordinates: 36.651297, 3.591522 != 32.124444, 20.253611
Start fix BNA has different coordinates: 36.651297, 3.591522 != 32.124444, 20.253611
End fix BNA has different coordinates: 36.651297, 3.591522 != 32.124444, 20.253611
Start fix BNA has different coordinates: 36.651297, 3.591522 != 32.124444, 20.253611
End fix BOURI has different coordinates: 31.69, 18.716389 != 37.989167, 5.0
Start fix BOURI has different coordinates: 31.69, 18.716389 != 37.989167, 5.0
End fi

In [6]:
import pandas as pd
import os
rad_dct_df = pd.read_csv(os.path.join(PROJECT_ROOT, "data", "rad", "RAD_DCT.csv"))
rad_dct_df.columns = ["ChangeInd","ValidFrom","ValidUntil","ID","From","To","LowerVertLimit","UpperVertLimit","Available","Utilization","DCTTimeAvail","OpGoal","Remarks","CruisingLevelsDir","ATCUnit","ReleaseDate","SpecialEvent"]
rad_dct_df.head(1)

Unnamed: 0,ChangeInd,ValidFrom,ValidUntil,ID,From,To,LowerVertLimit,UpperVertLimit,Available,Utilization,DCTTimeAvail,OpGoal,Remarks,CruisingLevelsDir,ATCUnit,ReleaseDate,SpecialEvent
0,AMD,06 FEB 2025 [2501],UFN,EG5256,MIMVA,OTBED,105,660,Yes,"NOT AVBL FOR TFC\nDEP EHAAFIR EXC DEP (EHBK, E...",H24\n----------\nH24,To provide connectivity for traffic at Maastri...,,,EG**ACC,05 FEB 2025,


In [7]:
from utils.haversine import haversine_distance

def build_graph_from_rad_dct3(rad_dct_df, graph_ats):
    # Filter for available DCT routes
    available_dct = rad_dct_df[rad_dct_df['Available'] == 'Yes']
    
    # Initialize counter for successfully created edges
    edges_created = 0
    total_available = len(available_dct)
    
    # Create edges for each available DCT route
    for _, row in available_dct.iterrows():
            # Add edge if both nodes exist
            from_fix = row['From'].strip().upper()
            to_fix = row['To'].strip().upper()
            if from_fix in graph_ats.nodes and to_fix in graph_ats.nodes:
                # Get the lat and lon of the nodes
                from_lat = graph_ats.nodes[from_fix]['lat']
                from_lon = graph_ats.nodes[from_fix]['lon']
                to_lat = graph_ats.nodes[to_fix]['lat']
                to_lon = graph_ats.nodes[to_fix]['lon']
                # Compute the distance using haversine distance
                distance = haversine_distance(from_lat, from_lon, to_lat, to_lon)
                graph_ats.add_edge(from_fix, to_fix,
                                   min_alt=row['LowerVertLimit'],
                                   max_alt=row['UpperVertLimit'],
                                   distance=distance,
                                   airway='',
                                   edge_type='DCT')
                edges_created += 1
            else:
                if from_fix not in graph_ats.nodes:
                    print(f"From node {from_fix} not found in graph_ats. Probably outside Europe.")
                if to_fix not in graph_ats.nodes:
                    print(f"To node {to_fix} not found in graph_ats. Probably outside Europe.")
            
    print(f"Created {edges_created} DCT edges out of {total_available} available DCT routes")
    
    return graph_ats


In [8]:
graph_ats_dct3 = build_graph_from_rad_dct3(rad_dct_df, graph_ats)

To node ODVOD not found in graph_ats. Probably outside Europe.
From node ECHED not found in graph_ats. Probably outside Europe.
From node ECHED not found in graph_ats. Probably outside Europe.
To node ECHED not found in graph_ats. Probably outside Europe.
To node ECHED not found in graph_ats. Probably outside Europe.
From node ECHED not found in graph_ats. Probably outside Europe.
From node ECHED not found in graph_ats. Probably outside Europe.
To node ECHED not found in graph_ats. Probably outside Europe.
To node ECHED not found in graph_ats. Probably outside Europe.
From node ECSACUZCUX not found in graph_ats. Probably outside Europe.
From node GATMO not found in graph_ats. Probably outside Europe.
To node RASCA not found in graph_ats. Probably outside Europe.
From node ADKUV not found in graph_ats. Probably outside Europe.
To node ADKUV not found in graph_ats. Probably outside Europe.
To node BABEG not found in graph_ats. Probably outside Europe.
From node KIRDI not found in graph_a

In [9]:
fra_df = pd.read_csv(os.path.join(PROJECT_ROOT, "data", "rad", "FRA_PTS.csv"))
fra_df.columns = ["chg_rec","pt_type","fra_pt","fra_lat","fra_lon","fra_name","fra_rel_enroute","fra_rel_arr_dep","arr_apt","dep_apt","flos","lvl_avail","time_avail","loc_ind","rmk"]
fra_df.head(1)

Unnamed: 0,chg_rec,pt_type,fra_pt,fra_lat,fra_lon,fra_name,fra_rel_enroute,fra_rel_arr_dep,arr_apt,dep_apt,flos,lvl_avail,time_avail,loc_ind,rmk
0,,,ABADI,N404519,E0183830,BALTIC & FRAIT & SECSI & SEE FRA,I,-,,,-,FL195 / FL660,H24,LIBB,


In [10]:
# Count FRA points not in graph_ats_dct3
missing_fra_pts = sum(1 for pt in fra_df['fra_pt'].unique() if pt not in graph_ats_dct3)
print(f"Number of FRA points not in graph: {missing_fra_pts} out of {len(fra_df['fra_pt'].unique())}")
print(f"We will need to manually add the edges to these points later.")

Number of FRA points not in graph: 2088 out of 6744
We will need to manually add the edges to these points later.


In [None]:
def build_graph_from_fra_pts(fra_df):
    """
    Builds a graph from FRA points and adds them to the existing ATS graph.
    
    Parameters:
        fra_df (pandas.DataFrame): DataFrame containing FRA points data
        
    Returns:
        networkx.DiGraph: Updated graph with FRA points and connections
    """
    def convert_coord(coord_str):
        """Convert coordinates from format like 'N404519' or 'E0183830' to decimal degrees"""
        try:
            direction = coord_str[0]
            degrees = float(coord_str[1:-4])
            decimals = float(coord_str[-4:]) / 10000
            decimal = round(degrees + decimals, 4)
            return decimal if direction in ['N', 'E'] else -decimal
        except (ValueError, IndexError):
            return None
        
    graph_ats = nx.DiGraph()

    # Get unique FRA areas
    fra_areas = fra_df['fra_name'].unique()
    
    edges_added = 0
    nodes_added = 0
    
    for fra_area in fra_areas:
        # Get all points in this FRA area
        area_points = fra_df[fra_df['fra_name'] == fra_area]
        print(f'There are {len(area_points)} points in this FRA.')
        print(f'')
        
        # First add all nodes
        for _, point in area_points.iterrows():
            lat = convert_coord(str(point['fra_lat']))
            lon = convert_coord(str(point['fra_lon']))
            
            if lat is None or lon is None:
                print(f"Skipping point {point['fra_pt']} due to invalid coordinates")
                continue
                
            if not is_in_europe(lat, lon):
                continue
                
            if point['fra_pt'] not in graph_ats:
                graph_ats.add_node(point['fra_pt'], lat=lat, lon=lon)
                nodes_added += 1
        
        # Then create edges based on point types
        area_points = area_points[area_points['fra_rel_enroute'].notna()]
        
        for _, point1 in tqdm(area_points.iterrows(), total=len(area_points), desc=f"FRA {fra_area}"):
            type1 = point1['fra_rel_enroute']
            if type1 not in ['E', 'X', 'I', 'EX']:
                continue
                
            for _, point2 in area_points.iterrows():
                type2 = point2['fra_rel_enroute']
                if type2 not in ['E', 'X', 'I', 'EX']:
                    continue
                
                # Skip if same point
                if point1['fra_pt'] == point2['fra_pt']:
                    continue
                    
                # Connect according to rules:
                # E-X, E-I, X-I, I-I connections allowed
                # Skip E-E, X-X connections
                valid_connection = False
                if type1 == 'I' and type2 in['I', 'X']:
                    valid_connection = True
                elif type1 == 'E' and type2 in ['X', 'I']:
                    valid_connection = True
                elif type1 == 'EX' and type2 in ['I']:
                    valid_connection = True
                elif type1 == 'I' and type2 in ['EX']:
                    valid_connection = True
                
                if valid_connection:
                    # Calculate distance using haversine
                    if point1['fra_pt'] in graph_ats.nodes and point2['fra_pt'] in graph_ats.nodes:
                        lat1 = graph_ats.nodes[point1['fra_pt']]['lat']
                        lon1 = graph_ats.nodes[point1['fra_pt']]['lon']
                        lat2 = graph_ats.nodes[point2['fra_pt']]['lat']
                        lon2 = graph_ats.nodes[point2['fra_pt']]['lon']



                        if type1 == 'I' and type2 == 'I':
                            if abs(lat1 - lat2) > 3 or abs(lon1 - lon2) > 3:
                                continue # Skip FRA points that are more than 3 degrees apart
                            
                        
                        distance = haversine_distance(lat1, lon1, lat2, lon2)
                        if distance < 20 and type1 == 'I' and type2 == 'I':
                            continue # Skip I-I connections that are less than 20 nm apart
                        
                        # Add bidirectional edges
                        graph_ats.add_edge(point1['fra_pt'], point2['fra_pt'],
                                         distance=distance,
                                         min_alt=0,
                                         max_alt=0,
                                         airway='',
                                         edge_type='FRA')
                        edges_added += 1
    
    print(f"Added {nodes_added} FRA nodes and {edges_added} FRA edges")
    return graph_ats

graph_ats_fra = build_graph_from_fra_pts(fra_df, graph_ats_dct3)

In [None]:
print(f"Graph ATS FRA nodes: {graph_ats_fra.number_of_nodes()}")
print(f"Graph ATS FRA edges: {graph_ats_fra.number_of_edges()}")


In [None]:
def collapse_duplicate_nodes(graph):
    """
    Collapses nodes that are extremely close together (distance < 0.001 nm).
    When nodes are collapsed, one node is kept and the other is removed, with all edges
    redirected to the remaining node.
    
    Args:
        graph: NetworkX graph to process
        
    Returns:
        Modified graph with duplicate nodes collapsed
    """
    G = graph.copy()
    
    # Find pairs of nodes to collapse
    nodes_to_merge = []
    for u, v, data in G.edges(data=True):
        if data['distance'] < 1e-4:
            nodes_to_merge.append((u, v))

    nodes_removed = 0
    print(f'Found {len(nodes_to_merge)} duplicate nodes')
    
    # For each pair of nodes to merge
    for node1, node2 in tqdm(nodes_to_merge, total=len(nodes_to_merge)):
        if node1 not in G.nodes() or node2 not in G.nodes():
            continue # Skip if either node was already removed
            

        # Get all edges connected to node2
        edges_to_redirect = list(G.in_edges(node2, data=True)) + list(G.out_edges(node2, data=True))
        
        # Redirect all edges from/to node2 to instead connect with node1
        for u, v, data in edges_to_redirect:
            if u == node2:
                if v != node1:  # Avoid self loops
                    G.add_edge(node1, v, **data)
            elif v == node2:
                if u != node1:  # Avoid self loops
                    G.add_edge(u, node1, **data)
                    
        # Remove the duplicate node
        G.remove_node(node2)
        nodes_removed += 1

    print(f"Removed {nodes_removed} duplicate nodes")

    return G

collapsed_graph = collapse_duplicate_nodes(graph_ats_fra)


In [None]:
print(f"Merged graph nodes: {collapsed_graph.number_of_nodes()}")
print(f"Merged graph edges: {collapsed_graph.number_of_edges()}")

In [33]:
# Write the merged graph to a file
nx.write_graphml(collapsed_graph, os.path.join(PROJECT_ROOT, "data", "graphs", "route_dct_fra.graphml"))

In [None]:
def generate_airport_csv(filename):
    """
    Extracts airport information from Airport.txt and saves it to a CSV file.
    
    Airport line format:
    A,ICAO,NAME,LAT,LON,ELEV,LENGTH,UNUSED1,UNUSED2,UNUSED3
    
    Parameters:
        filename (str): Path to the Airport.txt file
    """
    import csv
    import os
    
    output_file = os.path.join(os.path.dirname(filename), 'airports.csv')
    
    # Define CSV headers
    headers = ['icao', 'name', 'latitude', 'longitude', 'elevation']
    
    with open(filename, 'r') as infile, open(output_file, 'w', newline='') as outfile:
        writer = csv.writer(outfile)
        writer.writerow(headers)
        
        for line in infile:
            line = line.strip()
            if not line or not line.startswith('A'):
                continue
                
            parts = line.split(',')
            if len(parts) < 5:
                continue
                
            # Extract relevant fields
            icao = parts[1]
            name = parts[2]
            try:
                lat = float(parts[3])
                lon = float(parts[4])
                elev = float(parts[5])
            except (ValueError, IndexError):
                print(f"Skipping malformed line: {line}")
                continue
                
            writer.writerow([icao, name, lat, lon, elev])
    
    print(f"Airport data saved to: {output_file}")

# Usage:
airport_file = os.path.join(PROJECT_ROOT, "data", "airac", "Airports.txt")
generate_airport_csv(airport_file)