In [None]:
!pip install torchdiffeq systems

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import matplotlib.pyplot as plt
import sys
import os
from torchdiffeq import odeint
from scipy.integrate import solve_ivp
from sklearn.preprocessing import StandardScaler

project_root = os.path.abspath(os.path.join(os.getcwd(), '..', 'src'))
if project_root not in sys.path:
    sys.path.insert(0, project_root)

from systems.lotka_volterra import simulate as simulate_lv
from systems.lorenz import simulate as simulate_lorenz
from utils.compute_rsme import compute_rmse_over_time

In [3]:
# Neural ODE Model for Lotka-Volterra System
class LVNeuralODE(nn.Module):
    def __init__(self, hidden_dim=64):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(2, hidden_dim),
            nn.Tanh(),
            nn.Linear(hidden_dim, 2)
        )

    def forward(self, t, z):
        return self.net(z)

# Training loop
def train(model, t_eval, z_data, n_epochs=200, lr=0.01):
    scaler = StandardScaler()
    z_data_np = z_data.numpy()
    z_data_scaled = scaler.fit_transform(z_data_np)
    z_data_scaled = torch.tensor(z_data_scaled, dtype=torch.float32)

    optimizer = optim.Adam(model.parameters(), lr=lr)
    loss_fn = nn.MSELoss()

    for epoch in range(n_epochs):
        optimizer.zero_grad()
        z0 = z_data_scaled[0]
        z_pred = odeint(model, z0, t_eval, method='dopri5')
        loss = loss_fn(z_pred, z_data_scaled)
        loss.backward()
        optimizer.step()
        if epoch % (n_epochs // 10) == 0:
            print(f"Epoch {epoch}, Loss = {loss.item():.6f}")
    return model, scaler

In [6]:
# Neural ODE model for 3D Lorenz system
class LZNeuralODE(nn.Module):
    def __init__(self, hidden_dim=128):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(3, hidden_dim),
            nn.Tanh(),
            nn.Linear(hidden_dim, 3)
        )

    def forward(self, t, z):
        return self.net(z)

