In [1]:
import numpy as np
import matplotlib.pyplot as plt
from scipy.integrate import odeint
from sklearn.preprocessing import MinMaxScaler, StandardScaler
from sklearn.linear_model import Ridge
from scipy.signal import welch, periodogram
from scipy.spatial.distance import cdist
import json
import itertools
from tqdm import tqdm
import lyapynov
import neurokit2 as nk
from scipy.signal import welch, periodogram
import seaborn as sns

In [2]:
def lorenz_deriv(state, t, sigma=10.0, rho=28.0, beta=8.0/3.0):
    """
    Right-hand side of the Lorenz equations, in the ``func(y, t, *args)`` form
    that ``odeint`` calls.

    Parameters
    ----------
    state : sequence of three numbers, unpacked as (x, y, z).
    t     : time argument; accepted for the ``odeint`` signature but not used.
    sigma, rho, beta : system parameters.

    Returns
    -------
    list
        [dx/dt, dy/dt, dz/dt] evaluated at ``state``.
    """
    x, y, z = state
    return [
        sigma * (y - x),
        x*(rho - z) - y,
        x*y - beta*z,
    ]

def generate_lorenz_data(
    initial_state=[1.0, 1.0, 1.0],
    tmax=25.0,
    dt=0.01,
    sigma=10.0,
    rho=28.0,
    beta=8.0/3.0
):
    """
    Numerically integrate the Lorenz equations with ``odeint``.

    Parameters
    ----------
    initial_state : sequence of three floats, the state at t = 0.
                    (The default list is only read, never mutated.)
    tmax          : end time of the integration; samples cover [0, tmax].
    dt            : requested spacing between consecutive samples.
    sigma, rho, beta : Lorenz parameters forwarded to ``lorenz_deriv``.

    Returns
    -------
    t_vals : array of sample times, from 0 to ``tmax`` inclusive
    sol    : array of shape [num_steps, 3] holding [x(t), y(t), z(t)]
    """
    # tmax / dt is subject to floating-point error (e.g. 0.7 / 0.1 == 6.999...),
    # and plain int() truncation would then drop the last sample and stretch
    # the effective step. The small tolerance absorbs that error while still
    # truncating genuinely non-integer ratios as before.
    num_steps = int(tmax / dt + 1e-9) + 1  # +1 to include t=0
    t_vals = np.linspace(0, tmax, num_steps)
    sol = odeint(lorenz_deriv, initial_state, t_vals, args=(sigma, rho, beta))
    return t_vals, sol

In [3]:
def rossler_derivatives(state, t, a=0.2, b=0.2, c=5.7):
    """
    Time derivatives [dx/dt, dy/dt, dz/dt] of the Rössler system.

    ``state`` is unpacked as (x, y, z); ``t`` is accepted so the function can
    be handed to ``odeint`` but does not enter the equations.
    """
    x, y, z = state
    return [
        -y - z,
        x + a * y,
        b + z * (x - c),
    ]

def generate_rossler_data(
    initial_state=[1.0, 0.0, 0.0],
    tmax=25.0,
    dt=0.01,
    a=0.2,
    b=0.2,
    c=5.7
):
    """
    Numerically integrate Rössler equations x'(t), y'(t), z'(t) using odeint.

    Parameters
    ----------
    initial_state : sequence of three floats, the state at t = 0.
                    (The default list is only read, never mutated.)
    tmax          : end time of the integration; samples cover [0, tmax].
    dt            : requested spacing between consecutive samples.
    a, b, c       : Rössler parameters forwarded to ``rossler_derivatives``.

    Returns:
       t_vals: array of time points, from 0 to ``tmax`` inclusive
       sol   : array shape [num_steps, 3] of [x(t), y(t), z(t)]
    """
    # np.linspace includes both endpoints, so covering [0, tmax] with spacing
    # dt needs tmax/dt + 1 points. Without the +1 the actual spacing would be
    # tmax / (num_steps - 1), which is not dt. The small tolerance guards
    # against floating-point error in the division (e.g. 0.7 / 0.1 == 6.999...).
    num_steps = int(tmax / dt + 1e-9) + 1  # +1 to include t=0
    t_vals = np.linspace(0, tmax, num_steps)
    sol = odeint(rossler_derivatives, initial_state, t_vals, args=(a, b, c))
    return t_vals, sol

In [None]:
# Apply seaborn's default theme (called with no arguments) for the notebook's plots.
sns.set_theme()

# Euclidean distance between two vectors
def distance(xe: np.array, xi: np.array) -> float:
    """Return the L2 norm of ``xi - xe`` (square root of the summed squared differences)."""
    delta = xi - xe
    squared_total = np.sum(delta ** 2)
    return np.sqrt(squared_total)

