<a href="https://colab.research.google.com/github/mugalan/classical-mechanics-from-a-geometric-point-of-view/blob/main/rigid-body-control/RigidBodyIntinsicEKF_DHSM.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

#Rigid Body Intrinsic EKF

In [None]:
import numpy as np
import scipy as sp
from scipy.integrate import odeint
import math
from numpy import linalg
import sympy
from sympy import symbols
from sympy import *

import plotly.graph_objects as go
import plotly.express as px
from sympy.physics.mechanics import dynamicsymbols, init_vprinting
from IPython.display import display, Math, Latex

In [None]:
import numpy as np

class KFShapeError(ValueError):
    pass

class KFValueError(ValueError):
    pass

class LinearKF:
    """
    Linear-Gaussian Kalman Filter utilities focused on the measurement update.

    Notation (matching your equations):
        m_k^- : predicted mean      (n,)
        P_k^- : predicted covariance (n,n)
        H_k   : measurement matrix   (p,n)
        y_k   : measurement vector   (p,)
        Σ_m   : measurement noise covariance R (p,p)

    Update:
        K_k = P_k^- H_k^T (H_k P_k^- H_k^T + Σ_m)^{-1}
        m_k = m_k^- + K_k (y_k - H_k m_k^-)
        P_k = (I - K_k H_k) P_k^-      (or Joseph form)
    """

    def __init__(self, *, use_joseph: bool = True, symmetrize: bool = True, atol_sym: float = 1e-10):
        """
        Parameters
        ----------
        use_joseph : bool
            Use Joseph-stabilized covariance update (default True).
        symmetrize : bool
            Force symmetry of P_k at the end via (P + P^T)/2 (default True).
        atol_sym : float
            Tolerance for symmetry checks.
        """
        self.use_joseph = use_joseph
        self.symmetrize = symmetrize
        self.atol_sym = atol_sym

    # ---------- public API ----------

    def measurement_update(self, m_pred, P_pred, H, y, R):
        """
        Compute K_k, m_k, P_k given m_k^-, P_k^-, H_k, y_k, Σ_m (=R).

        Returns
        -------
        m_upd : (n,)
        P_upd : (n,n)
        K     : (n,p)
        S     : (p,p)   innovation covariance = H P^- H^T + R
        """
        m_pred, P_pred, H, y, R, n, p = self._validate_shapes(m_pred, P_pred, H, y, R)

        # Symmetry checks and SPD requirements
        self._assert_symmetric(P_pred, "P_pred", atol=self.atol_sym)
        self._assert_symmetric(R, "R", atol=self.atol_sym)
        _ = self._assert_spd(R, "R")  # R must be SPD

        # Innovation covariance S = H P^- H^T + R
        HP = H @ P_pred               # (p,n)
        S = HP @ H.T + R              # (p,p)
        try:
            Ls = np.linalg.cholesky(S)
        except np.linalg.LinAlgError as e:
            raise KFValueError(
                f"Innovation covariance `S` is not SPD. "
                f"Check H, P_pred, and R. Cholesky failed: {e}"
            )

        # Gain K = P^- H^T S^{-1} without explicit inverse
        U = np.linalg.solve(Ls, HP)   # (p,n)
        X = np.linalg.solve(Ls.T, U)  # (p,n)
        K = X.T                       # (n,p)

        # Residual
        v = y - H @ m_pred            # (p,)

        # Updated mean
        m_upd = m_pred + K @ v        # (n,)

        # Updated covariance
        I = np.eye(n)
        if self.use_joseph:
            I_KH = (I - K @ H)
            P_upd = I_KH @ P_pred @ I_KH.T + K @ R @ K.T
        else:
            P_upd = (I - K @ H) @ P_pred

        if self.symmetrize:
            P_upd = 0.5 * (P_upd + P_upd.T)

        return m_upd, P_upd, K, S

    def innovation(self, m_pred, P_pred, H, R, y=None):
        """
        Compute innovation covariance S = H P^- H^T + R and (optionally) residual v.

        Returns
        -------
        (v, S) if y is given; otherwise just S.
        """
        # Validate without using y unless provided
        m_pred = self._as_1d(m_pred, "m_pred")
        P_pred = self._as_2d(P_pred, "P_pred")
        H = self._as_2d(H, "H")
        R = self._as_2d(R, "R")

        n = m_pred.shape[0]
        if P_pred.shape != (n, n):
            raise KFShapeError(f"`P_pred` must have shape {(n, n)}; got {P_pred.shape}.")
        if H.shape[1] != n:
            raise KFShapeError(f"`H` second dim must be {n}; got {H.shape}.")

        p = H.shape[0]
        if R.shape != (p, p):
            raise KFShapeError(f"`R` must have shape {(p, p)}; got {R.shape}.")

        self._assert_symmetric(P_pred, "P_pred", atol=self.atol_sym)
        self._assert_symmetric(R, "R", atol=self.atol_sym)
        _ = self._assert_spd(R, "R")

        S = H @ P_pred @ H.T + R
        if y is None:
            return S
        y = self._as_1d(y, "y")
        if y.shape[0] != p:
            raise KFShapeError(f"`y` length must be {p}; got {y.shape}.")
        v = y - H @ m_pred
        return v, S

    def set_options(self, *, use_joseph=None, symmetrize=None, atol_sym=None):
        """Update instance options."""
        if use_joseph is not None:
            self.use_joseph = bool(use_joseph)
        if symmetrize is not None:
            self.symmetrize = bool(symmetrize)
        if atol_sym is not None:
            self.atol_sym = float(atol_sym)

    # ---------- helpers (static) ----------

    @staticmethod
    def _as_1d(x, name):
        x = np.asarray(x, dtype=float)
        if x.ndim == 2 and x.shape[1] == 1:
            x = x[:, 0]
        if x.ndim != 1:
            raise KFShapeError(f"`{name}` must be a 1D vector of shape (n,) or (n,1); got {x.shape}.")
        return x

    @staticmethod
    def _as_2d(x, name):
        x = np.asarray(x, dtype=float)
        if x.ndim != 2:
            raise KFShapeError(f"`{name}` must be a 2D array; got {x.ndim}D with shape {x.shape}.")
        return x

    @staticmethod
    def _assert_symmetric(M, name, atol=1e-10):
        if M.shape[0] != M.shape[1]:
            raise KFShapeError(f"`{name}` must be square; got {M.shape}.")
        if not np.allclose(M, M.T, atol=atol):
            raise KFValueError(f"`{name}` must be symmetric within atol={atol}.")

    @staticmethod
    def _assert_spd(M, name):
        try:
            return np.linalg.cholesky(M)
        except np.linalg.LinAlgError as e:
            raise KFValueError(f"`{name}` must be symmetric positive definite (SPD). Cholesky failed: {e}")

    # ---------- internal ----------

    def _validate_shapes(self, m_pred, P_pred, H, y, R):
        m_pred = self._as_1d(m_pred, "m_pred")
        y = self._as_1d(y, "y")
        P_pred = self._as_2d(P_pred, "P_pred")
        H = self._as_2d(H, "H")
        R = self._as_2d(R, "R")

        n = m_pred.shape[0]
        if P_pred.shape != (n, n):
            raise KFShapeError(f"`P_pred` must have shape {(n, n)} to match `m_pred`; got {P_pred.shape}.")
        if H.shape[1] != n:
            raise KFShapeError(f"Second dim of `H` must equal len(m_pred)={n}; got H.shape={H.shape}.")
        p = H.shape[0]
        if y.shape[0] != p:
            raise KFShapeError(f"`y` length must match number of rows in `H` (p={p}); got y.shape={y.shape}.")
        if R.shape != (p, p):
            raise KFShapeError(f"`R` must have shape {(p, p)}; got {R.shape}.")
        return m_pred, P_pred, H, y, R, n, p

