In [1]:
from RTINS import *
from Init_det_glrt import Init_det_glrt
from func_loaddataset import func_loaddataset
from utils import *

### ZUPT alogrithm

In [2]:
import numpy as np
import matplotlib.pyplot as plt

from geometrys import q2dcm, dcm2q, Rt2b

class INS:
    def __init__(self, simdata):
        self.simdata = simdata
        self.initial_state = None
        self.init_xh = None
        self.init_quat = None
        self.init_cov = None
        self.init_P = None
        Q, R, H = self.init_QRH()
        self.Q = Q
        self.R = R
        self.H = H

        if self.simdata['detector_prio'] == 'normalized velocity':
            self.c1, self.c2, self.c3 = -1e2 * self.simdata['Window_size'] / 2, -5e4 * self.simdata['Window_size'] / 2, 100
        else:
            self.c1, self.c2, self.c3 = -1e2 * self.simdata['Window_size'] / 2, -5e4 * self.simdata['Window_size'] / 2, 0

    def init_QRH(self):
        Q = np.zeros((6, 6))
        H = np.zeros((3, 9))
        Q[0:3, 0:3] = np.diag(self.simdata['sigma_acc'].ravel() ** 2)
        Q[3:6, 3:6] = np.diag(self.simdata['sigma_gyro'].ravel() ** 2)
        H[0:3, 3:6] = np.eye(3)
        R = np.diag(self.simdata['sigma_vel'] ** 2)
        return Q, R, H
    
    def init_P_value(self, init_P=None):
        if init_P is not None:
            return init_P
        P = np.zeros((9, 9))
        P[0:3, 0:3] = np.diag(self.simdata['sigma_initial_pos'].ravel() ** 2)
        P[3:6, 3:6] = np.diag(self.simdata['sigma_initial_vel'].ravel() ** 2)
        P[6:9, 6:9] = np.diag(self.simdata['sigma_initial_att'].ravel() ** 2)
        return P
    
    def init_state(self, imu_data, P, init_xh, init_quat):
        N = len(imu_data[0])
        cov = np.zeros((9, N))
        x_h = np.zeros((9, N))
        cov[:, 0] = np.diag(P)
        if init_xh is None and init_quat is None:
            x_h[:, 0], quat = self.init_Nav_eq(imu_data)
        else:
            x_h[:, 0], quat = init_xh, init_quat
        return x_h, quat, cov
    
    # only use when know init_state form the first batch
    def baseline3(self, imu, zupt, logL, adpt_flag, init_state=None, init_quat=None, init_P=None):
        N = len(imu[0])
        cov = np.zeros((9,N))
        x_h = np.zeros((9,N))
        cov[:, 0] = np.diag(init_P)
        x_h[:, 0] = init_state
        quat = init_quat
        Id = np.eye(init_P.shape[0])
        P = init_P

        if adpt_flag:
            zupt = np.zeros((1, len(logL[0]) + 5), dtype=bool)
            delta_t = 0
            gamma = np.zeros(len(logL[0]))

        for k in range(1, N):
            u_h = self.comp_imu_errors(imu[:, k], x_h[:, k-1])  # Computation of IMU errors
            x_h[:, k], quat = self.Navigation_equations(x_h[:, k-1], u_h, quat)  # Navigation equations
            F, G = self.state_matrix(quat, u_h)  # State matrix calculation

            P = F @ P @ F.T + G @ self.Q @ G.T
            P = (P + P.T) / 2
            cov[:, k] = np.diag(P)

            if adpt_flag:
                S = P[3:6, 3:6]
                v = x_h[3:6, k]

                gamma[k] = self.c1 + self.c2 * delta_t + self.c3 * (v.T @ np.linalg.inv(S) @ v)
                if logL[0][k] > gamma[k]:
                    zupt[0][k:k + self.simdata['Window_size']] = True
                    delta_t = 0
                else:
                    delta_t += self.simdata['Ts']

            if zupt[0][k]:
                K = P @ self.H.T @ np.linalg.inv(self.H @ P @ self.H.T + self.R)
                z = -x_h[3:6, k]
                dx = K @ z
                x_h[:, k], quat = self.comp_internal_states(x_h[:, k], dx, quat)
                P = (Id - K @ self.H) @ P
                P = (P + P.T) / 2
                cov[:, k] = np.diag(P)

        return x_h, cov, quat, P

    def baseline2(self, imu, zupt, logL, adpt_flag, init_state=None, init_quat=None, init_P=None):
        N = len(imu[0])
        P = self.init_P_value(init_P=init_P)
        x_h, quat, cov = self.init_state(imu, P, init_state, init_quat)
        Id = np.eye(P.shape[0])
        if adpt_flag:
            zupt = np.zeros((1, len(logL[0]) + 5), dtype=bool)
            delta_t = 0
            gamma = np.zeros(len(logL[0]))
            if self.simdata['detector_prio'] == 'normalized velocity':
                c1, c2, c3 = -1e2 * self.simdata['Window_size'] / 2, -5e4 * self.simdata['Window_size'] / 2, 100
            else:
                c1, c2, c3 = -1e2 * self.simdata['Window_size'] / 2, -5e4 * self.simdata['Window_size'] / 2, 0


        for k in range(1, N):
            u_h = self.comp_imu_errors(imu[:, k], x_h[:, k-1])  # Computation of IMU errors
            x_h[:, k], quat = self.Navigation_equations(x_h[:, k-1], u_h, quat)  # Navigation equations
            F, G = self.state_matrix(quat, u_h)  # State matrix calculation

            P = F @ P @ F.T + G @ self.Q @ G.T
            P = (P + P.T) / 2
            cov[:, k] = np.diag(P)

            if adpt_flag:
                S = P[3:6, 3:6]
                v = x_h[3:6, k]

                gamma[k] = c1 + c2 * delta_t + c3 * (v.T @ np.linalg.inv(S) @ v)
                if logL[0][k] > gamma[k]:
                    zupt[0][k:k + self.simdata['Window_size']] = True
                    delta_t = 0
                else:
                    delta_t += self.simdata['Ts']

            if zupt[0][k]:
                K = P @ self.H.T @ np.linalg.inv(self.H @ P @ self.H.T + self.R)
                z = -x_h[3:6, k]
                dx = K @ z
                x_h[:, k], quat = self.comp_internal_states(x_h[:, k], dx, quat)
                P = (Id - K @ self.H) @ P
                P = (P + P.T) / 2
                cov[:, k] = np.diag(P)

        if adpt_flag:
            t = self.simdata['Ts'] * np.arange(N)
            gamma[gamma > -1e2] = -1e1
        return x_h, cov, quat, P


    def detector_adaptive(self, u):
        """
        Wrapper function for running the zero-velocity detection algorithms.
        """
        zupt = np.zeros((1, (len(u[0]))))

        T = self.GLRT(u)

        # Check if the test statistics T are below the detector threshold
        W = self.simdata['Window_size']
        for k in range(len(T)):
            if T[k] < self.simdata['gamma']:
                zupt[0][k:k+W] = 1

        # Fix the edges of the detector statistics
        T = np.concatenate((np.full(int(np.floor(W/2)), max(T)), T, np.full(int(np.floor(W/2)), max(T))))
        
        # Log-likelihood
        logL = -W / 2 * T

        logL = logL[:len(u[0])]

        return zupt, logL.reshape(1,-1)
    
    def GLRT(self, u):
        """
        Function that runs the generalized likelihood test (SHOE detector).
        """
        g = self.simdata['g']
        sigma2_a = self.simdata['sigma_a'] ** 2
        sigma2_g = self.simdata['sigma_g'] ** 2
        W = self.simdata['Window_size']

        N = len(u[0])
        # print(N, '-',W)
        # temp_size = min(N - W + 1,1)
        T = np.zeros(N - W + 1)
        for k in range(N - W + 1):
            ya_m = np.mean(u[0:3, k:k+W], axis=1)
            for l in range(k, k + W):
                tmp = u[0:3, l] - g * ya_m / np.linalg.norm(ya_m)
                T[k] += np.dot(u[3:6, l], u[3:6, l]) / sigma2_g + np.dot(tmp, tmp) / sigma2_a

        T = T / W

        return T


    def baseline(self, u, zupt, logL, adpt_flag, init_state=None, init_quat=None, init_P=None):
        N = len(u[0])
        P, Q, R, H = self.init_filter(init_P=init_P)  # Initialization of filter

        x_h, cov, Id = self.init_vec(N, P)  # Initialization of vectors and matrices
        if init_state is not None:
            x_h[0:9, 0] = init_state
            quat = init_quat
        else:
            x_h[0:9, 0], quat = self.init_Nav_eq(u)  # Initialization of navigation equations

        if adpt_flag:
            zupt = np.zeros((1, len(logL[0]) + 5), dtype=bool)
            delta_t = 0
            gamma = np.zeros(len(logL[0]))
            if self.simdata['detector_prio'] == 'normalized velocity':
                c1, c2, c3 = -1e2 * self.simdata['Window_size'] / 2, -5e4 * self.simdata['Window_size'] / 2, 100
            else:
                c1, c2, c3 = -1e2 * self.simdata['Window_size'] / 2, -5e4 * self.simdata['Window_size'] / 2, 0

        for k in range(1, N):
            u_h = self.comp_imu_errors(u[:, k], x_h[:, k-1])  # Computation of IMU errors
            x_h[:, k], quat = self.Navigation_equations(x_h[:, k-1], u_h, quat)  # Navigation equations
            F, G = self.state_matrix(quat, u_h)  # State matrix calculation

            P = F @ P @ F.T + G @ Q @ G.T
            P = (P + P.T) / 2
            cov[:, k] = np.diag(P)

            if adpt_flag:
                S = P[3:6, 3:6]
                v = x_h[3:6, k]

                gamma[k] = c1 + c2 * delta_t + c3 * (v.T @ np.linalg.inv(S) @ v)
                if logL[0][k] > gamma[k]:
                    zupt[0][k:k + self.simdata['Window_size']] = True
                    delta_t = 0
                else:
                    delta_t += self.simdata['Ts']

            if zupt[0][k]:
                K = P @ H.T @ np.linalg.inv(H @ P @ H.T + R)
                z = -x_h[3:6, k]
                dx = K @ z
                x_h[:, k], quat = self.comp_internal_states(x_h[:, k], dx, quat)
                P = (Id - K @ H) @ P
                P = (P + P.T) / 2
                cov[:, k] = np.diag(P)

        if adpt_flag:
            t = self.simdata['Ts'] * np.arange(N)
            gamma[gamma > -1e2] = -1e1
            
        return x_h, cov, quat, P

    def init_vec(self, N, P): # checked
        cov = np.zeros((9, N))
        x_h = np.zeros((9, N))
        Id = np.eye(P.shape[0])
        cov[:, 0] = np.diag(P)
        return x_h, cov, Id
    
    def init_Nav_eq(self, u): # checked
        f_u = np.mean(u[0, :20])
        f_v = np.mean(u[1, :20])
        f_w = np.mean(u[2, :20])

        roll = np.arctan2(-f_v, -f_w)
        pitch = np.arctan2(f_u, np.sqrt(f_v ** 2 + f_w ** 2))

        attitude = np.array([roll, pitch, self.simdata['init_heading']])
        Rb2t = Rt2b(attitude).T
        quat = dcm2q(Rb2t)

        x = np.zeros(9)
        x[0:3] = self.simdata['init_pos'].flatten()
        x[6:9] = attitude
        return x, quat

    def init_filter(self, init_P=None): # checked
        P = np.zeros((9, 9))
        Q = np.zeros((6, 6))
        H = np.zeros((3, 9))
        
        if init_P is not None:
            P = init_P
        else:
            P[0:3, 0:3] = np.diag(self.simdata['sigma_initial_pos'].ravel() ** 2)
            P[3:6, 3:6] = np.diag(self.simdata['sigma_initial_vel'].ravel() ** 2)
            P[6:9, 6:9] = np.diag(self.simdata['sigma_initial_att'].ravel() ** 2)


        H[0:3, 3:6] = np.eye(3)
        Q[0:3, 0:3] = np.diag(self.simdata['sigma_acc'].ravel() ** 2)
        Q[3:6, 3:6] = np.diag(self.simdata['sigma_gyro'].ravel() ** 2)
        R = np.diag(self.simdata['sigma_vel'] ** 2)

        return P, Q, R, H
    

    def Navigation_equations(self, x, u, q): # checked
        y = np.zeros_like(x)
        Ts = self.simdata['Ts']

        w_tb = u[3:6]
        P = w_tb[0] * Ts
        Q = w_tb[1] * Ts
        R = w_tb[2] * Ts

        OMEGA = np.array([[0, R, -Q, P], [-R, 0, P, Q], [Q, -P, 0, R], [-P, -Q, -R, 0]]) * 0.5

        v = np.linalg.norm(w_tb) * Ts
        if v != 0:
            q = (np.cos(v / 2) * np.eye(4) + 2 / v * np.sin(v / 2) * OMEGA) @ q
            q = q / np.linalg.norm(q)

        Rb2t = q2dcm(q) # checked
        y[6] = np.arctan2(Rb2t[2, 1], Rb2t[2, 2])
        y[7] = -np.arctan(Rb2t[2, 0] / np.sqrt(1 - Rb2t[2, 0] ** 2))
        y[8] = np.arctan2(Rb2t[1, 0], Rb2t[0, 0])

        g_t = np.array([0, 0, self.simdata['g']])
        f_t = Rb2t @ u[0:3]
        acc_t = f_t + g_t

        A = np.eye(6)
        A[0, 3] = Ts
        A[1, 4] = Ts
        A[2, 5] = Ts
        B = np.vstack(((Ts ** 2) / 2 * np.eye(3), Ts * np.eye(3)))

        y[0:6] = A @ x[0:6] + B @ acc_t

        return y, q
    
    def state_matrix(self, q, u): # checked
        Rb2t = q2dcm(q)
        f_t = Rb2t @ u[0:3]
        St = np.array([[0, -f_t[2], f_t[1]], [f_t[2], 0, -f_t[0]], [-f_t[1], f_t[0], 0]])

        O = np.zeros((3, 3))
        I = np.eye(3)
        Fc = np.block([
            [O, I, O],
            [O, O, St],
            [O, O, O]
        ])
        Gc = np.block([
            [O, O],
            [Rb2t, O],
            [O, -Rb2t]
        ])

        F = np.eye(Fc.shape[0]) + (self.simdata['Ts'] * Fc)
        G = self.simdata['Ts'] * Gc
        return F, G
    
    def comp_internal_states(self, x_in, dx, q_in):
        R = q2dcm(q_in)
        x_out = x_in + dx

        epsilon = dx[6:9]
        OMEGA = np.array([[0, -epsilon[2], epsilon[1]], 
                    [epsilon[2], 0, -epsilon[0]], 
                    [-epsilon[1], epsilon[0], 0]])
        R = (np.eye(3) - OMEGA) @ R
        x_out[6] = np.arctan2(R[2, 1], R[2, 2])
        x_out[7] = -np.arctan(R[2, 0] / np.sqrt(1 - R[2, 0] ** 2))
        x_out[8] = np.arctan2(R[1, 0], R[0, 0])
        q_out = dcm2q(R)
        return x_out, q_out
    
    def comp_imu_errors(self, u_in, x_h):
        u_out = u_in
        return u_out

