In [1]:
using LinearAlgebra
using Plots
using MultivariateStats, StatsPlots
using Distributions
using DelimitedFiles
using DataFrames
using CSV
using ProgressMeter

# Unscented Kalman Filtering on a Kinematic Bicycle

## Helper Functions

In [2]:
# f, Kinematic bicycle with teleporting steering
function dynamics(x, u, params)
    v, l = params
    px, py, θ = x
    δ = u[1]
    ẋ = [v*cos(θ);
         v*sin(θ);
         v*tan(δ) / l]
    return ẋ
end

dynamics (generic function with 1 method)

In [3]:
# Approximately integrate dynamics over a timestep dt to get a discrete update function
function rk4(xₖ, uₖ, params, dt)
    k1 = dynamics(xₖ, uₖ, params)
    k2 = dynamics(xₖ + k1*dt/2, uₖ, params)
    k3 = dynamics(xₖ + k2*dt/2, uₖ, params)
    k4 = dynamics(xₖ + k3*dt, uₖ, params)

    xₖ₊₁ = xₖ + dt * (k1 + 2k2 + 2k3 + k4) / 6
    return xₖ₊₁
end

rk4 (generic function with 1 method)

In [4]:
# Given a mean and covariance in N-dimensional space, generate 2N+1 weighted points
# with the given weighted mean and weighted covariance
function generate_sigma_points(x̂, Σ)
    Nx = length(x̂)
    A = sqrt(Σ)
    σ = [zeros(Nx) for j in 1:(2Nx+1)]
    W = zeros(2Nx+1)
    W[1] = 1/3

    σ[1] .= x̂

     for j in 1:Nx
        σ[1+j] .= @. x̂ + √(Nx / (1 - W[1])) * A[:,j] 
     end
 
     for j in 1:Nx
        σ[1+Nx+j] .= @. x̂ - √(Nx / (1 - W[1])) * A[:,j] 
     end
 
    W[2:end] .= (1- W[1])/(2Nx)
    
    return σ, W
end

generate_sigma_points (generic function with 1 method)

In [5]:
# maps vector in state space to vector in measurement space
# g, "GPS" measurement of positions
function measurement(x)
    y = x[1:2]
    return y
end

measurement (generic function with 1 method)

In [6]:
function accuracy_to_gps_noise(accuracy)
  accuracy /= 1000.0
  σ = (accuracy / (0.848867684498)) * (accuracy / (0.848867684498));
  return [σ 0; 
          0 σ];
end

accuracy_to_gps_noise (generic function with 1 method)

## Predict and Update