In [None]:
from plotly.subplots import make_subplots
import plotly.graph_objects as go
import numpy as np


class LDSShapeError(ValueError): ...
class LDSValueError(ValueError): ...

class LinearGaussianSystemSyms:
    """
    Discrete-time linear Gaussian system:
        x_k = A_k x_{k-1} + w_{k-1},   w_{k-1} ~ N(0, Σ_p)
        y_k = H_k x_k       + z_k,     z_k     ~ N(0, Σ_m)
    """

    def __init__(self, A, H, Sigma_p, Sigma_m, x0=None, rng=None):
        self.A = self._as_2d(A, "A")
        self.H = self._as_2d(H, "H")
        self.Sigma_p = self._as_2d(Sigma_p, "Sigma_p")
        self.Sigma_m = self._as_2d(Sigma_m, "Sigma_m")

        n = self.A.shape[0]
        p = self.H.shape[0]

        # Shape checks
        if self.A.shape != (n, n):
            raise LDSShapeError(f"`A` must be (n,n); got {self.A.shape}.")
        if self.H.shape[1] != n:
            raise LDSShapeError(f"`H` must be (p,n) with n={n}; got {self.H.shape}.")
        if self.Sigma_p.shape != (n, n):
            raise LDSShapeError(f"`Sigma_p` must be (n,n); got {self.Sigma_p.shape}.")
        if self.Sigma_m.shape != (p, p):
            raise LDSShapeError(f"`Sigma_m` must be (p,p); got {self.Sigma_m.shape}.")

        # Symmetry + SPD
        self._assert_symmetric(self.Sigma_p, "Sigma_p")
        self._assert_symmetric(self.Sigma_m, "Sigma_m")
        self.Lp = self._assert_spd(self.Sigma_p, "Sigma_p")  # Cholesky factors for sampling
        self.Lm = self._assert_spd(self.Sigma_m, "Sigma_m")

        self.n, self.p = n, p
        self.rng = rng if isinstance(rng, np.random.Generator) else np.random.default_rng(rng)
        self.x = self._as_1d(x0, "x0") if x0 is not None else np.zeros(n)

    # ---------- stepping ----------

    def step_state(self):
        """x_k = A x_{k-1} + w_{k-1}  (returns x_k)"""
        w = self.Lp @ self.rng.standard_normal(self.n)
        self.x = self.A @ self.x + w
        return self.x

    def step_measurement(self, x=None):
        """y_k = H x_k + z_k  (returns y_k)"""
        xk = self._as_1d(x, "x") if x is not None else self.x
        z = self.Lm @ self.rng.standard_normal(self.p)
        return self.H @ xk + z

    def step(self):
        """Advance one step and return (x_k, y_k)."""
        xk = self.step_state()
        yk = self.step_measurement(xk)
        return xk, yk

    def simulate(self, T):
        """Run T steps; returns X (T,n) and Y (T,p)."""
        X = np.zeros((T, self.n))
        Y = np.zeros((T, self.p))
        for k in range(T):
            X[k], Y[k] = self.step()
        return X, Y

    # ---------- helpers ----------
    @staticmethod
    def _as_2d(M, name):
        M = np.asarray(M, dtype=float)
        if M.ndim != 2:
            raise LDSShapeError(f"`{name}` must be 2D; got shape {M.shape}.")
        return M

    @staticmethod
    def _as_1d(v, name):
        v = np.asarray(v, dtype=float)
        if v.ndim == 2 and v.shape[1] == 1:
            v = v[:, 0]
        if v.ndim != 1:
            raise LDSShapeError(f"`{name}` must be 1D; got shape {v.shape}.")
        return v

    @staticmethod
    def _assert_symmetric(M, name, atol=1e-12):
        if M.shape[0] != M.shape[1]:
            raise LDSShapeError(f"`{name}` must be square; got {M.shape}.")
        if not np.allclose(M, M.T, atol=atol):
            raise LDSValueError(f"`{name}` must be symmetric within atol={atol}.")

    @staticmethod
    def _assert_spd(M, name):
        try:
            return np.linalg.cholesky(M)
        except np.linalg.LinAlgError as e:
            raise LDSValueError(f"`{name}` must be symmetric positive definite (SPD). Cholesky failed: {e}")

    def animate_measurement_gaussians_scalar(
        self,
        *,
        T: int | None = None,
        Y: np.ndarray | None = None,
        m0: np.ndarray | None = None,
        P0: np.ndarray | None = None,
        kf=None,
        frame_ms: int = 120,
        auto_play: bool = False,
        save_html_path: str | None = None,
        show: bool = True,
        return_fig: bool = False,
        component_label: str = "y"
    ):
        if self.p != 1:
            raise ValueError(f"This animation requires scalar measurements (p=1); got p={self.p}.")

        A = np.asarray(self.A, float)
        H = np.asarray(self.H, float).reshape(1, -1)    # (1,n)
        Q = np.asarray(self.Sigma_p, float)
        Rm = np.asarray(self.Sigma_m, float)
        R_scalar = float(np.atleast_2d(Rm)[0, 0])       # already a Python float

        n = self.n

        if Y is None:
            if T is None:
                raise ValueError("Provide either Y or T.")
            _, Y_sim = self.simulate(T)
            Y = np.asarray(Y_sim, float).reshape(-1)
        else:
            Y = np.asarray(Y, float).reshape(-1)
            T = Y.shape[0]

        m = np.zeros(n) if m0 is None else np.asarray(m0, float).reshape(-1)
        if m.shape[0] != n:
            raise ValueError(f"`m0` must have length n={n}; got {m.shape}.")
        P = (1e2 * np.eye(n)) if P0 is None else np.asarray(P0, float)
        if P.shape != (n, n):
            raise ValueError(f"`P0` must be (n,n)=({n},{n}); got {P.shape}.")

        if kf is None:
            kf = LinearKF(use_joseph=True, symmetrize=True)

        mu_pred = np.zeros(T)
        sig_pred = np.zeros(T)
        mu_post = np.zeros(T)
        sig_post = np.zeros(T)

        for k in range(T):
            # Predict
            m_pred = A @ m
            P_pred = A @ P @ A.T + Q

            # Predictive y ~ N(H m^-, H P^- H^T + R)
            mu_pred[k] = (H @ m_pred).item()                      # <-- .item()
            S_pred = (H @ P_pred @ H.T).item() + R_scalar         # <-- .item()
            sig_pred[k] = float(np.sqrt(S_pred)) if S_pred > 0 else 0.0

            # Update with robust KF
            m, P, _, _ = kf.measurement_update(m_pred, P_pred, H, np.array([Y[k]]), np.array([[R_scalar]]))

            # Posterior-predictive y ~ N(H m, H P H^T + R)
            mu_post[k] = (H @ m).item()                           # <-- .item()
            S_post = (H @ P @ H.T).item() + R_scalar              # <-- .item()
            sig_post[k] = float(np.sqrt(S_post)) if S_post > 0 else 0.0

        # ---- plotting (unchanged except uses mu_* / sig_* arrays) ----
        def _gauss_pdf(x, mu, sigma):
            if sigma <= 0:
                return np.zeros_like(x)
            return (1.0 / (np.sqrt(2.0*np.pi) * sigma)) * np.exp(-0.5 * ((x - mu)/sigma)**2)

        pred_lo = np.min(mu_pred - 4.0 * sig_pred)
        pred_hi = np.max(mu_pred + 4.0 * sig_pred)
        post_lo = np.min(mu_post - 4.0 * sig_post)
        post_hi = np.max(mu_post + 4.0 * sig_post)
        y_lo = float(np.min(Y)) - 1e-6
        y_hi = float(np.max(Y)) + 1e-6

        x_min = float(np.min([pred_lo, post_lo, y_lo]))
        x_max = float(np.max([pred_hi, post_hi, y_hi]))
        if not np.isfinite(x_min) or not np.isfinite(x_max) or x_max <= x_min:
            x_min, x_max = -3.0, 3.0

        xgrid = np.linspace(x_min, x_max, 700)

        frames = []
        for k in range(T):
            y_pred_pdf = _gauss_pdf(xgrid, mu_pred[k], sig_pred[k])
            y_post_pdf = _gauss_pdf(xgrid, mu_post[k], sig_post[k])
            ymax = float(max(y_pred_pdf.max(initial=0.0), y_post_pdf.max(initial=0.0)))

            frames.append(
                go.Frame(
                    name=str(k),
                    data=[
                        go.Scatter(x=xgrid, y=y_pred_pdf, mode="lines",
                                name="Predictive p(y_k|Y_{k-1})", showlegend=False),
                        go.Scatter(x=xgrid, y=y_post_pdf, mode="lines",
                                name="Posterior p(y_k|Y_k)", showlegend=False),
                        go.Scatter(x=[Y[k]], y=[ymax*0.9], mode="markers",
                                name=f"observed {component_label}_k", showlegend=False, marker=dict(size=8)),
                    ],
                    layout=go.Layout(
                        annotations=[
                            dict(
                                x=0.98, y=0.95, xref="paper", yref="paper",
                                xanchor="right", yanchor="top",
                                text=(f"k={k} | μ⁻={mu_pred[k]:.3f}, σ⁻={sig_pred[k]:.3f} "
                                    f"| μ={mu_post[k]:.3f}, σ={sig_post[k]:.3f}"),
                                showarrow=False, font=dict(size=12)
                            )
                        ]
                    )
                )
            )

        y_pred_pdf0 = _gauss_pdf(xgrid, mu_pred[0], sig_pred[0])
        y_post_pdf0 = _gauss_pdf(xgrid, mu_post[0], sig_post[0])
        ymax0 = float(max(y_pred_pdf0.max(initial=0.0), y_post_pdf0.max(initial=0.0)))

        fig = go.Figure(
            data=[
                go.Scatter(x=xgrid, y=y_pred_pdf0, mode="lines", name="Predictive p(y_k|Y_{k-1})"),
                go.Scatter(x=xgrid, y=y_post_pdf0, mode="lines", name="Posterior p(y_k|Y_k)"),
                go.Scatter(x=[Y[0]], y=[ymax0*0.9], mode="markers",
                        name=f"observed {component_label}_k", marker=dict(size=8)),
            ],
            layout=go.Layout(
                title="Scalar KF: Predictive vs Posterior-Predictive Measurement Gaussians",
                xaxis_title=component_label,
                yaxis_title="density",
                template="plotly_white",
                legend=dict(orientation="v", x=1.02, xanchor="left", y=0.5, yanchor="middle"),
                margin=dict(r=160, l=60, t=60, b=80),
                updatemenus=[
                    dict(
                        type="buttons", direction="left",
                        x=0.0, y=1.15, xanchor="left", yanchor="top",
                        buttons=[
                            dict(label="Play", method="animate",
                                args=[None, {"frame": {"duration": frame_ms, "redraw": True},
                                            "transition": {"duration": 0},
                                            "fromcurrent": True}]),
                            dict(label="Pause", method="animate",
                                args=[[None], {"frame": {"duration": 0, "redraw": False},
                                                "mode": "immediate",
                                                "transition": {"duration": 0}}]),
                        ],
                    )
                ],
                sliders=[
                    dict(
                        active=0, x=0.05, y=-0.12, xanchor="left", yanchor="top",
                        len=0.9,
                        currentvalue=dict(prefix="k = ", visible=True, xanchor="right"),
                        steps=[dict(method="animate",
                                    args=[[str(k)], {"mode": "immediate",
                                                    "frame": {"duration": 0, "redraw": True},
                                                    "transition": {"duration": 0}}],
                                    label=str(k)) for k in range(T)]
                    )
                ],
            ),
            frames=frames
        )

        if save_html_path:
            fig.write_html(save_html_path, include_plotlyjs="cdn", auto_play=auto_play)

        if show:
            fig.show()
        if return_fig:
            return fig

    def plot_y(self, Y, *, nbins=40, component_labels=None,
                        curve_samples=500, show=True, return_fig=False):
        """
        Plot all measurement components y[:, j] together in one figure as scatter traces.

        Parameters
        ----------
        Y : array_like, shape (T, p)
            Collected measurements over T steps.
        nbins, curve_samples : kept for API compatibility (not used).
        component_labels : list[str] | None
            Optional labels per y-dimension; defaults to ['y[0]', ..., 'y[p-1]'].
        show : bool, default True
            Call fig.show().
        return_fig : bool, default False
            If True, return the plotly Figure.

        Returns
        -------
        fig : plotly.graph_objects.Figure (only if return_fig=True)
        """
        Y = np.asarray(Y, dtype=float)
        if Y.ndim != 2:
            raise LDSShapeError(f"`Y` must be 2D with shape (T, p); got {Y.shape}.")
        T, p = Y.shape
        if p != self.p:
            raise LDSShapeError(f"`Y` second dim (p={p}) must match system p={self.p}.")

        labels = component_labels or [f"y[{j}]" for j in range(p)]
        t = np.arange(T)

        fig = go.Figure()

        for j in range(p):
            yj = Y[:, j]
            mask = np.isfinite(yj)
            if not np.any(mask):
                # If a component has no finite points, annotate and skip
                continue
            fig.add_trace(
                go.Scatter(
                    x=t[mask], y=yj[mask],
                    mode="lines+markers",
                    name=labels[j],
                    marker=dict(size=5),
                    line=dict(width=1),
                    hovertemplate="k=%{x}<br>y=%{y:.4g}<extra>" + labels[j] + "</extra>",
                )
            )

        fig.update_layout(
            title="Measurement Traces (all y components)",
            template="plotly_white",
            height=420 if p <= 3 else 480,
            legend=dict(
                orientation="v",
                x=1.02, xanchor="left",
                y=0.5,  yanchor="middle",
            ),
            margin=dict(r=160),
        )
        fig.update_xaxes(title_text="time step (k)")
        fig.update_yaxes(title_text="y components")

        if show:
            fig.show()
        if return_fig:
            return fig


    def filter_with_kf_and_plot(
        self,
        *,
        T: int | None = None,
        Y: np.ndarray | None = None,
        m0: np.ndarray | None = None,
        P0: np.ndarray | None = None,
        kf=None,  # instance of LinearKF; if None, a default LinearKF() is used
        component_labels: list[str] | None = None,
        nbins: int = 40,            # kept for signature compatibility (unused)
        curve_samples: int = 500,
        show: bool = True,
        return_fig: bool = False,
    ):
        """
        Run a Kalman filter over provided (or simulated) measurements and plot:
          (1) y_k (noisy) vs filtered estimate H m_k  [time series]
          (2) residual Gaussians ONLY (no histograms):
              - Gaussian Approx. (MLE mu/sigma from residuals e_k = y_k - H m_k)
              - Modeled Noise Gaussian N(0, diag(Sigma_m))

        Returns
        -------
        M : ndarray, shape (T, n)
            Filtered means m_k for k=1..T.
        Yhat : ndarray, shape (T, p)
            Filtered measurement estimates H m_k.
        (fig_ts, fig_gauss) : Figures if return_fig=True
        """
        # ----- validate / prepare inputs -----
        A = np.asarray(self.A, float)
        H = np.asarray(self.H, float)
        Q = np.asarray(self.Sigma_p, float)
        R = np.asarray(self.Sigma_m, float)
        n, p = self.n, self.p

        if Y is None:
            if T is None:
                raise ValueError("Provide either Y or T.")
            _, Y = self.simulate(T)
        else:
            Y = np.asarray(Y, float)
            if Y.ndim != 2 or Y.shape[1] != p:
                raise ValueError(f"`Y` must have shape (T, p={p}); got {Y.shape}.")
            T = Y.shape[0]

        m = np.zeros(n) if m0 is None else np.asarray(m0, float).reshape(-1)
        if m.shape[0] != n:
            raise ValueError(f"`m0` must have length n={n}; got {m.shape}.")
        P = (1e2 * np.eye(n)) if P0 is None else np.asarray(P0, float)
        if P.shape != (n, n):
            raise ValueError(f"`P0` must be shape (n,n)=({n},{n}); got {P.shape}.")

        if kf is None:
            kf = LinearKF(use_joseph=True, symmetrize=True)

        # ----- filtering loop -----
        M = np.zeros((T, n))
        Yhat = np.zeros((T, p))

        for k in range(T):
            # Predict
            m_pred = A @ m
            P_pred = A @ P @ A.T + Q

            # Update
            m, P, _, _ = kf.measurement_update(m_pred, P_pred, H, Y[k], R)

            # Store
            M[k] = m
            Yhat[k] = H @ m

        # Residuals (posterior): e_k = y_k - H m_k
        E = Y - Yhat  # shape (T, p)

        # ----- plot (1): time series y vs Hm -----
        labels = component_labels or [f"y[{j}]" for j in range(p)]
        t = np.arange(T)
        fig_ts = make_subplots(rows=p, cols=1, shared_xaxes=True,
                               subplot_titles=[f"{lab}: noisy vs filtered Hm" for lab in labels])

        for j in range(p):
            fig_ts.add_trace(
                go.Scatter(
                    x=t, y=Y[:, j],
                    mode="markers",
                    name=f"{labels[j]} noisy",
                    marker=dict(size=5),
                ),
                row=j+1, col=1
            )
            fig_ts.add_trace(
                go.Scatter(
                    x=t, y=Yhat[:, j],
                    mode="lines",
                    name=f"{labels[j]} filtered (H m)",
                ),
                row=j+1, col=1
            )
            fig_ts.update_yaxes(title_text=labels[j], row=j+1, col=1)

        fig_ts.update_xaxes(title_text="time step", row=p, col=1)
        fig_ts.update_layout(
            title="Measurements vs. KF Filtered Estimates (H m_k)",
            template="plotly_white",
            height=max(300, 260 * p),
            legend=dict(
                orientation="v",
                x=1.02, xanchor="left",
                y=0.5,  yanchor="middle",
            ),
            margin=dict(r=140),
        )

        # ----- plot (2): residual Gaussians only -----
        def _gauss_pdf(x, mu, sigma):
            if sigma <= 0:
                return np.zeros_like(x)
            return (1.0 / (np.sqrt(2.0 * np.pi) * sigma)) * np.exp(-0.5 * ((x - mu) / sigma) ** 2)

        fig_gauss = make_subplots(
            rows=p, cols=1, shared_xaxes=False,
            subplot_titles=[f"{lab} residual Gaussian fits" for lab in labels]
        )

        for j in range(p):
            ej_raw = E[:, j]
            ej = ej_raw[np.isfinite(ej_raw)]
            # Sample-based parameters (MLE)
            mu_hat = float(np.mean(ej)) if ej.size else 0.0
            sigma_hat = float(np.std(ej, ddof=0)) if ej.size else 0.0
            # Modeled marginal std from R (diagonal)
            sigma_model = float(np.sqrt(max(R[j, j], 0.0)))

            # x-range covering both curves; ensure a minimum width
            span = max(1e-6, 4.0 * sigma_hat, 4.0 * sigma_model, np.ptp(ej) if ej.size > 1 else 0.0)
            center = mu_hat
            lo = center - 0.5 * span
            hi = center + 0.5 * span
            if not np.isfinite(lo) or not np.isfinite(hi) or hi <= lo:
                lo, hi = -1.0, 1.0

            xgrid = np.linspace(lo, hi, curve_samples)
            pdf_hat = _gauss_pdf(xgrid, mu_hat, sigma_hat)
            pdf_model = _gauss_pdf(xgrid, 0.0, sigma_model)

            fig_gauss.add_trace(
                go.Scatter(
                    x=xgrid, y=pdf_hat,
                    mode="lines",
                    name=f"{labels[j]} Gaussian Approx (μ̂={mu_hat:.3g}, σ̂={sigma_hat:.3g})",
                    hovertemplate="x=%{x:.4g}<br>pdf=%{y:.4g}<extra></extra>",
                ),
                row=j+1, col=1
            )
            fig_gauss.add_trace(
                go.Scatter(
                    x=xgrid, y=pdf_model,
                    mode="lines",
                    name=f"{labels[j]} Modeled N(0, R_jj) (σ={sigma_model:.3g})",
                    hovertemplate="x=%{x:.4g}<br>pdf=%{y:.4g}<extra></extra>",
                ),
                row=j+1, col=1
            )

            fig_gauss.update_xaxes(title_text=f"{labels[j]} residual e = y - Hm", row=j+1, col=1)
            fig_gauss.update_yaxes(title_text="density", row=j+1, col=1)

        fig_gauss.update_layout(
            title="Residual Gaussian Fits (Sample MLE vs. Modeled Noise)",
            template="plotly_white",
            height=max(300, 260 * p),
            legend=dict(
                orientation="v",
                x=1.02, xanchor="left",
                y=0.5,  yanchor="middle",
            ),
            margin=dict(r=240),
        )

        if show:
            fig_ts.show()
            fig_gauss.show()

        return (M, Yhat, fig_ts, fig_gauss) if return_fig else (M, Yhat)

