<a href="https://colab.research.google.com/github/mkielo3/inekf_demo/blob/main/Left_Invariant_EKF_on_SE2_Demo-.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
%pip install gtsam

Collecting gtsam
  Downloading gtsam-4.2-cp311-cp311-manylinux2014_x86_64.whl.metadata (7.6 kB)
Downloading gtsam-4.2-cp311-cp311-manylinux2014_x86_64.whl (22.4 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m22.4/22.4 MB[0m [31m90.5 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: gtsam
Successfully installed gtsam-4.2


In [None]:
#!/usr/bin/env python3
import gtsam
import numpy as np
from gtsam import symbol

# Global parameter to control logging verbosity
verbose_logging = False

#!/usr/bin/env python3
import gtsam
import numpy as np
from gtsam import symbol

# Global parameter to control logging verbosity
verbose_logging = False

def convert_adjoint_to_inekf(pose_inv):
	"""Convert GTSAM's Adjoint to InEKF convention."""
	# @TODO - Replace with Pose2::AdjointMap()
	inekf_adjoint = np.eye(3)
	inekf_adjoint[1, 0] = pose_inv.y()
	inekf_adjoint[2, 0] = -pose_inv.x()
	return inekf_adjoint


def predict(X_prev, Sigma_prev, odometry, Q_model, graph, initial, state_index):
	"""
	Predict step of the InEKF using factor graphs.

	Args:
		X_prev: Previous state (Pose2)
		Sigma_prev: Previous covariance (3x3 matrix)
		odometry: Odometry measurement (Pose2)
		Q_model: Process noise model
		graph: Factor graph
		initial: Values
		state_index: Current state index

	Returns:
		X_predict: Predicted state
		Sigma_predict: Predicted covariance
	"""

	Q = Q_model.covariance()
	if verbose_logging:
		print("[InEKF] Process noise Q:\n", Q)

	odometry_inv = odometry.inverse()
	if verbose_logging:
		print("[SE2] inverse: Input state matrix:\n", odometry.matrix())
		print("[SE2] inverse: Final inverse matrix:\n", odometry_inv.matrix())

	# Get Adjoint matrix for left-invariant error propagation
	Phi = convert_adjoint_to_inekf(odometry_inv)
	if verbose_logging:
		print("[SE2] GTSAM Adjoint matrix:\n", odometry_inv.AdjointMap())
		print("[SE2] Our converted Adjoint matrix:\n", Phi)
		print("[InEKF] Phi matrix:\n", Phi)

	# First add process noise to covariance
	Sigma_with_noise = Sigma_prev + Q
	if verbose_logging:
		print("[InEKF] Sigma after adding process noise:\n", Sigma_with_noise)

	# Then apply Phi transformation for left-invariant error propagation
	Sigma_predict = Phi @ Sigma_with_noise @ Phi.T
	if verbose_logging:
		print("[InEKF] Final Sigma after Phi transformation:\n", Sigma_predict)

	# Print state propagation details
	if verbose_logging:
		print("[SE2] multiply: Left matrix:\n", X_prev.matrix())
		print("[SE2] multiply: Right matrix:\n", odometry.matrix())

	# Apply odometry to state
	X_predict = X_prev.compose(odometry)
	if verbose_logging:
		print("[SE2] multiply: Final result matrix:\n", X_predict.matrix())
		print("[InEKF] Predicted state completed")

	# Add between factor with propagated uncertainty
	propagated_noise = gtsam.noiseModel.Gaussian.Covariance(Sigma_predict)
	between_factor = gtsam.BetweenFactorPose2(symbol('x', state_index-1),
											 symbol('x', state_index),
											 odometry,
											 propagated_noise)

	# Add to graph and initial values
	graph.add(between_factor)
	initial.insert(symbol('x', state_index), X_predict)

	# Linearize and optimize
	linearized = graph.linearize(initial)
	delta = linearized.optimize()

	# Get covariance from information matrix
	info = linearized.hessian()[0]
	block_start = 3 * state_index
	block_end = block_start + 3
	Sigma_predict_optimized = np.linalg.inv(info[block_start:block_end, block_start:block_end])

	# Always print the final state and covariance regardless of verbose_logging
	print("\nState after predict:")
	print(X_predict)
	print("Left-invariant covariance after predict:")
	print(Sigma_predict_optimized)

	return X_predict, Sigma_predict_optimized


def update(X_predict, Sigma_predict, z_vec, H, M_model, initial, state_index):
	"""
	Update step of the InEKF.

	Args:
		X_predict: Predicted state (Pose2)
		Sigma_predict: Predicted covariance (3x3 matrix)
		z_vec: Measurement in homogeneous coordinates [x, y, 1]
		H: Measurement model matrix
		M_model: Measurement noise model
		initial: Values
		state_index: Current state index

	Returns:
		X_update: Updated state
		Sigma_update: Updated covariance
	"""
	# Prior factor doesnt work because it computes error = measurement - predicted measurement?
	# The innovation is computed in the body frame
	# The correction is applied through the exponential map
	# The uncertainty propagation preserves the left-invariant structure

	M = M_model.covariance()
	V = X_predict.transformTo(gtsam.Point2(z_vec[0], z_vec[1])) - np.array([0, 0])

	if verbose_logging:
		print("\nInnovation vector V:\n", V)

	# Compute innovation covariance
	R = X_predict.rotation().matrix()
	S = H @ Sigma_predict @ H.T + R[:2,:2].T @ M[:2,:2] @ R[:2,:2]
	S_inv = np.linalg.inv(S)

	if verbose_logging:
		print("Innovation covariance inverse S^(-1):\n", S_inv)

	# Compute Kalman gain
	# For InEKF:
	#   K = Σ^l H^lT S^(-1)
	#   X = X exp(-Kv^l)

	K = Sigma_predict @ H.T @ S_inv
	if verbose_logging:
		print("Kalman gain K:\n", K)

	# Compute state correction
	update_correction = K @ V
	gtsam_correction = np.array([update_correction[1], update_correction[2], update_correction[0]])

	if verbose_logging:
		print("State correction K*V:\n", gtsam_correction)

	# Apply correction
	X_update = X_predict.compose(gtsam.Pose2.Expmap(gtsam_correction))

	# Update covariance
	I = np.eye(3)
	Sigma_update = (I - K @ H) @ Sigma_predict

	# Always print the final state and covariance regardless of verbose_logging
	print("\nState after update:")
	print(X_update)
	print("Left-invariant covariance after update:")
	print(Sigma_update)

	# Update values
	initial.update(symbol('x', state_index), X_update)

	return X_update, Sigma_update



# Initialize NonlinearFactorGraph for the full trajectory
graph = gtsam.NonlinearFactorGraph()
initial = gtsam.Values()

# Initialize state X0 at origin
X_initial = gtsam.Pose2(0, 0, 0)
initial.insert(symbol('x', 0), X_initial)

# Add prior on X0 with uncertainty
prior_sigmas = np.array([np.sqrt(0.1), np.sqrt(0.1), np.sqrt(0.1)])
prior_noise = gtsam.noiseModel.Diagonal.Sigmas(prior_sigmas)
graph.add(gtsam.PriorFactorPose2(symbol('x', 0), X_initial, prior_noise))

# Process noise model
process_sigmas = np.array([np.sqrt(0.001), np.sqrt(0.05), np.sqrt(0.05)])
Q_model = gtsam.noiseModel.Diagonal.Sigmas(process_sigmas)

# Measurement noise model
meas_sigmas = np.array([0.1, 0.1, 0.1])
M_model = gtsam.noiseModel.Diagonal.Sigmas(meas_sigmas)

# Create H matrix for SE(2) measurement model
H = np.zeros((2, 3))
H[0, 1] = 1.0
H[1, 2] = 1.0

# ------ PREDICT STEP 1 ------
odometry = gtsam.Pose2(1, 1, 0.5)
X_predict, Sigma_predict = predict(X_initial, prior_noise.covariance(),
                                 odometry, Q_model, graph, initial, 1)

# ------ UPDATE STEP 1 ------
z_vec = np.array([1, 0, 1])  # Measurement in homogeneous coordinates
X_update, Sigma_update = update(X_predict, Sigma_predict, z_vec,
                              H, M_model, initial, 1)

# ------ PREDICT STEP 2 ------
odometry2 = gtsam.Pose2(1, 1, 0)
X_predict2, Sigma_predict2 = predict(X_update, Sigma_update,
                                   odometry2, Q_model, graph, initial, 2)

# ------ UPDATE STEP 2 ------
z2_vec = np.array([1, 1, 1])  # Position at (1,1) with homogeneous coordinate
X_update2, Sigma_update2 = update(X_predict2, Sigma_predict2, z2_vec,
                                H, M_model, initial, 2)

correct_final_state = np.array([1.08214, 1.01055, 0.37741])
correct_final_sigma = np.array([
		[0.01854011, -0.00273738, 0.00289377],
		[-0.00273738, 0.00896274, -0.0004273],
		[0.00289377, -0.0004273, 0.00901037]
	])


# Assert that the values match the expected results
np.testing.assert_allclose(
	np.array([X_update2.x(), X_update2.y(), X_update2.theta()]),
	correct_final_state,
	rtol=1e-5
)

np.testing.assert_allclose(
	Sigma_update2,
	correct_final_sigma,
	rtol=1e-5
)

print("\nAll tests passed!")