In [None]:
import numpy as np

# Linear Quadratic Estimator (Kalman filter)
def lqe(A, C, V, W, X0):
    """
    Linear Quadratic Estimator (Kalman filter).

    Args:
        A: System dynamics matrices.
        C: Measurement matrices.
        V: Measurement noise covariance matrices.
        W: Process noise covariance matrices.
        X0: Initial state covariance matrix.

    Returns:
        K: Kalman gain matrices.
        X0: Time-updated (prior) state covariance matrices.
        X1: Measurement-updated (posterior) state covariance matrices.
    """
    horizon = len(A)
    for t in range(horizon):
        V[t] = V[t] * V[t].T  # Ensure V[t] is symmetric (measurement noise)
        W[t] = W[t] * W[t].T  # Ensure W[t] is symmetric (process noise)
    X0 = [X0[0] * X0[0].T]  # Ensure X0 is symmetric
    X1 = []
    K = []
    for t in range(horizon):
        # Kalman gain
        K.append(X0[t] * C[t].T * np.linalg.pinv(C[t] * X0[t] * C[t].T + V[t])) # take inverse of matrix C*X0*C^T + V
        # Posterior covariance
        X1.append(X0[t] - K[t] * C[t] * X0[t])
        # Time-update covariance
        if t < horizon - 1:
            X0.append(A[t] * X1[t] * A[t].T + W[t])
    return K, X0, X1


# Linear Quadratic Regulator (LQR)
def lqr(A, B, Q, R):
    """
    Linear Quadratic Regulator (LQR).

    Args:
        A: System dynamics matrices.
        B: Control input matrices.
        Q: State cost matrices.
        R: Control cost matrices.

    Returns:
        L: Optimal control gain matrices.
        P: Cost-to-go matrices.
    """
    horizon = len(A)
    L = [np.matrix(np.zeros((2, 2)))]  # Placeholder for control gains
    P = [Q[horizon - 1]]  # Terminal cost
    for t in reversed(range(horizon)):
        # Compute optimal control gain
        L.append(-np.linalg.pinv(R[t] + B[t].T * P[horizon - (t + 1)] * B[t]) * B[t].T * P[horizon - (t + 1)] * A[t])
        # Backward Riccati recursion
        if t > 0:
            P.append(A[t].T * P[horizon - (t + 1)] * (A[t] + B[t] * L[horizon - t]) + Q[t])
    return list(reversed(L)), list(reversed(P))


# Transforms time-invariant into constant time-varying (list of) matrices
def tvar(var, horizon, idx=None):
    if not type(var) is list:
        temp = []
        if not idx:
            for t in range(horizon):
                temp.append(var)
        else:
            for t in range(horizon):
                temp.append(np.matrix(np.zeros(np.shape(var))))
            temp[idx] = var
        var = temp
    return var


