<a href="https://colab.research.google.com/github/openscilabs/isda/blob/main/validation.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# ISDA - Independent Structural-Dimensionality Analysis

This notebook serves as a comprehensive validation suite for the Independent Structural-Dimensionality Analysis (ISDA) framework.
It systematically evaluates the algorithm's efficacy across a spectrum of synthetic benchmarks, ranging from canonical correlation patterns—including linear redundancies and latent manifolds—to complex Multi-Objective Problems (MOPs). The analysis verifies ISDA's capability to correctly identify intrinsic dimensionality and preserve the topological fidelity of the Pareto frontier.

In [None]:
# Install ISDA 
!pip install --upgrade git+https://github.com/openscilabs/isda.git

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import isda

# Reload for development iteration: re-executes the already-imported isda
# module so that edits to an installed/editable copy are picked up without
# restarting the kernel.
import importlib
importlib.reload(isda)

# Reaching this line means the import and reload above raised no exception.
print("ISDA imported successfully.")

## 1. Data Generators & Utilities

### General Helpers

In [None]:
def _truth(name, intrinsic_dim_expected, blocks_expected, notes=""):
    """Bundle the ground-truth description of a synthetic case into a dict.

    ``intrinsic_dim_expected`` is coerced with ``int``; ``name``,
    ``blocks_expected`` and ``notes`` are stored exactly as given.
    """
    record = dict(name=name)
    record["intrinsic_dim_expected"] = int(intrinsic_dim_expected)
    record["blocks_expected"] = blocks_expected
    record["notes"] = notes
    return record

def _mk_block_names(start, size):
    """Return the ``size`` consecutive labels ``f<start>``, ``f<start+1>``, ...

    ``start`` is the 1-based index of the first label.
    """
    stop = start + size
    return list(map("f{}".format, range(start, stop)))

def _repeat_with_small_noise(base, rng, noise):
    """Return ``base`` plus ``noise``-scaled draws from ``rng.normal``.

    One draw is taken per entry along the first axis of ``base``
    (``base.shape[0]`` values), so ``base`` is expected to be of shape (N,).
    """
    n_samples = base.shape[0]
    jitter = rng.normal(size=n_samples)
    return base + noise * jitter

def _mop_df(Y):
    """Wrap the objective matrix ``Y`` in a DataFrame with columns f1..fM.

    M is taken from ``Y.shape[1]``.
    """
    n_objectives = Y.shape[1]
    labels = [f"f{k}" for k in range(1, n_objectives + 1)]
    return pd.DataFrame(Y, columns=labels)

### Validation Utilities
Custom function to evaluate reconstruction fidelity on benchmark datasets.

