In [None]:
%matplotlib nbagg
import math
import numpy as np
import scipy as sp
from scipy import linalg
from scipy import integrate
import matplotlib.pyplot as plt

from actuated_Koopman_EM_4 import ActuatedKoopmanEM

# plt.rcParams['text.usetex'] = True
# plt.rcParams['text.latex.unicode'] = True

# Generate Data from Toy Dynamical System

\begin{equation}
\begin{split}
    \dot{x}_1 &= -\alpha x_1 + u \\
    \dot{x}_2 &= \beta\left( x_1^3 - x_2 \right) \\
    y &= x_2
\end{split}
\end{equation}

\begin{equation}
    \frac{d}{dt}\begin{bmatrix} 1 \\ x_1 \\ x_2 \\ x_1^2 \\ x_1^3 \end{bmatrix} = 
    \begin{bmatrix} 
        0 & 0 & 0 & 0 & 0 \\
        0 & -\alpha & 0 & 0 & 0 \\
        0 & 0 & -\beta & 0 & \beta \\
        0 & 0 & 0 & -2\alpha & 0 \\
        0 & 0 & 0 & 0 & -3\alpha 
    \end{bmatrix}
    \begin{bmatrix} 1 \\ x_1 \\ x_2 \\ x_1^2 \\ x_1^3 \end{bmatrix}
    + u \begin{bmatrix} 
        0 & 0 & 0 & 0 & 0 \\
        1 & 0 & 0 & 0 & 0 \\
        0 & 0 & 0 & 0 & 0 \\
        0 & 2 & 0 & 0 & 0 \\
        0 & 0 & 0 & 3 & 0
    \end{bmatrix}
    \begin{bmatrix} 1 \\ x_1 \\ x_2 \\ x_1^2 \\ x_1^3 \end{bmatrix}
\end{equation}

In [None]:
# state dimension
n = 2

# parameters
alpha = 1.0
beta = 5.0

sig_u = 5.0

sig_noise = 0.01

# time step
dt = 0.01

V0_gt = np.array([[0, 0, 0, 0, 0],
                  [0, -alpha, 0, 0, 0],
                  [0, 0, -beta, 0, beta],
                  [0, 0, 0, -2*alpha, 0],
                  [0, 0, 0, 0, -3*alpha]])
V1_gt = np.array([[0, 0, 0, 0, 0],
                  [1, 0, 0, 0, 0],
                  [0, 0, 0, 0, 0],
                  [0, 2, 0, 0, 0],
                  [0, 0, 0, 3, 0]])

f = lambda x, t: [-alpha*x[0]+u, beta*(np.power(x[0],3) - x[1])]

n_traj = 50
n_t_interval = 50
n_u_interval = 10
n_t = n_u_interval*n_t_interval + 1

Y_data = np.zeros((n_traj, n_t, 1))
U_data = np.zeros((n_traj, n_t-1, 2))
for j in range(n_traj):
    k = 0
    x = np.random.randn(n)
    Y_data[j,k,0] = x[1] + np.sqrt(sig_noise)*np.random.randn()
    for ku in range(n_u_interval):
        u = np.sqrt(sig_u)*np.random.randn()
        sol = sp.integrate.odeint(lambda x, t: [-alpha*x[0]+u, beta*(np.power(x[0],3) - x[1])], 
                                  x, dt*np.arange(0,n_t_interval+1))
        for kt in range(n_t_interval):
            U_data[j,k,0] = 1
            U_data[j,k,1] = u
            k = k + 1
            x = sol[kt+1,:]
            Y_data[j,k,0] = x[1] + np.sqrt(sig_noise)*np.random.randn()

In [None]:
j = np.random.randint(n_traj)

plt.figure()
plt.plot(np.arange(n_t), Y_data[j,:,0], 'b-')
plt.plot(np.arange(n_t-1), U_data[0,:,1], 'g--')
plt.show()

# Implement EM Algorithm

In [None]:
def reflect_eigs(A):
    # relect eigenvalues with magnitude >1 into the unit circle
    lam, Vl, Vr = sp.linalg.eig(A, left=True, right=True)
    w = np.diag(np.dot(np.conj(Vr).T, Vl))
    Vl = Vl/w
    mask = np.absolute(lam) > 1
    lam[mask] = 1.0/lam[mask]
    A_new = np.dot(Vr*lam, np.conj(Vl).T)
    return A_new

In [None]:
# Girko circular law

d = 1000
A = np.random.randn(d,d)/np.sqrt(d)
A_ref = reflect_eigs(A)
lam = np.linalg.eigvals(A)
lam_ref = np.linalg.eigvals(A_ref)

plt.figure()
plt.plot(np.real(lam), np.imag(lam),'bo', label='original')
plt.plot(np.real(lam_ref), np.imag(lam_ref),'kx', label='reflected')
plt.legend()
plt.grid()

plt.show()

In [None]:
z_dim = 5
y_dim = Y_data.shape[2]
u_dim = U_data.shape[2]


## initialization for generators
A0 = reflect_eigs(np.random.randn(z_dim-1,z_dim-1)/np.sqrt(z_dim-1))
A1 = reflect_eigs(np.random.randn(z_dim,z_dim)/np.sqrt(z_dim))

V_matrices = np.zeros((u_dim,z_dim,z_dim))
V_matrices[0,1:,1:] = 0.5 * (A0 - np.eye(z_dim-1))/dt
V_matrices[1,1:,:] = 0.5 * A1[1:,:] / (dt * np.max(np.absolute(U_data[:,:,1])) )


Sig_v = 1.0e-1*np.eye(z_dim-1)

C = np.zeros((y_dim,z_dim-1))
C[0,1] = 1.0

h = np.zeros((y_dim))

