In [1]:
import numpy as np
import matplotlib.pyplot as plt
from scipy.integrate import odeint
from sklearn.linear_model import Ridge
from matplotlib.colors import Normalize
import networkx as nx
from scipy.signal import welch
import matplotlib.cm as cm
import seaborn as sns
import torch
import torch.nn as nn
from torch.optim import Adam
from collections import defaultdict
from sklearn.neighbors import NearestNeighbors
# import neurokit2 as nk
import matplotlib.pyplot as plt
import numpy as np
import os
import json
import itertools
from sklearn.preprocessing import MinMaxScaler
from tqdm import tqdm

## LSTM

In [2]:
class LSTMBaseline3D:
    """
    Lightweight single-layer LSTM for 3-dim Lorenz forecasting.
    * hidden_size=32 → ~4.8k trainable parameters
    * fit() trains in teacher-forcing mode
    * predict() produces autoregressive roll-out
    """

    def __init__(self,
                 input_dim:  int = 3,
                 hidden_size: int = 37,
                 output_dim: int = 3,
                 lr: float = 1e-3,
                 epochs: int = 30,
                 device: str = 'cpu',
                 seed: int = 0):
        torch.manual_seed(seed); np.random.seed(seed)

        self.device  = torch.device(device)
        self.epochs  = epochs
        self.model   = nn.LSTM(input_dim, hidden_size,
                               batch_first=True).to(self.device)
        self.head    = nn.Linear(hidden_size, output_dim).to(self.device)
        self.crit    = nn.MSELoss()
        self.optim   = Adam(list(self.model.parameters())+
                            list(self.head.parameters()), lr=lr)
        
    def total_parameters(self):
        total = 0
        for param in list(self.model.parameters()) + list(self.head.parameters()):
            total += param.numel()
        return total

    # ---------------------------------------------------------
    @torch.no_grad()
    def _init_hidden(self, batch_sz=1):
        h0 = torch.zeros(1, batch_sz,
                         self.model.hidden_size,
                         device=self.device)
        c0 = torch.zeros_like(h0)
        return (h0, c0)

    # ---------------------------------------------------------
    def fit(self, x_np: np.ndarray, y_np: np.ndarray):
        """
        x_np shape [T, 3]  (input  at t)
        y_np shape [T, 3]  (target at t)
        """
        x = torch.tensor(x_np, dtype=torch.float32,
                         device=self.device).unsqueeze(0)  # [1,T,3]
        y = torch.tensor(y_np, dtype=torch.float32,
                         device=self.device).unsqueeze(0)

        for _ in range(self.epochs):
            self.optim.zero_grad()
            out, _ = self.model(x, self._init_hidden())
            pred   = self.head(out)
            loss   = self.crit(pred, y)
            loss.backward()
            self.optim.step()

    # ---------------------------------------------------------
    @torch.no_grad()
    def predict(self, init_u: np.ndarray, n_steps: int):
        """
        Autoregressive roll-out.
        init_u : initial 3-vector (last known sample)
        Returns array of shape [n_steps, 3].
        """
        self.model.eval(); self.head.eval()

        inp     = torch.tensor(init_u[None, None, :],
                               dtype=torch.float32, device=self.device)
        h, c    = self._init_hidden()
        preds   = np.empty((n_steps, 3), dtype=np.float32)

        for t in range(n_steps):
            out, (h, c) = self.model(inp, (h, c))
            y           = self.head(out)
            preds[t]    = y.squeeze(0).cpu().numpy()
            inp         = y.detach()    # feed prediction back

        return preds
    
    @torch.no_grad()
    def predict_open_loop(self, x_np: np.ndarray):
        """
        Open-loop prediction using teacher-forced inputs (like during training).
        x_np shape: [T, 3] – input sequence
        Returns:
            preds: [T, 3] – predicted output sequence
        """
        self.model.eval(); self.head.eval()

        x = torch.tensor(x_np, dtype=torch.float32,
                         device=self.device).unsqueeze(0)  # [1, T, 3]
        out, _ = self.model(x, self._init_hidden())
        preds = self.head(out).squeeze(0).cpu().numpy()  # [T, 3]

        return preds

if __name__ == "__main__":
    model = LSTMBaseline3D()
    print(f"Total trainable parameters: {model.total_parameters()}")

# lstm_baseline = LSTMBaseline3D(
#                     hidden_size=38,         # parameter budget ~ 4800
#                     lr=1e-3,
#                     epochs=100,
#                     device='cuda' if torch.cuda.is_available() else 'cpu',
#                     seed=45)
# lstm_baseline.fit(train_input, train_target)

