___
# Analyze Inferred-GRN Edge Robustness
___
- Simulate random edge failures and targeted hub-edge attacks (edges ranked by edge betweenness or edge weight, falling back to the sum of endpoint degrees otherwise)
- Batch-process phase-specific network directories
- Outputs: edge-removal robustness plots per network and an aggregate AUC summary

In [1]:
# import modules
from __future__ import annotations

import random
from pathlib import Path
from typing import Iterable, List, Dict, Optional

import numpy as np
import pandas as pd
import networkx as nx
import matplotlib.pyplot as plt

In [2]:
## infers the delimiter used in an edge list file
def guess_delimiter(path: str | Path, sample_lines: int = 20) -> Optional[str]:
    """Guess the column delimiter of an edge-list file.

    At most ``sample_lines`` lines are read from the start of ``path``.
    Blank lines and lines starting with ``#`` are skipped. For the first
    remaining line that contains any known delimiter, the delimiter with the
    highest priority (tab, comma, semicolon, pipe - in that order) is
    returned.

    Returns:
        The detected delimiter, or ``None`` when no inspected line contains
        one (e.g. a whitespace-separated file).
    """
    priority = ('\t', ',', ';', '|')
    with open(path, 'r', encoding='utf-8', errors='ignore') as fh:
        # zip() with range() caps the number of lines pulled from the file.
        for _, raw in zip(range(sample_lines), fh):
            text = raw.strip()
            if text == '' or text[0] == '#':
                continue
            hit = next((sep for sep in priority if sep in text), None)
            if hit is not None:
                return hit
    return None

# loads an edge list into a NetworkX graph
def load_graph_from_edgelist(
    path: str | Path,
    *,
    directed: bool = True,
    weighted: bool = False,
    weight_attr: str = 'weight',
) -> nx.Graph:
    """Read an edge-list file into a NetworkX graph without self-loops.

    Args:
        path: Edge-list file; its delimiter is detected with
            ``guess_delimiter`` (NetworkX's default splitting is used when
            none is detected).
        directed: Build a ``DiGraph`` when true, a ``Graph`` otherwise.
        weighted: When true, the third column is parsed as a float and stored
            under ``weight_attr``; when false, no edge data is loaded.
        weight_attr: Edge attribute name used for the weight column.

    Returns:
        The loaded graph, with node labels read as ``str`` and self-loops
        removed.

    Raises:
        ValueError: If the graph has no edges after self-loop removal.
    """
    path = Path(path)
    options = {
        'create_using': nx.DiGraph() if directed else nx.Graph(),
        'nodetype': str,
        'comments': '#',
        'data': [(weight_attr, float)] if weighted else False,
    }
    separator = guess_delimiter(path)
    if separator is not None:
        options['delimiter'] = separator

    graph = nx.read_edgelist(path, **options)
    graph.remove_edges_from(nx.selfloop_edges(graph))
    if graph.number_of_edges() == 0:
        raise ValueError(f'No edges loaded from {path}. Check delimiter/format.')
    return graph

# computes the GCC fraction
def gcc_fraction(G: nx.Graph, component_mode: str = 'weak') -> float:
    """Return the size of the largest component divided by the node count.

    Component definition used:
      * directed graph, ``'strong'``  -> strongly connected components
      * directed graph, ``'weak'``    -> weakly connected components
      * directed graph, anything else -> connected components of the
        undirected copy of the graph
      * undirected graph (any mode)   -> connected components

    An empty graph yields ``0.0``.
    """
    n_nodes = G.number_of_nodes()
    if n_nodes == 0:
        return 0.0

    directed = G.is_directed()
    if directed and component_mode == 'strong':
        parts = nx.strongly_connected_components(G)
    elif directed and component_mode == 'weak':
        parts = nx.weakly_connected_components(G)
    elif directed:
        parts = nx.connected_components(G.to_undirected())
    else:
        parts = nx.connected_components(G)

    biggest = max(map(len, parts), default=0)
    return biggest / max(1, n_nodes)

In [3]:
# sample random edge removal orders
def random_edge_failure_orders(G: nx.Graph, n_trials: int, seed: int = 42) -> List[List[tuple]]:
    """Return ``n_trials`` random permutations of the edges of ``G``.

    A single ``random.Random(seed)`` generator drives all trials, so the
    result is reproducible for a given seed. Each trial reshuffles the same
    working list in place (starting from the previous trial's order) and
    stores an independent copy.
    """
    rng = random.Random(seed)
    pool = list(G.edges())

    def _reshuffled() -> List[tuple]:
        rng.shuffle(pool)
        return pool[:]

    return [_reshuffled() for _ in range(n_trials)]