# Find nearest neighbor for one point
def get_nearest_neighbour(xi: np.array, X: np.ndarray, mu: float, time_steps: int) -> int:
    """
    Index of the nearest neighbour of ``xi`` among the rows of ``X``.

    Parameters
    ----------
    xi         : one row of ``X`` (the reference point).
    X          : 2D array of points, one per row.
    mu         : minimum index separation; candidates whose row index is closer
                 than ``mu`` to the row index of ``xi`` are excluded.
    time_steps : only the first ``len(X) - time_steps`` rows are candidates, so
                 a neighbour can still be followed ``time_steps`` rows forward.

    Returns
    -------
    int
        Row index of the closest admissible candidate. If every candidate is
        excluded, ``argmin`` over all-inf distances yields 0.

    Raises
    ------
    ValueError
        If ``time_steps`` leaves no candidate rows.
    IndexError
        If ``xi`` does not match any row of ``X``.
    """
    n_candidates = len(X) - time_steps
    if n_candidates <= 0:
        raise ValueError(
            f"No candidate neighbours: len(X)={len(X)} must exceed time_steps={time_steps}."
        )
    candidates = np.arange(n_candidates)

    # All candidate distances in one vectorized pass instead of one Python call per row.
    ds = np.sqrt(np.sum((xi - X[:n_candidates]) ** 2, axis=1))

    # Zero distance means the point itself (or an exact duplicate): never a valid neighbour.
    ds = np.where(ds == 0, np.inf, ds)

    # Row index of xi inside X (first exact match), used for the separation constraint.
    index = np.where((X == xi).all(axis=1))[0][0]
    ds = np.where(np.abs(candidates - index) < mu, np.inf, ds)
    return int(np.argmin(ds))

# Get nearest neighbors for all points
def get_nearest_neighbours(X: np.ndarray, mu: float, time_steps: int) -> list:
    """Return, for every row of ``X`` in order, the index given by ``get_nearest_neighbour``."""
    neighbour_indices = []
    for point in X:
        neighbour_indices.append(get_nearest_neighbour(point, X, mu, time_steps))
    return neighbour_indices

# Mean period estimation using Welch
def mp_welch(ts: np.array) -> float:
    """
    Mean period of ``ts``: reciprocal of the power-weighted mean frequency of
    its Welch spectrum. ``welch`` is called with its defaults, so the result is
    expressed in the frequency units those defaults imply.
    """
    freqs, power = welch(ts)
    weights = power / np.sum(power)
    return 1 / np.average(freqs, weights=weights)

# Mean period estimation using Periodogram
def mp_periodogram(ts: np.array) -> float:
    """
    Mean period of ``ts``: reciprocal of the power-weighted mean frequency of
    its periodogram. ``periodogram`` is called with its defaults, so the result
    is expressed in the frequency units those defaults imply.
    """
    freqs, power = periodogram(ts)
    weights = power / np.sum(power)
    return 1 / np.average(freqs, weights=weights)

# Log distance divergence for each time step
def expected_log_distance(i: int, X: np.ndarray, j: list) -> float:
    """
    Mean of ln(distance) between each point and its neighbour after both have
    been advanced ``i`` rows.

    ``j[k]`` is the neighbour row index of row ``k``; pairs are formed for
    ``k`` in ``range(len(X) - i)`` so that ``k + i`` stays inside ``X``.
    """
    separations = []
    for k in range(len(X) - i):
        separations.append(distance(X[j[k] + i], X[k + i]))
    log_separations = np.log(np.array(separations))
    return np.mean(log_separations)

#  MAIN callable function
def compute_lyapunov_exponent(
    ts: np.ndarray,
    lag: int = 11,
    emb_dim: int = 9,
    t_0: int = 80,
    t_f: int = 150,
    delta_t: float = 0.02,
    method: str = "welch"
) -> float:
    """
    Estimate the largest Lyapunov exponent using Rosenstein's algorithm.

    The series is delay-embedded, each embedded point is paired with its
    nearest neighbour (excluding points closer in time than the mean period),
    and the slope of the mean log-separation over steps ``t_0 .. t_f - 1`` is
    returned.

    Parameters
    ----------
    ts : np.ndarray
        1D time-series data.
    lag : int
        Time delay (τ), in samples.
    emb_dim : int
        Embedding dimension (m).
    t_0 : int
        Start step index (inclusive) for the linear fit.
    t_f : int
        End step index (exclusive) for the linear fit; also the number of
        steps every neighbour pair must be trackable for.
    delta_t : float
        Time interval between samples.
    method : str
        'welch' or 'periodogram' for estimating mean period.

    Returns
    -------
    lle : float
        Largest Lyapunov Exponent (slope of <ln divergence> versus time).

    Raises
    ------
    ValueError
        If ``ts`` is not 1D, ``method`` is unsupported, ``lag``/``emb_dim`` are
        not positive, the fit window is invalid, or the series is too short to
        embed and track neighbours for ``t_f`` steps.
    """
    ts = np.asarray(ts)
    if ts.ndim != 1:
        raise ValueError("Time series ts must be 1D (e.g., x(t) from Lorenz system).")

    if method not in ("welch", "periodogram"):
        raise ValueError(f"Unsupported method: {method}")

    if lag < 1 or emb_dim < 1:
        raise ValueError(f"lag and emb_dim must be >= 1 (got lag={lag}, emb_dim={emb_dim}).")
    if t_0 < 0 or t_f - t_0 < 2:
        raise ValueError(
            f"Fit window needs 0 <= t_0 and at least two steps (got t_0={t_0}, t_f={t_f})."
        )

    # Number of embedded vectors; the neighbour search keeps only the first
    # n_vectors - t_f of them as candidates, so at least one must remain.
    n_vectors = len(ts) - (emb_dim - 1) * lag
    if n_vectors <= t_f:
        raise ValueError(
            f"Time series too short: {len(ts)} samples give {n_vectors} embedded "
            f"vectors, but more than t_f={t_f} are required."
        )

    # Mean period (used as the minimum temporal separation between neighbours)
    if method == "welch":
        mu = mp_welch(ts)
    else:
        mu = mp_periodogram(ts)

    # Time-delay embedding: row i is [ts[i], ts[i + lag], ..., ts[i + (emb_dim - 1) * lag]]
    row_starts = np.arange(n_vectors)[:, None]
    delays = (np.arange(emb_dim) * lag)[None, :]
    X = ts[row_starts + delays].astype(float)

    j = get_nearest_neighbours(X, mu=mu, time_steps=t_f)

    mean_log_distance = np.array([
        expected_log_distance(i, X, j) for i in range(t_0, t_f)
    ])

    time = np.arange(t_0, t_f) * delta_t

    # Linear fit to <ln divergence>; only the slope is of interest.
    A = np.vstack([time, np.ones(len(time))]).T
    slope, _ = np.linalg.lstsq(A, mean_log_distance, rcond=None)[0]

    return slope

