In [5]:
#!/usr/bin/env python3
# star_network_identify.py
# ---------------------------------------------------------------
from __future__ import annotations
from decimal import Decimal, getcontext
import numpy as np
import mpmath as mp
from typing import Callable, Tuple
from sklearn.mixture import GaussianMixture

# ---------- Global High Precision Settings ----------
getcontext().prec = 200
mp.mp.dps = getcontext().prec
TWOPI = mp.mpf('6.283185307179586476925286766559')
SIGMA_H2 = 0.5  # ∫ h^2(0,y) dm(y)

# ------------------------------------------------------------------
#  I. High‑Precision Network System (unchanged)
# ------------------------------------------------------------------
class GraphSystemDecimal:
    def __init__(self, A: np.ndarray, alpha: str = '0.25',
                 local_map: Callable[[Decimal], Decimal] | None = None,
                 coupling_fn: Callable[[Decimal, Decimal], Decimal] | None = None,
                 seed: int = 0):
        self.A = np.asarray(A, dtype=float)
        self.N = self.A.shape[0]
        self.Delta = self.A.sum(axis=0).max()
        self.alpha = Decimal(alpha)
        self.local_map = local_map or (lambda x: (Decimal(2) * x) % 1)
        self.coupling = coupling_fn or self._default_coupling
        self.rng = np.random.default_rng(seed)
        self.reset()

    @staticmethod
    def _default_coupling(xs: Decimal, xt: Decimal) -> Decimal:
        """Default sinusoidal diffusive coupling."""
        v = -mp.sin(TWOPI * mp.mpf(str(xs))) + mp.sin(TWOPI * mp.mpf(str(xt)))
        return Decimal(str(v))

    def _coupling_term(self):
        """Compute coupling increment for each node."""
        incr = [Decimal(0)] * self.N
        for j in range(self.N):
            if self.A[j].sum() == 0:
                continue
            for i in range(self.N):
                if self.A[j, i]:
                    incr[i] += self.coupling(self.x[j], self.x[i])
        d = Decimal(str(self.Delta))
        return [v / d for v in incr]

    def step(self):
        """One time‑step update of the coupled map lattice."""
        xn = [self.local_map(x) for x in self.x]
        coup = self._coupling_term()
        xn = [(xi + self.alpha * ci) % 1 for xi, ci in zip(xn, coup)]
        self.x = xn
        self.t += 1
        return xn

    def reset(self):
        """Reset states with a fresh random initial condition."""
        self.x = [Decimal(str(v)) for v in self.rng.random(self.N)]
        self.t = 0

    def run(self, T: int, discard: int = 0):
        """Simulate for *T* steps and return the trajectory after discarding
        the first *discard* transients."""
        traj = np.zeros((self.N, max(0, T - discard)))
        for k in range(T):
            xt = self.step()
            if k >= discard:
                traj[:, k - discard] = [float(v) for v in xt]
        return traj


# ------------------------------------------------------------------
#  II. Star Graph Generators (three variants)
# ------------------------------------------------------------------

def graph_A(N: int):
    """One central hub (node N‑1) connected to all leaves."""
    A = np.zeros((N, N))
    A[np.arange(N - 1), N - 1] = 1
    return A


def graph_B(N: int):
    """Two hubs (nodes N‑2 and N‑1), every leaf connected to *both* hubs."""
    A = np.zeros((N, N))
    leaves = np.arange(N - 2)
    A[leaves, N - 1] = 1
    A[leaves, N - 2] = 1
    return A


def graph_C(N: int):
    """Two hubs, but leaves are split into two halves and each half connects to only one hub."""
    A = np.zeros((N, N))
    half = N // 2
    A[np.arange(half - 1), N - 2] = 1
    A[np.arange(half - 1, N - 2), N - 1] = 1
    return A


# ------------------------------------------------------------------
#  III. GMM‑based Hub Detection & Core Statistics
# ------------------------------------------------------------------

def moddiff(u):
    """Modulo‑1 difference folded into (‑0.5, 0.5]."""
    return ((u + 0.5) % 1) - 0.5


