<a href="https://colab.research.google.com/github/tomheston/fragility-metrics/blob/main/notebooks/contingency_rxc.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# @title
# Complete Evidence Framework: r×c Contingency Tables (Independent Samples)
# v.25-NOV-2025
# Multinomial outcome analysis with complete p-fr-nb triplet
#
# Input:
#   r×c contingency table (r rows, c columns)
#   All cells as counts (non-negative integers)
#
# Output:
#   p  = p-value from chi-square test (or Fisher's exact for 2×2 if small N)
#   fr = GFQ (Global Fragility Quotient, stability of classification)
#   nb = RQ (Risk Quotient, distance from independence)
#
# GFI Algorithm:
#   Two-phase guaranteed optimal search:
#   Phase 1: Exact BFS with unit steps (radius-limited)
#   Phase 2: Pure Dijkstra (h=0) with multi-resolution steps
#   Guarantees GFI is the true global minimum
#
# GFQ Formula:
#   GFI = minimum number of cell-to-cell reallocations to flip significance
#   GFQ = GFI / N  (normalized [0,1])
#
# RQ Formula:
#   RRI = (1/k) Σ|O - E|  (raw distance from independence)
#   RQ = RRI / (N/k)  (normalized [0,1])
#   where k = (r-1)(c-1) = independent cells
#         O = observed counts, E = expected under independence
#
# Test Selection:
#   - 2×2 tables: Fisher's exact for small N/cells, else chi-square (no Yates)
#   - r×c tables (r>2 or c>2): Chi-square only
#
# Interpretation:
#   fr ∈ [0,1]: 0 = fragile, 1 = stable
#   nb ∈ [0,1]: 0 = independent, 1 = maximally associated
#
# Limits:
#   GFI computation: N ≤ 50000 (use RQ for larger samples)
#
# IF YOU USE THIS CALCULATOR PLEASE CITE:
# Heston, T. F. (2025). Fragility Metrics Toolkit [Software]. Zenodo.
#   https://doi.org/10.5281/zenodo.17254763
#
# © Thomas F. Heston 2025. CC-BY 4.0

# ----- SciPy availability guard -----
try:
    import scipy
except ImportError:
    try:
        import subprocess
        import sys
        subprocess.check_call([sys.executable, "-m", "pip", "install", "-q", "scipy"])
        import scipy
    except Exception:
        print("Please install scipy: pip install scipy")
        raise

import numpy as np
from scipy.stats import chi2_contingency, fisher_exact
from collections import deque
import heapq
import sys

# ========== CONFIGURABLE PARAMETERS ==========
ALPHA = 0.05
N_THRESHOLD = 5000         # Sample size threshold for chi-square vs Fisher (2×2 only)
MIN_CELL_THRESHOLD = 50    # Minimum cell count for chi-square validity (2×2 only)
GFI_THRESHOLD = 50000      # Don't compute GFI for N > this
# =============================================

# ---------- Core Utilities ----------

def test_p(table, use_chi2=False):
    """
    Unified significance test for r×c tables.

    Parameters:
    -----------
    table : 2D array-like
        Contingency table
    use_chi2 : bool
        If True, use chi-square; if False, use Fisher (2×2 only)

    Returns:
    --------
    float : p-value

    Note:
    -----
    For 2×2 tables, uses Fisher's exact (two-sided) or chi-square WITHOUT Yates correction.
    For r×c (r>2 or c>2), always uses chi-square.
    Matches test selection in 2×2 independent binary module.
    """
    table = np.array(table)
    r, c = table.shape

    if r == 2 and c == 2 and not use_chi2:
        # Fisher's exact for 2×2
        _, p = fisher_exact(table, alternative="two-sided")
        return p
    else:
        # Chi-square (no Yates correction to match 2×2 module)
        _, p, _, _ = chi2_contingency(table, correction=False)
        return p


def is_significant(p, alpha=ALPHA):
    """Check if p-value indicates significance."""
    return p <= alpha


def compute_expected(table):
    """Calculate expected counts under independence."""
    table = np.array(table, dtype=float)
    N = table.sum()
    if N == 0:
        return table
    row_totals = table.sum(axis=1, keepdims=True)
    col_totals = table.sum(axis=0, keepdims=True)
    return (row_totals @ col_totals) / N


