In [1]:
import csv
import pandas as pd
import numpy as np
from scipy.spatial.transform import Rotation as R

In [2]:
flight_nr = "17"

In [3]:
# Path templates for GPS and VO data
gps_data_template = "/mnt/hdd02/ba_copter_videos/Flight_{}/{}.csv"
vo_data_template = "/mnt/hdd02/ba_copter_videos/Flight_{}/CameraTrajectory_{}.csv"

# Path templates for synchronized GPS and VO trajectories
interpolated_gps_data_template = "/mnt/hdd02/ba_copter_videos/Flight_{}/gps_{}_interpolated.traj"
interpolated_vo_data_template = "/mnt/hdd02/ba_copter_videos/Flight_{}/vo_{}_interpolated.traj"
time_synced_gps_data_template = "/mnt/hdd02/ba_copter_videos/Flight_{}/gps_{}_time_synced.traj"
time_synced_vo_data_template = "/mnt/hdd02/ba_copter_videos/Flight_{}/vo_{}_time_synced.traj"
average_vo_data_template = "/mnt/hdd02/ba_copter_videos/Flight_{}/vo_{}_averaged.traj"

# Replace the flight number in the path templates
gps_data_path = gps_data_template.format(flight_nr, flight_nr)
vo_data_path = vo_data_template.format(flight_nr, flight_nr)

interpolated_gps_data_path = interpolated_gps_data_template.format(flight_nr, flight_nr)
interpolated_vo_data_path = interpolated_vo_data_template.format(flight_nr, flight_nr)
time_synced_gps_data_path = time_synced_gps_data_template.format(flight_nr, flight_nr)
time_synced_vo_data_path = time_synced_vo_data_template.format(flight_nr, flight_nr)
average_vo_data_path = average_vo_data_template.format(flight_nr, flight_nr)

In [4]:
def compute_quaternion_components(data):
    # Extracting data
    roll = np.deg2rad(data["roll(deg)"])
    pitch = np.deg2rad(data["pitch(deg)"])
    yaw = np.deg2rad(data["yaw(deg)"])

    # Initialize lists to store computed quaternion components
    x_list = []
    y_list = []
    z_list = []
    w_list = []

    # Compute quaternion components
    for r, p, y in zip(roll, pitch, yaw):
        r_obj = R.from_euler('xyz', [r, p, y], degrees=False)
        quaternion = r_obj.as_quat()
        x_list.append(quaternion[0])
        y_list.append(quaternion[1])
        z_list.append(quaternion[2])
        w_list.append(quaternion[3])
        
    return x_list, y_list, z_list, w_list

In [5]:
# read in data and translate coordinates into meters
def create_gps_data_list(data):
    # Extracting data
    time = data["time(millisecond)"]
    pos_x = data["latitude"]
    pos_y = data["longitude"]
    pos_z_m = data["altitude(m)"] 
    #pos_z_m = [ft * 0.3048 for ft in pos_z_ft] 
    quaternion_components = compute_quaternion_components(data)
    quaternion_x = quaternion_components[0]
    quaternion_y = quaternion_components[1]
    quaternion_z = quaternion_components[2]
    quaternion_w = quaternion_components[3]

    # Initialize list to store computed data
    data_list = []

    # Setting start point
    start_x = pos_x[0]
    start_y = pos_y[0]

    skip = True
    
    # Combine data into a list of tuples
    for t, x, y, z, qx, qy, qz, qw in zip(time, pos_x, pos_y, pos_z_m, quaternion_x, quaternion_y, quaternion_z, quaternion_w):
        if z == 0 & skip:   # Filter out initial points with height 0
            continue
        else:
            # Converting coordinates to meters and subtracting start point
            x_m = (x - start_x) * 111000  # 1 degree latitude is approximately 111 km
            y_m = (y - start_y) * 111000  # 1 degree longitude is approximately 111 km (at the equator)

            # Adding data to the list
            data_list.append((t, x_m, y_m, z, qx, qy, qz, qw))
            skip = False
    
    return data_list

