# EECE 5554 Lab 5: NUance Navigation Analysis

In [1]:
import pandas as pd
import numpy as np
import plotly.express as px
from scipy.signal import butter, lfilter, freqz
import plotly.graph_objects as go
from numpy.linalg import inv

## Load Datasets

In [None]:
# Parse IMU CSV by extracting the VNYMR string which contains magnetometer data
# VNYMR format: $VNYMR,yaw,pitch,roll,magX,magY,magZ,accelX,accelY,accelZ,gyroX,gyroY,gyroZ*checksum

def parse_vnymr(row):
    """Extract data from VNYMR string"""
    try:
        # Find the VNYMR string in the row
        vnymr_idx = None
        for i, val in enumerate(row):
            if isinstance(val, str) and val.startswith('$VNYMR'):
                vnymr_idx = i
                break
        
        if vnymr_idx is None:
            return {}
        
        vnymr = str(row[vnymr_idx])
        # Parse: $VNYMR,yaw,pitch,roll,magX,magY,magZ,accelX,accelY,accelZ,gyroX,gyroY,gyroZ*checksum
        parts = vnymr.split(',')
        if len(parts) >= 13:
            return {
                'time_sec': row[0],
                'time_nsec': row[1],
                'mag_x': float(parts[4]),
                'mag_y': float(parts[5]),
                'mag_z': float(parts[6]),
                'accel_x': float(parts[7]),
                'accel_y': float(parts[8]),
                'accel_z': float(parts[9]),
                'gyro_x': float(parts[10]),
                'gyro_y': float(parts[11]),
                'gyro_z': float(parts[12].split('*')[0])
            }
    except:
        return {}
    return {}

# Load the datasets
try:
    print("Loading and parsing IMU data...")
    imu_driving_raw = pd.read_csv('../data/imudriving1.csv', header=None)
    imu_circles_raw = pd.read_csv('../data/imugoing_in_circles1.csv', header=None)
    
    # Parse VNYMR strings
    imu_driving_data = [parse_vnymr(row) for _, row in imu_driving_raw.iterrows()]
    imu_circles_data = [parse_vnymr(row) for _, row in imu_circles_raw.iterrows()]
    
    imu_driving = pd.DataFrame([d for d in imu_driving_data if d])
    imu_circles = pd.DataFrame([d for d in imu_circles_data if d])
    
    print("Loading GPS data...")
    gps_driving_raw = pd.read_csv('../data/gpsdriving1.csv', header=None)
    gps_circles_raw = pd.read_csv('../data/gpsgoing_in_circles1.csv', header=None)
    
    gps_driving = pd.DataFrame({
        'time_sec': gps_driving_raw[0],
        'time_nsec': gps_driving_raw[1],
        'lat': pd.to_numeric(gps_driving_raw[3], errors='coerce'),
        'lon': pd.to_numeric(gps_driving_raw[4], errors='coerce'),
        'alt': pd.to_numeric(gps_driving_raw[5], errors='coerce'),
        'utm_e': pd.to_numeric(gps_driving_raw[6], errors='coerce'),
        'utm_n': pd.to_numeric(gps_driving_raw[7], errors='coerce')
    })
    
    gps_circles = pd.DataFrame({
        'time_sec': gps_circles_raw[0],
        'time_nsec': gps_circles_raw[1],
        'lat': pd.to_numeric(gps_circles_raw[3], errors='coerce'),
        'lon': pd.to_numeric(gps_circles_raw[4], errors='coerce'),
        'alt': pd.to_numeric(gps_circles_raw[5], errors='coerce'),
        'utm_e': pd.to_numeric(gps_circles_raw[6], errors='coerce'),
        'utm_n': pd.to_numeric(gps_circles_raw[7], errors='coerce')
    })

    # Convert time columns and create index
    for df in [imu_driving, gps_driving, imu_circles, gps_circles]:
        df['time_sec'] = pd.to_numeric(df['time_sec'], errors='coerce')
        df['time_nsec'] = pd.to_numeric(df['time_nsec'], errors='coerce')
        df['time'] = df['time_sec'] + df['time_nsec'] * 1e-9
        df.dropna(subset=['time'], inplace=True)
        df.set_index('time', inplace=True)

    print("Datasets loaded successfully.")
    print(f"IMU Driving: {len(imu_driving)} samples")
    print(f"GPS Driving: {len(gps_driving)} samples")
    print(f"IMU Circles: {len(imu_circles)} samples")
    print(f"GPS Circles: {len(gps_circles)} samples")
    print(f"\\nSample IMU data:")
    print(imu_driving[['mag_x', 'mag_y', 'mag_z', 'accel_x', 'gyro_z']].head())