def compute_strength(traj):
    """Empirical approximation of the theoretical error ‖x_{t+1} - 2x_t‖."""
    x, x1 = traj[:, :-1], traj[:, 1:]
    return np.abs(moddiff(x1 - 2 * x)).mean(axis=1)


def gmm_hubs(S, seed=0):
    """
    Use a 2‑component Gaussian Mixture Model to separate hubs (larger error strength) from leaves.
    
    Parameters
    ----------
    S : the mean error for N nodes
    seed : 
    
    Return
    ------
    
    """
    
    g = GaussianMixture(2, random_state=seed).fit(S.reshape(-1, 1))
    return g.predict(S.reshape(-1, 1)) == np.argmax(g.means_)


def beta_var(x: np.ndarray) -> float:
    """Regression‑based estimator of variance of residual y = x_{t+1} - 2x_t + β sin(2π x_t)."""
    y = moddiff(x[1:] - 2 * x[:-1])
    s = -np.sin(2 * np.pi * x[:-1])
    beta = -(y @ s) / (s @ s)
    resid = y + beta * s
    return resid.var()


# ------------------------------------------------------------------
#  IV‑a  Single Segment → Classify A vs (B/C)
# ------------------------------------------------------------------

def classify_A_and_BC(traj: np.ndarray, N: int) -> str:
    """Return 'A_N' if the network has exactly one hub, otherwise return 'B_N and C_N' (two‑hub candidates)."""
    S = compute_strength(traj)
    hubs = np.where(gmm_hubs(S))[0]
    if hubs.size == 1:
        return "A_N"
    return "B_N and C_N"


# ------------------------------------------------------------------
#  IV‑b  Compute "mean hub variance" for one segment
# ------------------------------------------------------------------

def average_hub_variance(traj: np.ndarray) -> float:
    """Compute the mean of β‑residual variances over the two hubs of a B/C star graph."""
    S = compute_strength(traj)
    hubs = np.where(gmm_hubs(S))[0]
    if hubs.size != 2:
        raise RuntimeError("This segment does not correspond to a B/C graph (number of hubs ≠ 2)")
    return float(np.mean([beta_var(traj[i]) for i in hubs]))

# ------------------------------------------------------------------
#  IV‑c **Key addition**: Two segments → compare variances → classify B vs C
# ------------------------------------------------------------------
def classify_B_vs_C(traj_first: np.ndarray, traj_second: np.ndarray) -> Tuple[str, float, float]:
    """
    Parameters
    ----------
    traj_first, traj_second : ndarray
        Two trajectories with the same number of nodes, each generated from either a type‑B or type‑C star graph.

    Returns
    -------
    (label, var_first, var_second) : Tuple[str, float, float]
        label is 'first_is_B' or 'first_is_C'.
        Lower hub variance ⇒ higher in‑degree ⇒ type‑B.
    """
    var1 = average_hub_variance(traj_first)
    var2 = average_hub_variance(traj_second)
    if var1 < var2:  # smaller variance ⇒ larger in‑degree ⇒ B graph
        return "first_is_B", var1, var2
    else:
        return "first_is_C", var1, var2
# ------------------------------------------------------------------
#  V. Demo
# ------------------------------------------------------------------
if __name__ == "__main__":
    N, T, discard = 10, 6000, 600
    alpha = '0.25'

    # 1) Demonstrate single‑segment classification A/B/C
    for gname, maker in [("A_N", graph_A), ("B_N", graph_B), ("C_N", graph_C)]:
        traj = GraphSystemDecimal(maker(N), alpha=alpha, seed=hash(gname) % 2**32).run(T, discard)
        print(f"{gname}  → classify_ABC → {classify_A_and_BC(traj, N)}")

    # 2) Demonstrate variance comparison to distinguish B / C
    trajB = GraphSystemDecimal(graph_B(N), alpha=alpha, seed=1).run(T, discard)
    trajC = GraphSystemDecimal(graph_C(N), alpha=alpha, seed=2).run(T, discard)

    res, v_first, v_second = classify_B_vs_C(trajB, trajC)  # B comes first
    print("\nComparing two sequences:", res)
    print(f"  Mean hub variance of the first segment  = {v_first:.6e}")
    print(f"  Mean hub variance of the second segment = {v_second:.6e}")