# Main LQG class definition
class LQG:
    def __init__(self, horizon):
        self.horizon = horizon
        self.var = {'A': None, 'B': None, 'C': None,
                    'P': None, 'Q': None, 'R': None,
                    'V': None, 'W': None,
                    'X0': None, 'X1': None}

    def define(self, string, val):
        for char in string:
            if char in 'ABCRVW':
                self.var[char] = tvar(val, self.horizon)
            elif char == 'Q':
                self.var['Q'] = tvar(val, self.horizon, -1)
            elif char == 'X':
                self.var['X0'] = tvar(val, 1)

    def kalman(self):
        # Perform Kalman filtering
        self.var['K'], self.var['X0'], self.var['X1'] = \
            lqe(self.var['A'], self.var['C'], self.var['V'], self.var['W'], self.var['X0'])

    def control(self):
        # Perform LQR
        self.var['L'], self.var['P'] = \
            lqr(self.var['A'], self.var['B'], self.var['Q'], self.var['R'])
            # L -> list of K (optimal feedback gain) in LQR, P -> list of P (cost-to-go) in LQR

    def sample(self, n=1, x0=None, x=None, u=None, v=None, w=None):
        """
        Simulates the system dynamics.

        Args:
            n: Number of samples (trajectories).
            x0: Initial state mean.
            x: Initial state.
            u: Control input.
            v: Measurement noise.
            w: Process noise.

        Returns:
            Dictionary containing simulated states, controls, measurements, etc.
        """
        y = None

        a = np.shape(self.var['A'][0])
        if self.var['X0'] is None:
            self.var['X0'] = [np.matrix(np.zeros(np.shape((a, a))))]

        if not self.var['C'] is None:
            c = np.shape(self.var['C'][0])
            self.kalman()
            obs = True
        else:
            obs = False

        if not self.var['B'] is None:
            self.control()
            ctrl = True
        else:
            ctrl = False

        if x0 is None:  # Default zero
            x0 = [np.matrix(np.zeros((a[0], n)))]
        elif min(np.shape(x0)) == 1 or type(x0) is list:  # Provided mean
            x0 = np.reshape(np.matrix(x0), (max(np.shape(x0)), 1))
            x0 = [np.tile(x0, (1, n))]
        assert np.shape(x0[0]) == (a[0], n)

        if x is None:  # Sample initial error
            e = np.random.randn(a[0], n)
            x = [x0[0] + self.var['X0'][0] * e]
        else:  # Provided initial states
            assert np.shape(x) == (a[0], n)
            x = [x]
            e = np.linalg.pinv(self.var['X0'][0]) * (x[0] - x0[0])

        if v is None and obs:  # Measurement noise
            v = [np.random.randn(c[0], n)]
            for t in range(self.horizon - 1):
                v.append(np.random.randn(c[0], n))
        if w is None:  # Process noise
            w = [np.random.randn(a[0], n)]
            for t in range(self.horizon - 1):
                w.append(np.random.randn(a[0], n))

        if obs:
            y = [self.var['C'][0] * x[0] + self.var['V'][0] * v[0]]
            x1 = [x0[0] + self.var['K'][0] * (y[0] - self.var['C'][0] * x0[0])]
        else:
            x1 = [x0[0]]
        if u is None and ctrl:
            u = [self.var['L'][0] * x1[0]]
        for t in range(self.horizon - 1):
            if ctrl:
                x0.append(self.var['A'][t] * x1[t] + self.var['B'][t] * u[t])
                x.append(self.var['A'][t] * x[t] + self.var['B'][t] * u[t] + self.var['W'][t] * w[t])
            else:
                x0.append(self.var['A'][t] * x1[t])
                x.append(self.var['A'][t] * x[t] + self.var['W'][t] * w[t])
            if obs:
                y.append(self.var['C'][t] * x[t] + self.var['V'][t] * v[t])
                x1.append(x0[t] + self.var['K'][t] * (y[t] - self.var['C'][t] * x0[t]))
            else:
                x1.append(x0[t])
            if ctrl:
                if len(u) <= t + 1:
                    u.append(self.var['L'][t] * x1[t])

        noise = {'x': e, 'v': v, 'w': w}
        kf = {'x0': x0, 'x1': x1}
        data = {'x': x, 'y': y, 'u': u, 'kf': kf, 'noise': noise, 'cost': {}}

        return data

In [6]:
import numpy as np

# full LQG
system = LQG(5)  # initialize LQG system with time horizon T = 5
system.define('ABCQRVWX', np.matrix(np.eye(2)))  # define matrices
data = system.sample(10)  # simulate 10 system runs

print(data['noise'])

# no feedback
#system = lqg.LQG(5)  # initialize LQG system with time horizon T = 5
#system.define('ABQRVX', np.matrix(np.eye(2)))  # define matrices
#data = system.sample(10)  # simulate 10 system runs

# no control
#system = lqg.LQG(5)  # initialize LQG system with time horizon T = 5
#system.define('ACVWX', np.matrix(np.eye(2)))  # define matrices
#data = system.sample(10)  # simulate 10 system runs

{'x': array([[-1.32750336,  0.27977708, -0.31964266, -0.4634541 , -0.45710812,
         0.0628693 , -1.20905454,  0.16052032, -0.39418152, -1.87415098],
       [ 0.69612232,  1.24927391,  0.71365569, -1.2470122 ,  0.28260325,
         0.62439828,  0.53075233,  0.35530797, -1.07922032,  0.57032485]]), 'v': [array([[-0.31565026,  0.1409607 , -0.52274299,  0.4826443 , -0.36392084,
         0.44431019, -0.25723533,  0.36213719, -0.20299799, -0.00800599],
       [-0.80767658,  0.60113137,  0.15766269, -0.00309834,  0.63376456,
         0.22043848, -0.73637728,  0.23033062, -0.39949209,  1.28801076]]), array([[-1.70446547, -0.04170829,  0.35651822, -1.25519801, -0.29951005,
         0.64525823,  0.03049398, -0.67381377, -1.10413436, -1.15945157],
       [ 0.37438906, -1.13990567, -1.67884966, -0.10709554, -0.10091484,
        -0.49001116, -0.03956326, -1.76987922,  0.86703271,  0.19500796]]), array([[-0.22478063,  0.37402891,  0.83029541, -0.79750638,  1.25089541,
         1.91097345, -0.261