# determine targeted edge removal order
def targeted_edge_attack_order(
    G: nx.Graph,
    *,
    metric: str = 'betweenness',
    weight_attr: str = 'weight',
) -> List[tuple]:
    """Rank the edges of ``G`` for a targeted attack, highest score first.

    Scoring:
      * ``'betweenness'``: edge betweenness centrality; ``weight_attr`` is
        passed as the weight only if at least one edge carries it.
        NOTE(review): NetworkX treats that weight as a path distance -
        confirm this matches the meaning of the GRN edge weights.
      * ``'weight'`` (only when some edge carries ``weight_attr``): the edge's
        own weight, defaulting to 1.0 for edges without it.
      * anything else (including ``'weight'`` on an unweighted graph): the
        sum of the degrees of the two endpoints.

    Ties are broken by the sorted pair of endpoint labels so the order is
    deterministic.
    """
    if metric == 'betweenness':
        has_weights = bool(nx.get_edge_attributes(G, weight_attr))
        scores = nx.edge_betweenness_centrality(
            G, weight=weight_attr if has_weights else None
        )
    elif metric == 'weight' and nx.get_edge_attributes(G, weight_attr):
        scores = {}
        for u, v, attrs in G.edges(data=True):
            scores[(u, v)] = attrs.get(weight_attr, 1.0)
    else:
        scores = {}
        for u, v in G.edges():
            scores[(u, v)] = G.degree(u) + G.degree(v)

    def _rank(edge: tuple) -> tuple:
        # Descending score, then ascending endpoint labels.
        return (-scores[edge], sorted(edge))

    return sorted(scores, key=_rank)

# simulate edge removal curve
def simulate_edge_removal(
    G: nx.Graph,
    removal_order: List[tuple],
    component_mode: str = 'weak',
) -> Dict[str, np.ndarray]:
    """Remove edges one by one and record the GCC fraction after each step.

    The input graph is not modified; a copy is used. Both returned arrays
    have length ``M + 1`` where ``M`` is the edge count of ``G``: index 0 is
    the intact graph and index ``i`` is the state after processing the first
    ``i`` entries of ``removal_order``. Entries that are not (or no longer)
    edges of the working graph still advance the step counter.

    NOTE(review): ``removal_order`` is presumably expected to list each edge
    of ``G`` exactly once; a longer list would index past the arrays.

    Returns:
        ``{'removed_frac': ..., 'gcc_frac': ...}``.
    """
    work = G.copy()
    n_edges = work.number_of_edges()

    # np.zeros already leaves removed_frac[0] at 0.0 for the intact graph.
    removed_frac, gcc_frac = np.zeros(n_edges + 1), np.zeros(n_edges + 1)
    gcc_frac[0] = gcc_fraction(work, component_mode=component_mode)

    for step, edge in enumerate(removal_order, start=1):
        if work.has_edge(*edge):
            work.remove_edge(*edge)
        removed_frac[step] = step / n_edges
        gcc_frac[step] = gcc_fraction(work, component_mode=component_mode)

    return {'removed_frac': removed_frac, 'gcc_frac': gcc_frac}

# aggregate random trials
def aggregate_random_edge_failures(
    G: nx.Graph,
    *,
    n_trials: int = 20,
    seed: int = 42,
    component_mode: str = 'weak',
) -> Dict[str, np.ndarray]:
    """Average the GCC-fraction curve over several random edge-failure trials.

    Args:
        G: Graph to attack; it is not modified.
        n_trials: Number of random removal orders to simulate (at least 1).
        seed: Seed forwarded to ``random_edge_failure_orders``.
        component_mode: Forwarded to ``simulate_edge_removal``.

    Returns:
        ``{'removed_frac': x-axis, 'mean_gcc': mean curve, 'std_gcc':
        standard deviation across trials}``.

    Raises:
        ValueError: If ``n_trials`` is smaller than 1.
    """
    if n_trials < 1:
        raise ValueError(f'n_trials must be at least 1, got {n_trials}')

    orders = random_edge_failure_orders(G, n_trials=n_trials, seed=seed)
    runs = [
        simulate_edge_removal(G, order, component_mode=component_mode)
        for order in orders
    ]
    gcc_curves = np.vstack([run['gcc_frac'] for run in runs])
    return {
        # Reuse the x-axis from the first trial instead of re-running a full
        # simulation only to obtain it.
        'removed_frac': runs[0]['removed_frac'],
        'mean_gcc': gcc_curves.mean(axis=0),
        'std_gcc': gcc_curves.std(axis=0),
    }

