In [4]:
import networkx as nx

# 1. Creating a custom graph

In [5]:
# Create an empty undirected graph
G = nx.Graph()

In [6]:
C1 = [(0, 1), (1, 2), (2, 3)]
C2 = [(4, 5), (5, 6)]

G.add_edges_from(C1)
G.add_edges_from(C2)
# G.add_edges_from([
#     (1, 2),
#     (2, 3),
#     (3, 4)
# ])

# G.add_edges_from([
#     (5, 6),
#     (6, 7),
# ])

In [7]:
# Verifying components
list(nx.connected_components(G))

[{0, 1, 2, 3}, {4, 5, 6}]

In [8]:
G.nodes

NodeView((0, 1, 2, 3, 4, 5, 6))

In [9]:
len(G.nodes)

7

In [10]:
G.edges

EdgeView([(0, 1), (1, 2), (2, 3), (4, 5), (5, 6)])

In [11]:
len(G.edges)

5

## BFS + Connected components algorithm for PC computing

In [None]:
from collections import deque

def bfs(G: nx.Graph, source: int, visited: set[int]) -> list[int]:
  queue = deque()
  visited.add(source)
  res = []

  # visited[source] = True
  queue.append(source)

  while len(queue) > 0:

    v = queue.pop(0)
    res.append(v)
    # print(f"queue: {v}")

    for u in G.neighbors(v):
      if u not in visited:
        visited.add(u)
        queue.append(u)

  return res

In [None]:
def connected_components(G: nx.Graph) -> list[list[int]]:
  # visited = [False] * list(G.nodes)
  visited : set[int] = set()
  components : list[list[int]] = []

  V = list(G.nodes)

  for v in V:
    if v not in visited:
      comp = bfs(G, v, visited)
      components.append(comp)
      # yield comp

  return components

In [80]:
components = connected_components(G)
components

<generator object connected_components at 0x71f849489240>

- Considering terminal nodes

In [15]:
def create_custom_graph_with_2_comps(terminal_nodes: list[int]) -> nx.Graph:

  G = nx.Graph()

  C1 = [(0, 1), (1, 2), (2, 3)]
  C2 = [(4, 5), (5, 6)]

  G.add_edges_from(C1)
  G.add_edges_from(C2)

  # Add terminal attribute to nodes
  for v in G.nodes:
    if v in terminal_nodes:
      G.nodes[v]["terminal"] = True
    else:
      G.nodes[v]["terminal"] = False
  
  return G

In [16]:
TERMINAL_NODES = [0, 1, 3, 5, 6]

G = create_custom_graph_with_2_comps(terminal_nodes=TERMINAL_NODES)

In [17]:
G.nodes

NodeView((0, 1, 2, 3, 4, 5, 6))

In [18]:
G.nodes(data="terminal")

NodeDataView({0: True, 1: True, 2: False, 3: True, 4: False, 5: True, 6: True}, data='terminal')

In [19]:
def exclude_non_terminal_nodes_from_component(terminal_nodes: list[int], components: list[list[int]]) -> list[list[int]]:

  excluded_components = []

  for comp in components:
    excluded_comp = []
    for v in comp:
      if v in terminal_nodes:
        excluded_comp.append(v)
      
    excluded_components.append(excluded_comp)

  return excluded_components

In [20]:
components

[[0, 1, 2, 3], [4, 5, 6]]

In [21]:
excluded_comps = exclude_non_terminal_nodes_from_component(TERMINAL_NODES, components)
excluded_comps

[[0, 1, 3], [5, 6]]

- Compute Pairwise connectivity considering terminal nodes

In [22]:
import math

def compute_pc_terminals(G: nx.Graph, terminal_nodes: list[int]) -> int:

  # 1. Get connected components
  components = connected_components(G)

  # 2. Exclude non-terminal nodes from the components
  excluded_components = exclude_non_terminal_nodes_from_component(terminal_nodes, components)

  # 3. Compute pairwise connectivity using comb(n, 2) = n! / (n - 2)! * 2!
  pairwise_connectivity = 0

  for comp in excluded_components:
    n = int(len(comp))
    pairwise_connectivity += math.comb(n, 2)

  return pairwise_connectivity

In [23]:
TERMINAL_NODES = [0, 1, 2, 5, 6]

pc = compute_pc_terminals(G, TERMINAL_NODES)
pc

4

## Compute pairwise connectivity