In [None]:
def evaluate_reduced_model_fidelity(results_dict):
    """
    Evaluates reconstruction fidelity and summarizes ISDA performance.
    Adapted for local notebook usage.

    Parameters
    ----------
    results_dict : dict
        Maps a case name to a dict with the keys ``"result_obj"`` (the ISDA
        result; the attributes ``Y``, ``best_mis``, ``ses_results``,
        ``regime``, ``alpha_min`` and ``alpha_max`` are read, plus the
        optional ``min_compactness``) and ``"truth"`` (ground-truth dict;
        only ``"intrinsic_dim_expected"`` is read). This is the structure
        returned by ``run_cases``.

    Returns
    -------
    pandas.DataFrame
        One row per case with the columns Case, Regime, Alpha Min, Alpha Max,
        Expected Dim, MIS Size, Fidelity (F_real), SES, Min Compactness and
        Status. An empty ``results_dict`` yields an empty frame that still
        carries these columns.

    Notes
    -----
    Status is "OK" when F_real >= 0.9 or when the MIS size equals the
    expected dimensionality, "Noise" when neither holds and the regime is
    named SIGNAL_BELOW_NOISE, and "Bad" otherwise.
    """
    columns = [
        "Case", "Regime", "Alpha Min", "Alpha Max", "Expected Dim",
        "MIS Size", "Fidelity (F_real)", "SES", "Min Compactness", "Status"
    ]

    results_summary = []
    for name, data in results_dict.items():
        # data is a dictionary wrapping an ISDAResult object and truth
        result_obj = data.get("result_obj")
        # Tolerate a missing or explicitly-None truth entry.
        truth = data.get("truth") or {}

        Y = result_obj.Y
        mis = result_obj.best_mis
        mis_indices = mis["mis_indices"] if mis else []
        # Use the length rather than truthiness: a NumPy index array has no
        # unambiguous truth value, and array([0]) would read as "empty".
        mis_size = len(mis_indices)

        # Calculate fidelity (F_real)
        if mis_size == 0:
            fidelity = 0.0
            ses = 0.0
        else:
            # Use stored SES results if available, else recalculate
            ses_out = result_obj.ses_results
            if not ses_out:
                ses_out = isda.calculate_ses(
                    Y, mis_indices, n_perm=1, return_details=True
                )
            fidelity = ses_out["F_real"]
            ses = ses_out.get("ses", 0.0)

        expected_dim = truth.get("intrinsic_dim_expected", None)

        # Metrics from ISDA result
        regime_name = result_obj.regime.name if result_obj.regime else "N/A"
        compactness = getattr(result_obj, "min_compactness", 1.0)

        # Status heuristic; the first matching rule wins.
        if fidelity >= 0.9:
            # High reconstruction fidelity.
            status = "OK"
        elif expected_dim and mis_size == expected_dim:
            # Dimensionality matches exactly even though fidelity is low.
            status = "OK"
        elif str(regime_name) == "SIGNAL_BELOW_NOISE":
            status = "Noise"
        else:
            status = "Bad"

        results_summary.append({
            "Case": name,
            "Alpha Min": f"{result_obj.alpha_min:.2e}",
            "Alpha Max": f"{result_obj.alpha_max:.2e}",
            "Regime": regime_name,
            "Expected Dim": expected_dim,
            "MIS Size": mis_size,
            "Fidelity (F_real)": f"{fidelity:.4f}",
            "SES": f"{ses:.4f}",
            "Min Compactness": f"{compactness:.4f}",
            "Status": status,
        })

    # Passing ``columns`` fixes the column order and keeps the header even
    # when there are no rows (indexing an empty frame by label would raise).
    return pd.DataFrame(results_summary, columns=columns)

### Canonical Structure Test Suite (qualitative calibration)

In [None]:
def run_cases(cases_list, N=300):
    """Run ISDA on each ``(name, generator)`` pair and collect the outcomes.

    Every generator is called as ``gen(N=N)`` and must return ``(Y, truth)``.
    For each case a banner and the result summary are printed and a plot is
    attempted; a plotting failure is reported and does not stop the run.

    Returns a dict mapping each case name to
    ``{"result_obj": <analysis result>, "truth": <truth>}``.
    """
    banner = "=" * 80
    collected = {}
    for case_name, make_case in cases_list:
        print(f"\n{banner}")
        print(f"Running {case_name}...")
        print(f"{banner}")

        Y, truth = make_case(N=N)
        # ISDA analysis with caution=0.5 and SES enabled.
        analysis = isda.analyze(Y, caution=0.5, run_ses=True, name=case_name)

        # Detailed textual report.
        print(analysis.summary())

        # Graph; best-effort only.
        try:
            analysis.plot()
            plt.show()
        except Exception as exc:
            print(f"Plotting failed: {exc}")

        collected[case_name] = dict(result_obj=analysis, truth=truth)
    return collected

In [None]:

# --- Battery 1: Standard Correlation Cases ---

def make_case1_independence(N=1000, M=20, seed=123):
    """Case 1: an (N, M) matrix of independent standard-normal draws.

    Returns ``(df, truth)`` where ``df`` has columns f1..fM and ``truth``
    expects M dimensions with every column in its own block.
    """
    generator = np.random.default_rng(seed)
    samples = generator.normal(size=(N, M))
    labels = [f"f{k}" for k in range(1, M + 1)]
    frame = pd.DataFrame(samples, columns=labels)
    singleton_blocks = [[label] for label in labels]
    return frame, _truth(
        name="Case 1 - Total independence",
        intrinsic_dim_expected=M,
        blocks_expected=singleton_blocks,
        notes="Each objective is independent (Gaussian noise).",
    )