A_N  → classify_ABC → A_N
B_N  → classify_ABC → B_N and C_N
C_N  → classify_ABC → B_N and C_N

Comparing two sequences: first_is_B
  Mean hub variance of the first segment  = 3.902977e-03
  Mean hub variance of the second segment = 7.846020e-03


In [6]:
# ======== New or Replacement Section Begins =================================
# I. General Logistic Map (Decimal version, co-existing with original 2 x mod 1)
def logistic_map_decimal(x: Decimal) -> Decimal:
    # “% 1” keeps the same format as in the original implementation
    return (Decimal(4) * x * (Decimal(1) - x)) % 1

# II. Example interface for an optional coupling function
def coupling_sin_diff(xs: Decimal, xt: Decimal) -> Decimal:
    v = -mp.sin(TWOPI * mp.mpf(str(xs))) + mp.sin(TWOPI * mp.mpf(str(xt)))
    return Decimal(str(v))

# III. --- Modify compute_strength / beta_var so they depend on local_map ---
def compute_strength(
    traj: np.ndarray,
    local_map_vec: Callable[[np.ndarray], np.ndarray]
) -> np.ndarray:
    """S_i = ⟨|Δ_i|⟩,  where Δ_i = x_{t+1} − f(x_t)"""
    x, x1 = traj[:, :-1], traj[:, 1:]
    Delta = moddiff(x1 - local_map_vec(x))
    return np.abs(Delta).mean(axis=1)

def beta_var(
    traj_i: np.ndarray,
    local_map_vec: Callable[[np.ndarray], np.ndarray],
    I_h_vec: Callable[[np.ndarray], np.ndarray]
) -> float:
    """
    Estimate β and return the residual variance.
    I_h_vec(x) = ∫ h(x, y) dm(y)
    """
    x     = traj_i[:-1]
    y     = moddiff(traj_i[1:] - local_map_vec(x))
    s     = I_h_vec(x)
    beta  = -(y @ s) / (s @ s)
    resid = y + beta * s
    return resid.var()

# IV. --- Vectorized utilities, isolated from the Decimal system -------------
logistic_vec = np.vectorize(lambda u: 4.0 * u * (1.0 - u))          # f(x)
Ih_vec       = np.vectorize(lambda u: -np.sin(2 * np.pi * u))       # ∫ h dm

# V. --- Adapted classification functions -----------------------------------
def classify_A_and_BC(traj: np.ndarray) -> str:
    S    = compute_strength(traj, logistic_vec)
    hubs = np.where(gmm_hubs(S))[0]
    return "A_N" if hubs.size == 1 else "B_N and C_N"

def average_hub_variance(traj: np.ndarray) -> float:
    S    = compute_strength(traj, logistic_vec)
    hubs = np.where(gmm_hubs(S))[0]
    if hubs.size != 2:
        raise RuntimeError("Data are not from a B/C graph (number of hubs ≠ 2)")
    vars_ = [beta_var(traj[i], logistic_vec, Ih_vec) for i in hubs]
    return float(np.mean(vars_))

# VI. ------------------------ Demo -----------------------------------------
if __name__ == "__main__":
    N, T, discard = 10, 6000, 600
    alpha = '0.25'

    local_map = logistic_map_decimal
    coupling  = coupling_sin_diff

    # Demonstrate single-segment classification for A/B/C
    for gname, maker in [("A_N", graph_A),
                         ("B_N", graph_B),
                         ("C_N", graph_C)]:
        gs   = GraphSystemDecimal(maker(N), alpha=alpha,
                                  local_map=local_map,
                                  coupling_fn=coupling,
                                  seed=hash(gname) % 2**32)
        traj = gs.run(T, discard)
        print(f"{gname}  → classify_ABC → {classify_A_and_BC(traj)}")

    # Compare two sequences to distinguish between B and C
    trajB = GraphSystemDecimal(graph_B(N), alpha=alpha,
                               local_map=local_map,
                               coupling_fn=coupling,
                               seed=1).run(T, discard)
    trajC = GraphSystemDecimal(graph_C(N), alpha=alpha,
                               local_map=local_map,
                               coupling_fn=coupling,
                               seed=2).run(T, discard)

    res, v_first, v_second = classify_B_vs_C(trajB, trajC)
    print("\nCompare the two sequences:", res)
    print(f"  Mean hub variance for the first sequence  = {v_first:.6e}")
    print(f"  Mean hub variance for the second sequence = {v_second:.6e}")