In [6]:
def create_vo_data_list(data):
    # Lists for delta values
    time = []
    cam_delta_twb_x = []
    cam_delta_twb_y = []
    cam_delta_twb_z = []
    quaternion_x = []
    quaternion_y = []
    quaternion_z = []
    quaternion_w = []

    skip = True
    
    # Read trajectory data and store delta values in lists
    with open(data, 'r') as f:
        for line in f:
            parts = line.split()
            if parts[3] == 0 & skip:
                continue
            else:
                time.append(float(parts[0]))
                cam_delta_twb_x.append(float(parts[1]))
                cam_delta_twb_y.append(float(parts[2]))
                cam_delta_twb_z.append(float(parts[3]))
                quaternion_x.append(float(parts[4]))
                quaternion_y.append(float(parts[5]))
                quaternion_z.append(float(parts[6]))
                quaternion_w.append(float(parts[7]))
                skip = False

    data_list = []

    for t, x, y, z, qx, qy, qz, qw in zip(time, cam_delta_twb_x, cam_delta_twb_y, cam_delta_twb_z, quaternion_x, quaternion_y, quaternion_z, quaternion_w):
        data_list.append((t, x, y, z, qx, qy, qz, qw))
    
    return data_list

In [7]:
# translate both times in seconds and start with zero
def sync_times(gps_data, vo_data):
    # Extract timestamps from the datasets
    # Convert GPS times from milliseconds to seconds
    gps_times = [gps_point[0] / 1000 for gps_point in gps_data]

    # Convert VO times from nanoseconds to seconds
    vo_times = [vo_point[0] / 1e9 for vo_point in vo_data]

    # Set the start time to the minimum of both datasets
    gps_start_time = min(gps_times)
    vo_start_time = min(vo_times)

    # Synchronize the timestamps of both datasets
    synced_gps_times = [round(t - gps_start_time, 3) for t in gps_times]
    synced_vo_times = [round(t - vo_start_time, 3) for t in vo_times]

    # Synchronize GPS data
    synced_gps_data = []
    for gps_point, synced_time in zip(gps_data, synced_gps_times):
        synced_gps_data.append((synced_time, *gps_point[1:]))

    # Synchronize VO data
    synced_vo_data = []
    for vo_point, synced_time in zip(vo_data, synced_vo_times):
        synced_vo_data.append((synced_time, *vo_point[1:]))
    
    return synced_gps_data, synced_vo_data

In [8]:
# average vo values onto gps
def average_vo_with_gps(gps_data, vo_data):
    synced_vo_averages = []

    # Iterate through the GPS data
    for i in range(len(gps_data)):
        gps_time, gps_x, gps_y, gps_z, *gps_quaternion = gps_data[i]

        # Find the VO data points between the current and previous GPS data
        if i == 0:
            vo_points_between_gps = []
        else:
            vo_points_between_gps = [vo_point for vo_point in vo_data if gps_data[i-1][0] < vo_point[0] < gps_time]

        if vo_points_between_gps:
            # Compute the average of the VO data
            vo_average_x = sum([vo_point[1] for vo_point in vo_points_between_gps]) / len(vo_points_between_gps)
            vo_average_y = sum([vo_point[2] for vo_point in vo_points_between_gps]) / len(vo_points_between_gps)
            vo_average_z = sum([vo_point[3] for vo_point in vo_points_between_gps]) / len(vo_points_between_gps)
            vo_average_quaternion_x = sum([vo_point[4] for vo_point in vo_points_between_gps]) / len(vo_points_between_gps)
            vo_average_quaternion_y = sum([vo_point[5] for vo_point in vo_points_between_gps]) / len(vo_points_between_gps)
            vo_average_quaternion_z = sum([vo_point[6] for vo_point in vo_points_between_gps]) / len(vo_points_between_gps)
            vo_average_quaternion_w = sum([vo_point[7] for vo_point in vo_points_between_gps]) / len(vo_points_between_gps)

            # Append the synchronized data
            synced_vo_averages.append((gps_time, vo_average_x, vo_average_y, vo_average_z,
                                        vo_average_quaternion_x, vo_average_quaternion_y,
                                        vo_average_quaternion_z, vo_average_quaternion_w))
        else:
            # Append the GPS data itself if no VO data points are found
            synced_vo_averages.append((gps_time, gps_x, gps_y, gps_z, *gps_quaternion))

    return synced_vo_averages

