### Implementation of UKF for Quadcopter dynamics

We use the full quadrotor state dynamics with state
 $$[x, \dot x, q, \dot \omega]$$

And inputs 
$$[f_T, \tau]$$

In [None]:
from scipy.spatial.transform import Rotation 

#TODO: Modify dynamics function so that it works with a batch of points. i.e., input has shape (L, 2L + 1)
# Should mostly have to change the skew symmetric omega matrix and the cross product operation.

# Matrix to quaternion: Rotation.from_matrix(rotation_matrix).as_quat()
# Quaternion to matrix: Rotation.from_quat(quaternion).as_matrix() 

"""
Approach: 
    1. We use the full quad dynamics. 
    2. Inputs are thrust and moments. 
    3. These don't actually matter for the estimator as they'll be modeled as noise and we don't have access to them. 
    4. If we want to test the estimator with a toy example before gazebo, we can ask chatGPT to generate a thrust and moment control 
    law for a simple back and forth trajectory and code to plot accs, vels, and pos from that. 
    5. That means our process noise vector has size 2, because of our two inputs. 
    6. (Optional) Before trying to implement the augmented noise step, try the additive following the old hw solution.
    7. When running the gazebo simulation we give the same trajectory generator commands as acceleration, this actually doesn't matter, 
    as the PX4 cascated PID controller gives thrust and moment commands at the lowest level. 

"""

In [None]:
class Quad():
    def __init__(self, x0, dt, m=2.5, d=0.35):
        """
        Constructor for Quadcopter

        Inputs: 
            - x0, true state initial state, (13, 1) nd.array 
            - dt, discretized time interval, 30 for gazebo sim, float. 
            - Quad mass, about 2.5 Kg for x500 model, float.
            - Quad arm length, about 0.35 m for x500 model, float.
        """
        self.dt = dt
        self.m = m 
        self.d = d
        self.g = 9.81 # m/s^2
        self.I = np.eye(3)
        self.I[0, 0] = m * (d ** 2)
        self.I[1, 1] = m * (d ** 2) 
        self.I[2, 2] = 0.5 * m * (d ** 2)

        self.x = copy.deepcopy(x0) 

        # state and control history (h for history not hat)
        self.xh = [copy.deepcopy(x0)]
        self.uh = [] 

        # Measurement noise for position measurement (simulate lidar) 
        self.R = np.diag([1e-2, 1e-2, 1e-2]) #TODO Try experimenting with this value

    def W(self, u, alphas=[0.1, 0.01, 0.01, 0.1, 0.1, 0.01, 0.01, 0.1]):
        """
        Process noise. 
        You could implement a model that adds more noise the faster 
        the vehicle is thrusting and the higher the moments, but for now
        let's just make it a fixed variance for each.

        Here thrust and moments have the same effect on each other's uncertainties
        (that's what the noise characteristics, alpha, are). But you could modify this 
        to something that makes more sense. 
        Perhaps thrust should be noisier if yaw is high.  
        """

        var1 = alphas[0] * u[0] ** 2 + alphas[1] * u[1] ** 2 + alphas[2] * u[2] ** 2 + alphas[3] * u[3] ** 2
        var2 = alphas[4] * u[0] ** 2 + alphas[5] * u[1] ** 2 + alphas[6] * u[2] ** 2 + alphas[7] * u[3] ** 2

        return np.diag([var1, var2, var2, var2])

    def f(self, xk, uk, dt):
        """
        Dynamics model for the drone. 
        Propagates state from k to k + 1 

        Inputs: 
            - State, x, at k, nd.array(13, 1) 
            - Control law, u, at k, total thrust and moment along each euler angle, nd.array (4, 1)
            - dt, discrete timestep 
        Output: 
            - xkp1, propagated state
        """

        # Extract state
        pk = xk[:3, :]
        vk = xk[3:6, :]
        qk = xk[6:10, :]
        omk = xk[10:13, :]

        # Extract inputs
        Tk = uk[0, :].reshape(1, 1)
        tauk = uk[1:, :]

        # Compute drone's acceleration at k 
        e3 = np.array([[0, 0, -1]]).T
        zb = np.array([[0, 0, 1]]).T
        Rk = Rotation.from_quat(qk, scalar_first=True).as_matrix()
        ak = self.g * e3 + Rk @ zb * Tk / self.m 

        # Propagate translational dynamics 
        pkp1 = pk + vk * dt 
        vkp1 = vk + ak * dt 

        # Compute skew-symmetric omega matrix 
        wx, wy, wz = omk[0, 0], omk[1, 0], omk[2, 0]
        Omk = np.array([[ 0, -wx, -wy, -wz], # This choice of skey symmetric matrix requires scalar first quaternion convention 
                        [wx,   0,  wz, -wy],
                        [wy, -wz,   0,  wx],
                        [wz,  wy,  -wx,  0]])

        # Propagate translational dynamics 
        qkp1 = qk + 0.5 * Omk @ qk * dt  # This equation is derived from quaternion differentiation. TODO: If code doesn't work well try changing to Rotation matrix propagation
        qkp1 = qkp1 / np.linalg.norm(qkp1) 
        omega = omk.reshape(3,) # Required for cross product
        omkp1 = omk + np.linalg.inv(self.I) @ (tauk + np.cross(omega, self.I @ omega).reshape(3, 1)) * dt

        # Construct propagated state vector 
        xkp1 = np.vstack((pkp1, vkp1, qkp1, omkp1))

        return xkp1 

    def h(self, xk):
        """
        Measurement model for lidar sensor 

        Inputs: 
            - xk, current state
        Outputs: 
            - Current position, nd.array (3, 2L + 1)
        """
        return xk[:3, :]

    def move(self, uk):
        """
        Simple physics simulation of the quadcopter 

        Inputs: 
            - uk, the control law at timestep k, nd.array (4, 1) 
        """

        # Add process noise to the desired commands 
        uk_noisy = np.random.multivariate_normal(uk[:, 0], self.W(uk)).reshape(4, 1)

        # Propagate state forward
        self.x = self.f(self.x, uk_noisy, self.dt)

        # Keep history 
        self.uh.append(copy.deepcopy(uk_noisy))
        self.xh.append(copy.deepcopy(self.x))

    def sense(self):
        """
        Simulate lidar sensor reading by adding noise to measurement

        Inputs: 
            - None, uses class variable self.x

        Outputs: 
            - noised up measurement
        """

        # Get measurement from measurement model 
        pk_mes = self.h(self.x)

        # Add noise to the reading  
        return np.random.multivariate_normal(pk_mes[:, 0], self.R).reshape(3, 1)