### TEST code

In [3]:
u = func_loaddataset('./data_inert.txt')

In [4]:
u.shape

(6, 11900)

In [5]:
simdata = Init_det_glrt()
# Variables for calculating the performance
D = len(simdata['data_sets'])
Jadapt = 0
Jadapt_all = np.zeros((D, 3))
gamma_vec = np.logspace(4, 6, 20)
M = len(gamma_vec)
ctr = 1
Niter = M
Jfixed = np.zeros(M)
Jfixed_all = np.zeros((M, D, 3))

ins = INS(simdata)

In [6]:
u1 = u[:,:4000]
u2 = u[:,4000:8000]
u3 = u[:,8000:12000]

In [7]:
z1, l1 = ins.detector_adaptive(u1)
z2, l2 = ins.detector_adaptive(u2)
z3, l3 = ins.detector_adaptive(u3)

In [8]:
za, la = ins.detector_adaptive(u[:, :12000])

In [9]:
np.all(np.hstack((z1,z2,z3)) == za)

True

In [10]:
np.all(np.hstack((l1,l2,l3)) == la)

False

In [11]:
zupt , logL = ins.detector_adaptive(u[:, :8000])
x_h, _, _, _ = ins.baseline2(u[:, :8000], zupt, logL, True)

xhs = None
logLs = None
n = 4000
num_batches = len(u[0]) // n
init_x2 = None
init_quat2 = None
init_P2 = None
counter = 0


