In [79]:
import numpy as np


def generate_data(L, N, T=20, dt=60, v_max=10):
    """
    L: number of landmarks
    N: number of observers
    T: number of time steps
    dt: time step duration
    v_max: maximum speed of the landmarks and observers
    """

    # Random initial positions
    landmarks = np.random.rand(L, 3) * 10
    observers = np.random.rand(N, 3) * 10

    # Random constant velocities
    velocities_landmarks = np.random.rand(L, 3) * v_max
    velocities_observers = np.random.rand(N, 3) * v_max

    # Initialize arrays
    ranges = np.zeros((T, L, N))
    pos = np.zeros((T, N, 3))
    true_positions = np.zeros((T, L, 3))

    # Compute positions and ranges at each time step
    for t in range(T):
        # Update positions
        landmarks += velocities_landmarks * dt
        observers += velocities_observers * dt

        # Store true landmark positions
        true_positions[t, :] = landmarks

        # Compute and store ranges and observer positions
        for n in range(N):
            pos[t, n] = observers[n]
            for l in range(L):
                diff = landmarks[l] - observers[n]
                ranges[t, l, n] = np.linalg.norm(diff)

    return ranges, pos, true_positions

# Generate some data
L = 2  # number of landmarks
N = 3  # number of observations
T = 20  # number of time steps
ranges, pos, true_pos = generate_data(L, N, T)

In [87]:
import numpy as np
from filterpy.kalman import KalmanFilter
from scipy.optimize import least_squares

class LandmarkTracker:
    def __init__(self, n_landmarks):
        self.filters = [self.init_filter() for _ in range(n_landmarks)]

    def init_filter(self):
        kf = KalmanFilter(dim_x=6, dim_z=3)

        # State transition matrix
        kf.F = np.array([[1, 0, 0, 1, 0, 0],
                         [0, 1, 0, 0, 1, 0],
                         [0, 0, 1, 0, 0, 1],
                         [0, 0, 0, 1, 0, 0],
                         [0, 0, 0, 0, 1, 0],
                         [0, 0, 0, 0, 0, 1]])

        # Measurement function
        kf.H = np.array([[1, 0, 0, 0, 0, 0],
                         [0, 1, 0, 0, 0, 0],
                         [0, 0, 1, 0, 0, 0]])

        kf.R *= np.array([[0.1, 0, 0],
                          [0, 0.1, 0],
                          [0, 0, 0.1]])

        kf.Q *= np.array([[1, 0, 0, 0, 0, 0],
                          [0, 1, 0, 0, 0, 0],
                          [0, 0, 1, 0, 0, 0],
                          [0, 0, 0, 1, 0, 0],
                          [0, 0, 0, 0, 1, 0],
                          [0, 0, 0, 0, 0, 1]])

        return kf

    def euclidean_to_cartesian(self, distances):
        def residuals(coords, distances):
            return np.sqrt(np.sum((self.observers - coords)**2, axis=1)) - distances

        coords = least_squares(residuals, self.observers.mean(axis=0), args=(distances,)).x
        return coords

    def update(self, ranges, pos):
        out = []
        self.observers = pos
        for i, kf in enumerate(self.filters):
            kf.predict()
            cartesian_measurements = self.euclidean_to_cartesian(ranges[i])
            print("cartesian",cartesian_measurements)
            kf.update(cartesian_measurements)
            out.append(kf.x[:3]) 
        return out

In [88]:
%%time
tracker = LandmarkTracker(L)

# Call particle filter
for t in range(T):
    landmark_states = tracker.update(ranges[t], pos[t])
    print(t)
    print("PREDICTED POS")
    print(state)
    print("REAL POS")
    print(true_pos[t])

cartesian [254.32728014  62.65312256 416.73918612]
cartesian [184.21377345 260.02356994 390.55474586]
0
PREDICTED POS
[[6560.86447493 6868.09399085 5243.86157092]
 [3925.09088748 7438.38111862 7602.45745291]]
REAL POS
[[254.32728014  62.65312256 416.73918612]
 [ 35.20890567 532.15606216 296.05372299]]
cartesian [-161.73289445 1352.91546644  402.85054318]
cartesian [  65.03529327 1061.23383758  590.15484642]
1
PREDICTED POS
[[6560.86447493 6868.09399085 5243.86157092]
 [3925.09088748 7438.38111862 7602.45745291]]
REAL POS
[[ 507.63567088  124.5302888   830.4136344 ]
 [  65.03529327 1061.23383758  590.15484642]]
cartesian [-240.0216776  2026.27503576  603.23685602]
cartesian [  94.86168088 1590.311613    884.25596985]
2
PREDICTED POS
[[6560.86447493 6868.09399085 5243.86157092]
 [3925.09088748 7438.38111862 7602.45745291]]
REAL POS
[[ 760.94406162  186.40745504 1244.08808267]
 [  94.86168088 1590.311613    884.25596985]]
cartesian [-318.31286086 2699.63398698  803.63527033]
cartesian [ 1

In [64]:
class LandmarkTracker:
    def __init__(self, n_landmarks, n_observers):
        self.n_landmarks = n_landmarks
        self.n_observers = n_observers
        
        # Initialize state of each landmark
        # State is [x, y, z, vx, vy, vz] for each landmark
        self.state = np.zeros((n_landmarks, 6))
        
        # Initialize covariance of each landmark's state
        # We'll assume each landmark's initial state is uncertain
        self.covariance = np.eye(3) * 1000  # Increase this if initial state is more uncertain

        # Process noise standard deviation (for Q matrix in Kalman filter)
        self.process_noise_std = 0.1  # Increase this if landmarks move unpredictably
        
        # Observation noise standard deviation (for R matrix in Kalman filter)
        self.observation_noise_std = 0.1  # Increase this if range measurements are noisy

    def predict(self):
        # Predict the new state and covariance of each landmark
        for i in range(self.n_landmarks):
            self.state[i, :3] += self.state[i, 3:]  # Update position with velocity
            Q = np.eye(6) * self.process_noise_std**2  # Process noise covariance
            self.covariance += Q  # Update covariance with process noise

    def update(self, ranges, observers_positions):
        # Update the state and covariance of each landmark using the new observations
        for i in range(self.n_landmarks):
            def residuals(state):
                # Calculate residuals between observed and predicted ranges
                predicted_ranges = np.sqrt(np.sum((observers_positions - state[:3])**2, axis=1))
                return predicted_ranges - ranges[i]

            initial_guess = self.state[i, :3]
            result = least_squares(residuals, initial_guess)
            H = self.jacobian(observers_positions, result.x)
            R = np.eye(self.n_observers) * self.observation_noise_std**2
            K = self.covariance @ H.T @ np.linalg.inv(H @ self.covariance @ H.T + R)
            self.state[i] = self.state[i] + K @ residuals(result.x)
            self.covariance = (np.eye(6) - K @ H) @ self.covariance

    def jacobian(self, observers_positions, landmark_position):
        # Calculate the Jacobian matrix
        predicted_ranges = np.sqrt(np.sum((observers_positions - landmark_position)**2, axis=1))
        return (observers_positions - landmark_position) / predicted_ranges[:, None]

    def track(self, ranges, observers_positions):
        # Predict and update the state of each landmark
        self.predict()
        self.update(ranges, observers_positions)
        return self.state