In [14]:
#!/usr/bin/env python
import os
import sys
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import matplotlib as mpl
import matplotlib.pyplot as plt

# ---------------------------------------------------------------------
# Paths / imports
# ---------------------------------------------------------------------
# Make the "../utility" directory importable. The path is relative, so it is
# resolved against the process's current working directory.
sys.path.append("../utility")

# Project-local modules (presumably located in ../utility -- confirm); they are
# imported only after the sys.path entry above has been added.
from network import KoopmanNet
from dataset import KoopmanDatasetCollector

# CSV log of training runs. main() filters its rows and reads each run's
# hyper-parameters and checkpoint path from it.
LOG_CSV = os.path.join("..", "log", "Sep_21", "koopman_results_log.csv")

# Run-selection filters used by main():
#   TARGET_ENV           -> compared with the "env_name" column
#   TARGET_ENCODE_DIM    -> compared with the "encode_dim" column
#   TARGET_TRAIN_SAMPLES -> compared with the "train_samples" column and also
#                           passed to KoopmanDatasetCollector
#   TARGET_M_VALUES      -> values of the "m" column, one heatmap column each
TARGET_ENV = "Polynomial"
TARGET_ENCODE_DIM = 48
TARGET_TRAIN_SAMPLES = 140000
TARGET_M_VALUES = [50, 100, 200]

# ---------------------------------------------------------------------
# Matplotlib settings (Updated for larger fonts)
# ---------------------------------------------------------------------
# Global Matplotlib defaults; they apply to every figure created after this call.
mpl.rcParams.update({
    # Resolution for on-screen figures vs. files written by savefig.
    "figure.dpi": 150,
    "savefig.dpi": 300,
    # Enable constrained layout by default.
    "figure.constrained_layout.use": True,
    # Increased font sizes
    "font.size": 12,
    "axes.titlesize": 18,     # Big headers for columns
    "axes.labelsize": 14,     # Big labels for axes
    "xtick.labelsize": 12,
    "ytick.labelsize": 12,
    "legend.fontsize": 12,
    "lines.linewidth": 1.5,
    "axes.formatter.use_mathtext": True,
    # Font type 42 selects TrueType (Type 42) font embedding for PDF/PS output
    # (see Matplotlib's rcParams documentation).
    "pdf.fonttype": 42,
    "ps.fonttype": 42,
    # Serif text; the list gives the preferred serif fonts in order.
    "font.family": "serif",
    "font.serif": ["Times New Roman", "Times", "DejaVu Serif"],
})

# ---------------------------------------------------------------------
# Helper functions
# ---------------------------------------------------------------------
def build_layers_from_row(row: pd.Series) -> list[int]:
    """Return the layer widths described by one row of the results log.

    The row must provide ``state_dim``, ``encode_dim``, ``hidden_layers`` and
    ``hidden_dim``; each value is converted with ``int``.

    Returns:
        ``[state_dim, hidden_dim, ..., hidden_dim, encode_dim]`` with
        ``hidden_layers`` copies of ``hidden_dim`` in the middle.
    """
    input_width = int(row["state_dim"])
    output_width = int(row["encode_dim"])
    n_hidden = int(row["hidden_layers"])
    hidden_width = int(row["hidden_dim"])

    # List repetition yields no hidden entries when n_hidden <= 0,
    # exactly like iterating over an empty range.
    return [input_width, *([hidden_width] * n_hidden), output_width]

