In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from filterpy.kalman import UnscentedKalmanFilter as UKF
from filterpy.kalman import MerweScaledSigmaPoints

# Constants for spiral
R = 8.25  # Radius
alpha = 0.67  # Vertical rise rate


def quaternion_multiply(q, r):
    w0, x0, y0, z0 = q
    w1, x1, y1, z1 = r
    return np.array([
        -x0 * x1 - y0 * y1 - z0 * z1 + w0 * w1,
        x0 * w1 + y0 * z1 - z0 * y1 + w0 * x1,
        -x0 * z1 + y0 * w1 + z0 * x1 + w0 * y1,
        x0 * y1 - y0 * x1 + z0 * w1 + w0 * z1], dtype=float)


def integrate_quaternion(qx, qy, qz, qw, ang_vel_x, ang_vel_y, ang_vel_z, dt):
    omega = np.array([0, ang_vel_x, ang_vel_y, ang_vel_z])
    q = np.array([qw, qx, qy, qz])

    q_dot = 0.5 * quaternion_multiply(q, omega)
    q_new = q + q_dot * dt
    norm = np.linalg.norm(q_new)
    q_new /= norm
    return q_new[1], q_new[2], q_new[3], q_new[0]


def quaternion_to_rotation_matrix(q):
    w, x, y, z = q
    return np.array([
        [1 - 2 * (y ** 2 + z ** 2), 2 * (x * y - z * w), 2 * (x * z + y * w)],
        [2 * (x * y + z * w), 1 - 2 * (x ** 2 + z ** 2), 2 * (y * z - x * w)],
        [2 * (x * z - y * w), 2 * (y * z + x * w), 1 - 2 * (x ** 2 + y ** 2)]
    ])


def fx(x, dt):
    pos_x, pos_y, pos_z, vel_x, vel_y, vel_z, qx, qy, qz, qw, ang_vel_x, ang_vel_y, ang_vel_z = x
    q = np.array([qw, qx, qy, qz])
    R = quaternion_to_rotation_matrix(q)
    acc_body = np.array([Accel_X, Accel_Y, Accel_Z - g])
    acc_world = R @ acc_body

    new_pos = np.array([pos_x, pos_y, pos_z]) + acc_world * dt ** 2 * 0.5 + np.array([vel_x, vel_y, vel_z]) * dt
    new_vel = np.array([vel_x, vel_y, vel_z]) + acc_world * dt
    new_qx, new_qy, new_qz, new_qw = integrate_quaternion(qx, qy, qz, qw, ang_vel_x, ang_vel_y, ang_vel_z, dt)
    return np.concatenate((new_pos, new_vel, [new_qx, new_qy, new_qz, new_qw], [ang_vel_x, ang_vel_y, ang_vel_z]))


def hx(x):
    return x[:3]


def calculate_altitude(pressure, P0, T0, g, R):
    T0_kelvin = T0 + 273.15
    return (T0_kelvin / g) * np.log(P0 / pressure)


# Load the sensor data
data = pd.read_csv('../utils/spiral_data/sensor_data_up_w.csv')

P0 = data['Pressure'][0]
T0 = data['Temperature'][0]
R_const = 287.05
g = 9.80665

# Adjust initial positions and velocities for stability at the start
initial_x = 0.0
initial_y = 0.0
initial_z = alpha
initial_vertical_velocity = 0.0

dt = 1.0 / 25
n_states = 13
points = MerweScaledSigmaPoints(n=n_states, alpha=0.1, beta=2.0, kappa=-1.0)
kf = UKF(dim_x=13, dim_z=3, fx=fx, hx=hx, dt=dt, points=points)
kf.x = np.array([initial_x, initial_y, initial_z, 0, 0, initial_vertical_velocity, 1, 0, 0, 0, 0, 0, 0])

