In [123]:
# NOT WORKING; Unscented Kalman Filter Code Prototype for 1D use on EXCLAIM Project with multiple position sensors whose time rates are different, code by Joaquin Matticoli (joaquinmatticoli@gmail.com)

In [124]:
import numpy as np
from numpy import dot
import filterpy
from filterpy.kalman import UnscentedKalmanFilter as ukf
from filterpy.kalman import MerweScaledSigmaPoints
from filterpy.kalman import update
from numpy.random import randn
#from filterpy.common import Q_continuous_white_noise
import matplotlib.pyplot as plt
import math
from math import sqrt
import cmath
import decimal
from decimal import *

In [125]:
from collections import namedtuple
gaussian = namedtuple('Gaussian', ['mean', 'var'])
gaussian.__repr__ = lambda s: '𝒩(μ={:.3f}, 𝜎²={:.3f})'.format(s[0], s[1]) # Not necessary, lets the creation of gaussian representation N(mean, variance)

np.random.seed(24) # remove for different random numbers each time

def print_gh(t, predict, update, z1, z2, K): # Prediction + Sensor 1 and 2
    predict_template = '{:.3f}    {:.3f}    {:.3f}'
    update_template = '   {:.3f}    {:.3f}     {:.3f}     {:.3f}    {:.3f}'

    print(predict_template.format(t, predict[0], predict[1]),end=' ')
    print(update_template.format(z1, z2, update[0], update[1], K))
    
def print_gh1(t, predict, update, z1, K): # Prediction + Sensor 1 only
    predict_template = '{:.3f}    {:.3f}    {:.3f}'
    update_template = '   {:.3f}    -----     {:.3f}     {:.3f}    {:.3f}'

    print(predict_template.format(t, predict[0], predict[1]),end=' ')
    print(update_template.format(z1, update[0], update[1], K))
    
def print_gh2(t, predict, update, z2, K): # Prediction + Sensor 2 only
    predict_template = '{:.3f}    {:.3f}    {:.3f}'
    update_template = '   -----    {:.3f}     {:.3f}     {:.3f}    {:.3f}'

    print(predict_template.format(t, predict[0], predict[1]),end=' ')
    print(update_template.format(z2, update[0], update[1], K))

In [126]:
class PendulumPayloadSimulation(object): #Pendulum example, no torque because it would be too hard to calculate
    def __init__(self, x0=0., measurement1_var=0.0, 
                 measurement2_var=0.0, process_var=0.0,
                 t_0=0., amp=1.0, freq=1.0, phase=0.0):
        """ x0 : initial orientation
            angular_velocity: (+=counterclockwise, -=clockwise)
            measurement_var: variances in measurement deg^2
            process_var: variance in process (rad/s)^2
            amp: amplitude
            freq: frequency
            phase: phase angle
        """
        self.x = x0
        self.meas1_std = sqrt(measurement1_var)
        self.meas2_std = sqrt(measurement2_var)
        self.process_std = sqrt(process_var)
        self.amp = math.radians(amp)
        self.freq = math.radians(freq)
        self.phase = math.radians(phase)

    def move(self, dt):
        """Compute new orientation of the payload in radians."""
        t = t_0 + dt
        self.x = self.amp*math.sin((self.freq*t-self.phase)) + randn()*self.process_std
        self.xdot = self.freq*self.amp*math.cos((self.freq*t-self.phase)) + randn()*self.process_std

    def sense_position(self):
        """ Returns measurement of new orientation in radians."""
        measurement = self.x + randn()*self.meas1_std
        return measurement
    
    def sense_velocity(self):
        """ Returns measurement of new angular velocity in radians/sec."""
        measurement2 = self.xdot + randn()*self.meas2_std
        return measurement2

    def move_and_sense(self, dt=1.0):
        """ Move payload, and return measurement of new orientation in radians"""
        self.move(dt=dt)
        return self.sense_position()

In [127]:
printcase = 0 # If set to 1 it will print out the results
plotcase = 0 # If set to 1 it will plot results

# -----------USER INPUT STARTS------------------------------

number_of_variables = 2 #number of variables being measured
number_of_sensors = 1 #number of sensors used
process_var = math.radians(5.) # variance in the payload's movement, idk what number is good for this??
sensor1_var = math.radians(2.) # variance in the position sensor (magnetometer)
sensor2_var = math.radians(0.5) # variance in velocity sensor (gyroscope)
x = gaussian(math.radians(0.), math.radians(100.))  # payload's initial orientation in rad
spectral_density = 3e-6 #Spectral density of the measurement noise, number taken from Giuseppe for gyro
dt1 = 0.5 # time step in seconds for first sensor
dt2 = 0.3 # time step in seconds for second sensor
torque = 10 #N*m