def build_model_from_row(row: pd.Series, device: str = "cpu"):
    """Rebuild the KoopmanNet described by a log row and load its checkpoint.

    Args:
        row: One row of the results log. Reads ``state_dim``, ``encode_dim``,
            ``u_dim``, ``use_residual`` and ``model_path`` (plus the columns
            used by ``build_layers_from_row``).
        device: Device the network is moved to and the checkpoint is mapped to.

    Returns:
        ``(model, Nkoopman, u_dim, state_dim)`` where ``Nkoopman`` is
        ``state_dim + encode_dim``, ``u_dim`` is ``None`` when the logged value
        is missing, and ``model`` is in eval mode.

    Raises:
        FileNotFoundError: ``model_path`` does not exist.
        KeyError: the checkpoint has no ``"model"`` entry.
    """
    n_state = int(row["state_dim"])
    n_encode = int(row["encode_dim"])
    koopman_dim = n_state + n_encode

    # A missing u_dim (NaN / None in the log) is mapped to None.
    raw_u = row["u_dim"]
    if pd.isna(raw_u) or raw_u is None:
        control_dim = None
    else:
        control_dim = int(raw_u)

    residual_flag = bool(row["use_residual"])
    net = KoopmanNet(
        build_layers_from_row(row),
        koopman_dim,
        control_dim,
        use_residual=residual_flag,
    ).to(device)

    checkpoint_file = row["model_path"]
    if not os.path.exists(checkpoint_file):
        raise FileNotFoundError(f"Checkpoint not found: {checkpoint_file}")

    checkpoint = torch.load(checkpoint_file, map_location=device)
    if "model" not in checkpoint:
        raise KeyError(f"'model' key not in checkpoint: {checkpoint_file}")

    net.load_state_dict(checkpoint["model"])
    net.eval()
    return net, koopman_dim, control_dim, n_state

def cov_of_latent_z(model: nn.Module,
                    data_tensor: torch.Tensor,
                    state_dim: int,
                    u_dim: int | None,
                    device: str = "cpu") -> np.ndarray:
    data = data_tensor.to(device)
    steps, traj_num, N = data.shape

    if u_dim is None:
        X0 = data[0, :, :]
        encoded = model.encode(X0)
    else:
        X0 = data[0, :, u_dim:]
        encoded = model.encode(X0)

    z = encoded[:, state_dim:]
    z_mean = z.mean(dim=0, keepdim=True)
    z_centered = z - z_mean
    cov = (z_centered.T @ z_centered) / (z_centered.size(0) - 1)
    return cov.detach().cpu().numpy()

def covariance_to_correlation(cov: np.ndarray, eps: float = 1e-8) -> np.ndarray:
    """Scale a covariance matrix into a correlation-like matrix.

    ``eps`` is added both under the square root of the variances and to the
    final denominator, so the division stays finite for zero-variance
    dimensions (diagonal entries are therefore marginally below 1).
    """
    std = np.sqrt(np.diag(cov) + eps)
    # Broadcasting product std_i * std_j, i.e. the outer product of std with itself.
    return cov / (std[:, np.newaxis] * std[np.newaxis, :] + eps)

def average_mats(mats: list[np.ndarray]) -> np.ndarray:
    """Return the element-wise mean of equally shaped matrices."""
    stacked = np.stack(mats, axis=0)
    return stacked.mean(axis=0)

def mask_diag(mat: np.ndarray) -> np.ndarray:
    """Return a copy of ``mat`` with its main diagonal set to zero.

    The input array is left untouched.
    """
    zeroed = mat.copy()
    np.fill_diagonal(zeroed, 0.0)
    return zeroed