In [None]:
# 2D position-only measurements (p=2), CV model in x & y
def make_cv2d(dt=0.1, q=1e-2, r=0.5, x0=(0,0, 1,0.5), seed=0):
    A = np.array([
        [1, 0, dt,  0],
        [0, 1,  0, dt],
        [0, 0,  1,  0],
        [0, 0,  0,  1],
    ])
    # Measure x and y only
    H = np.array([
        [1, 0, 0, 0],
        [0, 1, 0, 0],
    ])
    Q_block = np.array([[dt**3/3, dt**2/2],
                        [dt**2/2, dt      ]]) * q
    Q = np.block([
        [Q_block,         np.zeros((2,2))],
        [np.zeros((2,2)), Q_block        ]
    ])
    R = np.eye(2) * (r**2)
    return LinearGaussianSystemSyms(A=A, H=H, Sigma_p=Q, Sigma_m=R,
                                x0=np.asarray(x0, float), rng=np.random.default_rng(seed))


In [None]:
sys = make_cv2d(dt=0.1, q=1e-2, r=0.5, x0=(0,0, 0.5, -0.2), seed=7)
X2, Y2 = sys.simulate(T=300)
sys.plot_y(Y2, nbins=40, component_labels=["pos_x", "pos_y"])

In [None]:
# Option A: simulate 300 steps internally, starting from broad prior
M, Yhat= sys.filter_with_kf_and_plot(T=300, m0=np.array([0,0, 0,0]), P0=np.eye(4)*100,
                                      component_labels=["pos_x","pos_y"], show=True)