# # one-step roll-out to build an initial vector for auto-regressive mode
# init_vec = train_target[-1]                # last teacher-forced target
# lstm_preds = lstm_baseline.predict(init_vec,
#                                    n_steps=len(test_input))
# lstm_preds_open_loop = lstm_baseline.predict_open_loop(test_input)


Total trainable parameters: 6330


In [3]:
def evaluate_nrmse(all_preds, test_target, horizons):
    """
    Evaluate model performance over multiple prediction horizons
    for teacher-forced single-step forecasting or autoregressive rollout.
    """
    horizon_nrmse = {}
    for horizon in horizons:
        preds = all_preds[:horizon]
        targets = test_target[:horizon]
        squared_errors = (preds - targets) ** 2
        variance = np.var(targets, axis=0)
        variance[variance == 0] = 1e-8  # avoid divide-by-zero
        nrmse = np.sqrt(np.sum(squared_errors) / (horizon * np.sum(variance)))
        horizon_nrmse[horizon] = nrmse
    return horizon_nrmse

In [4]:
def compute_valid_prediction_time(y_true, y_pred, t_vals, threshold, lambda_max, dt):
    """
    Compute the Valid Prediction Time (VPT) and compare it to Lyapunov time T_lambda = 1 / lambda_max.
    
    Parameters
    ----------
    y_true : ndarray of shape (N, dim)
        True trajectory over time.
    y_pred : ndarray of shape (N, dim)
        Model's predicted trajectory over time (closed-loop).
    t_vals : ndarray of shape (N,)
        Time values corresponding to the trajectory steps.
    threshold : float, optional
        The error threshold, default is 0.4 as in your snippet.
    lambda_max : float, optional
        Largest Lyapunov exponent. Default=0.9 for Lorenz.
        
    Returns
    -------
    T_VPT : float
        Valid prediction time. The earliest time at which normalized error surpasses threshold
        (or the last time if never surpassed).
    T_lambda : float
        Lyapunov time = 1 / lambda_max
    ratio : float
        How many Lyapunov times the model prediction remains valid, i.e. T_VPT / T_lambda.
    """
    # 1) Average of y_true
    y_mean = np.mean(y_true, axis=0)  # shape (dim,)
    
    # 2) Time-averaged norm^2 of (y_true - y_mean)
    y_centered = y_true - y_mean
    denom = np.mean(np.sum(y_centered**2, axis=1))  # scalar
    
    # 3) Compute the normalized error delta_gamma(t) = ||y_true - y_pred||^2 / denom
    diff = y_true - y_pred
    err_sq = np.sum(diff**2, axis=1)  # shape (N,)
    delta_gamma = err_sq / denom      # shape (N,)
    
    # 4) Find the first time index where delta_gamma(t) exceeds threshold
    idx_exceed = np.where(delta_gamma > threshold)[0]
    if len(idx_exceed) == 0:
        # never exceeds threshold => set T_VPT to the final time
        T_VPT = t_vals[-1]
    else:
        T_VPT = t_vals[idx_exceed[0]]
    
    # 5) Compute T_lambda and ratio
    T_lambda = 1.0 / lambda_max

    # print(f"\n--- Valid Prediction Time (VPT) with threshold={threshold}, lambda_max={lambda_max} ---")

    T_VPT = (T_VPT - t_vals[0])  # Adjust T_VPT to be relative to the start time
    ratio = T_VPT / T_lambda

    return T_VPT, T_lambda, ratio

In [5]:
def compute_attractor_deviation(predictions, targets, cube_size=(0.1, 0.1, 0.1)):
    """
    Compute the Attractor Deviation (ADev) metric.

    Parameters:
        predictions (numpy.ndarray): Predicted trajectories of shape (n, 3).
        targets (numpy.ndarray): True trajectories of shape (n, 3).
        cube_size (tuple): Dimensions of the cube (dx, dy, dz).

    Returns:
        float: The ADev metric.
    """
    # Define the cube grid based on the range of the data and cube size
    min_coords = np.min(np.vstack((predictions, targets)), axis=0)
    max_coords = np.max(np.vstack((predictions, targets)), axis=0)

    # Create a grid of cubes
    grid_shape = ((max_coords - min_coords) / cube_size).astype(int) + 1

    # Initialize the cube occupancy arrays
    pred_cubes = np.zeros(grid_shape, dtype=int)
    target_cubes = np.zeros(grid_shape, dtype=int)

    # Map trajectories to cubes
    pred_indices = ((predictions - min_coords) / cube_size).astype(int)
    target_indices = ((targets - min_coords) / cube_size).astype(int)

    # Mark cubes visited by predictions and targets
    for idx in pred_indices:
        pred_cubes[tuple(idx)] = 1
    for idx in target_indices:
        target_cubes[tuple(idx)] = 1

    # Compute the ADev metric
    adev = np.sum(np.abs(pred_cubes - target_cubes))
    return adev