In [None]:
class UKF():
    def __init__(self, quad, x0, Q0, alpha=1e-3, beta=2, kappa=0):
        """
            Unscented Kalman Filter 
            
            Inputs:
                - quad model 
                - x0, initial state
                - Q0, initial covariance 
                - alpha, determines spread of sigma points distribution around mean state, usually 1e-3.
                - beta, incorporates prior knowledge to distribution, optimally 2 for Gaussians. 
                - Kappa, secondary scaling parameter, usually zero. 
        """
        # UT Params 
        self.kappa = kappa
        self.L = x0.shape[0]
        self.alpha = alpha 
        self.beta = beta 
        self.lamb = self.alpha ** 2 * (self.L + self.kappa) - self.L 

        # Initialize filter state and covariance 
        self.xhat = copy.deepcopy(x0)
        self.Q = copy.deepcopy(Q0)

        # Initialize filter state and covariance history 
        self.xhath = [copy.deepcopy(x0)]
        self.Qh = [copy.deepcopy(Q0)]

        # Save quad model instance as a class variable for dynamics 
        self.quad = quad

        # Dynamics and measurement models (assume we have perfect model knowledge) 
        self.f = quad.f 
        self.h = quad.h

        # Model noise as either a function of control or just even noise 
        even_noise = True 
        if even_noise: 
            self.W = np.diag([1.] * self.L) * 0.1 #NOTE: This covariance should have dimensions (L, L) as it is added to state covariance in filter
            self.R = quad.R
        else:
            self.W = quad.W 
            self.R = quad.R 

    def filter_UT(self, y_mes):
        """ 
            Apply Unscented Kalman Filter 

            Inputs: 
                - y_mes, measurement from quad simulation, nd.array(3, 1)
        """

        # Compute weights 
        Wm = np.zeros((2 * self.L + 1, 1))
        Wm[0, :] = self.lamb / (self.L + self.lamb)
        Wm[1:, :] = 1 / (2 * (self.L + self.lamb)) 

        Wc = np.zeros((2 * self.L + 1, 1))
        Wc[0, :] = self.lamb / (self.L + self.lamb) + (1 - self.alpha ** 2 + self.beta) 
        Wc[1:, :] = Wm[1:, :] 
        
        # Compute sigma points 
        sqrtQ = np.linalg.cholesky((self.L + self.lamb) * self.Q + 1e-9 * np.eye(self.L))
        Xk = np.zeros((2 * self.L + 1, self.L))
        Xk[0, :] = self.xhat[:, 0]
        Xk[1:self.L+1, :] = self.xhat[:, 0] + sqrtQ.T
        Xk[self.L+1:, :] = self.xhat[:, 0] - sqrtQ.T

        # Push sigma points through dynamics and recover state at k+1
        Xkp1 = np.array([self.f(Xk[i, :].reshape(-1, 1), np.zeros((4, 1)), self.quad.dt).flatten() for i in range(2 * self.L + 1)]).reshape((2 * self.L + 1, self.L))
        self.xhat = np.sum(Wm * Xkp1, axis=0).reshape(-1, 1)

        # Recover state covariance at k_1 
        self.Q = (Wc * (Xkp1 - self.xhat.T)). T @ (Xkp1 - self.xhat.T) + self.W 

        # Recompute sigma points for measurement
        sqrtQ = np.linalg.cholesky((self.L + self.lamb) * self.Q + 1e-9 * np.eye(self.L))
        Xk = np.zeros((2 * self.L + 1, self.L))
        Xk[0, :] = self.xhat[:, 0]
        Xk[1:self.L+1, :] = self.xhat[:, 0] + sqrtQ.T
        Xk[self.L+1:, :] = self.xhat[:, 0] - sqrtQ.T

        # Push sigma points through measurement model and recover mean at k+1
        Ykp1 = self.h(Xk.T).T 
        ykp1 = np.sum(Wm * Ykp1, axis=0).reshape(-1, 1)

        # Recover y covariance at k+1
        Qy_kp1 = (Wc.T * (Ykp1 - ykp1.T)).T @ (Ykp1 - ykp1.T) + self.R

        # Compute xy cross covariance at k+1 
        Qxy_kp1 =  (Wc * (Xkp1 - self.xhat.T)).T @ (Ykp1 - ykp1.T) 

        # Compute Kalman gain and perform measurement update 
        Kappa = Qxy_kp1 @ np.linalg.inv(Qy_kp1) 
        self.xhat += Kappa @ (y_mes - ykp1)
        self.Q -= Kappa @ Qy_kp1 @ Kappa.T  

    def filter(self, y_mes):
        """
        Wrapper function to keep track of estimate and covariance history
        """
        self.xhath.append(copy.deepcopy(self.xhat))
        self.Qh.append(copy.deepcopy(self.Q))





