In [1]:
%load_ext autoreload
%autoreload 2

import os
from dotenv import load_dotenv

# Load environment variables from .env file
load_dotenv("../../azimuth.env")

# Add PATH_ROOT to Python path
import sys
sys.path.append(os.getenv("PATH_ROOT"))

print(f'Project path: {os.getenv("PATH_ROOT")}')

import numpy as np
from VERICONF import conflict_checker
from MARTINI.airspace.conflict_cartesian import check_conflicts
from MARTINI.definitions.trajectory import Trajectory as Trajectory_py


Project path: /Users/thinhhoang/Documents/alpha-azimuth/


In [2]:
def generate_random_trajectories(n_trajectories):
    # Define realistic bounds for the parameters
    LAT_BOUNDS = (30.0, 45.0)  # Continental US approximate bounds
    LON_BOUNDS = (-125.0, -70.0)
    ALT_BOUNDS = (20000, 40000)  # Typical cruise altitudes in feet
    SPEED_BOUNDS = (350 * 1.852 / 3600, 500 * 1.852 / 3600)    # Typical aircraft speeds in km/s
    WAYPOINTS_PER_TRAJ = 3       # Number of waypoints per trajectory

    trajectories = []
    
    for _ in range(n_trajectories):
        # Generate random waypoints with a general direction (west to east)
        start_lon = np.random.uniform(LON_BOUNDS[0], LON_BOUNDS[0] + 20)
        start_lat = np.random.uniform(*LAT_BOUNDS)
        
        waypoints = []
        for i in range(WAYPOINTS_PER_TRAJ):
            lon = start_lon + i * 5 + np.random.uniform(-2, 2)  # General west-to-east movement
            lat = start_lat + np.random.uniform(-2, 2)
            waypoints.append((lat, lon))
        
        # Generate ascending altitudes
        # start_alt = np.random.uniform(*ALT_BOUNDS)
        # altitudes = [
        #     start_alt + i * np.random.uniform(0, 1000)
        #     for i in range(WAYPOINTS_PER_TRAJ)
        # ]
        altitudes = [29000] * WAYPOINTS_PER_TRAJ
        
        # Generate reasonable speeds
        start_speed = np.random.uniform(*SPEED_BOUNDS)
        speeds = [
            start_speed + i * np.random.uniform(-20 * 1.852 / 3600, 20 * 1.852 / 3600)
            for i in range(WAYPOINTS_PER_TRAJ)
        ]
        
        # Random wake turbulence category (1-4)
        cat = np.random.randint(1, 5)
        
        traj = Trajectory_py(waypoints, altitudes, speeds, cat)
        trajectories.append(traj)
    
    return trajectories

# VERICONF Testing

In [8]:
# Generate 1000 trajectories
trajectories = generate_random_trajectories(1000)

# Convert to C++ Trajectory structs
cpp_trajectories = []
for traj in trajectories:
    cpp_traj = conflict_checker.Trajectory(
        traj.waypoints.tolist(),
        traj.altitudes.tolist(),
        traj.speeds.tolist(),
        str(traj.cat),
        0.0
    )
    cpp_trajectories.append(cpp_traj)

print(f'There are {len(cpp_trajectories)} trajectories')
# Check for conflicts
import time

start_time = time.time()
conflicts = conflict_checker.check_conflicts(cpp_trajectories)
end_time = time.time()

print(f"(VERICONF) Conflict checking time: {(end_time - start_time) * 1000:.4f} ms")
print(f'There are {len(conflicts)} conflicts')

There are 1000 trajectories
(VERICONF) Conflict checking time: 329.4830 ms
There are 1092 conflicts


In [5]:
from IPython.display import display, HTML

# Create HTML table for conflicts
html = "<table><tr><th>Trajectory 1</th><th>Trajectory 2</th><th>Segment 1</th><th>Segment 2</th><th>Time (s)</th></tr>"
for conflict in conflicts[:10]:
    traj1, traj2, seg1, seg2, time = conflict
    html += f"<tr><td>{traj1}</td><td>{traj2}</td><td>{seg1}</td><td>{seg2}</td><td>{time:.2f}</td></tr>"
html += "</table>"

# Add some CSS styling
html = f"""
<style>
table {{border-collapse: collapse; width: 100%;}}
th, td {{padding: 8px; text-align: left; border-bottom: 1px solid #ddd;}}
th {{background-color: #f2f2f2;}}
tr:hover {{background-color: #f5f5f5;}}
</style>
{html}
"""

display(HTML(html))

Trajectory 1,Trajectory 2,Segment 1,Segment 2,Time (s)
0,101,0,0,175.45
0,349,1,1,2560.59
1,617,0,1,2288.19
2,459,0,0,1262.48
2,557,0,0,1548.6
3,44,1,1,4252.98
3,166,0,0,421.82
3,326,0,0,79.82
3,405,1,1,4481.88
5,48,0,0,861.63


# PyVeriConf Testing

In [9]:

import time

start_time = time.time()
conflicts = check_conflicts(trajectories)
end_time = time.time()
execution_time = end_time - start_time
print(f"Execution time for check_conflicts: {execution_time:.4f} seconds")
print(f'There are {len(conflicts)} conflicts')

Execution time for check_conflicts: 15.9944 seconds
There are 1092 conflicts


In [7]:
# Display first 10 conflicts
for conflict in conflicts[:10]:
    traj1_idx, traj2_idx, seg1_idx, seg2_idx, t_lb = conflict
    print(f"Conflict between Trajectory {traj1_idx} Segment {seg1_idx} and Trajectory {traj2_idx} Segment {seg2_idx} at t={t_lb:.2f}s")