## TCN

In [6]:
class TCNBaseline3D(nn.Module):
    """
    2-layer causal TCN       (kernel=3, dilation=1 & 2, padding chosen
    so receptive field = 5 time-steps).
    ----------------------
    • input_dim  = 3
    • hidden_dim = 32  → total ≈ 4.9 k parameters
    • output_dim = 3    (one-step prediction)
    """
    def __init__(self,
                 input_dim:  int = 3,
                 hidden_dim: int = 32,
                 output_dim: int = 3,
                 lr: float = 1e-3,
                 epochs: int = 40,
                 device: str = "cpu",
                 seed: int = 0):
        super().__init__()
        torch.manual_seed(seed); np.random.seed(seed)

        k = 3  # kernel
        # layer 1: dilation 1  → pad 2 to keep length
        self.conv1 = nn.Conv1d(input_dim, hidden_dim,
                               kernel_size=k,
                               dilation=1,
                               padding=2,
                               bias=True)
        # layer 2: dilation 2  → pad 4
        self.conv2 = nn.Conv1d(hidden_dim, hidden_dim,
                               kernel_size=k,
                               dilation=2,
                               padding=4,
                               bias=True)
        self.relu  = nn.ReLU()
        self.head  = nn.Conv1d(hidden_dim, output_dim,
                               kernel_size=1, bias=True)

        self.lr, self.epochs = lr, epochs
        self.to(device)
        self.optim = Adam(self.parameters(), lr=lr)
        self.crit  = nn.MSELoss()
        
    def total_parameters(self):
        total = 0
        for param in list(self.model.parameters()) + list(self.head.parameters()):
            total += param.numel()
        return total

    # ---------------------------------------------------------
    def forward(self, x):
        """
        x shape  [B, T, 3]  (batch, time, channels)
        return  [B, T, 3]
        """
        # reshape to Conv1d convention: (B, C, T)
        x = x.permute(0, 2, 1)
        y = self.conv1(x); y = self.relu(y[:, :, :-2])     # remove look-ahead pad
        y = self.conv2(y); y = self.relu(y[:, :, :-4])     # remove look-ahead pad
        out = self.head(y).permute(0, 2, 1)                # back to (B,T,C)
        return out

    # ---------------------------------------------------------
    def fit(self, x_np: np.ndarray, y_np: np.ndarray):
        """
        Teacher-forcing on entire sequence (batch size = 1).
        x_np, y_np shape [T, 3]
        """
        x = torch.tensor(x_np[None], dtype=torch.float32, device=next(self.parameters()).device)
        y = torch.tensor(y_np[None], dtype=torch.float32, device=next(self.parameters()).device)

        for _ in range(self.epochs):
            self.optim.zero_grad()
            pred = self.forward(x)
            loss = self.crit(pred[:, :-1], y[:, 1:])  # predict next step
            loss.backward()
            self.optim.step()

    # ---------------------------------------------------------
    @torch.no_grad()
    def predict(self, init_window: np.ndarray, n_steps: int):
        """
        Autoregressive roll-out.
        init_window : length L≥5, shape [L,3] (latest samples, earliest first)
        Returns      : [n_steps,3]
        """
        device = next(self.parameters()).device
        window = init_window.copy()
        preds  = np.empty((n_steps, 3), dtype=np.float32)

        for t in range(n_steps):
            inp = torch.tensor(window[None], dtype=torch.float32, device=device)
            y   = self.forward(inp)[0, -1].cpu().numpy()
            preds[t] = y
            window   = np.vstack([window[1:], y])  # slide window

        return preds
    
    @torch.no_grad()
    def predict_open_loop(self, x_np: np.ndarray):
        """
        Open-loop prediction (teacher-forced inputs).
        x_np shape: [T, 3]
        Returns:
            preds: [T - 1, 3] – one-step-ahead predictions
        """
        self.eval()
        device = next(self.parameters()).device

        x = torch.tensor(x_np[None], dtype=torch.float32, device=device)  # [1, T, 3]
        preds = self.forward(x)  # [1, T, 3]
        preds = preds[:, :-1]    # predict from t=0 to t=T-2 for target t=1 to t=T-1

        return preds.squeeze(0).cpu().numpy()
    