In [42]:
def report_mle(pred, true):
    """
    Compare the largest Lyapunov exponent of a predicted and a true trajectory.

    For each 2D array, ``compute_lyapunov_exponent`` (default settings) is run
    on every column and the maximum over columns is taken.

    Returns
    -------
    (mle_pred, mle_true, abs_difference) -- note that the predicted value comes first.
    """
    def _max_over_columns(series):
        per_column = [
            compute_lyapunov_exponent(ts=series[:, col])
            for col in range(series.shape[1])
        ]
        return max(per_column)

    mle_p = _max_over_columns(pred)
    mle_t = _max_over_columns(true)
    return mle_p, mle_t, abs(mle_p - mle_t)

In [4]:
# Per-system constants keyed by dynamical-system name.
# NOTE(review): presumably reference values of the largest Lyapunov exponent for
# each system -- source and units are not stated here; confirm before relying on them.
# No other code visible in this notebook reads this dictionary.
lambda_max = {
        "Mackey": 0.006100,
        "Lorenz": 0.905600,
        "Rossler": 0.071400,
        "Chen": 0.829600,
        "Chua": 0.428400
        }
# def compute_lyapunov_exponent(chosen_system, trajectory, ground_truth, dt):
#     """
#     Compute the Lyapunov Exponent for a given trajectory using NeuroKit2.

#     Parameters:
#         chosen_system (str): Name of the dynamical system (for comparison with true LEs).
#         trajectory (np.ndarray): Shape (N,) or (N, 3) trajectory.
#         dt (float): Time step used in the simulation.

#     Returns:
#         lyapunov_exponent (float): Estimated max Lyapunov exponent.
#         diff (float): Absolute error from ground truth value.
#     """
#     trajectory = np.asarray(trajectory)

#     # Ensure it's 2D for consistency
#     if trajectory.ndim == 1:
#         trajectory = trajectory.reshape(-1, 1)

#     # # Compute delay and dimension from the first component
#     # delay, _ = nk.complexity_delay(signal=trajectory[:, 0])
#     # dimension, _ = nk.complexity_dimension(signal=trajectory[:, 0], delay=delay)

#     # Compute LE for each dimension
#     les = []
#     for i in range(trajectory.shape[1]):
#         le, _ = nk.complexity_lyapunov(signal=trajectory[:, i])
#         les.append(le)

#     les1 = []
#     for i in range(ground_truth.shape[1]):
#         le, _ = nk.complexity_lyapunov(signal=ground_truth[:, i])
#         les1.append(le)

#     lyapunov_exponent = max(les)
#     lyapunov_exponent1 = max(les1)
#     diff = np.abs(lyapunov_exponent - lyapunov_exponent1)

#     return lyapunov_exponent1, lyapunov_exponent, diff

In [6]:
def scale_spectral_radius(W, target_radius=0.95):
    """
    Rescale matrix W so the largest eigenvalue magnitude equals target_radius.

    A matrix whose spectral radius is exactly zero cannot be rescaled and is
    returned unchanged (the same object).
    """
    current_radius = np.abs(np.linalg.eigvals(W)).max()
    return W if current_radius == 0 else (W / current_radius) * target_radius

def augment_state_with_squares(x):
    """
    Build the polynomial readout feature vector [ x, x^2, 1 ] from a state x.

    For x in R^N the result lives in R^(2N+1): the state, its element-wise
    square, and a trailing constant 1.0. Used for both training and prediction.
    """
    squared = x**2
    bias = [1.0]
    return np.concatenate((x, squared, bias))  # shape: 2N+1

