In [None]:
# === Standard Library ===
import math
import random
import time
import heapq
import itertools
from collections import defaultdict, deque
from itertools import combinations
from typing import Any, Tuple, Dict, List, Set, Sequence, Union, Callable

# === Third-Party Libraries ===

# --- Scientific Computing ---
import numpy as np
import pandas as pd
import scipy.sparse as sp
from scipy.optimize import linprog

# --- Plotting ---
import matplotlib.pyplot as plt

# --- Parallel Processing ---
from joblib import Parallel, delayed
from tqdm import tqdm

# --- Graph Processing ---
import networkx as nx

# --- JIT Compilation ---
from numba import njit, prange

# 1. EPC (parallelized)

In [4]:
def nx_to_csr(G: nx.Graph) -> Tuple[List[int], Dict[int, int], np.ndarray, np.ndarray, np.ndarray]:
     """Convert an undirected NetworkX graph (edge attr `'p'`) to CSR arrays."""
     nodes: List[int] = list(G.nodes())
     idx_of: Dict[int, int] = {u: i for i, u in enumerate(nodes)}

     indptr: List[int] = [0]
     indices: List[int] = []
     probs: List[float] = []

     for u in nodes:
         for v in G.neighbors(u):
             indices.append(idx_of[v])
             probs.append(G.edges[u, v]['p'])
         indptr.append(len(indices))

     return (
         nodes,
         idx_of,
         np.asarray(indptr, dtype=np.int32),
         np.asarray(indices, dtype=np.int32),
         np.asarray(probs, dtype=np.float32),
     )

@njit(inline="always")
def _bfs_component_size(start: int,
                    indptr: np.ndarray,
                    indices: np.ndarray,
                    probs: np.ndarray,
                    deleted: np.ndarray) -> int:
    """Return |C_u|−1 for **one** random realisation (stack BFS)."""
    n = deleted.size
    stack = np.empty(n, dtype=np.int32)
    visited = np.zeros(n, dtype=np.uint8)

    size = 1
    top = 0
    stack[top] = start
    top += 1
    visited[start] = 1

    while top:
        top -= 1
        v = stack[top]
        for eid in range(indptr[v], indptr[v + 1]):
            w = indices[eid]
            if deleted[w]:
                continue
            if np.random.random() >= probs[eid]:
                continue
            if visited[w]:
                continue
            visited[w] = 1
            stack[top] = w
            top += 1
            size += 1
    return size - 1

@njit(parallel=True)
def epc_mc(indptr: np.ndarray,
            indices: np.ndarray,
            probs: np.ndarray,
            deleted: np.ndarray,
            num_samples: int) -> float:
    """Monte‑Carlo estimator of **expected pairwise connectivity** (EPC)."""
    surv = np.where(~deleted)[0]
    m = surv.size
    if m < 2:
        return 0.0

    acc = 0.0
    for _ in prange(num_samples):
        u = surv[np.random.randint(m)]
        acc += _bfs_component_size(u, indptr, indices, probs, deleted)

    return (m * acc) / (2.0 * num_samples)

def epc_mc_deleted(
  G: nx.Graph,
  S: set,
  num_samples: int = 100_000,
) -> float:
  # build csr once
  nodes, idx_of, indptr, indices, probs = nx_to_csr(G)
  n = len(nodes)

  # turn python set S into a mask (node-IDs to delete)
  deleted = np.zeros(n, dtype=np.bool_)
  for u in S:
    deleted[idx_of[u]] = True

  epc = epc_mc(indptr, indices, probs, deleted, num_samples)

  return epc