In [None]:
def compute_pc(G: nx.Graph, S: set, terminal_nodes: list[int]) -> int:

  # Remaining vertices after deletion
  # V_remaining = set(G.nodes) - S
  # num_rem = len(V_remaining)

  T = set(terminal_nodes)

  G_copy = G.copy()
  G_copy.remove_nodes_from(S)

  # 1. Get connected components
  components = connected_components(G_copy)

  # 2. Exclude non-terminal nodes from the components
  excluded_components = []

  for comp in components:
    excluded_comp = []
    for v in comp:
      if v in T:
        excluded_comp.append(v)

    excluded_components.append(excluded_comp) 

  # print(f"Excluded components: {excluded_components}")
  
  # 3. Compute pairwise connectivity across terminal nodes over components
  total_pc = 0
  for comp in excluded_components:
    n = len(comp)
    total_pc += n * (n - 1) // 2

  ## More simpler usage:
  # total_pc = 0
  # for comp in connected_components(G_copy):
  #   comp_count = sum(1 for v in comp if v in T)
  #   total_pc += comp_count * (comp_count - 1) // 2

  return total_pc

In [25]:
G.nodes

NodeView((0, 1, 2, 3, 4, 5, 6))

In [26]:
S = set([2, 5])

In [27]:
set(G.nodes) - S

{0, 1, 3, 4, 6}

In [28]:
components = connected_components(G)
components

exclude_components = []
for comp in components:
  exclude_comp = []
  for v in comp:
    if v in set(G.nodes) - S and v in TERMINAL_NODES:
      exclude_comp.append(v)

  exclude_components.append(exclude_comp)

exclude_components


[[0, 1], [6]]

In [29]:
type(G.nodes - S)

set

In [30]:
G.edges

EdgeView([(0, 1), (1, 2), (2, 3), (4, 5), (5, 6)])

## Simple Greedy from ES and from MIS algorithms

### Greedy from ES

In [None]:
def greedy_algorithm(
    G: nx.Graph, terminals: list[int], 
    k: int, case: 1 | 2) -> tuple[set, list[int]]:
  
  # If graph is too big, use sparse representation
  # for better memory usage

  T : set = set(terminals)
  V : set = set(G.nodes)

  S : set = set()
  pc_deltas : list[int] = []

  for _ in range(k):

    best_node = None
    best_deg = -1
    best_pc = float("inf")

    # print(f"\nK: iteration\n")
    for node in V - S:

      if case == 1:
        # case 1: S in V
        S_j = S | {node}
        pc_Sj = compute_pc(G=G, S=S_j, terminal_nodes=terminals)

        print(f"node: {node} and pc Sj : {pc_Sj}")

      if case == 2:
        # case 2: S in V \ T
        if node not in T:
          S_j = S | {node}
          pc_Sj = compute_pc(G=G, S=S_j, terminal_nodes=terminals)

      # tie-breaker - if two nodes give the same best_pc
      # get the one with higher degree
      deg = G.degree(node)

      # argmin
      if pc_Sj < best_pc or (pc_Sj == best_pc and deg > best_deg):
        best_pc = pc_Sj
        best_node = node
        best_deg = deg
        print(f"best node: {best_node} and best gain: {best_pc}")

    if best_node is None:
      # No feasible solution
      break 
    
    S.add(best_node)
    pc_deltas.append(best_pc)

  return S, pc_deltas

In [176]:
TERMINAL_NODES = [0, 1, 3, 5, 6]
G = create_custom_graph_with_2_comps(TERMINAL_NODES)

print(G.nodes, "\n", G.edges)

[0, 1, 2, 3, 4, 5, 6] 
 [(0, 1), (1, 2), (2, 3), (4, 5), (5, 6)]


In [177]:
S = greedy_algorithm(G=G, terminals=TERMINAL_NODES, k=2, case=1)
S

node: 0 and pc Sj : 2
best node: 0 and best gain: 2
node: 1 and pc Sj : 1
best node: 1 and best gain: 1
node: 2 and pc Sj : 2
node: 3 and pc Sj : 2
node: 4 and pc Sj : 4
node: 5 and pc Sj : 3
node: 6 and pc Sj : 3
node: 0 and pc Sj : 1
best node: 0 and best gain: 1
node: 2 and pc Sj : 1
best node: 2 and best gain: 1
node: 3 and pc Sj : 1
node: 4 and pc Sj : 1
node: 5 and pc Sj : 0
best node: 5 and best gain: 0
node: 6 and pc Sj : 0


