In [2]:
import numpy as np
import networkx as nx
import matplotlib.pyplot as plt

from Qommunity.samplers.hierarchical.advantage_sampler import AdvantageSampler
from Qommunity.searchers.hierarchical_searcher import HierarchicalSearcher

In [3]:
from dataclasses import dataclass, field
from typing import List, Optional, Dict, Set, Any

@dataclass
class CommunityNode:
    """Represents a community in the hierarchy with parent-child relationships."""
    community: List[int]
    node_id: int
    parent_id: Optional[int] = None
    level: int = 0
    children_ids: List[int] = field(default_factory=list)
    modularity: Optional[float] = None
    sample_info: Optional[Any] = None

In [14]:
from Qommunity.samplers.hierarchical.hierarchical_sampler import HierarchicalSampler
import networkx as nx
import numpy as np

class HierarchicalSearcher:
    # Keep existing methods
    def __init__(self, sampler: HierarchicalSampler) -> None:
            self.sampler = sampler

    def single_community_search(
        self, verbosity: int = 0, community: list | None = None
    ) -> list:
        if not community:
            community = [*range(self.sampler.G.number_of_nodes())]

        if verbosity >= 1:
            print("Starting community detection")
        if verbosity >= 2:
            print("===========================================")
            print("Calculations for graph with", len(community), "nodes in community")
            print("===========================================")

        sample = self.sampler.sample_qubo_to_dict()

        c0, c1 = self._split_dict_to_lists(sample, community)

        if verbosity >= 2:
            print("Base community:", community, sep="\n")
            print("Community division:", c0, c1, sep="\n")
            print("===========================================\n")
        if verbosity >= 1:
            print("Stopping community detection")

        if c0 and c1:
            return [c0] + [c1]
        elif c0:
            return [c0]
        else:
            return [c1]
    
    def hierarchical_community_search(
            self,
            verbosity: int = 0,
            max_depth: int | None = None,
            division_tree: bool = False,
            return_modularities: bool = False,
            return_sampleset_info: bool = False,
            samples_filename: str | None = None,
            track_hierarchy: bool = False,  # New parameter
        ) -> list:
            if verbosity >= 1:
                print("Starting community detection")

            if max_depth is None or max_depth > 1:
                if division_tree == False:
                    division_tree_data = None
                else:
                    division_tree_data = []
                
                # For tracking parent-child relationships
                community_nodes = {}
                next_node_id = 0
                
                samples = []

                result = self._hierarchical_search_recursion(
                    verbosity=verbosity,
                    level=1,
                    max_depth=max_depth,
                    division_tree=division_tree_data,
                    samples=samples,
                    community_nodes=community_nodes if track_hierarchy else None,
                    next_node_id=next_node_id,
                )

                if samples_filename is not None:
                    np.save(f"{samples_filename}.npy", samples)
                
                # Initialize division_modularities variable
                division_modularities = None
                
                # Calculate modularities if needed
                if division_tree_data and return_modularities:
                    division_modularities = []
                    for division in division_tree_data:
                        division_modularity = nx.community.modularity(
                            G=self.sampler.G,
                            communities=division,
                            resolution=self.sampler.resolution,
                        )
                        division_modularities.append(division_modularity)
                elif return_modularities:
                    division_modularities = nx.community.modularity(
                        G=self.sampler.G,
                        communities=result,
                        resolution=self.sampler.resolution,
                    )
                
                if track_hierarchy:
                    # Return the enhanced result with parent-child relationships
                    return self._prepare_hierarchical_result(
                        result, 
                        division_tree_data, 
                        division_modularities,
                        samples if return_sampleset_info else None,
                        community_nodes
                    )
                    # Return original result format if not tracking hierarchy
                if division_tree and return_modularities and return_sampleset_info:
                    return result, division_tree_data, division_modularities, samples
                # Rest of your return statements...
                if division_tree and return_modularities and return_sampleset_info:
                    return result, division_tree, division_modularities, samples
                if division_tree and return_modularities:
                    return result, division_tree, division_modularities
                if division_tree:
                    return result, division_tree
                if return_modularities:
                    return result, division_modularities
                else:
                    return result

    
    def _hierarchical_search_recursion(
        self,
        verbosity: bool,
        max_depth: int,
        level: int,
        community: list | None = None,
        division_tree: list | None = None,
        samples: list | None = None,
        community_nodes: dict | None = None,
        next_node_id: int = 0,
        parent_id: int | None = None,
    ):
        if not community:
            community = [*range(self.sampler.G.number_of_nodes())]

        if len(community) == 1:
            if community_nodes is not None:
                # Add leaf node
                node_id = next_node_id
                next_node_id += 1
                community_nodes[node_id] = CommunityNode(
                    community=community,
                    node_id=node_id,
                    parent_id=parent_id,
                    level=level-1
                )
            return [community]

        if level == 1 and division_tree == []:
            division_tree.append([community])
            
            # Create the root node for tracking
            if community_nodes is not None:
                node_id = next_node_id
                next_node_id += 1
                community_nodes[node_id] = CommunityNode(
                    community=community,
                    node_id=node_id,
                    parent_id=None,
                    level=0
                )
                parent_id = node_id  # First level's parent is the root

        if verbosity >= 2:
            print("===========================================")
            print(
                "Calculations for graph with",
                len(community),
                "nodes, level of recursion:",
                level,
            )
            print("===========================================")

        self.sampler.update_community(community)
        sample, sampleset_full = self.sampler.sample_qubo_to_dict()
        samples.append(sampleset_full)

        c0, c1 = self._split_dict_to_lists(sample, community)

        if verbosity >= 2:
            print("Base community:", community, sep="\n")
            print("Community division:", c0, c1, sep="\n")
            print("===========================================")

        if division_tree:
            if len(division_tree) < level + 1:
                division_tree.append([])

            if c0 and c1:
                division_tree[level].append(c0)
                division_tree[level].append(c1)
                
                # Add to community nodes
                if community_nodes is not None:
                    # Create nodes for c0 and c1
                    c0_id = next_node_id
                    next_node_id += 1
                    c1_id = next_node_id
                    next_node_id += 1
                    
                    # Create the child nodes
                    community_nodes[c0_id] = CommunityNode(
                        community=c0,
                        node_id=c0_id,
                        parent_id=parent_id,
                        level=level
                    )
                    
                    community_nodes[c1_id] = CommunityNode(
                        community=c1,
                        node_id=c1_id,
                        parent_id=parent_id,
                        level=level
                    )
                    
                    # Update parent's children
                    if parent_id is not None:
                        community_nodes[parent_id].children_ids.extend([c0_id, c1_id])
                        
                    # For recursive calls
                    c0_parent_id = c0_id
                    c1_parent_id = c1_id
            else:
                division_tree[level].append(community)
                # No splitting happened, so no new nodes to create

        if level == max_depth:
            if c0 and c1:
                return [c0] + [c1]
            elif c0:
                return [c0]
            else:
                return [c1]
        else:
            if c0 and c1:
                return self._hierarchical_search_recursion(
                    verbosity,
                    max_depth,
                    level=level + 1,
                    community=c0,
                    division_tree=division_tree,
                    samples=samples,
                    community_nodes=community_nodes,
                    next_node_id=next_node_id,
                    parent_id=c0_parent_id if community_nodes is not None else None,
                ) + self._hierarchical_search_recursion(
                    verbosity,
                    max_depth,
                    level=level + 1,
                    community=c1,
                    division_tree=division_tree,
                    samples=samples,
                    community_nodes=community_nodes,
                    next_node_id=next_node_id + len(c0),  # Adjust ID to avoid collisions
                    parent_id=c1_parent_id if community_nodes is not None else None,
                )
            elif c0:
                return [c0]
            else:
                return [c1]
    
    def _prepare_hierarchical_result(self, result, division_tree, division_modularities, samples, community_nodes):
        """Prepare the hierarchical result with parent-child relationships."""
        # Add modularity to each community node if available
        if division_modularities:
            for level, mod in enumerate(division_modularities):
                # Find nodes at this level
                level_nodes = [node for node in community_nodes.values() if node.level == level]
                for node in level_nodes:
                    # If we have per-community modularity, assign it
                    # Otherwise assign the level's overall modularity
                    node.modularity = mod

        # Add sample info to each node if available
        if samples:
            for node_id, node in community_nodes.items():
                # Find the sample that corresponds to this community
                # This is approximate - you might need to adjust based on your data structure
                matching_samples = [s for s in samples if set(node.community) == set(s.community)]
                if matching_samples:
                    node.sample_info = matching_samples[0]

        # Create a structured hierarchy result
        hierarchy_result = {
            'communities': result,
            'community_tree': community_nodes,
            'division_tree': division_tree,
            'division_modularities': division_modularities,
            'samples': samples
        }
        
        return hierarchy_result
    

    def _split_dict_to_lists(self, dictionary, community):
        c0, c1 = [], []
        for i in community:
            value = dictionary.get(f"x{i}", None)
            if value == 0:
                c0.append(i)
            else:
                c1.append(i)
        return c0, c1

    def _flatten_list_to_set(self, list) -> set:
        result = set()
        for sublist in list:
            for item in sublist:
                result.add(item)

        return result

