# EKF-SLAM using Factor Graphs (SRIF-like)

GTSAM Copyright 2010-2023, Georgia Tech Research Corporation,
Atlanta, Georgia 30332-0415
All Rights Reserved
Authors: Frank Dellaert, et al. (see THANKS for the full author list)

See LICENSE for the license information

<a href="https://colab.research.google.com/github/borglab/gtsam/blob/develop/python/examples/EKF_SLAM.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

This notebook demonstrates 2D SLAM using GTSAM's factor graph tools in an **iterative, filter-like manner**, similar in structure to a Square Root Information Filter (SRIF) or an EKF, but leveraging factor graph elimination instead of explicit matrix operations.

**Scenario:**
A robot moves in a circular path within a 10x10 meter area containing randomly placed landmarks. It receives noisy odometry and bearing-range measurements.

**Factor Graph Filtering Approach:**
Instead of maintaining a large state vector and covariance matrix explicitly (like in classic EKF), we use GTSAM's `NonlinearFactorGraph` and `GaussianFactorGraph`:

1.  **Belief Representation:** The robot's belief (estimated state and uncertainty) after each step is implicitly represented by a factor graph (or the resulting Bayes Net after elimination).
2.  **Prediction:**
    *   Start with the factors representing the belief from the *previous* step.
    *   Add a new `BetweenFactorPose2` representing the noisy odometry measurement, connecting the previous pose `X(k)` to the new pose `X(k+1)`.
    *   Linearize all relevant factors around the current best estimate (`Values`).
    *   Eliminate the resulting `GaussianFactorGraph` using an ordering that marginalizes out the previous pose `X(k)`, yielding a `GaussianBayesNet` representing the *predictive* distribution over landmarks and `X(k+1)`.
3.  **Update:**
    *   Start with the factors representing the *predictive* belief.
    *   For each landmark measurement at the new pose `X(k+1)`:
        *   If the landmark `L(j)` is new, add it to the `Values` (initial estimate).
        *   Add a `BearingRangeFactor2D` connecting `X(k+1)` and `L(j)`.
    *   Linearize new/updated factors.
    *   Eliminate the `GaussianFactorGraph` to incorporate the measurement information, yielding a `GaussianBayesNet` representing the *posterior* belief.
    *   Optimize the final Bayes Net to get the updated state estimate (`Values`).
4.  **Iteration:** Repeat prediction and update for each time step.