({1, 5}, [1, 0])

In [57]:
G = create_custom_graph_with_2_comps(TERMINAL_NODES)
G

<networkx.classes.graph.Graph at 0x71f8497b31a0>

In [58]:
G.nodes

NodeView((0, 1, 2, 3, 4, 5, 6))

In [59]:
G.edges

EdgeView([(0, 1), (1, 2), (2, 3), (4, 5), (5, 6)])

In [60]:
G.remove_node(1)


In [62]:
G.edges

EdgeView([(2, 3), (4, 5), (5, 6)])

In [63]:
G.nodes

NodeView((0, 2, 3, 4, 5, 6))

In [84]:
components =list(nx.connected_components(G))
components

[{0}, {2, 3}, {4, 5, 6}]

In [87]:
components = connected_components(G)
components

[[0], [2, 3], [4, 5, 6]]

### Greedy MIS

In [None]:
def greedy_mis_algorithm(
    G: nx.Graph, terminals: list[int], 
    k: int, case: int, mis_trails: int = 30 ) -> tuple[set, list[int]]:
  
  # If graph is too big, use sparse representation
  # for better memory usage
  
  S : set = set()
  V : set = set(G.nodes)
  T : set = set(terminals)

  # Loop for selecting best warm-up MIS
  # which has resulted in lower total PC
  best_pc = float("inf")
  MIS : set = set()

  for _ in range(mis_trails):
    mis = set(nx.maximal_independent_set(G))

    # If case 2, ensure terminal nodes are not deleted
    # keep terminal nodes in MIS
    if case == 2:
      mis = mis | T
    
    # deletions correspond to V \ MIS
    S0 = V - mis

    # feasibility for case 2: do not delete terminal nodes
    if case == 2 and (S0 & T):
      continue

    curr_pc = compute_pc(G=G, S=S0, terminal_nodes=terminals)

    if curr_pc < best_pc:
      best_pc = curr_pc
      MIS = mis

  print(f"mis: {MIS}")
  
  pc_deltas : list[int] = []

  while len(MIS) != len(V) - k:

    best_node = None
    best_deg = -1
    best_pc = float("inf")

    for node in V - MIS:

      if case == 1:
        # case 1: S in V
        S_j = V - (MIS | {node})
        pc_Sj = compute_pc(G=G, S=S_j, terminal_nodes=terminals)
        print(f"node: {node} and pc Sj : {pc_Sj}")
        
      if case == 2:
        # case 2: S in V \ T
        if node not in T:
          S_j = V - (MIS | {node})
          pc_Sj = compute_pc(G=G, S=S_j, terminal_nodes=terminals)

      # tie-breaker - if two nodes give the same best_pc
      # get the one with higher degree
      deg = G.degree(node)

      # argmin
      if pc_Sj < best_pc or (pc_Sj == best_pc and deg > best_deg):
        best_pc = pc_Sj
        best_node = node
        best_deg = deg
        # print(f"best node: {best_node} and best gain: {best_pc}")
    
    if best_node is None:
      # No feasible solution
      break

    MIS.add(best_node)
    pc_deltas.append(best_pc)
  
    print(f"mis iter: {MIS}")  
  
  S = V - MIS
  print(f"mis: {MIS} and S: {S}")

  return S, pc_deltas

In [199]:
TERMINAL_NODES = [0, 1, 3, 5, 6]
G = create_custom_graph_with_2_comps(TERMINAL_NODES)

print(G.nodes, "\n", G.edges)

[0, 1, 2, 3, 4, 5, 6] 
 [(0, 1), (1, 2), (2, 3), (4, 5), (5, 6)]


In [200]:
S = greedy_mis_algorithm(G=G, terminals=TERMINAL_NODES, k=2, case=1)
S

mis: {0, 2, 4, 6}
node: 1 and pc Sj : 1
best node: 1 and best gain: 1
node: 3 and pc Sj : 0
best node: 3 and best gain: 0
node: 5 and pc Sj : 1
mis iter: {0, 2, 3, 4, 6}
mis: {0, 2, 3, 4, 6} and S: {1, 5}


({1, 5}, [0])

In [191]:
G.nodes

NodeView((0, 1, 2, 3, 4, 5, 6))

In [144]:
nx.maximal_independent_set(G)

[5, 0, 2]