In [5]:
def local_search_swap(
  # G: nx.Graph,
  S: Set[int],
  *,
  csr: Tuple[List[int], Dict[int,int], np.ndarray, np.ndarray, np.ndarray],
  num_samples: int = 100_000,
  max_iter: int = 5
) -> Set[int]:
  """
  Given initial delete-set S, try 1-for-1 swaps to reduce EPC.
  csr = (nodes, idx_of, indptr, indices, probs).
  """
  nodes, idx_of, indptr, indices, probs = csr
  n = len(nodes)

  # build boolean mask from S
  deleted = np.zeros(n, dtype=np.bool_)
  for u in S:
    deleted[idx_of[u]] = True

  # current EPC
  curr = epc_mc(indptr, indices, probs, deleted, num_samples)

  for it in range(max_iter):
    best_delta = 0.0
    best_swap = None

    # try swapping each i in S with each j not in S
    for i in list(S):
      ii = idx_of[i]
      # undelete i
      deleted[ii] = False

      for j in nodes:
        jj = idx_of[j]
        if deleted[jj]:
          continue
        # delete j
        deleted[jj] = True

        sigma = epc_mc(indptr, indices, probs, deleted, num_samples)
        delta = curr - sigma
        if delta > best_delta:
          best_delta = delta
          best_swap = (ii, jj, sigma)

        # revert j
        deleted[jj] = False

      # revert i

      deleted[ii] = True

    if best_swap is None:
      break   

    # commit the best swap
    ii, jj, new_sigma = best_swap
    deleted[ii] = False
    deleted[jj] = True
    curr = new_sigma

    # update S
    S.remove(nodes[ii])
    S.add(nodes[jj])

  return S

# 2. LNS + 1-swap LS + tiny tabu memory

In [None]:
# component_sampling_epc_mc(G, S, num_samples) → float
# local_search_swap(S, csr, num_samples, max_iter) → Set[int]
# build_csr(G) → (nodes, idx_of, indptr, indices, probs)
# epc_mc(indptr, indices, probs, deleted_mask, num_samples) → float

# greedy EC
def greedy_rebuild(
    G: nx.Graph,
    K: int,
    sigma_mc: Any,
    S_start: Set[int] = None
) -> Set[int]:
    S = set(S_start) if S_start else set()
    σ_S = sigma_mc(S)
    while len(S) < K:
        # compute single‐node gains
        best_gain, best_v = -math.inf, None
        for v in G.nodes():
            if v in S:
                continue
            gain = σ_S - sigma_mc(S | {v})
            if gain > best_gain:
                best_gain, best_v = gain, v
        S.add(best_v)
        σ_S = sigma_mc(S)
    return S

# destroy-recreate lns
def destroy_recreate_lns(
    G: nx.Graph,
    K: int,
    S_init: Set[int],
    rho: float,
    iterations: int,
    sigma_mc: Any
) -> Set[int]:
    S_best = set(S_init)
    score_best = sigma_mc(S_best)

    for _ in range(iterations):
        # destroy
        to_remove = random.sample(S_best, max(1, int(math.ceil(rho * K))))
        S_partial = S_best - set(to_remove)

        # recreate
        S_new = greedy_rebuild(G, K, sigma_mc, S_partial)
        score_new = sigma_mc(S_new)

        if score_new < score_best:
            S_best, score_best = S_new, score_new

    return S_best

# tabu search
def tabu_search_swap(
    S: Set[int],
    csr: Tuple[List[int], Dict[int,int], np.ndarray, np.ndarray, np.ndarray],
    num_samples: int,
    max_iter: int,
    tabu_tenure: int
) -> Set[int]:
    nodes, idx_of, indptr, indices, probs = csr
    n = len(nodes)
    deleted = np.zeros(n, dtype=bool)
    for u in S:
        deleted[idx_of[u]] = True

    curr = epc_mc(indptr, indices, probs, deleted, num_samples)
    tabu: Dict[int,int] = {}

    for _ in range(max_iter):
        best_delta = 0.0
        best_move = None

        # examine all 1‐for‐1 swaps, skipping tabu nodes
        for i in list(S):
            if tabu.get(i, 0) > 0:
                continue
            ii = idx_of[i]
            deleted[ii] = False
            for j in nodes:
                if j in S or tabu.get(j, 0) > 0:
                    continue
                jj = idx_of[j]
                deleted[jj] = True
                new_score = epc_mc(indptr, indices, probs, deleted, num_samples)
                delta = curr - new_score
                if delta > best_delta:
                    best_delta = delta
                    best_move = (i, j, new_score)
                deleted[jj] = False
            deleted[ii] = True

        # decrement tabu tenures
        for node in list(tabu):
            tabu[node] -= 1
            if tabu[node] <= 0:
                del tabu[node]

        if best_move is None:
            break

        # commit best swap
        i_out, j_in, curr = best_move
        ii, jj = idx_of[i_out], idx_of[j_in]
        deleted[ii], deleted[jj] = False, True
        S.remove(i_out); S.add(j_in)

        # mark both nodes tabu
        tabu[i_out] = tabu_tenure
        tabu[j_in]  = tabu_tenure

    return S

