### Dependencies

In [1]:
import sys, os

import numpy as np
import torch as T
from scipy.integrate import solve_ivp

from dataclasses import dataclass

import torch_optimizer as optim_all

import matplotlib.pyplot as plt
plt.style.use("seaborn-v0_8-whitegrid")

### Physical Model

In [3]:
def build_true_model(x, t, omega=1., zeta=0.05, F0=1., forcing_freq=1.2, friction_force_ratio=0.5):
    """
    A function that gets the displacement, velocity and time as an input, and returns the true vector field output (velocity and acceleration)

    Parameters
    ----------
    x : ndarray 

    t : ndarray
    
    omega : float
    
    zeta : float
    
    F0 : float

    forcing_freq : float

    friction_force_ratio : float

    Returns
    -------
    ndarray
        an array with the two vector field values, for the given input
    """
    return [
        x[1],
        - 2 * omega * zeta * x[1]
        - omega ** 2 * x[0]
        - friction_force_ratio * np.sign(x[1])
        + F0 * np.cos(forcing_freq * t)
    ]

### Coefficients dictionary

In [4]:
class CoeffsDictionary(T.nn.Module):
    """
    A class for initializing, storing, and updating the ksi coefficients
    These coefficients are the linear weights of a neural network
    The class inherits from the torch.nn.Module
    """
    def __init__(self, n_combinations):

        super(CoeffsDictionary, self).__init__()
        self.linear = T.nn.Linear(n_combinations, 1, bias=False)
        # Setting the weights to zeros
        self.linear.weight = T.nn.Parameter(0 * self.linear.weight.clone().detach())
    
    def forward(self, x):
        
        return self.linear(x)

### Features function

In [90]:
def apply_features(x, t, poly_order=2, phases=(), sgn_flag=False, torch_flag=True):
    """
    Applies the feature candidates to the given data

    Parameters
    ----------
    x : torch.Tensor

    t : torch.Tensor
    
    poly_order : int
    
    phases : iterable
    
    sgn_flag : bool
    
    torch_flag : bool

    Returns
    -------
    numpy.ndarray or torch.Tensor
    
    """
    ret = T.column_stack(
        (
            *[T.cos(ph * t) for ph in phases], # trigonometric features
            *[T.sign(x[:, k]) for k in (0, 1)], # signum features
            *[T.ones(size=(x.shape[0],)), x[:, 0], x[:, 1],  
                        x[:, 0] ** 2, x[:, 0] * x[:, 1], x[:, 1] ** 2, 
                        x[:, 0] ** 3, x[:, 0] ** 2 * x[:, 1], x[:, 0] * x[:, 1] ** 2, x[:, 1] ** 3,]# polynomial features
        )
    )

    if torch_flag:
        return ret
    else:
        return np.array(ret)

### 4th order Runge-Kutta constraint function

In [91]:
def apply_rk4_SparseId(x, LibsCoeffs, times, timesteps, Params, paramSize, phases):
    """
    A function that applies the fourth order Runge-Kutta scheme to the given data in order to derive the ones in the following timestep
    During this process the approximate derivatives are used

    Parameters
    ----------
    x : torch.Tensor

    
    """
    d1 = apply_features(x, times, phases=phases)
    k1 = T.column_stack((x[:, 1].unsqueeze(1), (- 2 * Params.zeta * Params.omega * x[:, 1] - Params.omega ** 2 * x[:, 0]).unsqueeze(1) + T.cos(Params.forcing_freq * times) + LibsCoeffs(d1)))
    
    xtemp = x + 0.5 * timesteps * k1
    d2 = apply_features(xtemp, times + 0.5 * timesteps, phases=phases)
    k2 = T.column_stack((xtemp[:, 1].unsqueeze(1), (- 2 * Params.zeta * Params.omega * xtemp[:, 1] - Params.omega ** 2 * xtemp[:, 0]).unsqueeze(1) + T.cos(Params.forcing_freq * (times + 0.5 * timesteps)) + LibsCoeffs(d2)))

    xtemp = x + 0.5 * timesteps * k2
    d3 = apply_features(xtemp, times + 0.5 * timesteps, phases=phases)
    k3 = T.column_stack((xtemp[:, 1].unsqueeze(1), (- 2 * Params.zeta * Params.omega * xtemp[:, 1] - Params.omega ** 2 * xtemp[:, 0]).unsqueeze(1) + T.cos(Params.forcing_freq * (times + 0.5 * timesteps)) + LibsCoeffs(d3)))

    xtemp = x + timesteps * k3
    d4 = apply_features(xtemp, times + timesteps, phases=phases)
    k4 = T.column_stack((xtemp[:, 1].unsqueeze(1), (- 2 * Params.zeta * Params.omega * xtemp[:, 1] - Params.omega ** 2 * xtemp[:, 0]).unsqueeze(1) + T.cos(Params.forcing_freq * (times + 0.5 * timesteps)) + LibsCoeffs(d4)))

    return x + (1 / 6) * (k1 + 2 * k2 + 2 * k3 + k4) * timesteps

### Learning function