for i in range(num_batches+1):
    print(i)
    ui = u[:, i*n:(i+1)*n]
    if init_x2 is None:
        zupt2 , logL2 = ins.detector_adaptive(ui)
        x_h2, _, quat2, P2 = ins.baseline2(ui, zupt2, logL2, True) # 9, N
    else:
        print(init_x2)
        zupt2 , logL2 = ins.detector_adaptive(ui)
        x_h2, _, quat2, P2 = ins.baseline3(ui, zupt2, logL2, True, init_x2, init_quat2, init_P2) # 9, N
    init_x2, init_quat2, init_P2 = x_h2[:,-1], quat2, P2
    if xhs is None:
        xhs = x_h2
    else:
        xhs = np.hstack((xhs,x_h2))
    if logLs is None:
        logLs = logL2
    else:
        logLs = np.hstack((logLs,logL2))
    
    if i >= 1:
        break


0
1
[-2.05112452e+00  2.19972028e+01  2.42584563e-01 -2.92523217e-01
 -2.96729391e-01  6.07348701e-01  3.01052880e+00  1.51364361e-02
 -1.26907597e+00]


In [12]:
logL==logLs

array([[ True,  True,  True, ...,  True, False, False]])

In [13]:
x_h[:3, 0] , xhs[:3, 0]