def build_csr(G: nx.Graph):

    nodes = sorted(G.nodes())
    idx_of = {u: i for i, u in enumerate(nodes)}
    n = len(nodes)

    degs = [len(list(G.neighbors(u))) for u in nodes]
    indptr = np.zeros(n + 1, dtype=int)
    indptr[1:] = np.cumsum(degs)

    indices = np.empty(indptr[-1], dtype=int)
    probs = np.empty(indptr[-1], dtype=float)
    ptr = 0

    for u in nodes:
        for v in G.neighbors(u):
            indices[ptr] = idx_of[v]
            probs[ptr] = G.edges[u, v]['p']
            ptr += 1
            
    return nodes, idx_of, indptr, indices, probs

# hybrid approach
def lns_tabu_pipeline(
    G: nx.Graph,
    K: int,
    rho: float = 0.3,
    lns_iters: int = 20,
    mc_samples_construct: int = 500,
    mc_samples_ls: int = 2000,
    mc_samples_eval: int = 2000,
    max_ls_iter: int = 3,
    tabu_tenure: int = 5,
    tabu_iter: int = 5
) -> Tuple[Set[int], float]:
    # build CSR once
    csr = build_csr(G)
    # prepare a cached sigma for fast destroy‐recreate
    cache: Dict[frozenset, float] = {}
    def sigma_construct(S: Set[int]) -> float:
        key = frozenset(S)
        if key not in cache:
            cache[key] = epc_mc_deleted(G, S, num_samples=mc_samples_construct)
        return cache[key]

    # initial solution (pure greedy)
    S0 = greedy_rebuild(G, K, sigma_construct)

    best_S, best_score = set(S0), sigma_construct(S0)

    # LNS + LS + Tabu
    for _ in range(1): 
        # destroy‐recreate
        S1 = destroy_recreate_lns(G, K, best_S, rho, lns_iters, sigma_construct)
        # polish with your 1‐swap local search
        S2 = local_search_swap(S1, csr=csr, num_samples=mc_samples_ls, max_iter=max_ls_iter)
        # tiny tabu around 1‐swap
        S3 = tabu_search_swap(S2, csr=csr, num_samples=mc_samples_ls,
                              max_iter=tabu_iter, tabu_tenure=tabu_tenure)
        # final evaluation
        score3 = epc_mc_deleted(G, S3, num_samples=mc_samples_eval)
        if score3 < best_score:
            best_S, best_score = set(S3), score3

    return best_S, best_score

NameError: name 'Callable' is not defined

In [None]:
G = nx.erdos_renyi_graph(100, 0.0443, 42)
p_edge = 1.0
K = 10
# alpha = 0.4

# assign uniform edge‐survival 0.8
for u, v in G.edges():
  G[u][v]['p'] = p_edge

S_star, score_star = lns_tabu_pipeline(
    G.copy(), K=K,
    rho=0.3,
    lns_iters=30,
    mc_samples_construct=1000,
    mc_samples_ls=5000,
    mc_samples_eval=2000,
    max_ls_iter=2,
    tabu_tenure=7,
    tabu_iter=5
)

epc_grasp_final = epc_mc_deleted(G.copy(), S_star, 100_000)

print("Best S:", S_star)
print("EPC(S):", score_star)
print("Estimated final sigma(S*)  :", epc_grasp_final)