except Exception as e:
    print(f"Error loading data files: {e}")
    import traceback
    traceback.print_exc()

Loading and parsing IMU data...
Loading GPS data...
Error loading data files: 'time_sec'


## Step 1: Magnetometer Calibration (Plot 0 & 1)

In [6]:
# Extract magnetometer data from the 'going in circles' dataset
# Convert to numeric and select only the mag columns
imu_circles['mag_x'] = pd.to_numeric(imu_circles['mag_x'], errors='coerce')
imu_circles['mag_y'] = pd.to_numeric(imu_circles['mag_y'], errors='coerce')
imu_circles['mag_z'] = pd.to_numeric(imu_circles['mag_z'], errors='coerce')

mag_cal_data = imu_circles[['mag_x', 'mag_y', 'mag_z']].dropna()

# Plot uncalibrated data
fig = px.scatter_3d(mag_cal_data, x='mag_x', y='mag_y', z='mag_z', 
                    title='Uncalibrated Magnetometer Data (3D)',
                    labels={'mag_x': 'Mag X', 'mag_y': 'Mag Y', 'mag_z': 'Mag Z'})
fig.show()

# Hard and soft iron calibration using least squares
# Center the data (Hard-iron correction)
mag_bias = mag_cal_data.mean()
mag_hard_corrected = mag_cal_data - mag_bias

# Soft-iron correction (scaling to make sphere)
scale = (mag_hard_corrected.max() - mag_hard_corrected.min()) / 2
avg_scale = scale.mean()
scale_factors = avg_scale / scale
mag_calibrated = mag_hard_corrected * scale_factors

print(f"Magnetometer Bias (Hard-iron): {mag_bias.values}")
print(f"Scale Factors (Soft-iron): {scale_factors.values}")

# Plot 0: Magnetometer data before and after correction (2D projection)
fig0 = go.Figure()
fig0.add_trace(go.Scatter(x=mag_cal_data['mag_x'], y=mag_cal_data['mag_y'], 
                          mode='markers', name='Uncalibrated', marker=dict(size=3)))
fig0.add_trace(go.Scatter(x=mag_calibrated['mag_x'], y=mag_calibrated['mag_y'], 
                          mode='markers', name='Calibrated', marker=dict(size=3)))
fig0.update_layout(title='Plot 0: Magnetometer Data Before and After Correction (XY Plane)', 
                   xaxis_title='Mag X (µT)', yaxis_title='Mag Y (µT)',
                   legend_title='Calibration')
fig0.show()

# Apply calibration to the driving data
imu_driving['mag_x'] = pd.to_numeric(imu_driving['mag_x'], errors='coerce')
imu_driving['mag_y'] = pd.to_numeric(imu_driving['mag_y'], errors='coerce')
imu_driving['mag_z'] = pd.to_numeric(imu_driving['mag_z'], errors='coerce')

mag_driving_data = imu_driving[['mag_x', 'mag_y', 'mag_z']].dropna()
mag_driving_calibrated = (mag_driving_data - mag_bias) * scale_factors

# Calculate Yaw
def calculate_yaw(mag_x, mag_y):
    return np.unwrap(np.arctan2(-mag_y, mag_x))

# Yaw before and after calibration
yaw_uncalibrated = calculate_yaw(mag_driving_data['mag_x'], mag_driving_data['mag_y'])
yaw_calibrated = calculate_yaw(mag_driving_calibrated['mag_x'], mag_driving_calibrated['mag_y'])

# Plot 1: Magnetometer yaw estimation before and after calibration
fig1 = go.Figure()
fig1.add_trace(go.Scatter(x=mag_driving_data.index, y=np.rad2deg(yaw_uncalibrated), 
                          mode='lines', name='Uncalibrated Yaw'))
