In [1]:
import numpy as np
import pyomo.environ as pyo
from pyomo.environ import ConcreteModel, Var, ConstraintList, Objective, SolverFactory, value, RangeSet

import torch

import jax
import jax.numpy as jnp
from jax import random, jit
from jax.random import PRNGKey

# -------------- helper libraries -------------- #
import sys
import os
import time
import pickle
import matplotlib.pyplot as plt
import pandas as pd
import importlib

# Absolute location of the shared helper modules ('../00_utils', resolved
# against the current working directory).
collocation2_path = os.path.abspath(os.path.join('..', '00_utils'))

# Register the directory on the import path only if it is not there yet,
# so re-running this cell never appends a duplicate entry.
if sys.path.count(collocation2_path) == 0:
    sys.path.append(collocation2_path)

from collocation import compute_weights, lagrange_derivative
from interpolation import BarycentricInterpolation
from data_generation import generate_ode_data
from non_parametric_collocation import collocate_data

In [2]:
#---------------------------------------------DATA PARAMS---------------------------------------------#
N = 200                      # first argument of generate_ode_data (presumably the number of time points)
noise_level = 0.2            # noise setting forwarded to generate_ode_data
ode_type = "harmonic_oscillator"
params = {"omega_squared": 2}
start_time = 0
end_time = 10
spacing_type = "equally_spaced" # "equally_spaced" or "chebyshev"
initial_state = np.array([0.0, 1.0])

#--------------------------------------------GENERATE DATA--------------------------------------------#
t_ho, y_ho, y_noisy_ho, true_derivatives_ho = generate_ode_data(
    N, noise_level, ode_type, params,
    start_time, end_time, spacing_type, initial_state,
)

# pyomo works on plain numpy arrays: squeeze away singleton axes and copy out of JAX
y_noisy_ho = np.array(jnp.squeeze(y_noisy_ho))
t_ho = np.array(jnp.squeeze(t_ho))

#---------------------------------------------------TEST DATA--------------------------------------------#
# Twice as many points over twice the time span as the training set.
# NOTE(review): "uniform" is not one of the spacing options listed next to
# `spacing_type` above — confirm that generate_ode_data accepts it.
t_test_ho, y_test_ho, _, _ = generate_ode_data(
    N*2, noise_level, ode_type, params,
    start_time, end_time*2, "uniform", initial_state,
)

#--------------------------------------------NON-PARAMETRIC COLLOCATION--------------------------------------------#
# Kernel-based estimate of the derivative and of the solution from the noisy samples.
estimated_derivative_ho, estimated_solution_ho = collocate_data(
    y_noisy_ho, t_ho, 'EpanechnikovKernel', bandwidth=0.5
)



In [3]:
import numpy as np
import torch
import torch.nn as nn
from torchdiffeq import odeint
from torch.optim import Adam
from torch.utils.data import DataLoader, TensorDataset

# Define a PyTorch module for the Neural ODE
class NeuralODE_Py(nn.Module):
    """Fully connected network used as the right-hand side of a neural ODE.

    Each pair of consecutive entries in ``layer_widths`` becomes one
    ``nn.Linear`` layer. A ``nn.Tanh`` follows every linear layer except the
    final one. The stack is stored as ``self.network``.
    """

    def __init__(self, layer_widths):
        super().__init__()
        # Index of the output layer, which gets no activation after it.
        last_linear = len(layer_widths) - 2
        modules = []
        for idx, (n_in, n_out) in enumerate(zip(layer_widths[:-1], layer_widths[1:])):
            modules.append(nn.Linear(n_in, n_out))
            if idx != last_linear:
                modules.append(nn.Tanh())
        self.network = nn.Sequential(*modules)

    def forward(self, t, y):
        """Return the network output for state ``y``; ``t`` is accepted but not used."""
        return self.network(y)

def create_train_state_py(layer_widths, learning_rate):
    """Build a fresh ``NeuralODE_Py`` and an Adam optimizer bound to its parameters.

    Args:
        layer_widths: Layer sizes forwarded to ``NeuralODE_Py``.
        learning_rate: Value passed to ``Adam`` as ``lr``.

    Returns:
        tuple: ``(model, optimizer)``.
    """
    net = NeuralODE_Py(layer_widths=layer_widths)
    return net, Adam(net.parameters(), lr=learning_rate)

def ode_system(model, y0, t):
    """Integrate ``model`` from ``y0`` over the time points ``t`` with ``odeint``.

    Args:
        model: Right-hand side passed to ``odeint`` as its first argument.
        y0: Initial state; array-like or ``torch.Tensor``.
        t: Time points; array-like or ``torch.Tensor``.

    Returns:
        The value returned by ``odeint(model, y0, t)`` for float32 inputs.
    """
    # torch.tensor(<Tensor>) emits a copy-construct UserWarning. For tensor
    # inputs, detach().clone() produces the same detached copy without the
    # warning; everything else is still converted with torch.tensor.
    if isinstance(t, torch.Tensor):
        t = t.detach().clone().to(torch.float32)
    else:
        t = torch.tensor(t, dtype=torch.float32)

    if isinstance(y0, torch.Tensor):
        y0 = y0.detach().clone().to(torch.float32)
    else:
        y0 = torch.tensor(y0, dtype=torch.float32)

    return odeint(model, y0, t)