(array([0., 0., 0.]), array([0., 0., 0.]))

In [14]:
x_h[:3, -1] , xhs[:3, -1]

(array([-6.04281723,  2.26387556,  0.06875428]),
 array([-5.53793629,  2.35151431,  0.22561709]))

In [15]:
x_h[:3,n-1] , xhs[:3, n-1]

(array([-2.05112452, 21.99720281,  0.24258456]),
 array([-2.05112452, 21.99720281,  0.24258456]))

In [16]:
zupt.shape , zupt2.shape

((1, 8000), (1, 4000))

In [17]:
np.all(zupt[:, 500:] == zupt2[:, :500])

  np.all(zupt[:, 500:] == zupt2[:, :500])


False

In [18]:
logL.shape , logLs.shape

((1, 8000), (1, 8000))

In [19]:
np.all(logL == logLs)

False

## TEST batch adaptive problem

In [20]:
u1.shape, u2.shape, u3.shape

((6, 4000), (6, 4000), (6, 3900))

In [21]:
T1 = ins.GLRT(u1)
T2 = ins.GLRT(u2)
T3 = ins.GLRT(u3)

In [22]:
Ta = ins.GLRT(u)

In [23]:
Ta.shape

(11896,)

In [24]:
np.hstack((T1,T2,T3)).shape