def make_case2_total_redundancy(N=1000, M=20, seed=123):
    """Case 2: M noisy copies of one standard-normal latent.

    Every column is the shared latent plus its own Gaussian noise with
    scale 0.05. Returns ``(df, truth)``; ``truth`` expects one dimension and
    a single block holding all columns.
    """
    generator = np.random.default_rng(seed)
    # Draw order matters for reproducibility: latent first, then the noise.
    shared = generator.normal(size=(N, 1))
    perturbation = generator.normal(scale=0.05, size=(N, M))
    labels = [f"f{k}" for k in range(1, M + 1)]
    # (N, 1) + (N, M) broadcasts the latent across all M columns.
    frame = pd.DataFrame(shared + perturbation, columns=labels)
    return frame, _truth(
        name="Case 2 - Total redundancy",
        intrinsic_dim_expected=1,
        blocks_expected=[labels],
        notes="A single latent; all objectives are noisy copies.",
    )

def make_case3_block_structure(N=1000, M=20, seed=123):
    """Case 3: four independent latents, each driving five objectives.

    Column ``idx`` is latent ``idx // 5`` plus its own Gaussian noise with
    scale 0.2.

    Parameters
    ----------
    N : int
        Number of samples (rows).
    M : int
        Number of objectives; must be 20 (4 blocks x 5 objectives).
    seed : int
        Seed passed to ``np.random.default_rng``.

    Returns
    -------
    (pandas.DataFrame, dict)
        The data with columns f1..f20 and the ground-truth dict built by
        ``_truth`` (expected dimension 4, blocks f1-f5, f6-f10, f11-f15,
        f16-f20).

    Raises
    ------
    ValueError
        If ``M`` is not 20.
    """
    n_blocks, block_size = 4, 5
    # Explicit check instead of ``assert``: asserts vanish under ``python -O``,
    # after which a wrong M would yield all-zero columns or an IndexError.
    if M != n_blocks * block_size:
        raise ValueError(
            f"make_case3_block_structure requires M == {n_blocks * block_size} "
            f"({n_blocks} blocks x {block_size} objectives); got M={M}"
        )

    rng = np.random.default_rng(seed)
    latent_blocks = rng.normal(size=(N, n_blocks))
    Y = np.zeros((N, M))
    # Noise is drawn column by column, in column order, so results stay
    # reproducible for a given seed.
    for idx in range(M):
        block = idx // block_size
        Y[:, idx] = latent_blocks[:, block] + rng.normal(scale=0.2, size=N)

    cols = [f"f{i+1}" for i in range(M)]
    df = pd.DataFrame(Y, columns=cols)
    blocks = [
        cols[b * block_size:(b + 1) * block_size] for b in range(n_blocks)
    ]
    truth = _truth(
        name="Case 3 - Blocks (4 x 5)",
        intrinsic_dim_expected=4,
        blocks_expected=blocks,
        notes="4 independent latents; each generates 5 objectives."
    )
    return df, truth

