In [198]:
import numpy as np
import matplotlib.pyplot as plt
import networkx as nx
from scipy.sparse.linalg import svds
from sklearn.preprocessing import normalize
from sklearn.metrics.pairwise import cosine_similarity

import warnings
warnings.simplefilter(action='ignore', category=FutureWarning)

## Functions to generate a dynamic stochastic block model (SBM) with changepoints

In [253]:
def create_SBM_parameters(n=50, p11=0.25, p12=0.05, minimum_average_node_per_block=5):
    """Build the candidate stochastic-block-model (SBM) configurations for ``n`` nodes.

    One configuration is produced for every power-of-two block count
    ``1, 2, 4, ...`` whose average block size ``n // n_block`` is at least
    ``minimum_average_node_per_block``. The bound is evaluated with integer
    arithmetic, so a block count whose average size is exactly the minimum
    is included (e.g. ``n=40`` with the default minimum yields 1, 2, 4 and 8
    blocks) and the result does not depend on floating-point log rounding.

    Parameters
    ----------
    n : int
        Total number of nodes.
    p11 : float
        Edge probability inside a block (diagonal of the probability matrix).
    p12 : float
        Edge probability between two different blocks (off-diagonal entries).
    minimum_average_node_per_block : int
        Smallest allowed value of ``n // n_block``.

    Returns
    -------
    list of [sizes, p]
        ``sizes`` is a list of ``n_block`` block sizes that sums to ``n``
        (the last block absorbs the remainder) and ``p`` is the
        ``(n_block, n_block)`` numpy array of edge probabilities.

    Raises
    ------
    ValueError
        If ``minimum_average_node_per_block`` is not positive, or if ``n`` is
        too small to form even a single block of the minimum size.
    """
    if minimum_average_node_per_block <= 0:
        raise ValueError("minimum_average_node_per_block must be positive")
    max_blocks = n // minimum_average_node_per_block
    if max_blocks < 1:
        raise ValueError(
            f"n={n} is too small for a block of at least "
            f"{minimum_average_node_per_block} nodes"
        )

    size_prob_parameters = []
    n_block = 1
    while n_block <= max_blocks:
        # All blocks but the last get the floor share; the last takes what is left.
        sizes = [n // n_block] * (n_block - 1)
        sizes.append(n - sum(sizes))
        # p11 on the diagonal, p12 everywhere else.
        p = (p11 - p12) * np.eye(n_block) + p12 * np.ones((n_block, n_block))
        size_prob_parameters.append([sizes, p])
        n_block *= 2

    return size_prob_parameters

In [226]:
def generate_snapshot_on_consecutive_networks(G_old, G_new, alpha=0):
    """Blend two consecutive graphs into one snapshot.

    ``alpha`` is the weight of the new graph: every edge of ``G_old`` is kept
    independently with probability ``1 - alpha`` and every edge of ``G_new``
    with probability ``alpha``. This makes the random blend consistent with
    the two shortcut cases, which return ``G_old`` itself for ``alpha == 0``
    and ``G_new`` itself for ``alpha == 1``.

    The blended graph is built with the same class as ``G_old`` (so a
    directed input stays directed) and contains every node of both inputs,
    including nodes that end up without edges, so all snapshots keep the
    same node set.

    Parameters
    ----------
    G_old, G_new : graph objects
        Must provide ``nodes(data=True)``, ``edges(data=True)``,
        ``add_nodes_from`` and ``add_edges_from`` (e.g. networkx graphs).
    alpha : float
        Mixing weight of ``G_new``. Values outside ``[0, 1]`` are not
        rejected; below 0 all old edges are kept, above 1 all new edges.

    Returns
    -------
    graph
        ``G_old`` or ``G_new`` unchanged for ``alpha`` equal to 0 or 1,
        otherwise a new graph of ``G_old``'s class.
    """
    if alpha == 0:
        return G_old
    if alpha == 1:
        return G_new

    old_edges = list(G_old.edges(data=True))
    new_edges = list(G_new.edges(data=True))
    # One uniform draw per edge: old edges survive with probability 1 - alpha,
    # new edges are admitted with probability alpha.
    keep_old = np.random.rand(len(old_edges)) >= alpha
    keep_new = np.random.rand(len(new_edges)) < alpha

    G_mix = G_old.__class__()
    # Register nodes first so isolated nodes are not lost.
    G_mix.add_nodes_from(G_old.nodes(data=True))
    G_mix.add_nodes_from(G_new.nodes(data=True))
    G_mix.add_edges_from(edge for edge, keep in zip(old_edges, keep_old) if keep)
    G_mix.add_edges_from(edge for edge, keep in zip(new_edges, keep_new) if keep)
    return G_mix

In [242]:
def generate_dynamic_SBM(T=100, cps=None, n=50, p11=0.25, p12=0.05, alpha=0, directed=False):
    """Generate a sequence of SBM graphs whose parameters switch at changepoints.

    A parameter set is drawn at random from ``create_SBM_parameters(n, p11, p12)``.
    At every changepoint a *different* parameter set is drawn and the snapshot
    is a fresh sample from it. At every other step a fresh sample is drawn
    from the current parameters and blended with the previous snapshot via
    ``generate_snapshot_on_consecutive_networks(..., alpha)``; with the
    default ``alpha=0`` that call returns the previous snapshot unchanged.

    Parameters
    ----------
    T : int or None
        Number of snapshots. ``None`` selects the default of 100.
    cps : sequence of int or None
        Changepoint time steps. ``None`` selects ``[T // 2]``. The input is
        not modified. Duplicates and values outside ``range(T)`` have no
        effect on the generated sequence.
    n, p11, p12 :
        Forwarded to ``create_SBM_parameters``.
    alpha : float
        Blending weight forwarded to ``generate_snapshot_on_consecutive_networks``.
    directed : bool
        Forwarded to ``nx.stochastic_block_model``.

    Returns
    -------
    (G_list, cps)
        ``G_list`` holds the ``T`` snapshots; ``cps`` is a sorted copy of the
        changepoints (a numpy array if one was passed in, otherwise a list).

    Raises
    ------
    ValueError
        If no parameter set is available, or a changepoint falls inside
        ``range(T)`` but fewer than two parameter sets exist to switch between.
    """
    if T is None:
        T = 100
    if cps is None:
        cps = [T // 2]
    # Sorted copy: do not reorder the caller's list/array in place.
    cps = np.sort(cps) if isinstance(cps, np.ndarray) else sorted(cps)
    cp_times = {int(c) for c in cps}

    size_prob_parameters = create_SBM_parameters(n, p11, p12)
    num_configs = len(size_prob_parameters)
    if num_configs == 0:
        raise ValueError("no SBM parameter sets available for the requested n")
    if num_configs < 2 and any(0 <= c < T for c in cp_times):
        # With a single parameter set the re-draw below could never pick a
        # different one and would loop forever.
        raise ValueError("at least two SBM parameter sets are needed to generate a changepoint")

    size_prob_index_curr = np.random.choice(num_configs)
    G_prev = None
    G_list = []
    for t in range(T):
        is_changepoint = t in cp_times
        if is_changepoint:
            # Re-draw until the parameters differ from the ones in force just
            # before this changepoint, so every changepoint is a real change.
            size_prob_index_prev = size_prob_index_curr
            while size_prob_index_curr == size_prob_index_prev:
                size_prob_index_curr = np.random.choice(num_configs)

        sizes, p = size_prob_parameters[size_prob_index_curr]
        G_curr = nx.stochastic_block_model(sizes, p, directed=directed)
        if G_prev is not None and not is_changepoint:
            G_curr = generate_snapshot_on_consecutive_networks(G_prev, G_curr, alpha)
        G_list.append(G_curr)
        G_prev = G_curr

    return G_list, cps

## Functions to compute z-scores from the Laplacian spectrum and detect changepoints

In [228]:
def compute_z_score_single(curr_vec, typical_vec):
    """Return ``1 - |cosine similarity|`` between the current and the typical vector.

    Inputs that are not already two-dimensional are reshaped into a single
    row before being handed to ``cosine_similarity``, so the value comes back
    in whatever array form ``cosine_similarity`` produces for those inputs.
    """
    def _as_rows(vec):
        # Two-dimensional input is used as is; anything else becomes one row.
        return vec if len(vec.shape) == 2 else vec.reshape(1, -1)

    similarity = cosine_similarity(_as_rows(curr_vec), _as_rows(typical_vec))
    return 1 - np.abs(similarity)

In [264]:
def compute_typical_vector_from_window(eigenvalue_list_window, principal=True):
    """Summarise a window of spectra (one row per time step) as a single vector.

    With ``principal == False`` the summary is the column-wise mean of the
    window. Otherwise it is the first left singular vector of the transposed
    window (time along axis 1), as returned by ``np.linalg.svd``.
    """
    # Equality rather than truthiness: only a value equal to False selects the mean.
    wants_mean = (principal == False)
    if wants_mean:
        return np.mean(eigenvalue_list_window, axis=0)

    time_as_columns = eigenvalue_list_window.T
    left_vectors = np.linalg.svd(time_as_columns, full_matrices=False)[0]
    return left_vectors[:, 0]

In [230]:
def compute_z_score_sequence_two_windows(eigenvalue_list, principal=True, window1=None, window2=None, initial_window=None, difference=True):
    """Score every time step against two trailing windows of past spectra.

    For each ``t >= initial_window`` the spectrum at ``t`` is compared, via
    ``compute_z_score_single``, with the typical vector of the previous
    ``window1`` rows and of the previous ``window2`` rows. The returned score
    is the element-wise maximum of the two resulting series.

    Parameters
    ----------
    eigenvalue_list : array of shape (T, k)
        One spectrum per time step.
    principal : bool
        Forwarded to ``compute_typical_vector_from_window``.
    window1, window2 : int or None
        Lengths of the two trailing windows. ``None`` selects ``T // 100``
        and ``T // 50`` respectively, but never less than 1 so that short
        sequences do not produce empty windows.
    initial_window : int or None
        First time step that is scored (``None`` selects ``T // 20``). It is
        raised to the longer window if it is smaller. Earlier steps score 0.
    difference : bool
        If true, each series is replaced by its first difference (with a
        leading 0 prepended) before the maximum is taken. Note that the
        first scored step is then differenced against the 0 placeholder.

    Returns
    -------
    numpy.ndarray of shape (T,)

    Raises
    ------
    ValueError
        If ``window1`` or ``window2`` is smaller than 1.
    """
    T = eigenvalue_list.shape[0]
    if initial_window is None:
        initial_window = T // 20
    if window1 is None:
        window1 = max(1, T // 100)
    if window2 is None:
        window2 = max(1, T // 50)
    if window1 < 1 or window2 < 1:
        raise ValueError("window1 and window2 must be at least 1")
    # Scoring can only start once both windows fit entirely in the past.
    initial_window = max(initial_window, window1, window2)

    z1_list = np.zeros(T)
    z2_list = np.zeros(T)
    for t in range(initial_window, T):
        current = eigenvalue_list[t]
        for scores, window in ((z1_list, window1), (z2_list, window2)):
            typical_vec = compute_typical_vector_from_window(eigenvalue_list[t - window:t], principal=principal)
            z = compute_z_score_single(current, typical_vec)
            # Extract the scalar explicitly instead of relying on implicit
            # conversion of a size-1 array during item assignment.
            scores[t] = np.ravel(z)[0]

    if difference:
        z1_list = np.diff(z1_list, prepend=0)
        z2_list = np.diff(z2_list, prepend=0)
    return np.maximum(z1_list, z2_list)

In [256]:
def LAD(G_list, cps, num_eigens=None, directed=False, num_detections=None, principal=True, window1=None, window2=None, initial_window=None, difference=True):
    """Detect changepoints in a graph sequence from its Laplacian spectra.

    For every snapshot the ``num_eigens`` largest singular values of the
    Laplacian are computed with ``svds``; each spectrum is L2-normalised and
    the sequence is scored with ``compute_z_score_sequence_two_windows``. The
    ``num_detections`` highest-scoring time steps are reported.

    Parameters
    ----------
    G_list : list of networkx graphs
        Snapshots in time order.
    cps : sequence of int
        True changepoint time steps, used only to compute the accuracy.
    num_eigens : int or None
        Number of singular values per snapshot. ``None`` selects one tenth of
        the node count of the first snapshot, but at least 1.
    directed : bool
        Use ``nx.directed_laplacian_matrix`` instead of ``nx.laplacian_matrix``.
    num_detections : int or None
        How many time steps to report. ``None`` selects ``len(G_list) // 100``,
        but at least 1.
    principal, window1, window2, initial_window, difference :
        Forwarded to ``compute_z_score_sequence_two_windows``.

    Returns
    -------
    (accuracy, cps_LAD)
        ``accuracy`` is the fraction of distinct true changepoints that are
        among the detections (``nan`` when ``cps`` is empty); ``cps_LAD`` is
        the array of detected time steps, highest score first.
    """
    if num_eigens is None:
        num_eigens = max(1, G_list[0].number_of_nodes() // 10)
    if num_detections is None:
        num_detections = max(1, len(G_list) // 100)

    eigenvalue_list = np.zeros((len(G_list), num_eigens), dtype=float)
    for t, G_t in enumerate(G_list):
        if directed:
            L_t = nx.directed_laplacian_matrix(G_t)
        else:
            # svds needs a floating-point operator; astype works for both
            # sparse matrices and sparse arrays (asfptype does not).
            L_t = nx.laplacian_matrix(G_t).astype(float)
        _, vals, _ = svds(L_t, k=num_eigens, which="LM")
        eigenvalue_list[t] = vals
    eigenvalue_list = normalize(eigenvalue_list, norm='l2')

    z_list = compute_z_score_sequence_two_windows(eigenvalue_list, principal=principal, window1=window1, window2=window2, initial_window=initial_window, difference=difference)
    cps_LAD = z_list.argsort()[::-1][:num_detections]

    true_cps = {int(c) for c in cps}
    if not true_cps:
        # No ground truth to compare against: the ratio is undefined.
        return float("nan"), cps_LAD
    hits = true_cps.intersection(int(c) for c in cps_LAD)
    accuracy = len(hits) / len(true_cps)
    return accuracy, cps_LAD

## test the code

In [206]:
# Smoke test: 100 snapshots; cps is left at its default, so the generator
# places a single changepoint at T // 2.
G_list, cp_list = generate_dynamic_SBM(100)
# LAD returns (accuracy, detected time steps); only the accuracy is shown.
accuracy, _ = LAD(G_list, cp_list)
print(accuracy)

1.0


In [266]:
# Fixed scenario: 60 directed snapshots on 50 nodes with changepoints at 20 and 50.
T = 60; cps = [20, 50]; n = 50; directed = True
G_list, cps = generate_dynamic_SBM(T, cps, n, directed=directed)
# Each tuple is (num_eigens, num_detections, principal, window1, window2,
# initial_window, difference), matching the unpacking inside the loop.
test_inputs = [(20, 2, True, 2, 4, 5, True), 
               (20, 2, True, 2, 4, 5, False), 
               (20, 1, True, 2, 4, 5, True), 
               (10, 2, False, 4, 10, 10, True)]
print(f"{T} steps (change points: {cps})")
for i, x in enumerate(test_inputs):
    print(f"------ Test {i+1} ---------")
    num_eigens, num_detections, principal, window1, window2, initial_window, difference = x
    accuracy, _ = LAD(G_list, cps, num_eigens=num_eigens, directed=directed, num_detections=num_detections, principal=principal, window1=window1, window2=window2, initial_window=initial_window, difference=difference)
    # Only principal and difference are echoed, so cases that differ in other
    # settings (e.g. num_detections) print the same label.
    print(f"[principal: {principal}; difference: {difference}] : accuracy {accuracy}")

60 steps (change points: [20, 50])
------ Test 1 ---------
[principal: True; difference: True] : accuracy 1.0
------ Test 2 ---------
[principal: True; difference: False] : accuracy 0.5
------ Test 3 ---------
[principal: True; difference: True] : accuracy 0.5
------ Test 4 ---------
[principal: False; difference: True] : accuracy 1.0


In [272]:
# Detector settings shared by all five randomized trials below.
num_eigens = 20
window1 = 5
window2 = 10
initial_window = 20
difference = True
principal = True


for i in range(5):
    # T is 100 + 10**k with k drawn from {0, 1, 2, 3}: 101, 110, 200 or 1100 steps.
    T = 100 + 10 ** np.random.randint(4)
    # 2, 4 or 8 changepoints.
    # NOTE(review): the recorded output of this cell lists counts such as 3, 9,
    # 6 and 7, which this expression cannot produce — the output looks stale.
    num_cps = 2 ** np.random.randint(1, 4)
    # Distinct changepoints drawn from steps 20 .. T-1.
    cps = np.random.choice(np.arange(20, T), num_cps, replace=False)
    # 32, 64, 128, 256 or 512 nodes.
    n = 2 ** np.random.randint(5, 10)
    directed = np.random.rand() > 0.5
    G_list, cps = generate_dynamic_SBM(T, cps, n, directed=directed)
    print(f"------ Test {i+1}: {n} nodes, {T} steps, {num_cps} cps, directed: {directed}  ---------")
    # The detector is allowed exactly as many detections as there are true changepoints.
    accuracy, _ = LAD(G_list, cps, num_eigens=num_eigens, directed=directed, num_detections=num_cps, principal=principal, window1=window1, window2=window2, initial_window=initial_window, difference=difference)
    print(f"accuracy: {accuracy}")

------ Test 1: 128 nodes, 110 steps, 8 cps, directed: False  ---------
accuracy: 0.875
------ Test 2: 32 nodes, 200 steps, 3 cps, directed: True  ---------
accuracy: 1.0
------ Test 3: 512 nodes, 1100 steps, 9 cps, directed: True  ---------
accuracy: 1.0
------ Test 4: 32 nodes, 110 steps, 6 cps, directed: False  ---------
accuracy: 1.0
------ Test 5: 128 nodes, 200 steps, 7 cps, directed: False  ---------
accuracy: 0.8571428571428571


In [277]:
# Same randomized experiment as the previous cell, with fewer singular values
# per snapshot, more changepoints and smaller graphs.
num_eigens = 10
window1 = 5
window2 = 10
initial_window = 20
difference = True
principal = True


for i in range(5):
    # T is 100 + 10**k with k drawn from {0, 1, 2, 3}: 101, 110, 200 or 1100 steps.
    T = 100 + 10 ** np.random.randint(4)
    # 16, 32 or 64 changepoints.
    num_cps = 2 ** np.random.randint(4, 7)
    # Distinct changepoints drawn from steps 20 .. T-1.
    cps = np.random.choice(np.arange(20, T), num_cps, replace=False)
    # 16, 32, 64, 128 or 256 nodes.
    n = 2 ** np.random.randint(4, 9)
    directed = np.random.rand() > 0.5
    G_list, cps = generate_dynamic_SBM(T, cps, n, directed=directed)
    print(f"------ Test {i+1}: {n} nodes, {T} steps, {num_cps} cps, directed: {directed}  ---------")
    # The detector is allowed exactly as many detections as there are true changepoints.
    accuracy, _ = LAD(G_list, cps, num_eigens=num_eigens, directed=directed, num_detections=num_cps, principal=principal, window1=window1, window2=window2, initial_window=initial_window, difference=difference)
    print(f"accuracy: {accuracy}")

------ Test 1: 64 nodes, 110 steps, 64 cps, directed: True  ---------
accuracy: 0.625
------ Test 2: 256 nodes, 200 steps, 32 cps, directed: True  ---------
accuracy: 0.84375
------ Test 3: 128 nodes, 101 steps, 16 cps, directed: True  ---------
accuracy: 0.75
------ Test 4: 64 nodes, 1100 steps, 64 cps, directed: True  ---------
accuracy: 0.96875
------ Test 5: 256 nodes, 101 steps, 32 cps, directed: False  ---------
accuracy: 0.75


## Use ipytest to check that the code runs and returns an accuracy between 0 and 1

In [245]:
# Candidate values for each argument; the next cell takes their Cartesian product.
# For the LAD arguments, None selects the default computed inside LAD /
# compute_z_score_sequence_two_windows.
cps_list = [None, [20, 50]]
# NOTE(review): f_test forwards T as-is to generate_dynamic_SBM — confirm that
# T=None is handled there.
T_list = [None, 60]
num_eigens_list=[None, 20]
directed_list=[False, True]
num_detections_list=[None, 1, 2]
principal_list=[True]
window1_list=[None]
window2_list=[None]
initial_window_list=[None]
difference_list=[True, False]


In [248]:
import pytest
import ipytest
ipytest.autoconfig()
import itertools

# Every combination of the candidate values. The argument order here is the
# order in which f_test unpacks each tuple.
# NOTE: this rebinds the name test_inputs used by an earlier demo cell.
test_inputs = list(itertools.product(cps_list, T_list, num_eigens_list, directed_list, num_detections_list, principal_list, window1_list, window2_list, initial_window_list, difference_list))
print(len(test_inputs))

def f_test(x):
    """Generate a dynamic SBM for one parameter combination and return LAD's accuracy.

    ``x`` is a tuple ``(cps, T, num_eigens, directed, num_detections,
    principal, window1, window2, initial_window, difference)``.

    The graphs are generated with the same ``directed`` flag that is passed
    to ``LAD``, so the directed Laplacian is only ever computed on directed
    graphs. ``T`` is forwarded only when it is not ``None``, so ``None``
    falls back to the generator's own default, and ``cps`` is copied so the
    list shared between parametrized cases is never modified.
    """
    cps, T, num_eigens, directed, num_detections, principal, window1, window2, initial_window, difference = x
    sbm_kwargs = {"directed": directed}
    if T is not None:
        sbm_kwargs["T"] = T
    if cps is not None:
        cps = list(cps)
    G_list, cps = generate_dynamic_SBM(cps=cps, **sbm_kwargs)
    accuracy, _ = LAD(G_list, cps, num_eigens=num_eigens, directed=directed, num_detections=num_detections, principal=principal, window1=window1, window2=window2, initial_window=initial_window, difference=difference)
    return accuracy

@pytest.mark.parametrize("test_input", test_inputs)
def test_eval(test_input, expected=None):
    """Run one parameter combination and check the accuracy is a value in [0, 1]."""
    accuracy = f_test(test_input)
    print(test_input, accuracy)
    assert accuracy is not None and 0 <= accuracy <= 1
    
# Collect and run the test functions defined in this notebook (test_eval above).
ipytest.run()