# Option B: if you already have measurements Y, just pass them:
# X, Y = sys.simulate(T=300)
# M, Yhat = sys.filter_with_kf_and_plot(Y=Y, m0=np.array([0,0, 0,0]), P0=np.eye(4)*100,
#                                       component_labels=["pos_x","pos_y"])

In [None]:
fig_1.show()

In [None]:
m_upd

#New

# The Kalman Filter on $\mathbb{R}^n$

Consider a system modelled by a Markov process
\begin{align}
x_k &= A_k\,x_{k-1} + G_k\,w_{k-1}, \qquad\\
y_k & = H_k\,x_k + z_k. \qquad
\end{align}
We assume $p(w_k)=\mathscr{N}(0,\Sigma_p)$ and $p(z_k)=\mathscr{N}(0,\Sigma_m)$, where $\mathscr{N}(\mu,\Sigma)$ denotes a multivariate Gaussian with mean $\mu$ and covariance $\Sigma$.

Let $Y_k$ denote the event corresponding to a realization of $\{y_1,\dots,y_k\}$. Define
\begin{align}
\widehat{x}_k^+ &\triangleq x_k|Y_k,\qquad\\
\widehat{x}_k^- &\triangleq x_k|Y_{k-1},\qquad\\
y_k^- &\triangleq y_k|Y_{k-1}.\\
\end{align}
Assume the model
\begin{align}
\widehat{x}_k^- &= A_k\,x_{k-1}^+ + G_{k-1}\,w_{k-1}, \qquad\\
y_k^- &= H_k\,\widehat{x}_k^- + z_k. \qquad
\end{align}
If $p(x_0^-)=\mathscr{N}(m_0,P_0)$, then since $p(w_k)=\mathscr{N}(0,\Sigma_p)$, the linear system above implies
$p(x_k|Y_{k-1})=p(\widehat{x}_k^-)=\mathscr{N}(m_k^-,P_k^-)$,
with predicted mean and covariance
\begin{align}
m_k^- &= A_k\,m_{k-1}, \qquad\\
P_k^- &= A_k\,P_{k-1}\,A_k^{T} + G_k\,\Sigma_p\,G_k^{T}. \qquad
\end{align}
Here $p(x_k|Y_k)=p(\widehat{x}_k^+)=\mathscr{N}(m_k,P_k)$.  ￼

