<a href="https://colab.research.google.com/github/mugalan/working/blob/main/hierarchical_graphs.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# References

* https://arxiv.org/pdf/2303.03293
* https://ris.utwente.nl/ws/files/52018562/Kienreich2012graph.pdf

In [None]:
import networkx as nx

# Build the inner-level multigraph. A MultiGraph keeps parallel edges between
# the same pair of nodes as separate connections.
inner_graph = nx.MultiGraph()

# Individual devices are the inner-level nodes.
inner_graph.add_nodes_from(('D1', 'D2', 'D3', 'D4'))

# Each row is (endpoint, endpoint, connection type, weight). D1-D2 is listed
# twice on purpose so that the pair is joined by a multi-edge.
inner_graph.add_edges_from(
    (src, dst, {'type': link_type, 'weight': link_weight})
    for src, dst, link_type, link_weight in (
        ('D1', 'D2', 'Ethernet', 1.0),
        ('D1', 'D2', 'Fiber', 2.0),
        ('D3', 'D4', 'Wireless', 0.5),
        ('D1', 'D3', 'Ethernet', 1.0),
    )
)


In [None]:
# Assign every inner-level device to its outer-level group (meta-node) and
# store that assignment on the graph as the 'group' node attribute.
node_to_cluster_map = {
    device: cluster
    for cluster, devices in (('GroupA', ('D1', 'D2')), ('GroupB', ('D3', 'D4')))
    for device in devices
}
nx.set_node_attributes(inner_graph, node_to_cluster_map, name='group')

# The distinct group labels are the nodes of the outer-level graph;
# for the mapping above this is {'GroupA', 'GroupB'}.
outer_level_nodes = {*nx.get_node_attributes(inner_graph, 'group').values()}


In [None]:
# Mapping from each inner-level device to the outer-level group it belongs to.
# (A stray bare identifier that followed this assignment raised NameError and
# has been removed.)
node_to_cluster_map = {'D1': 'GroupA', 'D2': 'GroupA', 'D3': 'GroupB', 'D4': 'GroupB'}

In [None]:
!pip install graphviz
# or: pip install pydot (if the graphviz package has issues). Either way the Graphviz system binaries (e.g. `dot`) must be installed for rendering.


In [None]:
import graphviz
import networkx as nx
# Assuming 'outer_graph' is the NetworkX graph from the previous steps

def visualize_outer_graph(nx_graph, filename='outer_level_graph'):
    """Render the aggregated (outer-level) graph as a PNG via Graphviz.

    Args:
        nx_graph: Graph whose ``nodes`` are drawn as-is and whose edges each
            carry a ``count`` value and an iterable of string
            ``connection_types`` (both keys are required).
        filename: Output path without extension; ``.png`` is appended by
            the renderer.
    """
    # Undirected Graphviz graph laid out with the 'dot' engine.
    diagram = graphviz.Graph(comment='outer Level Graph', engine='dot')

    for group in nx_graph.nodes:
        diagram.node(str(group))

    for left, right, attrs in nx_graph.edges(data=True):
        edge_count = attrs['count']
        type_list = ', '.join(attrs['connection_types'])
        # '\\n' is passed through as a literal backslash-n, which Graphviz
        # interprets as a line break inside the label.
        diagram.edge(str(left), str(right), label=f"Count: {edge_count}\\nTypes: {type_list}")

    # view=True asks graphviz to open the rendered file after writing it.
    diagram.render(filename, view=True, format='png')
    print(f"outer-level graph saved as {filename}.png")

# Call the function:
# visualize_outer_graph(outer_graph)


In [None]:
# Build the outer-level graph that `visualize_outer_graph` expects: one node
# per group, and one edge per pair of groups joined by at least one
# inner-level edge. No earlier cell defines `outer_graph`, so without this the
# call below fails with NameError on a fresh kernel.
outer_graph = nx.Graph()
outer_graph.add_nodes_from(sorted(set(node_to_cluster_map.values())))

