In [1]:
import numpy as np
from scipy import sparse
import main_functions as mf
import sys
np.set_printoptions(threshold=sys.maxsize)
import plotly.express as px
import plotly.graph_objects as go
from controlsignalpath import ControlSignalPath
from sklearn.decomposition import NMF

## Synthetic Data

We will generate dynamics with a single A (latent state) and a single control input.

In [2]:
dyns = mf.create_slds(K=1, D_obs=4, D_control=1)
X = dyns.generate(T=1000).squeeze()
X1 = X[:-1,:]
X2 = X[1:,:]

In [3]:
controls = dyns.u_[:,:1]
px.line(controls, title='True Control Signal').update_layout(xaxis_title='time', yaxis_title='input', legend_title='control signal', showlegend=False).show()

## Initialization

In [4]:
def init_U(X1, X2):
    err = (X2 - (X2/X1)*X1).real
    nnmf = NMF(n_components=D_control, init='random', random_state=0)
    W = nnmf.fit_transform(np.abs(err))
    W_max = W.max(axis=1, keepdims=True)
    U_0 = np.divide(W,W_max, where=W_max!=0, out=W)
    return U_0

In [12]:
num_iter = 100 # number of iterations of the algorithm
T, D_obs = X1.shape # dimension of the observation space x time points
D_control = 1 # dimension of the control space
control_density = 0.1 # density of control inputs
all_err = np.zeros((num_iter, 2)) # (num_iter, 2) for 2 errors in each iteration

# initialize U by applying NMF on the frist iteration without control and take the residuals as U
U_0 = init_U(X1, X2) # initial control input

all_U = np.zeros((num_iter, T, D_control)) # all control inputs
all_U[0] = U_0
all_A = np.zeros((num_iter, D_obs, D_obs)) # all A matrices
all_B = np.zeros((num_iter, D_obs, D_control)) # all B matrices
sparsity_pattern = np.zeros_like(U_0, dtype=bool) # sparsity pattern of U

In [6]:
px.line(U_0, title='True Control Signal').update_layout(xaxis_title='time', yaxis_title='input', legend_title='control signal', showlegend=False).show()

## Algorithm

In [11]:
def exact_dmdc(dat, U):
    #least squares DMDc
    X2 = dat #[1:,:]
    print(X2.shape)
    print(dat[:-1,:].shape)
    print(U.shape)
    D_obs = X2.shape[1]
    
    AB = np.linalg.lstsq(np.hstack((dat[:-1,:], U)), X2, rcond=None)[0]
    print(AB.shape)
    A = AB[:, :D_obs]
    B = AB[:, D_obs:]
    return A.T, B.T

In [10]:
from pydmd import DMDc

In [13]:
for i in range(1, num_iter):
    print(f'Iteration {i + 1}/{num_iter}')
    # solve for A and B
    AB = np.linalg.lstsq(np.hstack([X1, all_U[i-1]]), X2, rcond=None)[0]
    A = AB[:D_obs, :].T
    B = AB[D_obs:, :].T

    # compute error
    all_err[i] = np.linalg.norm(A @ X1.T + B @ U_0.T - X2.T, 2)

    if i > 0:
        all_A[i-1] = A
        all_B[i-1] = B

    # solve for U 

    U = np.linalg.lstsq(B, X1.T - A @ X2.T, rcond=None)[0].T

    # update sparsity pattern
    U_nonsparse = U.copy()
    U_nonsparse[~sparsity_pattern] = 0

    U[sparsity_pattern] = 0
    re_up_pattern = np.zeros_like(sparsity_pattern) # re-updated sparsity pattern
    has_converged = False

    for j in range(D_control):
        tmp = U[:, j]
        try:
            threshold = max(np.quantile(tmp[tmp > 0], control_density),
                            np.min(tmp[tmp > 0])) # threshold for sparsity pattern, if control_density is 0.1, then we take the 0.1 quantile of the positive values of the control input as the threshold unless it is smaller than the smallest positive value
        except IndexError:
            pass
        above_threshold = np.abs(tmp) > threshold
        if np.sum(above_threshold) == 0: 
            has_converged = True # if all control inputs are equal to or below the threshold, then the algorithm has converged
        else:
            has_converged = False
        sparsity_pattern[:, j] = ~(above_threshold) # update sparsity pattern, if control input is below threshold
        
    # is this correct
    if has_converged:
        break
    U[sparsity_pattern] = 0 # set control inputs below threshold to 0
    all_U[i] = U
    all_err[i, 1] = np.linalg.norm(A @ X1.T + B @ U.T - X2.T, 'fro') # compute error
    

if has_converged:
    for n in range(num_iter):
        all_U[n] = all_U[i-1]
        all_A[n] = all_A[i-1]
        all_B[n] = all_B[i-1]
else:
    # ask Charlie about this X1,X2 seem to have different dimensions than U 
    #all_A[-1], all_B[-1] = exact_dmdc(X, all_U[-1])
    
    print('not converged')
    dmdc = DMDc(svd_rank=2)
    dmdc.fit(X, all_U[-1].T)
    
    #dmdc = dmd.dmdc(X1.T, X2.T, all_U[-1].T, svd_rank=2)
    #all_A[-1], all_B[-1] = 


Iteration 2/100
Iteration 3/100
Iteration 4/100
Iteration 5/100
Iteration 6/100
Iteration 7/100
Iteration 8/100
Iteration 9/100
Iteration 10/100
Iteration 11/100
Iteration 12/100
Iteration 13/100
Iteration 14/100
Iteration 15/100
Iteration 16/100
Iteration 17/100


In [15]:
dmdc

NameError: name 'dmdc' is not defined

# Calculate the best control signal

In [13]:
control_signal = ControlSignalPath(X, all_A, all_B, all_U, all_err, num_iter)

In [7]:
control_signal.calc_best_control_signal()
best_U = control_signal.U

TypeError: ControlSignalPath.llf_() takes 3 positional arguments but 4 were given

## Visualize

In [17]:
X3 = np.zeros((T, D_obs, 1))
initial_conditions = np.random.randn(A[0].shape[0])
X3[0] = np.array(initial_conditions).reshape(-1, 1)
for i in range(1, T-1):
            X3[i] = A @ X3[i-1] + \
                B @ U[i-1, :].reshape(-1, 1)

In [24]:
dynamics = np.vstack([X3[:, k, 0].flatten() for k in range(D_obs)])
fig = px.line(dynamics.T, title='SLDS with controls Dynamics', labels={'index': 'time', 'value': 'state (e.g. F)'})
fig.add_traces(go.Scatter(y = dyns.z_*max(dynamics[0]), mode='lines', marker=dict(color='#000', size=8), name='subsystem'))
fig.show()