# Parallel HAVOK Analysis of a Lorenz System

In [None]:
from dask.distributed import Client, SSHCluster
import dask.array as da
import dask.bag as db
import numpy as np
import scipy as sp
import matplotlib.pyplot as plt
import pysindy as ps
import control
import control.matlab
import pandas as pd
from matplotlib import cm
from matplotlib import colors
import time
from joblib import parallel_backend

In [None]:
cluster_type = 'jupyter'
#cluster_type = 'command_line'

if cluster_type == 'jupyter':
    
    cluster = SSHCluster(
        ["10.67.22.44","10.67.22.117","10.67.22.156"],
        connect_options={"known_hosts": None}
    )
    
    client = Client(cluster)
    
else: client = Client("10.67.22.44:8786")

### Lorenz data creation

In [None]:
def lorenz(t,y,sig,rho,beta):
    dx = sig*(y[1] - y[0])
    dy = y[0]*(rho - y[2]) - y[1]
    dz = y[0]*y[1] - beta*y[2]
    return [dx,dy,dz]

solver = sp.integrate.solve_ivp(fun=lambda t, y: lorenz(t,y,10,28,8/3),
                                 t_span=[0,1500.1], t_eval=np.arange(0,1500.1,0.001), y0=[-8,8,27], method='LSODA', 
                                 dense_output=False,rtol=1e-18,atol=1e-22)

data_ts = solver.y.T
data_ts = data_ts[:,0]

In [None]:
# Saving into common folder

n_partitions = 10
l=100
len_data = len(data_ts)-l

for i in range(n_partitions):
    data_p = data_ts[i*int(len_data/n_partitions):(i+1)*int(len_data/n_partitions)+100]
    data_p.tofile('DATASET/Lorenz/data1M/data1M.{}.csv'.format(i+1), sep = ' ')

### HAVOK

#### Train import
In this case the whole dataset is saved in `n_partitions` files but only a fraction is loaded into a Dask bag and used as training set.

In [None]:
n_train = 6

x_b = db.from_sequence(['DATASET/Lorenz/data1M/data1M.{}.csv'.format(i+1) for i in range(n_train)]).map(np.loadtxt)
x_b

#### Hankel matrix
The first step of the HAVOK analysis consists of the division of the time series into sliding windows of length `l` and the creation of an Hankel matrix `H`. Since data is partitioned, it is convenient to create an Hankel matrix for every chunk with the condition that the components of each $H_i$ are smoothly connected i.e. the last row of $H_i$ must repeat most of the values contained in the first row of $H_{i+1}$. In this way the concatenation of all of the $H_i$ results into a tall and skinny Hankel matrix.<br>
In order to have a simpler algorithm, data has been saved in advance in a way that this condition is verified (the last `l` data contained in each partition are the first data of the subsequent one).

In [None]:
def Hankel(x_ts):
    l=100
    H = np.zeros((len(x_ts),l))
    for i in range(len(x_ts)-l): H[i,:] = x_ts[i:l+i]
    return H

H = x_b.map(Hankel)

#### Parallel SVD
`H` is a tall and skinny matrix so we can perform the parallel SVD. In this case `U` is a matrix with the same dimensions of `H` but in order to perform the HAVOK analysis only the first `r` columns of it are needed.

In [None]:
U,S,Vt = parallel_SVD(H)

r = 15
u = np.concatenate(np.array(U[:,:,:r]),axis=0)

#### Sparse Regression
The sparse regression is performed locally using a method from `pySindy`.

In [None]:
dt = 0.001
time = np.arange(0,len(v)*dt,dt)

differentiation_method = ps.FiniteDifference(order=2)
feature_library = ps.PolynomialLibrary(degree=1)
optimizer = ps.STLSQ(threshold=0.02)

model = ps.SINDy(
    differentiation_method=differentiation_method,
    feature_library=feature_library,
    optimizer=optimizer
)

model.fit(u,t=time)

M = model.coefficients()
M = M[:,1:r+1]
A = M[:r-1,:r-1]
B = M[:r-1,r-1]

df = pd.DataFrame(np.round(M))
df

In [None]:
fig, ax = plt.subplots(figsize = (6,6))

divnorm=colors.TwoSlopeNorm(vmin=np.min(A), vcenter=0., vmax=np.max(A))
cax = ax.matshow(A, cmap= plt.get_cmap('PiYG'), norm=divnorm)
fig.colorbar(cax)

#### Integration of the linear model
Using `A` and `B` and knowing the forcing term, the linear model can be integrated with the `control.StateSpace` method and compared with the embedded time series.<br>
This is basically a numerical integration of a time series, so the computation can be done separately for each partition using the first row of each $U_i$ as initial conditions.

