In [3]:
import sys
sys.path.append("../src")

import numpy
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import pprint as pp

import sindy
import sindy_data

In [211]:
class PolynomialLibrary:
    def __init__(self, max_degree=2, cross_terms=True):
        self.max_degree = max_degree
        self.cross_terms = cross_terms
        
    def get_candidates(self, dim, feature_names):
        self.feature_names = feature_names
        return [self.__polynomial(degree_sequence) 
                    for degree in range(1,self.max_degree+1)
                        for degree_sequence in self.__get_degree_sequences(degree, dim)]
    
    
    def __polynomial(self, degree_sequence):
        def fn(X):
            terms = torch.stack( tuple(X[:,i]**d for i,d in enumerate(degree_sequence)), axis=1 )
            return torch.prod(terms, dim=1)
        fn_name = ' '.join(self.__display_term(self.feature_names[i],d) for i,d in enumerate(degree_sequence) if d)    
        return (fn, fn_name)
    
    def __display_term(self, feature_name, d):
        if d == 1:
            return f'{feature_name}'
        return f'{feature_name}^{d}'
    
    def __get_degree_sequences(self, degree, num_terms):
        if num_terms == 1:  return [[degree]]
        if degree == 0:     return [[0 for _ in range(num_terms)]]
        res = []
        for d in reversed(range(degree+1)):
            for seq in self.__get_degree_sequences(degree-d, num_terms-1):
                res.append([d, *seq])
        return res

In [212]:
class TrigLibrary:
    def __init__(self):
        self.max_freq = 1
    
    def get_candidates(self, dim, feature_names):
        self.feature_names = feature_names
        return [trig(i) for trig in [self.__sin, self.__cos] for i in range(dim)]

    def __sin(self,i):
        fn      = lambda X: torch.sin(X[:,i])
        fn_name = f'sin({self.feature_names[i]})'
        return (fn, fn_name)

    def __cos(self,i):
        fn      = lambda X: torch.cos(X[:,i])
        fn_name = f'cos({self.feature_names[i]})'
        return (
            fn,
            fn_name
        )

In [213]:
class SINDy(nn.Module):
    def __init__(
        self,
        X, 
        X_dot=None, 
        libs=None, 
        feature_names=None
    ):
        super(SINDy, self).__init__()

        n,m = X.size()
        if feature_names == None:
            feature_names = [f'x{i+1}' for i in range(m)]

        self.X = X
        self.X_dot = X_dot
        self.feature_names = feature_names
        self.num_features  = m
        self.num_snapshots = n

        self.candidate_terms = [ lambda x: torch.ones(self.num_snapshots) ]
        self.candidate_names = ['1']
        for lib in libs:
            lib_candidates = lib.get_candidates(self.num_features, feature_names)
            for term, name in lib_candidates:
                self.candidate_terms.append(term)
                self.candidate_names.append(name)

        self.SINDy_forward = nn.Linear(
            len(self.candidate_terms), 
            self.num_features, 
            bias=False
        )
        
    def library(self):
        print('library candidate terms:')
        return self.candidate_names
    
    def model_parameters(self):
        params = list(self.parameters())[0]
        return params
    
    def theta(self,X):
        return torch.stack(tuple(f(X) for f in self.candidate_terms), axis=1)

    def forward(self):
        theta_X = self.theta(self.X)
        
        # X_dot_predict = f(X) = Θ(X)Ξ = Θ(X)[ ξ1, ξ2, ..., ξn ]
        X_dot_predict = self.SINDy_forward(theta_X)
        return X_dot_predict

In [225]:
t = torch.linspace(0,2,100)

x = 10 * torch.exp(- 0.5 * t)
y = -2 * torch.exp(3 * t)

x_dot = - 0.5 * x
y_dot = 3 * y

X = torch.stack((x,y), dim=-1)
X_dot = torch.stack((x_dot,y_dot), dim=-1)
X[:5]

X_dot[:5,:]
###################################################################################




libs  = [
  PolynomialLibrary(max_degree=1),
  TrigLibrary()
]

sindy = SINDy(
    X, 
    X_dot=X_dot, 
    libs=libs,
    feature_names=['x', 'y']
)

print(sindy.library())
print(sindy.model_parameters())