In [4]:
# plot helper
def plot_edge_robustness(
    name: str,
    phase: str,
    removed_frac: np.ndarray,
    random_mean: np.ndarray,
    random_std: np.ndarray,
    targeted_curve: np.ndarray,
    *,
    target_metric: str,
    component_mode: str,
    base_dir: str | Path = '../../data/result/edge_robustness/figure',
    show: bool = True,
) -> Path:
    """Plot random-failure vs. targeted-attack GCC curves and save the figure.

    The figure is written to ``<base_dir>/<phase>/<name>_edge.png`` (the
    directory is created if needed). It shows the mean random-failure curve
    with a +/- one standard deviation band and the targeted-attack curve.

    Args:
        name: Network name, used in the title and the file name.
        phase: Phase label, used in the title and as the sub-directory.
        removed_frac: Shared x-axis (fraction of edges removed).
        random_mean: Mean GCC fraction over random trials.
        random_std: Standard deviation of the GCC fraction over random trials.
        targeted_curve: GCC fraction under the targeted attack.
        target_metric: Metric name shown in the targeted curve's legend entry.
        component_mode: Accepted for call compatibility; not used when drawing.
        base_dir: Root directory for figures.
        show: Display the figure with ``plt.show()`` when true; otherwise the
            figure is closed after saving.

    Returns:
        Path of the saved PNG.
    """
    out_dir = Path(base_dir) / phase
    out_dir.mkdir(parents=True, exist_ok=True)
    fig_path = out_dir / f'{name}_edge.png'

    fig, ax = plt.subplots(figsize=(6, 4.5))

    band_low = random_mean - random_std
    band_high = random_mean + random_std
    ax.plot(removed_frac, random_mean, label='Random edge failure (mean)')
    ax.fill_between(removed_frac, band_low, band_high, alpha=0.2, label='Random ±1σ')
    ax.plot(removed_frac, targeted_curve, label=f'Targeted edge attack ({target_metric})')

    ax.set_xlabel('Removed fraction of edges')
    ax.set_ylabel('GCC fraction (|C_max| / N)')
    ax.set_title(f'Edge Robustness: {phase} / {name}')
    ax.legend()
    ax.grid(True, linestyle='--', alpha=0.4)

    fig.tight_layout()
    fig.savefig(fig_path, dpi=200)

    if not show:
        plt.close(fig)
        return fig_path
    plt.show()
    return fig_path

In [5]:
# helper to infer phase
def infer_phase(path: Path, default: str = 'unknown') -> str:
    """Return the name of the nearest ancestor directory starting with 'phase'.

    The match is case-insensitive and only looks at ``path.parents`` (the
    final path component itself is not checked). ``default`` is returned
    when no ancestor matches.
    """
    ancestor_names = (ancestor.name for ancestor in path.parents)
    return next(
        (label for label in ancestor_names if label.lower().startswith('phase')),
        default,
    )