Furthermore, since $p(z_k)=\mathscr{N}(0,\Sigma_m)$ we have,
\begin{align}
p(y_k|Y_{k-1})=\mathscr{N}\big(H_k m_k^-,\; H_k P_k^- H_k^{T}+\Sigma_m\big),
\end{align}
so the joint distribution is
\begin{align}
p
\begin{pmatrix}x_k|Y_{k-1}\\[2pt] y_k|Y_{k-1}\end{pmatrix}
=
\mathscr{N}\left(
\begin{bmatrix}
m_k^- \\
H_k m_k^-
\end{bmatrix},
\begin{bmatrix}
P_k^- & P_k^- H_k^{\!T} \\
H_k P_k^- & H_k P_k^- H_k^{\!T} + \Sigma_m
\end{bmatrix}
\right).
\end{align}
Then from the properties of multivariate normals, the conditional distribution $p(x_k|Y_k)$ is
\begin{align}
p(x_k|Y_k)=\mathscr{N}\Big(
m_k^- + P_k^- H_k^{\!T}\!\big(H_k P_k^- H_k^{\!T}+\Sigma_m\big)^{-1}\!(y_k - H_k m_k^-),\;
P_k^- - P_k^- H_k^{\!T}\!\big(H_k P_k^- H_k^{\!T}+\Sigma_m\big)^{-1}\!H_k P_k^-
\Big). \quad
\end{align}

