In [1]:
import rospy
from sensor_msgs.msg import Imu

import numpy as np
import torch

In [2]:
gyro_fps=400
accel_fps=250

In [3]:
def skew2vec(W):
    assert W.shape==(3,3)
    w=torch.tensor([(W[2,1]-W[1,2])/2,
                  (W[0,2]-W[2,0])/2,
                  (W[1,0]-W[0,1])/2,
                   ])
    return w

def vec2skew(w):
    assert w.shape==(3,)
    W=torch.tensor(
                  [[0,-1*w[2],w[1]],
                  [w[2],0,-1*w[0]],
                  [-1*w[1],w[0],0]
                  ])
    return W

In [4]:
def r2w(r):
    # exponential coordinate to w,theta coordinate
    theta = torch.linalg.norm(r)
    assert 0<theta and theta<np.pi
    w = r/theta
    
    if abs(theta-0)<0.1 or abs(theta-np.pi)<0.1:
        print("warning!! theta near singularity")
    
    return theta , w

def w2r(theta,w):
    assert 0<theta and theta<np.pi
    return theta*w

In [5]:
def Rodrigues(theta,W):
    return torch.eye(3)+(torch.sin(theta)*W)+((1-torch.cos(theta))*(W@W))

def matlog(R):
    tr = torch.trace(R)
    assert abs(torch.linalg.det(R)-1)<0.0001
    assert tr<3 and tr>-1
    if abs(tr-3)<0.1 or abs(tr+1)<0.1:
        print("warning!! R near singularity")
    
    theta=torch.arccos((tr-1)/2)
    W=(R-R.T)/(2*torch.sin(theta))
    
    return theta,W

In [6]:
def aJacobian(theta,W):
    rmat=theta*W
    A=torch.eye(3) - (((1-torch.cos(theta))/(theta**2))*rmat) + (((theta-torch.sin(theta))/(theta**3))*(rmat@rmat))         
    return A

In [7]:
def dynamic_model(r,wb,dt=1/400):
    assert type(r)==torch.Tensor
    wb_=torch.tensor(wb,dtype=float)
    theta,w=r2w(r)
    W=vec2skew(w)
    A=aJacobian(theta,W)
    Ainv=torch.inverse(A)
    return r+(dt*Ainv@wb_)

In [8]:
def measurement_model(r):
    g=torch.tensor([0,0,-9.8],dtype=float)
    
    theta,w=r2w(r)
    W=vec2skew(w)
    R=Rodrigues(theta,W)
    Rinv=torch.inverse(R)
    zhat=Rinv@g
    return zhat


In [9]:
def tensor_jacobian(T,r):
    shape=T.shape
    T_=T.view(-1)
    T_length=T_.shape[0]
    r_length=r.shape[0]
    J=torch.zeros((T_length,r_length),dtype=float)
    
    for i in range(T_length):
        if not r.grad==None:
            r.grad.zero_()
        T_[i].backward(retain_graph=True)
        J[i,:]=r.grad
    
    r.grad.zero_()
    
    J=J.view(*shape,r_length)
    return J
        
        
        
    
    
    
    

In [15]:
def EKF(r: torch.Tensor,cov: torch.Tensor, 
        wb: np.ndarray, a: np.ndarray, dt) -> torch.Tensor:
    S=0.5*torch.eye(3,dtype=float) #dynamics noise
    Q=0.5*torch.eye(3,dtype=float) #measurement noise
    
    g=dynamic_model(r,wb,dt)
    G=tensor_jacobian(g,r)
    r_bar=g.clone().detach().requires_grad_(True)
    h=measurement_model(r_bar)
    H=tensor_jacobian(h,r_bar)
    
    with torch.no_grad():
        cov_bar = G@cov@G.T + S
        K = cov_bar@H.T@torch.inverse(H@cov_bar@H.T+Q)
        r_next = r_bar + (K@(torch.tensor(a,dtype=float)-h))
        cov_next = (torch.eye(3,dtype=float)-(K@H))@cov_bar
    
    return r_next,cov_next
    

In [21]:
r0=np.array([np.pi/2,0,0])
r0=torch.tensor(r0,dtype=float)
cov0=1.*torch.eye(3,dtype=float)

In [22]:
r=r0.clone().requires_grad_(True)
cov=cov0.clone().requires_grad_(True)
a=np.array([0,-9.8,0])
wb=np.array([-3.,2.,-1.])
dt=1/400

In [23]:
EKF(r,cov,wb,a,dt)

(tensor([1.5708, 0.0059, 0.0020], dtype=torch.float64),
 tensor([[ 5.2117e-03, -5.6330e-03, -1.8663e-03],
         [-5.6330e-03,  1.5000e+00, -1.9110e-05],
         [-1.8663e-03, -1.9110e-05,  1.5000e+00]], dtype=torch.float64))

In [None]:
class IMU_reader:
    def __init__(self):
        #accelerometer calibration ax+b
        self.a=np.array([1.017125065,1.02456874,1.018181818])
        self.b=np.array([0.01525687597,-0.445687402,-0.3309090909])
        
        self.accel = np.zeros(3)
        self.gyro = np.zeros(3)
        rospy.init_node('listener', anonymous=True)
        rospy.Subscriber("/camera/accel/sample", Imu, self.accel_callback)
        rospy.Subscriber("/camera/gyro/sample", Imu, self.gyro_callback)

    def accel_callback(self,data):
        v=data.linear_acceleration
        v=np.array([v.x,v.y,v.z])
        self.accel=self.a*v+self.b

    def gyro_callback(self,data):
        v=data.angular_velocity
        v=np.array([v.x,v.y,v.z])
        self.gyro=v

In [None]:
imur=IMU_reader()