(11888,)

In [35]:
zupt , logL = ins.detector_adaptive(u[:, :8000])
x_h, _, _, _ = ins.baseline2(u[:, :8000], zupt, logL, True)

In [42]:
xhs = None
logLs = None
n = 4000
num_batches = len(u[0]) // n
init_x2 = None
init_quat2 = None
init_P2 = None
counter = 0

i = 0
ui = u[:, i*n:(i+1)*n]
# zupt2 , logL2 = ins.detector_adaptive(ui)
x_h2, _, quat2, P2 = ins.baseline2(ui, zupt[:, :4000], logL[:, :4000], True) # 9, N
init_x2, init_quat2, init_P2 = x_h2[:,-1], quat2, P2
xhs = x_h2
logLs = logL2

# i = 1
# # zupt2 , logL2 = ins.detector_adaptive(ui)
# x_h2, _, quat2, P2 = ins.baseline3(ui, zupt[:, 4000:8000], logL[:, 4000:8000], True, init_x2, init_quat2, init_P2) # 9, N
# xhs = np.hstack((xhs,x_h2))
# logLs = np.hstack((logLs,logL2))


In [45]:
x_h2[:3, -1] , x_h[:3,4000-1]

(array([-2.05112452, 21.99720281,  0.24258456]),
 array([-2.05112452, 21.99720281,  0.24258456]))