class CR3D:
    """
    Cycle (ring) reservoir mapping a 3D input to a 3D single-step prediction.

    The recurrent matrix is a ring (node i connects to node (i+1) mod N) scaled
    to the requested spectral radius. The readout is a ridge regression on
    [x, x^2, 1]. Training is teacher-forced; testing can be autoregressive
    (closed loop) or driven by the true inputs (open loop).
    """
    def __init__(self,
                 reservoir_size=300,
                 spectral_radius=0.95,
                 input_scale=1.0,
                 leaking_rate=1.0,
                 ridge_alpha=1e-6,
                 seed=42):
        self.reservoir_size = reservoir_size
        self.spectral_radius = spectral_radius
        self.input_scale = input_scale
        self.leaking_rate = leaking_rate
        self.ridge_alpha = ridge_alpha
        self.seed = seed

        # Ring topology: a single 1.0 per row, one column to the right (wrapping around).
        np.random.seed(self.seed)
        ring = np.zeros((reservoir_size, reservoir_size))
        rows = np.arange(reservoir_size)
        ring[rows, (rows + 1) % reservoir_size] = 1.0
        self.W = scale_spectral_radius(ring, self.spectral_radius)

        # Input weights drawn uniformly from [-input_scale, input_scale).
        np.random.seed(self.seed+1)
        centred = np.random.rand(reservoir_size, 3) - 0.5
        self.W_in = centred*2.0*self.input_scale

        self.W_out = None
        self.x = np.zeros(reservoir_size)

    def reset_state(self):
        """Replace the reservoir state with a fresh zero vector."""
        self.x = np.zeros(self.reservoir_size)

    def _update(self, u):
        """Advance the state one step with input u using the leaky tanh update."""
        leak = self.leaking_rate
        activated = np.tanh(self.W @ self.x + self.W_in @ u)
        self.x = (1.0 - leak)*self.x + leak*activated

    def _readout(self):
        """Apply the trained readout to the current state's [x, x^2, 1] features."""
        return self.W_out @ augment_state_with_squares(self.x)

    def collect_states(self, inputs, discard=100):
        """
        Reset the reservoir, drive it with ``inputs`` (teacher forcing) and
        record the state after every step.

        Returns (states_after_discard, states_discarded).
        """
        self.reset_state()
        history = []
        for u in inputs:
            self._update(u)
            history.append(self.x.copy())
        history = np.array(history)
        return history[discard:], history[:discard]

    def fit_readout(self, train_input, train_target, discard=100):
        """
        Fit the ridge readout on teacher-forced states, skipping the first
        ``discard`` steps of both states and targets. Stores the coefficients
        in ``self.W_out``.
        """
        kept_states, _ = self.collect_states(train_input, discard=discard)
        kept_targets = train_target[discard:]

        # polynomial readout features, shape => [T-discard, 2N+1]
        features = np.array([augment_state_with_squares(s) for s in kept_states])

        ridge = Ridge(alpha=self.ridge_alpha, fit_intercept=False)
        ridge.fit(features, kept_targets)
        self.W_out = ridge.coef_

    def predict_autoregressive(self, initial_input, n_steps):
        """
        Closed-loop forecast: start from ``initial_input`` and feed each
        prediction back as the next input for ``n_steps`` steps. The reservoir
        state is not reset first.
        """
        outputs = []
        feed = np.array(initial_input)
        for _ in range(n_steps):
            self._update(feed)
            feed = self._readout()
            outputs.append(feed)
        return np.array(outputs)

    def predict_open_loop(self, test_input):
        """
        Open-loop forecast: drive the reservoir with the true inputs and read
        out one prediction per input. The reservoir state is not reset first.
        """
        outputs = []
        for u in test_input:
            self._update(u)
            outputs.append(self._readout())
        return np.array(outputs)

