# BGP AS-Level Topology: Ego-Network Graph Feature Extraction Pipeline

**Purpose:** Download RIPE RIS MRT data (RIB dumps), parse with bgpkit-parser,
construct AS-level topology graphs **per snapshot**, extract a **k-hop ego-network**
around a target AS, and compute comprehensive graph-theoretic features as
**time series** for BGP anomaly detection.

**Key Idea — Ego-Network Approach:**
Instead of computing expensive features on the full 84K+ node global AS graph,
we focus on a **target AS and its k-hop neighborhood**. This mirrors how ISPs
actually monitor BGP: they track their own AS and their neighbors, not the
entire Internet topology.

**Benefits:**
- **Fast:** ~500-2,000 nodes instead of 84K+ → features compute in seconds, not hours
- **Realistic:** Matches real-world ISP monitoring (track your own neighborhood)
- **Better anomaly signal:** Local topology changes are not diluted by 84K other nodes

**Workflow:**
1. Parse RIB dump → build full AS-level edge set
2. Construct full graph (needed to know who connects to whom)
3. Extract **k-hop ego subgraph** around `TARGET_AS`
4. Compute all 16 graph-level + 10 node-level features on the ego subgraph
5. Repeat per snapshot → time series

**Output:**
- `graph_level_timeseries_*.csv` — one row per snapshot, all 16 graph-level features (ego subgraph)
- `node_level_timeseries_*.csv` — one row per (snapshot, ASN), all 10 node-level features (ego nodes)

---

## 1. Installation & Imports

In [None]:
# Install dependencies (uncomment if needed)
# !pip install pybgpkit networkx scipy numpy pandas matplotlib seaborn tqdm

In [None]:
import os
import time
import json
import logging
import warnings
from datetime import datetime, timedelta, timezone
from pathlib import Path
from collections import defaultdict
from typing import Optional, Dict, List, Tuple, Set

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib.ticker as ticker
import networkx as nx
from scipy import sparse
from scipy.sparse.linalg import eigsh
from scipy import stats as sp_stats
import bgpkit

warnings.filterwarnings('ignore', category=FutureWarning)
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Optional: NetworKit for high-performance computation on large graphs
try:
    import networkit as nk
    HAS_NETWORKIT = True
    logger.info("NetworKit available - will use for performance-critical computations")
except ImportError:
    HAS_NETWORKIT = False
    logger.info("NetworKit not available - using NetworkX only (slower for large graphs)")

print(f"NetworkX version: {nx.__version__}")
print(f"NumPy version: {np.__version__}")
print(f"NetworKit available: {HAS_NETWORKIT}")

## 2. Configuration

All parameters are configurable. Adjust the collector, date range, and processing mode below.

In [None]:
# ============================================================================
# CONFIGURATION - Modify these parameters as needed
# ============================================================================

# --- RIPE RIS Collector ---
COLLECTOR = "rrc04"

# --- Date Range ---
START_DATE = "2025-11-17"  # YYYY-MM-DD
END_DATE = "2025-11-18"    # YYYY-MM-DD (inclusive)

# --- Ego-Network Settings ---
# TARGET_AS: The AS number to center the ego-network on.
#   This is the AS whose local neighborhood you want to monitor.
#   Examples: 174 (Cogent), 3356 (Lumen/Level3), 13335 (Cloudflare),
#             1299 (Arelion/Telia), 6939 (Hurricane Electric)
TARGET_AS = 3333  # RIPE NCC — small ego network, exact features

# EGO_K_HOP: Number of hops from TARGET_AS to include in the subgraph.
#   1-hop = TARGET_AS + direct peers only (star-like, limited structure)
#   2-hop = TARGET_AS + peers + peers-of-peers (recommended, captures local topology)
#   3-hop = larger neighborhood (slower, but more context)
EGO_K_HOP = 2

# --- Output Directories ---
BASE_DIR = Path("./bgp_graph_features")
DATA_DIR = BASE_DIR / "data"
OUTPUT_DIR = BASE_DIR / "output"
FIGURES_DIR = BASE_DIR / "figures"

for d in [DATA_DIR, OUTPUT_DIR, FIGURES_DIR]:
    d.mkdir(parents=True, exist_ok=True)

# --- Per-Snapshot Processing ---
PER_SNAPSHOT_CSV = True  # Save a separate CSV for each RIB file
SNAPSHOTS_DIR = OUTPUT_DIR / "snapshots"
SNAPSHOTS_DIR.mkdir(parents=True, exist_ok=True)

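# --- Private / Reserved ASN Ranges (to be filtered out) ---
# Kept as range objects rather than a materialized set: the 32-bit private
# block alone holds ~95 million ASNs, which would cost gigabytes as a set.
class _AsnRanges:
    """Set-like container (supports `in` and `len`) backed by ranges."""
    def __init__(self, ranges, extra):
        self.ranges, self.extra = tuple(ranges), frozenset(extra)
    def __contains__(self, asn):
        return asn in self.extra or any(asn in r for r in self.ranges)
    def __len__(self):
        return sum(len(r) for r in self.ranges) + len(self.extra)

PRIVATE_ASNS = _AsnRanges(
    ranges=[
        range(64512, 65535),            # 16-bit private, 64512-65534 (RFC 6996)
        range(4200000000, 4294967295),  # 32-bit private, 4200000000-4294967294 (RFC 6996)
    ],
    # AS 0 (RFC 7607), AS_TRANS 23456 (RFC 6793), last ASNs (RFC 7300)
    extra={0, 23456, 65535, 4294967295},
)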

def is_valid_public_asn(asn: int) -> bool:
    """Return True if the ASN is a valid public (non-private, non-reserved) ASN."""
    return asn not in PRIVATE_ASNS

# --- Performance Settings ---
BETWEENNESS_SAMPLE_K = None  # None = exact (feasible on small ego subgraphs)
COMPUTE_SPECTRAL = True
MAX_NODES_FOR_CLIQUE = 5000  # ego subgraphs are typically well under this

# --- RIPE RIS URL Pattern ---
RIPE_BASE_URL = "https://data.ris.ripe.net"

print(f"Configuration:")
print(f"  Collector: {COLLECTOR}")
print(f"  Date range: {START_DATE} to {END_DATE}")
print(f"  Target AS: {TARGET_AS}")
print(f"  Ego-network hops: {EGO_K_HOP}")
print(f"  Output: {OUTPUT_DIR}")
print(f"  Snapshots dir: {SNAPSHOTS_DIR}")
print(f"  Private ASN filter: {len(PRIVATE_ASNS):,} ASNs will be excluded")
print(f"  Betweenness sampling: {'exact' if BETWEENNESS_SAMPLE_K is None else f'k={BETWEENNESS_SAMPLE_K}'}")

## 3. Data Discovery & Download

Uses BGPKIT Broker to discover available MRT files for the configured collector and time range,
then constructs direct URLs for RIB dumps (`bview.*`).

**RIPE RIS URL pattern:**
```
https://data.ris.ripe.net/{collector}/{YYYY.MM}/bview.{YYYYMMDD}.{HHMM}.gz
```

RIB dumps are generated every **8 hours** at 00:00, 08:00, 16:00 UTC.

In [None]:
def generate_rib_urls(collector: str, start_date: str, end_date: str) -> List[str]:
    """
    Generate URLs for all RIB dump files in the given date range.
    RIB dumps are available at 00:00, 08:00, 16:00 UTC daily.
    """
    urls = []
    start = datetime.strptime(start_date, "%Y-%m-%d")
    end = datetime.strptime(end_date, "%Y-%m-%d")
    rib_hours = [0, 8, 16]
    current = start
    while current <= end:
        year_month = current.strftime("%Y.%m")
        for hour in rib_hours:
            ts = current.replace(hour=hour, minute=0)
            if ts < start or ts > end + timedelta(days=1) - timedelta(seconds=1):
                continue
            filename = f"bview.{ts.strftime('%Y%m%d.%H%M')}.gz"
            url = f"{RIPE_BASE_URL}/{collector}/{year_month}/{filename}"
            urls.append(url)
        current += timedelta(days=1)
    return urls


def discover_files_via_broker(collector: str, start_date: str, end_date: str) -> List[dict]:
    """
    Use BGPKIT Broker to discover available RIB MRT files.
    Falls back to URL generation if Broker is unavailable.
    """
    try:
        broker = bgpkit.Broker()
        items = broker.query(
            ts_start=f"{start_date}T00:00:00",
            ts_end=f"{end_date}T23:59:59",
            collector_id=collector,
            data_type="rib"
        )
        if items:
            logger.info(f"Broker found {len(items)} RIB files")
            return items
    except Exception as e:
        logger.warning(f"Broker query failed: {e}. Falling back to URL generation.")

    urls = generate_rib_urls(collector, start_date, end_date)
    logger.info(f"Generated {len(urls)} RIB URLs")
    return [{"url": url} for url in urls]


# Discover RIB files
rib_files = discover_files_via_broker(COLLECTOR, START_DATE, END_DATE)
print(f"\nRIB files discovered: {len(rib_files)}")
for f in rib_files[:5]:
    url = f['url'] if isinstance(f, dict) else f.url
    print(f"  {url}")

## 4. MRT Parsing with bgpkit-parser

**bgpkit-parser** is a Rust-based MRT parser with Python bindings (`pybgpkit`).  
It handles gzip decompression and MRT parsing transparently.

### Workflow (CSV-first approach)
1. **Download** MRT `.gz` files locally (saved to `data/mrt_files/`)
2. **Parse** each file into structured rows matching the standard TABLE_DUMP2 format
3. **Save** all rows as a single CSV (saved to `output/`)
4. **Build** AS edges from the CSV for graph construction

### CSV Schema (TABLE_DUMP2 format for RIB entries)

| Column | Description | Example |
|--------|-------------|---------|
| `MRT_Type` | Always `TABLE_DUMP2` for RIB dumps | `TABLE_DUMP2` |
| `Timestamp` | Dump timestamp (UTC) | `2025-11-17 00:00:00` |
| `Entry_Type` | Always `B` (table entry) for RIB | `B` |
| `Peer_IP` | IP of the BGP peer | `198.32.132.97` |
| `Peer_AS` | ASN of the peer | `13335` |
| `Prefix` | Announced prefix | `1.0.0.0/24` |
| `AS_Path` | Full AS path (space-separated) | `3356 1299 13335` |
| `Origin` | ORIGIN attribute (IGP/EGP/INCOMPLETE) | `IGP` |
| `Next_Hop` | Next-hop IP | `198.32.132.97` |
| `Local_Pref` | LOCAL_PREF value | `100` |
| `MED` | Multi-Exit Discriminator | `0` |
| `Community` | BGP communities (space-separated) | `3356:100 3356:123` |
| `Atomic_Aggregate` | Atomic aggregate flag | `AG` or empty |
| `Aggregator` | Aggregator AS and IP | `13335 198.32.132.97` |

**Note:** RIB dumps use `TABLE_DUMP2` with entry type `B`, unlike UPDATE files which use `BGP4MP` with `A` (announcement) and `W` (withdrawal).

### Graph construction from AS_PATH
1. Parse AS_PATH string (space-separated ASNs)
2. Remove AS prepending (consecutive duplicates)
3. Skip AS_SET entries (e.g., `{1234,5678}`)
4. Filter private/reserved ASNs (RFC 6996, RFC 7300)
5. Extract pairwise adjacent links -> undirected edges
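
As a rough illustration of the five steps above, here is a standalone sketch on a toy path. `path_to_edges` is a hypothetical helper name, and its private-ASN check covers only the 16-bit range to keep the example short. It is not the notebook's implementation.

```python
from typing import List, Tuple

def path_to_edges(as_path_str: str) -> List[Tuple[int, int]]:
    """Toy version of steps 1-5 (hypothetical helper, simplified filter)."""
    hops: List[int] = []
    for token in as_path_str.split():          # step 1: split on whitespace
        if "{" in token or "}" in token:       # step 3: skip AS_SET tokens
            continue
        asn = int(token)
        if 64512 <= asn <= 65534:              # step 4 (simplified): 16-bit private only
            continue
        if not hops or hops[-1] != asn:        # step 2: collapse prepending
            hops.append(asn)
    # step 5: adjacent pairs as sorted tuples, so (a, b) and (b, a) coincide
    return [tuple(sorted(pair)) for pair in zip(hops, hops[1:])]

edges = path_to_edges("3356 3356 1299 64512 13335 {1234,5678}")
print(edges)  # [(1299, 3356), (1299, 13335)]
```

One consequence worth keeping in mind: dropping a private ASN or an AS_SET in the middle of a path joins its two neighbors, so the resulting edge (here 1299 to 13335) is inferred rather than directly observed.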

In [None]:
import urllib.request

def parse_as_path(as_path_str: str) -> List[int]:
    """
    Parse an AS_PATH string into a deduplicated list of valid public ASNs.
    Handles:
    - Standard AS paths: "3356 1299 13335"
    - AS prepending: "3356 3356 3356 1299" -> [3356, 1299]
    - AS_SETs: "{1234,5678}" -> skipped entirely
    - Private/reserved ASNs: filtered out (RFC 6996, RFC 7300)
    """
    if not as_path_str:
        return []

    tokens = as_path_str.split()
    deduped = []
    for token in tokens:
        if '{' in token or '}' in token:
            continue
        try:
            asn = int(token)
            if not is_valid_public_asn(asn):
                continue
            if not deduped or asn != deduped[-1]:
                deduped.append(asn)
        except ValueError:
            continue
    return deduped


def extract_edges_from_as_path(as_path: List[int]) -> List[Tuple[int, int]]:
    """
    Extract pairwise AS adjacency edges from a parsed AS path.
    Returns a list (not set) so callers can count occurrences for weighting.
    """
    edges = []
    for i in range(len(as_path) - 1):
        edge = tuple(sorted([as_path[i], as_path[i + 1]]))
        if edge[0] != edge[1]:
            edges.append(edge)
    return edges


def download_mrt_file(url: str, dest_dir: Path) -> Path:
    """Download an MRT file and save it locally. Returns the local file path."""
    filename = url.split('/')[-1]
    local_path = dest_dir / filename

    if local_path.exists():
        size_mb = local_path.stat().st_size / (1024 * 1024)
        logger.info(f"  Already downloaded: {filename} ({size_mb:.1f} MB)")
        return local_path

    logger.info(f"  Downloading: {filename}")
    t0 = time.time()
    urllib.request.urlretrieve(url, str(local_path))
    elapsed = time.time() - t0
    size_mb = local_path.stat().st_size / (1024 * 1024)
    logger.info(f"  Saved: {filename} ({size_mb:.1f} MB, {elapsed:.1f}s)")
    return local_path


def parse_mrt_to_rows(file_path: str) -> Tuple[List[dict], dict]:
    """
    Parse a single MRT RIB dump file into structured TABLE_DUMP2 rows.

    Each RIB entry is mapped to:
        TABLE_DUMP2|timestamp|B|peer_ip|peer_as|prefix|as_path|origin|
        next_hop|local_pref|med|community|atomic_agg|aggregator

    Returns:
        rows: list of dicts (one per RIB entry)
        stats: parsing statistics
    """
    rows = []
    stats = {
        'total_elements': 0,
        'announcements': 0,
        'withdrawals': 0,
        'unique_prefixes': set(),
        'unique_peers': set(),
        'parse_errors': 0,
    }

    logger.info(f"  Parsing: {Path(file_path).name}")
    t0 = time.time()

    try:
        parser = bgpkit.Parser(url=str(file_path))
        for elem in parser:
            stats['total_elements'] += 1

            if elem.elem_type == "W":
                stats['withdrawals'] += 1
                continue

            stats['announcements'] += 1

            if elem.prefix:
                stats['unique_prefixes'].add(elem.prefix)
            if elem.peer_asn:
                stats['unique_peers'].add(elem.peer_asn)

            # Convert timestamp to readable UTC string
            ts = datetime.fromtimestamp(
                elem.timestamp, tz=timezone.utc
            ).strftime('%Y-%m-%d %H:%M:%S')

            # Build community string (space-separated)
            communities = ''
            if elem.communities:
                communities = ' '.join(str(c) for c in elem.communities)

            # Build aggregator string
            aggregator = ''
            aggr_asn = getattr(elem, 'aggr_asn', None)
            aggr_ip = getattr(elem, 'aggr_ip', None)
            if aggr_asn:
                aggregator = f"{aggr_asn} {aggr_ip}".strip() if aggr_ip else str(aggr_asn)

            row = {
                'MRT_Type': 'TABLE_DUMP2',
                'Timestamp': ts,
                'Entry_Type': 'B',
                'Peer_IP': elem.peer_ip or '',
                'Peer_AS': elem.peer_asn if elem.peer_asn else '',
                'Prefix': elem.prefix or '',
                'AS_Path': elem.as_path or '',
                'Origin': elem.origin or '',
                'Next_Hop': elem.next_hop or '',
                'Local_Pref': elem.local_pref if elem.local_pref is not None else '',
                'MED': elem.med if elem.med is not None else '',
                'Community': communities,
                'Atomic_Aggregate': 'AG' if elem.atomic else '',
                'Aggregator': aggregator,
            }
            rows.append(row)

    except Exception as e:
        logger.error(f"Error parsing {file_path}: {e}")
        stats['parse_errors'] += 1

    elapsed = time.time() - t0
    stats['unique_prefixes'] = len(stats['unique_prefixes'])
    stats['unique_peers'] = len(stats['unique_peers'])
    stats['rows_parsed'] = len(rows)
    stats['parse_time_sec'] = round(elapsed, 2)

    logger.info(
        f"  -> {stats['total_elements']:,} elements, "
        f"{len(rows):,} rows, {stats['unique_prefixes']:,} prefixes, "
        f"{stats['unique_peers']:,} peers in {elapsed:.1f}s"
    )
    return rows, stats


print("Parse functions defined:")
print("  - parse_as_path(as_path_str) -> List[int]")
print("  - extract_edges_from_as_path(as_path) -> List[Tuple[int,int]]")
print("  - download_mrt_file(url, dest_dir) -> Path")
print("  - parse_mrt_to_rows(file_path) -> (rows, stats)")

In [None]:
# ============================================================================
# STEP 1: Download MRT files locally & parse to structured CSV
# ============================================================================
# Downloads raw MRT .gz files and parses every RIB entry into CSV format.
# Also saves per-snapshot CSVs and builds a snapshot manifest.
# ============================================================================

MRT_DIR = DATA_DIR / "mrt_files"
MRT_DIR.mkdir(parents=True, exist_ok=True)

all_rows = []
all_stats = []
snapshot_manifest = []  # Per-snapshot metadata for the main processing loop

print("=" * 70)
print("STEP 1: DOWNLOAD MRT FILES & PARSE TO CSV")
print("=" * 70)

for i, f in enumerate(rib_files):
    url = f["url"] if isinstance(f, dict) else f.url
    print(f"\n[{i+1}/{len(rib_files)}] {url}")

    local_path = download_mrt_file(url, MRT_DIR)
    rows, stats = parse_mrt_to_rows(str(local_path))
    stats["file_type"] = "rib"
    stats["url"] = url
    stats["local_path"] = str(local_path)
    all_rows.extend(rows)
    all_stats.append(stats)

    # Save per-snapshot CSV
    if PER_SNAPSHOT_CSV and rows:
        filename = Path(local_path).name  # e.g. bview.20251117.0000.gz
        parts = filename.replace("bview.", "").replace(".gz", "").split(".")
        date_part, time_part = parts[0], parts[1]
        snapshot_ts = f"{date_part[:4]}-{date_part[4:6]}-{date_part[6:8]}T{time_part[:2]}:{time_part[2:]}:00Z"
        snapshot_id = f"{COLLECTOR}_{date_part}_{time_part}"

        snap_csv_path = SNAPSHOTS_DIR / f"rib_{snapshot_id}.csv"
        pd.DataFrame(rows).to_csv(snap_csv_path, index=False)

        snapshot_manifest.append({
            "snapshot_id": snapshot_id,
            "timestamp": snapshot_ts,
            "collector": COLLECTOR,
            "url": url,
            "mrt_path": str(local_path),
            "csv_path": str(snap_csv_path),
            "n_rows": len(rows),
        })
        print(f"  Snapshot CSV: {snap_csv_path.name} ({len(rows):,} rows)")

    print(f"  Running total: {len(all_rows):,} rows")

# Save combined CSV
csv_filename = f"rib_parsed_{COLLECTOR}_{START_DATE}_{END_DATE}.csv"
csv_path = OUTPUT_DIR / csv_filename
rib_df = pd.DataFrame(all_rows)
rib_df.to_csv(csv_path, index=False)

# Save stats and manifest
pd.DataFrame(all_stats).to_csv(OUTPUT_DIR / "parsing_stats.csv", index=False)
pd.DataFrame(snapshot_manifest).to_csv(OUTPUT_DIR / "snapshot_manifest.csv", index=False)

print(f"\n{'=' * 70}")
print(f"DOWNLOAD & PARSE COMPLETE")
print(f"  Combined CSV: {csv_path} ({csv_path.stat().st_size / (1024*1024):.1f} MB)")
print(f"  Total rows: {len(rib_df):,}")
print(f"  Snapshot manifest: {len(snapshot_manifest)} snapshots")
print(f"{'=' * 70}")

## 5. Edge Extraction Function

Build AS topology edges from a parsed RIB CSV. Called once per snapshot.
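
The relationship between the unique edge set and the per-edge observation counts can be sketched on a few hand-written paths. The paths below are made up for illustration and are assumed to be already parsed and deduplicated.

```python
from collections import Counter

# Hypothetical, already-cleaned AS paths (one list of ASNs per RIB entry)
paths = [
    [3356, 1299, 13335],
    [174, 1299, 13335],
    [3356, 1299, 3333],
]

# Count how many paths traverse each undirected adjacency
edge_counts = Counter(
    tuple(sorted((a, b)))
    for path in paths
    for a, b in zip(path, path[1:])
)
unique_edges = set(edge_counts)

print(edge_counts[(1299, 13335)])  # 2
print(len(unique_edges))           # 4
```

The counts measure how often an adjacency appears across RIB entries, which depends on the collector's peers and prefix mix, so they are better read as visibility than as link capacity.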

In [None]:
from collections import Counter

def build_edges_from_csv(csv_path_or_df):
    """
    Load a per-snapshot CSV and build AS topology edges.

    Args:
        csv_path_or_df: Path to CSV file or pre-loaded DataFrame

    Returns:
        all_edges: set of unique (asn_a, asn_b) edges (sorted tuple)
        edge_counts: Counter mapping edge -> observation count
    """
    if isinstance(csv_path_or_df, (str, Path)):
        df = pd.read_csv(csv_path_or_df)
    else:
        df = csv_path_or_df

    all_edges_list = []
    for as_path_raw in df["AS_Path"].dropna().astype(str):
        if as_path_raw == "" or as_path_raw == "nan":
            continue
        as_path = parse_as_path(as_path_raw)
        edges = extract_edges_from_as_path(as_path)
        all_edges_list.extend(edges)

    edge_counts = Counter(all_edges_list)
    all_edges = set(edge_counts.keys())

    return all_edges, edge_counts


print("Defined: build_edges_from_csv(csv_path_or_df) -> (edges, edge_counts)")

## 6. Graph Construction & Ego-Network Extraction

Build an undirected NetworkX graph from AS adjacency pairs, then extract the
**k-hop ego subgraph** around the target AS. Only the ego subgraph is used
for feature computation.

### Ego-Network Extraction
Given the full AS graph G and a target AS node v:
1. Find all nodes within k hops of v using BFS
2. Extract the induced subgraph on those nodes (preserves all edges between them)
3. Check connectivity as a safeguard (the induced k-hop subgraph is connected by construction); if it is somehow disconnected, keep the connected component containing v
4. Convert to NetworKit for performance-critical computations
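
For comparison, steps 1 and 2 can be sketched with NetworkX's built-in `nx.ego_graph` on a toy graph. The node labels are arbitrary, and this is an illustrative alternative to a hand-written BFS, not the function defined in the next cell.

```python
import networkx as nx

# Toy topology: a chain 1-2-3-4-5 plus a triangle 2-3-6
G = nx.Graph([(1, 2), (2, 3), (3, 4), (4, 5), (2, 6), (3, 6)])

# 2-hop ego network around node 1: the induced subgraph on all nodes
# at distance <= 2, including edges between two hop-2 nodes such as (3, 6)
ego = nx.ego_graph(G, 1, radius=2)

print(sorted(ego.nodes()))      # [1, 2, 3, 6]
print(ego.number_of_edges())    # 4
print(nx.is_connected(ego))     # True
```

Nodes 4 and 5 lie three and four hops from the center and are therefore excluded, along with the edge (3, 4).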

In [None]:
def nx_to_nk(G_nx):
    """Convert a NetworkX graph to NetworKit format."""
    if not HAS_NETWORKIT:
        return None, None, None
    node_list = sorted(G_nx.nodes())
    nx2nk_map = {n: i for i, n in enumerate(node_list)}
    nk2nx_map = {i: n for n, i in nx2nk_map.items()}
    G_nk = nk.Graph(len(node_list), weighted=False, directed=False)
    for u, v in G_nx.edges():
        G_nk.addEdge(nx2nk_map[u], nx2nk_map[v])
    return G_nk, nx2nk_map, nk2nx_map


def extract_ego_subgraph(G, target_as, k_hop):
    """
    Extract k-hop ego subgraph around target_as from the full graph.

    Args:
        G: full NetworkX graph
        target_as: the center AS number
        k_hop: number of hops to include

    Returns:
        G_ego: the ego subgraph (connected component containing target_as)
        ego_info: dict with metadata about the extraction
    """
    if target_as not in G:
        return None, {"error": f"AS {target_as} not found in graph"}

    # BFS to find all nodes within k hops
    ego_nodes = set()
    ego_nodes.add(target_as)
    frontier = {target_as}
    for hop in range(k_hop):
        next_frontier = set()
        for node in frontier:
            for neighbor in G.neighbors(node):
                if neighbor not in ego_nodes:
                    next_frontier.add(neighbor)
                    ego_nodes.add(neighbor)
        frontier = next_frontier
        if not frontier:
            break

    # Extract induced subgraph
    G_ego = G.subgraph(ego_nodes).copy()

    ego_info = {
        "target_as": target_as,
        "k_hop": k_hop,
        "ego_nodes": len(ego_nodes),
        "ego_edges": G_ego.number_of_edges(),
        "target_degree_full": G.degree(target_as),
    }

    # Ensure connectivity (ego subgraph should be connected by construction,
    # but check anyway in case of graph oddities)
    if not nx.is_connected(G_ego):
        components = list(nx.connected_components(G_ego))
        # Take the component containing the target AS
        for comp in components:
            if target_as in comp:
                G_ego = G_ego.subgraph(comp).copy()
                break
        ego_info["ego_connected"] = False
        ego_info["ego_nodes_after_lcc"] = G_ego.number_of_nodes()
    else:
        ego_info["ego_connected"] = True

    return G_ego, ego_info


def build_snapshot_graph(edges, edge_counts, collector, snapshot_id, timestamp,
                         target_as=None, k_hop=2):
    """
    Build AS-level topology graph, optionally extract ego subgraph.

    If target_as is provided, extracts k-hop ego subgraph and uses that
    for all feature computation. Otherwise uses full graph LCC (original behavior).

    Returns:
        G: full graph
        G_sub: subgraph for feature computation (ego or LCC)
        G_nk: NetworKit graph of G_sub (or None)
        nx2nk_map: NetworkX->NetworKit node ID mapping (or None)
        nk2nx_map: NetworKit->NetworkX node ID mapping (or None)
        info: dict with graph metadata
    """
    G = nx.Graph()
    for (u, v), w in edge_counts.items():
        G.add_edge(u, v, weight=w)

    G.graph["name"] = f"AS-level topology ({collector}, {snapshot_id})"
    G.graph["collector"] = collector
    G.graph["snapshot_id"] = snapshot_id
    G.graph["timestamp"] = timestamp

    info = {
        "n_nodes_full": G.number_of_nodes(),
        "n_edges_full": G.number_of_edges(),
    }

    # --- Ego-network extraction ---
    if target_as is not None:
        G_ego, ego_info = extract_ego_subgraph(G, target_as, k_hop)
        info.update(ego_info)

        if G_ego is None:
            raise ValueError(f"Target AS {target_as} not found in snapshot {snapshot_id}")

        G_sub = G_ego
        info["mode"] = "ego"
        info["n_nodes"] = G_sub.number_of_nodes()
        info["n_edges"] = G_sub.number_of_edges()
        info["lcc_fraction"] = G_sub.number_of_nodes() / G.number_of_nodes()

    # --- Full graph LCC (fallback / original behavior) ---
    else:
        info["is_connected"] = nx.is_connected(G)
        info["mode"] = "full"

        if not nx.is_connected(G):
            components = list(nx.connected_components(G))
            sizes = sorted([len(c) for c in components], reverse=True)
            info["n_components"] = len(components)
            info["lcc_fraction"] = sizes[0] / G.number_of_nodes()
            largest_cc = max(components, key=len)
            G_sub = G.subgraph(largest_cc).copy()
        else:
            info["n_components"] = 1
            info["lcc_fraction"] = 1.0
            G_sub = G

        info["n_nodes"] = G_sub.number_of_nodes()
        info["n_edges"] = G_sub.number_of_edges()

    # NetworKit conversion
    G_nk, nx2nk_map, nk2nx_map = nx_to_nk(G_sub)

    return G, G_sub, G_nk, nx2nk_map, nk2nx_map, info


print("Defined: extract_ego_subgraph(G, target_as, k_hop) -> (G_ego, ego_info)")
print("Defined: build_snapshot_graph(edges, edge_counts, ..., target_as, k_hop)")
print("  Returns (G, G_sub, G_nk, nx2nk_map, nk2nx_map, info)")

## 7. Graph-Level Feature Extraction Function

Extract all 16 graph-level metrics from a single snapshot graph.
Each feature computation is wrapped in try/except for graceful failure handling.
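
To make a few of these metrics concrete, the sketch below computes density, spectral radius, percolation limit, and algebraic connectivity on a five-node star graph, using dense eigenvalue routines. The star graph and the dense approach are choices made for this example; the extraction function below works on sparse matrices.

```python
import networkx as nx
import numpy as np

G = nx.star_graph(4)                      # hub 0 connected to 4 leaves
A = nx.to_numpy_array(G)                  # adjacency matrix (no edge weights set)
L = np.diag(A.sum(axis=1)) - A            # combinatorial Laplacian D - A

adj_eigs = np.linalg.eigvalsh(A)          # eigenvalues in ascending order
lap_eigs = np.linalg.eigvalsh(L)

features = {
    "density": nx.density(G),
    "spectral_radius": float(adj_eigs[-1]),            # largest adjacency eigenvalue
    "percolation_limit": 1.0 / float(adj_eigs[-1]),    # inverse of spectral radius
    "algebraic_connectivity": float(lap_eigs[1]),      # second-smallest Laplacian eigenvalue
}
print(features)
```

A star is bipartite, so its adjacency spectrum is symmetric about zero (here -2 and +2); the spectral radius is the positive end of that spectrum.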

In [None]:
def extract_graph_level_features(G_lcc, G_nk, nk2nx_map, config):
    """
    Extract all 16 graph-level features from a snapshot's LCC graph.

    Args:
        G_lcc: NetworkX graph (largest connected component)
        G_nk: NetworKit graph (or None)
        nk2nx_map: NetworKit->NetworkX node ID mapping (or None)
        config: dict with 'compute_spectral', 'betweenness_sample_k'

    Returns:
        features: dict of feature_name -> value (None if failed)
        shared_data: dict with '_bc_map', '_core_map' for node-level reuse
    """
    features = {}
    shared_data = {}
    n_nodes = G_lcc.number_of_nodes()
    n_edges = G_lcc.number_of_edges()
    features['n_nodes'] = n_nodes
    features['n_edges'] = n_edges

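    # Pre-compute sparse matrices (reused by features 5, 6, 8-11, 16).
    # weight=None keeps them topological (0/1 entries), consistent with the
    # unweighted features below; the default weight='weight' would pull in
    # the per-edge observation counts stored on the graph.
    A_sparse = nx.adjacency_matrix(G_lcc, weight=None).astype(float)
    L_sparse = nx.laplacian_matrix(G_lcc, weight=None).astype(float)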

    # Degree list (reused by rich-club percentiles)
    degrees = [d for _, d in G_lcc.degree()]

    # ------------------------------------------------------------------
    # 1. ASSORTATIVITY
    # Citation: Newman, Phys. Rev. Lett. 89, 208701 (2002).
    # ------------------------------------------------------------------
    t0 = time.time()
    try:
        features['assortativity'] = nx.degree_assortativity_coefficient(G_lcc)
        logger.info(f"  [1/16] Assortativity: {features['assortativity']:.6f} ({time.time()-t0:.1f}s)")
    except Exception as e:
        features['assortativity'] = None
        logger.warning(f"  [1/16] Assortativity: FAILED ({e})")

    # ------------------------------------------------------------------
    # 2. DENSITY
    # ------------------------------------------------------------------
    features['density'] = nx.density(G_lcc)
    logger.info(f"  [2/16] Density: {features['density']:.8f}")

    # ------------------------------------------------------------------
    # 3. CLUSTERING COEFFICIENT
    # Citation: Watts & Strogatz, Nature 393 (1998).
    # ------------------------------------------------------------------
    t0 = time.time()
    try:
        if HAS_NETWORKIT:
            features['clustering_global'] = nk.globals.ClusteringCoefficient.exactGlobal(G_nk)
            features['clustering_avg_local'] = nk.globals.ClusteringCoefficient.sequentialAvgLocal(G_nk)
        else:
            features['clustering_global'] = nx.transitivity(G_lcc)
            features['clustering_avg_local'] = nx.average_clustering(G_lcc)
        logger.info(f"  [3/16] Clustering: global={features['clustering_global']:.6f}, "
                    f"local={features['clustering_avg_local']:.6f} ({time.time()-t0:.1f}s)")
    except Exception as e:
        features['clustering_global'] = None
        features['clustering_avg_local'] = None
        logger.warning(f"  [3/16] Clustering: FAILED ({e})")

    # ------------------------------------------------------------------
    # 4. DIAMETER & AVERAGE PATH LENGTH
    # Citation: Watts & Strogatz, Nature 393 (1998).
    # ------------------------------------------------------------------
    t0 = time.time()
    try:
        if HAS_NETWORKIT:
            diam_algo = nk.distance.Diameter(G_nk, algo=nk.distance.DiameterAlgo.AUTOMATIC)
            diam_algo.run()
            features['diameter'] = diam_algo.getDiameter()[0]
        elif n_nodes < 50000:
            features['diameter'] = nx.diameter(G_lcc)
        else:
            sample_nodes = np.random.choice(list(G_lcc.nodes()), size=min(100, n_nodes), replace=False)
            features['diameter'] = max(nx.eccentricity(G_lcc, v=node) for node in sample_nodes)
        logger.info(f"  [4/16] Diameter: {features['diameter']} ({time.time()-t0:.1f}s)")
    except Exception as e:
        features['diameter'] = None
        logger.warning(f"  [4/16] Diameter: FAILED ({e})")

    t0 = time.time()
    try:
        if n_nodes < 20000:
            features['avg_path_length'] = nx.average_shortest_path_length(G_lcc)
        elif HAS_NETWORKIT:
            # Use NetworKit BFS for ~10x speedup over NetworkX
            sample_size = min(500, n_nodes)
            sample_nk_ids = np.random.choice(G_nk.numberOfNodes(), size=sample_size, replace=False)
            total_dist, count = 0, 0
            for nk_id in sample_nk_ids:
                bfs = nk.distance.BFS(G_nk, int(nk_id))
                bfs.run()
                dists = bfs.getDistances()
                total_dist += sum(dists)
                count += len(dists) - 1
            features['avg_path_length'] = total_dist / count if count > 0 else float('inf')
        else:
            sample_size = min(500, n_nodes)
            sample_nodes = np.random.choice(list(G_lcc.nodes()), size=sample_size, replace=False)
            total_dist, count = 0, 0
            for node in sample_nodes:
                lengths = nx.single_source_shortest_path_length(G_lcc, node)
                total_dist += sum(lengths.values())
                count += len(lengths) - 1
            features['avg_path_length'] = total_dist / count if count > 0 else float('inf')
        logger.info(f"  [4/16] Avg path length: {features['avg_path_length']:.4f} ({time.time()-t0:.1f}s)")
    except Exception as e:
        features['avg_path_length'] = None
        logger.warning(f"  [4/16] Avg path length: FAILED ({e})")

    # ------------------------------------------------------------------
    # 5. ALGEBRAIC CONNECTIVITY (Fiedler Value)
    # Use shift-invert mode (sigma near 0) for fast convergence on
    # the smallest Laplacian eigenvalues. Falls back to which='SM'.
    # Citation: Fiedler, Czech. Math. J. 23 (1973).
    # ------------------------------------------------------------------
    t0 = time.time()
    try:
        try:
            eigenvalues = eigsh(L_sparse, k=2, sigma=1e-6, which='LM',
                                maxiter=5000, return_eigenvectors=False)
        except Exception:
            eigenvalues = eigsh(L_sparse, k=2, which='SM',
                                maxiter=n_nodes, return_eigenvectors=False)
        features['algebraic_connectivity'] = float(np.sort(eigenvalues)[1])
        logger.info(f"  [5/16] Algebraic connectivity: {features['algebraic_connectivity']:.6f} "
                    f"({time.time()-t0:.1f}s, sparse eigsh)")
    except Exception as e:
        features['algebraic_connectivity'] = None
        logger.warning(f"  [5/16] Algebraic connectivity: FAILED ({e})")

    # ------------------------------------------------------------------
    # 6. SPECTRAL RADIUS
    # Citation: Cvetkovic et al., Cambridge (2010).
    # ------------------------------------------------------------------
    t0 = time.time()
    try:
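        # which='LM' returns the eigenvalue of largest magnitude, which can be
        # the negative one on bipartite (e.g. star-like) graphs, so take abs().
        sr_vals = eigsh(A_sparse, k=1, which='LM', maxiter=5000,
                        return_eigenvectors=False)
        features['spectral_radius'] = float(abs(sr_vals[0]))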
        logger.info(f"  [6/16] Spectral radius: {features['spectral_radius']:.4f} ({time.time()-t0:.1f}s)")
    except Exception as e:
        features['spectral_radius'] = None
        logger.warning(f"  [6/16] Spectral radius: FAILED ({e})")

    # ------------------------------------------------------------------
    # 7. PERCOLATION LIMIT
    # Citation: Pastor-Satorras & Vespignani, PRL 86 (2001).
    # ------------------------------------------------------------------
    if features.get('spectral_radius'):
        features['percolation_limit'] = 1.0 / features['spectral_radius']
        logger.info(f"  [7/16] Percolation limit: {features['percolation_limit']:.6f}")
    else:
        features['percolation_limit'] = None

    # ------------------------------------------------------------------
    # 8-11. SPECTRAL METRICS (conditional on COMPUTE_SPECTRAL)
    # Uses shift-invert mode for Laplacian eigenvalues to avoid
    # ARPACK convergence issues with which='SM' on singular matrices.
    # ------------------------------------------------------------------
    if config.get('compute_spectral', True):
        t0 = time.time()
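        # For graphs with >= 5000 nodes only a partial spectrum (up to 50
        # eigenvalues) is computed, so symmetry ratio, natural connectivity,
        # and Kirchhoff index are approximations there (the latter two are
        # lower bounds). Requesting 300 eigenvalues caused ARPACK to hang
        # on large graphs.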
        n_eigs = min(n_nodes - 2, 50)
        use_full_spectrum = n_nodes < 5000

        try:
            if use_full_spectrum:
                L_dense = L_sparse.toarray()
                A_dense = A_sparse.toarray()
                laplacian_eigs = np.sort(np.real(np.linalg.eigvalsh(L_dense)))
                adjacency_eigs = np.sort(np.real(np.linalg.eigvalsh(A_dense)))[::-1]
            else:
                # Laplacian: shift-invert mode for fast convergence near 0
                try:
                    laplacian_eigs = np.sort(eigsh(
                        L_sparse, k=min(n_eigs, n_nodes-2),
                        sigma=1e-6, which='LM', maxiter=5000,
                        return_eigenvectors=False))
                except Exception as e_si:
                    logger.warning(f"  Shift-invert failed ({e_si}), falling back to which='SM'")
                    laplacian_eigs = np.sort(eigsh(
                        L_sparse, k=min(n_eigs, n_nodes-2),
                        which='SM', maxiter=n_nodes,
                        return_eigenvectors=False))
                # Adjacency: which='LM' converges fast (no issues)
                adjacency_eigs = np.sort(eigsh(
                    A_sparse, k=min(n_eigs, n_nodes-2),
                    which='LM', maxiter=5000,
                    return_eigenvectors=False))[::-1]
            logger.info(f"  Spectrum computation: {time.time()-t0:.1f}s "
                        f"({'full' if use_full_spectrum else f'partial, {len(adjacency_eigs)} eigs'})")

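            # 8. SYMMETRY RATIO
            # Distinct adjacency eigenvalues / (D + 1); D falls back to 10
            # when the diameter is missing or None.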
            distinct_eigs = len(np.unique(np.round(adjacency_eigs, 8)))
            D = features.get('diameter', 10) or 10
            features['symmetry_ratio'] = distinct_eigs / (D + 1)
            features['symmetry_ratio_partial'] = not use_full_spectrum
            logger.info(f"  [8/16] Symmetry ratio: {features['symmetry_ratio']:.4f}")

            # 9. NATURAL CONNECTIVITY
            # Fix: use n_nodes as denominator for partial spectrum
            max_eig = np.max(adjacency_eigs)
            shifted = np.exp(adjacency_eigs - max_eig)
            if use_full_spectrum:
                features['natural_connectivity'] = float(max_eig + np.log(np.mean(shifted)))
            else:
                features['natural_connectivity'] = float(max_eig + np.log(np.sum(shifted) / n_nodes))
            logger.info(f"  [9/16] Natural connectivity: {features['natural_connectivity']:.4f}")

            # 10. KIRCHHOFF INDEX
            nonzero_lap = laplacian_eigs[laplacian_eigs > 1e-10]
            if len(nonzero_lap) > 0:
                features['kirchhoff_index'] = float(n_nodes * np.sum(1.0 / nonzero_lap))
            else:
                features['kirchhoff_index'] = None
            logger.info(f"  [10/16] Kirchhoff index: {features.get('kirchhoff_index')}")

            # 11. SPANNING TREES (full spectrum only)
            if use_full_spectrum and len(nonzero_lap) > 0:
                features['log_spanning_trees'] = float(np.sum(np.log(nonzero_lap)) - np.log(n_nodes))
            else:
                features['log_spanning_trees'] = None
            logger.info(f"  [11/16] log(spanning trees): {features.get('log_spanning_trees')}")

        except Exception as e:
            logger.warning(f"  [8-11] Spectral metrics: FAILED ({e})")
            for k in ['symmetry_ratio', 'symmetry_ratio_partial', 'natural_connectivity',
                       'kirchhoff_index', 'log_spanning_trees']:
                features.setdefault(k, None)
    else:
        for k in ['symmetry_ratio', 'symmetry_ratio_partial', 'natural_connectivity',
                   'kirchhoff_index', 'log_spanning_trees']:
            features[k] = None
        logger.info("  [8-11] Spectral metrics: SKIPPED (COMPUTE_SPECTRAL=False)")

    # ------------------------------------------------------------------
    # 12. NODE & EDGE CONNECTIVITY
    # Citation: Whitney, Am. J. Math. 54 (1932).
    # Exact computation via NetworkX. No timeout is enforced here, so
    # these calls can be slow on large, dense graphs.
    # ------------------------------------------------------------------
    t0 = time.time()
    try:
        features['edge_connectivity'] = nx.edge_connectivity(G_lcc)
        logger.info(f"  [12/16] Edge connectivity: {features['edge_connectivity']} ({time.time()-t0:.1f}s)")
    except Exception as e:
        features['edge_connectivity'] = None
        logger.warning(f"  [12/16] Edge connectivity: FAILED ({e})")

    t0 = time.time()
    try:
        features['node_connectivity'] = nx.node_connectivity(G_lcc)
        logger.info(f"  [12/16] Node connectivity: {features['node_connectivity']} ({time.time()-t0:.1f}s)")
    except Exception as e:
        features['node_connectivity'] = None
        logger.warning(f"  [12/16] Node connectivity: FAILED ({e})")

    # ------------------------------------------------------------------
    # 13. RICH-CLUB COEFFICIENT
    # Citation: Zhou & Mondragon, IEEE Comm. Lett. (2004).
    # ------------------------------------------------------------------
    t0 = time.time()
    try:
        rc = nx.rich_club_coefficient(G_lcc, normalized=False)
        p25_k = int(np.percentile(degrees, 25))
        p50_k = int(np.percentile(degrees, 50))
        p75_k = int(np.percentile(degrees, 75))
        p90_k = int(np.percentile(degrees, 90))
        p95_k = int(np.percentile(degrees, 95))
        features['rich_club_p25'] = rc.get(p25_k)
        features['rich_club_p50'] = rc.get(p50_k)
        features['rich_club_p75'] = rc.get(p75_k)
        features['rich_club_p90'] = rc.get(p90_k)
        features['rich_club_p95'] = rc.get(p95_k)
        logger.info(f"  [13/16] Rich-club coefficient ({time.time()-t0:.1f}s)")
    except Exception as e:
        for k in ['rich_club_p25', 'rich_club_p50', 'rich_club_p75', 'rich_club_p90', 'rich_club_p95']:
            features[k] = None
        logger.warning(f"  [13/16] Rich-club: FAILED ({e})")

    # ------------------------------------------------------------------
    # 14. BETWEENNESS CENTRALITY DISTRIBUTION
    # Computed ONCE, stored in shared_data for node-level reuse.
    # Citation: Brandes, J. Math. Sociol. 25(2) (2001).
    # ------------------------------------------------------------------
    t0 = time.time()
    try:
        if HAS_NETWORKIT:
            if config.get('betweenness_sample_k'):
                bc_algo = nk.centrality.ApproxBetweenness(G_nk, epsilon=0.01, delta=0.1)
            else:
                bc_algo = nk.centrality.Betweenness(G_nk, normalized=True)
            bc_algo.run()
            bc_scores_nk = bc_algo.scores()
            _bc_map = {nk2nx_map[i]: bc_scores_nk[i] for i in range(len(bc_scores_nk))}
        else:
            _bc_map = nx.betweenness_centrality(
                G_lcc, k=config.get('betweenness_sample_k'), normalized=True)

        bc_scores = np.array(list(_bc_map.values()))
        features['betweenness_mean'] = float(np.mean(bc_scores))
        features['betweenness_max'] = float(np.max(bc_scores))
        features['betweenness_std'] = float(np.std(bc_scores))
        features['betweenness_skewness'] = float(sp_stats.skew(bc_scores))
        shared_data['_bc_map'] = _bc_map
        logger.info(f"  [14/16] Betweenness distribution ({time.time()-t0:.1f}s)")
    except Exception as e:
        for k in ['betweenness_mean', 'betweenness_max', 'betweenness_std', 'betweenness_skewness']:
            features[k] = None
        logger.warning(f"  [14/16] Betweenness: FAILED ({e})")

    # ------------------------------------------------------------------
    # 15. K-CORE DECOMPOSITION
    # Computed ONCE, stored in shared_data for node-level reuse.
    # Citation: Seidman, Social Networks 5(3) (1983).
    # ------------------------------------------------------------------
    t0 = time.time()
    try:
        if HAS_NETWORKIT:
            cd = nk.centrality.CoreDecomposition(G_nk)
            cd.run()
            _core_scores_nk = cd.scores()
            _core_map = {nk2nx_map[i]: int(_core_scores_nk[i]) for i in range(len(_core_scores_nk))}
            features['degeneracy'] = int(cd.maxCoreNumber())
        else:
            _core_map = nx.core_number(G_lcc)
            features['degeneracy'] = int(max(_core_map.values()))

        core_numbers = np.array(list(_core_map.values()))
        features['core_mean'] = float(np.mean(core_numbers))
        features['core_std'] = float(np.std(core_numbers))
        features['core_median'] = float(np.median(core_numbers))
        features['innermost_core_size'] = int(np.sum(core_numbers == features['degeneracy']))
        shared_data['_core_map'] = _core_map
        logger.info(f"  [15/16] k-Core: degeneracy={features['degeneracy']} ({time.time()-t0:.1f}s)")
    except Exception as e:
        for k in ['degeneracy', 'core_mean', 'core_std', 'core_median', 'innermost_core_size']:
            features[k] = None
        logger.warning(f"  [15/16] k-Core: FAILED ({e})")

    # ------------------------------------------------------------------
    # 16. SPECTRAL GAP
    # Citation: Chung, Spectral Graph Theory, AMS (1997).
    # ------------------------------------------------------------------
    if config.get('compute_spectral', True):
        try:
            adjacency_eigs_local = adjacency_eigs  # from spectral block above
            if len(adjacency_eigs_local) >= 2:
                sorted_eigs = np.sort(adjacency_eigs_local)[::-1]
                features['spectral_gap'] = float(sorted_eigs[0] - sorted_eigs[1])
                features['adj_eig_ratio_1_2'] = (
                    float(sorted_eigs[0] / sorted_eigs[1]) if sorted_eigs[1] != 0 else None
                )
            else:
                features['spectral_gap'] = None
                features['adj_eig_ratio_1_2'] = None
            logger.info(f"  [16/16] Spectral gap: {features.get('spectral_gap')}")
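        except Exception as e:
            # Typically a NameError: adjacency_eigs is undefined when the
            # spectral block above failed before computing it.
            features['spectral_gap'] = None
            features['adj_eig_ratio_1_2'] = None
            logger.warning(f"  [16/16] Spectral gap: FAILED ({e})")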
    else:
        features['spectral_gap'] = None
        features['adj_eig_ratio_1_2'] = None

    shared_data['degrees'] = degrees
    return features, shared_data


print("Defined: extract_graph_level_features(G_lcc, G_nk, nk2nx_map, config)")
print("  Returns (features_dict, shared_data)")
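
As a sanity check on the spectral formulas used above, the following minimal sketch evaluates them on the complete graph K4, where closed forms are known. It is an illustration only, not part of the pipeline: the helper name `spectral_summary` is hypothetical, and it uses a dense eigensolver, so it corresponds to the full-spectrum branch rather than the partial `eigsh` branch.

```python
import numpy as np

def spectral_summary(A):
    """Spectral features from a dense symmetric 0/1 adjacency matrix (hypothetical helper)."""
    A = np.asarray(A, dtype=float)
    n = A.shape[0]
    L = np.diag(A.sum(axis=1)) - A            # combinatorial Laplacian L = D - A
    adj_eigs = np.linalg.eigvalsh(A)          # real eigenvalues, ascending
    lap_eigs = np.linalg.eigvalsh(L)
    nonzero = lap_eigs[lap_eigs > 1e-10]      # drop the zero eigenvalue(s)
    lam_max = adj_eigs[-1]                    # equals the spectral radius for a non-negative matrix
    return {
        'spectral_radius': float(lam_max),
        # log-mean-exp, shifted by lam_max for numerical stability
        'natural_connectivity': float(lam_max + np.log(np.mean(np.exp(adj_eigs - lam_max)))),
        'kirchhoff_index': float(n * np.sum(1.0 / nonzero)),
        # matrix-tree theorem: trees = (1/n) * product of nonzero Laplacian eigenvalues
        'log_spanning_trees': float(np.sum(np.log(nonzero)) - np.log(n)),
    }

K4 = np.ones((4, 4)) - np.eye(4)              # complete graph on 4 nodes
summary = spectral_summary(K4)
```

For K4 the Laplacian eigenvalues are 0, 4, 4, 4, so the Kirchhoff index is 4 × (3/4) = 3 and the spanning-tree count is 4³ / 4 = 16, in agreement with Cayley's formula 4^(4-2).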

## 8. Node-Level Feature Extraction Function

Extract all 10 node-level metrics for every AS in the snapshot's graph (the ego subgraph when `TARGET_AS` is set, otherwise the LCC).
Reuses betweenness and k-core results from graph-level extraction.
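
The reuse step amounts to a cache lookup with a recompute fallback. A minimal sketch of that pattern follows; `get_or_compute` is a hypothetical helper (the function below inlines the same logic), the ASNs are private-use numbers chosen for illustration, and the returned flag records whether the cached value was actually used.

```python
def get_or_compute(shared_data, key, compute_fn):
    """Return (value, reused): the cached map if present and non-empty, else a fresh one."""
    cached = shared_data.get(key)
    if cached:                       # missing, None, or empty dict -> recompute
        return cached, True
    return compute_fn(), False

# Toy data standing in for the graph-level results (illustrative values only).
shared_data = {'_core_map': {64512: 2, 64513: 1}}

# '_core_map' is cached, so compute_fn is never called.
core_map, core_reused = get_or_compute(shared_data, '_core_map', lambda: {})
# '_bc_map' is absent, so the fallback runs.
bc_map, bc_reused = get_or_compute(shared_data, '_bc_map',
                                   lambda: {64512: 0.5, 64513: 0.0})
```

Returning the flag alongside the value lets the log line state whether a result was reused or recomputed.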

In [None]:
def extract_node_level_features(G_lcc, G_nk, nx2nk_map, nk2nx_map, shared_data, config):
    """
    Extract all 10 node-level features for a single snapshot.

    Args:
        G_lcc: NetworkX graph (LCC)
        G_nk: NetworKit graph (or None)
        nx2nk_map, nk2nx_map: node ID mappings
        shared_data: dict with '_bc_map' and '_core_map' from graph-level
        config: dict with 'max_nodes_for_clique'

    Returns:
        node_df: DataFrame indexed by ASN with columns for each node feature
        extra_graph_features: dict with 'radius' (from eccentricity)
    """
    n_nodes = G_lcc.number_of_nodes()
    extra_graph_features = {}
    node_features = pd.DataFrame(index=sorted(G_lcc.nodes()))
    node_features.index.name = 'asn'

    # ------------------------------------------------------------------
    # 1. DEGREE CENTRALITY
    # Citation: Freeman, Social Networks 1(3) (1979).
    # ------------------------------------------------------------------
    t0 = time.time()
    try:
        dc = nx.degree_centrality(G_lcc)
        node_features['degree_centrality'] = node_features.index.map(dc)
        node_features['degree'] = node_features.index.map(dict(G_lcc.degree()))
        logger.info(f"    [1/10] Degree centrality ({time.time()-t0:.1f}s)")
    except Exception as e:
        logger.warning(f"    [1/10] Degree centrality: FAILED ({e})")

    # ------------------------------------------------------------------
    # 2. BETWEENNESS CENTRALITY (reused from graph-level)
    # ------------------------------------------------------------------
    t0 = time.time()
    try:
        _bc_map = shared_data.get('_bc_map', {})
        if _bc_map:
            node_features['betweenness_centrality'] = node_features.index.map(_bc_map)
            bc_source = 'reused'
        else:
            # Graph-level betweenness failed or was skipped: recompute here
            bc = nx.betweenness_centrality(G_lcc, k=config.get('betweenness_sample_k'), normalized=True)
            node_features['betweenness_centrality'] = node_features.index.map(bc)
            bc_source = 'recomputed'
        logger.info(f"    [2/10] Betweenness centrality ({bc_source}) ({time.time()-t0:.1f}s)")
    except Exception as e:
        logger.warning(f"    [2/10] Betweenness: FAILED ({e})")

    # ------------------------------------------------------------------
    # 3. CLOSENESS CENTRALITY
    # Citation: Sabidussi, Psychometrika 31(4) (1966).
    # ------------------------------------------------------------------
    t0 = time.time()
    try:
        if HAS_NETWORKIT:
            cc_algo = nk.centrality.Closeness(G_nk, True, nk.centrality.ClosenessVariant.GENERALIZED)
            cc_algo.run()
            cc_scores = cc_algo.scores()
            cc_map = {nk2nx_map[i]: cc_scores[i] for i in range(len(cc_scores))}
            node_features['closeness_centrality'] = node_features.index.map(cc_map)
        else:
            cc = nx.closeness_centrality(G_lcc, wf_improved=True)
            node_features['closeness_centrality'] = node_features.index.map(cc)
        logger.info(f"    [3/10] Closeness centrality ({time.time()-t0:.1f}s)")
    except Exception as e:
        logger.warning(f"    [3/10] Closeness: FAILED ({e})")

    # ------------------------------------------------------------------
    # 4. EIGENVECTOR CENTRALITY
    # Citation: Bonacich, J. Math. Sociol. 2(1) (1972).
    # ------------------------------------------------------------------
    t0 = time.time()
    try:
        if HAS_NETWORKIT:
            ev_algo = nk.centrality.EigenvectorCentrality(G_nk, tol=1e-8)
            ev_algo.run()
            ev_scores = ev_algo.scores()
            ev_map = {nk2nx_map[i]: ev_scores[i] for i in range(len(ev_scores))}
            node_features['eigenvector_centrality'] = node_features.index.map(ev_map)
        else:
            try:
                ev = nx.eigenvector_centrality(G_lcc, max_iter=200, tol=1e-6)
            except nx.PowerIterationFailedConvergence:
                ev = nx.eigenvector_centrality_numpy(G_lcc)
            node_features['eigenvector_centrality'] = node_features.index.map(ev)
        logger.info(f"    [4/10] Eigenvector centrality ({time.time()-t0:.1f}s)")
    except Exception as e:
        logger.warning(f"    [4/10] Eigenvector: FAILED ({e})")

    # ------------------------------------------------------------------
    # 5. PAGERANK
    # Citation: Brin & Page, Computer Networks 30 (1998).
    # ------------------------------------------------------------------
    t0 = time.time()
    try:
        if HAS_NETWORKIT:
            pr_algo = nk.centrality.PageRank(G_nk, damp=0.85, tol=1e-8)
            pr_algo.run()
            pr_scores = pr_algo.scores()
            pr_map = {nk2nx_map[i]: pr_scores[i] for i in range(len(pr_scores))}
            node_features['pagerank'] = node_features.index.map(pr_map)
        else:
            pr = nx.pagerank(G_lcc, alpha=0.85)
            node_features['pagerank'] = node_features.index.map(pr)
        logger.info(f"    [5/10] PageRank ({time.time()-t0:.1f}s)")
    except Exception as e:
        logger.warning(f"    [5/10] PageRank: FAILED ({e})")

    # ------------------------------------------------------------------
    # 6. LOCAL CLUSTERING COEFFICIENT
    # Citation: Watts & Strogatz, Nature 393 (1998).
    # ------------------------------------------------------------------
    t0 = time.time()
    try:
        if HAS_NETWORKIT:
            lcc_algo = nk.centrality.LocalClusteringCoefficient(G_nk, turbo=True)
            lcc_algo.run()
            lcc_scores = lcc_algo.scores()
            lcc_map = {nk2nx_map[i]: lcc_scores[i] for i in range(len(lcc_scores))}
            node_features['local_clustering'] = node_features.index.map(lcc_map)
        else:
            clust = nx.clustering(G_lcc)
            node_features['local_clustering'] = node_features.index.map(clust)
        logger.info(f"    [6/10] Local clustering ({time.time()-t0:.1f}s)")
    except Exception as e:
        logger.warning(f"    [6/10] Local clustering: FAILED ({e})")

    # ------------------------------------------------------------------
    # 7. AVERAGE NEIGHBOR DEGREE
    # Citation: Pastor-Satorras et al., PRL 87 (2001).
    # ------------------------------------------------------------------
    t0 = time.time()
    try:
        and_dict = nx.average_neighbor_degree(G_lcc)
        node_features['avg_neighbor_degree'] = node_features.index.map(and_dict)
        logger.info(f"    [7/10] Avg neighbor degree ({time.time()-t0:.1f}s)")
    except Exception as e:
        logger.warning(f"    [7/10] Avg neighbor degree: FAILED ({e})")

    # ------------------------------------------------------------------
    # 8. NODE CLIQUE NUMBER (NP-hard, with greedy fallback)
    # ------------------------------------------------------------------
    t0 = time.time()
    max_nodes_clique = config.get('max_nodes_for_clique', 5000)
    try:
        if n_nodes <= max_nodes_clique:
            ncn = nx.node_clique_number(G_lcc)
            node_features['node_clique_number'] = node_features.index.map(ncn)
        else:
            def greedy_clique_size(G, node):
                clique = {node}
                candidates = set(G.neighbors(node))
                for cand in sorted(candidates, key=lambda x: G.degree(x), reverse=True):
                    if all(G.has_edge(cand, c) for c in clique):
                        clique.add(cand)
                return len(clique)

            _core_map = shared_data.get('_core_map', {})
            k_max = max(_core_map.values()) if _core_map else 0
            ncn_approx = {}

            if k_max > 0:
                core_subgraph = nx.k_core(G_lcc, k=k_max)
                if core_subgraph.number_of_nodes() <= max_nodes_clique:
                    ncn_approx.update(nx.node_clique_number(core_subgraph))

            for node in G_lcc.nodes():
                if node not in ncn_approx:
                    ncn_approx[node] = greedy_clique_size(G_lcc, node)
            node_features['node_clique_number'] = node_features.index.map(ncn_approx)
        logger.info(f"    [8/10] Node clique number ({time.time()-t0:.1f}s)")
    except Exception as e:
        logger.warning(f"    [8/10] Node clique number: FAILED ({e})")

    # ------------------------------------------------------------------
    # 9. ECCENTRICITY (also yields graph radius)
    # For large graphs with NetworKit: iterate BFS per node in C++.
    # The graph is LCC (connected), so no need to filter inf distances.
    # ------------------------------------------------------------------
    t0 = time.time()
    try:
        if n_nodes < 30000:
            ecc = nx.eccentricity(G_lcc)
            node_features['eccentricity'] = node_features.index.map(ecc)
            extra_graph_features['radius'] = min(ecc.values())
        elif HAS_NETWORKIT:
            ecc_map = {}
            n = G_nk.numberOfNodes()
            for nk_id in range(n):
                bfs = nk.distance.BFS(G_nk, nk_id)
                bfs.run()
                # Graph is LCC (connected) — no inf values to filter
                ecc_map[nk2nx_map[nk_id]] = int(max(bfs.getDistances()))
                if (nk_id + 1) % 10000 == 0:
                    logger.info(f"      Eccentricity: {nk_id+1}/{n} nodes ({time.time()-t0:.1f}s)")
            node_features['eccentricity'] = node_features.index.map(ecc_map)
            extra_graph_features['radius'] = min(ecc_map.values())
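        else:
            # No NetworKit on a large graph: estimate from a random sample of
            # up to 500 nodes. Unsampled nodes get NaN, and the sampled minimum
            # is only an upper bound on the true radius.
            sample_size = min(500, n_nodes)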
            sample_nodes = np.random.choice(list(G_lcc.nodes()), size=sample_size, replace=False)
            ecc_sample = {}
            for node in sample_nodes:
                lengths = nx.single_source_shortest_path_length(G_lcc, node)
                ecc_sample[node] = max(lengths.values())
            node_features['eccentricity'] = node_features.index.map(
                lambda x: ecc_sample.get(x, np.nan))
            extra_graph_features['radius'] = min(ecc_sample.values()) if ecc_sample else None
        logger.info(f"    [9/10] Eccentricity ({time.time()-t0:.1f}s), "
                    f"radius={extra_graph_features.get('radius')}")
    except Exception as e:
        extra_graph_features['radius'] = None
        logger.warning(f"    [9/10] Eccentricity: FAILED ({e})")

    # ------------------------------------------------------------------
    # 10. K-SHELL / CORE NUMBER (reused from graph-level)
    # Citation: Seidman, Social Networks 5(3) (1983).
    # ------------------------------------------------------------------
    t0 = time.time()
    try:
        _core_map = shared_data.get('_core_map', {})
        if _core_map:
            node_features['core_number'] = node_features.index.map(_core_map)
            core_source = 'reused'
        else:
            # Graph-level k-core failed or was skipped: recompute here
            cn = nx.core_number(G_lcc)
            node_features['core_number'] = node_features.index.map(cn)
            core_source = 'recomputed'
        logger.info(f"    [10/10] Core number ({core_source}) ({time.time()-t0:.1f}s)")
    except Exception as e:
        logger.warning(f"    [10/10] Core number: FAILED ({e})")

    return node_features, extra_graph_features


print("Defined: extract_node_level_features(G_lcc, G_nk, nx2nk_map, nk2nx_map, shared_data, config)")
print("  Returns (node_df, extra_graph_features)")
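
The greedy fallback for the node clique number returns a lower bound, not the exact value. The sketch below reproduces the heuristic on a plain adjacency dict (an assumption made to keep it dependency-free; the pipeline operates on a NetworkX graph) and shows a case where a high-degree neighbor leads it astray.

```python
def greedy_clique_size(adj, node):
    """Greedy lower bound on the largest clique containing `node`.

    `adj` maps each node to the set of its neighbors (undirected, no self-loops).
    """
    clique = {node}
    # Try neighbors in order of decreasing degree, as the fallback above does.
    for cand in sorted(adj[node], key=lambda x: len(adj[x]), reverse=True):
        if all(cand in adj[c] for c in clique):
            clique.add(cand)
    return len(clique)

# Node 0 sits in the triangle {0, 1, 2} and also touches hub 3, which has
# the highest degree but is not adjacent to 1 or 2.
adj = {
    0: {1, 2, 3},
    1: {0, 2},
    2: {0, 1},
    3: {0, 4, 5, 6},
    4: {3}, 5: {3}, 6: {3},
}

size_for_0 = greedy_clique_size(adj, 0)   # hub 3 is picked first and blocks 1 and 2
size_for_1 = greedy_clique_size(adj, 1)   # finds the full triangle
```

Here node 0 is reported with clique size 2 although it belongs to a triangle, so values from the greedy branch are best read as "at least this large".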

## 9. Per-Snapshot Processing Loop

Process each RIB snapshot independently:
1. Load per-snapshot CSV -> build edges
2. Build full graph -> extract **ego subgraph** around `TARGET_AS`
3. Extract graph-level features on the ego subgraph
4. Extract node-level features for all ego nodes
5. Append results to time-series accumulators

With the ego-network approach, each snapshot processes in **seconds** instead of hours.
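
Step 2 reduces to a breadth-first search truncated at depth k, followed by taking the induced subgraph. The sketch below shows the idea on a plain edge list. `k_hop_ego` is a hypothetical name and is not the notebook's `build_snapshot_graph`; the ASNs are private-use numbers chosen for illustration. With NetworkX, `nx.ego_graph(G, n, radius=k)` provides the same kind of extraction.

```python
from collections import defaultdict, deque

def k_hop_ego(edges, target, k):
    """Return (nodes, induced_edges) within k hops of `target` in an undirected graph."""
    adj = defaultdict(set)
    for u, v in edges:
        adj[u].add(v)
        adj[v].add(u)

    dist = {target: 0}
    queue = deque([target])
    while queue:
        u = queue.popleft()
        if dist[u] == k:             # do not expand beyond k hops
            continue
        for v in adj[u]:
            if v not in dist:
                dist[v] = dist[u] + 1
                queue.append(v)

    nodes = set(dist)
    # Induced subgraph: keep every edge with both endpoints inside the ego set.
    induced = {(u, v) for u, v in edges if u in nodes and v in nodes}
    return nodes, induced

# Toy topology: a chain 64512 - 64513 - 64514 - 64515 - 64516 plus a chord 64513 - 64515.
edges = [(64512, 64513), (64513, 64514), (64514, 64515),
         (64513, 64515), (64515, 64516)]
ego_nodes, ego_edges = k_hop_ego(edges, target=64512, k=2)
```

AS 64516 is three hops away and is dropped, while the edge between the two 2-hop nodes (64514, 64515) is kept because the subgraph is induced.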

In [None]:
# ============================================================================
# MAIN LOOP: Process each RIB snapshot independently
# ============================================================================

config = {
    'compute_spectral': COMPUTE_SPECTRAL,
    'betweenness_sample_k': BETWEENNESS_SAMPLE_K,
    'max_nodes_for_clique': MAX_NODES_FOR_CLIQUE,
}

graph_level_rows = []       # list of dicts, one per snapshot
node_level_rows = []        # list of DataFrames, one per snapshot
snapshot_errors = []        # track any snapshot that fails entirely

mode_label = f"EGO (AS {TARGET_AS}, {EGO_K_HOP}-hop)" if TARGET_AS else "FULL GRAPH"

print("=" * 70)
print(f"PROCESSING {len(snapshot_manifest)} SNAPSHOTS — Mode: {mode_label}")
print("=" * 70)

for idx, snap in enumerate(snapshot_manifest):
    snap_id = snap['snapshot_id']
    snap_ts = snap['timestamp']
    snap_csv = snap['csv_path']

    print(f"\n{'='*70}")
    print(f"[{idx+1}/{len(snapshot_manifest)}] Snapshot: {snap_id} ({snap_ts})")
    print(f"{'='*70}")

    t_snap_start = time.time()

    try:
        # Step 1: Build edges
        t0 = time.time()
        edges, edge_counts = build_edges_from_csv(snap_csv)
        print(f"  Edges: {len(edges):,} unique ({time.time()-t0:.1f}s)")

        if len(edges) == 0:
            print("  WARNING: No edges found, skipping snapshot")
            snapshot_errors.append({'snapshot_id': snap_id, 'error': 'no edges'})
            continue

        # Step 2: Build graph + ego extraction
        t0 = time.time()
        G, G_sub, G_nk, nx2nk, nk2nx, graph_info = build_snapshot_graph(
            edges, edge_counts, COLLECTOR, snap_id, snap_ts,
            target_as=TARGET_AS, k_hop=EGO_K_HOP
        )

        if TARGET_AS:
            print(f"  Full graph: {graph_info['n_nodes_full']:,} nodes, {graph_info['n_edges_full']:,} edges")
            print(f"  Ego subgraph (AS {TARGET_AS}, {EGO_K_HOP}-hop): "
                  f"{graph_info['n_nodes']:,} nodes, {graph_info['n_edges']:,} edges "
                  f"({graph_info['n_nodes']/graph_info['n_nodes_full']:.1%} of full) ({time.time()-t0:.1f}s)")
            print(f"  Target AS degree in full graph: {graph_info.get('target_degree_full', '?')}")
        else:
            print(f"  Graph: {graph_info['n_nodes']:,} nodes, {graph_info['n_edges']:,} edges "
                  f"(LCC {graph_info['lcc_fraction']:.1%}) ({time.time()-t0:.1f}s)")

        # Step 3: Graph-level features (on ego subgraph or LCC)
        t0 = time.time()
        graph_feats, shared_data = extract_graph_level_features(G_sub, G_nk, nk2nx, config)
        graph_feats['snapshot_id'] = snap_id
        graph_feats['timestamp'] = snap_ts
        graph_feats['collector'] = COLLECTOR
        graph_feats.update(graph_info)
        print(f"  Graph features: {time.time()-t0:.1f}s")

        # Step 4: Node-level features (for all nodes in ego subgraph)
        t0 = time.time()
        node_df, extra_graph_feats = extract_node_level_features(
            G_sub, G_nk, nx2nk, nk2nx, shared_data, config
        )
        graph_feats.update(extra_graph_feats)
        node_df['snapshot_id'] = snap_id
        node_df['timestamp'] = snap_ts
        node_df.index.name = 'asn'
        print(f"  Node features: {node_df.shape[1]} cols x {node_df.shape[0]:,} ASes ({time.time()-t0:.1f}s)")

        # Accumulate results
        graph_level_rows.append(graph_feats)
        node_level_rows.append(node_df.reset_index())

        elapsed = time.time() - t_snap_start
        print(f"  SNAPSHOT COMPLETE in {elapsed:.1f}s")

        # Free memory
        del G, G_sub, G_nk, node_df

    except Exception as e:
        # logger.exception logs at ERROR level and appends the full traceback
        logger.exception(f"  SNAPSHOT FAILED: {e}")
        snapshot_errors.append({'snapshot_id': snap_id, 'error': str(e)})

print(f"\n{'='*70}")
print("ALL SNAPSHOTS PROCESSED")
print(f"  Mode: {mode_label}")
print(f"  Successful: {len(graph_level_rows)}/{len(snapshot_manifest)}")
print(f"  Errors: {len(snapshot_errors)}")
print(f"{'='*70}")

## 10. Combine Results into Time-Series DataFrames

In [None]:
# ============================================================================
# Combine per-snapshot results into time-series DataFrames
# ============================================================================

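# Graph-level time series
# Guard: the column selection and pd.concat below fail on empty input.
if not graph_level_rows:
    raise RuntimeError("No snapshots were processed successfully; check snapshot_errors.")
graph_ts_df = pd.DataFrame(graph_level_rows)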
meta_cols = ['snapshot_id', 'timestamp', 'collector']
feature_cols = [c for c in graph_ts_df.columns if c not in meta_cols]
graph_ts_df = graph_ts_df[meta_cols + sorted(feature_cols)]
graph_ts_df['timestamp'] = pd.to_datetime(graph_ts_df['timestamp'])
graph_ts_df = graph_ts_df.sort_values('timestamp').reset_index(drop=True)

print(f"Graph-level time series: {graph_ts_df.shape}")

# Node-level time series
node_ts_df = pd.concat(node_level_rows, ignore_index=True)
node_ts_df['timestamp'] = pd.to_datetime(node_ts_df['timestamp'])
node_ts_df = node_ts_df.sort_values(['timestamp', 'asn']).reset_index(drop=True)

print(f"Node-level time series: {node_ts_df.shape}")
print(f"  Unique snapshots: {node_ts_df['snapshot_id'].nunique()}")
print(f"  Unique ASes: {node_ts_df['asn'].nunique()}")

## 11. Export Results

In [None]:
# ============================================================================
# Export time-series results
# ============================================================================

# Build filename suffix based on mode
if TARGET_AS:
    suffix = f"{COLLECTOR}_AS{TARGET_AS}_{EGO_K_HOP}hop_{START_DATE}_{END_DATE}"
else:
    suffix = f"{COLLECTOR}_{START_DATE}_{END_DATE}"

# 1. Graph-level time series CSV
graph_ts_path = OUTPUT_DIR / f"graph_level_timeseries_{suffix}.csv"
graph_ts_df.to_csv(graph_ts_path, index=False)
print(f"Graph-level time series: {graph_ts_path}")

# 2. Node-level time series CSV
node_ts_path = OUTPUT_DIR / f"node_level_timeseries_{suffix}.csv"
node_ts_df.to_csv(node_ts_path, index=False)
print(f"Node-level time series: {node_ts_path}")

# 3. Snapshot errors (if any)
if snapshot_errors:
    errors_df = pd.DataFrame(snapshot_errors)
    errors_df.to_csv(OUTPUT_DIR / "snapshot_errors.csv", index=False)
    print(f"Snapshot errors: {OUTPUT_DIR / 'snapshot_errors.csv'}")

# 4. Per-snapshot graph features as JSON
for _, row in graph_ts_df.iterrows():
    snap_id = row['snapshot_id']
    feats = row.drop(['snapshot_id', 'timestamp', 'collector']).to_dict()
    feats_clean = {}
    for k, v in feats.items():
        if isinstance(v, (np.integer,)):
            feats_clean[k] = int(v)
        elif isinstance(v, (np.floating,)):
            feats_clean[k] = float(v) if not np.isnan(v) else None
        elif pd.isna(v):
            feats_clean[k] = None
        else:
            feats_clean[k] = v
    with open(SNAPSHOTS_DIR / f"graph_features_{snap_id}.json", 'w') as f:
        json.dump(feats_clean, f, indent=2, default=str)

print(f"\nAll output files in {OUTPUT_DIR}:")
for f in sorted(OUTPUT_DIR.iterdir()):
    if f.is_file():
        sz = f.stat().st_size / (1024 * 1024)
        print(f"  {f.name:<55} {sz:.2f} MB")

## 12. Visualization: Time-Series Plots

In [None]:
# ============================================================================
# Visualization: Graph-level feature time series
# ============================================================================

title_suffix = f"AS {TARGET_AS} {EGO_K_HOP}-hop Ego" if TARGET_AS else "Full Graph"

key_features = [
    ('n_nodes', 'Number of Nodes'),
    ('n_edges', 'Number of Edges'),
    ('assortativity', 'Assortativity'),
    ('density', 'Density'),
    ('clustering_global', 'Global Clustering'),
    ('diameter', 'Diameter'),
    ('algebraic_connectivity', 'Algebraic Connectivity'),
    ('spectral_radius', 'Spectral Radius'),
    ('degeneracy', 'Degeneracy (k-max)'),
    ('betweenness_max', 'Max Betweenness'),
]

n_plots = len(key_features)
n_cols = 2
n_rows = (n_plots + n_cols - 1) // n_cols  # ceiling division

fig, axes = plt.subplots(n_rows, n_cols, figsize=(16, 3 * n_rows), sharex=True)
axes = axes.flatten()

for i, (feat, label) in enumerate(key_features):
    ax = axes[i]
    if feat in graph_ts_df.columns:
        vals = graph_ts_df[feat].dropna()
        ts = graph_ts_df.loc[vals.index, 'timestamp']
        ax.plot(ts, vals, 'o-', markersize=4, linewidth=1.5)
        ax.set_ylabel(label)
        ax.grid(True, alpha=0.3)
        ax.set_title(label)
    else:
        ax.set_visible(False)

for j in range(i + 1, len(axes)):
    axes[j].set_visible(False)

fig.autofmt_xdate()
plt.suptitle(f'Graph-Level Feature Time Series ({COLLECTOR}, {title_suffix}, {START_DATE} to {END_DATE})',
             fontsize=14, y=1.02)
plt.tight_layout()
plt.savefig(FIGURES_DIR / 'graph_features_timeseries.png', dpi=150, bbox_inches='tight')
plt.show()
print(f"Saved: {FIGURES_DIR / 'graph_features_timeseries.png'}")

In [None]:
# ============================================================================
# Visualization: Degree Distribution (last snapshot)
# ============================================================================

if len(node_level_rows) > 0:
    last_snap = node_ts_df[node_ts_df['snapshot_id'] == graph_ts_df.iloc[-1]['snapshot_id']]
    degrees_last = last_snap['degree'].dropna().values

    fig, axes = plt.subplots(1, 2, figsize=(14, 5))

    # Linear histogram
    axes[0].hist(degrees_last, bins=100, edgecolor='black', alpha=0.7, color='steelblue')
    axes[0].set_xlabel('Degree')
    axes[0].set_ylabel('Frequency')
    axes[0].set_title(f'Degree Distribution — {title_suffix}')
    axes[0].axvline(x=np.mean(degrees_last), color='red', linestyle='--',
                    label=f'Mean={np.mean(degrees_last):.1f}')
    axes[0].legend()

    # Highlight target AS if in ego mode
    if TARGET_AS and TARGET_AS in last_snap['asn'].values:
        target_deg = last_snap.loc[last_snap['asn'] == TARGET_AS, 'degree'].iloc[0]
        axes[0].axvline(x=target_deg, color='green', linestyle='-', linewidth=2,
                        label=f'AS {TARGET_AS} (deg={int(target_deg)})')
        axes[0].legend()

    # Log-log CCDF
    sorted_deg = np.sort(degrees_last)[::-1]
    ccdf = np.arange(1, len(sorted_deg) + 1) / len(sorted_deg)
    axes[1].loglog(sorted_deg, ccdf, '.', markersize=3, color='steelblue')
    axes[1].set_xlabel('Degree k')
    axes[1].set_ylabel('P(X >= k)')
    axes[1].set_title(f'Degree CCDF — {title_suffix}')
    axes[1].grid(True, alpha=0.3)

    plt.tight_layout()
    plt.savefig(FIGURES_DIR / 'degree_distribution.png', dpi=150, bbox_inches='tight')
    plt.show()
    print(f"Saved: {FIGURES_DIR / 'degree_distribution.png'}")

In [None]:
# ============================================================================
# Visualization: Centrality Correlations (last snapshot)
# ============================================================================

if len(node_level_rows) > 0:
    last_snap = node_ts_df[node_ts_df['snapshot_id'] == graph_ts_df.iloc[-1]['snapshot_id']]
    centrality_cols = ['degree_centrality', 'betweenness_centrality', 'closeness_centrality',
                       'eigenvector_centrality', 'pagerank']
    existing_cols = [c for c in centrality_cols if c in last_snap.columns]

    if len(existing_cols) >= 2:
        fig, ax = plt.subplots(figsize=(8, 6))
        corr = last_snap[existing_cols].corr(method='spearman')
        im = ax.imshow(corr, cmap='RdBu_r', vmin=-1, vmax=1)
        ax.set_xticks(range(len(existing_cols)))
        ax.set_yticks(range(len(existing_cols)))
        short_names = [c.replace('_centrality', '').replace('_', ' ').title() for c in existing_cols]
        ax.set_xticklabels(short_names, rotation=45, ha='right')
        ax.set_yticklabels(short_names)

        for i_r in range(len(existing_cols)):
            for j_r in range(len(existing_cols)):
                ax.text(j_r, i_r, f"{corr.iloc[i_r, j_r]:.2f}", ha='center', va='center',
                        color='white' if abs(corr.iloc[i_r, j_r]) > 0.5 else 'black', fontsize=10)

        plt.colorbar(im, label='Spearman rho')
        ax.set_title(f'Centrality Correlations — {title_suffix}')
        plt.tight_layout()
        plt.savefig(FIGURES_DIR / 'centrality_correlations.png', dpi=150, bbox_inches='tight')
        plt.show()
        print(f"Saved: {FIGURES_DIR / 'centrality_correlations.png'}")

    # --- Target AS feature summary ---
    if TARGET_AS:
        target_row = last_snap[last_snap['asn'] == TARGET_AS]
        if not target_row.empty:
            print(f"\nTarget AS {TARGET_AS} — Feature Summary (last snapshot):")
            feature_cols = [c for c in last_snap.columns
                          if c not in ['asn', 'snapshot_id', 'timestamp']]
            for col in feature_cols:
                val = target_row[col].iloc[0]
                if pd.notna(val):
                    # Rank within the ego network: 1 = largest value; tied values
                    # all receive the worst rank among the tie (count of values >= val)
                    rank = (last_snap[col].dropna() >= val).sum()
                    total = last_snap[col].dropna().shape[0]
                    print(f"  {col:<30} {val:>12.6f}  (rank {rank}/{total})")

## Feature Definitions Reference

**Note:** All features below are computed on the **ego subgraph** (k-hop neighborhood
of the target AS), not the full global AS graph. This means:
- Graph-level features describe the **local topology** around the target AS
- Node-level features are computed for the target AS **and all its k-hop neighbors**
- Changes in these features over time indicate **local topology anomalies**
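
As a minimal illustration of what "k-hop neighborhood" means here, the sketch below collects every node within `k` hops of a center node by breadth-first search and then keeps only the edges whose endpoints both lie in that set. The toy adjacency map, the ASNs (taken from the documentation-reserved range), and the helper name `k_hop_ego` are assumptions for illustration, not the pipeline's own extraction code; networkx offers `nx.ego_graph(G, n, radius=k)` for the same purpose.

```python
from collections import deque

def k_hop_ego(adj, center, k):
    """Return (nodes, edges) of the subgraph induced by nodes within k hops of center."""
    dist = {center: 0}
    queue = deque([center])
    while queue:
        u = queue.popleft()
        if dist[u] == k:          # do not expand beyond the k-th hop
            continue
        for v in adj[u]:
            if v not in dist:
                dist[v] = dist[u] + 1
                queue.append(v)
    nodes = set(dist)
    # Induced edges: both endpoints must lie inside the ego node set
    edges = {(u, v) for u in nodes for v in adj[u] if v in nodes and u < v}
    return nodes, edges

# Hypothetical toy topology: chain 64500-64501-64503-64504 plus a stub AS 64502
toy_adj = {
    64500: {64501, 64502},
    64501: {64500, 64503},
    64502: {64500},
    64503: {64501, 64504},
    64504: {64503},
}
ego_nodes, ego_edges = k_hop_ego(toy_adj, center=64500, k=2)
print(sorted(ego_nodes))
print(sorted(ego_edges))
```

With `k=2`, AS 64504 sits three hops from the center and is excluded, so every feature for this toy snapshot would be computed on four nodes and three edges rather than on the full graph.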

### Graph-Level Features (16)

| # | Feature | Definition | Ego Interpretation | Citation |
|---|---------|------------|--------------------|----------|
| 1 | **Assortativity** | Pearson correlation of degrees at edge endpoints | Do high-degree nodes in the neighborhood connect to each other? | Newman (2002) |
| 2 | **Density** | 2\|E\| / [\|V\|(\|V\|-1)] | How interconnected is the local neighborhood? | Standard |
| 3 | **Clustering** | 3 x triangles / connected triples | Do the target's peers form triangles with each other? | Watts & Strogatz (1998) |
| 4 | **Diameter** | max d(u,v) in ego subgraph | How "wide" is the local neighborhood? | Watts & Strogatz (1998) |
| 5 | **Algebraic connectivity** | 2nd smallest Laplacian eigenvalue | How robust is the local neighborhood to splits? | Fiedler (1973) |
| 6 | **Spectral radius** | Largest adjacency eigenvalue | Concentration of connectivity in the neighborhood | Cvetkovic et al. (2010) |
| 7 | **Percolation limit** | 1/spectral_radius | Epidemic threshold in the local network | Pastor-Satorras & Vespignani (2001) |
| 8 | **Symmetry ratio** | \|distinct eigenvalues\| / (D+1) | Structural regularity of the neighborhood | Dekker (2005) |
| 9 | **Natural connectivity** | ln[(1/n) sum exp(eigenvalues)] | Redundancy / fault tolerance of neighborhood | Wu et al. (2011) |
| 10 | **Kirchhoff index** | n x sum(1/mu_i) over the nonzero Laplacian eigenvalues mu_i | Effective resistance of the neighborhood | Klein & Randic (1993) |
| 11 | **log(Spanning trees)** | Matrix tree theorem | Path diversity in the local network | Kirchhoff (1847) |
| 12 | **Edge/node connectivity** | Min edge cut / min vertex cut size | Minimum links (or ASes) whose removal disconnects the neighborhood | Whitney (1932) |
| 13 | **Rich-club coefficient** | phi(k) at degree percentiles | Do high-degree neighbors preferentially connect? | Zhou & Mondragon (2004) |
| 14 | **Betweenness distribution** | Mean, max, std, skewness | Traffic concentration in the neighborhood | Brandes (2001) |
| 15 | **k-Core metrics** | Degeneracy, core distribution | Depth of hierarchical structure | Seidman (1983) |
| 16 | **Spectral gap** | lambda_1 - lambda_2 | Expansion properties of the neighborhood | Chung (1997) |
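
Several of the spectral definitions in the table can be checked by hand on a tiny graph. The following is a sketch only, assuming a connected, unweighted, undirected graph and a dense NumPy eigensolver; the toy 4-cycle and all variable names are illustrative and are not taken from the pipeline, which may rely on sparse solvers instead.

```python
import numpy as np

# Toy graph: the 4-cycle 0-1-2-3-0 (illustrative only, not pipeline data)
edges = [(0, 1), (1, 2), (2, 3), (3, 0)]
n = 4

# Dense adjacency matrix A and combinatorial Laplacian L = D - A
A = np.zeros((n, n))
for u, v in edges:
    A[u, v] = A[v, u] = 1.0
L = np.diag(A.sum(axis=1)) - A

adj_eigs = np.linalg.eigvalsh(A)  # ascending order
lap_eigs = np.linalg.eigvalsh(L)  # ascending; lap_eigs[0] is ~0 when the graph is connected

density = 2 * len(edges) / (n * (n - 1))
spectral_radius = adj_eigs[-1]
percolation_limit = 1.0 / spectral_radius
algebraic_connectivity = lap_eigs[1]

# The formulas below drop the single zero eigenvalue, which is valid only
# for a connected graph (a disconnected graph has several zero eigenvalues)
nonzero = lap_eigs[1:]
kirchhoff_index = n * np.sum(1.0 / nonzero)
log_spanning_trees = np.sum(np.log(nonzero)) - np.log(n)  # matrix tree theorem
natural_connectivity = np.log(np.mean(np.exp(adj_eigs)))

print(density, spectral_radius, percolation_limit, algebraic_connectivity)
print(kirchhoff_index, np.exp(log_spanning_trees), natural_connectivity)
```

For the 4-cycle the adjacency eigenvalues are 2, 0, 0, -2 and the Laplacian eigenvalues are 0, 2, 2, 4. Up to floating-point error, the spectral radius and algebraic connectivity should therefore both be 2, the Kirchhoff index 5, and the spanning-tree count 4. Working with the logarithm of the spanning-tree count avoids overflow on graphs with hundreds of nodes.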

### Node-Level Features (10)

| # | Feature | Definition | Citation |
|---|---------|------------|----------|
| 1 | **Degree centrality** | deg(v)/(n-1) | Freeman (1979) |
| 2 | **Betweenness centrality** | Sum of shortest path fractions through v | Brandes (2001) |
| 3 | **Closeness centrality** | (n-1) / sum of distances from v | Sabidussi (1966) |
| 4 | **Eigenvector centrality** | Principal eigenvector of A | Bonacich (1972) |
| 5 | **PageRank** | Stationary random walk distribution (damping factor 0.85) | Brin & Page (1998) |
| 6 | **Local clustering** | 2 x triangles(v) / [d(d-1)], where d = deg(v) | Watts & Strogatz (1998) |
| 7 | **Avg neighbor degree** | (1/d) sum of neighbor degrees, where d = deg(v) | Pastor-Satorras et al. (2001) |
| 8 | **Node clique number** | Size of the largest clique containing v | NP-hard (Karp, 1972) |
| 9 | **Eccentricity** | max distance from v | Standard |
| 10 | **Core number (k-shell)** | max{k : v in H_k} | Seidman (1983) |
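
The simpler node-level definitions can be verified directly on a small example. The sketch below uses plain Python on a hypothetical four-node graph and assumes a connected, undirected graph in which every node has at least one neighbor. The helper names are illustrative, not the pipeline's functions; networkx provides `nx.degree_centrality`, `nx.clustering`, `nx.average_neighbor_degree`, `nx.closeness_centrality`, and `nx.eccentricity` for the same quantities.

```python
from collections import deque

# Hypothetical toy graph: triangle 0-1-2 with a pendant node 3 attached to node 2
adj = {0: {1, 2}, 1: {0, 2}, 2: {0, 1, 3}, 3: {2}}

def bfs_distances(adj, source):
    """Hop distances from source to every reachable node."""
    dist = {source: 0}
    queue = deque([source])
    while queue:
        u = queue.popleft()
        for v in adj[u]:
            if v not in dist:
                dist[v] = dist[u] + 1
                queue.append(v)
    return dist

def node_features(adj, v):
    """Five of the tabulated features for node v (connected graph assumed)."""
    n = len(adj)
    nbrs = list(adj[v])
    d = len(nbrs)
    # Each edge between two neighbors of v closes one triangle at v
    triangles = sum(1 for i, a in enumerate(nbrs) for b in nbrs[i + 1:] if b in adj[a])
    dist = bfs_distances(adj, v)
    return {
        "degree_centrality": d / (n - 1),
        "local_clustering": 2 * triangles / (d * (d - 1)) if d > 1 else 0.0,
        "avg_neighbor_degree": sum(len(adj[u]) for u in nbrs) / d,
        "closeness_centrality": (n - 1) / sum(dist.values()),
        "eccentricity": max(dist.values()),
    }

features = {v: node_features(adj, v) for v in adj}
for v, feats in features.items():
    print(v, feats)
```

Node 2 touches every other node, so its degree centrality and closeness are both 1, while only one of its three neighbor pairs is linked, giving a local clustering of 1/3. The pendant node 3 has clustering 0 by convention, because a node of degree 1 has no neighbor pairs.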

In [None]:
# ============================================================================
# PIPELINE COMPLETE
# ============================================================================

print("\n" + "=" * 70)
print("PIPELINE COMPLETE")
print("=" * 70)
print(f"\nCollector: {COLLECTOR}")
print(f"Date range: {START_DATE} to {END_DATE}")
if TARGET_AS:
    print(f"Mode: Ego-network (AS {TARGET_AS}, {EGO_K_HOP}-hop)")
else:
    print("Mode: Full graph")
print(f"Snapshots processed: {len(graph_level_rows)}/{len(snapshot_manifest)}")
if snapshot_errors:
    print(f"Snapshot errors: {len(snapshot_errors)}")
print(f"\nGraph-level time series: {graph_ts_df.shape}")
print(f"Node-level time series: {node_ts_df.shape}")
print(f"\nOutput directory: {OUTPUT_DIR}")
print(f"Figures directory: {FIGURES_DIR}")

# Quick verification
assert len(graph_ts_df) > 0, "No snapshots processed"
assert 'snapshot_id' in graph_ts_df.columns
assert 'timestamp' in graph_ts_df.columns
assert 'algebraic_connectivity' in graph_ts_df.columns

for snap_id_v in graph_ts_df['snapshot_id']:
    n_expected = graph_ts_df.loc[graph_ts_df['snapshot_id'] == snap_id_v, 'n_nodes'].iloc[0]
    n_actual = len(node_ts_df[node_ts_df['snapshot_id'] == snap_id_v])
    assert n_actual == n_expected, f"Node count mismatch for {snap_id_v}: {n_actual} vs {n_expected}"

print("\nAll verification checks passed.")