In [25]:
import numpy as np
import pandas as pd

## Load Data

In [26]:
# Define the constants for the scenario
pilot = 'PILOT'
flight = 'FLIGHT'
lead_alt = 15000
wing_alt = 10000
CM_Airspeed = 150

In [27]:
# Load all data sources from a given flight
lead_data = pd.read_csv('Data/Lead/Lead_{}_{}.csv'.format(pilot, flight))
wing_data = pd.read_csv('Data/Wingman/Wing_{}_{}.csv'.format(pilot, flight))
cruise_data = pd.read_csv('Data/CruiseMissiles/CMs_{}_{}.csv'.format(pilot, flight))
workload_data = pd.read_csv('Data/Workload/Workload_{}_{}.csv'.format(pilot, flight))
# comm_data = pd.read_csv('Data/Comm/Comms_{}_{}.csv'.format(pilot, flight))
# surface_data = pd.read_csv('Data/SurfaceThreats/SurfaceThreats_{}_{}.csv'.format(pilot, flight))

## Define the Master DF

In [28]:
Altitude_Col = 'Altitude_Msl'
Airpseed_Col = 'True_Airspeed'
cols_L29s = ['Timestamp', 'SampleDate', 'SampleTime', 'TestCard', 'True_Heading', 'Roll',
        'Latitude', 'Longitude', 'Vertical_Speed', Altitude_Col, Airpseed_Col]
cols_CMs = ['Timestamp', 'SampleDate', 'SampleTime', 'TestCard', 'DisSource',
            'DisTime', 'Aplication', 'EntId', 'Latitude', 'Longitude', 'Heading']

In [29]:
# Make sure Timestamp is sorted and in datetime format
lead_data['Timestamp'] = pd.to_datetime(lead_data['Timestamp'])
wing_data['Timestamp'] = pd.to_datetime(wing_data['Timestamp'])
cruise_data['Timestamp'] = pd.to_datetime(cruise_data['Timestamp'])

lead_data = lead_data.sort_values('Timestamp')
wing_data = wing_data.sort_values('Timestamp')
cruise_data = cruise_data.sort_values('Timestamp')

# Merge wing onto lead (treat lead as "truth")
# This is necessary since the lead aircraft is the primary source of data and the timest stamps may not match perfectly.
sortie_df = pd.merge_asof(
    lead_data[cols_L29s].sort_values('Timestamp'),
    wing_data[cols_L29s].sort_values('Timestamp'),
    on='Timestamp',
    direction='nearest',
    suffixes=('_Lead', '_Wing'),
    tolerance=pd.Timedelta('50ms')  # adjust tolerance as needed
)

# Merge cruise missiles the same way
sortie_df = pd.merge_asof(
    sortie_df,
    cruise_data[cols_CMs].sort_values('Timestamp'),
    on='Timestamp',
    direction='nearest',
    suffixes=('', '_CM'),
    tolerance=pd.Timedelta('50ms')
)

In [30]:
# add Cruise Missile meta data to sortie_df
sortie_df['CM_Altitude_Lead'] = lead_alt
sortie_df['CM_Altitude_Wing'] = wing_alt
sortie_df['CM_Airspeed'] = CM_Airspeed

## Define Intercepts

In [31]:
def is_within_cone(df, role):
    """
    Determine if an aircraft (lead/wingman) is within intercept criteria:
      1) Bank angle within ±10°
      2) Within 1.5 nm aft of CM
      3) Within 30° trailing cone of CM velocity vector
    """

    # --- Column definitions ---
    roll_col = f'Roll_{role}'
    lat_col = f'Latitude_{role}'
    lon_col = f'Longitude_{role}'
    alt_col = f'Altitude_Msl_{role}'
    heading_col = f'True_Heading_{role}'

    cm_lat_col = 'Latitude'
    cm_lon_col = 'Longitude'
    cm_alt_col = f'CM_Altitude_{role}'
    cm_heading_col = 'Heading'   # you’ll need missile heading in your data

    # --- CONDITION 1: Bank angle ---
    cond1 = df[roll_col].abs() <= 10

    # --- Convert to radians ---
    lat_ac = np.radians(df[lat_col])
    lon_ac = np.radians(df[lon_col])
    lat_cm = np.radians(df[cm_lat_col])
    lon_cm = np.radians(df[cm_lon_col])

    # --- Approx Earth radius in nm ---
    R = 3440.065

    # --- ENU vector from CM → Aircraft (flat Earth approx for short ranges) ---
    dlat = lat_ac - lat_cm
    dlon = lon_ac - lon_cm
    dx = R * np.cos(lat_cm) * dlon     # east displacement [nm]
    dy = R * dlat                      # north displacement [nm]
    dz = (df[alt_col] - df[cm_alt_col]) / 6076.12  # alt diff [nm]
    vec_cm2ac = np.stack([dx, dy, dz], axis=1)

    # --- Missile velocity vector from heading ---
    cm_heading_rad = np.radians(df[cm_heading_col])
    # assume level flight (no vertical velocity)
    vx = np.sin(cm_heading_rad)
    vy = np.cos(cm_heading_rad)
    vz = 0.0
    vec_cm_vel = np.stack([vx, vy, np.full_like(vx, vz)], axis=1)

    # --- CONDITION 2: Aft + distance ---
    dist = np.linalg.norm(vec_cm2ac, axis=1)   # straight-line dist [nm]
    projection = np.sum(vec_cm2ac * vec_cm_vel, axis=1)
    aft_mask = projection < 0                  # behind the CM
    cond2 = (dist <= 1.5) & aft_mask

    # --- CONDITION 3: Inside trailing cone ---
    dot = np.sum(vec_cm2ac * vec_cm_vel, axis=1)
    cos_angle = dot / (dist * np.linalg.norm(vec_cm_vel, axis=1))
    cos_angle = np.clip(cos_angle, -1, 1)      # numerical safety
    angle = np.degrees(np.arccos(cos_angle))
    cond3 = angle <= 30

    # --- CONDITION 4: Heading Sanity Check ---
    heading_diff = np.abs(df[heading_col] - df[cm_heading_col])
    heading_diff = np.where(heading_diff > 180, 360 - heading_diff, heading_diff)
    cond4 = heading_diff <= 20

    intercept_criteria = cond1 & cond2 & cond3 & cond4

    # --- Combine all conditions ---
    return intercept_criteria