Sig_w = 1.0e-1*np.eye(y_dim)

mu_0 = np.zeros(z_dim-1)

Sig_0 = np.eye(z_dim-1)

n_t_train = 250
Y_train = np.ascontiguousarray(Y_data[:,:n_t_train,:])
U_train = np.ascontiguousarray(U_data[:,:n_t_train-1,:])

EM_model = ActuatedKoopmanEM(Y_train, dt*U_train, V_matrices, C, h, Sig_v, Sig_w, mu_0, Sig_0)

# save the model
fname = 'models/toy_model/model_dim{:d}'.format(z_dim)
save_EM_model(EM_model, fname, Y_train, dt*U_train)

fname = 'models/toy_model/init_model_dim{:d}'.format(z_dim)
save_EM_model(EM_model, fname, Y_train, dt*U_train)

## Fit one EM step at a time

In [None]:
# EM step

EM_model.run_EM_step(explicit_time_step = True, optimize_IC=True, \
                     optimize_observation_map=False, optimize_process_noise=True, \
                     compute_log_likelihood=False, bfgs_iter=1)

# Make predictions on the data
Y_pred, Sig_y_pred, Z_pred, Sig_z_pred = EM_model.predict_dynamics(z_0 = np.copy(EM_model.muhats[:,0,:]), 
                                                                   Sig_0 = np.copy(EM_model.Sighats_kk[:,0,:,:]), 
                                                                   u_data = dt*np.copy(U_data), 
                                                                   explicit_time_step = True)
# plot the predictions
j = np.random.randint(n_traj)
s = 0 # observation index

plt.figure()
# plot predicted data
plt.plot(np.arange(n_t), Y_data[j,:,s], 'k-')
plt.plot(np.arange(1,n_t), Y_pred[j,:,s], 'b-')
# 2-sigma confidence envelope
plt.plot(np.arange(1,n_t), Y_pred[j,:,s] + 2*np.sqrt(Sig_y_pred[j,:,s,s]), 'b--')
plt.plot(np.arange(1,n_t), Y_pred[j,:,s] - 2*np.sqrt(Sig_y_pred[j,:,s,s]), 'b--')

plt.axvline(x=n_t_train, ymin=0, ymax=1)
plt.grid()
plt.show()

## Run many steps of EM

In [None]:
L = EM_model.L
delta_L = np.inf
iter = 0
while iter < 100 and delta_L > 1.0e-5*np.absolute(L):
    if iter%10 == 0:
        EM_model.run_EM_step(explicit_time_step = True, optimize_IC=True, \
                         optimize_observation_map=False, optimize_process_noise=True, \
                         compute_log_likelihood=True, bfgs_iter=10)
        delta_L = EM_model.L - L
        L = EM_model.L
    else:
        EM_model.run_EM_step(explicit_time_step = True, optimize_IC=True, \
                         optimize_observation_map=False, optimize_process_noise=True, \
                         compute_log_likelihood=False, bfgs_iter=10)
    
    iter = iter + 1

# Make Predictions

In [None]:
Y_pred, Sig_y_pred, Z_pred, Sig_z_pred = EM_model.predict_dynamics(z_0 = np.copy(EM_model.muhats[:,0,:]), 
                                                                   Sig_0 = np.copy(EM_model.Sighats_kk[:,0,:,:]), 
                                                                   u_data = dt*np.copy(U_data), 
                                                                   explicit_time_step = True)

In [None]:
j = np.random.randint(n_traj)
s = 0 # observation index

fig, axs = plt.subplots(2)
# plot predicted data
axs[0].plot(np.arange(n_t)*dt, Y_data[j,:,s], 'k-')
axs[0].plot(np.arange(1,n_t)*dt, Y_pred[j,:,s], 'b-')
# 2-sigma confidence envelope
axs[0].plot(np.arange(1,n_t)*dt, Y_pred[j,:,s] + 2*np.sqrt(Sig_y_pred[j,:,s,s]), 'b--')
axs[0].plot(np.arange(1,n_t)*dt, Y_pred[j,:,s] - 2*np.sqrt(Sig_y_pred[j,:,s,s]), 'b--')
axs[0].axvline(x=n_t_train*dt, ymin=0, ymax=1)

axs[1].plot(np.arange(n_t-1)*dt, U_data[j,:,1], 'k-')
axs[1].axvline(x=n_t_train*dt, ymin=0, ymax=1)

fig.tight_layout()

# plt.savefig('figures/toy_model/EM_fit_traj_{:d}.png'.format(j), dpi=None, facecolor='w', edgecolor='w', \
#             transparent=False, bbox_inches=None, pad_inches=0.0)
# plt.savefig('figures/toy_model/EM_fit_traj_{:d}.eps'.format(j), dpi=None, facecolor='w', edgecolor='w', \
#             transparent=False, bbox_inches=None, pad_inches=0.0)
plt.show()

In [None]:
lam_gt = np.linalg.eigvals(V0_gt)
lam_mdl = np.linalg.eigvals(EM_model.V_matrices[0,:,:])

plt.figure()
plt.plot(np.real(lam_mdl), np.imag(lam_mdl),'bo',label='model')
plt.plot(np.real(lam_gt), np.imag(lam_gt),'kx',label='ground truth')
plt.legend()
plt.grid()

# plt.savefig('figures/toy_model/EM_model_drift_eigs.png', dpi=None, facecolor='w', edgecolor='w', \
#             transparent=False, bbox_inches=None, pad_inches=0.0)
# plt.savefig('figures/toy_model/EM_model_drift_eigs.eps', dpi=None, facecolor='w', edgecolor='w', \
#             transparent=False, bbox_inches=None, pad_inches=0.0)

plt.show()