initial_angular_velocity = 0. # initial angular velocity in rad/s???
total_time = 10. # Total time passed from beginning to end of experiment
t = 0. #initial time in seconds
amplitude = 30 # max theta, degrees
frequency = 1 # angular frequency, deg/s
phase_angle = 0 # degrees

# -----------USER INPUT ENDS--------------------------------

# I = 3900 # Moment of inertia around azimuth for mission in kg * m^2
L = 0.5 # length of pendulum, meters
m = 1 # mass, kg
g = 9.807 # gravity, m/s/s
process_model = gaussian(initial_angular_velocity, process_var) # displacement to add to x
t_0 = t
smallest_time = min(dt1, dt2) # The smaller time increment
n_iter = math.floor(total_time/smallest_time)
dt = smallest_time #initialize dt, will be modified constantly in later equations
    
# Initializes payload simulation and gets measurements
payload = PendulumPayloadSimulation(
    x0=x.mean,
    measurement1_var=sensor1_var, 
    measurement2_var=sensor2_var,
    process_var=process_model.var,
    t_0=t,
    amp=amplitude,
    freq=frequency,
    phase=phase_angle)

def remain(x1, x2): #produces correct remainder, tries to solve roundoff error, % function does not work properly for decimals
    w = Decimal(str(x1))
    q = Decimal(str(x2))
    ans =  w % q
    return float(ans)
zs, zs2, ztrue = [], [], []
check = 0
while t <= total_time: # Runs payload simulation
    t = round(t,1)
    r1 = remain(t, dt1)
    r2 = remain(t, dt2)
    if r1 == 0. and r2 == 0.: #takes measurement of both sensors but only moves once
        zs.append(payload.move_and_sense(dt=dt1))
        zs2.append(payload.sense_velocity())
#         print('Here! Time is: %.2f' % t) #check
        t = t + smallest_time
    elif r1 < smallest_time and check == 0: #Sensor 1 is taking a measurement
        t = t - r1
        zs.append(payload.move_and_sense(dt=dt1))
#         print('Here1 Time is: %.2f' % t) #check
        t = t + r1
        check = 1
    else: #Sensor 2 is taking a measurement
        zs2.append(payload.sense_velocity())
#         print('Here2 Time is: %.2f' % t) #check
        t = t + smallest_time
        check = 0
    ztrue.append(payload.x)
finalposition = payload.x #Records final position for table text
    
def Hx(x): #Our measurements are linear(?) so we don't need this
    return np.dot(f.H,f.x)

def Fx(x, dt): #Performs same thing as llinear KF prediction step does; Not sure if correct
    F = np.array([[1,dt],[0,1]])
    return dot(F,x)

def state_mean(sigmas, Wm): #Copied this, not sure if correct. Supposed to take average of angles correctly
    x = np.zeros(3)

    sum_sin = np.sum(dot(np.sin(sigmas[:, 1]), Wm))
    sum_cos = np.sum(dot(np.cos(sigmas[:, 1]), Wm))
    x[0] = np.sum(dot(sigmas[:, 0], Wm))
    x[1] = np.sum(dot(sigmas[:, 1], Wm))
    x[2] = math.atan2(sum_sin, sum_cos)
    return x

def z_mean(sigmas, Wm): #Copied this, not sure if correct. Supposed to take average of angles correctly
    z_count = sigmas.shape[1]
    x = np.zeros(z_count)

    for z in range(0, z_count, 2):
        sum_sin = np.sum(dot(math.sin(sigmas[:, z+1]), Wm))
        sum_cos = np.sum(dot(math.cos(sigmas[:, z+1]), Wm))

        x[z] = np.sum(dot(sigmas[:,z], Wm))
        x[z+1] = math.atan2(sum_sin, sum_cos)
    return x

In [128]:
def plotstuff():
    plt.figure() #Plots KF estimate vs true value
    plt.plot(np.linspace(0,total_time,len(zs)),zs,'k+',label='Sensor 1 Noisy Position Measurements')
#     plt.plot(np.linspace(0,total_time,len(zs2)),zs2,'r*',label='Sensor 2 Noisy Velocity Measurements')
    plt.plot(np.linspace(0,total_time,len(xprior)),xprior,'r*',label='A Priori Estimate')
    plt.plot(np.linspace(0,total_time,len(xhat)),xhat,'b-',label='A Posteriori Estimate')
    plt.plot(np.linspace(0,total_time,len(ztrue)),ztrue,color='g',label='True Value')
    plt.legend()
    plt.title('Estimate vs. Iteration Step', fontweight='bold')
    plt.xlabel('Time (sec)')
    plt.ylabel('Orientation (Radians)')
    
    plt.figure() # Plots variance over time
    valid_iter = range(1,n_iter) # Phat not valid at step 0
    plt.plot(valid_iter,Phat[1:n_iter],label='A Posteri Error Estimate')
    plt.title('Estimated $\it{\mathbf{a \ priori}}$ Error vs. Iteration Step', fontweight='bold')
    plt.xlabel('Iteration')
    plt.ylabel('$(Radians)^2$')
    #plt.setp(plt.gca(),'ylim',[0,.01])
    plt.show()