In [32]:
sortie_df['Lead_Intercept'] = is_within_cone(sortie_df, 'Lead')
sortie_df['Wing_Intercept'] = is_within_cone(sortie_df, 'Wing')

## Define MOPs

In [33]:
def altitude_deviation(df, role, lead_alt=lead_alt, wing_alt=wing_alt):
    """
    Calculate altitude deviation from assigned altitude block.
    """
    alt_col = f'Altitude_Msl_{role}'
    assigned_alt = lead_alt if role == 'Lead' else wing_alt

    # define a block of altitudes +- 1000ft from assigned altitude
    alt_block_min = assigned_alt - 1000
    alt_block_max = assigned_alt + 1000
    
    # create a column for altitude deviation
    deviation = np.where(
        df[alt_col] < alt_block_min,
        alt_block_min - df[alt_col],
        np.where(
            df[alt_col] > alt_block_max,
            df[alt_col] - alt_block_max,
            0
        )
    )
    
    return deviation

In [34]:
sortie_df['Lead_Alt_Dev'] = altitude_deviation(sortie_df, 'Lead', lead_alt=lead_alt)
sortie_df['Wing_Alt_Dev'] = altitude_deviation(sortie_df, 'Wing', wing_alt=wing_alt)

### Altitude Deviation

In [47]:
# create a function that integrates the altitude deviation over time
def integrate_altitude_deviation(df, role, time_start=None, time_end=None):
    """
    Integrate altitude deviation over time to get total altitude deviation.
    """
    alt_dev_col = f'{role}_Alt_Dev'
    
    # Calculate time difference in seconds
    df_copy = df.copy()
    time_start = pd.to_datetime(time_start) if time_start else None
    time_end = pd.to_datetime(time_end) if time_end else None
    if time_start is not None:
        df_copy = df_copy[df_copy['Timestamp'] >= time_start]
    if time_end is not None:
        df_copy = df_copy[df_copy['Timestamp'] <= time_end]
    df_copy = df_copy.sort_values('Timestamp')
    df_copy['Time_Diff'] = df_copy['Timestamp'].diff().dt.total_seconds().fillna(0)
    
    # Integrate altitude deviation over time
    integrated_dev = (df_copy[alt_dev_col] * df_copy['Time_Diff']).cumsum()

    # only return the final value
    if not integrated_dev.empty:
        integrated_dev = integrated_dev.iloc[-1]
    else:
        integrated_dev = 0.0
    
    return integrated_dev

In [48]:
integrate_altitude_deviation(sortie_df, 'Lead')

54481.92214825266

### Terminal Condition Error

In [53]:
# define a function to calculate the terminal condition error for interception events 
def terminal_condition_error(df, role):
    """
    Calculate terminal condition error for interception events.
    """
    intercept_col = f'{role}_Intercept'
    
    # filter to only interception events
    intercept_events = df[df[intercept_col]]
    
    # record the intercepting aircraft, its altitude, airspeed, and distance to CM at the moment of interception
    if intercept_events.empty:
        return None
    else:
        alt_col = f'Altitude_Msl_{role}'
        airspeed_col = f'True_Airspeed_{role}'
        
        # Calculate distance to CM
        R = 3440.065
        lat_ac = np.radians(intercept_events[f'Latitude_{role}'])
        lon_ac = np.radians(intercept_events[f'Longitude_{role}'])
        lat_cm = np.radians(intercept_events['Latitude'])
        lon_cm = np.radians(intercept_events['Longitude'])
        dlat = lat_ac - lat_cm
        dlon = lon_ac - lon_cm
        dx = R * np.cos(lat_cm) * dlon
        dy = R * dlat
        dz = (intercept_events[alt_col] - intercept_events[f'CM_Altitude_{role}']) / 6076.12
        distance_to_cm = np.sqrt(dx**2 + dy**2 + dz**2)
        intercept_summary = pd.DataFrame({
            'Timestamp': intercept_events['Timestamp'],
            'Altitude': intercept_events[alt_col],
            'Airspeed': intercept_events[airspeed_col],
            'Distance_to_CM': distance_to_cm
        })
        return intercept_summary

In [54]:
terminal_condition_error(sortie_df, 'Lead')

Mulltiple CMs ^^, timing / consent, checks for both, order of events .., feed for both lead and wing .., times for scenario start / end ?