# Improve filter stability by adjusting noise
kf.P *= 0.1
kf.R = np.diag([0.15, 0.15, 0.001])  # Adjusted measurement noise
kf.Q = np.eye(n_states) * 0.01  # Improved process noise setup

positions = []

for index, row in data.iterrows():
    Accel_X, Accel_Y, Accel_Z = row['Accel_X'], row['Accel_Y'], row['Accel_Z']
    mag_x = row['Magneto_X']
    mag_y = row['Magneto_Y']
    mag_z = row['Magneto_Z']
    pressure = row['Pressure']

    altitude = calculate_altitude(pressure, P0, T0, g, R_const)
    z = np.array([0, 0, altitude])

    kf.predict()
    kf.update(z)

    positions.append(kf.x[:3])

positions = np.array(positions)

fig = plt.figure()
ax = fig.add_subplot(111, projection='3d')
ax.plot(positions[:, 0], positions[:, 1], positions[:, 2], label='Estimated Position')
ax.set_xlabel('X Position')
ax.set_ylabel('Y Position')
ax.set_zlabel('Z Position')
ax.legend()
plt.show()

time_steps = np.arange(positions.shape[0]) * dt

plt.figure()
plt.plot(time_steps, positions[:, 0], label='X Position')
plt.plot(time_steps, positions[:, 1], label='Y Position')
plt.plot(time_steps, positions[:, 2], label='Z Position')
plt.xlabel('Time (s)')
plt.ylabel('Position')
plt.title('X, Y, Z Positions Over Time')
plt.legend()
plt.show()

In [None]:
def quaternion_multiply(q, r):
    w0, x0, y0, z0 = q
    w1, x1, y1, z1 = r
    return np.array([
        -x0 * x1 - y0 * y1 - z0 * z1 + w0 * w1,
        x0 * w1 + y0 * z1 - z0 * y1 + w0 * x1,
        -x0 * z1 + y0 * w1 + z0 * x1 + w0 * y1,
        x0 * y1 - y0 * x1 + z0 * w1 + w0 * z1], dtype=float)


def integrate_quaternion(qx, qy, qz, qw, ang_vel_x, ang_vel_y, ang_vel_z, dt):
    omega = np.array([0, ang_vel_x, ang_vel_y, ang_vel_z])
    q = np.array([qw, qx, qy, qz])

    q_dot = 0.5 * quaternion_multiply(q, omega)
    q_new = q + q_dot * dt
    norm = np.linalg.norm(q_new)
    q_new /= norm
    return q_new[1], q_new[2], q_new[3], q_new[0]


def quaternion_to_rotation_matrix(q):
    w, x, y, z = q
    return np.array([
        [1 - 2 * (y ** 2 + z ** 2), 2 * (x * y - z * w), 2 * (x * z + y * w)],
        [2 * (x * y + z * w), 1 - 2 * (x ** 2 + z ** 2), 2 * (y * z - x * w)],
        [2 * (x * z - y * w), 2 * (y * z + x * w), 1 - 2 * (x ** 2 + y ** 2)]
    ])


def fx(x, dt):
    pos_x, pos_y, pos_z, vel_x, vel_y, vel_z, qx, qy, qz, qw, ang_vel_x, ang_vel_y, ang_vel_z = x
    q = np.array([qw, qx, qy, qz])
    R = quaternion_to_rotation_matrix(q)
    acc_body = np.array([Accel_X, Accel_Y, Accel_Z - g])
    acc_world = R @ acc_body

    new_pos = np.array([pos_x, pos_y, pos_z]) + acc_world * dt ** 2 * 0.5 + np.array([vel_x, vel_y, vel_z]) * dt
    new_vel = np.array([vel_x, vel_y, vel_z]) + acc_world * dt
    new_qx, new_qy, new_qz, new_qw = integrate_quaternion(qx, qy, qz, qw, ang_vel_x, ang_vel_y, ang_vel_z, dt)
    return np.concatenate((new_pos, new_vel, [new_qx, new_qy, new_qz, new_qw], [ang_vel_x, ang_vel_y, ang_vel_z]))


