# Articulation points

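See the Wikipedia article on [biconnected components](https://en.wikipedia.org/wiki/Biconnected_component) for background on articulation points (cut vertices).

See the algorithm description in [this Stepik lesson](https://stepik.org/lesson/12342/step/10?unit=2794).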

In [1]:
%matplotlib inline
import matplotlib.pyplot as plt
import numpy as np
import networkx as nx
import itertools
import sys
import random
import time
import threading
from random import randint
from collections import defaultdict

In [2]:
# Disable plot axes by default
import matplotlib as mpl
# Hide every axes spine, and switch off tick marks and tick labels on both axes.
mpl.rc('axes.spines', left=False, right=False, top=False, bottom=False)
mpl.rc('xtick', labelsize=0, top=False, bottom=False)
mpl.rc('ytick', labelsize=0, left=False, right=False)

In [3]:
from IPython.display import Image, HTML

def pydot_image(graph, prog='dot', width=None):
    """Render a NetworkX graph through pydot for display in the notebook.

    Parameters
    ----------
    graph : NetworkX graph, converted with ``nx.nx_pydot.to_pydot``.
    prog : layout program name forwarded to pydot (default ``'dot'``).
    width : when truthy, a PNG ``Image`` of that width is returned;
        otherwise an ``HTML`` object wrapping the SVG rendering is returned.
    """
    rendered = nx.nx_pydot.to_pydot(graph)
    if not width:
        svg_text = rendered.create_svg(prog=prog).decode()
        return HTML(svg_text)
    png_bytes = rendered.create_png(prog=prog)
    return Image(png_bytes, width=width)

In [4]:
# Small hand-written graph ("a b" per line) used as a smoke test throughout.
test_edges = [
    (int(left), int(right))
    for left, right in (line.split() for line in '0 1\n1 2\n2 0\n3 2\n4 3\n4 2\n5 4\n'.splitlines())
]

In [6]:
def make_random_graph(max_nodes, min_nodes=4, max_edges=None, max_node_edges=4):
    """Generate a random connected simple undirected graph as a sorted edge list.

    A directed configuration model is sampled, self-loops and parallel edges
    are dropped, and edge direction is discarded. Candidates that end up
    empty, exceed ``max_edges`` (when it is truthy) or are not connected are
    rejected and a new one is drawn. Surviving nodes are relabelled to
    ``0..n-1`` in sorted order.

    Parameters
    ----------
    max_nodes : upper bound for the sampled node count (raised to
        ``min_nodes`` if smaller).
    min_nodes : lower bound for the sampled node count.
    max_edges : optional cap on the number of distinct undirected edges.
    max_node_edges : upper bound for each node's sampled in/out degree.

    Returns
    -------
    Sorted list of edges of the relabelled graph.
    """
    max_nodes = max(max_nodes, min_nodes)
    while True:
        node_count = randint(min_nodes, max_nodes)
        in_degrees = [randint(1, max_node_edges) for _ in range(node_count)]
        # Same multiset of degrees for both directions keeps the sums equal.
        out_degrees = list(in_degrees)
        random.shuffle(out_degrees)
        multigraph = nx.directed_configuration_model(in_degree_sequence=in_degrees,
                                                     out_degree_sequence=out_degrees)
        # Iterating the multigraph edge view yields (u, v, key) triples.
        undirected = set()
        for u, v, _key in multigraph.edges:
            if u == v:
                continue
            undirected.add((u, v) if u < v else (v, u))
        if not undirected or (max_edges and len(undirected) > max_edges):
            continue
        simple = nx.Graph(list(undirected))
        if not (len(simple) and nx.is_connected(simple)):
            continue
        ordered = sorted(simple.nodes)
        relabelled = nx.relabel_nodes(simple, {node: idx for idx, node in enumerate(ordered)})
        return sorted(relabelled.edges)

In [7]:
def nx_articulation_points(edges):
    """Reference answer: sorted articulation points computed by NetworkX.

    ``edges`` is an edge list accepted by ``nx.from_edgelist``.
    """
    graph = nx.from_edgelist(edges)
    cut_vertices = list(nx.articulation_points(graph))
    cut_vertices.sort()
    return cut_vertices

# Smoke test of the reference implementation on the small sample graph.
cuts = nx_articulation_points(test_edges)
print(*cuts)

2 4


## Prepare tests

In [21]:
# 5000 small random connected graphs; the printed number is the largest edge count.
all_tests = [make_random_graph(max_nodes=100, min_nodes=4) for _ in range(5000)]
print(max(len(edge_list) for edge_list in all_tests))

271


In [8]:
# Large inputs for timing: a few graphs with roughly `num_nodes` nodes each.
num_nodes, num_graphs, max_edges = 4000, 3, 10000
big_tests = [
    make_random_graph(max_nodes=num_nodes + 10, min_nodes=num_nodes, max_edges=max_edges)
    for _ in range(num_graphs)
]
print(max(len(edge_list) for edge_list in big_tests))

9999


In [20]:
# Bigger variant of the large inputs; rebinds `big_tests` with 5 graphs.
num_nodes, num_graphs, max_edges = 6000, 5, 20000
big_tests = [
    make_random_graph(max_nodes=num_nodes + 10, min_nodes=num_nodes, max_edges=max_edges)
    for _ in range(num_graphs)
]
print(max(len(edge_list) for edge_list in big_tests))

15145


## Recursive

In [None]:
import sys
from collections import defaultdict

def my_recursive_articulation_points(edge_list):
    """Find articulation points (cut vertices) with a recursive DFS.

    Parameters
    ----------
    edge_list : iterable of ``(a, b)`` pairs describing an undirected graph.
        Vertices must be the integers ``0..n-1`` and every vertex must occur
        in at least one edge (checked with an assertion).

    Returns
    -------
    Sorted list of articulation points; ``[]`` for an empty edge list.

    Notes
    -----
    Every vertex gets a discovery number ``k`` and a low value ``l`` (the
    smallest discovery number reachable from its DFS subtree using at most
    one back edge). A non-root vertex is a cut vertex when some child has
    ``l >= k`` of the vertex; a DFS root is a cut vertex when it has more
    than one DFS child.

    Back edges are handled while scanning the neighbour list, so the running
    time is linear in the number of edges (no walk up the ancestor chain).
    The recursion depth can reach the number of vertices, so large inputs
    need a raised recursion limit and a large enough thread stack.
    """
    adj = defaultdict(set)
    for a, b in edge_list:
        adj[a].add(b)
        adj[b].add(a)
    adj = [sorted(adj[v]) for v in sorted(adj.keys())]
    if not adj:
        return []
    nv = max(a[-1] for a in adj) + 1
    assert len(adj) == nv

    k_val = [None] * nv   # discovery number per vertex; None means unvisited
    k_no = 0              # last discovery number handed out
    cuts = set()

    def dfs(v, parent):
        """Visit ``v`` (reached from ``parent``) and return its low value."""
        nonlocal k_no
        k_no += 1
        k_val[v] = k_cur = l_cur = k_no
        l_children = num_children = 0

        for u in adj[v]:
            if k_val[u] is not None:
                # Already visited: an ancestor (back edge, may lower l_cur) or
                # a finished descendant (larger k, no effect). The tree edge
                # to the parent must not count as a back edge.
                if u != parent and k_val[u] < l_cur:
                    l_cur = k_val[u]
                continue
            num_children += 1
            l_child = dfs(u, v)
            if l_child < l_cur:
                l_cur = l_child
            if l_child > l_children:
                l_children = l_child

        if parent is None:
            # A DFS root separates its subtrees only if there are several.
            if num_children > 1:
                cuts.add(v)
        elif l_children >= k_cur:
            # Some child subtree cannot climb above v without passing through v.
            cuts.add(v)

        return l_cur

    # Start a search in every component, not just the one containing vertex 0.
    for root in range(nv):
        if k_val[root] is None:
            dfs(root, None)

    return sorted(cuts)

# Smoke test of the recursive implementation on the small sample graph.
edges = test_edges
#edges = list(tuple(map(int, s.split())) for s in sys.stdin)
# The function defined in this cell is my_recursive_articulation_points;
# the previous call used a name that does not exist in the notebook.
cuts = my_recursive_articulation_points(edges)
print(' '.join(str(v) for v in cuts))

In [None]:
# Check the recursive implementation against NetworkX on all small graphs and
# report the slowest single call. time.clock() was removed in Python 3.8, so
# time.perf_counter() is used for the measurements.
max_time = 0
for test in all_tests:
    nx_cuts = nx_articulation_points(test)
    time1 = time.perf_counter()
    my_cuts = my_recursive_articulation_points(test)
    time2 = time.perf_counter()
    max_time = max(max_time, time2-time1)
    assert my_cuts == nx_cuts, 'not same!'
print('OK %.3g' % max_time)

In [None]:
# The recursive DFS can recurse once per vertex, so the big graphs are run in
# a worker thread created after enlarging the thread stack size, with the
# interpreter recursion limit raised as well.
threading.stack_size(64*1024*1024)
sys.setrecursionlimit(100100)
wrapper_result = [None]
def my_articulation_points_wrapper(test):
    """Thread target: store the recursive result in ``wrapper_result[0]``.

    The slot is cleared first so that a failure inside the thread leaves
    ``None`` behind instead of the previous graph's answer.
    """
    wrapper_result[0] = None
    wrapper_result[0] = my_recursive_articulation_points(test)
nx_max_time = my_max_time = 0
for test in big_tests:
    # time.clock() was removed in Python 3.8; perf_counter() replaces it.
    time1 = time.perf_counter()
    nx_cuts = nx_articulation_points(test)
    time2 = time.perf_counter()
    # Accumulate into the matching running maximum (not an unrelated max_time).
    nx_max_time = max(nx_max_time, time2-time1)
    thread = threading.Thread(target=my_articulation_points_wrapper, args=[test])
    time1 = time.perf_counter()
    thread.start()
    thread.join()
    my_cuts = wrapper_result[0]
    time2 = time.perf_counter()
    my_max_time = max(my_max_time, time2-time1)
    assert my_cuts == nx_cuts, 'not same!'
print('OK nx:%.3g my:%.3g' % (nx_max_time, my_max_time))

## Non-recursive

In [None]:
import sys
from collections import defaultdict, deque

def my_nonrec_articulation_points(edge_list):
    """Find articulation points (cut vertices) with an iterative DFS.

    Parameters
    ----------
    edge_list : iterable of ``(a, b)`` pairs describing an undirected graph.
        Vertices must be the integers ``0..n-1`` and every vertex must occur
        in at least one edge (checked with an assertion).

    Returns
    -------
    Sorted list of articulation points; ``[]`` for an empty edge list.

    Notes
    -----
    ``k_val`` holds discovery numbers and ``l_val`` low values (the smallest
    discovery number reachable from a vertex's DFS subtree using at most one
    back edge). A non-root vertex ``p`` is a cut vertex when some DFS child
    ``c`` has ``l_val[c] >= k_val[p]``; a DFS root is a cut vertex when it
    has more than one DFS child.

    Back edges are folded in while the neighbour list is consumed, so the
    running time is linear in the number of edges, and a search is started
    in every connected component.
    """
    adj = defaultdict(set)
    for a, b in edge_list:
        adj[a].add(b)
        adj[b].add(a)
    adj = [adj[v] for v in sorted(adj.keys())]
    if not adj:
        return []
    nv = len(adj)
    assert len(adj) == max(max(a) for a in adj) + 1

    k_val = [None] * nv      # discovery number; None means unvisited
    l_val = [None] * nv      # low value
    parents = [None] * nv    # DFS parent; stays None for roots
    pending = [list(adj[v]) for v in range(nv)]   # neighbours still to scan
    cuts = set()
    k_no = 0                 # last discovery number handed out

    for root in range(nv):
        if k_val[root] is not None:
            continue
        k_no += 1
        k_val[root] = l_val[root] = k_no
        root_children = 0
        stack = [root]

        while stack:
            v = stack[-1]
            if pending[v]:
                u = pending[v].pop()
                if k_val[u] is None:
                    # Tree edge: descend into the newly discovered vertex.
                    parents[u] = v
                    k_no += 1
                    k_val[u] = l_val[u] = k_no
                    stack.append(u)
                elif u != parents[v] and k_val[u] < l_val[v]:
                    # Back edge to an ancestor (the tree edge to the parent
                    # is excluded; finished descendants have a larger k).
                    l_val[v] = k_val[u]
                continue

            # All neighbours of v are handled: report its low value upwards.
            stack.pop()
            p = parents[v]
            if p is None:
                continue   # v is the root of this search
            if l_val[v] < l_val[p]:
                l_val[p] = l_val[v]
            if p == root:
                root_children += 1
            elif l_val[v] >= k_val[p]:
                cuts.add(p)

        if root_children > 1:
            cuts.add(root)

    return sorted(cuts)

# Smoke test of the non-recursive implementation on the small sample graph.
# Alternative input source (one "a b" pair per line on stdin):
#edge_list = list(tuple(map(int, s.split())) for s in sys.stdin)
edge_list = test_edges
cuts = my_nonrec_articulation_points(edge_list)
print(*cuts)


In [None]:
# Check the non-recursive implementation against NetworkX on all small graphs
# and report the slowest single call. time.clock() was removed in Python 3.8,
# so time.perf_counter() is used for the measurements.
max_time = 0
for test in all_tests:
    nx_cuts = nx_articulation_points(test)
    time1 = time.perf_counter()
    my_cuts = my_nonrec_articulation_points(test)
    time2 = time.perf_counter()
    max_time = max(max_time, time2-time1)
    assert my_cuts == nx_cuts, 'not same!'
print('OK %.3g' % max_time)

In [None]:
# Compare the non-recursive implementation with NetworkX on the big graphs,
# tracking the slowest call of each implementation separately.
nx_max_time = my_max_time = 0
mismatches = 0
for i, test in enumerate(big_tests):
    # time.clock() was removed in Python 3.8; perf_counter() replaces it.
    time1 = time.perf_counter()
    nx_cuts = nx_articulation_points(test)
    time2 = time.perf_counter()
    nx_max_time = max(nx_max_time, time2-time1)
    time1 = time.perf_counter()
    # Store the result under its own name so the comparison below is real
    # (it used to overwrite nx_cuts and compare against a stale my_cuts).
    my_cuts = my_nonrec_articulation_points(test)
    time2 = time.perf_counter()
    my_max_time = max(my_max_time, time2-time1)
    if my_cuts != nx_cuts:
        mismatches += 1
        print('big_test[%d]: not same!' % i)
# Only claim success when every graph matched.
print('%s nx:%.3g my:%.3g' % ('FAILED' if mismatches else 'OK', nx_max_time, my_max_time))

## From NetworkX

In [22]:
import sys
from collections import defaultdict, deque

def _my_nx_dfs(adj, components=True):
    """Iterative DFS yielding biconnected components or articulation points.

    Parameters
    ----------
    adj : mapping from each node to an iterable of its neighbours. Start
        nodes are taken from ``sorted(adj.keys())``, so keys must be sortable.
    components : if true, yield lists of ``(parent, child)`` edges, one list
        per yielded component; if false, yield nodes identified as
        articulation points.

    Notes
    -----
    With ``components=False`` a node is yielded once for every child subtree
    that satisfies the cut condition, so the same node can appear more than
    once; callers that need distinct nodes should de-duplicate.
    """
    # depth-first search algorithm to generate articulation points and biconnected components
    visited = set()
    for start in sorted(adj.keys()):
        if start in visited:
            continue
        discovery = {start:0} # "time" of first discovery of node during search
        low = {start:0}  # smallest discovery time seen so far from the node's subtree
        root_children = 0
        visited.add(start)
        edge_stack = []
        # Each frame is (DFS parent of the node, node being expanded, neighbour iterator).
        # Despite the names, `parent` below is the node currently being expanded
        # and `grandparent` is its DFS parent.
        stack = [(start, start, iter(adj[start]))]
        while stack:
            grandparent, parent, children = stack[-1]
            try:
                child = next(children)
                if grandparent == child:
                    # Skip the edge leading back to the DFS parent.
                    continue
                if child in visited:
                    if discovery[child] <= discovery[parent]: # back edge
                        low[parent] = min(low[parent],discovery[child])
                        if components:
                            edge_stack.append((parent,child))
                else:
                    # First visit: len(discovery) serves as the next discovery time.
                    low[child] = discovery[child] = len(discovery)
                    visited.add(child)
                    stack.append((parent, child, iter(adj[child])))
                    if components:
                        edge_stack.append((parent,child))
            except StopIteration:
                # Neighbours of `parent` are exhausted: finish it and update its DFS parent.
                stack.pop()
                if len(stack) > 1:
                    if low[parent] >= discovery[grandparent]:
                        # Nothing in the subtree of `parent` reaches above `grandparent`.
                        if components:
                            ind = edge_stack.index((grandparent,parent))
                            yield edge_stack[ind:]
                            edge_stack=edge_stack[:ind]
                        else:
                            yield grandparent
                    low[grandparent] = min(low[parent], low[grandparent])
                elif stack: # length 1 so grandparent is root
                    root_children += 1
                    if components:
                        ind = edge_stack.index((grandparent,parent))
                        yield edge_stack[ind:]
        if not components:
            # root node is articulation point if it has more than 1 child
            if root_children > 1:
                yield start

def my_nx_articulation_points(edge_list):
    """Return the sorted articulation points of an undirected edge list.

    Builds a neighbour-set mapping from the ``(a, b)`` pairs and collects the
    distinct nodes produced by ``_my_nx_dfs`` in articulation-point mode.
    An empty edge list gives ``[]``.
    """
    neighbours = defaultdict(set)
    for u, v in edge_list:
        neighbours[u].add(v)
        neighbours[v].add(u)
    if len(neighbours) == 0:
        return []
    # The generator may repeat a node, hence the set before sorting.
    distinct = set(_my_nx_dfs(neighbours, components=False))
    return sorted(distinct)

# Smoke test of the NetworkX-derived implementation on the small sample graph.
# Alternative input source (one "a b" pair per line on stdin):
#edge_list = list(tuple(map(int, s.split())) for s in sys.stdin)
edge_list = test_edges
cuts = my_nx_articulation_points(edge_list)
print(*cuts)


2 4


In [23]:
# Check the NetworkX-derived implementation against NetworkX on all small
# graphs and report the slowest single call. time.clock() was removed in
# Python 3.8, so time.perf_counter() is used for the measurements.
max_time = 0
for test in all_tests:
    nx_cuts = nx_articulation_points(test)
    time1 = time.perf_counter()
    my_cuts = my_nx_articulation_points(test)
    time2 = time.perf_counter()
    max_time = max(max_time, time2-time1)
    assert my_cuts == nx_cuts, 'not same!'
print('OK %.3g' % max_time)

OK 0.00558


In [30]:
# Compare the NetworkX-derived implementation with NetworkX on the big graphs,
# tracking the slowest call of each implementation separately.
nx_max_time = my_max_time = 0
mismatches = 0
for i, test in enumerate(big_tests):
    # time.clock() was removed in Python 3.8; perf_counter() replaces it.
    time1 = time.perf_counter()
    nx_cuts = nx_articulation_points(test)
    time2 = time.perf_counter()
    # Accumulate into the matching running maximum (not an unrelated max_time).
    nx_max_time = max(nx_max_time, time2-time1)

    time1 = time.perf_counter()
    my_cuts = my_nx_articulation_points(test)
    time2 = time.perf_counter()
    my_max_time = max(my_max_time, time2-time1)

    if my_cuts != nx_cuts:
        mismatches += 1
        print('big_test[%d]: not same: %s != %s' % (i, nx_cuts, my_cuts))
# Only claim success when every graph matched.
print('%s nx:%.3g my:%.3g' % ('FAILED' if mismatches else 'OK', nx_max_time, my_max_time))

OK nx:0.27 my:0.163


In [29]:
# Spot-check one big graph: print the NetworkX answer and the answer of the
# NetworkX-derived implementation for visual comparison.
# NOTE(review): index 3 needs big_tests to hold at least 4 graphs, i.e. the
# cell that builds it with num_graphs = 5 must have been the last one run.
test = big_tests[3]
print(nx_articulation_points(test))
print(my_nx_articulation_points(test))

[4851]
[4851]


## Subprocess tests

In [31]:
def run_program(edges):
    """Run ``articulation-points.py`` on an edge list and return its output.

    The edges are written to the script's standard input as one ``"a b"``
    line each, using the current Python interpreter. stderr is merged into
    stdout; the combined output is returned stripped and decoded.
    """
    import subprocess
    import sys

    payload = ''.join('{} {}\n'.format(a, b) for a, b in edges).encode('utf8')
    command = [sys.executable, 'articulation-points.py']
    completed = subprocess.run(command, input=payload,
                               stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    return completed.stdout.strip().decode()

# End-to-end check of the standalone script against NetworkX.
# time.clock() was removed in Python 3.8; time.perf_counter() measures the
# wall-clock time of each subprocess run instead.
print(run_program(test_edges))

max_time = 0
for test in all_tests[:100]:
    nx_cuts = ' '.join(str(v) for v in nx_articulation_points(test))
    time1 = time.perf_counter()
    my_cuts = run_program(test)
    time2 = time.perf_counter()
    max_time = max(max_time, time2-time1)
    assert my_cuts == nx_cuts, 'my:[{}] != nx:[{}]'.format(my_cuts, nx_cuts)
print('all OK %.3g' % max_time)

# Start a fresh maximum so the "big" figure reflects only the big graphs.
max_time = 0
for test in big_tests[:10]:
    nx_cuts = ' '.join(str(v) for v in nx_articulation_points(test))
    time1 = time.perf_counter()
    my_cuts = run_program(test)
    time2 = time.perf_counter()
    max_time = max(max_time, time2-time1)
    assert my_cuts == nx_cuts, 'not same!'
print('big OK %.3g' % max_time)

2 4
all OK 0.417
big OK 0.569


## Debugging

In [None]:
# Debug helper, disabled by default: flip the condition to draw the graph
# currently bound to `test`.
if False:
    from IPython.display import display
    graph = nx.from_edgelist(test)
    dot = nx.nx_pydot.to_pydot(graph)
    # An expression nested inside an `if` body is not auto-rendered by the
    # notebook, so the HTML object has to be displayed explicitly.
    display(HTML(dot.create_svg(prog='dot').decode()))