# Training loop with normalization
def train(model, t_eval, z_data, n_epochs=2000, lr=1e-3):
    scaler = StandardScaler()
    z_data_np = z_data.numpy()
    z_data_scaled = scaler.fit_transform(z_data_np)
    z_data_scaled = torch.tensor(z_data_scaled, dtype=torch.float32)

    optimizer = optim.Adam(model.parameters(), lr=lr)
    loss_fn = nn.MSELoss()

    for epoch in range(n_epochs):
        optimizer.zero_grad()
        z0 = z_data_scaled[0]
        z_pred = odeint(model, z0, t_eval, method='dopri5')
        loss = loss_fn(z_pred, z_data_scaled)
        loss.backward()
        optimizer.step()
        if epoch % (n_epochs // 10) == 0:
            print(f"Epoch {epoch}, Loss = {loss.item():.6f}")
    return model, scaler

In [4]:
def load_single_trajectory(npz_path, trajectory_index=0):
    """
    Loads a single trajectory and reshapes for PyDMD.
    Returns:
        - trajectory (n_features, n_time_steps)
        - initial condition (z0)
    """
    data = np.load(npz_path, allow_pickle=True)["trajectories"]
    traj_dict = data[trajectory_index]
    traj = traj_dict["traj"]
    z0 = traj_dict["z0"]
    t = traj_dict["t"]
    return traj, t, z0

In [5]:
def sample_arrows_by_distance(traj, min_dist):
    """Return indices where the distance from the last arrow exceeds min_dist."""
    arrow_indices = [0]
    last = traj[0]
    for i in range(1, len(traj)):
        dist = np.linalg.norm(traj[i] - last)
        if dist >= min_dist:
            arrow_indices.append(i)
            last = traj[i]
    return arrow_indices

## Use Neural ODE on Lotka-Volterra

In [None]:
# Load entire dataset
all_data = np.load("data/lotka_volterra.npz", allow_pickle=True)["trajectories"]
M = len(all_data)
NUM_EPOCHS = 20
LEARNING_RATE = 0.01

true_trajectories = []
pred_trajectories = []

for i in range(M):
    print(f"\n=== Training on trajectory {i+1}/{M} ===")
    traj_data = all_data[i]
    
    z_data_np = traj_data["traj"]
    z0_np = traj_data["z0"]
    t_eval_np = traj_data["t"]

    z_data = torch.tensor(z_data_np, dtype=torch.float32)
    t_eval = torch.tensor(t_eval_np, dtype=torch.float32)

    # Define and train model
    model_neural = LVNeuralODE()
    trained_neural, scaler = train(model_neural, t_eval, z_data, n_epochs=NUM_EPOCHS, lr=LEARNING_RATE)

    # Predict
    with torch.no_grad():
        z0_scaled = torch.tensor(scaler.transform(z_data[[0]].numpy()), dtype=torch.float32)[0]
        pred_neural_scaled = odeint(trained_neural, z0_scaled, t_eval, method='dopri5')
        pred_neural = torch.tensor(scaler.inverse_transform(pred_neural_scaled.numpy()), dtype=torch.float32)

    true_trajectories.append(z_data.numpy())
    pred_trajectories.append(pred_neural.numpy())

    # Plot comparison for this trajectory
    fig, axs = plt.subplots(2, 1, figsize=(10, 5), constrained_layout=True)

    axs[0].plot(t_eval.numpy(), z_data[:, 0], 'k--', label='True x')
    axs[0].plot(t_eval.numpy(), pred_neural[:, 0], 'b', label='Neural x')
    axs[0].set_ylabel("x(t)")
    axs[0].set_title(f"Trajectory {i+1} - Prey Dynamics")
    axs[0].legend()

    axs[1].plot(t_eval.numpy(), z_data[:, 1], 'k--', label='True y')
    axs[1].plot(t_eval.numpy(), pred_neural[:, 1], 'b', label='Neural y')
    axs[1].set_xlabel("Time")
    axs[1].set_ylabel("y(t)")
    axs[1].set_title(f"Trajectory {i+1} - Predator Dynamics")
    axs[1].legend()

    plt.tight_layout()
    plt.show()

    # --- Compute and plot per-trajectory RMSE ---
    rmse_i = np.sqrt(np.sum((pred_neural.numpy() - z_data.numpy())**2, axis=1))  # shape: (T,)

    plt.figure(figsize=(8, 4))
    plt.plot(t_eval.numpy(), rmse_i, color='red', label='RMSE')
    plt.title(f"Trajectory {i+1} - RMSE Over Time")
    plt.xlabel("Time")
    plt.ylabel("RMSE")
    plt.grid(True)
    plt.tight_layout()
    plt.legend()
    plt.show()

rmse_t = compute_rmse_over_time(true_trajectories, pred_trajectories)

# ✅ Plot
plt.figure(figsize=(8, 4))
plt.plot(t_eval_np, rmse_t, label="RMSE over Time (M=10)")
plt.xlabel("Time")
plt.ylabel("RMSE[t]")
plt.title("Neural ODE RMSE Across 10 Trajectories")
plt.grid(True)
plt.legend()
plt.tight_layout()
plt.show()

## Use Neural ODE on Lorenz System

In [None]:
data = np.load("data/lorenz.npz", allow_pickle=True)["trajectories"]
M = len(data)
NUM_EPOCHS = 20
LEARNING_RATE = 1e-3

true_trajectories = []
pred_trajectories = []

for i in range(M):
    print(f"\n=== Lorenz Trajectory {i+1}/{M} ===")
    traj_data = data[i]
    z_data_np = traj_data["traj"]
    t_eval_np = traj_data["t"]

    z_data = torch.tensor(z_data_np, dtype=torch.float32)
    t_eval = torch.tensor(t_eval_np, dtype=torch.float32)

    # Train model
    model_neural = LZNeuralODE()
    trained_neural, scaler = train(model_neural, t_eval, z_data, n_epochs=NUM_EPOCHS, lr=LEARNING_RATE)

    # Predict
    with torch.no_grad():
        z0_scaled = torch.tensor(scaler.transform(z_data[[0]].numpy()), dtype=torch.float32)[0]
        pred_scaled = odeint(trained_neural, z0_scaled, t_eval, method='dopri5')
        pred = torch.tensor(scaler.inverse_transform(pred_scaled.numpy()), dtype=torch.float32)

    # Store for RMSE aggregate
    true_trajectories.append(z_data.numpy())
    pred_trajectories.append(pred.numpy())

    # Plot x/y/z dynamics
    fig, axs = plt.subplots(3, 1, figsize=(10, 8), constrained_layout=True)
    axs[0].plot(t_eval_np, z_data_np[:, 0], 'k--', label='True x')
    axs[0].plot(t_eval_np, pred[:, 0], 'b', label='Neural x')
    axs[0].set_ylabel("x(t)")
    axs[0].legend()
    axs[0].set_title(f"Trajectory {i+1} - x(t)")

    axs[1].plot(t_eval_np, z_data_np[:, 1], 'k--', label='True y')
    axs[1].plot(t_eval_np, pred[:, 1], 'b', label='Neural y')
    axs[1].set_ylabel("y(t)")
    axs[1].legend()
    axs[1].set_title(f"Trajectory {i+1} - y(t)")

    axs[2].plot(t_eval_np, z_data_np[:, 2], 'k--', label='True z')
    axs[2].plot(t_eval_np, pred[:, 2], 'b', label='Neural z')
    axs[2].set_ylabel("z(t)")
    axs[2].legend()
    axs[2].set_title(f"Trajectory {i+1} - z(t)")
    axs[2].set_xlabel("Time")

    plt.show()

    # Plot RMSE for this trajectory
    rmse_i = np.sqrt(np.sum((pred.numpy() - z_data_np) ** 2, axis=1))
    plt.figure(figsize=(8, 4))
    plt.plot(t_eval_np, rmse_i, color='purple')
    plt.title(f"RMSE Over Time – Trajectory {i+1}")
    plt.xlabel("Time")
    plt.ylabel("RMSE")
    plt.grid(True)
    plt.tight_layout()
    plt.show()

    # --- 3D Quiver Plot with Directional Arrows ---
    fig = plt.figure(figsize=(10, 7))
    ax = fig.add_subplot(111, projection='3d')

    min_arrow_spacing = 2.0
    pred_np = pred.numpy()

    arrow_indices_true = sample_arrows_by_distance(z_data_np, min_arrow_spacing)
    arrow_indices_pred = sample_arrows_by_distance(pred_np, min_arrow_spacing)

    for j in arrow_indices_true[:-1]:
        p0 = z_data_np[j]
        p1 = z_data_np[j + 1]
        direction = p1 - p0
        ax.quiver(*p0, *direction, color='r', length=1.0, normalize=True, linewidth=2.5)

    for j in arrow_indices_pred[:-1]:
        p0 = pred_np[j]
        p1 = pred_np[j + 1]
        direction = p1 - p0
        ax.quiver(*p0, *direction, color='orange', length=1.0, normalize=True, linewidth=1.5)

    ax.plot(z_data_np[:, 0], z_data_np[:, 1], z_data_np[:, 2], 'k--', label='True trajectory')
    ax.plot(pred_np[:, 0], pred_np[:, 1], pred_np[:, 2], 'b', label='Neural ODE prediction')

    ax.set_xlabel("x(t)")
    ax.set_ylabel("y(t)")
    ax.set_zlabel("z(t)")
    ax.set_title(f"Trajectory {i+1}: Lorenz 3D Trajectories with Arrows")
    ax.legend()

    plt.tight_layout()
    plt.show()

# === Overall RMSE Across All M ===
rmse_over_time = compute_rmse_over_time(true_trajectories, pred_trajectories)

plt.figure(figsize=(8, 4))
plt.plot(t_eval_np, rmse_over_time, label="Mean RMSE (M=10)", color='crimson')
plt.xlabel("Time")
plt.ylabel("RMSE")
plt.title("RMSE Over Time Across All Lorenz Trajectories")
plt.grid(True)
plt.legend()
plt.tight_layout()
plt.show()