def train_py(model, optimizer, t, observed_data, y0, num_epochs=1000):
    """Fit ``model`` so that its ``odeint`` solution matches ``observed_data``.

    Every epoch integrates the model from ``y0`` over ``t``, computes the MSE
    against ``observed_data``, back-propagates and takes one optimizer step.
    The loss is printed every 100 epochs.

    Args:
        model: Module passed to ``odeint`` as the right-hand side.
        optimizer: Optimizer used for ``zero_grad()`` / ``step()``; it is
            expected to already hold the parameters of ``model``.
        t: Time points; array-like or ``torch.Tensor``.
        observed_data: Target values compared with the ``odeint`` output;
            array-like or ``torch.Tensor``.
        y0: Initial state; array-like or ``torch.Tensor``.
        num_epochs: Number of optimization steps.

    Returns:
        The same ``model`` object after training.
    """
    def _to_float32(x):
        # Tensors are copied with detach().clone() because torch.tensor(<Tensor>)
        # raises a copy-construct UserWarning. Everything else goes through
        # torch.tensor, so array-like targets work just like t and y0.
        if isinstance(x, torch.Tensor):
            return x.detach().clone().to(torch.float32)
        return torch.tensor(x, dtype=torch.float32)

    criterion = torch.nn.MSELoss()  # Using MSE for simplicity
    t = _to_float32(t)
    # Cast the target to float32 as well, so it has the same dtype as the
    # float32 t and y0 handed to odeint.
    observed_data = _to_float32(observed_data)
    y0 = _to_float32(y0)

    for epoch in range(num_epochs):
        optimizer.zero_grad()
        pred_solution = odeint(model, y0, t)
        loss = criterion(pred_solution, observed_data)
        loss.backward()
        optimizer.step()

        if epoch % 100 == 0:
            print(f'Epoch {epoch}, Loss: {loss.item()}')
    return model

def neural_ode_py(model, y0, t):
    """
    Solve the ODE defined by a trained model, without tracking gradients.

    Args:
        model (torch.nn.Module): The trained Neural ODE model.
        y0 (torch.Tensor or array-like): Initial condition of the ODE.
        t (torch.Tensor or array-like): Time points at which to solve the ODE.

    Returns:
        The result of ``odeint(model, y0, t)`` evaluated under ``torch.no_grad()``.
    """
    def _ensure_tensor(value):
        # Existing tensors pass through untouched (dtype included); anything
        # else is converted to a float32 tensor.
        if isinstance(value, torch.Tensor):
            return value
        return torch.tensor(value, dtype=torch.float32)

    initial_state_t = _ensure_tensor(y0)
    time_points = _ensure_tensor(t)

    # Inference only: no autograd history is needed.
    with torch.no_grad():
        return odeint(model, initial_state_t, time_points)


In [4]:
# ----- data settings for this experiment -----
n_points = 200
ode_type = "harmonic_oscillator"
params = {"omega_squared": 2}
start_time = 0
end_time = 10
spacing_type = "equally_spaced"
initial_state = np.array([0.0, 1.0])

# ----- model / optimizer settings -----
y0 = np.array([0.0, 1.0])
layer_widths = [2, 50, 50, 2]
learning_rate = 1e-3

# Noise levels to sweep; other values tried: 0.0, 0.1, 0.5
noise_levels = [0.2]

# Seed torch's global RNG so the network initialisation (and therefore the
# training run) is repeatable after Restart & Run All.
torch_seed = 0
torch.manual_seed(torch_seed)

# Results keyed by the string index of the noise level in `noise_levels`.
y_pred_results = {}

for i, noise_level in enumerate(noise_levels):
    # Generate data for the according noise level
    t, y, y_noisy, true_derivatives = generate_ode_data(
        n_points, noise_level, ode_type, params, start_time, end_time, spacing_type, initial_state
    )

    # Convert to numpy first, then to a float32 torch tensor for training.
    t = np.array(t)
    y_noisy = np.array(y_noisy)
    y_noisy = torch.tensor(y_noisy, dtype=torch.float32)

    model, optimizer = create_train_state_py(layer_widths, learning_rate)

    # perf_counter is monotonic, so the elapsed time cannot be distorted by
    # system clock adjustments the way time.time() differences can.
    start_timer = time.perf_counter()
    trained_model = train_py(model, optimizer, t, y_noisy, y0, num_epochs = 1000)
    end_timer = time.perf_counter()
    timer = end_timer - start_timer
    print(f"Time elapsed: {timer}")

    # Generate predictions
    y_pred = neural_ode_py(trained_model, y0, t)
    y_pred_results[str(i)] = {"predictions": y_pred, "time_elapsed": timer, "noise_level": noise_level, "y_noisy": y_noisy.detach().numpy()}

: 