# ---------- RRI / RQ (Robustness) ----------

def compute_rq(table):
    """
    Calculate Risk Quotient (RQ) for r×c table.

    RRI = (1/k) Σ|O - E|
    RQ = RRI / (N/k)

    where k = (r-1)(c-1) = independent cells

    Parameters:
    -----------
    table : 2D array-like
        Contingency table

    Returns:
    --------
    dict : {'RRI': float, 'RQ': float, 'k': int}

    Reference:
    ----------
    FRAGILITY_METRICS v9.7, §4.1

    Note:
    -----
    Uses k = (r-1)(c-1) = independent cells, consistent with framework definition.
    For 2×2: k = 1 (not 4).
    """
    table = np.array(table, dtype=float)
    r, c = table.shape
    N = table.sum()

    if N == 0:
        return {'RRI': None, 'RQ': None, 'k': None}

    # Expected counts under independence
    expected = compute_expected(table)

    # Number of independent cells
    k = (r - 1) * (c - 1)

    if k == 0:
        return {'RRI': None, 'RQ': None, 'k': 0}

    # RRI and RQ
    RRI = np.abs(table - expected).sum() / k
    RQ = RRI / (N / k)

    return {
        'RRI': float(RRI),
        'RQ': float(RQ),
        'k': int(k)
    }


# ---------- GFI / GFQ (Fragility) ----------

def get_step_sizes(N):
    """Determine multi-resolution step sizes based on N."""
    if N < 1000:
        return [1]
    elif N < 5000:
        return [1, 2, 3]
    elif N < 20000:
        return [1, 2, 3, 4]
    else:
        return [1, 2, 3, 4, 5]


def table_to_state(table):
    """Convert table to state tuple (flattened, excluding last cell)."""
    flat = table.flatten()
    return tuple(flat[:-1])


def state_to_table(state, N, shape):
    """Convert state back to table."""
    flat = list(state) + [N - sum(state)]
    return np.array(flat).reshape(shape)


def generate_neighbors_multistep(state, N, shape, step_sizes):
    """
    Generate all neighbors with specified step sizes.

    Parameters:
    -----------
    state : tuple
        Current state (flattened table excluding last cell)
    N : int
        Total sample size
    shape : tuple
        Table shape (r, c)
    step_sizes : list
        List of step sizes to use

    Yields:
    -------
    tuple : (new_state, move, cost)
        move = (src_idx, dst_idx, step)
        cost = step
    """
    tbl = state_to_table(state, N, shape)
    flat = tbl.flatten()
    n_cells = len(flat)

    for step in step_sizes:
        for src in range(n_cells):
            if flat[src] < step:
                continue
            for dst in range(n_cells):
                if dst == src:
                    continue
                new_flat = flat.copy()
                new_flat[src] -= step
                new_flat[dst] += step
                new_state = tuple(new_flat[:-1])
                yield new_state, (src, dst, step), step