# ---------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------
def main():
    """Plot off-diagonal latent correlations with and without the covariance loss.

    Steps:
      1. Read ``LOG_CSV`` and keep the rows matching ``TARGET_ENV``,
         ``TARGET_ENCODE_DIM`` and ``TARGET_TRAIN_SAMPLES``.
      2. For every ``m`` in ``TARGET_M_VALUES`` that has runs both with and
         without ``use_covariance_loss``: rebuild each logged model, compute
         the latent covariance on the third array returned by
         ``KoopmanDatasetCollector.get_data()``, average the covariances per
         group and convert the average to a correlation matrix.
      3. Draw a 2 x len(TARGET_M_VALUES) grid of heatmaps (diagonal zeroed,
         symmetric colour scale shared by all panels) and save it as a PDF
         under ``./latent_cov_heatmaps``.

    Raises:
        FileNotFoundError: ``LOG_CSV`` does not exist.
        ValueError: no log row matches the filters.
    """
    device = "cuda" if torch.cuda.is_available() else "cpu"

    # Explicit raises (not ``assert``) so the checks survive ``python -O``.
    if not os.path.exists(LOG_CSV):
        raise FileNotFoundError(f"CSV log not found: {LOG_CSV}")
    log = pd.read_csv(LOG_CSV)

    df = log[
        (log["env_name"] == TARGET_ENV)
        & (log["encode_dim"] == TARGET_ENCODE_DIM)
        & (log["train_samples"] == TARGET_TRAIN_SAMPLES)
    ].copy()
    if df.empty:
        raise ValueError("Filtered dataframe is empty.")

    df["use_covariance_loss"] = df["use_covariance_loss"].astype(bool)
    grouped = df.groupby(["m", "use_covariance_loss"])

    latent_corr = {}

    for m_value in TARGET_M_VALUES:
        # Only compare settings for which both variants were trained.
        if any((m_value, flag) not in grouped.groups for flag in (False, True)):
            print(f"Skipping m={m_value} (missing with/without cov-loss runs).")
            continue

        print(f"Processing m={m_value}...")

        data_collector = KoopmanDatasetCollector(
            TARGET_ENV, TARGET_TRAIN_SAMPLES, 20000, 20000,
            Ksteps=1, normalize=False, m=m_value,
        )
        _, _, Ktest_data_np = data_collector.get_data()
        Ktest_data = torch.from_numpy(Ktest_data_np).float()
        state_dim_ds = data_collector.state_dim
        u_dim_ds = data_collector.u_dim

        # Same processing for both variants: without (False) and with (True)
        # the covariance loss.
        for use_cov in (False, True):
            covs = []
            for _, run in grouped.get_group((m_value, use_cov)).iterrows():
                model, *_ = build_model_from_row(run, device=device)
                covs.append(
                    cov_of_latent_z(model, Ktest_data, state_dim_ds, u_dim_ds, device=device)
                )
            latent_corr[(m_value, use_cov)] = covariance_to_correlation(average_mats(covs))

    # -----------------------------------------------------------------
    # Plotting
    # -----------------------------------------------------------------
    # Symmetric colour scale from the largest off-diagonal magnitude;
    # falls back to 1.0 when nothing was computed.
    mats_for_scale = [mask_diag(v) for v in latent_corr.values()]
    vmax = max((np.abs(m).max() for m in mats_for_scale), default=1.0)
    vmax = max(vmax, 1e-3)
    vmin = -vmax

    # squeeze=False keeps ``axes`` two-dimensional for any number of columns.
    n_cols = len(TARGET_M_VALUES)
    fig, axes = plt.subplots(
        2, n_cols, figsize=(10, 6), constrained_layout=True, squeeze=False
    )

    im = None  # stays None when no panel is drawn, so the colorbar is skipped
    for col_idx, m_value in enumerate(TARGET_M_VALUES):
        for row_idx, use_cov in enumerate([False, True]):
            ax = axes[row_idx, col_idx]
            mat = latent_corr.get((m_value, use_cov), None)

            if mat is None:
                ax.axis("off")
                continue

            im = ax.imshow(mask_diag(mat), vmin=vmin, vmax=vmax, cmap="coolwarm")

            # Column headers only on the top row.
            if row_idx == 0:
                ax.set_title(f"$m = {m_value}$")

            # Row labels only on the left column; raw strings keep the LaTeX
            # backslash without an invalid-escape warning.
            if col_idx == 0:
                label = (
                    r"Without $\mathcal{L}_{cov}$"
                    if not use_cov
                    else r"With $\mathcal{L}_{cov}$"
                )
                ax.set_ylabel(f"{label}\nLatent Index")
            else:
                ax.set_ylabel("")
                ax.set_yticks([])

            # X labels only on the bottom row.
            if row_idx == 1:
                ax.set_xlabel("Latent Index")
            else:
                ax.set_xlabel("")
                ax.set_xticks([])

    # Shared colorbar (only when at least one heatmap exists).
    if im is not None:
        cbar = fig.colorbar(im, ax=axes.ravel().tolist(), shrink=0.8, aspect=30)
        cbar.set_label("Correlation (off-diagonal)", fontsize=14)
        cbar.ax.tick_params(labelsize=12)

    out_dir = "./latent_cov_heatmaps"
    os.makedirs(out_dir, exist_ok=True)
    pdf_path = os.path.join(out_dir, "Polynomial_m_sweep_heatmap.pdf")

    fig.savefig(pdf_path, bbox_inches="tight")
    print("✅ Saved figure to:", pdf_path)
    plt.close(fig)