In [9]:
# interpolated missing gps values to vo
def interpolate_gps_with_vo(gps_data, vo_data):
    interpolated_gps_data = []

    # Iterate through the VO data
    for vo_point in vo_data:
        vo_time = vo_point[0]

        # Find the two GPS data points surrounding the VO time
        for prev_gps_point, next_gps_point in zip(gps_data[:-1], gps_data[1:]):
            prev_time, prev_x, prev_y, prev_z, *prev_quaternion = prev_gps_point
            next_time, next_x, next_y, next_z, *next_quaternion = next_gps_point

            if prev_time <= vo_time < next_time:
                # Calculate the interpolation factor
                factor = (vo_time - prev_time) / (next_time - prev_time)

                # Interpolate the GPS position
                interp_x = prev_x + factor * (next_x - prev_x)
                interp_y = prev_y + factor * (next_y - prev_y)
                interp_z = prev_z + factor * (next_z - prev_z)

                # Interpolate the GPS quaternion
                interp_quaternion = [
                    prev_quaternion[j] + factor * (next_quaternion[j] - prev_quaternion[j])
                    for j in range(4)
                ]

                # Append the interpolated GPS data
                interpolated_gps_data.append((vo_time, interp_x, interp_y, interp_z, *interp_quaternion))
                break

    return interpolated_gps_data

In [10]:
def slerp(q0, q1, t):
    """
    Perform Spherical Linear Interpolation (SLERP) between two quaternions.
    
    Args:
    - q0 (array): The starting quaternion.
    - q1 (array): The ending quaternion.
    - t (float): The interpolation factor between 0 and 1.
    
    Returns:
    - qt (array): The interpolated quaternion.
    """
    q0 = np.array(q0)
    q1 = np.array(q1)
    
    dot_product = np.dot(q0, q1)

    # If the dot product is negative, the quaternions have opposite handedness
    # and slerp won't take the shorter path. Fix by reversing one quaternion.
    if dot_product < 0.0:
        q1 = -q1
        dot_product = -dot_product

    # Clamp dot_product to avoid numerical errors
    dot_product = np.clip(dot_product, -1.0, 1.0)

    # Calculate coefficients
    theta_0 = np.arccos(dot_product)  # Angle between input quaternions
    sin_theta_0 = np.sin(theta_0)

    if sin_theta_0 > 0.001:
        theta = theta_0 * t
        sin_theta = np.sin(theta)
        s0 = np.cos(theta) - dot_product * sin_theta / sin_theta_0
        s1 = sin_theta / sin_theta_0
    else:
        # If the quaternions are nearly identical, use linear interpolation
        s0 = 1.0 - t
        s1 = t

    qt = (s0 * q0) + (s1 * q1)
    return qt