def compute_gfi_gfq(table, alpha=ALPHA, store_path=False):
    """
    Calculate Global Fragility Index (GFI) and Quotient (GFQ) for r×c table.

    Two-phase guaranteed optimal algorithm:
    Phase 1: Exact BFS with unit steps (radius-limited, guarantees optimality if found)
    Phase 2: Pure Dijkstra (h=0) with multi-resolution steps (guarantees optimality)

    MATHEMATICAL GUARANTEES:
    - When GFI is returned (not None): EXACT minimum-cost path
    - When None returned: honest "GFI > max_depth" lower bound
    - This is pure Dijkstra, NOT A* with heuristic
    - h=0 is always admissible and guarantees correct result

    Time complexity: O(N³ log N) worst case
    Space complexity: O(N³) worst case

    Parameters:
    -----------
    table : 2D array-like
        Contingency table
    alpha : float
        Significance level
    store_path : bool
        If True, store witness path (memory intensive)

    Returns:
    --------
    dict : Contains GFI, GFQ, baseline_p, final_p, test_used

    Reference:
    ----------
    FRAGILITY_METRICS v9.7, §3.3
    Algorithm ported from 2×2 independent binary module
    """
    table = np.array(table, dtype=int)
    r, c = table.shape

    # Validate
    if np.any(table < 0):
        raise ValueError("All cells must be non-negative")

    N = table.sum()

    if N == 0:
        return {
            "GFI": None, "GFQ": None,
            "baseline_p": None, "baseline_state": None,
            "final_p": None, "witness_path": [],
            "test_used": None,
            "note": "Empty table."
        }

    if N > GFI_THRESHOLD:
        # Determine test type for baseline p
        if r == 2 and c == 2:
            use_chi2 = (N > N_THRESHOLD) and (np.min(table) >= MIN_CELL_THRESHOLD)
        else:
            use_chi2 = True

        base_p = test_p(table, use_chi2)
        base_state_str = "significant" if is_significant(base_p, alpha) else "non-significant"

        return {
            "GFI": None, "GFQ": None,
            "baseline_p": base_p,
            "baseline_state": base_state_str,
            "final_p": None, "witness_path": [],
            "test_used": "chi2" if use_chi2 else "fisher_exact",
            "note": f"GFI not computed for N > {GFI_THRESHOLD}. Use RQ instead."
        }

    # Determine test type
    if r == 2 and c == 2:
        use_chi2 = (N > N_THRESHOLD) and (np.min(table) >= MIN_CELL_THRESHOLD)
    else:
        use_chi2 = True

    # Get baseline state
    base_p = test_p(table, use_chi2)
    base_state = is_significant(base_p, alpha)
    target_state = not base_state

    # Adaptive parameters based on N
    if N < 500:
        PHASE1_RADIUS = 10  # Small tables: thorough phase 1
    elif N < 5000:
        PHASE1_RADIUS = 8
    else:
        PHASE1_RADIUS = 5  # Large tables: quick phase 1, rely on Dijkstra

    step_sizes = get_step_sizes(N)

    # Dynamic max depth based on N
    if N < 5000:
        max_depth = 1000
    elif N < 20000:
        max_depth = 500
    elif N < 50000:
        max_depth = 300
    else:
        max_depth = 200

    # ====================
    # PHASE 1: Exact BFS with unit steps
    # ====================
    start = table_to_state(table)
    visited_phase1 = {start: 0}
    queue_phase1 = deque([(start, 0, [] if store_path else None)])

    phase1_result = None
    states_explored_p1 = 0

    while queue_phase1:
        state, cost, path = queue_phase1.popleft()
        states_explored_p1 += 1

        if cost > PHASE1_RADIUS:
            break

        # Test current state
        tbl = state_to_table(state, N, table.shape)
        p_now = test_p(tbl, use_chi2)

        if is_significant(p_now, alpha) == target_state:
            phase1_result = {
                "GFI": cost,
                "GFQ": cost / N if N else None,
                "baseline_p": base_p,
                "baseline_state": ("significant" if base_state else "non-significant"),
                "final_p": p_now,
                "witness_path": path if store_path else [],
                "test_used": "chi2" if use_chi2 else "fisher_exact",
                "note": f"Phase 1 exact (explored {states_explored_p1} states)"
            }
            break

        # Generate unit-step neighbors only
        for neighbor, move, neighbor_cost in generate_neighbors_multistep(state, N, table.shape, [1]):
            new_cost = cost + neighbor_cost

            if neighbor not in visited_phase1 or new_cost < visited_phase1[neighbor]:
                visited_phase1[neighbor] = new_cost
                new_path = (path + [move]) if store_path else None
                queue_phase1.append((neighbor, new_cost, new_path))

    if phase1_result:
        return phase1_result

    # ========================
    # PHASE 2: Pure Dijkstra (h=0)
    # ========================

    # Priority queue: (g_score, counter, state, path)
    # NO HEURISTIC - pure Dijkstra with h=0
    # Using h=0 guarantees optimality (any heuristic based on FI would overestimate)
    counter = 0
    g_scores = {start: 0}

    pq = [(0, counter, start, [] if store_path else None)]  # f = g (no h)
    visited_phase2 = set()
    states_explored_p2 = 0

    while pq:
        g_score, _, state, path = heapq.heappop(pq)

        if state in visited_phase2:
            continue

        visited_phase2.add(state)
        states_explored_p2 += 1

        # Check depth limit - CONTINUE not BREAK
        if g_score > max_depth:
            continue  # Skip this node, keep searching others

        # Test current state
        tbl = state_to_table(state, N, table.shape)
        p_now = test_p(tbl, use_chi2)

        if is_significant(p_now, alpha) == target_state:
            total_explored = states_explored_p1 + states_explored_p2
            return {
                "GFI": g_score,
                "GFQ": g_score / N if N else None,
                "baseline_p": base_p,
                "baseline_state": ("significant" if base_state else "non-significant"),
                "final_p": p_now,
                "witness_path": path if store_path else [],
                "test_used": "chi2" if use_chi2 else "fisher_exact",
                "note": f"Dijkstra optimal (explored {total_explored} states)"
            }

        # Generate neighbors with multiple step sizes
        for neighbor, move, move_cost in generate_neighbors_multistep(state, N, table.shape, step_sizes):
            tentative_g = g_score + move_cost

            # Only process if this is a better path
            if neighbor not in g_scores or tentative_g < g_scores[neighbor]:
                g_scores[neighbor] = tentative_g

                # Pure Dijkstra: f = g (no heuristic)
                f_score = tentative_g

                counter += 1
                new_path = (path + [move]) if store_path else None
                heapq.heappush(pq, (f_score, counter, neighbor, new_path))

    # Search exhausted
    total_explored = states_explored_p1 + states_explored_p2

    return {
        "GFI": None,
        "GFQ": None,
        "baseline_p": base_p,
        "baseline_state": ("significant" if base_state else "non-significant"),
        "final_p": base_p,
        "witness_path": [],
        "test_used": "chi2" if use_chi2 else "fisher_exact",
        "note": f"GFI > {max_depth} (searched {total_explored} states, depth limited)"
    }


