Example: Windowed Gaussian Process transition model

In [71]:
# define constants

from sklearn.gaussian_process.kernels import RBF
import numpy as np
from datetime import datetime, timedelta

PRIOR_SCALE = 10
WINDOW_SIZE = 10
TIME_INTERVAL = timedelta(seconds=1)

KERNEL_LENGTH_SCALE = 10
KERNEL_OUT_VAR = 30

NOISE_COVAR = 5
PRIOR_COVAR = 1
num_steps = 100

np.random.seed(0)

In [72]:
# Define our 2D transition model
import sys
import os
parent_dir = os.path.abspath(os.path.join(os.getcwd(), '..'))
sys.path.append(parent_dir)
print(os.listdir(parent_dir))
from stonesoup.models.transition.linear import CombinedLinearGaussianTransitionModel, \
                                               SlidingWindowGaussianProcess

# Sliding window GP with RBF kernel
class RBFKernelGP(SlidingWindowGaussianProcess):
    def kernel(self, x, y, **kwargs):
        k = KERNEL_OUT_VAR*RBF(KERNEL_LENGTH_SCALE)
        return k(x,y)

transition_model = CombinedLinearGaussianTransitionModel([RBFKernelGP(window_size=WINDOW_SIZE),
                                                          RBFKernelGP(window_size=WINDOW_SIZE)])

['.circleci', '.codecov.yml', '.flake8', '.git', '.gitattributes', '.github', '.gitignore', '.git_archival.txt', '.pytest_cache', '.readthedocs.yml', '01_KalmanFilterTutorial (1).ipynb', '06_DataAssociation-MultiTargetTutorial.ipynb', 'binder', 'CITATION.cff', 'dataset_InsectFlightDynamics', 'docs', 'example_IWGP.py', 'example_WGP.py', 'InsectFlightDynamics_Sec1.ipynb', 'LICENSE', 'MANIFEST.in', 'notebooks', 'pyproject.toml', 'README.md', 'sec1.ipynb', 'sec1_cattle.ipynb', 'sec1_extra.ipynb', 'sec1_gp_generated.ipynb', 'sec1_gp_generated_single.ipynb', 'sec1_gp_prior.ipynb', 'sec1_hyp_optim.ipynb', 'sec2_igp_generated copy.ipynb', 'sec2_random.ipynb', 'sec3_dynamic_gp.ipynb', 'stonesoup', 'tractmethod', 'X and Y.', '__pycache__']


In [73]:
# Obtain ground truth coordinates from insect walk data

from get_coordinates import get_coordinates
import pandas as pd

# Load the CSV file into a DataFrame
file_path = 'Coordinate_second.csv'  # Replace with the actual file path
df = pd.read_csv(file_path)

# Extract all coordinates
type_name = 'Dmelanogaster'
movie_number = 1
fly_number = 5
x, y = get_coordinates(df, type_name, movie_number, fly_number)

In [74]:
# Convert raw coordiantes to ground truth path

from stonesoup.types.groundtruth import GroundTruthPath, GroundTruthState
from stonesoup.types.state import GaussianState, StateVector

start_time = datetime.now().replace(microsecond=0) 
curr_time = start_time + (WINDOW_SIZE-1) * TIME_INTERVAL  # first prediction at t = 5
# num_steps = (len(x) - WINDOW_SIZE)
num_steps = 50

truth = GroundTruthPath()

# Generate path
for k in range(num_steps+1):
    truth.append(GroundTruthState(np.concatenate((np.flip(x[k:WINDOW_SIZE+k]), np.flip(y[k:WINDOW_SIZE+k]))), timestamp=curr_time))
    curr_time += TIME_INTERVAL


In [75]:
# Simulate gaussian noise measurements
from stonesoup.types.detection import Detection
from stonesoup.models.measurement.linear import LinearGaussian

# Define measurement model
measurement_model = LinearGaussian(
    ndim_state=WINDOW_SIZE*2,
    mapping=(0, WINDOW_SIZE),
    noise_covar=np.array([[NOISE_COVAR, 0],
                          [0, NOISE_COVAR]])
    )



# Generate measurements
measurements = []
for state in truth:
    measurement = measurement_model.function(state, noise=True)
    measurements.append(Detection(measurement,
                                  timestamp=state.timestamp,
                                  measurement_model=measurement_model))

In [76]:
# Plot ground truth and simulated measurements

from stonesoup.plotter import AnimatedPlotterly

timesteps = [start_time + TIME_INTERVAL * i for i in range(num_steps + WINDOW_SIZE)]

plotter = AnimatedPlotterly(timesteps, tail_length=1)
plotter.plot_ground_truths(truth, [0, WINDOW_SIZE])
plotter.plot_measurements(measurements, [0, WINDOW_SIZE])
plotter.fig

In [77]:
# Initialise predictor and prior for kalman filtering

from stonesoup.predictor.kalman import KalmanPredictor

predictor = KalmanPredictor(transition_model)

prior_state_vector = np.zeros(WINDOW_SIZE*2)
prior_state_vector[:WINDOW_SIZE] += x[0]
prior_state_vector[WINDOW_SIZE:] += y[0]
prior = GaussianState(StateVector(prior_state_vector), np.identity(WINDOW_SIZE*2)*PRIOR_COVAR, timestamp=truth[0].timestamp)

In [78]:
# Kalman filtering: Prediction and update steps

from stonesoup.types.hypothesis import SingleHypothesis
from stonesoup.types.track import Track
from stonesoup.updater.kalman import KalmanUpdater

updater = KalmanUpdater(measurement_model)
track = Track()
track.append(prior)

for i in range(1, len(measurements)):
    measurement = measurements[i]
    prediction = predictor.predict(prior, timestamp=measurement.timestamp, pred_time=(measurement.timestamp-start_time))
    hypothesis = SingleHypothesis(prediction, measurement)
    post = updater.update(hypothesis)
    track.append(post)
    prior = track[-1]

# zero division error if prior.timestamp == measurement.timestamp

In [79]:
# Using Kalman Smoother is equivalent to using last elem in sliding window
# Kalman update step smoothes previous states

In [80]:
plotter.plot_tracks(track, [0, WINDOW_SIZE], track_label="Sliding window GP", uncertainty=False)
plotter.fig