library candidate terms:
['1', 'x', 'y', 'sin(x)', 'sin(y)', 'cos(x)', 'cos(y)']
Parameter containing:
tensor([[ 0.2261,  0.2323,  0.0628, -0.3528, -0.2212,  0.0820,  0.1814],
        [-0.3231, -0.0957,  0.1266, -0.1699, -0.2336, -0.0842,  0.1476]],
       requires_grad=True)


In [228]:
norm_2 = lambda X: torch.linalg.norm(X)

loss_fn = lambda X, X_pred: norm_2(X - X_pred)
optimizer = torch.optim.Adam(sindy.parameters(), lr=1e-3)

In [229]:
epochs = 10000

for t in range(epochs):
    
    X_dot_pred = sindy()
    loss = loss_fn(X_dot, X_dot_pred)
    
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
    
    if t % 1000 == 0:
        print(loss)

print(sindy.library())
sindy.model_parameters()

tensor(4432.2881, grad_fn=<CopyBackwards>)
tensor(2011.3049, grad_fn=<CopyBackwards>)
tensor(139.3008, grad_fn=<CopyBackwards>)
tensor(50.0098, grad_fn=<CopyBackwards>)
tensor(14.3307, grad_fn=<CopyBackwards>)
tensor(4.2473, grad_fn=<CopyBackwards>)
tensor(0.2830, grad_fn=<CopyBackwards>)
tensor(0.1685, grad_fn=<CopyBackwards>)
tensor(0.1179, grad_fn=<CopyBackwards>)
tensor(0.1006, grad_fn=<CopyBackwards>)
library candidate terms:
['1', 'x', 'y', 'sin(x)', 'sin(y)', 'cos(x)', 'cos(y)']


Parameter containing:
tensor([[-3.2458e-05, -5.0001e-01,  1.2664e-05,  1.0294e-05, -1.2325e-05,
          1.6318e-05, -1.2232e-05],
        [-6.8776e-03,  8.7697e-04,  3.0000e+00, -7.9391e-04,  9.5288e-05,
          1.2906e-03,  1.2643e-04]], requires_grad=True)

In [42]:
x1 = torch.tensor([
    [1,2],
    [3,4]
])
x2 = x1 * 10
[*x for x in [x1,x2]]

SyntaxError: iterable unpacking cannot be used in comprehension (<ipython-input-42-07de3f337327>, line 6)

In [230]:
class Test(nn.Module):
    def __init__(self):
        super(Test, self).__init__()
        self.Test_forward = nn.Linear(
            10, 
            2, 
            bias=False
        )

    def forward(self, X):
        X_predict = self.Test_forward(X)
        return X_predict

In [240]:
X = torch.randn(1000,10)
X

tensor([[-0.9974,  1.2344,  0.9949,  ..., -1.7614,  0.1701, -0.3766],
        [ 0.3847,  1.0268,  0.3578,  ..., -0.9790,  0.5058, -1.0094],
        [-0.6239, -0.6261,  1.4587,  ...,  0.1791, -0.3129,  0.0420],
        ...,
        [-0.4159,  1.2797,  0.1997,  ..., -0.1224, -0.4279, -0.7940],
        [ 1.4913, -0.2421, -1.5575,  ..., -1.4920, -0.1720,  1.0254],
        [-0.6490, -1.7032,  1.5738,  ..., -0.4284,  0.1467, -0.5945]])

In [243]:
model = Test()

X_pred = model(X)
print(f'X_pred shape: \n{X_pred.size()}')
X_pred

X_pred shape: 
torch.Size([1000, 2])


tensor([[-0.4246,  0.5870],
        [-0.0685, -0.9148],
        [-0.1895,  0.6369],
        ...,
        [-0.4202,  0.4014],
        [-0.3995,  0.3701],
        [ 0.6987, -0.5694]], grad_fn=<MmBackward0>)

In [2]:
t = torch.linspace(0,2,100)

x = 10 * torch.exp(- 0.5 * t)
y = -2 * torch.exp(3 * t)

x_dot = - 0.5 * x
y_dot = 3 * y

X = torch.stack((x,y), dim=-1)
X_dot = torch.stack((x_dot,y_dot), dim=-1)
dX = torch.gradient(X[:5],spacing = (t,))
X[:5]

print(X_dot[:5,:])
print(dX)

RuntimeError: The size of tensor a (98) must match the size of tensor b (3) at non-singleton dimension 0