Thus the updated mean and covariance are
\begin{align}
K_k &\triangleq P_k^- H_k^{\!T}\!\big(H_k P_k^- H_k^{\!T}+\Sigma_m\big)^{-1}, \qquad \\
m_k &= m_k^- + K_k\,(y_k - H_k m_k^-), \qquad \\
P_k &= (I - K_k H_k)\,P_k^-. \qquad
\end{align}
Here $\Sigma_p = \mathbb{E}(w_k w_k^{T})$ and $\Sigma_m=\mathbb{E}(z_k z_k^{T})$. Defining the estimation error $e_k \triangleq x_k - m_k$, we have
\begin{align}
e_k &= (I-K_k H_k) A_k e_{k-1} + (I-K_k H_k) w_{k-1} - K_k z_k,\\
P_k &= (I-K_k H_k)\big(A_k P_{k-1} A_k^{\!T} + \Sigma_p\big).
\end{align}

⸻



## 1-D Example

Let the scalar linear–Gaussian model be
\begin{aligned}
x_k &= a\,x_{k-1} + w_{k-1},\qquad w_{k-1}\sim\mathscr N(0,q),\\
y_k &= h\,x_k + z_k,\qquad\;\;\;\; z_k\sim\mathscr N(0,r),
\end{aligned}
with prior $x_0\sim \mathscr{N}(m_0,P_0)$. Define $Y_{k-1}=\{y_1,\dots,y_{k-1}\}$.

Prediction:

\begin{aligned}
m_k^- &= a\,m_{k-1},\\[2pt]
P_k^- &= a^2 P_{k-1} + q.
\end{aligned}

Innovation (measurement) model and variance:
\begin{aligned}
v_k &\triangleq y_k - h\,m_k^-,\\[2pt]
S_k &= h^2 P_k^- + r.
\end{aligned}

Kalman gain:
\begin{align}
K_k = \dfrac{P_k^-\,h}{S_k}.
\end{align}

Update:
\begin{aligned}
m_k &= m_k^- + K_k\,v_k
= m_k^- + \frac{P_k^- h}{S_k}\bigl(y_k - h\,m_k^-\bigr),\\[6pt]
P_k &= (1 - K_k h)\,P_k^-
= \Bigl(1 - \frac{P_k^- h^2}{S_k}\Bigr) P_k^- .
\end{aligned}

Predictive measurement distribution (before seeing y_k):
\begin{align}
p(y_k | Y_{k-1})=\mathscr N\bigl(h\,m_k^-,\; h^2 P_k^- + r\bigr).
\end{align}

Posterior-predictive measurement distribution (after filtering on y_k):
\begin{align}
p(y_k\mid Y_k)=\mathscr N\bigl(h\,m_k,\; h^2 P_k + r\bigr).
\end{align}