# ======== New or Replacement Section Ends ===================================


A_N  → classify_ABC → A_N
B_N  → classify_ABC → B_N and C_N
C_N  → classify_ABC → B_N and C_N

Compare the two sequences: first_is_B
  Mean hub variance for the first sequence  = 2.936251e-03
  Mean hub variance for the second sequence = 6.113247e-03


In [7]:
# ---------- 1. Theoretical variance function ---------------------
def theoretical_hub_variance(N: int,
                             graph_type: str,
                             sigma_h2: float = 0.3898615457,
                             alpha: float = 0.25) -> float:
    """
    Return the theoretical variance of hub nodes for a B- or C-type graph.
    """
    if graph_type == "B":
        L = N - 2
        Delta = N - 2
    elif graph_type == "C":
        L = N // 2 - 1
        Delta = N // 2 - 1
    else:
        raise ValueError("graph_type must be 'B' or 'C'")
    return (alpha ** 2) * sigma_h2 * L / (Delta ** 2)


# ---------- 2. Single-sequence B/C classification -------------------
def classify_single_BC_theory(traj: np.ndarray, N: int) -> tuple[str, dict]:
    """
    Input : an N × T trajectory array
    Output: 'B_N' or 'C_N' plus debugging information
    """
    # a) Locate hubs
    S = compute_strength(traj, logistic_vec)          # Use a different mapper? Modify this call.
    hubs = np.where(gmm_hubs(S))[0]
    if hubs.size != 2:
        raise RuntimeError("Number of hubs in trajectory ≠ 2; not a B/C graph")

    # b) Empirical average variance of hubs
    V_hat = average_hub_variance(traj)

    # c) Theoretical values
    V_B_th = theoretical_hub_variance(N, "B")
    V_C_th = theoretical_hub_variance(N, "C")

    # d) Log-distance between empirical and theoretical variances
    d_B = abs(np.log(V_hat) - np.log(V_B_th))
    d_C = abs(np.log(V_hat) - np.log(V_C_th))

    label = "B_N" if d_B < d_C else "C_N"
    info = {
        "V_hat": V_hat,
        "V_B_th": V_B_th,
        "V_C_th": V_C_th,
        "d_B": d_B,
        "d_C": d_C,
        "hubs": hubs.tolist(),
    }
    return label, info


# ---------- 3. Demo --------------------------------
if __name__ == "__main__":
    N, T, discard = 50, 8000, 800
    alpha = 0.25
    local_map = logistic_map_decimal
    coupling  = coupling_sin_diff

    trajB = (GraphSystemDecimal(graph_B(N),
                                alpha=alpha,
                                local_map=local_map,
                                coupling_fn=coupling,
                                seed=1)
             .run(T, discard))
    trajC = (GraphSystemDecimal(graph_C(N),
                                alpha=alpha,
                                local_map=local_map,
                                coupling_fn=coupling,
                                seed=2)
             .run(T, discard))

    resB, infoB = classify_single_BC_theory(trajB, N)
    resC, infoC = classify_single_BC_theory(trajC, N)

    print("Trajectory B → classified as:", resB, "| debug:", infoB)
    print("Trajectory C → classified as:", resC, "| debug:", infoC)


Trajectory B → classified as: B_N | debug: {'V_hat': 0.0005113619541740942, 'V_B_th': 0.0005076322209635416, 'V_C_th': 0.0010152644419270831, 'd_B': 0.0073204537525510815, 'd_C': 0.6858267268073943, 'hubs': [48, 49]}
Trajectory C → classified as: C_N | debug: {'V_hat': 0.0010205339499985673, 'V_B_th': 0.0005076322209635416, 'V_C_th': 0.0010152644419270831, 'd_B': 0.6983240387998242, 'd_C': 0.00517685823987879, 'hubs': [48, 49]}