for u, v, data in inner_graph.edges(data=True):
    group_u, group_v = node_to_cluster_map[u], node_to_cluster_map[v]
    if group_u == group_v:
        continue  # links inside one group do not appear at the outer level
    if not outer_graph.has_edge(group_u, group_v):
        outer_graph.add_edge(group_u, group_v, count=0, connection_types=[])
    meta_edge = outer_graph[group_u][group_v]
    meta_edge['count'] += 1
    # Keep each connection type once, in first-seen order.
    if data['type'] not in meta_edge['connection_types']:
        meta_edge['connection_types'].append(data['type'])

visualize_outer_graph(outer_graph)

In [None]:
import graphviz
import networkx as nx

# Assuming 'inner_graph' and 'node_to_cluster_map' are defined from the previous step
# node_to_cluster_map = {'D1': 'GroupA', 'D2': 'GroupA', 'D3': 'GroupB', 'D4': 'GroupB'}

def visualize_inner_graph_with_clusters(graph, node_to_cluster_map, filename="inner_level_graph"):
    """Render the inner-level multigraph with nodes boxed into their clusters.

    The DOT source is assembled by hand and rendered through
    ``graphviz.Source`` as a PNG.

    Args:
        graph: Multigraph exposing ``edges(keys=True, data=True)``; the edge
            attribute ``type`` (if present) becomes the edge label.
        node_to_cluster_map: Mapping of node name -> cluster name. Only nodes
            listed here are placed inside a cluster.
        filename: Output path without extension.
    """
    def quoted(value):
        """Return ``value`` as a double-quoted DOT string, escaping embedded quotes."""
        return '"' + str(value).replace('"', '\\"') + '"'

    # Collect the members of each cluster, keeping first-seen order.
    clusters = {}
    for node, cluster_id in node_to_cluster_map.items():
        clusters.setdefault(cluster_id, []).append(node)

    lines = [
        'graph G {',  # 'graph' (not 'digraph') gives an undirected layout
        '    rankdir="LR";',
        '    node [shape=box];',
    ]

    for cluster_id, nodes in clusters.items():
        # Unquoted DOT identifiers may only contain letters, digits and
        # underscores, so every other character is replaced by '_'.
        safe_id = ''.join(
            ch if (ch.isalnum() and ord(ch) < 128) else '_' for ch in str(cluster_id)
        )
        lines.append(f'    subgraph cluster_{safe_id} {{')
        lines.append(f'        label={quoted(cluster_id)};')
        lines.append('        color=blue;')
        lines.append('        style=filled;')
        lines.append('        fillcolor=lightyellow;')
        lines.append('        ' + ' '.join(quoted(node) for node in nodes) + ';')
        lines.append('    }')

    # One DOT edge per multigraph edge; the multigraph key is attached as the
    # (quoted) edge id so that non-numeric keys remain valid DOT.
    for u, v, key, data in graph.edges(keys=True, data=True):
        label = data.get('type', '')
        lines.append(f'    {quoted(u)} -- {quoted(v)} [label={quoted(label)}, id={quoted(key)}];')

    lines.append('}')
    dot_source = '\n'.join(lines) + '\n'

    dot = graphviz.Source(dot_source, format='png')
    dot.render(filename, view=True)  # view=True asks graphviz to open the image
    print(f"inner-level graph with clusters saved as {filename}.png")

# Call the function:
# visualize_inner_graph_with_clusters(inner_graph, node_to_cluster_map)


In [None]:
# Render the device-level graph, placing each device inside the Graphviz
# cluster named by node_to_cluster_map.
visualize_inner_graph_with_clusters(inner_graph, node_to_cluster_map)

In [None]:
# Example nodes, written as (label, group, type) rows and expanded into the
# attribute dictionaries consumed by the graph builder.
Nodes = [
    dict(label=label, group=group, type=kind)
    for label, group, kind in (
        ('A', 'Group A', 'system'),
        ('B', 'Group A', 'user'),
        ('C', 'Group B', 'system'),
        ('D', 'Group B', 'user'),
        ('E', 'Group C', 'system'),
    )
]

# Example edges, written as (start, end, type, weight) rows.
Edges = [
    dict(start=start, end=end, type=kind, weight=weight)
    for start, end, kind, weight in (
        ('A', 'B', 'type1', 0.4),
        ('A', 'C', 'type2', 0.8),
        ('B', 'C', 'type1', 0.5),
        ('C', 'E', 'type3', 0.1),
        ('C', 'E', 'type1', 0.9),  # second C-E row: multi-edge here
        ('D', 'E', 'type2', 0.3),
    )
]