if __name__ == "__main__":
    model = LSTMBaseline3D()
    print(f"Total trainable parameters: {model.total_parameters()}")


# tcn = TCNBaseline3D(hidden_dim=32, epochs=50, lr=1e-3, device="cpu", seed=46)
# tcn.fit(train_input, train_target)

# # initial window must be >4 samples:
# init_win = test_input[:5].copy()
# tcn_preds = tcn.predict(init_win, n_steps=len(test_target))
# tcn_preds_open_loop = tcn.predict_open_loop(test_input)

Total trainable parameters: 6330


### Canonical Datasets

In [7]:
def lorenz_deriv(state, t, sigma=10.0, rho=28.0, beta=8.0/3.0):
    x, y, z = state
    dxdt = sigma * (y - x)
    dydt = x*(rho - z) - y
    dzdt = x*y - beta*z
    return [dxdt, dydt, dzdt]

def generate_lorenz_data(
    initial_state=[1.0, 1.0, 1.0],
    tmax=25.0,
    dt=0.01,
    sigma=10.0,
    rho=28.0,
    beta=8.0/3.0
):
    num_steps = int(tmax / dt) + 1 # +1 to include t=0
    t_vals = np.linspace(0, tmax, num_steps)
    sol = odeint(lorenz_deriv, initial_state, t_vals, args=(sigma, rho, beta))
    return t_vals, sol

def rossler_derivatives(state, t, a=0.2, b=0.2, c=5.7):
    """Compute time derivatives [dx/dt, dy/dt, dz/dt] for the Rössler system."""
    x, y, z = state
    dxdt = -y - z
    dydt = x + a * y
    dzdt = b + z * (x - c)
    return [dxdt, dydt, dzdt]

def generate_rossler_data(
    initial_state=[1.0, 0.0, 0.0],
    tmax=25.0,
    dt=0.01,
    a=0.2,
    b=0.2,
    c=5.7
):
    """
    Numerically integrate Rössler equations x'(t), y'(t), z'(t) using odeint.
    Returns:
       t_vals: array of time points
       sol   : array shape [num_steps, 3] of [x(t), y(t), z(t)]
    """
    num_steps = int(tmax / dt)
    t_vals = np.linspace(0, tmax, num_steps)
    sol = odeint(rossler_derivatives, initial_state, t_vals, args=(a, b, c))
    return t_vals, sol

def chen_deriv(state, t, a=35.0, b=3.0, c=28.0):
    """
    Computes derivatives [dx/dt, dy/dt, dz/dt] for Chen system:
      dx/dt = a*(y - x)
      dy/dt = (c - a)*x + c*y - x*z
      dz/dt = x*y - b*z
    """
    x, y, z = state
    dxdt = a*(y - x)
    dydt = (c - a)*x + c*y - x*z
    dzdt = x*y - b*z
    return [dxdt, dydt, dzdt]

def generate_chen_data(
    initial_state=[1.0, 1.0, 1.0],
    tmax=50.0,
    dt=0.01,
    a=35.0,
    b=3.0,
    c=28.0
):
    """
    Integrates Chen's system from 'initial_state' up to time 'tmax' with step size 'dt'.
    Returns:
      t_vals: time array of length T
      sol   : array shape [T, 3], the trajectory [x(t), y(t), z(t)]
    """
    num_steps = int(tmax / dt)
    t_vals = np.linspace(0, tmax, num_steps)
    sol = odeint(chen_deriv, initial_state, t_vals, args=(a, b, c))
    return t_vals, sol

In [8]:
grid = {
    "input_dim": [3],
    "hidden_size": [37],
    "output_dim": [3],
    "lr": [1e-3],
    "epochs": [50],
    "device": ['cuda'],
}