def hx(x):
    return x[:3]


def calculate_altitude(pressure, P0, T0, g, R):
    T0_kelvin = T0 + 273.15
    return (T0_kelvin / g) * np.log(P0 / pressure)


data = pd.read_csv('../utils/spiral_data/sensor_data_up_b.csv')

P0 = data['Pressure'][0]
T0 = data['Temperature'][0]
R_const = 287.05
g = 9.80665

dt = 1.0 / 25
n_states = 13
points = MerweScaledSigmaPoints(n=n_states, alpha=0.1, beta=2.0, kappa=-1.0)
kf = UKF(dim_x=13, dim_z=3, fx=fx, hx=hx, dt=dt, points=points)
kf.x = np.array([0, 0, 0, 0, 0, alpha, 1, 0, 0, 0, 0, 0, 0])
kf.P *= 0.1
kf.R = np.diag([1.0, 1.0, 0.1])  # Lower noise for altitude
kf.Q = np.eye(n_states) * 0.1

positions = []

for index, row in data.iterrows():
    Accel_X, Accel_Y, Accel_Z = row['Accel_X'], row['Accel_Y'], row['Accel_Z']
    mag_x = row['Magneto_X']
    mag_y = row['Magneto_Y']
    mag_z = row['Magneto_Z']
    pressure = row['Pressure']

    altitude = calculate_altitude(pressure, P0, T0, g, R_const)

    z = np.array([mag_x, mag_y, altitude])

    kf.predict()
    kf.update(z)

    positions.append(kf.x[:3])

positions = np.array(positions)

# 3D Plot of estimated spiral trajectory
fig = plt.figure()
ax = fig.add_subplot(111, projection='3d')
ax.plot(positions[:, 0], positions[:, 1], positions[:, 2], label='Estimated Position')
ax.set_xlabel('X Position')
ax.set_ylabel('Y Position')
ax.set_zlabel('Z Position')
ax.legend()
plt.show()

# 2D Plot of X, Y, Z positions over time
time_steps = np.arange(positions.shape[0]) * dt

plt.figure()
plt.plot(time_steps, positions[:, 0], label='X Position')
plt.plot(time_steps, positions[:, 1], label='Y Position')
plt.plot(time_steps, positions[:, 2], label='Z Position')
plt.xlabel('Time (s)')
plt.ylabel('Position')
plt.title('X, Y, Z Positions Over Time')
plt.legend()
plt.show()

In [None]:
def quaternion_multiply(q, r):
    w0, x0, y0, z0 = q
    w1, x1, y1, z1 = r
    return np.array([
        -x0 * x1 - y0 * y1 - z0 * z1 + w0 * w1,
        x0 * w1 + y0 * z1 - z0 * y1 + w0 * x1,
        -x0 * z1 + y0 * w1 + z0 * x1 + w0 * y1,
        x0 * y1 - y0 * x1 + z0 * w1 + w0 * z1], dtype=float)


def integrate_quaternion(qx, qy, qz, qw, ang_vel_x, ang_vel_y, ang_vel_z, dt):
    omega = np.array([0, ang_vel_x, ang_vel_y, ang_vel_z])
    q = np.array([qw, qx, qy, qz])

    q_dot = 0.5 * quaternion_multiply(q, omega)
    q_new = q + q_dot * dt
    norm = np.linalg.norm(q_new)
    q_new /= norm
    return q_new[1], q_new[2], q_new[3], q_new[0]


def quaternion_to_rotation_matrix(q):
    w, x, y, z = q
    return np.array([
        [1 - 2 * (y ** 2 + z ** 2), 2 * (x * y - z * w), 2 * (x * z + y * w)],
        [2 * (x * y + z * w), 1 - 2 * (x ** 2 + z ** 2), 2 * (y * z - x * w)],
        [2 * (x * z - y * w), 2 * (y * z + x * w), 1 - 2 * (x ** 2 + y ** 2)]
    ])


