# EKF Dataset and Python Implementation

In [1]:
import numpy as np
from tinyekf import EKF
from time import sleep
from math import sin, cos, pi, radians
import csv
from mpl_toolkits.mplot3d import Axes3D
import matplotlib.pyplot as plt

np.random.seed(43)

ModuleNotFoundError: No module named 'tinyekf'

### 1. Generate a Dataset
Define some helper functions:

In [None]:
# linear motion
def linear_motion(x0, grad, N=100):
    data = []
    for i in range(N):
        x0 = x0 + np.array(grad)
        data.append(x0)
    return np.array(data)


# circular motion
def circular_motion(center, radius=10, omega=0.1, N=100):
    """
    centre: (x, y, z) centre of rotation
    radius: radius of rotation
    omega: angular velocity
    """
    sgn = np.random.randint(2, size=3)*2 -1
    center_of_rotation_x = center[0]
    center_of_rotation_y = center[1]
    center_of_rotation_z = center[2]
    angle = radians(45)  #pi/4 # starting angle 45 degrees
    
    x = center_of_rotation_x + sgn[0] * radius * cos(angle) #Starting position x
    y = center_of_rotation_y + sgn[1] * radius * sin(angle) #Starting position y
    z = center_of_rotation_z + sgn[2] * radius * cos(angle) #Starting position y

    data = np.zeros((N, 3))
    for i in range(N):
        angle = angle + omega  # New angle, we add angular velocity
        x = x + sgn[0] * radius * omega * cos(angle + pi / 2) # New x
        y = y + sgn[1] * radius * omega * sin(angle + pi / 2) # New y
        z = z + sgn[2] * radius * omega * cos(angle + pi / 4) # New z
        data[i][0] = x
        data[i][1] = y
        data[i][2] = z
    return data

def euclidean_distance(x0, x1, bias=0):
    dx = x0 - x1
    dx2 = dx**2
    dist = np.sqrt(np.sum(dx2, axis=1)) + bias
    return dist
    

State = 
    1. Receiver position (linear motion)
    2. Receiver velocity (first derivative)

Observations =
    1. Satellite position (circular motion)
    2. Rho (euclidean distance from receiver)

In [None]:
# create receiver positions 
rx0 = np.random.randn(3) # initial position
rx_data = linear_motion(rx0, [0.2, -0.2, 0.2] , N=100)

# create satellite positions
st0 = np.random.randn(4, 3)
#sat1 = linear_motion(st0[0, :], [0.08, -0.1, 0.02], N=100)
#sat2 = linear_motion(st0[1, :], [0.02, 0.1, -0.2], N=100)
#sat3 = linear_motion(st0[2, :], [-0.4, 0.1, 0.02], N=100)
#sat4 = linear_motion(st0[3, :], [0.3, -0.1, 0.22], N=100)

sat1 = circular_motion(st0[0, :], N=100)
sat2 = circular_motion(st0[1, :], N=100)
sat3 = circular_motion(st0[2, :], N=100)
sat4 = circular_motion(st0[3, :], N=100)


# calculate the observed rho
rho1 = euclidean_distance(rx_data, sat1, bias=0.02)
rho2 = euclidean_distance(rx_data, sat2, bias=0.02)
rho3 = euclidean_distance(rx_data, sat3, bias=0.02)
rho4 = euclidean_distance(rx_data, sat4, bias=0.02)

# add white noise to measurements
sigma = 5e-1
mu = 0.0
sat1 = sat1 + np.random.normal(mu,sigma,(100,3))
sat2 = sat2 + np.random.normal(mu,sigma,(100,3))
sat3 = sat3 + np.random.normal(mu,sigma,(100,3))
sat4 = sat4 + np.random.normal(mu,sigma,(100,3))
rho1 = rho1 + np.random.normal(mu,sigma,100)
rho2 = rho2 + np.random.normal(mu,sigma,100)
rho3 = rho3 + np.random.normal(mu,sigma,100)
rho4 = rho4 + np.random.normal(mu,sigma,100)

# merge dataset
data = np.hstack((sat1, sat2))
data = np.hstack((data, sat3))
data = np.hstack((data, sat4))
rho = np.array([rho1, rho2, rho3, rho4]).T
data = np.hstack((data, rho)).astype(np.float32)

header="x1, y1, z1, x2, y2, z2, x3, y3, z3, x4, y4, z4, r1, r2, r3, r4"
np.savetxt("gps_data.csv", data, fmt="%.6f", delimiter=",", header=header, comments="")

### 2. Visualise the Dataset

In [None]:
data = np.loadtxt("gps_data.csv", delimiter=",", skiprows=1)