In [7]:
class HFRRes3D:
    """
    Hierarchical Fractal Reservoir (HFR) for 3D chaotic systems.

    The reservoir's recurrent matrix is derived from the training trajectory
    itself: the trajectory is clustered at several levels of granularity and
    the clusters ("cells") of all levels become the reservoir nodes. Only the
    observed trajectory in R^3 is used; no system parameters (e.g. sigma, rho,
    beta for Lorenz63) are needed.

    Key Idea:
     1) Define multiple 'scales' (levels) of partition of the data.
     2) Each level is subdivided into a given number of cells (k-means clusters).
     3) Each cell at level l has links to both:
        - other cells at the same level (horizontal adjacency, from observed
          cell-to-cell transitions between consecutive samples),
        - cells at the neighbouring level l+1 (vertical adjacency, from samples
          that fall in both cells); the reverse l+1 -> l links are added too.
     4) We gather all cells across levels => a multi-level graph => adjacency => W.
     5) We build a typical ESN from this adjacency, feed data with W_in, run leaky tanh updates,
        then do a polynomial readout for 3D next-step prediction.

    The intent is to capture multi-scale structure of the attractor in a single reservoir.
    """

    def __init__(self,
                 n_levels=3,             # number of hierarchical levels
                 cells_per_level=None,   # list of number of cells at each level, e.g. [8, 32, 128]
                 spectral_radius=0.95,
                 input_scale=1.0,
                 leaking_rate=1.0,
                 ridge_alpha=1e-6,
                 seed=42):
        """
        Parameters
        ----------
        n_levels       : int, number of hierarchical scales. Only used to build the
                         default ``cells_per_level``; when ``cells_per_level`` is
                         given, the number of levels is its length.
        cells_per_level: list[int], the number of partitions/cells at each level
                         if None, we auto-generate 8 * 2^level (8, 16, 32, ...)
        spectral_radius: final scaling for adjacency
        input_scale    : random input scale W_in
        leaking_rate   : ESN leaky alpha
        ridge_alpha    : readout ridge penalty
        seed           : random seed (global NumPy seed for W_in, and base of the
                         k-means random_state)
        """
        self.n_levels        = n_levels
        self.cells_per_level = cells_per_level
        self.spectral_radius = spectral_radius
        self.input_scale     = input_scale
        self.leaking_rate    = leaking_rate
        self.ridge_alpha     = ridge_alpha
        self.seed            = seed

        if self.cells_per_level is None:
            # default scheme e.g. 8, 16, 32 for 3 levels
            self.cells_per_level = [8*(2**i) for i in range(n_levels)]

        # We'll store adjacency W, input W_in, readout W_out, reservoir state x.
        # All of them stay None until fit_readout() builds them from data.
        self.W     = None
        self.W_in  = None
        self.W_out = None
        self.x     = None
        # The list is authoritative: this overrides the n_levels argument.
        self.n_levels = len(self.cells_per_level)

        # We'll define a total number of nodes = sum(cells_per_level)
        self.n_nodes = sum(self.cells_per_level)

    def _build_partitions(self, data_3d):
        """
        Build the partition of the data for each level.
        For each level l in [0..n_levels-1] run k-means with K = cells_per_level[l]
        on the full data set; each point gets a cluster label. Levels are clustered
        independently (with different random_state values), so finer levels are not
        constrained to nest inside coarser ones.

        Return:
          partitions => list of arrays, partitions[l] => shape (N, ) cluster assignment in [0..cells_per_level[l]-1]
        """
        # Local import: KMeans is only needed here.
        from sklearn.cluster import KMeans
        N = len(data_3d)  # not used below
        partitions = []

        for level in range(self.n_levels):
            k = self.cells_per_level[level]
            # cluster; a distinct, deterministic random_state per level
            kmeans = KMeans(n_clusters=k, random_state=self.seed+10*level, n_init='auto')
            kmeans.fit(data_3d)
            labels = kmeans.predict(data_3d)
            partitions.append(labels)

        return partitions

    def _build_hierarchical_adjacency(self, data_3d):
        """
        Build a block adjacency with cross-level links, then scale spectral radius.
        Steps:
          1) Build partitions for each level => partitions[l] in [0..cells_per_level[l]-1]
          2) For each level l, build a transition matrix T_l of shape (cells_per_level[l], cells_per_level[l]).
          3) Link scale l to scale l+1 by figuring out which cluster i at scale l maps to which cluster j at scale l+1
             for each sample t => link i-> j if data_3d[t] is in i at scale l and j at scale l+1.
             The transposed (l+1 -> l) links are added as well.
          4) Combine all transitions in one big adjacency W in R^(n_nodes x n_nodes).
          5) row-normalize W => scale largest eigenvalue => spectral_radius

        Returns the scaled (n_nodes, n_nodes) matrix.
        """
        partitions = self._build_partitions(data_3d)
        N = len(data_3d)

        # offsets for each level => to index big W
        # (offsets[l] is the row/column index where level l's block starts)
        offsets = []
        running = 0
        for level in range(self.n_levels):
            offsets.append(running)
            running += self.cells_per_level[level]

        # total nodes
        n_tot = self.n_nodes
        # initialize adjacency
        A = np.zeros((n_tot, n_tot))

        # 1) horizontal adjacency in each level
        for level in range(self.n_levels):
            k = self.cells_per_level[level]
            labels = partitions[level]
            # T_l => shape (k, k); T_l[i, j] counts transitions from cell i at
            # sample t to cell j at sample t+1
            T_l = np.zeros((k, k))
            for t in range(N-1):
                i = labels[t]
                j = labels[t+1]
                T_l[i,j]+=1
            # row normalize (rows with no outgoing transitions stay all-zero)
            row_sum = T_l.sum(axis=1, keepdims=True)
            row_sum[row_sum==0.0] = 1.0
            T_l /= row_sum
            # place T_l into big A (diagonal block of this level)
            off = offsets[level]
            A[off:off+k, off:off+k] = T_l

        # 2) vertical adjacency between scale l and l+1
        for level in range(self.n_levels-1):
            k_l   = self.cells_per_level[level]
            k_lp1 = self.cells_per_level[level+1]
            labels_l   = partitions[level]
            labels_lp1 = partitions[level+1]
            # we define adjacency from i in [0..k_l-1] to j in [0..k_lp1-1] if the same sample t belongs to i at level l and j at l+1
            # Count how many times
            Xvert1 = np.zeros((k_l, k_lp1))
            for t in range(N):
                i = labels_l[t]
                j = labels_lp1[t]
                Xvert1[i,j]+=1
            # row normalize (into a NEW array; Xvert1 still holds the raw counts)
            row_sum = Xvert1.sum(axis=1, keepdims=True)
            row_sum[row_sum==0.0] = 1.0
            Xvert = Xvert1/row_sum
            # place in big A (off-diagonal block: level l -> level l+1)
            off_l   = offsets[level]
            off_lp1 = offsets[level+1]
            A[off_l:off_l+k_l, off_lp1:off_lp1+k_lp1] = Xvert
            # Reverse direction (level l+1 -> level l): transpose the raw counts
            # and normalize each row of the transpose.
            # NOTE: Yvert is a transposed VIEW of Xvert1, so the in-place division
            # below also overwrites Xvert1. That is safe only because Xvert was
            # already computed above and Xvert1 is not read again -- keep this order.
            Yvert = Xvert1.T
            # despite the name, these are row sums of Yvert (= column sums of Xvert1)
            col_sum = Yvert.sum(axis=1, keepdims=True)
            col_sum[col_sum==0.0] = 1.0
            Yvert /= col_sum
            A[off_lp1:off_lp1+k_lp1, off_l:off_l+k_l] = Yvert

        # now we have a big adjacency => row normalize again, then scale spectral radius
        row_sum = A.sum(axis=1, keepdims=True)
        row_sum[row_sum==0.0] = 1.0
        A /= row_sum

        A = scale_spectral_radius(A, self.spectral_radius)
        return A

    def fit_readout(self, train_input, train_target, discard=100):
        """
        Main training routine:
          1) Build hierarchical adjacency from the partition of train_input => self.W
          2) define W_in => shape(n_nodes, 3)
          3) teacher forcing => polynomial readout => solve => self.W_out

        The first ``discard`` states/targets are dropped before fitting.
        Side effects: seeds NumPy's global RNG with self.seed, and leaves the
        reservoir state self.x at the value reached after the last training input.
        """
        np.random.seed(self.seed)
        # Build adjacency
        W_big = self._build_hierarchical_adjacency(train_input)
        self.W = W_big

        # define W_in => shape(n_nodes,3), uniform in [-input_scale, input_scale)
        self.n_nodes = W_big.shape[0]
        self.W_in = (np.random.rand(self.n_nodes,3)-0.5)*2.0*self.input_scale

        # define reservoir state
        self.x = np.zeros(self.n_nodes)

        # gather states => teacher forcing => polynomial => readout
        states_use, _ = self.collect_states(train_input, discard=discard)
        target_use = train_target[discard:]
        X_list= []
        for s in states_use:
            X_list.append( augment_state_with_squares(s) )
        X_aug= np.array(X_list)

        # No intercept: the constant feature appended by augment_state_with_squares acts as the bias.
        reg= Ridge(alpha=self.ridge_alpha, fit_intercept=False)
        reg.fit(X_aug, target_use)
        self.W_out= reg.coef_

    def collect_states(self, inputs, discard=100):
        """
        Teacher forcing => feed real 3D => gather states => shape => [T-discard, n_nodes].
        The reservoir state is reset to zero first.
        returns (states_after_discard, states_discarded).
        """
        self.reset_state()
        states= []
        for val in inputs:
            self._update(val)
            # copy: self.x is replaced/modified on later steps
            states.append(self.x.copy())
        states= np.array(states)
        return states[discard:], states[:discard]

    def reset_state(self):
        """Zero the reservoir state in place; does nothing while self.x is still None."""
        if self.x is not None:
            self.x.fill(0.0)

    def _update(self, u):
        """
        x(t+1)= (1-alpha)x(t)+ alpha tanh( W*x(t)+ W_in*u(t) ).
        Requires self.W, self.W_in and self.x to have been set by fit_readout().
        """
        alpha= self.leaking_rate
        pre_acts= self.W@self.x + self.W_in@u
        x_new= np.tanh(pre_acts)
        self.x= (1.0- alpha)*self.x+ alpha*x_new

    def predict_autoregressive(self, initial_input, n_steps):
        """
        fully autonomous => feed last predicted => next input

        Starts from ``initial_input`` and returns an array of ``n_steps``
        predictions. The reservoir state is NOT reset (the reset below is
        commented out), so the forecast continues from the current state.
        """
        preds= []
        #self.reset_state()
        current_in= np.array(initial_input)
        for _ in range(n_steps):
            self._update(current_in)
            big_x= augment_state_with_squares(self.x)
            out= self.W_out@big_x
            preds.append(out)
            current_in = out
        return np.array(preds)
    
    def predict_open_loop(self, test_input):
        """
        Open-loop prediction: drive the reservoir with the true inputs and read
        out one prediction per input. The reservoir state is not reset first.
        """
        preds = []
        for true_input in test_input:
            self._update(true_input)
            x_aug = augment_state_with_squares(self.x)
            out = self.W_out @ x_aug
            preds.append(out)
        return np.array(preds)