In [None]:
import networkx as nx

class HierarchicalGraph:
    """Build and render a two-level (inner/outer) view of a grouped multigraph.

    The inner level is a ``networkx.MultiGraph`` of individual nodes. The outer
    level is a ``networkx.Graph`` whose nodes are the groups and whose edges
    summarise every inner edge that crosses between two groups.
    """

    @staticmethod
    def _dot_quote(value):
        """Return ``value`` as a double-quoted DOT string, escaping embedded quotes."""
        return '"' + str(value).replace('"', '\\"') + '"'

    @staticmethod
    def _cluster_name(group):
        """Return a DOT-safe ``cluster_...`` subgraph name for ``group``."""
        # Unquoted DOT identifiers may only contain letters, digits and
        # underscores, so every other character is replaced by '_'.
        safe = ''.join(ch if (ch.isalnum() and ord(ch) < 128) else '_' for ch in str(group))
        return f'cluster_{safe}'

    def create_hierarchical_graphs_iterative(self, nodes_data, edges_data):
        """Build the inner multigraph and the aggregated outer graph.

        Args:
            nodes_data: Iterable of dicts with a ``label`` key; every other
                key is stored as a node attribute. Nodes without a ``group``
                attribute are treated as members of ``'DefaultGroup'``.
            edges_data: Iterable of dicts with ``start`` and ``end`` keys;
                every other key is stored as an edge attribute. A missing
                ``type`` counts as ``'unknown'`` and a missing ``weight`` as
                ``1.0`` when aggregating.

        Returns:
            ``(inner_graph, outer_graph)``. Each outer edge carries ``count``,
            ``connection_types`` (sorted list), ``total_weight`` and
            ``average_weight``.
        """
        # --- 1. Inner-level graph -------------------------------------------
        inner_graph = nx.MultiGraph()

        for node_data in nodes_data:
            attributes = {k: v for k, v in node_data.items() if k != 'label'}
            inner_graph.add_node(node_data['label'], **attributes)

        for edge_data in edges_data:
            attributes = {k: v for k, v in edge_data.items() if k not in ('start', 'end')}
            # MultiGraph assigns a fresh key to each parallel edge.
            inner_graph.add_edge(edge_data['start'], edge_data['end'], **attributes)

        # --- 2. Outer-level (aggregated) graph ------------------------------
        outer_graph = nx.Graph()

        def group_of(node):
            """Group of ``node``; same fallback name the cluster drawing uses."""
            return inner_graph.nodes[node].get('group', 'DefaultGroup')

        # Accumulated statistics per unordered pair of groups.
        meta_edges_data = {}
        for u, v, data in inner_graph.edges(data=True):
            group_u, group_v = group_of(u), group_of(v)
            if group_u == group_v:
                continue  # intra-group edges are not shown at the outer level

            # Sort so (A, B) and (B, A) share one entry; key=str keeps this
            # well-defined even for group labels of mixed types.
            meta_edge_key = tuple(sorted((group_u, group_v), key=str))
            summary = meta_edges_data.setdefault(
                meta_edge_key,
                {'count': 0, 'connection_types': set(), 'total_weight': 0.0},
            )
            summary['count'] += 1
            summary['connection_types'].add(data.get('type', 'unknown'))
            summary['total_weight'] += data.get('weight', 1.0)

        # One outer node per distinct group, in first-seen order.
        outer_graph.add_nodes_from(dict.fromkeys(group_of(node) for node in inner_graph.nodes))

        for (u_group, v_group), data in meta_edges_data.items():
            # Sorted list instead of a set: deterministic labels and easy to store.
            data['connection_types'] = sorted(data['connection_types'], key=str)
            data['average_weight'] = data['total_weight'] / data['count']
            outer_graph.add_edge(u_group, v_group, **data)

        return inner_graph, outer_graph

    def visualize_outer_graph(self, nx_graph, filename='outer_level_graph'):
        """Render the outer-level graph to ``<filename>.png``.

        Each edge is labelled with its ``count`` and ``connection_types``
        attributes; a missing attribute is shown as ``0`` / an empty list.
        """
        # Undirected Graphviz graph laid out with the 'dot' engine.
        dot = graphviz.Graph(comment='outer Level Graph', engine='dot')

        for node in nx_graph.nodes:
            dot.node(str(node))

        for u, v, data in nx_graph.edges(data=True):
            # str() each type so non-string type values do not break the join.
            types = ', '.join(str(t) for t in data.get('connection_types', []))
            # '\\n' stays a literal backslash-n, which Graphviz renders as a line break.
            label = f"Count: {data.get('count', 0)}\\nTypes: {types}"
            dot.edge(str(u), str(v), label=label)

        # view=True asks graphviz to open the rendered file.
        dot.render(filename, view=True, format='png')
        print(f"outer-level graph saved as {filename}.png")

    def visualize_inner_graph_with_clusters(self, graph, filename="inner_level_graph"):
        """
        Visualizes the inner-level graph with nodes clustered into their
        assigned groups using graphviz subgraphs, saving as PNG and displaying.

        Nodes without a ``group`` attribute are drawn in ``DefaultGroup``.
        Edge ``type`` becomes the label and ``weight`` (default 1.0) is
        doubled to give the pen width.
        """
        quote = self._dot_quote

        # Group nodes by their 'group' attribute, keeping first-seen order.
        clusters = {}
        for node, attributes in graph.nodes(data=True):
            clusters.setdefault(attributes.get('group', 'DefaultGroup'), []).append(node)

        lines = [
            'graph G {',
            '    rankdir="LR";',
            '    node [shape=box, style="filled", fillcolor="white"];',
        ]

        # Background colours are reused cyclically when there are more
        # clusters than colours.
        colors = ['lightblue', 'lightgreen', 'lightyellow', 'lightpink']
        for i, (cluster_id, nodes) in enumerate(clusters.items()):
            color = colors[i % len(colors)]
            lines.append(f'    subgraph {self._cluster_name(cluster_id)} {{')
            lines.append(f'        label={quote(cluster_id)};')
            lines.append(f'        bgcolor="{color}";')
            lines.append('        ' + ' '.join(quote(node) for node in nodes) + ';')
            lines.append('    }')

        # One DOT edge per multigraph edge; the multigraph key is attached as
        # a quoted id so that non-numeric keys remain valid DOT.
        for u, v, key, data in graph.edges(keys=True, data=True):
            label = data.get('type', '')
            penwidth = str(data.get('weight', 1.0) * 2)
            lines.append(
                f'    {quote(u)} -- {quote(v)} '
                f'[label={quote(label)}, id={quote(key)}, penwidth={penwidth}];'
            )

        lines.append('}')
        dot_source = '\n'.join(lines) + '\n'

        dot = graphviz.Source(dot_source, format='png')
        dot.render(filename, view=True)  # view=True asks graphviz to open the image
        print(f"inner-level graph with clusters saved as {filename}.png and displayed.")


