In [1]:
import netCDF4 as nc
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.animation as animation
import seaborn as sns
from dotenv import load_dotenv
from scipy.spatial import cKDTree

import os
import glob
import pandas as pd
import json

from utils.db_tools import get_db, filter_df, make_animation, get_data, metrics_grid, plot_grid

In [27]:
import numpy as np
from scipy.fft import fft, fftfreq


def classify_brusselator_trajectory(
    data,
    u_ss,
    v_ss,
    blowup_threshold=1e6,
    convergence_eps=1e-3,
    deriv_eps=1e-3,
    osc_threshold=0.1,
):
    """
    Classify a Brusselator trajectory into one of five categories.

    Args:
        data: Trajectory array of shape [1, time, x, y] (u and v interleaved).
        u_ss, v_ss: Steady-state values (constant across space).
        blowup_threshold: Threshold for detecting blowup.
        convergence_eps: Tolerance for convergence to SS.
        deriv_eps: Tolerance for near-zero time derivatives.
        osc_threshold: Minimum relative spectral power for oscillations.

    Returns:
        String indicating the classification result.
    """
    # Extract u and v (remove dummy dimension)
    u = data[0, :, :, 0::2]  # shape [time, x, y//2]
    v = data[0, :, :, 1::2]

    # Check blowup
    if np.max(u) > blowup_threshold or np.max(v) > blowup_threshold:
        return "BU"

    # Deviation from SS over time (L2 norm)
    time_steps = u.shape[0]
    dev_u = np.linalg.norm(u - u_ss, axis=(1, 2))  # [time]
    dev_v = np.linalg.norm(v - v_ss, axis=(1, 2))
    total_dev = dev_u + dev_v  # [time]

    # Check convergence to SS
    last_dev = total_dev[-int(0.1 * time_steps) :]  # Last 10% of steps
    # print(np.mean(last_dev))
    if np.mean(last_dev) < convergence_eps and np.all(np.diff(last_dev) <= 0):
        return "SS"

    # Time derivatives (finite difference)
    du = np.diff(u, axis=0)  # shape [time-1, x, y//2]
    dv = np.diff(v, axis=0)
    deriv_norm = np.linalg.norm(du, axis=(1, 2)) + np.linalg.norm(dv, axis=(1, 2))

    # Check convergence to different state
    last_deriv = deriv_norm[-int(0.1 * len(deriv_norm)) :]
    print(np.mean(last_deriv))
    if np.mean(last_deriv) < deriv_eps:
        return "DS"

    # Oscillations: FFT of spatially averaged u(t)
    u_avg = np.mean(u, axis=(1, 2))  # [time]
    v_avg = np.mean(v, axis=(1, 2))

    # Compute FFT power spectrum for u
    n = len(u_avg)
    freq = fftfreq(n)
    fft_u = np.abs(fft(u_avg - u_ss)) / n
    fft_u[0] = 0  # Ignore DC component (steady state)

    # Dominant frequency check
    dominant_power = np.max(fft_u)
    total_power = np.sum(fft_u)
    if dominant_power > osc_threshold * total_power:
        return "OSC"

    # If none of the above
    return "?"


In [28]:
def classify_trajectories(
    df,
    start_frame,
    blowup_threshold=1e6,
    convergence_eps=1e-3,
    deriv_eps=1e-3,
    osc_threshold=0.1,
) -> pd.DataFrame:
    """
    Classify runs based on behavior: inserts column 'category' into DataFrame.
    Possible categories: 'steady_state', 'interesting', 'divergent_or_unknown'.
    Args:
        df: DataFrame containing run metadata.
        steady_threshold: Threshold for ||du/dt|| to classify as steady.
        osc_threshold: Threshold for oscillatory behavior.
        dev_threshold: Threshold for deviation from the steady state.

    Returns:
        Updated DataFrame with classification labels.
    """

    if len(df) == 0:
        return None
    df = df.sort_values(by=["A", "B"])
    categories = []
    for i, row in df.iterrows():
        Nt = row["n_snapshots"]
        assert start_frame < Nt, "start_frame must be less than Nt"

        ds = nc.Dataset(row["filename"])
        data = ds.variables["data"][:]  # Assume shape [time, spatial, ...]
        A = row.A
        B = row.B
        categories.append(classify_brusselator_trajectory(data, A, B/A, blowup_threshold, convergence_eps, deriv_eps, osc_threshold))
    df["category"] = categories
    return df

In [29]:
model = "bruss"
run_id = "abd_test"
load_dotenv()
data_dir = os.getenv("DATA_DIR")
output_dir = os.getenv("OUT_DIR")
output_dir = os.path.join(output_dir, model, run_id)
os.makedirs(output_dir, exist_ok=True)
df0 = get_db(os.path.join(data_dir, model, run_id))
len(df0)

378

In [5]:
df = df0.copy()

In [33]:
# np.linalg.norm(get_data()[0,-1,:,0::2] - 0.1) / (128**2)
df1 = filter_df(df, 5, 25)
df1 = classify_trajectories(df1, 0, convergence_eps=128/100)

0.0


In [17]:
df = filter_df(df, Du=1, Dv=4)
df = classify_trajectories(df, 0, convergence_eps=128/100);

In [25]:
for A in df.A.unique():
    for B in filter_df(df,A).B.unique():
        print(f"{filter_df(df,A,B).iloc[0]['category']:3}", end=" ")
    print("")

SS  SS  SS  SS  SS  SS  SS  
SS  SS  SS  OSC ?   ?   ?   
SS  SS  OSC ?   ?   ?   ?   
SS  SS  ?   ?   ?   ?   ?   
SS  SS  SS  OSC ?   ?   DS  
SS  SS  SS  SS  SS  DS  ?   


In [34]:
metrics_grid(df, 10, var1="A", var2="B", metric="dt", filename="dt-Du_1-Dv_4.png")

array([[<Axes: title={'center': 'A=0.100\nB = 0.125'}>,
        <Axes: title={'center': 'A=0.100\nB = 0.175'}>,
        <Axes: title={'center': 'A=0.100\nB = 0.200'}>,
        <Axes: title={'center': 'A=0.100\nB = 0.250'}>,
        <Axes: title={'center': 'A=0.100\nB = 0.300'}>,
        <Axes: title={'center': 'A=0.100\nB = 0.400'}>,
        <Axes: title={'center': 'A=0.100\nB = 0.500'}>],
       [<Axes: title={'center': 'A=0.500\nB = 0.625'}>,
        <Axes: title={'center': 'A=0.500\nB = 0.875'}>,
        <Axes: title={'center': 'A=0.500\nB = 1.000'}>,
        <Axes: title={'center': 'A=0.500\nB = 1.250'}>,
        <Axes: title={'center': 'A=0.500\nB = 1.500'}>,
        <Axes: title={'center': 'A=0.500\nB = 2.000'}>,
        <Axes: title={'center': 'A=0.500\nB = 2.500'}>],
       [<Axes: title={'center': 'A=1.000\nB = 1.250'}>,
        <Axes: title={'center': 'A=1.000\nB = 1.750'}>,
        <Axes: title={'center': 'A=1.000\nB = 2.000'}>,
        <Axes: title={'center': 'A=1.000\nB = 