In [11]:
import networkx as nx
import matplotlib.pyplot as plt
from typing import Dict, List
import numpy as np

class HierarchyVisualizer:
    """Builds and draws the parent -> child tree of detected communities."""

    @staticmethod
    def create_hierarchy_graph(community_nodes: Dict) -> nx.DiGraph:
        """Create a directed graph representing the community hierarchy.

        Args:
            community_nodes: Mapping ``{node_id: node}`` where every node exposes
                ``community``, ``level`` and ``children_ids``.

        Returns:
            A ``DiGraph`` with one vertex per entry (attributes ``community``,
            ``level``, ``size`` and ``label``) and one edge per parent -> child
            link. Links to ids that are not keys of ``community_nodes`` are
            skipped so that no attribute-less vertex is created.
        """
        G = nx.DiGraph()

        for node_id, node in community_nodes.items():
            community_size = len(node.community)
            G.add_node(
                node_id,
                community=node.community,
                level=node.level,
                size=community_size,
                label=f"L{node.level}:{community_size}"
            )

        for node_id, node in community_nodes.items():
            for child_id in node.children_ids:
                if child_id in community_nodes:
                    G.add_edge(node_id, child_id)

        return G

    @staticmethod
    def _layout_positions(G) -> dict:
        """Compute ``{node_id: (x, y)}`` for a layered tree drawing.

        Every childless vertex, at whatever level, receives its own evenly
        spaced x slot in depth-first order; a parent is centred above its
        children. ``y`` is ``max_level - level`` so the root is drawn on top.
        """
        max_level = max(G.nodes[n]['level'] for n in G.nodes)

        x_slot = {}
        next_slot = 0
        seen = set()
        roots = sorted(n for n in G.nodes if G.in_degree(n) == 0)

        for root in roots:
            # Iterative post-order walk: a vertex is placed after its children.
            stack = [(root, False)]
            while stack:
                node_id, children_visited = stack.pop()
                if node_id in x_slot:
                    continue
                children = sorted(G.successors(node_id))
                if not children_visited and children:
                    if node_id in seen:
                        continue  # guards against malformed (cyclic) input
                    seen.add(node_id)
                    stack.append((node_id, True))
                    stack.extend((child, False) for child in reversed(children))
                    continue
                placed = [x_slot[child] for child in children if child in x_slot]
                if placed:
                    x_slot[node_id] = sum(placed) / len(placed)
                else:
                    x_slot[node_id] = float(next_slot)
                    next_slot += 1

        # Vertices unreachable from any root still need a position for drawing.
        for node_id in G.nodes:
            if node_id not in x_slot:
                x_slot[node_id] = float(next_slot)
                next_slot += 1

        width = max(1, next_slot)
        return {
            node_id: (x_slot[node_id] / width, max_level - G.nodes[node_id]['level'])
            for node_id in G.nodes
        }

    @staticmethod
    def plot_hierarchy(community_nodes: Dict, figsize=(12, 10)):
        """Plot the community hierarchy graph.

        Args:
            community_nodes: Mapping ``{node_id: node}`` as accepted by
                ``create_hierarchy_graph``.
            figsize: Size passed to ``plt.figure``.

        Returns:
            The ``matplotlib.pyplot`` module, so the caller can call
            ``.show()`` or ``.savefig(...)`` on the result.

        Raises:
            ValueError: If ``community_nodes`` is empty.
        """
        G = HierarchyVisualizer.create_hierarchy_graph(community_nodes)
        if G.number_of_nodes() == 0:
            raise ValueError("community_nodes is empty; there is no hierarchy to plot")

        pos = HierarchyVisualizer._layout_positions(G)

        plt.figure(figsize=figsize)

        # Vertex area scales with community size; colour encodes the level.
        node_sizes = [G.nodes[n]['size']*100 for n in G.nodes]
        node_colors = [G.nodes[n]['level'] for n in G.nodes]
        labels = {n: G.nodes[n]['label'] for n in G.nodes}

        nx.draw(G, pos,
                with_labels=True,
                labels=labels,
                node_size=node_sizes,
                node_color=node_colors,
                cmap=plt.cm.viridis,
                arrows=True,
                arrowsize=20)

        plt.title("Community Hierarchy")
        plt.tight_layout()
        return plt