# ---------- Complete Evidence Calculator ----------

def contingency_table_complete(table, alpha=ALPHA):
    """
    Calculate complete p-fr-nb triplet for r×c contingency table.

    Parameters:
    -----------
    table : 2D array-like
        Contingency table
    alpha : float
        Significance level

    Returns:
    --------
    dict : Complete evidence assessment
    """
    table = np.array(table, dtype=int)
    r, c = table.shape
    N = table.sum()

    # Determine test type
    if r == 2 and c == 2:
        use_chi2 = (N > N_THRESHOLD) and (np.min(table) >= MIN_CELL_THRESHOLD)
    else:
        use_chi2 = True

    # p-value
    p_value = test_p(table, use_chi2)

    # fr (GFQ)
    if N <= GFI_THRESHOLD:
        gfi_result = compute_gfi_gfq(table, alpha=alpha, store_path=False)
        gfi = gfi_result.get("GFI")
        gfq = gfi_result.get("GFQ")
        gfi_note = gfi_result.get("note", "")
    else:
        gfi = None
        gfq = None
        gfi_note = f"GFI not computed for N > {GFI_THRESHOLD}"

    # nb (RQ)
    rq_result = compute_rq(table)

    # Expected counts for validation
    expected = compute_expected(table)

    return {
        'p': p_value,
        'fr': gfq,
        'nb': rq_result['RQ'],
        'GFI': gfi,
        'RRI': rq_result['RRI'],
        'k_independent': rq_result['k'],
        'table_shape': (r, c),
        'N_total': N,
        'test_used': "chi2" if use_chi2 else "fisher_exact",
        'expected_counts': expected,
        'min_expected': np.min(expected),
        'gfi_note': gfi_note
    }


# ---------- Interpretation Functions ----------

def interpret_fragility(fr):
    """Interpret GFQ (fragility quotient)."""
    if fr is None:
        return "not computed"
    if fr < 0.01:
        return "extremely fragile"
    elif fr < 0.05:
        return "very fragile"
    elif fr < 0.10:
        return "fragile"
    elif fr < 0.25:
        return "mildly stable"
    elif fr < 0.40:
        return "moderate stability"
    else:
        return "very stable"


def interpret_robustness(nb):
    """Interpret RQ (robustness)."""
    if nb is None:
        return "not computed"
    if nb < 0.05:
        return "at independence (no association)"
    elif nb < 0.10:
        return "near independence (minimal association)"
    elif nb < 0.25:
        return "moderate distance from independence"
    elif nb < 0.50:
        return "clear separation from independence"
    else:
        return "far from independence (strong association)"


# ---------- Summary Interface ----------