**Advantages:**
*   Leverages GTSAM's robust factor graph machinery.
*   Handles non-linearities through iterative linearization (like EKF, but potentially more robust within GTSAM's optimization context).
*   Avoids explicit management of large, dense covariance matrices.

**Output:** The process will be visualized using a Plotly animation showing the robot's estimated path, mapped landmarks, and uncertainty ellipses evolving over time.

## 1. Setup and Imports

In [None]:
# Install GTSAM and Plotly from pip if running in Google Colab
try:
    import google.colab
    %pip install --quiet gtsam-develop plotly
except ImportError:
    pass # Not in Colab

In [None]:
import numpy as np
import matplotlib.pyplot as plt # For initial plot if desired
import plotly.graph_objects as go
from tqdm.notebook import tqdm # Progress bar
import math

import gtsam
from gtsam.symbol_shorthand import X, L # Symbols for poses and landmarks

## 2. Simulation Parameters

In [None]:
# World parameters
NUM_LANDMARKS = 15
WORLD_SIZE = 10.0 # Environment bounds [-WORLD_SIZE/2, WORLD_SIZE/2]

# Robot parameters
ROBOT_RADIUS = 3.0
ROBOT_ANGULAR_VEL = np.deg2rad(20.0) # Radians per step
NUM_STEPS = 50 # Reduced steps for faster animation generation
DT = 1.0 # Time step duration (simplified)

# Noise parameters (GTSAM Noise Models)
# Prior noise on the first pose (x, y, theta)
PRIOR_NOISE = gtsam.noiseModel.Diagonal.Sigmas(np.array([0.1, 0.1, np.deg2rad(1.0)]))
# Odometry noise (dx, dy, dtheta)
ODOMETRY_NOISE = gtsam.noiseModel.Diagonal.Sigmas(np.array([0.1, 0.05, np.deg2rad(2.0)]))
# Measurement noise (bearing, range)
MEASUREMENT_NOISE = gtsam.noiseModel.Diagonal.Sigmas(np.array([np.deg2rad(2.0), 0.2]))

# Create samplers for noise models
prior_noise_sampler = gtsam.Sampler(PRIOR_NOISE, 42)
odometry_noise_sampler = gtsam.Sampler(ODOMETRY_NOISE, 42)
measurement_noise_sampler = gtsam.Sampler(MEASUREMENT_NOISE, 42)

# Sensor parameters
MAX_SENSOR_RANGE = 5.0

## 3. Generate Ground Truth Data

Create true landmark positions, robot path, and simulate noisy measurements.

In [None]:
# 3.1 Ground Truth Landmarks
landmarks_gt = (np.random.rand(2, NUM_LANDMARKS) - 0.5) * WORLD_SIZE
landmark_ids = list(range(NUM_LANDMARKS))
landmarks_gt_dict = {L(i): gtsam.Point2(landmarks_gt[:, i]) for i in range(NUM_LANDMARKS)}

# 3.2 Ground Truth Robot Path
poses_gt = []
current_pose_gt = gtsam.Pose2(ROBOT_RADIUS, 0, np.pi / 2) # Start on circle edge
poses_gt.append(current_pose_gt)

for k in range(NUM_STEPS):
    delta_theta = ROBOT_ANGULAR_VEL * DT
    arc_length = ROBOT_ANGULAR_VEL * ROBOT_RADIUS * DT
    motion_command = gtsam.Pose2(arc_length, 0, delta_theta)
    current_pose_gt = current_pose_gt.compose(motion_command)
    poses_gt.append(current_pose_gt)

# 3.3 Simulate Noisy Odometry Measurements (as Pose2 objects)
odometry_measurements = []
for k in range(NUM_STEPS):
    pose_k = poses_gt[k]
    pose_k1 = poses_gt[k+1]
    true_odom = pose_k.between(pose_k1)
    
    # Sample noise directly for Pose2 composition (approximate)
    # A more rigorous approach samples from the tangent space
    odom_noise_vec = odometry_noise_sampler.sample()
    noisy_odom = true_odom.compose(gtsam.Pose2(odom_noise_vec[0], odom_noise_vec[1], odom_noise_vec[2]))
    odometry_measurements.append(noisy_odom)

# 3.4 Simulate Noisy Bearing-Range Measurements
# measurements_sim[k] = list of (L(lm_id), range, bearing) for measurements *at* pose k
measurements_sim = [[] for _ in range(NUM_STEPS + 1)]
for k in range(NUM_STEPS + 1):
    robot_pose = poses_gt[k]
    for lm_id in range(NUM_LANDMARKS):
        lm_gt_pt = landmarks_gt_dict[L(lm_id)]
        try:
            # Use BearingRangeFactor2D to simulate the measurement model
            measurement_factor = gtsam.BearingRangeFactor2D(X(k), L(lm_id), 
                                                            robot_pose.bearing(lm_gt_pt),
                                                            robot_pose.range(lm_gt_pt), 
                                                            MEASUREMENT_NOISE)
            true_range = measurement_factor.measured().range()
            true_bearing = measurement_factor.measured().bearing()

            if true_range <= MAX_SENSOR_RANGE and abs(true_bearing.theta())<np.pi/4:
                # Sample noise
                noise_vec = measurement_noise_sampler.sample()
                noisy_bearing = true_bearing.retract(np.array([noise_vec[0]])) # Retract on SO(2)
                noisy_range = true_range + noise_vec[1]
                
                if noisy_range > 0: # Ensure range is positive
                   measurements_sim[k].append((L(lm_id), noisy_bearing, noisy_range))
        except Exception as e:
            # Catch potential errors like point being too close to the pose
            print(f"Sim Warning at step {k}, landmark {lm_id}: {e}")
            pass

print(f"Generated {NUM_LANDMARKS} landmarks.")
print(f"Generated {NUM_STEPS} ground truth poses and odometry measurements.")
num_meas_total = sum(len(m_list) for m_list in measurements_sim)
print(f"Generated {num_meas_total} bearing-range measurements across all steps.")

## 4. Iterative Factor Graph SLAM Implementation

Here we perform the step-by-step prediction and update using factor graphs.

In [None]:
# --- Initialization ---
current_estimate = gtsam.Values() # Stores current best estimates (linearization points)
current_graph = gtsam.NonlinearFactorGraph() # Stores factors added so far
current_pose_key = X(0)

# Add prior on the initial pose X(0)
initial_pose = poses_gt[0] # Start at ground truth (or add noise)
current_estimate.insert(current_pose_key, initial_pose)
current_graph.add(gtsam.PriorFactorPose2(current_pose_key, initial_pose, PRIOR_NOISE))

# Variables to store results for animation
results_history = [current_estimate] # Store Values object at each step
marginals_history = [] # Store Marginals object at each step
known_landmarks = set() # Set of landmark keys L(j) currently in the state

# Initial marginals (only for X(0))
try:
    initial_gaussian_graph = current_graph.linearize(current_estimate)
    initial_marginals = gtsam.Marginals(initial_gaussian_graph, current_estimate)
    marginals_history.append(initial_marginals)
except Exception as e:
    print(f"Error computing initial marginals: {e}")
    marginals_history.append(None) # Append placeholder if fails


### Main Iterative Loop

In [None]:
print("Running Iterative Factor Graph SLAM...")

for k in tqdm(range(NUM_STEPS)):
    # === Prediction Step ===
    prev_pose_key = X(k)
    current_pose_key = X(k + 1)
    odom_k = odometry_measurements[k]
    
    # Predict next pose and add to Values (linearization point)
    predicted_pose = current_estimate.atPose2(prev_pose_key).compose(odom_k)
    current_estimate.insert(current_pose_key, predicted_pose)
    
    # Add odometry factor to the graph
    current_graph.add(gtsam.BetweenFactorPose2(prev_pose_key, current_pose_key, odom_k, ODOMETRY_NOISE))

    # --- At this point, the full graph contains the history --- 
    # --- To mimic SRIF/EKF, we *could* eliminate prev_pose_key, but --- 
    # --- it's simpler here to keep the full graph and optimize it --- 
    # --- then extract the current belief for the next step's factors --- 
    # --- Let's proceed with adding measurements first ----

    # === Update Step ===
    measurements_k1 = measurements_sim[k + 1] # Measurements taken AT pose k+1
    landmarks_observed_this_step = set()

    for lm_key, measured_bearing, measured_range in measurements_k1:
        landmarks_observed_this_step.add(lm_key)
        
        # If landmark is new, initialize its estimate
        if not current_estimate.exists(lm_key):
            # Initial guess based on current pose estimate and measurement
            current_pose_val = current_estimate.atPose2(current_pose_key)
            delta_x = measured_range * math.cos(measured_bearing.theta())
            delta_y = measured_range * math.sin(measured_bearing.theta())
            # Transform delta from robot frame to world frame
            lm_initial_guess = current_pose_val.transformFrom(gtsam.Point2(delta_x, delta_y))
            current_estimate.insert(lm_key, lm_initial_guess)
            known_landmarks.add(lm_key)
            # print(f"Step {k+1}: Initialized Landmark {lm_key.index()}")

        # Add measurement factor
        current_graph.add(gtsam.BearingRangeFactor2D(current_pose_key, lm_key, 
                                                     measured_bearing, measured_range,
                                                     MEASUREMENT_NOISE))
    
    # --- Optimization (Incremental Smoothing) ---
    # In a true filter, we'd eliminate previous states.
    # Here, we re-optimize the growing graph. For efficiency in large problems, 
    # iSAM2 would be used, but Levenberg-Marquardt on the growing graph 
    # demonstrates the concept for this smaller example.
    
    optimizer = gtsam.LevenbergMarquardtOptimizer(current_graph, current_estimate)
    try:
        current_estimate = optimizer.optimize() # Update estimates by optimizing the whole graph so far
    except Exception as e:
        print(f"Optimization failed at step {k+1}: {e}")
        # Potentially revert estimate update or handle error
        # For simplicity, we continue with the potentially unoptimized estimate
        pass 

    # Store results for animation
    results_history.append(gtsam.Values(current_estimate)) # Store a copy
    
    # Calculate marginals for visualization (can be slow for large graphs)
    try:
        current_gaussian_graph = current_graph.linearize(current_estimate)
        current_marginals = gtsam.Marginals(current_gaussian_graph, current_estimate)
        marginals_history.append(current_marginals)
    except Exception as e:
        print(f"Marginals calculation failed at step {k+1}: {e}")
        marginals_history.append(None) # Append placeholder

print("Iterative Factor Graph SLAM finished.")
print(f"Final number of poses: {k+2}")
print(f"Number of landmarks mapped: {len(known_landmarks)}")

## 5. Create Plotly Animation

In [None]:
print("Generating Plotly animation...")

def ellipse_path(cx, cy, sizex, sizey, angle, N=60):
    """SVG path string for an ellipse centered at (cx, cy), rotated by `angle` in degrees."""
    angle_rad = np.radians(angle)
    t = np.linspace(0, 2 * np.pi, N)
    x = (sizex / 2) * np.cos(t)
    y = (sizey / 2) * np.sin(t)

    x_rot = cx + x * np.cos(angle_rad) - y * np.sin(angle_rad)
    y_rot = cy + x * np.sin(angle_rad) + y * np.cos(angle_rad)

    path = f"M {x_rot[0]},{y_rot[0]} " + " ".join(
        f"L{x_},{y_}" for x_, y_ in zip(x_rot[1:], y_rot[1:])
    ) + " Z"
    return path

# Helper to convert GTSAM covariance to Plotly ellipse parameters
def gtsam_cov_to_plotly_ellipse(cov_matrix, scale=2.0):
    """Calculates ellipse parameters (angle, width, height) from a 2x2 covariance matrix."""
    # Ensure positive definite - add small epsilon if needed
    cov = cov_matrix[:2, :2] + np.eye(2) * 1e-9 
    try:
        eigvals, eigvecs = np.linalg.eigh(cov)
        # Ensure eigenvalues are positive for sqrt
        eigvals = np.maximum(eigvals, 1e-9)
    except np.linalg.LinAlgError:
        print("Warning: Covariance matrix SVD failed, using default ellipse.")
        return 0, 0.1*scale, 0.1*scale # Default small ellipse
        
    # Width/Height are 2*scale*sqrt(eigenvalue)
    width = 2 * scale * np.sqrt(eigvals[1])
    height = 2 * scale * np.sqrt(eigvals[0])
    
    # Angle of the major axis (corresponding to largest eigenvalue)
    angle_rad = np.arctan2(eigvecs[1, 1], eigvecs[0, 1])
    angle_deg = np.degrees(angle_rad)
    
    return angle_deg, width, height

# --- Create Plotly Figure --- 
fig = go.Figure()

# Add Ground Truth Landmarks (static)
fig.add_trace(go.Scatter(
    x=landmarks_gt[0, :], 
    y=landmarks_gt[1, :], 
    mode='markers', 
    marker=dict(color='black', size=8, symbol='star'),
    name='Landmarks GT'
))

# Add Ground Truth Path (static)
gt_path_x = [p.x() for p in poses_gt]
gt_path_y = [p.y() for p in poses_gt]
fig.add_trace(go.Scatter(
    x=gt_path_x,
    y=gt_path_y,
    mode='lines',
    line=dict(color='gray', width=1, dash='dash'),
    name='Path GT'
))

# --- Animation Frames --- 
frames = []
steps = list(range(NUM_STEPS + 1))

for k in tqdm(steps):
    frame_data = []
    step_results = results_history[k]
    step_marginals = marginals_history[k]
    
    # Estimated Path up to step k
    est_path_x = [results_history[i].atPose2(X(i)).x() for i in range(k + 1)]
    est_path_y = [results_history[i].atPose2(X(i)).y() for i in range(k + 1)]
    frame_data.append(go.Scatter(
        x=est_path_x,
        y=est_path_y,
        mode='lines+markers',
        line=dict(color='red', width=2),
        marker=dict(size=4, color='red'),
        name='Path Est'
    ))
    
    # Estimated Landmarks known at step k
    est_landmarks_x = []
    est_landmarks_y = []
    landmark_keys_in_frame = []
    for lm_key in step_results.keys():
        if gtsam.symbolChr(lm_key)==108: # Check if it's a landmark key
            lm_pose = step_results.atPoint2(lm_key)
            est_landmarks_x.append(lm_pose[0])
            est_landmarks_y.append(lm_pose[1])
            landmark_keys_in_frame.append(lm_key)
            
    if est_landmarks_x:
        frame_data.append(go.Scatter(
            x=est_landmarks_x, 
            y=est_landmarks_y, 
            mode='markers',
            marker=dict(color='blue', size=6, symbol='x'),
            name='Landmarks Est'
        ))

    # Current Pose Covariance Ellipse
    shapes = [] # List to hold ellipse shapes for this frame
    if step_marginals is not None:
        try:
            current_pose_key = X(k)
            pose_cov = step_marginals.marginalCovariance(current_pose_key)
            pose_mean = step_results.atPose2(current_pose_key).translation()
            angle, width, height = gtsam_cov_to_plotly_ellipse(pose_cov)
            cx, cy = pose_mean[0], pose_mean[1]
            shapes.append(dict(
                type="path",
                path=ellipse_path(cx=cx, cy=cy, sizex=width, sizey=height, angle=angle, N=60),
                xref="x", yref="y",
                fillcolor="rgba(255,0,255,0.2)",
                line_color="rgba(255,0,255,0.5)",
                name=f'Pose {k} Cov'
            ))
        except Exception as e:
            print(f"Warning: Failed getting pose cov ellipse at step {k}: {e}")

        # Landmark Covariance Ellipses
        for lm_key in landmark_keys_in_frame:
            index = gtsam.symbolIndex(lm_key)
            try:
                lm_cov = step_marginals.marginalCovariance(lm_key)
                lm_mean = step_results.atPoint2(lm_key)
                angle, width, height = gtsam_cov_to_plotly_ellipse(lm_cov)
                cx, cy = lm_mean[0], lm_mean[1]
                shapes.append(dict(
                    type="path",
                    path=ellipse_path(cx=cx, cy=cy, sizex=width, sizey=height, angle=angle, N=60),
                    xref="x", yref="y",
                    fillcolor="rgba(0,0,255,0.1)",
                    line_color="rgba(0,0,255,0.3)",
                    name=f'LM {index} Cov'
                ))
            except Exception as e:
                 print(f"Warning: Failed getting landmark {index} cov ellipse at step {k}: {e}")

    frames.append(go.Frame(data=frame_data, name=str(k), layout=go.Layout(shapes=shapes)))

# --- Set Initial State and Layout --- 
fig.update(frames=frames)

# Set initial data to the first frame's data
if frames:
    fig.add_traces(frames[0].data)
    initial_shapes = frames[0].layout.shapes if frames[0].layout else []
else:
    initial_shapes = []

# Define slider
sliders = [dict(
    active=0,
    currentvalue={"prefix": "Step: "},
    pad={"t": 50},
    steps=[dict(label=str(k), 
                method='animate', 
                args=[[str(k)], 
                      dict(mode='immediate', 
                           frame=dict(duration=100, redraw=True), 
                           transition=dict(duration=0))])
           for k in steps]
)]

# Update layout
fig.update_layout(
    title='Iterative Factor Graph SLAM Animation',
    xaxis=dict(range=[-WORLD_SIZE/2 - 2, WORLD_SIZE/2 + 2], constrain='domain'),
    yaxis=dict(range=[-WORLD_SIZE/2 - 2, WORLD_SIZE/2 + 2], scaleanchor='x', scaleratio=1),
    width=800, height=800,
    hovermode='closest',
    updatemenus=[dict(type='buttons', 
                      showactive=False,
                      buttons=[dict(label='Play',
                                  method='animate', 
                                  args=[None, 
                                        dict(mode='immediate', 
                                             frame=dict(duration=100, redraw=True), 
                                             transition=dict(duration=0), 
                                             fromcurrent=True)])])],
    sliders=sliders,
    shapes=initial_shapes # Set initial shapes
)

print("Displaying animation...")
fig.show()

## 6. Discussion

*   **Approach:** This notebook implemented SLAM iteratively using GTSAM factor graphs. At each step, new factors (odometry, measurements) were added, and the graph was re-optimized using Levenberg-Marquardt. This is more akin to **incremental smoothing** than a classic filter (which would explicitly marginalize past states).
*   **Efficiency:** Optimizing the entire graph at every step becomes computationally expensive for long trajectories. For real-time performance or large problems, **iSAM2 (Incremental Smoothing and Mapping)** is the preferred GTSAM algorithm. iSAM2 efficiently updates the solution by only re-linearizing and re-solving parts of the graph affected by new measurements.
*   **Accuracy vs. EKF:** This factor graph approach generally handles non-linearities better than a standard EKF because it re-linearizes during optimization. It avoids some of the consistency pitfalls associated with the EKF's single linearization point per step.
*   **Visualization:** The Plotly animation shows the evolution of the robot's path estimate, the map of landmarks, and their associated uncertainties (covariance ellipses). You can observe how measurements help refine the estimates and reduce uncertainty, especially when loops are closed (implicitly here through repeated observations of landmarks).