In [None]:
## LDS-DV
import numpy as np
from numpy.linalg import norm, pinv
from scipy.optimize import minimize

def obs(X, U, V, lam, gamma1, gamma2):
    return l(X, U @ V) + f(U, V, lam, gamma1, gamma2)

def f(U, V, lam, gamma1, gamma2):
    tmp = 0
    k = V.shape[0]
    C, E = np.vsplit(U, 2)
    for i in range(k):
        tmp += max(norm(C[:, i])/gamma1, norm(E[:, i])/gamma2) * norm(V[i])
    return lam * tmp

def l(X, UV):
    return np.sum((X - UV)**2)


def loss(U, V, u, v, rho, eta, theta, lam, X):
    UV_updated = (1 - eta) * U @ V + theta * np.outer(u, v)
    return l(X, UV_updated) + lam * ((1 - eta) * rho + theta)
    

def grad(X, U, V):
    return 2 * (U @ V - X)



def optimize_eta_theta(U, V, u, v, rho, lam, X):
    initial_guess = [0.5, 0.5] 
    
    constraints = [
        {"type": "ineq", "fun": lambda x: x[0]},         # η >= 0
        {"type": "ineq", "fun": lambda x: 1 - x[0]},     # η <= 1
        {"type": "ineq", "fun": lambda x: x[1]}          # θ >= 0
    ]
    
    result = minimize(
        lambda x: loss(U, V, u, v, rho, x[0], x[1], lam, X),
        initial_guess,
        constraints=constraints,
        method="SLSQP" 
    )
    
    if result.success:
        eta_opt, theta_opt = result.x
        return eta_opt, theta_opt
    else:
        raise ValueError("Optimization failed: " + result.message)


def proximal_operator_U(U, lam_v):
    U1, U2 = np.hsplit(U, 2)
    norm_U1 = norm(U1)
    norm_U2 = norm(U2)
    alpha = np.maximum(0.5 * (norm_U1 - norm_U2 + lam_v), 0)
    beta = np.maximum(0.5 * (norm_U2 - norm_U1 + lam_v), 0)
    
    if norm_U1 <= norm_U2:
        factor_u1 = np.maximum(1 - alpha / norm_U1, 0)
        factor_u2 = np.maximum(1 - (lam_v - alpha) / norm_U2, 0)
    else:
        factor_u1 = np.maximum(1 - (lam_v - beta) / norm_U1, 0)
        factor_u2 = np.maximum(1 - beta / norm_U2, 0)
    
    prox_u1 = factor_u1 * U1
    prox_u2 = factor_u2 * U2
    
    return prox_u1, prox_u2
    

def fista(X_hat, U, V, u_iter, v_iter, U_acc, V_acc, eta, theta, lam, lr, gamma1, gamma2, max_iter_f):
    res_i = (1 - eta) * U_acc @ V_acc + theta * np.outer(u_iter, v_iter) - X_hat 
    d = len(U) // 2
    w = 1.0
    w_p = 1.0
    obs_p = float('inf')
    for iter in range(max_iter_f):
        U_update = U_acc - 2 * lr * (res_i  @ V_acc.T) 
        V_update = V_acc - 2 * lr * (U_acc.T @ res_i)
        for i in range(k):
            C, E = proximal_operator_U(U_update[:, i], norm(V[i])*lam)
            U_update[:d, i] = C.copy()
            U_update[d:, i] = E.copy()
        V_iter = V_update / (1 + 2 * lam * lr)
        w_p = w
        w = (1 + np.sqrt(1+4*np.power(w, 2))) / 2
        U_acc = U_update + (w_p-1.0) / w * (U_update - U)
        U = U_update.copy()
        
        V_acc = V_iter + (w_p-1.0) / w * (V_iter - V)
        V = V_iter.copy()
        
        obs_i =  obs(X_hat, U, V, lam, gamma1, gamma2)
        
        if obs_i >= obs_p: break
        obs_p = obs_i
        res_i = (1 - eta) * U_acc @ V_acc - X_hat
    
    rho_iter = 0
    for i in range(k):
        rho_iter += max(norm(U[:d, i]), norm(U[d:, i])) + norm(V[i]) 
    rho_iter = rho_iter/2
    return U, V, w, w_p, rho_iter


def optimize(X, lam, gamma1 = 2, gamma2 = 2, k = -1, max_iter = 100, lr=1.e-3, max_iter_f = 1000):
    if k == -1: k = X.shape[1]
    d, T = X.shape
    A =np.eye(k, k)
    C = np.zeros((d, k))
    Phi = np.zeros((k, T-1))
    B = np.ones((k, k))
    D = np.zeros((d, d))
    
    U = np.concatenate((np.eye(d, k), np.eye(d, k)), axis=0)
    V = Phi.copy()
    
    X_hat = np.vstack((X[:, :-1], X[:, 1:]))
    
    rho_iter = 0
    w = 1.0
    w_p = 1.0
    for iter in range(max_iter):
        Q, _, R = np.linalg.svd(grad(X_hat, U, V), full_matrices=False)
        u_iter = Q[:, 0] 
        v_iter = R[0, :]
        eta, theta = optimize_eta_theta(U, V, u_iter, v_iter, rho_iter, lam, X_hat)
        U, V, w, w_p, rho_iter = fista(X_hat, U, V, u_iter, v_iter, np.sqrt(1-eta)*U, np.sqrt(1-eta)*V, eta, theta, lam, lr, gamma1, gamma2, max_iter_f)
    
    Phi = V
    A = (Phi[:, 1:] @ Phi[:, :-1].T) @ pinv(Phi[:, :-1] @ Phi[:, :-1].T)
    C, E = np.vsplit(U, 2)
    return A, C, Phi, Q, R

In [None]:
import numpy as np
import matplotlib.pyplot as plt

def generate_synthetic_lds_data(d, k, T, noise_scale=0.1, random_seed=None):
    if random_seed is not None:
        np.random.seed(random_seed)
    
    A = np.random.randn(k, k)
    A /= np.max(np.abs(np.linalg.eigvals(A))) 
    
    C = np.random.randn(d, k)
    
    z_0 = np.random.randn(k)
    
    Q = noise_scale * np.eye(k)
    R = noise_scale * np.eye(d)
    
    Z = np.zeros((k, T))
    X = np.zeros((d, T))
    Z[:, 0] = z_0
    
    for t in range(1, T):
        Z[:, t] = A @ Z[:, t-1] + np.random.multivariate_normal(np.zeros(k), Q)
        
    for t in range(T):
        X[:, t] = C @ Z[:, t] + np.random.multivariate_normal(np.zeros(d), R)
    
    return A, C, X, Z

d = 5 
k = 3 
T = 100
noise_scale = 0.1
random_seed = 42

A, C, X, Z = generate_synthetic_lds_data(d, k, T, noise_scale, random_seed)
A, C, Phi, Q, R = optimize(X, 1.e-2, k=3)

plt.plot(X.T)
plt.title('Orginal')
plt.show()
plt.plot((C @ Phi).T)
plt.title('Estimated')