In [None]:
import numpy as np

def make_cv1d(dt: float = 0.1,
              q: float = 1e-2,
              r: float = 1e-1,
              x0=(0.0, 1.0),
              seed: int | None = None) -> LinearGaussianSystemSyms:
    """
    Build a 1D constant-velocity linear Gaussian system:

        x_k = A x_{k-1} + w_{k-1},      w ~ N(0, Q)
        y_k = H x_k       + z_k,        z ~ N(0, R)

    State: x = [position, velocity]^T  (n=2)
    Measurement: y = position (scalar, p=1)

    Parameters
    ----------
    dt : float
        Sampling period Δt.
    q : float
        Continuous white-acceleration noise intensity (process noise scale).
        Discrete-time Q = q * [[dt^3/3, dt^2/2],
                               [dt^2/2, dt     ]].
    r : float
        Measurement noise std. R = [[r^2]] (scalar variance).
    x0 : tuple[float, float]
        Initial state (position, velocity).
    seed : int | None
        Seed for reproducible randomness.

    Returns
    -------
    LinearGaussianSystemSyms
        System with A, H, Sigma_p (Q), Sigma_m (R), and initial state x0.
    """
    A = np.array([[1.0, dt],
                  [0.0, 1.0]], dtype=float)

    # Measure position only (scalar)
    H = np.array([[1.0, 0.0]], dtype=float)  # shape (1,2)

    # Discrete CV process noise covariance (from white-acceleration model)
    Q = q * np.array([[dt**3/3.0, dt**2/2.0],
                      [dt**2/2.0, dt      ]], dtype=float)

    # Measurement noise covariance (scalar)
    R = np.array([[r**2]], dtype=float)

    x0 = np.asarray(x0, dtype=float)
    if x0.shape != (2,):
        raise ValueError(f"`x0` must be shape (2,), got {x0.shape}.")

    rng = np.random.default_rng(seed)
    return LinearGaussianSystemSyms(A=A, H=H, Sigma_p=Q, Sigma_m=R, x0=x0, rng=rng)

# Must have p=1 in your system (scalar measurement). n can be >1.
sys = make_cv1d(dt=0.1, q=5e-2, r=1.0, x0=(0.0, 0.5), seed=7)  # p=1
# Run with simulated Y (T provided)
sys.animate_measurement_gaussians_scalar(T=80, m0=np.zeros(sys.n), P0=np.eye(sys.n)*100,
                                         frame_ms=120, save_html_path=None, show=True)

# Or if you've already collected Y (shape (T,) or (T,1)):
# _, Y = sys.simulate(T=100)
# sys.animate_measurement_gaussians_scalar(Y=Y, m0=np.zeros(sys.n), P0=np.eye(sys.n)*100,
#                                          save_html_path="kf_scalar_y_gaussians.html", auto_play=False)

#DISCRETE TIME PRE-OBSERVERS ON LIE GROUPS WITH TIME INVARIANT ERROR DYNAMICS


Let $G$ be an $n$-dimensional Lie group with Lie algebra $G$. Let $(g,\zeta)\in G\times G$ and let $\phi:G\times M\to M$ be a left- or right-invariant action on an $m$-dimensional manifold $M$. We consider the kinematic system
\begin{align}
\dot g &= g\cdot \zeta,  \tag{1}\\
y &= \phi_g(\gamma), \tag{2}
\end{align}
where $\zeta(t)\in G$ is a known input and $\gamma\in M$ is a known constant. A standard discrete-time approximation is
\begin{align}
g_k &= g_{k-1},\exp(\Delta T,\zeta_{k-1}), \tag{3}
\end{align}
with sampling period $\Delta T$. (This discretization is classical.)



## For left-invariant outputs

Consider the pre-observer
\begin{align}
\tilde g_k^- &= \tilde g_{k-1},\exp(\Delta T,\zeta_{k-1}), \tag{4}\\
\tilde g_k &= \tilde g_k^-,\exp!\big(\Delta T,L(y_k,\tilde y_k)\big), \tag{5}\\
\tilde y_k &= \phi_{\tilde g_k}(\gamma), \tag{6}
\end{align}
where $L:M\times M\to G$ is the innovation term. Define $u_k\triangleq \exp(\Delta T,\zeta_{k-1})$, the a priori error $e_k^-\triangleq (\tilde g_k^-)^{-1}g_k$, and the a posteriori error $e_k\triangleq \tilde g_k^{-1}g_k$. Then
\begin{align}
e_k^- &= (\tilde g_k^-)^{-1} g_k ;=; u_{k-1}^{-1},\tilde g_{k-1}^{-1} g_{k-1},u_{k-1}
;=; u_{k-1}^{-1},e_{k-1},u_{k-1},\
e_k &= \tilde g_k^{-1} g_k ;=; \exp!\big(-\Delta T,L(y_k,\tilde y_k)\big),e_k^-.
\end{align}
If $L$ is $G$-invariant, i.e., $L!\big(\phi_g(y_1),\phi_g(y_2)\big)=L(y_1,y_2)$ for all $g\in G$, then using $y_k=\phi_{g_k}(\gamma)$ and $\tilde y_k=\phi_{\tilde g_k}(\gamma)$ one obtains $L(y_k,\tilde y_k)=L\big(\phi_{e_k}(\gamma),\gamma\big)$, so the error dynamics are autonomous (time invariant).



## For right-invariant outputs

Consider the pre-observer
\begin{align}
\tilde g_k^- &= \tilde g_{k-1},\exp(\Delta T,\zeta_{k-1}), \tag{7}\\
\tilde g_k &= \exp!\big(\Delta T,L(y_k,\tilde y_k)\big),\tilde g_k^-, \tag{8}\\
\tilde y_k &= \phi_{\tilde g_k}(\gamma). \tag{9}
\end{align}
With $u_k\triangleq \exp(\Delta T,\zeta_{k-1})$ and the right-invariant errors $e_k^-\triangleq g_k(\tilde g_k^-)^{-1}$ and $e_k\triangleq g_k\tilde g_k^{-1}$, the error dynamics satisfy
\begin{align}
e_k^- &= g_k(\tilde g_k^-)^{-1} ;=; g_{k-1},u_{k-1},u_{k-1}^{-1},\tilde g_{k-1}^{-1}
;=; e_{k-1},\\
e_k &= g_k\tilde g_k^{-1} ;=; e_k^-,\exp!\big(-\Delta T,L(y_k,\tilde y_k)\big).
\end{align}
If $L$ is $G$-invariant, one similarly gets $L(y_k,\tilde y_k)=L\big(\phi_{e_k}(\gamma),\gamma\big)$, hence the error dynamics are autonomous.