# analyze one file for edge robustness
def analyze_edge_robustness(
    path: str | Path,
    *,
    name: Optional[str] = None,
    directed: bool = True,
    weighted: bool = False,
    weight_attr: str = 'weight',
    component_mode: str = 'weak',
    edge_metric: str = 'betweenness',
    n_random_trials: int = 20,
    seed: int = 42,
    fig_base: str | Path = '../../data/result/edge_robustness/figure',
    show_plot: bool = True,
) -> Dict[str, object]:
    """Run the edge-robustness analysis for a single edge-list file.

    The network is loaded, attacked with ``n_random_trials`` random edge
    removal orders and with one targeted order ranked by ``edge_metric``.
    The area under each GCC-fraction curve is computed with the trapezoidal
    rule, both AUCs are printed, and the comparison figure is saved through
    ``plot_edge_robustness``.

    Args:
        path: Edge-list file to analyze.
        name: Network name; defaults to the file stem.
        directed, weighted, weight_attr: Forwarded to
            ``load_graph_from_edgelist``.
        component_mode: Component definition used for the GCC fraction.
        edge_metric: Ranking metric for the targeted attack.
        n_random_trials: Number of random removal orders.
        seed: Seed for the random removal orders.
        fig_base: Root directory for saved figures.
        show_plot: Whether the figure is displayed after saving.

    Returns:
        A record with the keys ``phase``, ``network``, ``source_path``,
        ``component_mode``, ``edge_metric``, ``n_random_trials``, ``seed``,
        ``random_auc``, ``targeted_auc`` and ``figure_path``.
    """
    path = Path(path)
    if name is None:
        name = path.stem
    phase = infer_phase(path)
    print(f'[INFO] Loading: {path}')
    G = load_graph_from_edgelist(path, directed=directed, weighted=weighted, weight_attr=weight_attr)
    rand_stats = aggregate_random_edge_failures(
        G,
        n_trials=n_random_trials,
        seed=seed,
        component_mode=component_mode,
    )
    targeted_order = targeted_edge_attack_order(G, metric=edge_metric, weight_attr=weight_attr)
    targeted_curve = simulate_edge_removal(G, targeted_order, component_mode=component_mode)['gcc_frac']

    # np.trapz is deprecated since NumPy 2.0 in favour of np.trapezoid; use
    # the new name when it exists and fall back on older NumPy releases.
    integrate = np.trapezoid if hasattr(np, 'trapezoid') else np.trapz
    removed_frac = rand_stats['removed_frac']
    random_auc = float(integrate(rand_stats['mean_gcc'], removed_frac))
    targeted_auc = float(integrate(targeted_curve, removed_frac))
    print(f'[INFO] Random edge AUC   : {random_auc:.6f}')
    print(f'[INFO] Targeted edge AUC : {targeted_auc:.6f}')

    fig_path = plot_edge_robustness(
        name,
        phase,
        removed_frac,
        rand_stats['mean_gcc'],
        rand_stats['std_gcc'],
        targeted_curve,
        target_metric=edge_metric,
        component_mode=component_mode,
        base_dir=fig_base,
        show=show_plot,
    )
    return {
        'phase': phase,
        'network': name,
        'source_path': str(path),
        'component_mode': component_mode,
        'edge_metric': edge_metric,
        'n_random_trials': n_random_trials,
        'seed': seed,
        'random_auc': random_auc,
        'targeted_auc': targeted_auc,
        'figure_path': str(fig_path),
    }

In [6]:
# analyze directory
def analyze_edge_directory(
    base_dirs: Iterable[str | Path],
    *,
    patterns: str = '*.ncol,*.txt,*.tsv,*.csv',
    directed: bool = True,
    weighted: bool = False,
    weight_attr: str = 'weight',
    component_mode: str = 'weak',
    edge_metric: str = 'betweenness',
    n_random_trials: int = 20,
    seed: int = 42,
    fig_base: str | Path = '../../data/result/edge_robustness/figure',
    summary_dir: str | Path = '../../data/result/edge_robustness',
) -> pd.DataFrame:
    """Run the edge-robustness analysis on every matching file under ``base_dirs``.

    Files are collected recursively from each base directory using the
    comma-separated glob ``patterns``, de-duplicated by resolved path and
    processed in sorted order. A failure on one file is printed and skipped so
    the rest of the batch still runs. The successful results are written to
    ``<summary_dir>/edge_robustness_summary.csv``.

    Args:
        base_dirs: One directory or an iterable of directories to search.
        patterns: Comma-separated glob patterns; empty entries are ignored.
        directed, weighted, weight_attr, component_mode, edge_metric,
        n_random_trials, seed, fig_base: Forwarded to
            ``analyze_edge_robustness`` for every file.
        summary_dir: Directory that receives the summary CSV.

    Returns:
        A DataFrame with the columns ``phase``, ``network``, ``random_auc``,
        ``targeted_auc`` and ``edge_metric``, one row per successfully
        analyzed file. It is empty (same columns) when no file matched or
        every file failed; no CSV is written when no file matched.
    """
    summary_columns = ['phase', 'network', 'random_auc', 'targeted_auc', 'edge_metric']

    if isinstance(base_dirs, (str, Path)):
        base_dirs = [base_dirs]
    globs = [pattern.strip() for pattern in patterns.split(',') if pattern.strip()]

    found = set()
    for base in base_dirs:
        base = Path(base)
        for pattern in globs:
            found.update(p.resolve() for p in base.glob(f'**/{pattern}'))
    unique_paths = sorted(found)
    if not unique_paths:
        print(f"[WARN] No files matched in {base_dirs} with patterns '{patterns}'")
        return pd.DataFrame(columns=summary_columns)

    records: List[Dict[str, object]] = []
    total = len(unique_paths)
    for idx, path in enumerate(unique_paths, start=1):
        if idx > 1:
            print()
        print(f'[BATCH {idx}/{total}] {path}')
        try:
            result = analyze_edge_robustness(
                path,
                name=path.stem,
                directed=directed,
                weighted=weighted,
                weight_attr=weight_attr,
                component_mode=component_mode,
                edge_metric=edge_metric,
                n_random_trials=n_random_trials,
                seed=seed,
                fig_base=fig_base,
                show_plot=False,
            )
        except Exception as exc:
            # Deliberate best-effort boundary: one bad file must not abort
            # the whole batch, so the failure is reported and recorded.
            print(f'[ERROR] {path}: {exc}')
            records.append({
                'phase': infer_phase(path),
                'network': path.stem,
                'source_path': str(path),
                'error': str(exc),
            })
        else:
            records.append(result)

    full_df = pd.DataFrame(records)
    succeeded = full_df
    if 'error' in full_df.columns:
        succeeded = full_df[full_df['error'].isna()]
    # reindex() guarantees every summary column exists even when all files
    # failed and the records therefore carry no AUC/metric fields.
    summary_df = succeeded.reindex(columns=summary_columns)
    summary_df = summary_df.dropna(subset=['random_auc', 'targeted_auc'], how='any')

    summary_dir = Path(summary_dir)
    summary_dir.mkdir(parents=True, exist_ok=True)
    summary_path = summary_dir / 'edge_robustness_summary.csv'
    summary_df.to_csv(summary_path, index=False)
    print(f'[OK] Saved summary table: {summary_path}')
    return summary_df.reset_index(drop=True)