# Run the analysis only when this code is executed directly (script or notebook
# cell), not when it is imported as a module.
if __name__ == "__main__":
    main()

Processing m=50...
Processing m=100...
Processing m=200...
✅ Saved figure to: ./latent_cov_heatmaps/Polynomial_m_sweep_heatmap.pdf


In [1]:
#!/usr/bin/env python
import os
import sys
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import matplotlib as mpl
import matplotlib.pyplot as plt

# ---------------------------------------------------------------------
# Paths / imports
# ---------------------------------------------------------------------
# Make the "../utility" directory importable. The path is relative, so it is
# resolved against the process's current working directory.
sys.path.append("../utility")

# Project-local modules (presumably located in ../utility -- confirm); they are
# imported only after the sys.path entry above has been added.
from network import KoopmanNet
from dataset import KoopmanDatasetCollector

# CSV log of training runs. main() filters its rows and reads each run's
# hyper-parameters and checkpoint path from it.
LOG_CSV = os.path.join("..", "log", "Sep_21", "koopman_results_log.csv")

# Run-selection filters used by main(): the first three are compared with the
# "env_name", "encode_dim" and "train_samples" columns; TARGET_M_VALUES lists
# the values of the "m" column to plot, one heatmap column each.
TARGET_ENV = "Polynomial"
TARGET_ENCODE_DIM = 48
TARGET_TRAIN_SAMPLES = 140000
TARGET_M_VALUES = [50, 100, 200]

# Quantitative metric threshold (covariance magnitude)
# Counts how many OFF-DIAGONAL entries satisfy |cov_ij| < COV_THRESHOLD
# The comparison is applied to the averaged covariance matrix, not to the
# correlation matrix shown in the heatmaps.
COV_THRESHOLD = 1e-3

# ---------------------------------------------------------------------
# Matplotlib settings (Updated for larger fonts)
# ---------------------------------------------------------------------
# Global Matplotlib defaults; they apply to every figure created after this call.
mpl.rcParams.update({
    # Resolution for on-screen figures vs. files written by savefig.
    "figure.dpi": 150,
    "savefig.dpi": 300,
    # Enable constrained layout by default.
    "figure.constrained_layout.use": True,
    # Font sizes: base text, axes titles, axis labels, tick labels, legend.
    "font.size": 12,
    "axes.titlesize": 18,
    "axes.labelsize": 14,
    "xtick.labelsize": 12,
    "ytick.labelsize": 12,
    "legend.fontsize": 12,
    "lines.linewidth": 1.5,
    "axes.formatter.use_mathtext": True,
    # Font type 42 selects TrueType (Type 42) font embedding for PDF/PS output
    # (see Matplotlib's rcParams documentation).
    "pdf.fonttype": 42,
    "ps.fonttype": 42,
    # Serif text; the list gives the preferred serif fonts in order.
    "font.family": "serif",
    "font.serif": ["Times New Roman", "Times", "DejaVu Serif"],
})

# ---------------------------------------------------------------------
# Helper functions
# ---------------------------------------------------------------------
def build_layers_from_row(row: pd.Series) -> list[int]:
    """Translate a log row into the list of layer widths for the encoder.

    Reads ``state_dim``, ``encode_dim``, ``hidden_layers`` and ``hidden_dim``
    from ``row`` (each converted with ``int``) and returns the input width,
    ``hidden_layers`` repetitions of ``hidden_dim``, then the output width.
    """
    keys = ("state_dim", "encode_dim", "hidden_layers", "hidden_dim")
    first, last, depth, width = (int(row[key]) for key in keys)

    sizes = [first]
    sizes.extend(width for _ in range(depth))
    sizes.append(last)
    return sizes

