## Reconstruction errors

This notebook generates the data in the error plots for the reconstructed coefficient matrices $C$, using both the integral and differential forms, and solving the least squares problem without (LS) and with (STLS) regularisation.

The corresponding figures in the paper are 

* Figure 4 (M1 without noise): set `model = 'M1'` and `noise_bool = 'off'`
* Figure 7 (M1 with noise): set `model = 'M1'` and `noise_bool = 'on'`
* Figure 9 (M20 without noise): set `model = 'M20'` and `noise_bool = 'off'`
* Figure 14 (M20 with noise): set `model = 'M20'` and `noise_bool = 'on'`
* Figure 15 (Van de Vusse without noise): set `model = 'VdV'` and `noise_bool = 'off'`
* Figure 19 (Van de Vusse with noise): set `model = 'VdV'` and `noise_bool = 'on'`

Each run of the notebook will save data in a dedicated subfolder. After this, the plots can be generated with `Reconstruction_errors_plotter.ipynb`.

# Import modules

In [1]:
import mechanisms as mech
import graphsindy as gs

import os
from multiprocessing import Pool, cpu_count
from tqdm import tqdm
import time
import numpy as np
import scipy.integrate as integrate
import matplotlib.pyplot as plt
from numpy.linalg import svd, cond
import math

# General parameters

In [2]:
# Number of trials
NUM_TRIALS = 100

# Simulation parameters
rtol = 1e-10
tmax = 20
lastpoint = 1050
step = 50
NPTS = np.arange(50, lastpoint, step)

# Spline interpolation parameters
spline_deg = 3
spline_type = 'not-a-knot'

# Choose model
#model = 'M1'
model = 'M20'
#model = 'VdV'

# CRN parameters
min_k, max_k = 5e-2, 1.0

if model == 'M1':
    w = 6
if model == 'M20':
    w = 8
if model == 'VdV':
    w = 4
    
# Noise parameters
# on
noise_bool       = "on"
noise            = '1e-4'
instrument_noise = float(noise)

# off
#noise_bool = "off"
#instrument_noise = 0.0

In [3]:
# Create/open directory for saving trials data
if noise_bool == "off":
    # Parent directory — shared by all runs of same model
    parent_folder = f"{model}_errbnd_noise-{noise_bool}"
    os.makedirs(parent_folder, exist_ok=True)

    # Individual run directory
    run_folder = f"tmax-{tmax}_nptsmax-{lastpoint}_step-{step}"
    SAVE_DIR = os.path.join(parent_folder, run_folder)
    os.makedirs(SAVE_DIR, exist_ok=True)

if noise_bool == "on":
    # Parent directory — shared by all runs of same model
    parent_folder = f"{model}_errbnd_noise-{noise_bool}"
    os.makedirs(parent_folder, exist_ok=True)
    
    # Individual run directory
    run_folder = f"tmax-{tmax}_nptsmax-{lastpoint}_step-{step}_noise-{noise}"
    SAVE_DIR = os.path.join(parent_folder, run_folder)
    os.makedirs(SAVE_DIR, exist_ok=True)

In [4]:
# Trial function
def run_trial(dummy_arg):
    # Generate a random seed for this trial
    rng = np.random.default_rng()
    seed = rng.integers(1, 10000)
    rng = np.random.default_rng(seed)
    
    # Create filename based on seed
    trial_file = os.path.join(SAVE_DIR, f"trial_{seed}.npz")
    
    # Skip trial if seed already done
    if os.path.exists(trial_file):
        print(f"[Seed {seed}] Skipping — already exists.")
        return None

    # Generate random kinetic constants for this trial
    # and construct model instance with these constants
    if model == "M1":
        constants = rng.uniform(min_k, max_k, size = 4)
        Model = mech.M1(constants)

    if model == "M20":
        constants = rng.uniform(min_k, max_k, size = 6)
        Model = mech.M20(constants)

    if model == "VdV":
        constants = rng.uniform(min_k, max_k, size = 3)
        Model = mech.VdV(constants)

    # Construct the exact coefficient matrix of the network
    # and recover the number of species and quadratic combinations 
    C_ex = Model.ExactCoeffMatrix()
    M = Model.M 
    N = Model.N

    # Generate random (w,M) matrix of initial values for this trial
    initial_values = rng.random((w, M))

    # Arrays for saving results
    ERR_C_int_ls, ERR_C_int_stls = [], []
    ERR_C_dif_ls, ERR_C_dif_stls = [], []

    # Iterate varying number of time-steps
    for npts in NPTS:
        t_eval = np.linspace(0, tmax, npts)
        # Simulation
        X = np.zeros((M, 0))
        for i in range(w):
            sol = integrate.solve_ivp(fun=lambda t, U: gs.matrix_ODE(t, U, C_ex),
                                      t_span=t_eval[[0, -1]],
                                      y0=initial_values[i],
                                      t_eval=t_eval,
                                      rtol=rtol,
                                      atol=rtol)

            # Add instrument's Gaussian noise to the simulated data
            for j in range(M):
                xi       = rng.normal(0, instrument_noise, size = sol.y[j].shape[0])
                sol.y[j] += xi

            X = np.hstack((X, sol.y))

        K, L  = gs.build_int_dif_matrix(t_eval, w, spline_deg, spline_type)

        D, _  = gs.build_dictionary(Model.labels, X)
        IVP   = gs.build_ivp_matrix(npts, X)
        D_int = D @ K
        X_0   = X - IVP
        X_dif = X @ L
        
        threshold  = np.min(constants) / 2
        
        C_int_stls = gs.itls(D_int, X_0, threshold=threshold)
        C_dif_stls = gs.itls(D, X_dif, threshold=threshold)
        
        C_int_ls = X_0 @ np.linalg.pinv(D_int)
        C_dif_ls = X_dif @ np.linalg.pinv(D)
        
        err_C_int_stls = np.linalg.norm(C_int_stls - C_ex, 2)
        err_C_dif_stls = np.linalg.norm(C_dif_stls - C_ex, 2)
        
        err_C_int_ls = np.linalg.norm(C_int_ls - C_ex, 2)
        err_C_dif_ls = np.linalg.norm(C_dif_ls - C_ex, 2)

        ERR_C_int_stls.append(err_C_int_stls)
        ERR_C_int_ls.append(err_C_int_ls)
        
        ERR_C_dif_stls.append(err_C_dif_stls)
        ERR_C_dif_ls.append(err_C_dif_ls)
        
    # Save the trial results using the seed in the filename
    np.savez(trial_file,
             ERR_C_int_stls = np.array(ERR_C_int_stls),
             ERR_C_int_ls   = np.array(ERR_C_int_ls),
             ERR_C_dif_stls = np.array(ERR_C_dif_stls),
             ERR_C_dif_ls   = np.array(ERR_C_dif_ls),
             seed=seed)
    
    return seed

In [5]:
# Main loop with multiprocessing 
if __name__ == "__main__":
    start = time.time()
    NUM_WORKERS = 30  # Pc has 40 CPUs
        
    with Pool(processes=NUM_WORKERS) as pool:
        # Wrap imap_unordered with tqdm to show progress
        seeds = []
        for result in tqdm(pool.imap_unordered(run_trial, range(NUM_TRIALS)), total=NUM_TRIALS):
            if result is not None:
                seeds.append(result)

    np.save(os.path.join(SAVE_DIR, "trial_seeds.npy"), np.array(seeds))

    print(f"All done in {time.time() - start:.2f} seconds.")

100%|██████████| 1/1 [00:26<00:00, 26.49s/it]

All done in 26.78 seconds.