def contingency_summary(result):
    """Display complete p-fr-nb evidence assessment."""
    def fmt_f(x):
        return "NA" if x is None or not np.isfinite(x) else f"{x:.6f}"

    def fmt_i(x):
        return "NA" if x is None else str(int(x))

    r, c = result['table_shape']
    print(f"Table dimensions: {r} × {c}")
    print(f"N_total = {fmt_i(result['N_total'])}")
    print(f"Test used: {result['test_used']}")
    print(f"Min expected count: {fmt_f(result['min_expected'])}")

    if result['min_expected'] < 5 and result['test_used'] == 'chi2':
        print("⚠ Warning: Some expected counts < 5 (chi-square may be unreliable)")
    print()

    # Complete evidence triplet
    print("=" * 50)
    print("COMPLETE EVIDENCE ASSESSMENT (p-fr-nb)")
    print("=" * 50)
    print(f"p = {fmt_f(result['p'])}")
    print(f"fr (GFQ) = {fmt_f(result['fr'])}")
    print(f"nb (RQ) = {fmt_f(result['nb'])}")
    print("=" * 50)
    print()

    # Interpretations
    interp_fr = interpret_fragility(result['fr'])
    interp_nb = interpret_robustness(result['nb'])

    print("INTERPRETATION:")
    print(f"Fragility: {interp_fr}")
    print(f"Robustness: {interp_nb}")
    print()

    # Additional metrics
    print(f"GFI (raw count) = {fmt_i(result['GFI'])}")
    print(f"RRI (raw distance) = {fmt_f(result['RRI'])}")
    print(f"Independent cells (k) = {fmt_i(result['k_independent'])}")

    if result['gfi_note']:
        print(f"Note: {result['gfi_note']}")

    # Significance classification
    is_sig = result['p'] <= ALPHA
    print()
    print(f"Classification: {'SIGNIFICANT' if is_sig else 'NON-SIGNIFICANT'} at α={ALPHA}")


# ---------- CLI Entry Point ----------

def main():
    print("Complete Evidence Framework: r×c Contingency Tables")
    print("=" * 50)
    print("Calculate p-fr-nb triplet for multinomial outcomes")
    print()

    # Get table dimensions
    r = int(input("Number of rows (r): ").strip())
    c = int(input("Number of columns (c): ").strip())

    if r < 2 or c < 2:
        print("Error: Need at least 2×2 table")
        sys.exit(1)

    print()
    print(f"Enter all {r*c} cell counts (row by row):")
    print()

    # Get cell values
    table = []
    for i in range(r):
        row = []
        for j in range(c):
            val = int(input(f"  Cell ({i+1},{j+1}): ").strip())
            if val < 0:
                print("Error: Cell counts must be non-negative")
                sys.exit(1)
            row.append(val)
        table.append(row)

    print()
    print("Input table:")
    table_array = np.array(table)
    print(table_array)
    print()

    # Calculate complete evidence
    result = contingency_table_complete(table, alpha=ALPHA)
    contingency_summary(result)


if __name__ == "__main__":
    main()


== BW Ticker Drawdown Check ==

+--------+---------+----------+------------+--------+
| Ticker | Current | 12W High | Drawdown % | Status |
+--------+---------+----------+------------+--------+
|  BITX  |  52.12  |  60.65   |   14.06%   |   OK   |
|  MSTX  |  35.69  |  47.23   |   24.43%   |   OK   |
|  PLTR  | 127.72  |  133.17  |   4.09%    |   OK   |
|  SOXL  |  19.18  |  20.94   |   8.38%    |   OK   |
|  TQQQ  |  74.21  |  74.21   |   0.00%    |   OK   |
+--------+---------+----------+------------+--------+

== Closest OTM ==

+------------+--------+--------+--------+---------+-----------+--------------+-----+---------------+
| Expiration | Ticker |  Spot  | Strike | Premium | CashYield | AssignedGain | DTE | MeetsCriteria |
+------------+--------+--------+--------+---------+-----------+--------------+-----+---------------+
| 2025-06-13 |  BITX  | 52.12  |  53.0  |  1.59   |   3.01%   |    4.75%     |  5  |      Yes      |
| 2025-06-13 |  MSTX  | 35.69  |  36.0  |  1.57   |   4.3