In [5]:
t = torch.linspace(0,2,20)
f1 = t**2
f2 = torch.exp(-2 * t)
df1 = 2 * t
df2 = -2 * torch.exp(-2 * t)

f = torch.stack((f1,f2), dim=-1).T
f
df = torch.gradient(f, spacing=(t,t))
df_actual = torch.stack((df1, df2), dim=-1)

print(df)
print(df_actual)

RuntimeError: The size of tensor a (18) must match the size of tensor b (0) at non-singleton dimension 0

In [None]:
# class CandidateLibraryGenerator:
#     def __init__(
#       self, 
#       num_snapshots,
#       num_state_variables,
#       max_polynomial_degree = 2,
#       cross_terms = True, 
#       trig = False, 
#       feature_names = None
#     ):
#         if feature_names == None:
#             feature_names = [f'x{i+1}' for i in range(num_snapshots)] 

#         if len(feature_names) != num_state_variables:
#             raise ValueError('number of feature_names does not match number of features')

#         self.feature_names       = feature_names         # feature names
#         self.num_snapshots       = num_snapshots         # number of rows in X
#         self.num_state_variables = num_state_variables   # number of cols in X

#         constant_fn = {
#           'fn': lambda x: torch.ones(self.num_snapshots),
#           'fn_name': '1'
#         }

#         polynomial_terms = self.__get_polynomial_terms(max_polynomial_degree)
#         trig_terms = self.__get_trig_terms(num_state_variables) if trig else []

#         candidates         = [constant_fn, *polynomial_terms, *trig_terms]
#         candidate_fns      = [candidate['fn']      for candidate in candidates]
#         candidate_fn_names = [candidate['fn_name'] for candidate in candidates]

#         self.candidate_functions      = candidate_fns         # 1-d python list of candidate functions
#         self.candidate_function_names = candidate_fn_names    # 1-d python list of candidate function names
#         self.num_candidate_functions  = len(candidate_fns)    # num columns in theta(X)

#     def __call__(self, X):
#         if not isinstance(X, torch.Tensor):
#             raise ValueError('Input X must be of type torch.Tensor') 

#         if X.dim() != 2:
#             raise ValueError('Input X does is not tensor of dim 2.')

#         if X.size() != (self.num_snapshots, self.num_state_variables):
#             raise ValueError(
#                 f'Bad dimensions. Input must be a {self.num_snapshots} x {self.num_state_variables} matrix.'
#             )

#         return torch.stack(tuple(fn(X) for fn in self.candidate_functions), axis=1)

#     '''
#     private methods_____________________________________________
#     '''
#     def __get_polynomial_terms(self, max_degree):
#         return [self.__polynomial(degree_sequence) 
#             for degree in range(1,max_degree+1)
#                 for degree_sequence in self.__get_degree_sequences(degree, self.num_state_variables)]

#     def __get_trig_terms(self, n):
#         return [trig(i) for trig in [self.__sin, self.__cos] for i in range(n)]

#     def __sin(self,i):
#         return {
#             'fn': lambda X: torch.sin(X[:,i]),
#             'fn_name': f'sin({self.feature_names[i]})'
#         }

#     def __cos(self,i):
#         return {
#             'fn': lambda X: torch.cos(X[:,i]),
#             'fn_name': f'cos({self.feature_names[i]})'
#         }

#     def __polynomial(self, degree_sequence):
#         def fn(X):
#             terms = torch.stack( tuple(X[:,i]**d for i,d in enumerate(degree_sequence)), axis=1 )
#             return torch.prod(terms, dim=1)

#         fn_name = ' '.join(f'{self.feature_names[i]}^{d}' for i,d in enumerate(degree_sequence) if d)    
#         return {
#             'fn': fn,
#             'fn_name': fn_name
#         }

#     def __get_degree_sequences(self, degree, num_terms):
#         if num_terms == 1:  return [[degree]]
#         if degree == 0:     return [[0 for _ in range(num_terms)]]
#         res = []
#         for d in reversed(range(degree+1)):
#             for seq in self.__get_degree_sequences(degree-d, num_terms-1):
#                 res.append([d, *seq])
#         return res

#     def __repr__(self):
#         return f'CandidateLibraryGenerator({self.num_snapshots}, {self.num_state_variables})'