def make_case4_two_big_blocks(N=1000, M=20, seed=123):
    """Case 4: two independent latents, each driving ten objectives.

    Columns 0-9 are the first latent and columns 10-19 the second, each plus
    its own Gaussian noise with scale 0.2.

    Parameters
    ----------
    N : int
        Number of samples (rows).
    M : int
        Number of objectives; must be 20 (2 blocks x 10 objectives).
    seed : int
        Seed passed to ``np.random.default_rng``.

    Returns
    -------
    (pandas.DataFrame, dict)
        The data with columns f1..f20 and the ground-truth dict built by
        ``_truth`` (expected dimension 2, blocks f1-f10 and f11-f20).

    Raises
    ------
    ValueError
        If ``M`` is not 20.
    """
    n_blocks, block_size = 2, 10
    # Explicit check instead of ``assert``: asserts vanish under ``python -O``,
    # after which a wrong M would yield all-zero columns or an IndexError.
    if M != n_blocks * block_size:
        raise ValueError(
            f"make_case4_two_big_blocks requires M == {n_blocks * block_size} "
            f"({n_blocks} blocks x {block_size} objectives); got M={M}"
        )

    rng = np.random.default_rng(seed)
    latent_blocks = rng.normal(size=(N, n_blocks))
    Y = np.zeros((N, M))
    # Noise is drawn column by column, in column order, so results stay
    # reproducible for a given seed.
    for i in range(M):
        Y[:, i] = latent_blocks[:, i // block_size] + rng.normal(scale=0.2, size=N)

    cols = [f"f{i+1}" for i in range(M)]
    df = pd.DataFrame(Y, columns=cols)
    truth = _truth(
        name="Case 4 - Blocks (2 x 10)",
        intrinsic_dim_expected=2,
        blocks_expected=[cols[:block_size], cols[block_size:]],
        notes="2 independent latents; each generates 10 objectives."
    )
    return df, truth

def make_case5_chain_structure(N=1000, M=20, seed=123):
    """Case 5: a chain in which each column is its predecessor plus noise.

    The first column is standard normal; every later column adds Gaussian
    noise with scale 0.2 to the column before it. Returns ``(df, truth)``;
    ``truth`` expects M dimensions and a single block holding all columns.
    """
    generator = np.random.default_rng(seed)
    chain = np.zeros((N, M))
    chain[:, 0] = generator.normal(size=N)
    # Walk adjacent (previous, current) column pairs left to right.
    for prev, curr in zip(range(M - 1), range(1, M)):
        step = generator.normal(scale=0.2, size=N)
        chain[:, curr] = chain[:, prev] + step

    labels = [f"f{k}" for k in range(1, M + 1)]
    frame = pd.DataFrame(chain, columns=labels)
    return frame, _truth(
        name="Case 5 - Chain",
        intrinsic_dim_expected=M,
        blocks_expected=[labels],
        notes="Sequential dependency.",
    )

def make_case6_mixed_structure(N=1000, M=20, seed=123):
    """Case 6: ten independent objectives plus two latent-driven groups.

    Columns 0-9 are independent standard-normal draws. Columns 10-14 are
    ``latent1`` and columns 15-19 are ``latent2``, each plus its own Gaussian
    noise with scale 0.2.

    Parameters
    ----------
    N : int
        Number of samples (rows).
    M : int
        Number of objectives; must be 20.
    seed : int
        Seed passed to ``np.random.default_rng``.

    Returns
    -------
    (pandas.DataFrame, dict)
        The data with columns f1..f20 and the ground-truth dict built by
        ``_truth`` (expected dimension 12: ten singleton blocks plus the
        blocks f11-f15 and f16-f20).

    Raises
    ------
    ValueError
        If ``M`` is not 20.
    """
    # Explicit check instead of ``assert``: asserts vanish under ``python -O``,
    # after which a wrong M would yield all-zero columns or an indexing error.
    if M != 20:
        raise ValueError(
            f"make_case6_mixed_structure requires M == 20; got M={M}"
        )

    rng = np.random.default_rng(seed)
    Y = np.zeros((N, M))
    # First 10: independent
    Y[:, :10] = rng.normal(size=(N, 10))
    # Last 10: two latents (drawn before any per-column noise)
    latent1 = rng.normal(size=N)
    latent2 = rng.normal(size=N)
    for j in range(10, 15):
        Y[:, j] = latent1 + rng.normal(scale=0.2, size=N)
    for j in range(15, 20):
        Y[:, j] = latent2 + rng.normal(scale=0.2, size=N)

    cols = [f"f{i+1}" for i in range(M)]
    df = pd.DataFrame(Y, columns=cols)
    truth = _truth(
        name="Case 6 - Mixed (indep + latents)",
        intrinsic_dim_expected=12,
        blocks_expected=[[c] for c in cols[:10]] + [cols[10:15], cols[15:20]],
        notes="f1..f10 independent; f11..f15 latent1; f16..f20 latent2."
    )
    return df, truth

def make_case7_pure_conflict_groups(N=1000, M=20, noise=0.05, seed=123, **kwargs):
    """Case 7: two anti-correlated groups built from one latent ``x``.

    The first ``(M + 1) // 2`` columns are ``x`` plus scaled Gaussian noise;
    the remaining columns are ``-x`` plus scaled Gaussian noise. Extra keyword
    arguments are accepted and ignored.

    Returns ``(df, truth)`` where ``df`` has columns f1..fM and ``truth``
    expects two dimensions, one block per group.

    Raises ``ValueError`` when ``M`` is below 2.
    """
    generator = np.random.default_rng(seed)
    if M < 2:
        raise ValueError("M must be >= 2")

    n_pos = (M + 1) // 2
    n_neg = M - n_pos
    driver = generator.normal(size=N)
    mirrored = -driver

    # Draw order: all positive-group columns first, then the negative group.
    pos_columns = [
        driver + noise * generator.normal(size=N) for _ in range(n_pos)
    ]
    neg_columns = [
        mirrored + noise * generator.normal(size=N) for _ in range(n_neg)
    ]

    labels = [f"f{k}" for k in range(1, M + 1)]
    frame = pd.DataFrame(np.column_stack(pos_columns + neg_columns), columns=labels)
    truth = dict(
        name="Case 7 - Structural conflict (anti-corr) 2-groups",
        intrinsic_dim_expected=2,
        blocks_expected=[labels[:n_pos], labels[n_pos:]],
        notes="Conflict groups (+x and -x).",
    )
    return frame, truth

# Battery 1: (display name, generator) pairs for the canonical correlation
# cases. run_cases uses the display name as the key of its results dict, so
# the names here (not the "name" stored in each truth dict) label the rows of
# the summary table.
battery1 = [
    ("Case 1 - Total independence", make_case1_independence),
    ("Case 2 - Total redundancy", make_case2_total_redundancy),
    ("Case 3 - Blocks (4 x 5)", make_case3_block_structure),
    ("Case 4 - Blocks (2 x 10)", make_case4_two_big_blocks),
    ("Case 5 - Chain", make_case5_chain_structure),
    ("Case 6 - Mixed (indep + latents)", make_case6_mixed_structure),
    ("Case 7 - Structural conflict (anti-corr) with groups", make_case7_pure_conflict_groups),
]

# Battery 2: (display name, generator) pairs for the MOP benchmarks.
# NOTE(review): the mop* generators are not defined in the part of the
# notebook visible before this cell; they must already exist when this list is
# built, otherwise a NameError is raised here, before battery 1 runs - confirm.
battery2 = [
    ("MOP-A — Monotonic redundancy", mopA_monotonic_redundancy),
    ("MOP-B — Trade-off + redundancies", mopB_tradeoff_with_redundancies),
    ("MOP-C — Latent blocks", mopC_latent_blocks_4x5),
    ("MOP-D — Pure conflict groups", mopD_pure_conflict_groups),
    ("MOP-E — Partial redundancy + noise", mopE_partial_redundancy_noisy),
    ("MOP-F — Regime switching", mopF_regime_switching),
]

# Run battery 1 with run_cases' default sample count (N=300), which overrides
# the N=1000 default of the individual generators.
print("\n=== RUNNING STANDARD CORRELATION BATTERY ===")
battery1_results = run_cases(battery1)

# Tabulate per-case fidelity/status. DataFrame.to_markdown relies on the
# optional `tabulate` package being installed.
battery1_fidelity_df = evaluate_reduced_model_fidelity(battery1_results)
print("\n--- ISDA Reconstruction Fidelity Evaluation for Canonical Cases ---")
print(battery1_fidelity_df.to_markdown(index=False))

# Same pipeline for the MOP benchmark battery.
print("\n\n=== RUNNING MOP BENCHMARK BATTERY ===")
mop_results = run_cases(battery2)

mop_fidelity_df = evaluate_reduced_model_fidelity(mop_results)
print("\n--- ISDA Reconstruction Fidelity Evaluation for MOP Cases ---")
print(mop_fidelity_df.to_markdown(index=False))