## Example Usage (Batch)

In [7]:
# Batch run: analyze every .ncol network found under the phase1..phase4
# folders of the inferred-GRN directory, then preview the summary table.
root_dir = Path('../../data/inferred_grn')
phase_dirs = [root_dir.joinpath(f'phase{i}') for i in (1, 2, 3, 4)]

batch_edge_results = analyze_edge_directory(
    phase_dirs,
    # input selection and graph construction
    patterns='*.ncol',
    directed=True,
    weighted=False,
    # attack configuration
    component_mode='weak',
    edge_metric='betweenness',
    n_random_trials=20,
    seed=42,
    # output locations
    fig_base='../../data/result/edge_robustness/figure',
    summary_dir='../../data/result/edge_robustness',
)
batch_edge_results.head()

[BATCH 1/40] /home/Data_Drive_8TB_2/Controllability/single-cell-grn-control/data/inferred_grn/phase1/Blastomeres.ncol
[INFO] Loading: /home/Data_Drive_8TB_2/Controllability/single-cell-grn-control/data/inferred_grn/phase1/Blastomeres.ncol


  random_auc = float(np.trapz(rand_stats['mean_gcc'], rand_stats['removed_frac']))
  targeted_auc = float(np.trapz(targeted_curve, rand_stats['removed_frac']))


[INFO] Random edge AUC   : 0.586612
[INFO] Targeted edge AUC : 0.376985

[BATCH 2/40] /home/Data_Drive_8TB_2/Controllability/single-cell-grn-control/data/inferred_grn/phase1/Enveloping_Layer.ncol
[INFO] Loading: /home/Data_Drive_8TB_2/Controllability/single-cell-grn-control/data/inferred_grn/phase1/Enveloping_Layer.ncol
[INFO] Random edge AUC   : 0.703328
[INFO] Targeted edge AUC : 0.435964

[BATCH 3/40] /home/Data_Drive_8TB_2/Controllability/single-cell-grn-control/data/inferred_grn/phase1/Primordial_Germ_cells.ncol
[INFO] Loading: /home/Data_Drive_8TB_2/Controllability/single-cell-grn-control/data/inferred_grn/phase1/Primordial_Germ_cells.ncol
[INFO] Random edge AUC   : 0.710942
[INFO] Targeted edge AUC : 0.634892

[BATCH 4/40] /home/Data_Drive_8TB_2/Controllability/single-cell-grn-control/data/inferred_grn/phase2/Ectoderm.ncol
[INFO] Loading: /home/Data_Drive_8TB_2/Controllability/single-cell-grn-control/data/inferred_grn/phase2/Ectoderm.ncol
[INFO] Random edge AUC   : 0.611168
[INF

Unnamed: 0,phase,network,random_auc,targeted_auc,edge_metric
0,phase1,Blastomeres,0.586612,0.376985,betweenness
1,phase1,Enveloping_Layer,0.703328,0.435964,betweenness
2,phase1,Primordial_Germ_cells,0.710942,0.634892,betweenness
3,phase2,Ectoderm,0.611168,0.426951,betweenness
4,phase2,Enveloping_Layer,0.637921,0.385517,betweenness