In [8]:
# Hyper-parameter grid: each key is a model constructor keyword and each value
# is the list of candidates to try (the search functions take the Cartesian
# product of these lists). With one candidate per key this is a single
# configuration; note that "cells_per_level" has ONE candidate which is itself
# a list of nine per-level cell counts.
grid = {
    "cells_per_level": [[5, 10, 15, 20, 25, 35, 45, 50, 95]],
    "spectral_radius": [0.92],
    "input_scale": [1],
    "leaking_rate": [0.9],
    "ridge_alpha": [1e-8]
}

In [9]:
def run_grid_search(model_class, param_grid, model_name,
                    output_path="grid_search_results.json"):
    """
    Grid search over ``param_grid`` on Lorenz data, scoring both closed-loop
    (autoregressive) and open-loop predictions by their largest Lyapunov exponent.

    For every parameter combination the model is trained and evaluated on
    3 initial conditions x 3 train fractions x 5 seeds. Each run records the
    exponent of the prediction, the exponent of the true test series and their
    absolute difference (all obtained from ``report_mle``).

    NOTE: a later cell in this notebook defines another function with the same
    name (closed-loop only); when both cells are executed, that later
    definition is the one bound to ``run_grid_search``.

    Parameters
    ----------
    model_class : class accepting the grid's keys plus ``seed`` as keyword
                  arguments and exposing ``fit_readout``,
                  ``predict_autoregressive`` and ``predict_open_loop``.
    param_grid  : dict mapping parameter name -> list of candidate values.
    model_name  : label used in the progress message only.
    output_path : path of the JSON file the results are written to.

    Returns
    -------
    list of dict, one entry per parameter combination (also written to JSON).
    """
    param_keys = list(param_grid.keys())
    combos = list(itertools.product(*param_grid.values()))
    print(f"\n== Initial grid search for {model_name} with {len(combos)} combinations ==")

    results = []

    for comb in tqdm(combos, desc="Grid Search"):
        params = dict(zip(param_keys, comb))
        ldev_scores = []
        l_max = []
        diff_list = []
        ldev_scores_open_loop = []
        diff_list_open_loop = []

        for initial_state in [[1.0, 1.0, 1.0], [1.0, 2.0, 3.0], [2.0, 1.5, 4.0]]:
            tmax = 250
            dt = 0.02
            _, lorenz_traj = generate_lorenz_data(
                initial_state=initial_state,
                tmax=tmax,
                dt=dt
            )

            # Drop the initial transient before the trajectory settles.
            washout = 2000
            lorenz_traj = lorenz_traj[washout:]

            # NOTE(review): the scaler is fit on the whole trajectory, i.e. on the
            # portion later used for testing as well as the training portion.
            scaler = MinMaxScaler()
            scaler.fit(lorenz_traj)
            lorenz_traj = scaler.transform(lorenz_traj)

            T_data = len(lorenz_traj)
            for train_frac in [0.7, 0.75, 0.8]:
                # One-step-ahead pairs: target[t] = input[t + 1].
                train_end = int(train_frac * (T_data - 1))
                train_input = lorenz_traj[:train_end]
                train_target = lorenz_traj[1:train_end + 1]
                test_input = lorenz_traj[train_end:-1]
                test_target = lorenz_traj[train_end + 1:]
                n_test_steps = len(test_input)
                initial_in = test_input[0]

                for seed in range(1, 6):
                    model = model_class(**params, seed=seed)
                    model.fit_readout(train_input, train_target, discard=100)
                    preds = model.predict_autoregressive(initial_in, n_test_steps)
                    preds_open_loop = model.predict_open_loop(test_input)

                    # report_mle returns (predicted, true, |difference|) in that order.
                    # It replaces a call to `compute_lyapunov_deviation`, which is not
                    # defined anywhere in this notebook.
                    l_max_pred, l_max_true, diff = report_mle(
                        pred=preds, true=test_target)
                    l_max_pred_open_loop, _, diff_open_loop = report_mle(
                        pred=preds_open_loop, true=test_target)

                    ldev_scores.append(float(l_max_pred))
                    l_max.append(float(l_max_true))
                    diff_list.append(float(diff))
                    ldev_scores_open_loop.append(float(l_max_pred_open_loop))
                    diff_list_open_loop.append(float(diff_open_loop))

        results.append({
            "params": params,
            "seed_scores_L_max_calculated": ldev_scores,
            "lambda_max_system": l_max,
            "diff_from_L_max_true": diff_list,
            "mean_diff": float(np.mean(diff_list)),
            "std_diff": float(np.std(diff_list)),
            "mean_L_calc": float(np.mean(ldev_scores)),
            "std_L_calc": float(np.std(ldev_scores)),
            "seed_scores_L_max_calculated_open_loop": ldev_scores_open_loop,
            "mean_L_calc_open_loop": float(np.mean(ldev_scores_open_loop)),
            "std_L_calc_open_loop": float(np.std(ldev_scores_open_loop)),
            "seed_scores_diff_from_lambda_max_open_loop": diff_list_open_loop,
            "mean_diff_open_loop": float(np.mean(diff_list_open_loop)),
            "std_diff_open_loop": float(np.std(diff_list_open_loop)),
        })

    with open(output_path, "w") as f:
        json.dump(results, f, indent=2)
    print(f"\nAll results saved to `{output_path}`")

    return results