def fx(x, dt):
    pos_x, pos_y, pos_z, vel_x, vel_y, vel_z, qx, qy, qz, qw, ang_vel_x, ang_vel_y, ang_vel_z = x
    q = np.array([qw, qx, qy, qz])
    R = quaternion_to_rotation_matrix(q)
    acc_body = np.array([Accel_X, Accel_Y, Accel_Z - g])
    acc_world = R @ acc_body

    new_pos = np.array([pos_x, pos_y, pos_z]) + acc_world * dt ** 2 * 0.5 + np.array([vel_x, vel_y, vel_z]) * dt
    new_vel = np.array([vel_x, vel_y, vel_z]) + acc_world * dt
    new_qx, new_qy, new_qz, new_qw = integrate_quaternion(qx, qy, qz, qw, ang_vel_x, ang_vel_y, ang_vel_z, dt)
    return np.concatenate((new_pos, new_vel, [new_qx, new_qy, new_qz, new_qw], [ang_vel_x, ang_vel_y, ang_vel_z]))


def hx(x):
    return x[:3]


def calculate_altitude(pressure, P0, T0, g, R):
    T0_kelvin = T0 + 273.15
    return (T0_kelvin / g) * np.log(P0 / pressure)


# Load the sensor data
data = pd.read_csv('../utils/spiral_data/sensor_data_up_b.csv')

P0 = data['Pressure'][0]
T0 = data['Temperature'][0]
R_const = 287.05
g = 9.80665

# Adjust initial positions and velocities for stability at the start
initial_x = 0.0
initial_y = 0.0
initial_z = alpha
initial_vertical_velocity = 0.0

dt = 1.0 / 25
n_states = 13
points = MerweScaledSigmaPoints(n=n_states, alpha=0.1, beta=2.0, kappa=-1.0)
kf = UKF(dim_x=13, dim_z=3, fx=fx, hx=hx, dt=dt, points=points)
kf.x = np.array([initial_x, initial_y, initial_z, 0, 0, initial_vertical_velocity, 1, 0, 0, 0, 0, 0, 0])

# Improve filter stability by adjusting noise
kf.P *= 0.1
kf.R = np.diag([0.15, 0.15, 0.001])  # Adjusted measurement noise
kf.Q = np.eye(n_states) * 0.01  # Improved process noise setup

positions = []

for index, row in data.iterrows():
    Accel_X, Accel_Y, Accel_Z = row['Accel_X'], row['Accel_Y'], row['Accel_Z']
    mag_x = row['Magneto_X']
    mag_y = row['Magneto_Y']
    mag_z = row['Magneto_Z']
    pressure = row['Pressure']

    altitude = calculate_altitude(pressure, P0, T0, g, R_const)
    z = np.array([mag_x, mag_y, altitude])

    kf.predict()
    kf.update(z)

    positions.append(kf.x[:3])

positions = np.array(positions)

fig = plt.figure()
ax = fig.add_subplot(111, projection='3d')
ax.plot(positions[:, 0], positions[:, 1], positions[:, 2], label='Estimated Position')
ax.set_xlabel('X Position')
ax.set_ylabel('Y Position')
ax.set_zlabel('Z Position')
ax.legend()
plt.show()

time_steps = np.arange(positions.shape[0]) * dt

plt.figure()
plt.plot(time_steps, positions[:, 0], label='X Position')
plt.plot(time_steps, positions[:, 1], label='Y Position')
plt.plot(time_steps, positions[:, 2], label='Z Position')
plt.xlabel('Time (s)')
plt.ylabel('Position')
plt.title('X, Y, Z Positions Over Time')
plt.legend()
plt.show()

# Findings

The magnetometer either needs to be calibrated properly or not used in the Group T building as there are too many sources of magnetic interference. Switching to the accelerometer now.