def build_model_from_row(row: pd.Series, device: str = "cpu"):
    """Instantiate the network logged in ``row`` and restore its weights.

    Args:
        row: One row of the results log. Reads ``state_dim``, ``encode_dim``,
            ``u_dim``, ``use_residual`` and ``model_path`` (plus the columns
            used by ``build_layers_from_row``).
        device: Target device for the network and for ``torch.load``.

    Returns:
        Tuple ``(model, Nkoopman, u_dim, state_dim)``; ``Nkoopman`` equals
        ``state_dim + encode_dim``, ``u_dim`` is ``None`` when the log has no
        value, and ``model`` has been switched to eval mode.

    Raises:
        FileNotFoundError: the checkpoint file is missing.
        KeyError: the checkpoint lacks a ``"model"`` entry.
    """
    state_dim, encode_dim = int(row["state_dim"]), int(row["encode_dim"])
    Nkoopman = state_dim + encode_dim

    # NaN / None in the log means "no u_dim"; pass None on to the network.
    u_dim_raw = row["u_dim"]
    has_u_dim = not (pd.isna(u_dim_raw) or u_dim_raw is None)
    u_dim = int(u_dim_raw) if has_u_dim else None

    use_residual = bool(row["use_residual"])
    layer_sizes = build_layers_from_row(row)
    model = KoopmanNet(layer_sizes, Nkoopman, u_dim, use_residual=use_residual)
    model = model.to(device)

    path = row["model_path"]
    if not os.path.exists(path):
        raise FileNotFoundError(f"Checkpoint not found: {path}")

    payload = torch.load(path, map_location=device)
    if "model" not in payload:
        raise KeyError(f"'model' key not in checkpoint: {path}")

    weights = payload["model"]
    model.load_state_dict(weights)
    model.eval()
    return model, Nkoopman, u_dim, state_dim

@torch.no_grad()
def cov_of_latent_z(model: nn.Module,
                    data_tensor: torch.Tensor,
                    state_dim: int,
                    u_dim: int | None,
                    device: str = "cpu") -> np.ndarray:
    """
    Returns covariance of the latent part z (shape: encode_dim x encode_dim),
    computed from encoded initial states across trajectories.
    """
    data = data_tensor.to(device)
    steps, traj_num, N = data.shape

    if u_dim is None:
        X0 = data[0, :, :]
        encoded = model.encode(X0)
    else:
        X0 = data[0, :, u_dim:]
        encoded = model.encode(X0)

    z = encoded[:, state_dim:]  # latent-only (encode_dim)
    z_centered = z - z.mean(dim=0, keepdim=True)
    cov = (z_centered.T @ z_centered) / (z_centered.size(0) - 1)
    return cov.detach().cpu().numpy()

def covariance_to_correlation(cov: np.ndarray, eps: float = 1e-8) -> np.ndarray:
    """Normalise a covariance matrix by the outer product of its standard deviations.

    ``eps`` regularises both the variances (inside the square root) and the
    resulting denominator, keeping the result finite when a variance is zero.
    """
    sigma = np.sqrt(np.diag(cov) + eps)
    scale = np.outer(sigma, sigma)
    scale = scale + eps
    return cov / scale

def average_mats(mats: list[np.ndarray]) -> np.ndarray:
    """Element-wise mean over a list of same-shaped arrays."""
    cube = np.stack(mats, axis=0)  # new leading axis indexes the input matrices
    return np.mean(cube, axis=0)

def count_small_offdiag_cov(cov: np.ndarray, thresh: float) -> tuple[int, int, float]:
    """
    Count the off-diagonal entries of ``cov`` whose magnitude is below ``thresh``.

    Returns a tuple ``(count, total_offdiag, fraction)``; ``fraction`` is 0.0
    when the matrix has no off-diagonal entries.

    Raises ValueError when ``cov`` is not a square 2-D array.
    """
    is_square = cov.ndim == 2 and cov.shape[0] == cov.shape[1]
    if not is_square:
        raise ValueError(f"cov must be square 2D array, got shape {cov.shape}")

    size = cov.shape[0]
    magnitudes = np.abs(cov[~np.eye(size, dtype=bool)])  # diagonal excluded
    n_total = magnitudes.size
    n_small = int(np.count_nonzero(magnitudes < thresh))

    if n_total > 0:
        fraction = float(n_small / n_total)
    else:
        fraction = 0.0
    return n_small, n_total, fraction