In [10]:
def run_grid_search(model_class, param_grid, model_name,
                    output_path="grid_search_results.json"):
    """
    Grid search over ``param_grid`` on Lorenz data, scoring closed-loop
    (autoregressive) predictions by their largest Lyapunov exponent.

    For every parameter combination the model is trained and evaluated on
    3 initial conditions x 3 train fractions x 5 seeds. Each run records the
    exponent of the prediction, the exponent of the true test series and their
    absolute difference (all obtained from ``report_mle``).

    NOTE: this redefines ``run_grid_search``; an earlier cell in this notebook
    defines a function with the same name that additionally evaluates
    open-loop predictions. Once this cell has run, this definition is the one in effect.

    Parameters
    ----------
    model_class : class accepting the grid's keys plus ``seed`` as keyword
                  arguments and exposing ``fit_readout`` and
                  ``predict_autoregressive``.
    param_grid  : dict mapping parameter name -> list of candidate values.
    model_name  : label used in the progress message only.
    output_path : path of the JSON file the results are written to.

    Returns
    -------
    list of dict, one entry per parameter combination (also written to JSON).
    """
    param_keys = list(param_grid.keys())
    combos = list(itertools.product(*param_grid.values()))
    print(f"\n== Initial grid search for {model_name} with {len(combos)} combinations ==")

    results = []

    for comb in tqdm(combos, desc="Grid Search"):
        params = dict(zip(param_keys, comb))
        ldev_scores = []
        l_max = []
        diff_list = []

        for initial_state in [[1.0, 1.0, 1.0], [1.0, 2.0, 3.0], [2.0, 1.5, 4.0]]:
            tmax = 250
            dt = 0.02
            _, lorenz_traj = generate_lorenz_data(
                initial_state=initial_state,
                tmax=tmax,
                dt=dt
            )

            # Drop the initial transient before the trajectory settles.
            washout = 2000
            lorenz_traj = lorenz_traj[washout:]

            # NOTE(review): the scaler is fit on the whole trajectory, i.e. on the
            # portion later used for testing as well as the training portion.
            scaler = MinMaxScaler()
            scaler.fit(lorenz_traj)
            lorenz_traj = scaler.transform(lorenz_traj)

            T_data = len(lorenz_traj)
            for train_frac in [0.7, 0.75, 0.8]:
                # One-step-ahead pairs: target[t] = input[t + 1].
                train_end = int(train_frac * (T_data - 1))
                train_input = lorenz_traj[:train_end]
                train_target = lorenz_traj[1:train_end + 1]
                test_input = lorenz_traj[train_end:-1]
                test_target = lorenz_traj[train_end + 1:]
                n_test_steps = len(test_input)
                initial_in = test_input[0]

                for seed in range(1, 6):
                    model = model_class(**params, seed=seed)
                    model.fit_readout(train_input, train_target, discard=100)
                    preds = model.predict_autoregressive(initial_in, n_test_steps)

                    # report_mle returns (predicted, true, |difference|) in that order.
                    # It replaces a call to `compute_lyapunov_deviation`, which is not
                    # defined anywhere in this notebook.
                    l_max_pred, l_max_true, diff = report_mle(
                        pred=preds, true=test_target)

                    ldev_scores.append(float(l_max_pred))
                    l_max.append(float(l_max_true))
                    diff_list.append(float(diff))

        results.append({
            "params": params,
            "seed_scores_L_max_calculated": ldev_scores,
            "lambda_max_system": l_max,
            "diff_from_L_max_true": diff_list,
            "mean_diff": float(np.mean(diff_list)),
            "std_diff": float(np.std(diff_list)),
            "mean_L_calc": float(np.mean(ldev_scores)),
            "std_L_calc": float(np.std(ldev_scores)),
        })

    with open(output_path, "w") as f:
        json.dump(results, f, indent=2)
    print(f"\nAll results saved to `{output_path}`")

    return results