In [None]:
# Simulate quad for a back and forth trajectory in x 
dt = 0.1

# Initial conditions 
Q0 = np.diag([1.] * 13) * 1e-1
x0 = np.array([0., 0., 1.5., 0., 0., 0., 1., 0., 0., 0., 0., 0., 0.,]) # p: 0, 0, 1.5; v: 0, 0, 0; q: 1, 0, 0, 0; omega: 0, 0, 0

# Initialize quad and ukf
quad = Quad()

In [None]:
### Dynamics 
def get_drone_frame(point):
    """
    Computes the basis for the drone frame in the world frame from the quadcopter flat outputs.
    Note since we don't have yaw, sigma[3][0] will have to be replaced by noise. 

    To compute the orientation we need:
        - Position
        - Acceleration 
        - Yaw 
    
    For simplicity we should first try fixing yaw. Then we can try passing in as noise and see how the filter performs. 
    """
    g = 9.81

    # Construct differentially flat vectors 
    sigma = np.array([[point.p.x, point.p.y, point.p.z, point.psi]]).T
    sigma_dot_dot = np.array([[point.a.x, point.a.y, point.a.z, 0]]).T

    # Compute z_B 
    t = np.array([[sigma_dot_dot[0][0], sigma_dot_dot[1][0], sigma_dot_dot[2][0] + g]]).T
    z_B = t / np.linalg.norm(t) 

    # Compute intermediate yaw vector x_C
    x_C = np.array([[np.cos(sigma[3][0]), np.sin(sigma[3][0]), 0]]).T

    # Compute x-axis and y-axis of drone frame measured in world frame
    cross_prod = np.cross(z_B.T[0], x_C.T[0]) 
    y_B = np.array([cross_prod / np.linalg.norm(cross_prod)]).T 
    x_B = np.array([np.cross(y_B.T[0], z_B.T[0])]).T 

    return x_B, y_B, z_B 

def get_orientation(point):
    """
    Stack the basis column vectors for the quadcopter body to obtain the orientation of the quadcopter in the world frame as a matrix. 
    Then convert it into a quaternion. 
    """
    x_B, y_B, z_B = get_drone_frame(point)

    # Populate rotation matrix of drone frame measured from world frame
    R_W_B = np.hstack((x_B, y_B, z_B))

    return Rotation.from_matrix(R_W_B).as_quat() # As [x, y, z, w] vector

def get_angular(point):
    """
    Get angular accelerations from flat outputs. 
    Since we don't have jerk, we could either model it as noise or take derivatives of accelerations. 
    Either way we won't use angular velocities at the moment so don't worry about this. 
    """
    m = 2.906 
    g = 9.81
    z_W = np.array([[0, 0, 1]]).T 
    x_B, y_B, z_B = get_drone_frame(point)
    jerk = np.array([[point.j.x, point.j.y, point.j.z]]).T 
    dpsi = point.dpsi

    # Compute u1 
    f_des = m * g * z_W + m * np.array([[point.a.x, point.a.y, point.a.z]]).T
    u1 = (f_des.T @ z_B)[0, 0]

    # Compute h_om
    h_om = m / u1 * (jerk - (z_B.T @ jerk)[0, 0] * z_B)

    # Compute angular velocities 
    p = float(- (h_om.T @ y_B)[0, 0])
    q = float((h_om.T @ x_B)[0, 0])
    r = float(dpsi * (z_W.T @ z_B)[0, 0]) 

    return p, q, r