In [7]:
# Given a state estimate and covariance, apply nonlinear dynamics over dt to sigma points
# and calculate a new state estimate and covariance
function ukf_predict(x̂ₖ, Σₖ, Q, uₖ, dt, params)
    Nx = length(x̂ₖ)
    σ, W = generate_sigma_points(x̂ₖ, Σₖ)

    for k in 1:lastindex(σ)
        σ[k] .= rk4(σ[k], uₖ, params, dt)
    end 

    x̂ₖ₊₁ = zeros(Nx)
    Σₖ₊₁ = zeros(Nx, Nx)

    for k in 1:lastindex(σ)
        x̂ₖ₊₁ .+= W[k].* σ[k]
    end
    for k in 1:lastindex(σ)
        Σₖ₊₁ .+= W[k].* ((σ[k] - x̂ₖ₊₁) * (σ[k] - x̂ₖ₊₁)')
    end

    Σₖ₊₁ .+= Q

    return x̂ₖ₊₁, Σₖ₊₁
end

ukf_predict (generic function with 1 method)

In [8]:
# Given a state estimate, covariance of the state estimate, measurement and covariance of the measurement,
# apply the measurement function to sigma points,
# calculate the mean and covariance in measurement space, then use this to calculate the Kalman gain, 
# then use the gain and measurement to calculate the updated state estimate and covariance.
function ukf_update(x̂, Σ, y, R)
    Nx = length(x̂)
    Ny = length(y)
    σ, W = generate_sigma_points(x̂, Σ)
    z = [zeros(Ny) for _ in σ]

    for j in 1:lastindex(σ)
        z[j] = measurement(σ[j])
    end 

    ẑ = zeros(Ny)
    S = zeros(Ny, Ny)
    Cxz = zeros(Nx, Ny)

    for k in 1:lastindex(z)
        ẑ .+= W[k].* z[k]
    end
    for k in 1:lastindex(z)
        S .+= W[k].* (z[k] - ẑ) * (z[k] - ẑ)'
        Cxz .+= W[k] .* (σ[k] - x̂) * (z[k] - ẑ)'
    end
    S .+= R

    K = Cxz * inv(S)

    x̂ₖ₊₁ = x̂ + K * (y - ẑ)
    Σₖ₊₁ = Σ - K * S * K'
    return x̂ₖ₊₁, Σₖ₊₁
end

ukf_update (generic function with 1 method)

# Testing
using the filter on a simulated scenario

## Defining properties of the scenario
system parameters and covariances

In [9]:
# System parameters

l = 1.0 # Wheelbase, (m)
v = 0.0 # Velocity, (m/s)
params = v, l

(0.0, 1.0)

In [10]:
# Covariances. Remember, these are in squared units. To get an idea of the 
# corresponding 95% confidence interval, take the square root and multiply by ~2.

# Σ0 = diagm(ones(3)) # Initial estimate covariance for the filter
# Σ0 = diagm([1.5, 1.5, 1.5]) 
# Σ0 = diagm([3, 3, 3]) 
# Σ0 = zeros(3)
Σ0 = diagm([1e-4; 1e-4; 1e-6]) 

R = diagm([1e-2, 1e-2]) # Sensor covariances (m^2, m^2)
Q = diagm([1e-4; 1e-4; 1e-6]) # Process covariances (m^2, m^2, rad^2) 
                              # ^ the process covariances are timestep size dependent

3×3 Matrix{Float64}:
 0.0001  0.0     0.0
 0.0     0.0001  0.0
 0.0     0.0     1.0e-6

## Loading Measurement and Input Data
gps, encoder, steering

In [11]:
gps_df      = DataFrame(CSV.File("nand-logs/nand-lgap-gps.csv"))
encoder_df  = DataFrame(CSV.File("nand-logs/nand-lgap-encoder.csv"))
steering_df = DataFrame(CSV.File("nand-logs/nand-lgap-steering.csv"))

gps_df[!, "timestamp"] = gps_df[!, "timestamp"] ./ 1000.0           
encoder_df[!, "timestamp"] = encoder_df[!, "timestamp"] ./ 1000.0   
steering_df[!, "timestamp"] = steering_df[!, "timestamp"] ./ 1000.0 


40756-element Vector{Float64}:
 122.55
 122.563
 122.574
 122.585
 122.596
 122.607
 122.618
 122.629
 122.64
 122.651
   ⋮
 580.738
 580.749
 580.76
 580.771
 580.782
 580.793
 580.804
 580.816
 580.827

## Running the UKF

In [12]:
# Simulate a non-deterministic process with process covariance Q and measurement covariance R,
# and with an Unscented Kalman Filter applied
function simulate_uk_filter(Q, R)
  gps_row = 1
  encoder_row = 1
  steering_row = 1

  x̂ = [gps_df[1, "pos_x"]; gps_df[1, "pos_y"]; 0.0]
  Σ = diagm([1.0; 1.0; 1.0])

  Xhat = []
  Sigma = []

  v = 0.0

  last_predict_timestamp = 277.757 #steering_df[1, "timestamp"]

  p = Progress(trunc(Int, round(gps_df[end, "timestamp"]* 1000)))

  try
  while (gps_row <= size(gps_df, 1) && encoder_row <= size(encoder_df, 1) && steering_row <= size(steering_df, 1))
    
    if (last_predict_timestamp > gps_df[gps_row, "timestamp"] && last_predict_timestamp > encoder_df[encoder_row, "timestamp"] && last_predict_timestamp > steering_df[steering_row, "timestamp"])
      continue
    end

    if (gps_df[gps_row, "timestamp"] < encoder_df[encoder_row, "timestamp"] && gps_df[gps_row, "timestamp"] < steering_df[steering_row, "timestamp"]) 
      # gps is the next timestamp
      # set the new gps noise
      # predict using most recent steering and velocity
      # then update using the current gps noise
      dt = gps_df[gps_row, "timestamp"] - last_predict_timestamp;

      R = accuracy_to_gps_noise(gps_df[gps_row, "accuracy"])

      x̂, Σ = ukf_predict(x̂, Σ, Q, deg2rad(steering_df[steering_row, "steering"]), dt, (l, v))

      x̂, Σ = ukf_update(x̂, Σ, [gps_df[gps_row, "pos_x"]; gps_df[gps_row, "pos_y"]], R)

      last_predict_timestamp = gps_df[gps_row, "timestamp"]
      gps_row += 1
    elseif (encoder_df[encoder_row, "timestamp"] < steering_df[steering_row, "timestamp"])
      # encoder is the next timestamp
      # set the new speed stored by instance of UKF
      # predict using most recent steering and new speed
      dt = encoder_df[encoder_row, "timestamp"] - last_predict_timestamp;

      v = encoder_df[encoder_row, "speed"]
      x̂, Σ = ukf_predict(x̂, Σ, Q, deg2rad(steering_df[steering_row, "steering"]), dt, (l, v))

      last_predict_timestamp = encoder_df[encoder_row, "timestamp"]
      encoder_row += 1
    else 
      # steering is the next timestamp
      # predict using this new steering
      dt = steering_df[steering_row, "timestamp"] - last_predict_timestamp;

      x̂, Σ = ukf_predict(x̂, Σ, Q, deg2rad(steering_df[steering_row, "steering"]), dt, (l, v))

      last_predict_timestamp = steering_df[steering_row, "timestamp"]
      steering_row += 1
    end

    update!(p, trunc(Int, last_predict_timestamp*1000))
    flush(stdout)
    push!(Xhat, x̂)
    push!(Sigma, Σ)
  end

catch e
  println(last_predict_timestamp)
  println(x̂)
  println(Σ)
  flush(stdout)
end

  return Xhat, Sigma
end

simulate_uk_filter (generic function with 1 method)

In [13]:
Xhat, Sigma = simulate_uk_filter(Q, R)

## Visualizations

In [None]:
# Let's see how the filter does on position estimation

plot(title="Trajectory Filtering", xlabel="X (m)", ylabel="Y (m)", aspect_ratio=:equal)
plot!(gps_df[:, "pos_x"], gps_df[:, "pos_y"], label="Measured", linewidth=0.5)
plot!([x[1] for x in Xhat], [x[2] for x in Xhat], label="Filtered")

In [None]:
# Remember, we aren't measuring the heading, only a noisy position, 
# but the filter is able to use this information to converge to
# a good estimate of the heading over time.

plot(title="Heading Filtering", xlabel="Time (s)", ylabel="Heading (rad)")
plot!(t, [x[3] for x in X], label="True")
plot!(t, [x[3] for x in Xhat], label="Filtered", color="green")

In [None]:
# And let's see how the filter's confidence in its estimate evolves over time

plot(title="Estimate Standard Deviations", xlabel="Time(s)", ylabel="σᵢᵢ", yscale=:log10, ylim=(1e-3, 1))
plot!(t, [√(σ²[1,1]) for σ² in Σs], label="X Position (m)")
plot!(t, [√(σ²[2,2]) for σ² in Σs], label="Y Position (m)")
plot!(t, [√(σ²[3,3]) for σ² in Σs], label="Heading (rad)")