In [None]:
# Example usage: build both graph levels from the example `Nodes` / `Edges`
# data first. No earlier cell defines `outer_graph_1`, so without this step
# the call below fails with NameError on a fresh kernel.
inner_graph_1, outer_graph_1 = HierarchicalGraph().create_hierarchical_graphs_iterative(Nodes, Edges)
visualize_outer_graph(outer_graph_1)


In [None]:
import graphviz

def visualize_inner_graph_with_clusters(graph, filename="inner_level_graph"):
    """
    Visualizes the inner-level graph with nodes clustered into their
    assigned groups using graphviz subgraphs, saving as PNG and displaying.

    Args:
        graph: Multigraph exposing ``nodes(data=True)`` and
            ``edges(keys=True, data=True)``. The node attribute ``group``
            selects the cluster (``DefaultGroup`` when absent); the edge
            attribute ``type`` becomes the label and ``weight`` (default 1.0)
            is doubled to give the pen width.
        filename: Output path without extension.
    """
    def quoted(value):
        """Return ``value`` as a double-quoted DOT string, escaping embedded quotes."""
        return '"' + str(value).replace('"', '\\"') + '"'

    # Group nodes by their 'group' attribute, keeping first-seen order.
    clusters = {}
    for node, attributes in graph.nodes(data=True):
        clusters.setdefault(attributes.get('group', 'DefaultGroup'), []).append(node)

    lines = [
        'graph G {',
        '    rankdir="LR";',
        '    node [shape=box, style="filled", fillcolor="white"];',
    ]

    # Background colours are reused cyclically when there are more clusters
    # than colours.
    colors = ['lightblue', 'lightgreen', 'lightyellow', 'lightpink']
    for i, (cluster_id, nodes) in enumerate(clusters.items()):
        color = colors[i % len(colors)]
        # Unquoted DOT identifiers may only contain letters, digits and
        # underscores; str() also allows non-string group labels.
        safe_id = ''.join(
            ch if (ch.isalnum() and ord(ch) < 128) else '_' for ch in str(cluster_id)
        )
        lines.append(f'    subgraph cluster_{safe_id} {{')
        lines.append(f'        label={quoted(cluster_id)};')
        lines.append(f'        bgcolor="{color}";')
        lines.append('        ' + ' '.join(quoted(node) for node in nodes) + ';')
        lines.append('    }')

    # One DOT edge per multigraph edge; the multigraph key is attached as a
    # quoted id so that non-numeric keys remain valid DOT.
    for u, v, key, data in graph.edges(keys=True, data=True):
        label = data.get('type', '')
        penwidth = str(data.get('weight', 1.0) * 2)
        lines.append(
            f'    {quoted(u)} -- {quoted(v)} '
            f'[label={quoted(label)}, id={quoted(key)}, penwidth={penwidth}];'
        )

    lines.append('}')
    dot_source = '\n'.join(lines) + '\n'

    dot = graphviz.Source(dot_source, format='png')
    dot.render(filename, view=True)  # view=True asks graphviz to open the image
    print(f"inner-level graph with clusters saved as {filename}.png and displayed.")