In [47]:
Id = np.eye(9)
N = 4000
x_temp = np.zeros((9,N))
x_temp[:, 0] = x_h2[:, -1]
zupt = np.zeros((1, N + 5), dtype=bool)
dT = 0
gamma = np.zeros(N)

In [None]:
Id = np.eye(9)
N = 4000
x_temp = np.zeros((9,N))
x_temp[:, 0] = x_h2[:, -1]
imu = u2

zupt = np.zeros((1, len(logL[0]) + 5), dtype=bool)
delta_t = 0
gamma = np.zeros(len(logL[0]))
if ins.simdata['detector_prio'] == 'normalized velocity':
    c1, c2, c3 = -1e2 * ins.simdata['Window_size'] / 2, -5e4 * ins.simdata['Window_size'] / 2, 100
else:
    c1, c2, c3 = -1e2 * ins.simdata['Window_size'] / 2, -5e4 * ins.simdata['Window_size'] / 2, 0

for k in range(0, N):
    u_h = ins.comp_imu_errors(imu[:, k], x_h[:, k-1])  # Computation of IMU errors
    x_h[:, k], quat = ins.Navigation_equations(x_h[:, k-1], u_h, quat)  # Navigation equations
    F, G = ins.state_matrix(quat, u_h)  # State matrix calculation

    P = F @ P @ F.T + G @ ins.Q @ G.T
    P = (P + P.T) / 2

    S = P[3:6, 3:6]
    v = x_h[3:6, k]

    gamma[k] = c1 + c2 * delta_t + c3 * (v.T @ np.linalg.inv(S) @ v)
    if logL[0][k] > gamma[k]:
        zupt[0][k:k + ins.simdata['Window_size']] = True
        delta_t = 0
    else:
        delta_t += ins.simdata['Ts']

    if zupt[0][k]:
        K = P @ ins.H.T @ np.linalg.inv(ins.H @ P @ ins.H.T + ins.R)
        z = -x_h[3:6, k]
        dx = K @ z
        x_h[:, k], quat = ins.comp_internal_states(x_h[:, k], dx, quat)
        P = (Id - K @ ins.H) @ P
        P = (P + P.T) / 2