# ---------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------
def main():
    """Quantify and plot latent decorrelation with and without the covariance loss.

    Steps:
      1. Read ``LOG_CSV`` and keep the rows matching ``TARGET_ENV``,
         ``TARGET_ENCODE_DIM`` and ``TARGET_TRAIN_SAMPLES``.
      2. For every ``m`` in ``TARGET_M_VALUES`` that has runs both with and
         without ``use_covariance_loss``: rebuild each logged model, compute
         the latent covariance on the third array returned by
         ``KoopmanDatasetCollector.get_data()``, average the covariances per
         group, derive the correlation matrix and count the off-diagonal
         covariance entries below ``COV_THRESHOLD``.
      3. Print the per-``m`` counts and draw a 2 x len(TARGET_M_VALUES) grid of
         correlation heatmaps (diagonal included, fixed [-1, 1] scale), each
         annotated with its count, saved as a PDF under
         ``./latent_cov_heatmaps``.

    Raises:
        FileNotFoundError: ``LOG_CSV`` does not exist.
        ValueError: no log row matches the filters.
    """
    device = "cuda" if torch.cuda.is_available() else "cpu"

    # Explicit raises (not ``assert``) so the checks survive ``python -O``.
    if not os.path.exists(LOG_CSV):
        raise FileNotFoundError(f"CSV log not found: {LOG_CSV}")
    log = pd.read_csv(LOG_CSV)

    df = log[
        (log["env_name"] == TARGET_ENV)
        & (log["encode_dim"] == TARGET_ENCODE_DIM)
        & (log["train_samples"] == TARGET_TRAIN_SAMPLES)
    ].copy()
    if df.empty:
        raise ValueError("Filtered dataframe is empty.")

    df["use_covariance_loss"] = df["use_covariance_loss"].astype(bool)
    grouped = df.groupby(["m", "use_covariance_loss"])

    # Store BOTH covariance and correlation
    latent_cov: dict[tuple[int, bool], np.ndarray] = {}
    latent_corr: dict[tuple[int, bool], np.ndarray] = {}
    cov_small_stats: dict[tuple[int, bool], tuple[int, int, float]] = {}

    for m_value in TARGET_M_VALUES:
        # Only compare settings for which both variants were trained.
        if any((m_value, flag) not in grouped.groups for flag in (False, True)):
            print(f"Skipping m={m_value} (missing with/without cov-loss runs).")
            continue

        print(f"Processing m={m_value}...")

        data_collector = KoopmanDatasetCollector(
            TARGET_ENV, TARGET_TRAIN_SAMPLES, 20000, 20000,
            Ksteps=1, normalize=False, m=m_value,
        )
        _, _, Ktest_data_np = data_collector.get_data()
        Ktest_data = torch.from_numpy(Ktest_data_np).float()
        state_dim_ds = data_collector.state_dim
        u_dim_ds = data_collector.u_dim

        # Same processing for both variants: without (False) and with (True)
        # the covariance loss.
        for use_cov in (False, True):
            covs = []
            for _, run in grouped.get_group((m_value, use_cov)).iterrows():
                model, *_ = build_model_from_row(run, device=device)
                covs.append(
                    cov_of_latent_z(model, Ktest_data, state_dim_ds, u_dim_ds, device=device)
                )
            cov_avg = average_mats(covs)
            key = (m_value, use_cov)
            latent_cov[key] = cov_avg
            latent_corr[key] = covariance_to_correlation(cov_avg)
            cov_small_stats[key] = count_small_offdiag_cov(cov_avg, COV_THRESHOLD)

    # -----------------------------------------------------------------
    # Print quantitative summary (per m, with/without cov-loss)
    # -----------------------------------------------------------------
    print("\n=== Quantitative covariance-sparsity metric (OFF-DIAGONAL) ===")
    print(f"Threshold: |cov_ij| < {COV_THRESHOLD:g}")
    for m_value in TARGET_M_VALUES:
        without_stats = cov_small_stats.get((m_value, False), None)
        with_stats = cov_small_stats.get((m_value, True), None)
        if without_stats is None or with_stats is None:
            continue
        cnt0, tot0, frac0 = without_stats
        cnt1, tot1, frac1 = with_stats
        delta = frac1 - frac0
        print(
            f"m={m_value:>3} | "
            f"no cov-loss: {cnt0}/{tot0} ({frac0:.2%}) | "
            f"with cov-loss: {cnt1}/{tot1} ({frac1:.2%}) | "
            f"Δ={delta:.2%}"
        )

    # -----------------------------------------------------------------
    # Plotting (Correlation heatmaps, diagonal INCLUDED)
    # -----------------------------------------------------------------
    # Correlation should be in [-1, 1]. Use fixed scale for easy comparison.
    vmin, vmax = -1.0, 1.0

    # squeeze=False keeps ``axes`` two-dimensional for any number of columns.
    n_cols = len(TARGET_M_VALUES)
    fig, axes = plt.subplots(
        2, n_cols, figsize=(10, 6), constrained_layout=True, squeeze=False
    )

    im = None  # stays None when no panel is drawn, so the colorbar is skipped
    for col_idx, m_value in enumerate(TARGET_M_VALUES):
        for row_idx, use_cov in enumerate([False, True]):
            ax = axes[row_idx, col_idx]
            mat = latent_corr.get((m_value, use_cov), None)

            if mat is None:
                ax.axis("off")
                continue

            # NO diagonal masking anymore
            im = ax.imshow(mat, vmin=vmin, vmax=vmax, cmap="coolwarm")

            # Column Headers (Only on top row)
            if row_idx == 0:
                ax.set_title(f"$m = {m_value}$")

            # Row Labels (Only on left column); raw strings keep the LaTeX
            # backslash without an invalid-escape warning.
            if col_idx == 0:
                label = (
                    r"Without $\mathcal{L}_{cov}$"
                    if not use_cov
                    else r"With $\mathcal{L}_{cov}$"
                )
                ax.set_ylabel(f"{label}\nLatent Index")
            else:
                ax.set_ylabel("")
                ax.set_yticks([])

            # X Labels (Only on bottom row)
            if row_idx == 1:
                ax.set_xlabel("Latent Index")
            else:
                ax.set_xlabel("")
                ax.set_xticks([])

            # Quantitative annotation (uses COVARIANCE, off-diagonal)
            stats = cov_small_stats.get((m_value, use_cov), None)
            if stats is not None:
                cnt, tot, frac = stats
                ax.text(
                    0.98, 0.02,
                    f"$|cov|<{COV_THRESHOLD:g}$\n{cnt}/{tot} ({frac:.1%})",
                    transform=ax.transAxes,
                    ha="right", va="bottom",
                    fontsize=10,
                    bbox=dict(boxstyle="round,pad=0.25", facecolor="white", alpha=0.75, edgecolor="none")
                )

    # Colorbar
    if im is not None:
        cbar = fig.colorbar(im, ax=axes.ravel().tolist(), shrink=0.8, aspect=30)
        cbar.set_label("Correlation", fontsize=14)
        cbar.ax.tick_params(labelsize=12)

    out_dir = "./latent_cov_heatmaps"
    os.makedirs(out_dir, exist_ok=True)
    pdf_path = os.path.join(out_dir, "Polynomial_m_sweep_heatmap.pdf")

    fig.savefig(pdf_path, bbox_inches="tight")
    print("\n✅ Saved figure to:", pdf_path)
    plt.close(fig)

# Run the analysis only when this code is executed directly (script or notebook
# cell), not when it is imported as a module.
if __name__ == "__main__":
    main()


pybullet build time: Jan 29 2025 23:16:28


Processing m=50...
Processing m=100...
Processing m=200...

=== Quantitative covariance-sparsity metric (OFF-DIAGONAL) ===
Threshold: |cov_ij| < 0.001
m= 50 | no cov-loss: 154/2256 (6.83%) | with cov-loss: 1004/2256 (44.50%) | Δ=37.68%
m=100 | no cov-loss: 22/2256 (0.98%) | with cov-loss: 440/2256 (19.50%) | Δ=18.53%
m=200 | no cov-loss: 4/2256 (0.18%) | with cov-loss: 244/2256 (10.82%) | Δ=10.64%

✅ Saved figure to: ./latent_cov_heatmaps/Polynomial_m_sweep_heatmap.pdf