# Example usage:
# visualize_inner_graph_with_clusters(inner_graph)


In [None]:
# No earlier cell defines `inner_graph_1`, so build both graph levels from the
# example `Nodes` / `Edges` data here; otherwise the call below fails with
# NameError on a fresh kernel.
inner_graph_1, outer_graph_1 = HierarchicalGraph().create_hierarchical_graphs_iterative(Nodes, Edges)
visualize_inner_graph_with_clusters(inner_graph_1)

# Class Definition

In [None]:
import networkx as nx
import graphviz
from IPython.display import display, Image

class HierarchicalGraph:
    """Editable two-level (inner/outer) view of a grouped multigraph.

    ``nodes_data`` and ``edges_data`` are the source of truth. Every mutating
    method edits those record lists and then rebuilds both graphs:

    * ``inner_graph`` - ``networkx.MultiGraph`` of the individual nodes.
    * ``outer_graph`` - ``networkx.Graph`` with one node per group and one
      edge per pair of groups joined by at least one inner edge.
    """

    def __init__(self, nodes_data, edges_data):
        """Store copies of the node/edge records and build both graphs.

        Args:
            nodes_data: Iterable of dicts with a ``label`` key; other keys
                become node attributes (``group`` selects the outer node).
            edges_data: Iterable of dicts with ``start`` and ``end`` keys;
                other keys become edge attributes.
        """
        # Copy the records so later add/edit/delete calls never mutate the
        # caller's own lists or dictionaries.
        self.nodes_data = [dict(node) for node in nodes_data]
        self.edges_data = [dict(edge) for edge in edges_data]
        self.inner_graph = nx.MultiGraph()
        self.outer_graph = nx.Graph()
        self.create_hierarchical_graphs_iterative()

    # --------------------------
    # Internal helpers
    # --------------------------
    @staticmethod
    def _dot_quote(value):
        """Return ``value`` as a double-quoted DOT string, escaping embedded quotes."""
        return '"' + str(value).replace('"', '\\"') + '"'

    @staticmethod
    def _cluster_name(group):
        """Return a DOT-safe ``cluster_...`` subgraph name for ``group``."""
        # Unquoted DOT identifiers may only contain letters, digits and
        # underscores, so every other character is replaced by '_'.
        safe = ''.join(ch if (ch.isalnum() and ord(ch) < 128) else '_' for ch in str(group))
        return f'cluster_{safe}'

    @staticmethod
    def _connects(edge, start, end):
        """Return True if ``edge`` joins ``start`` and ``end`` in either orientation."""
        return (edge['start'], edge['end']) in ((start, end), (end, start))

    def create_hierarchical_graphs_iterative(self):
        """Rebuild ``inner_graph`` and ``outer_graph`` from the stored records.

        Nodes without a ``group`` attribute count as ``'DefaultGroup'``. When
        aggregating, a missing edge ``type`` counts as ``'unknown'`` and a
        missing ``weight`` as ``1.0``. Each outer edge carries ``count``,
        ``connection_types`` (sorted list), ``total_weight`` and
        ``average_weight``.
        """
        inner_graph = nx.MultiGraph()

        for node_data in self.nodes_data:
            attributes = {k: v for k, v in node_data.items() if k != 'label'}
            inner_graph.add_node(node_data['label'], **attributes)

        for edge_data in self.edges_data:
            attributes = {k: v for k, v in edge_data.items() if k not in ('start', 'end')}
            # MultiGraph assigns a fresh key to each parallel edge.
            inner_graph.add_edge(edge_data['start'], edge_data['end'], **attributes)

        def group_of(node):
            """Group of ``node``; same fallback name the cluster drawing uses."""
            return inner_graph.nodes[node].get('group', 'DefaultGroup')

        outer_graph = nx.Graph()
        meta_edges_data = {}

        for u, v, data in inner_graph.edges(data=True):
            group_u, group_v = group_of(u), group_of(v)
            if group_u == group_v:
                continue  # intra-group edges are not shown at the outer level

            # Sort so (A, B) and (B, A) share one entry; key=str keeps this
            # well-defined even for group labels of mixed types.
            key = tuple(sorted((group_u, group_v), key=str))
            summary = meta_edges_data.setdefault(
                key, {'count': 0, 'connection_types': set(), 'total_weight': 0.0}
            )
            summary['count'] += 1
            summary['connection_types'].add(data.get('type', 'unknown'))
            summary['total_weight'] += data.get('weight', 1.0)

        # One outer node per distinct group, in first-seen order.
        outer_graph.add_nodes_from(dict.fromkeys(group_of(node) for node in inner_graph.nodes))

        for (u_group, v_group), data in meta_edges_data.items():
            # Sorted list instead of a set: deterministic labels across runs.
            data['connection_types'] = sorted(data['connection_types'], key=str)
            data['average_weight'] = data['total_weight'] / data['count']
            outer_graph.add_edge(u_group, v_group, **data)

        self.inner_graph = inner_graph
        self.outer_graph = outer_graph

    # --------------------------
    # Node Operations
    # --------------------------
    def add_node(self, node_data):
        """Append a node record (copied) and rebuild the graphs."""
        self.nodes_data.append(dict(node_data))
        self.create_hierarchical_graphs_iterative()

    def edit_node(self, label, new_data):
        """Update the first node record labelled ``label`` with ``new_data``.

        If ``new_data`` changes the node's ``label``, edges that referenced
        the old label are re-pointed to the new one so they stay attached.
        Unknown labels are ignored.
        """
        for node in self.nodes_data:
            if node['label'] == label:
                node.update(new_data)
                new_label = node['label']
                if new_label != label:
                    for edge in self.edges_data:
                        if edge['start'] == label:
                            edge['start'] = new_label
                        if edge['end'] == label:
                            edge['end'] = new_label
                break
        self.create_hierarchical_graphs_iterative()

    def delete_node(self, label):
        """Remove every node record labelled ``label`` and all edges touching it."""
        self.nodes_data = [node for node in self.nodes_data if node['label'] != label]
        self.edges_data = [
            edge for edge in self.edges_data
            if edge['start'] != label and edge['end'] != label
        ]
        self.create_hierarchical_graphs_iterative()

    # --------------------------
    # Edge Operations
    # --------------------------
    def add_edge(self, edge_data):
        """Append an edge record (copied) and rebuild the graphs."""
        self.edges_data.append(dict(edge_data))
        self.create_hierarchical_graphs_iterative()

    def edit_edge(self, start, end, new_data):
        """Update the first edge record joining ``start`` and ``end``.

        The graph is undirected, so the endpoints match in either order.
        Only the first matching record is changed when parallel edges exist.
        """
        for edge in self.edges_data:
            if self._connects(edge, start, end):
                edge.update(new_data)
                break
        self.create_hierarchical_graphs_iterative()

    def delete_edge(self, start, end):
        """Remove every edge record joining ``start`` and ``end`` (either order)."""
        self.edges_data = [
            edge for edge in self.edges_data if not self._connects(edge, start, end)
        ]
        self.create_hierarchical_graphs_iterative()

    # --------------------------
    # Visualization Methods
    # --------------------------
    def visualize_outer_graph(self, filename='outer_level_graph'):
        """Render ``outer_graph`` to a PNG and display it inline when possible."""
        dot = graphviz.Graph(comment='Outer Level Graph', engine='dot')

        for node in self.outer_graph.nodes:
            dot.node(str(node))

        for u, v, data in self.outer_graph.edges(data=True):
            # str() each type so non-string type values do not break the join.
            types = ', '.join(str(t) for t in data['connection_types'])
            # '\\n' stays a literal backslash-n, which Graphviz renders as a line break.
            label = f"Count: {data['count']}\\nTypes: {types}"
            dot.edge(str(u), str(v), label=label)

        filepath = dot.render(filename, format='png', cleanup=True)
        print(f"Outer-level graph saved as {filepath}")
        try:
            display(Image(filename=filepath))
        except Exception:
            # Inline display is best-effort; Exception (not a bare except)
            # lets KeyboardInterrupt / SystemExit propagate.
            print("Open the saved image to view the graph.")

    def visualize_inner_graph_with_clusters(self, filename="inner_level_graph"):
        """Render ``inner_graph`` with one Graphviz cluster per group.

        Edge ``type`` becomes the label and ``weight`` (default 1.0) is
        doubled to give the pen width. The PNG is displayed inline when
        possible.
        """
        quote = self._dot_quote

        # Group nodes by their 'group' attribute, keeping first-seen order.
        clusters = {}
        for node, attrs in self.inner_graph.nodes(data=True):
            clusters.setdefault(attrs.get('group', 'DefaultGroup'), []).append(node)

        lines = [
            'graph G {',
            '    rankdir="LR";',
            '    node [shape=box, style="filled", fillcolor="white"];',
        ]

        # Background colours are reused cyclically when there are more
        # clusters than colours.
        colors = ['lightblue', 'lightgreen', 'lightyellow', 'lightpink', 'lightgray']
        for i, (group, nodes) in enumerate(clusters.items()):
            color = colors[i % len(colors)]
            lines.append(f'    subgraph {self._cluster_name(group)} {{')
            lines.append(f'        label={quote(group)};')
            lines.append(f'        bgcolor="{color}";')
            lines.append('        ' + ' '.join(quote(node) for node in nodes) + ';')
            lines.append('    }')

        # One DOT edge per multigraph edge, tagged with its multigraph key.
        for u, v, key, data in self.inner_graph.edges(keys=True, data=True):
            label = data.get('type', '')
            penwidth = str(data.get('weight', 1.0) * 2)
            lines.append(
                f'    {quote(u)} -- {quote(v)} '
                f'[label={quote(label)}, id={quote(key)}, penwidth={penwidth}];'
            )

        lines.append('}')
        dot_source = '\n'.join(lines) + '\n'

        dot = graphviz.Source(dot_source, format='png')
        filepath = dot.render(filename, cleanup=True)
        print(f"Inner-level graph saved as {filepath}")
        try:
            display(Image(filename=filepath))
        except Exception:
            # Inline display is best-effort; Exception (not a bare except)
            # lets KeyboardInterrupt / SystemExit propagate.
            print("Open the saved image to view the graph.")


In [None]:
# Four example nodes split evenly across two groups.
nodes = [
    {"label": label, "group": group}
    for label, group in (("A", "G1"), ("B", "G1"), ("C", "G2"), ("D", "G2"))
]

# One edge inside G1 (A-B) plus two edges crossing from G1 to G2.
edges = [
    {"start": start, "end": end, "type": relation, "weight": weight}
    for start, end, relation, weight in (
        ("A", "B", "friend", 1),
        ("A", "C", "colleague", 2),
        ("B", "D", "colleague", 3),
    )
]

# Build both graph levels, then render the detailed view followed by the
# aggregated one.
hg = HierarchicalGraph(nodes, edges)
hg.visualize_inner_graph_with_clusters()
hg.visualize_outer_graph()