## The IMU+GNSS sensor fusion problem

A representative application is rigid-body orientation/pose estimation with IMU and GNSS. Gyroscopes measure body-frame angular velocity $\Omega$, accelerometers measure $A_m$ (specific force), and GNSS provides $o,\dot o$. A convenient reformulation introduces
\begin{align}
o_s(t)&\triangleq o(t)+\tfrac{1}{2}gt^2 e_3,\qquad\\
v_s(t)&\triangleq \dot o_s(t)=\dot o(t)+g t\,e_3,\qquad\\
R\,A_m&=\ddot o_s(t)=\ddot o(t)+g e_3,
\end{align}
giving the continuous-time model
\begin{align}
\dot R &= R,\widehat\Omega, \tag{21}\\
\dot v_s &= R,A_m, \tag{22}\\
\dot o_s &= v_s, \tag{23}\\
y_o &= o_s, \tag{24}\\
y_v &= v_s. \tag{25}
\end{align}
Defining the homogeneous state
\begin{align}
X &\triangleq\;
\begin{bmatrix}
R & v_s\\[2pt]
0 & 1
\end{bmatrix},\qquad\\
\zeta &\triangleq
\begin{bmatrix}
\widehat\Omega & A_m\\[2pt]
0 & 0
\end{bmatrix},\qquad\\
\gamma_v &\triangleq\;
\begin{bmatrix}
0_{3\times 1}\\[2pt]
1
\end{bmatrix},
\end{align}
the dynamics/output compactly read
\begin{align}
\dot X &= X,\zeta, \tag{26}\\
y_v &= X,\gamma_v. \tag{27}
\end{align}


# References

[1] K. C. Wolfe, M. Mashner, and G. S. Chirikjian, “Bayesian fusion on Lie groups,” *Journal of Algebraic Statistics*, 2(1):75–97, 2011. [Link](https://rpk.lcsr.jhu.edu/publications/)

[2] S. Bonnable, P. Martin, and E. Salan, “Invariant extended Kalman filter: theory and application to a velocity-aided attitude estimation problem,” *Proc. 48th IEEE CDC/28th CCC*, pp. 1297–1304, Dec 2009. [Link](https://doi.org/10.1109/CDC.2009.5399990)

[3] S. Bonnabel, “Left-invariant extended Kalman filter and attitude estimation,” *Proc. 46th IEEE CDC*, pp. 1027–1032, Dec 2007. [Link](https://doi.org/10.1109/CDC.2007.4434662)

[4] G. Bourmaud, R. Mégret, A. Giremus, and Y. Berthoumieu, “Discrete extended Kalman filter on Lie groups,” *EUSIPCO 2013*, pp. 1–5, Sept 2013. [Link](https://hal.science/hal-00903252/document)

[5] G. S. Chirikjian, “Information theory on Lie groups and mobile robotics applications,” *Proc. 2010 IEEE ICRA*, pp. 2751–2757, May 2010. [Link](https://rpk.lcsr.jhu.edu/publications/)

[6] Y. Wang and G. S. Chirikjian, “Error propagation on the Euclidean group with applications to manipulator kinematics,” *IEEE Trans. Robotics*, 22(4):591–602, Aug 2006. [Link](https://ieeexplore.ieee.org/document/1673946)

[7] M. J. Piggott and V. Solo, “Stochastic numerical analysis for Brownian motion on SO(3),” *Proc. 53rd IEEE CDC*, pp. 3420–3425, Dec 2014. [Link](https://doi.org/10.1109/CDC.2014.7039919)

[8] O. Tuzel, F. Porikli, and P. Meer, “Learning on Lie groups for invariant detection and tracking,” *Proc. 2008 IEEE CVPR*, pp. 1–8, June 2008. [Link](https://doi.org/10.1109/CVPR.2008.4587521)

[9] A. Barrau and S. Bonnabel, “Intrinsic filtering on Lie groups with applications to attitude estimation,” *IEEE Trans. Automatic Control*, 60(2):436–449, Feb 2015. [Link](https://doi.org/10.1109/TAC.2014.2342911)

[10] S. Bonnabel and A. Barrau, “An intrinsic Cramér–Rao bound on SO(3) for (dynamic) attitude filtering,” *Proc. 54th IEEE CDC*, pp. 2158–2163, Dec 2015. [Link](https://dblp.org/rec/conf/cdc/BonnabelB15)

[11] A. Barrau and S. Bonnabel, “The invariant extended Kalman filter as a stable observer,” *IEEE Trans. Automatic Control*, 62(4):1797–1812, Apr 2017. [Link](https://doi.org/10.1109/TAC.2016.2594085)

[12] C. Lageman, J. Trumpf, and R. Mahony, “Gradient-like observers for invariant dynamics on a Lie group,” *IEEE Trans. Automatic Control*, 55(2):367–377, Feb 2010. [Link](https://doi.org/10.1109/TAC.2009.2034937)

[13] S. Bonnabel, P. Martin, and P. Rouchon, “Non-linear symmetry-preserving observers on Lie groups,” *IEEE Trans. Automatic Control*, 54(7):1709–1713, July 2009. [Link](https://doi.org/10.1109/TAC.2009.2020646)

[14] M. Izadi and A. K. Sanyal, “Rigid body attitude estimation based on the Lagrange–d’Alembert principle,” *Automatica*, 50(10):2570–2577, 2014. [Link](https://doi.org/10.1016/j.automatica.2014.08.010)

[15] S. Bonnabel and J. J. Slotine, “A contraction theory-based analysis of the stability of the deterministic extended Kalman filter,” *IEEE Trans. Automatic Control*, 60(2):565–569, Feb 2015. [Link](https://doi.org/10.1109/TAC.2014.2336991)
