In [4]:
import estimators
from embeddings import PageRank

import numpy as np
import networkx as nx
import typing as tp
import copy
from itertools import chain

In [2]:
def communities_tail_indices(communities: list[set[int]],
                             pageranks: np.ndarray) -> tuple[list[int], list[int]]:
    '''
    Computes communities' tail indices and corresponding optimal tail sizes.

    :param communities: communities given as sets of node indices into ``pageranks``
    :param pageranks: array of PageRank values, indexed by node
    :return: pair ``(tail_indices, tail_sizes)``; the i-th entries are the values
        returned by ``estimators.tail_index_and_tail_size`` for the i-th community
    '''
    tail_indices = []
    tail_sizes = []
    for community in communities:
        # A set cannot be used as a numpy index, so it is converted to a list first.
        tail_index, tail_size = estimators.tail_index_and_tail_size(pageranks[list(community)])
        tail_indices.append(tail_index)
        # Append to the accumulator list (``tail_sizes``), not to the scalar just computed.
        tail_sizes.append(tail_size)

    return tail_indices, tail_sizes

In [None]:
class PRTICommunities:
    '''
    Community search on evolving random graphs using PageRank tail indices
    '''
    def __init__(self, init_graph: nx.MultiDiGraph, min_community: int,
                 pl_test_error: float = 0.05) -> None:
        '''
        Initialises base partition.
        :param init_graph: initial graph (nodes are expected to be integers 0, 1, ..., len(init_graph) - 1)
        :param min_community: minimal size of community (not used yet, see the TODO below)
        :param pl_test_error: maximal first type error for P-L test
        '''
        # Nodes are 0..len-1, so the node count is also the first id a later snapshot may add.
        self.max_node = len(init_graph)
        self.pl_test_error = pl_test_error

        # Louvain is exposed through the ``community`` subpackage of networkx;
        # there is no ``nx.communities`` attribute.
        self.normal_communities = nx.community.louvain_communities(init_graph)
        # TODO: attach small communities to large ones

        self.abnormal_communities: list[set[int]] = []

        # Maps every node to the position of its community in self.normal_communities.
        self.community_labels = {}
        for i, community in enumerate(self.normal_communities):
            self.community_labels.update((v, i) for v in community)

        self.embeddings = PageRank(init_graph)

        # Per-community tail index / tail size, aligned with self.normal_communities.
        self.tail_indices, self.tail_sizes = communities_tail_indices(
            self.normal_communities, self.embeddings.to_numpy()
        )

    def __classify(self, graph: nx.MultiDiGraph, v: int) -> tp.Optional[int]:
        '''
        Chooses a community for node v among the communities of its neighbours.

        Successors and predecessors of v that already have a community label are
        considered; the label whose community has the smallest tail index wins
        (the first such label encountered in case of ties).

        :param graph: graph containing v
        :param v: node to classify
        :return: index of the chosen community, or None if v has no labelled neighbour
        '''
        neighbour_labels = (
            self.community_labels[u]
            for u in chain(graph.successors(v), graph.predecessors(v))
            if u in self.community_labels
        )
        # min() keeps the first minimal element, matching a strict "<" scan.
        return min(neighbour_labels, key=self.tail_indices.__getitem__, default=None)

    def update(self, snapshot: nx.MultiDiGraph) -> None:
        '''
        Updates the partition.
        :param snapshot: new snapshot of a graph
        '''
        self.embeddings.update(snapshot)
        pageranks = self.embeddings.to_numpy()

        added_nodes = [set() for _ in range(len(self.normal_communities))]
        for v in range(self.max_node, len(snapshot)):
            label = self.__classify(snapshot, v)
            added_nodes[label].add(v)

        grown_communities = [
            self.normal_communities[i] | added_nodes[i]
            for i in range(len(self.normal_communities))
        ]
        grown_tail_indices, grown_tail_sizes = communities_tail_indices(
            grown_communities, pageranks
        )

        for i in range(len(self.normal_communities)):
            pvalue = phillips_loretan_pvalue(
                self.tail_indices[i], self.tail_sizes[i],
                grown_tail_indices[i], grown_tail_sizes[i]
            )
            if pvalue < self.pl_test_error:
                # classify added nodes as new abnormal community