fig1.add_trace(go.Scatter(x=mag_driving_calibrated.index, y=np.rad2deg(yaw_calibrated), 
                          mode='lines', name='Calibrated Yaw'))
fig1.update_layout(title='Plot 1: Magnetometer Yaw Estimation vs. Time', 
                   xaxis_title='Time (s)', yaxis_title='Yaw (degrees)')
fig1.show()

print(f"\\nCalibration complete. Applied to {len(mag_driving_calibrated)} driving samples.")

Magnetometer Bias (Hard-iron): [nan nan nan]
Scale Factors (Soft-iron): [nan nan nan]


\nCalibration complete. Applied to 0 driving samples.


## Step 2: Heading Estimation (Plot 2 & 3)

In [None]:
# Gyro integration for yaw
gyro_z = imu_driving['gyro_z'].dropna()
dt = imu_driving.index.to_series().diff().median()
gyro_yaw = np.rad2deg(np.cumsum(gyro_z) * dt)

# Plot 2: Gyro yaw estimation
fig2 = go.Figure()
fig2.add_trace(go.Scatter(x=imu_driving.index, y=gyro_yaw, mode='lines', name='Gyro Integrated Yaw'))
fig2.update_layout(title='Plot 2: Gyro Yaw Estimation vs. Time', xaxis_title='Time', yaxis_title='Yaw (degrees)')
fig2.show()

# Complementary Filter
def complementary_filter(mag_yaw, gyro_yaw, alpha=0.98):
    """
    Combines magnetometer and gyroscope yaw estimates using a complementary filter.
    alpha is the weight for the gyroscope data.
    """
    filtered_yaw = np.zeros_like(mag_yaw)
    filtered_yaw[0] = mag_yaw[0]
    for i in range(1, len(mag_yaw)):
        filtered_yaw[i] = alpha * (filtered_yaw[i-1] + gyro_yaw[i] - gyro_yaw[i-1]) + (1 - alpha) * mag_yaw[i]
    return filtered_yaw

# Low-pass filter for magnetometer yaw
def low_pass_filter(data, cutoff, fs, order=5):
    nyq = 0.5 * fs
    normal_cutoff = cutoff / nyq
    b, a = butter(order, normal_cutoff, btype='low', analog=False)
    y = lfilter(b, a, data)
    return y

# High-pass filter for gyro yaw
def high_pass_filter(data, cutoff, fs, order=5):
    nyq = 0.5 * fs
    normal_cutoff = cutoff / nyq
    b, a = butter(order, normal_cutoff, btype='high', analog=False)
    y = lfilter(b, a, data)
    return y

fs = 1 / dt  # Sampling frequency
cutoff = 0.1  # Cutoff frequency for filters

# Ensure yaw_calibrated and gyro_yaw are aligned and have the same length
common_index = imu_driving.index.intersection(mag_driving_calibrated.index)
yaw_calibrated_aligned = np.interp(imu_driving.index, common_index, yaw_calibrated)
gyro_yaw_aligned = np.interp(imu_driving.index, imu_driving.index, np.cumsum(gyro_z) * dt)


mag_yaw_lp = low_pass_filter(yaw_calibrated_aligned, cutoff, fs)
gyro_yaw_hp = high_pass_filter(gyro_yaw_aligned, cutoff, fs)

# Combine with complementary filter
comp_filter_yaw = complementary_filter(mag_yaw_lp, gyro_yaw_hp)

# Plot 3: Filter outputs
fig3 = go.Figure()
fig3.add_trace(go.Scatter(x=imu_driving.index, y=np.rad2deg(mag_yaw_lp), mode='lines', name='Low-pass Mag Yaw'))
fig3.add_trace(go.Scatter(x=imu_driving.index, y=np.rad2deg(gyro_yaw_hp), mode='lines', name='High-pass Gyro Yaw'))
fig3.add_trace(go.Scatter(x=imu_driving.index, y=np.rad2deg(comp_filter_yaw), mode='lines', name='Complementary Filter Yaw'))
fig3.update_layout(title='Plot 3: Filter Outputs for Yaw Estimation', xaxis_title='Time', yaxis_title='Yaw (degrees)')
fig3.show()

