In [None]:
import numpy as np
import pandas as pd
from pyproj import CRS, Transformer
import math

def normalize_angle(angle):
    return (angle + np.pi) % (2 * np.pi) - np.pi

print("=== Extended Kalman Filter with IMU + GPS + Magnetometer Fusion ===")

# Load CSV data
data = pd.read_csv('/home/pi/Downloads/imu_gps_mag_log_20250502_200256.csv')

# Constants
dt = 0.1  # Time step (adjust based on your system)
g = 9.80665
deg2rad = np.pi / 180

# Initial GPS position
init_lat = float(data.loc[0, 'Lat'])
init_lon = float(data.loc[0, 'Lon'])

# UTM projection
zone_number = int((init_lon + 180) / 6) + 1
is_northern = init_lat >= 0
proj_wgs84 = CRS.from_epsg(4326)
proj_utm = CRS.from_epsg(32600 + zone_number) if is_northern else CRS.from_epsg(32700 + zone_number)
to_utm = Transformer.from_crs(proj_wgs84, proj_utm, always_xy=True)
to_wgs = Transformer.from_crs(proj_utm, proj_wgs84, always_xy=True)

# Convert initial GPS to UTM
utm_x, utm_y = to_utm.transform(init_lon, init_lat)

# Initial states
state = np.array([utm_x, utm_y, 0.0, 0.0, 0.0])  # [x, y, vx, vy, yaw]
imu_only_state = np.copy(state)
P = np.eye(5) * 100.0  # Reduced uncertainty for better convergence

# Noise parameters (tuned values)
Q = np.diag([0.5, 0.5, 0.1, 0.1, 0.05])
R = np.diag([5.0, 5.0])  # GPS measurement noise

H = np.array([
    [1, 0, 0, 0, 0],
    [0, 1, 0, 0, 0]
])

output = []

for i in range(len(data)):
    try:
        # Sensor readings
        lat = float(data.loc[i, 'Lat'])
        lon = float(data.loc[i, 'Lon'])
        Ax = float(data.loc[i, 'Ax (m/s²)'])
        Ay = float(data.loc[i, 'Ay (m/s²)'])
        Gz = float(data.loc[i, 'Gz (°/s)']) * deg2rad

        # Optional magnetometer (for yaw correction)
        use_mag = 'MagX (µT)' in data.columns and 'MagY (µT)' in data.columns
        if use_mag:
            MagX = float(data.loc[i, 'MagX (µT)'])
            MagY = float(data.loc[i, 'MagY (µT)'])
            yaw_mag = math.atan2(-MagY, MagX)  # -Y for compass heading
        else:
            yaw_mag = None

        # Convert GPS to UTM
        gps_x, gps_y = to_utm.transform(lon, lat)

        # ===== IMU-only prediction =====
        yaw_imu = imu_only_state[4]
        ax_imu = Ax * math.cos(yaw_imu) - Ay * math.sin(yaw_imu)
        ay_imu = Ax * math.sin(yaw_imu) + Ay * math.cos(yaw_imu)

        imu_only_state[2] += ax_imu * dt
        imu_only_state[3] += ay_imu * dt
        imu_only_state[0] += imu_only_state[2] * dt
        imu_only_state[1] += imu_only_state[3] * dt
        imu_only_state[4] += Gz * dt
        imu_only_state[4] = normalize_angle(imu_only_state[4])
        imu_lon, imu_lat = to_wgs.transform(imu_only_state[0], imu_only_state[1])

        # ===== EKF prediction =====
        yaw = state[4]
        ax_global = Ax * math.cos(yaw) - Ay * math.sin(yaw)
        ay_global = Ax * math.sin(yaw) + Ay * math.cos(yaw)

        # State prediction
        state[2] += ax_global * dt
        state[3] += ay_global * dt
        state[0] += state[2] * dt
        state[1] += state[3] * dt
        state[4] += Gz * dt
        state[4] = normalize_angle(state[4])

        # Transition Jacobian F
        F = np.eye(5)
        F[0, 2] = dt
        F[1, 3] = dt
        F[2, 4] = -Ax * math.sin(yaw) * dt - Ay * math.cos(yaw) * dt
        F[3, 4] = Ax * math.cos(yaw) * dt - Ay * math.sin(yaw) * dt

        # Covariance prediction
        P = F @ P @ F.T + Q

        # Measurement update with GPS
        Z = np.array([gps_x, gps_y])
        Y = Z - H @ state
        S = H @ P @ H.T + R
        K = P @ H.T @ np.linalg.inv(S)

        state = state + K @ Y
        P = (np.eye(5) - K @ H) @ P

        # Optional magnetometer correction
        if yaw_mag is not None:
            yaw_diff = normalize_angle(yaw_mag - state[4])
            state[4] += 0.05 * yaw_diff  # Simple complementary correction
            state[4] = normalize_angle(state[4])

        filt_lon, filt_lat = to_wgs.transform(state[0], state[1])

        output.append({
            'GPS_LAT': lat,
            'GPS_LONG': lon,
            'FILTERED_LAT': filt_lat,
            'FILTERED_LONG': filt_lon,
            'IMU_LAT': imu_lat,
            'IMU_LONG': imu_lon,
            'YAW_FILTERED_DEG': np.degrees(state[4])
        })

    except Exception as e:
        print(f"Error at row {i}: {e}")
        continue

# Save results
df_out = pd.DataFrame(output)
df_out.to_csv("ekf_filtered_output.csv", index=False)
print("✅ Filtered output saved as 'ekf_filtered_output.csv'")