In [9]:
def run_grid_search(model_class, param_grid, model_name,
                    output_path="grid_search_results.json", f=generate_chen_data, lambda_max=0.9):
    combos = list(itertools.product(*param_grid.values()))
    param_keys = list(param_grid.keys())
    print(f"\n== Initial grid search for {model_name} with {len(combos)} combinations ==")

    results = []
    # horizons = list(range(10, 1001, 10))
    horizons = [200, 400, 600, 800, 1000]
    

    for comb in tqdm(combos, desc="Grid Search"):
        params = dict(zip(param_keys, comb))
        # seed_scores_vpt = []
        horizon_nrmse_all = {h: [] for h in horizons}
        # adev_scores = []
        # ldev_scores = []

        for initial_state in [[1.0, 1.0, 1.0], [1.0, 2.0, 3.0], [2.0, 1.5, 4.0]]:
            tmax = 250
            dt = 0.02
            t_vals, lorenz_traj = f(
                initial_state=initial_state,
                tmax=tmax,
                dt=dt
            )

            washout = 2000
            t_vals = t_vals[washout:]
            lorenz_traj = lorenz_traj[washout:]

            scaler = MinMaxScaler()
            scaler.fit(lorenz_traj)
            lorenz_traj = scaler.transform(lorenz_traj)

            T_data = len(lorenz_traj)
            for train_frac in [0.7, 0.75, 0.8]:
                train_end = int(train_frac * (T_data - 1))
                train_input = lorenz_traj[:train_end]
                train_target = lorenz_traj[1:train_end + 1]
                test_input = lorenz_traj[train_end:-1]
                test_target = lorenz_traj[train_end + 1:]
                n_test_steps = len(test_input)
                initial_in = test_input[0]

                for seed in np.arange(1, 5):
                    model = model_class(**params, seed=seed)
                    model.fit(train_input, train_target)
                    preds = model.predict(initial_in, n_test_steps)

                    # T_VPT_s, _, ratio = compute_valid_prediction_time(test_target, preds, t_vals, 0.4, lambda_max, dt)
                    # seed_scores_vpt.append(ratio)

                    horizon_nrmse = evaluate_nrmse(preds, test_target, horizons)
                    for h in horizons:
                        horizon_nrmse_all[h].append(horizon_nrmse[h])

                    # adev = compute_attractor_deviation(preds, test_target)
                    # adev_scores.append(adev)

                    # ldev = compute_lyapunov_exponent("Lorenz", preds, dt)
                    # ldev_scores.append(ldev)

        # mean_vpt = float(np.mean(seed_scores_vpt))
        # std_vpt = float(np.std(seed_scores_vpt))
        mean_nrmse_dict = {str(h): float(np.mean(horizon_nrmse_all[h])) for h in horizons}
        std_nrmse_dict  = {str(h): float(np.std(horizon_nrmse_all[h]))  for h in horizons}
        # mean_adev = float(np.mean(adev_scores))
        # std_adev = float(np.std(adev_scores))
        # mean_ldev = float(np.mean(ldev_scores))
        # std_ldev = float(np.std(ldev_scores))

        results.append({
            "params": params,
            # "seed_scores_T_VPT": seed_scores_vpt,
            # "mean_T_VPT": mean_vpt,
            # "std_T_VPT": std_vpt,
            "mean_NRMSEs": mean_nrmse_dict,
            "std_NRMSEs": std_nrmse_dict,
            # "mean_ADev": mean_adev,
            # "std_ADev": std_adev,
            # "mean_LDev": mean_ldev,
            # "std_LDev": std_ldev
        })

    with open(output_path, "w") as f:
        json.dump(results, f, indent=2)
    print(f"\nAll results saved to `{output_path}`")

    return results

In [10]:
run_grid_search(LSTMBaseline3D, grid, "lstm", output_path="lstm.json", f=generate_chen_data, lambda_max=0.829)


== Initial grid search for lstm with 1 combinations ==


Grid Search: 100%|██████████| 1/1 [00:42<00:00, 42.55s/it]


All results saved to `lstm.json`





[{'params': {'input_dim': 3,
   'hidden_size': 37,
   'output_dim': 3,
   'lr': 0.001,
   'epochs': 50,
   'device': 'cuda'},
  'mean_NRMSEs': {'200': 1.1498553747982223,
   '400': 1.1106524375085538,
   '600': 1.1055218707096879,
   '800': 1.1042270550905167,
   '1000': 1.098894237887603},
  'std_NRMSEs': {'200': 0.08139917303893686,
   '400': 0.05698845168499877,
   '600': 0.05411126447914175,
   '800': 0.05169483255488391,
   '1000': 0.050448357124515245}}]

In [11]:
grid = {
    "input_dim": [3],
    "hidden_dim": [32],
    "output_dim": [3],
    "lr": [1e-3],
    "epochs": [50],
    "device": ['cuda'],
}

In [12]:
# run_grid_search(TCNBaseline3D, grid, "tcn", output_path="tcn.json", f=generate_lorenz_data, lambda_max=0.9)