Conflict between Trajectory 0 Segment 0 and Trajectory 101 Segment 0 at t=175.45s
Conflict between Trajectory 0 Segment 1 and Trajectory 349 Segment 1 at t=2560.59s
Conflict between Trajectory 1 Segment 0 and Trajectory 617 Segment 1 at t=2288.19s
Conflict between Trajectory 2 Segment 0 and Trajectory 459 Segment 0 at t=1262.48s
Conflict between Trajectory 2 Segment 0 and Trajectory 557 Segment 0 at t=1548.60s
Conflict between Trajectory 3 Segment 1 and Trajectory 44 Segment 1 at t=4252.98s
Conflict between Trajectory 3 Segment 0 and Trajectory 166 Segment 0 at t=421.82s
Conflict between Trajectory 3 Segment 0 and Trajectory 326 Segment 0 at t=79.82s
Conflict between Trajectory 3 Segment 1 and Trajectory 405 Segment 1 at t=4481.88s
Conflict between Trajectory 5 Segment 0 and Trajectory 48 Segment 0 at t=861.63s


# Simplified Testing

In [None]:
traj_perpendicular_wp = generate_random_trajectories(1)[0]
# Replace the second waypoint of traj_perpendicular_wp with the second waypoint of traj_sample[0]
traj_perpendicular_wp.waypoints[1] = trajectories[11].waypoints[1]

In [None]:
# Recompute the passing times
traj_perpendicular_wp.compute_times(-904)

# Conflict Inspection

In [None]:
# Pick the index 4 and 18
traj_sample = trajectories[0], trajectories[714]
print(traj_sample[0])
print('*')
print(traj_sample[1])


In [None]:
conflicts_sample = check_conflicts(traj_sample)
print(f'There are {len(conflicts_sample)} conflicts')

# Display the conflicts
for conflict in conflicts_sample:
    traj1_idx, traj2_idx, seg1_idx, seg2_idx, t_lb = conflict
    print(f"Conflict between Trajectory {traj1_idx} Segment {seg1_idx} and Trajectory {traj2_idx} Segment {seg2_idx} at t={t_lb:.2f}s")


In [None]:
from matplotlib import pyplot as plt
import cartopy.crs as ccrs

# Get the bounds for the plot from the trajectories
min_lon = min([traj.waypoints[:, 1].min() for traj in trajectories[:50]])
max_lon = max([traj.waypoints[:, 1].max() for traj in trajectories[:50]])
min_lat = min([traj.waypoints[:, 0].min() for traj in trajectories[:50]])
max_lat = max([traj.waypoints[:, 0].max() for traj in trajectories[:50]])

fig, ax = plt.subplots(figsize=(12, 8),subplot_kw={'projection': ccrs.PlateCarree()})
ax.coastlines()
import cartopy.feature as cfeature
ax.add_feature(cfeature.LAND, edgecolor='black', alpha=0.3)


traj_sample[0].plot(ax=ax, show_annotations=True)
# trajectories[2].plot(ax=ax, show_annotations=False)
traj_sample[1].plot(ax=ax, show_annotations=True)
# trajectories[13].plot(ax=ax, show_annotations=False)

# Set map bounds with some padding
padding = 2  # degrees
ax.set_extent([
    min_lon - padding,
    max_lon + padding,
    min_lat - padding,
    max_lat + padding
])

# Add gridlines
gl = ax.gridlines(draw_labels=True, linewidth=1, color='gray', alpha=0.5, linestyle='--')
gl.top_labels = False  # Don't show labels at top
gl.right_labels = False  # Don't show labels at right




plt.show()

In [21]:
def distance_between_segments(traj1, traj2):
    t_min = min(traj1.passing_time.min(), traj2.passing_time.min())
    t_max = max(traj1.passing_time.max(), traj2.passing_time.max())
    
    t_span = np.arange(t_min, t_max, 1)
    distances = []
    
    for t in t_span:
        # 1. Find segments for each trajectory at time t
        try:
            seg1 = np.where(traj1.passing_time > t)[0][0] - 1
        except IndexError:
            seg1 = len(traj1.passing_time) - 2
        try:
            seg2 = np.where(traj2.passing_time > t)[0][0] - 1
        except IndexError:
            seg2 = len(traj2.passing_time) - 2
        
        # 2. Interpolate positions
        # For trajectory 1
        t1_start = traj1.passing_time[seg1]
        t1_end = traj1.passing_time[seg1 + 1]
        alpha1 = (t - t1_start) / (t1_end - t1_start)
        pos1 = (1 - alpha1) * traj1.waypoints_xyz[seg1] + alpha1 * traj1.waypoints_xyz[seg1 + 1]
        
        # For trajectory 2
        t2_start = traj2.passing_time[seg2]
        t2_end = traj2.passing_time[seg2 + 1]
        alpha2 = (t - t2_start) / (t2_end - t2_start)
        pos2 = (1 - alpha2) * traj2.waypoints_xyz[seg2] + alpha2 * traj2.waypoints_xyz[seg2 + 1]
        
        # 3. Calculate distance
        distance = np.linalg.norm(pos1 - pos2)
        distances.append(distance)
    
    # 4. Plot the result
    plt.figure(figsize=(10, 6))
    plt.plot(t_span, distances)
    plt.grid(True)
    plt.xlabel('Time (s)')
    plt.ylabel('Distance (km)')
    plt.title('Distance Between Aircraft Over Time')
    plt.show()
    
    return t_span, distances

In [None]:
t_span, distances = distance_between_segments(traj_sample[0], traj_sample[1])