Phat = [] #Variance after update
xhat = [] #Estimate after update
xprior = [] #Prediction before update
counter = 0
if printcase == 1: #Sets up title for table
    print('PREDICT                        UPDATE')
    print('  t        x       var       z1       z2        x       var       K')

# perform Kalman filter on measurements zs1 and zs2
points1 = MerweScaledSigmaPoints(n=number_of_variables, alpha=.00001, beta=2, kappa=(3-number_of_variables)) # Initializes Unscented Kalman filter function; Not sure what to put for alpha?
f = ukf(dim_x=number_of_variables, dim_z=number_of_sensors, fx=Fx, hx=Hx, dt=dt, points=points1, x_mean_fn=state_mean, z_mean_fn=z_mean)
u = torque # Eventually this won't be a constant and will change over iterations
B = 1/(m*L**2)
f.x = np.array([x.mean, process_model.mean])
f.Q *= np.array([[(dt**3)/3, (dt**2)/2],
                 [(dt**2)/2,  dt      ]]) *spectral_density # This is the same as running the Q_continuous_white_noise
f.P = np.diag([x.var,process_var]) #Variance

counter1 = 0
counter2 = 0
t = t_0
t_old = t_0
check = 0
# print('dt1 = %.2f' % dt1, '   dt2 = %.2f' % dt2) # Check
while t <= total_time: #Runs Unscented Kalman filter through each iteration
    t = round(t,1)
    r1 = remain(t, dt1)
    r2 = remain(t, dt2)
    if r1 == 0. and r2 == 0.: #Both sensors are taking measurements, perform two updates, one with each sensor is this right?????????????????
        dt = t - t_old
        f.predict(dt=dt)
        f.R[0,0] = sensor1_var #Taking average of variances
        f.H = np.array([[1., 0.]])
        f.update(zs[counter1])
        f.R[0,0] = sensor2_var
        f.H = np.array([[0., 1.]])
        f.update(zs2[counter2])
#         print('time: %.2f ' % t, 'r1 = %.2f' % r1, 'r2 = %.2f' % r2, 'both match!') # Check
        t_old = t
        t = t + smallest_time
        prior = gaussian(f.x_prior[0,0], f.P_prior[0,0])
        x = gaussian(f.x[0,0], f.P[0,0])   
        if printcase == 1:
            print_gh(t_old, prior, x, zs[counter1], zs2[counter2], f.K[0,0])
        counter1 = counter1 + 1
        counter2 = counter2 + 1
    elif r1 < smallest_time and check == 0: #Sensor 1 is taking a measurement
        t = t - r1
        dt = t - t_old
        f.predict(dt=dt)
        f.H = np.array([[1., 0.]])
        f.R[0,0] = sensor1_var
        f.update(zs[counter1])
#         print('time: %.2f ' % t, 'r1 = %.2f' % r1, 'r2 = %.2f' % r2) # Check
        t_old = t
        t = t + r1
        check = 1
        prior = gaussian(f.x_prior[0,0], f.P_prior[0,0])
        x = gaussian(f.x[0,0], f.P[0,0])   
        if printcase == 1:
            print_gh1(t_old, prior, x, zs[counter1], f.K[0,0])
        counter1 = counter1 + 1
    else: #Sensor 2 is taking a measurement
        dt = t - t_old
        f.predict(dt=dt)
        f.H = np.array([[0., 1.]])
        f.R[0,0] = sensor2_var
        f.update(zs2[counter2])
#         print('time: %.2f ' % t, 'r1 = %.2f' % r1, 'r2 = %.2f' % r2) # Check
        t_old = t
        t = t + smallest_time
        check = 0
        prior = gaussian(f.x_prior[0,0], f.P_prior[0,0])
        x = gaussian(f.x[0,0], f.P[0,0])
        if printcase == 1:
            print_gh2(t_old, prior, x, zs2[counter2], f.K[0,0])
        counter2 = counter2 + 1
    if plotcase == 1:
#         x1.append(prior.mean)
        xprior.append(f.x_prior[0,0])
        xhat.append(x.mean)
        Phat.append(x.var)

if printcase == 1:
    print()
    print('Final estimate:           {:10.3f}'.format(x.mean))
    print('Actual final orientation: {:10.3f}'.format(payload.x))
if plotcase == 1:
    print()
    plotstuff()

ValueError: operands could not be broadcast together with shapes (5,2) (1,3) 