In [11]:
def interpolate_trajectory(traj1, traj2, point_multiplicator=1):
    """
    Interpolates two trajectories to have the same number of points.
    
    Args:
    - traj1 (list of tuples): The first trajectory, where each tuple contains (t, x, y, z, qx, qy, qz, qw).
    - traj2 (list of tuples): The second trajectory, where each tuple contains (t, x, y, z, qx, qy, qz, qw).
    - point_multiplicator (float): A factor to increase the number of points.
    
    Returns:
    - interpolated_traj1 (list of tuples): The interpolated first trajectory.
    - interpolated_traj2 (list of tuples): The interpolated second trajectory.
    """
    num_points = max(len(traj1), len(traj2))
    num_interpolated_points = int(num_points * point_multiplicator)
    
    # Convert the trajectories to numpy arrays for easier manipulation
    traj1_array = np.array(traj1)
    traj2_array = np.array(traj2)

    # add 0,0,0 coords
    np.append(traj1_array, (0.0, 0, 0, 0, 0, 0, 0, 0))
    np.append(traj2_array, (0.0, 0, 0, 0, 0, 0, 0, 0))

    # shorten longer trajectory
    #if len(traj1) != len(traj2):
    #    # Handle the case where the two trajectories have different lengths
    #    if len(traj1) > len(traj2):
    #        # Shorten the longer trajectory to match the length of the shorter one
    #        traj1 = traj1[:len(traj2)]
    #        print("shortened traj1")
    #    elif len(traj1) < len(traj2):
    #        # Shorten the shorter trajectory to match the length of the longer one
    #        traj2 = traj2[:len(traj1)]
    #        print("shortened traj2")
    
    # Extract unique sorted time values from both trajectories
    t_values = sorted(set(traj1_array[:, 0]).union(set(traj2_array[:, 0])))
    
    # Interpolate time values to match the desired number of points
    interpolated_t_values = np.linspace(min(t_values), max(t_values), num_interpolated_points)
    
    # Interpolate x, y, z coordinates for traj1
    interpolated_x1 = np.interp(interpolated_t_values, traj1_array[:, 0], traj1_array[:, 1])
    interpolated_y1 = np.interp(interpolated_t_values, traj1_array[:, 0], traj1_array[:, 2])
    interpolated_z1 = np.interp(interpolated_t_values, traj1_array[:, 0], traj1_array[:, 3])
    
    # Interpolate x, y, z coordinates for traj2
    interpolated_x2 = np.interp(interpolated_t_values, traj2_array[:, 0], traj2_array[:, 1])
    interpolated_y2 = np.interp(interpolated_t_values, traj2_array[:, 0], traj2_array[:, 2])
    interpolated_z2 = np.interp(interpolated_t_values, traj2_array[:, 0], traj2_array[:, 3])
    
    # Interpolate quaternions for traj1
    interpolated_q1 = []
    for t in interpolated_t_values:
        idx = np.searchsorted(traj1_array[:, 0], t)
        if idx == 0:
            q1 = traj1_array[0][4:8]
            interpolated_q1.append(q1)
        elif idx == len(traj1_array):
            q1 = traj1_array[-1][4:8]
            interpolated_q1.append(q1)
        else:
            t1 = traj1_array[idx - 1][0]
            t2 = traj1_array[idx][0]
            q1 = traj1_array[idx - 1][4:8]
            q2 = traj1_array[idx][4:8]
            factor = (t - t1) / (t2 - t1)
            interpolated_q1.append(slerp(q1, q2, factor))
    
    # Interpolate quaternions for traj2
    interpolated_q2 = []
    for t in interpolated_t_values:
        idx = np.searchsorted(traj2_array[:, 0], t)
        if idx == 0:
            q2 = traj2_array[0][4:8]
            interpolated_q2.append(q2)
        elif idx == len(traj2_array):
            q2 = traj2_array[-1][4:8]
            interpolated_q2.append(q2)
        else:
            t1 = traj2_array[idx - 1][0]
            t2 = traj2_array[idx][0]
            q1 = traj2_array[idx - 1][4:8]
            q2 = traj2_array[idx][4:8]
            factor = (t - t1) / (t2 - t1)
            interpolated_q2.append(slerp(q1, q2, factor))
    
    # Combine interpolated x, y, z coordinates and quaternions into tuples
    interpolated_traj1 = [(t, x, y, z, *q) for t, x, y, z, q in zip(interpolated_t_values, interpolated_x1, interpolated_y1, interpolated_z1, interpolated_q1)]
    interpolated_traj2 = [(t, x, y, z, *q) for t, x, y, z, q in zip(interpolated_t_values, interpolated_x2, interpolated_y2, interpolated_z2, interpolated_q2)]
    
    return interpolated_traj1, interpolated_traj2


In [12]:
# get data
gps_data = create_gps_data_list(pd.read_csv(gps_data_path))
vo_data = create_vo_data_list(vo_data_path)

# sync time
time_synced_gps_data, time_synced_vo_data = sync_times(gps_data, vo_data)

# value sync
interpolated_gps_data, interpolated_vo_data = interpolate_trajectory(time_synced_gps_data, time_synced_vo_data)
#average_vo_data = average_vo_with_gps(time_synced_gps_data, time_synced_vo_data)
#interpolated_gps_data = interpolate_gps_with_vo(time_synced_gps_data, time_synced_vo_data)

if len(interpolated_gps_data) != len(interpolated_vo_data):
    print(len(interpolated_gps_data), len(interpolated_vo_data))
    raise ValueError("Interpolation failed. GPS and VO Data has different shape")

# Write data to file
with open(interpolated_gps_data_path, 'w', newline='') as file:
    writer = csv.writer(file)
    writer.writerows(interpolated_gps_data)

with open(interpolated_vo_data_path, 'w', newline='') as file:
    writer = csv.writer(file)
    writer.writerows(interpolated_vo_data)

#with open(average_vo_data_path, 'w', newline='') as file:
#    writer = csv.writer(file)
#    writer.writerows(average_vo_data)

#with open(time_synced_gps_data_path, 'w', newline='') as file:
#    writer = csv.writer(file)
#    writer.writerows(time_synced_gps_data)

#with open(time_synced_vo_data_path, 'w', newline='') as file:
#    writer = csv.writer(file)
#    writer.writerows(time_synced_vo_data)

print("Done!")


shortened traj2
Done!