In [95]:
def learn_sparse_model(coeffs, train_set, times, params, param_size, phases=(1.1, 1.2, 1.3), lr_reduction=10):
    """"
    A function that calculates which ksi coefficients lead to optimal prediction
    The updating of the coefficients is performed in a deep learning fashion

    Parameters
    ----------
    coeffs : CoeffsDictionary object

    train_set : torch.Tensor

    times : torch.Tensor

    params : parameters dataclass

    param_size : int

    phases : tuple

    lr_reduction : int

    Returns
    -------
    coeffs

    loss_track
    """
    # Define optimizer
    opt_func = optim_all.RAdam(
        coeffs.parameters(), lr=params.lr, weight_decay=params.weightdecay
    )
    # Define loss function
    criteria = T.nn.MSELoss()
    # pre-allocate memory for loss_fuction
    loss_track = np.zeros((params.num_iter, params.num_epochs))

    # Training 

    for p in range(params.num_iter):
        for g in range(params.num_epochs):
            coeffs.train()

            opt_func.zero_grad()

            loss_new = T.autograd.Variable(T.tensor([0.0], requires_grad=True))
            weights = 2 ** (-0.5 * T.linspace(0, 0, 1))

            timesteps_i = T.tensor(np.diff(times, axis=0)).float()
            y_total = train_set

            # One forward step predictions

            y_pred = apply_rk4_SparseId(y_total[:-1], coeffs, times[:-1], timesteps=timesteps_i, Params=params, paramSize=param_size, phases=phases)

            # One backward step predictions
            y_pred_back = apply_rk4_SparseId(y_total[1:], coeffs, times[1:], timesteps=-timesteps_i, Params=params, paramSize=param_size, phases=phases)

            loss_new += criteria(y_pred, y_total[1:]) + weights[0] * criteria(
                y_pred_back, y_total[:-1]
            )

            # loss_new /= y[0].shape[0]
            loss_track[p, g] += loss_new.item()
            loss_new.backward()
            opt_func.step()

            sys.stdout.write("\r [Iter %d/%d] [Epoch %d/%d] [Training loss: %.2e] [Learning rate: %.2e]" % (p + 1, params.num_iter, g + 1, params.num_epochs, loss_track[p, g], opt_func.param_groups[0]["lr"],))

        # Removing the coefficients smaller than tol and set gradients w.r.t. them to zero
        # so that they will not be updated in the iterations
        Ws = coeffs.linear.weight.detach().clone()
        Mask_Ws = (Ws.abs() > params.tol_coeffs).type(T.float)
        coeffs.linear.weight = T.nn.Parameter(Ws * Mask_Ws)

        coeffs.linear.weight.register_hook(lambda grad: grad.mul_(Mask_Ws))
        new_lr = opt_func.param_groups[0]["lr"] / lr_reduction
        opt_func = optim_all.RAdam(coeffs.parameters(), lr=new_lr, weight_decay=params.weightdecay)

    return coeffs, loss_track

#### Random seed

In [93]:
randSeed = 42

T.manual_seed(randSeed)
np.random.seed(seed=randSeed)

### Parameters dataclass

In [94]:
@dataclass
class parameters:
    bs: int = 1
    num_epochs: int = 1000
    num_iter = 3
    lr: float = 1e-2
    save_model_path: str = "./Results/Refactored/"
    weightdecay: float = 0.0
    timefinal: float = 50.0
    timestep: float = 5e-3
    normalize: bool = False
    tol_coeffs: float = 5e-2
    noise_level: float = 1e-1
    noisy_input_flag: bool = False
    omega: float = 1.0
    zeta: float = 0.05
    omega_noise: float = 1e-1
    zeta_noise: float = 2e-1
    model: str = "friction" # "free", "forced", "friction"
    forcing_freq: float = 1.2
    friction_ratio: float = 0.5
    
Params = parameters()

if Params.noisy_input_flag:
    Params.omega = np.random.normal(loc=Params.omega, scale=Params.omega_noise)
    Params.zeta = np.random.normal(loc=Params.zeta, scale=Params.zeta_noise)

os.makedirs(os.path.dirname(Params.save_model_path), exist_ok=True)

### Generate (noisy) measurements - Training Data

In [None]:
ts = np.arange(0, Params.timefinal, Params.timestep)

# Initial condition and simulation time
x0 = [0.1, 0.1]

# Solve the equation
sol = solve_ivp(lambda t, x: build_true_model(x, t, friction_force_ratio=Params.friction_ratio, forcing_freq=Params.forcing_freq), t_span=[ts[0], ts[-1]], y0=x0, t_eval=ts)

x = np.transpose(sol.y)

# Generate noisy measurements
x = np.random.normal(loc=x, scale=Params.noise_level * np.abs(x), size=x.shape)

In [None]:
# Define dataloaders
train_dset = T.tensor(x).float()
times = T.tensor(ts).unsqueeze(1).float()

phases = np.arange(1., 2.01, 0.1)
no_of_terms = apply_features(train_dset[:2], times[:2], phases=phases).shape[1]

Coeffs = CoeffsDictionary(no_of_terms)

# # Learning Coefficients
Coeffs, loss_track = learn_sparse_model(Coeffs, train_dset, times, Params, paramSize=no_of_terms, phases=phases)
Learned_Coeffs = Coeffs.linear.weight.detach().clone().t().numpy()

Learned_Coeffs