In [15]:
# Example usage
# Fixed seed so the generated graph is the same after Restart Kernel -> Run All.
GRAPH_SEED = 42
G = nx.powerlaw_cluster_graph(n=100, m=1, p=0.1, seed=GRAPH_SEED)
advantage = AdvantageSampler(
    G, num_reads=100, version="", region="na-west-1", 
    use_clique_embedding=True, measure_times=True, 
    return_sampleset_info=True
)
hierarchical_searcher = HierarchicalSearcher(advantage)

# Enable hierarchy tracking
result = hierarchical_searcher.hierarchical_community_search(
    division_tree=True,
    return_modularities=True,
    return_sampleset_info=True,
    track_hierarchy=True  # Enable tracking parent-child relationships
)

# Access the structured result
communities = result['communities']
community_tree = result['community_tree']
division_tree = result['division_tree']
division_modularities = result['division_modularities']

# Visualize the hierarchy with the HierarchyVisualizer class defined earlier in
# this notebook (re-importing it from the package here would shadow that
# definition in the middle of the cell).
HierarchyVisualizer.plot_hierarchy(community_tree)
plt.show()

NotAPartition: [[0, 1, 14, 22, 29, 32, 36, 40, 42, 45, 48, 54, 64, 65, 67, 77, 84, 90, 92, 97, 99], [20, 50], [5, 37, 58], [16, 30, 47], [6, 12, 25, 34, 66, 72], [2, 11, 33, 38, 39, 83, 89], [56, 57, 62], [9, 31, 68, 70, 82], [7, 19, 23, 26, 53, 86], [8, 27, 91], [43, 55, 73, 76], [71, 78, 80], [21, 75], [28, 52, 63, 85], [13, 24, 35, 61, 81, 87, 98]] is not a valid partition of the graph Graph with 100 nodes and 99 edges