fig = plt.figure()
ax = fig.gca(projection='3d')
ax.set_title("Satellite 1,2,3,4 Motion")
ax.plot(data[:, 0], data[:, 1], data[:, 2])
ax.plot(data[:, 3], data[:, 4], data[:, 5])
ax.plot(data[:, 6], data[:, 7], data[:, 8])
ax.plot(data[:, 9], data[:, 10], data[:, 11])
ax.set_xlabel("X")
ax.set_ylabel("Y")
ax.set_zlabel("Z")
ax.set_axis_off

plt.show()


In [None]:
plt.figure()

plt.title("Measured Rho")

x = np.arange(len(data))

plt.plot(x, data[:, 12], "r") 
plt.plot(x, data[:, 13], "b") 
plt.plot(x, data[:, 14], "g") 
plt.plot(x, data[:, 15], "k") 
plt.show()

In [None]:
plt.figure()
plt.title("Actual Receiver Position")

x = np.arange(len(data))

plt.plot(x, rx_data[:, 0], "r")
plt.plot(x, rx_data[:, 1], "b") 
plt.plot(x, rx_data[:, 2], "g") 
plt.show()

We don't observe the actual receiver position. Rather, we use an Extended Kalman Filter to predict the receiver's position given noisy measurements for the satellite and rho.

### 3. Extended Kalman Filter (EKF)

In [None]:
def initQ():
    T = 1
    Sf = 0.1
    Sg = 0.01
    sigma = 0.5
    Qb1 = np.array([[Sf*T + Sg*T*T*T/3., Sg*T*T/2.],
                    [Sg*T*T/2., Sg*T]])
    Qb2 = np.array([[sigma*sigma*T*T*T/3., sigma*sigma*T*T/2.],
                    [sigma*sigma*T*T/2., sigma*sigma*T]])
    z0 = np.zeros((2, 2))

    Q = np.block([[Qb2, z0, z0, z0],
                 [z0, Qb2, z0, z0],
                 [z0, z0, Qb2, z0],
                 [z0, z0, z0, Qb1]])
    return Q


class GPS_EKF(EKF):

    def __init__(self):

        EKF.__init__(self, 8, 4, pval=0.5, qval=0.1, rval=20.0)
        self.x = np.array([rx0[0], 0.3,
                           rx0[1], -0.1,
                           rx0[2], 0.3,
                           0.02, 0])
        self.x = self.x
        #self.Q = initQ()

    def f(self, x, **kwargs):
        a = np.array([[1, 1], [0, 1]])
        F = np.kron(np.eye(4), a)
        x = np.dot(x, F.T)
        return x, F


    def h(self, x, **kwargs):

        if "SV_pos" not in kwargs:
            raise RuntimeError("Satellite positions required for observation "
                               "model")
        SV_pos = kwargs.get("SV_pos")

        # unpack state vector
        xyz = np.array([x[0], x[2], x[4]])
        bias = x[6]
        
        # pseudorange equation: hx = || xyz - SV_pos || + bias
        dx = xyz - SV_pos
        dx2 = dx**2
        hx = np.sqrt(np.sum(dx2, axis=1)) + bias

        H = np.zeros((4, 8))
        for i in range(4):
            for j in range(3):
                idx = 2*j
                H[i][idx] = dx[i][j]/hx[i]
            H[i][6] = 1

        return hx, H

In [None]:
ekf = GPS_EKF()
data = np.loadtxt("gps_data.csv", delimiter=",", skiprows=1)
rx_pos = np.zeros((len(data), 3))

for i, line in enumerate(data):
      
    # unpack sensor data
    SV_pos = np.array(line[:12]).astype(np.float32).reshape(4,3)
    SV_rho = np.array(line[12:]).astype(np.float32)

    # Run the EKF on the satellite position and psuedorange measurements,
    # getting back an updated state estimate. Take only the receiver
    # position. ignoring velocity etc.
    state, hx = ekf.step(SV_rho, SV_pos=SV_pos)
    #print("MSE: ", np.mean( (SV_rho - hx)**2 ))
    
    # save filtered (x,y,z) co-ordinates of receiver
    rx_pos[i][0] = state[0]
    rx_pos[i][1] = state[2]
    rx_pos[i][2] = state[4]

In [None]:
plt.figure()
plt.title("Filtered Receiver Position")

x = np.arange(len(rx_pos))

plt.plot(x, rx_pos[:, 0], "r")
plt.plot(x, rx_pos[:, 1], "b") 
plt.plot(x, rx_pos[:, 2], "g") 
plt.show()

In [None]:
rx_pos[:25]

In [None]:
rx0