In [43]:
def run_single_case(model_class, param_grid, model_name,
                    output_path="grid_search_results.json"):
    """
    Runs one train/test split on the Lorenz dataset, trains the given model_class
    with the first combination of param_grid, and computes the max-Lyapunov exponents
    for the true vs. predicted time series. ``report_mle`` estimates the exponent
    for every coordinate and keeps the largest one for each series.
    The result dictionary is written to `output_path` and also returned.

    Parameters
    ----------
    model_class : class accepting the grid's keys plus ``seed`` as keyword
                  arguments and exposing ``fit_readout`` and
                  ``predict_autoregressive``.
    param_grid  : dict mapping parameter name -> list of candidate values;
                  only the first combination of the Cartesian product is used.
    model_name  : label used in the progress message only.
    output_path : path of the JSON file the (single-element) result list is
                  written to.

    Returns
    -------
    dict with keys "params", "lambda_max_system" (true series),
    "lambda_max_prediction" (predicted series) and "lyapunov_deviation"
    (absolute difference of the two).
    """
    # 1) Take the first parameter combination
    comb = list(itertools.product(*param_grid.values()))[0]
    params = dict(zip(param_grid.keys(), comb))
    print(f"\n== Running single test case for {model_name} with parameters: {params} ==")

    # 2) Fixed initial condition, train/test split, and seed
    initial_state = [1.0, 1.0, 1.0]
    train_frac = 0.2
    seed = 1

    # 3) Generate Lorenz data
    tmax = 250
    dt = 0.02
    _, lorenz_traj = generate_lorenz_data(
        initial_state=initial_state,
        tmax=tmax,
        dt=dt
    )

    # 4) Drop washout, then scale to [0, 1]
    washout = 2000
    lorenz_traj = lorenz_traj[washout:]

    scaler = MinMaxScaler()
    scaler.fit(lorenz_traj)
    lorenz_traj = scaler.transform(lorenz_traj)

    # 5) Split into train vs. test (one-step-ahead pairs: target[t] = input[t + 1])
    T_data = len(lorenz_traj)
    train_end = int(train_frac * (T_data - 1))
    train_input = lorenz_traj[:train_end]
    train_target = lorenz_traj[1 : train_end + 1]
    test_input = lorenz_traj[train_end : -1]
    test_target = lorenz_traj[train_end + 1 : ]
    n_test_steps = len(test_input)
    initial_in = test_input[0]

    # 6) Instantiate, train, and predict with the model
    model = model_class(**params, seed=seed)
    model.fit_readout(train_input, train_target, discard=100)
    preds = model.predict_autoregressive(initial_in, n_test_steps)

    # 7) Compute Lyapunov exponents. report_mle returns
    #    (predicted, true, |difference|) -- in that order -- so unpack
    #    accordingly; otherwise the two exponents end up swapped in the output.
    #    The estimator runs with its default sample interval (0.02), which
    #    matches the dt used to generate the data above.
    l_max_pred, l_max_true, ldev = report_mle(pred=preds, true=test_target)

    # 8) Save results (plain floats keep the JSON output independent of NumPy scalar types)
    result = {
        "params": params,
        "lambda_max_system":    float(l_max_true),
        "lambda_max_prediction": float(l_max_pred),
        "lyapunov_deviation":   float(ldev)
    }

    with open(output_path, "w") as f:
        json.dump([result], f, indent=2)

    print(f"\nResult saved to `{output_path}`")
    return result

In [None]:
# Single train/test run of the HFR reservoir using the first (here: only)
# combination in `grid`; the result is written to hfr_single_case.json.
run_single_case(HFRRes3D, grid, "HFR", output_path="hfr_single_case.json")


== Running single test case for HFR with parameters: {'cells_per_level': [5, 10, 15, 20, 25, 35, 45, 50, 95], 'spectral_radius': 0.92, 'input_scale': 1, 'leaking_rate': 0.9, 'ridge_alpha': 1e-08} ==