## Step 3: Forward Velocity Estimation (Plot 4 & 5)

In [None]:
# Forward velocity from accelerometer
accel_x = imu_driving['accel_x'].dropna()

# Remove gravity component (assuming IMU is mostly flat)
accel_x_no_gravity = accel_x - accel_x.mean()

# Integrate to get velocity
velocity_from_accel = np.cumsum(accel_x_no_gravity) * dt

# Plot 4: Forward velocity from accelerometer
fig4 = go.Figure()
fig4.add_trace(go.Scatter(x=imu_driving.index, y=velocity_from_accel, mode='lines', name='Velocity from Accelerometer'))
fig4.update_layout(title='Plot 4: Forward Velocity from Accelerometer', xaxis_title='Time', yaxis_title='Velocity (m/s)')
fig4.show()

# Velocity from GPS
utm_e = gps_driving['utm_e']
utm_n = gps_driving['utm_n']
gps_time = gps_driving.index

dx = utm_e.diff()
dy = utm_n.diff()
dt_gps = gps_time.to_series().diff().dt.total_seconds()

velocity_from_gps = np.sqrt(dx**2 + dy**2) / dt_gps

# Plot 5: Forward velocity from GPS
fig5 = go.Figure()
fig5.add_trace(go.Scatter(x=gps_driving.index[1:], y=velocity_from_gps.dropna(), mode='lines', name='Velocity from GPS'))
fig5.update_layout(title='Plot 5: Forward Velocity from GPS', xaxis_title='Time', yaxis_title='Velocity (m/s)')
fig5.show()

## Step 4: Inertial Odometry and Trajectory Comparison (Plot 6)

In [None]:
# Align velocity and yaw data
common_time = imu_driving.index
vel_aligned = np.interp(common_time, imu_driving.index, velocity_from_accel)
yaw_aligned = np.interp(common_time, common_time, comp_filter_yaw) # Using complementary filter yaw

# Rotate velocity into ENU frame
vx = vel_aligned * np.cos(yaw_aligned)
vy = vel_aligned * np.sin(yaw_aligned)

# Integrate to get position
x_imu = np.cumsum(vx) * dt
y_imu = np.cumsum(vy) * dt

# Align GPS data for plotting
x_gps = gps_driving['utm_e'] - gps_driving['utm_e'].iloc[0]
y_gps = gps_driving['utm_n'] - gps_driving['utm_n'].iloc[0]

# Plot 6: Trajectory comparison
fig6 = go.Figure()
fig6.add_trace(go.Scatter(x=x_imu, y=y_imu, mode='lines', name='IMU Trajectory'))
fig6.add_trace(go.Scatter(x=x_gps, y=y_gps, mode='lines', name='GPS Trajectory'))
fig6.update_layout(title='Plot 6: Estimated Trajectory from IMU vs. GPS',
                   xaxis_title='East (m)', yaxis_title='North (m)',
                   legend_title='Source')
fig6.update_yaxes(scaleanchor = "x", scaleratio = 1)
fig6.show()

## Questions and Answers

**Q0: How did you calibrate the magnetometer from the data you collected? What were the sources of distortion present, and how do you know?**

*Your Answer Here*

**Q1: How did you use a complementary filter to develop a combined estimate of yaw? What components of the filter were present, and what cutoff frequency(ies) did you use?**

*Your Answer Here*

**Q2: Which estimate or estimates for yaw would you trust for navigation? Why?**

*Your Answer Here*

**Q3: What adjustments did you make to the forward velocity estimate, and why?**

*Your Answer Here*

**Q4: What discrepancies are present in the velocity estimate between accel and GPS. Why?**

*Your Answer Here*

**Q5: Compute $\omega x'$ and compare it to $y''_{obs}$. How well do they agree? If there is a difference, what is it due to?**

*Your Answer Here*

**Q6: Estimate the trajectory of the vehicle (xe,xn) from inertial data and compare with GPS. (adjust heading so that the first straight line from both are oriented in the same direction). Report any scaling factor used for comparing the tracks.**

*Your Answer Here*

**Q7: For what period of time did your GPS and IMU estimates of position match closely? (within 2 m) Given this performance, how long do you think your navigation approach could work without another position fix?**

*Your Answer Here*