In [None]:
B = np.reshape(B,(r-1,1)) # control.StateSpace requires B with shape (r-1,1)
sys = control.StateSpace(A,B,np.eye(r-1),0*B)
result = u.map(lambda x: control.matlab.lsim(sys,x[:len(x),-1],np.arange(0,len(x)*dt,dt),x[0,:-1]))\
            .map(extract_Y).compute()

y = np.concatenate(result,axis=0)

As one can see, the linear model perfectly describes the embedded time series. It is also evident that there is a correlation between the forcing term and the switching of the lobe.

In [None]:
fig, ax = plt.subplots(2,1, figsize = (16,8))
ax[0].plot(time[:50000],u[:50000,0],c='black',label='embedded time series')
ax[0].plot(time[:50000],y[:50000,0],c='red',label='linear model',lw=1)
ax[0].set_ylabel('u1',rotation=0)
ax[0].legend(loc='upper right')
ax[1].plot(time[:50000],u[:50000,-1],c='black',lw=1)
ax[1].set_ylabel('f',rotation=0)
ax[1].set_xlabel('time [s]')

#### Online prediction
Since `A` and `B` are constant, once they have been computed using a training set, they can be used to integrate the rest of the dataset to make predictions about the lobe switching. Still, the integration requires the advance knowledge of the forcing term but this can be calculated by convoluting `V` with a slinding window of length `l` of the incoming data.<br>

In [None]:
# load the rest of the data into a Dask bag

n_test = n_partitions - n_train
test_b = db.from_sequence(['DATASET/Lorenz/data1M/data1M.{}.csv'.format(n_train+i+1) for i in range(n_test)])\
            .map(np.loadtxt)
test_b

In [None]:
def Convolution(x,V1,q,r):
    U = np.zeros((len(x)-100,r))
    for i in range(len(x)-100):
        U[i,:] = np.dot(x[i:i+q],V1.T)
    U = U[:,:r]
    return U

def Normalize(u_conv,S,r):
    for i in range(r): u_conv[:,i] = u_conv[:,i]/S[i]
    return u_conv
   
def extract_Y(M):
    Y = M[:][0]
    return Y

V1 = Vt[:r,:]
u_conv = test_b.map(lambda x: Convolution(x,V1,100,r)).map(lambda x: Normalize(x,S,r))

sys_conv = control.StateSpace(A,B,np.eye(r-1),0*B)
result = u_conv.map(lambda x: control.matlab.lsim(sys_conv,x[:len(x),-1],np.arange(0,len(x)*dt,dt),x[0,:-1]))\
            .map(extract_Y).compute()

y_pred = np.concatenate(result,axis=0)

In [None]:
fig, ax = plt.subplots(2,1, figsize = (16,8))
ax[0].plot(time[:50000],y_pred[:50000,0],c='black')
ax[0].set_ylabel('u1',rotation=0)
ax[0].legend(loc='upper right')
ax[1].plot(time[:50000],u_conv[:50000,r-1],c='black',lw=1)
ax[1].set_ylabel('f',rotation=0)
ax[1].set_xlabel('time [s]')

By setting a threshold in the forcing term it is possible to predict the chaotic behaviour of the Lorenz system i.e. the lobe switching.

In [None]:
skip = 500
thresh = 0.0019
mask = np.abs(u[:,-1]) > thresh

i = 0
prec = i
while i < len(mask) - skip:
    if mask[i] == True: 
        mask[i:i+skip] = [True for i in range(skip)]
        i += skip
    i += 1

not_mask = [not x for x in mask]

u_lin = np.copy(u_conv[:,0])
u_lin[mask] = np.nan
u_for = np.copy(u_conv[:,0])
u_for[not_mask] = np.nan

ur_lin = np.copy(u_conv[:,-1])
ur_lin[mask] = np.nan
ur_for = np.copy(u_conv[:,-1])
ur_for[not_mask] = np.nan

fig, ax = plt.subplots(2,1,figsize=(16,8))
ax[0].plot(time[:50000],u_lin[:50000],c='black',lw=1)
ax[0].plot(time[:50000],u_for[:50000],c='red',lw=1)
ax[0].axhline(0,lw=0.5,c="dimgrey")
ax[0].set_ylabel('u1',rotation=0)
ax[1].plot(time[:50000],ur_lin[:50000],c='black',lw=1)
ax[1].plot(time[:50000],ur_for[:50000],c='red',lw=1)
ax[1].axhline(thresh_r,lw=0.5,c="darkred",ls='--')
ax[1].axhline(-thresh_r,lw=0.5,c="darkred",ls='--')
ax[1].set_ylabel('f',rotation=0)
ax[1].set_xlabel('time [s]',rotation=0)
ax[1].set_ylim(-0.025,0.025)

### Closing session

In [None]:
client.shutdown()

In [None]:
client.close()