In [53]:
import networkx as nx
import pandas as pd
import matplotlib.pyplot as plt
import json
import numpy as np
from typing import List, Tuple, Dict, Any

In [54]:
import networkx as nx
import numpy as np
import colorsys

def synthesize_graph(
    total_nodes: int,
    total_layers: int,
    distribution: str,
    clear_cut_layer: int,
    layer_names: list = None
) -> nx.DiGraph:
    """
    Synthesize a directed graph based on given parameters.
    
    :param total_nodes: Total number of nodes in the graph
    :param total_layers: Total number of layers in the graph
    :param distribution: Distribution of nodes across layers ('uniform', 'normal', 'pos_exp', 'neg_exp')
    :param clear_cut_layer: Layer number below which there's only one parent per node
    :param layer_names: Optional list of layer names. If provided, should have at least one entry.
    :return: NetworkX DiGraph object
    """
    G = nx.DiGraph()
    
    # Distribute nodes across layers
    if distribution == 'uniform':
        nodes_per_layer = [total_nodes // total_layers] * total_layers
    elif distribution == 'normal':
        nodes_per_layer = np.random.normal(total_nodes / total_layers, total_nodes / (4 * total_layers), total_layers)
    elif distribution == 'pos_exp':
        nodes_per_layer = np.exp(np.linspace(0, 1, total_layers))
    elif distribution == 'neg_exp':
        nodes_per_layer = np.exp(np.linspace(1, 0, total_layers))
    else:
        raise ValueError("Invalid distribution type")
    
    nodes_per_layer = np.round(nodes_per_layer / sum(nodes_per_layer) * total_nodes).astype(int)
    nodes_per_layer[0] = 1  # Ensure there's only one root node
    nodes_per_layer[-1] += total_nodes - sum(nodes_per_layer)  # Adjust last layer to match total_nodes
    
    # Generate colors for layers
    colors = [colorsys.hsv_to_rgb(i / total_layers, 0.7, 0.9) for i in range(total_layers)]
    colors = ['#{:02x}{:02x}{:02x}'.format(int(r * 255), int(g * 255), int(b * 255)) for r, g, b in colors]
    
    # Handle layer names
    if layer_names is None:
        layer_names = [f"Layer {i+1}" for i in range(total_layers)]
    elif len(layer_names) < total_layers:
        base_name = layer_names[-1].split()[0]
        layer_names.extend([f"{base_name} {i+1}" for i in range(len(layer_names), total_layers)])
    
    # Create nodes
    node_id = 0
    for layer, num_nodes in enumerate(nodes_per_layer):
        for _ in range(num_nodes):
            G.add_node(node_id, layer=layer, color=colors[layer], layer_name=layer_names[layer])
            node_id += 1
    
    # Create edges
    for layer in range(1, total_layers):
        current_layer_nodes = [n for n, d in G.nodes(data=True) if d['layer'] == layer]
        previous_layer_nodes = [n for n, d in G.nodes(data=True) if d['layer'] == layer - 1]
        
        for node in current_layer_nodes:
            if layer >= clear_cut_layer:
                # Above or at clear cut layer: Connect to multiple parents
                num_parents = np.random.randint(1, min(3, len(previous_layer_nodes) + 1))
                parents = np.random.choice(previous_layer_nodes, num_parents, replace=False)
            else:
                # Below clear cut layer: Connect to a single parent
                parents = [np.random.choice(previous_layer_nodes)]
            
            for parent in parents:
                G.add_edge(parent, node)
    
    return G

In [55]:
import networkx as nx
import numpy as np
import colorsys

def synthesize_graph(
    total_nodes: int,
    total_layers: int,
    distribution: str,
    clear_cut_layer: int,
    layer_names: list = None
) -> nx.DiGraph:
    """
    Synthesize a directed graph based on given parameters.
    
    :param total_nodes: Total number of nodes in the graph
    :param total_layers: Total number of layers in the graph
    :param distribution: Distribution of nodes across layers ('uniform', 'normal', 'pos_exp', 'neg_exp')
    :param clear_cut_layer: Layer number below which there's only one parent per node
    :param layer_names: Optional list of layer names. If provided, should have at least one entry.
    :return: NetworkX DiGraph object
    """
    G = nx.DiGraph()
    
    # Distribute nodes across layers
    if distribution == 'uniform':
        nodes_per_layer = [total_nodes // total_layers] * total_layers
    elif distribution == 'normal':
        nodes_per_layer = np.random.normal(total_nodes / total_layers, total_nodes / (4 * total_layers), total_layers)
    elif distribution == 'pos_exp':
        nodes_per_layer = np.exp(np.linspace(0, 1, total_layers))
    elif distribution == 'neg_exp':
        nodes_per_layer = np.exp(np.linspace(1, 0, total_layers))
    else:
        raise ValueError("Invalid distribution type")
    
    nodes_per_layer = np.round(nodes_per_layer / sum(nodes_per_layer) * total_nodes).astype(int)
    nodes_per_layer[0] = 1  # Ensure there's only one root node
    nodes_per_layer[-1] += total_nodes - sum(nodes_per_layer)  # Adjust last layer to match total_nodes
    
    # Generate colors for layers
    colors = [colorsys.hsv_to_rgb(i / total_layers, 0.7, 0.9) for i in range(total_layers)]
    colors = ['#{:02x}{:02x}{:02x}'.format(int(r * 255), int(g * 255), int(b * 255)) for r, g, b in colors]
    
    # Handle layer names
    if layer_names is None:
        layer_names = [f"Layer{i+1}" for i in range(total_layers)]
    elif len(layer_names) < total_layers:
        base_name = layer_names[-1].split()[0]
        layer_names.extend([f"{base_name}{i+1}" for i in range(len(layer_names), total_layers)])
    
    # Create nodes with new naming convention
    node_id = 0
    for layer, num_nodes in enumerate(nodes_per_layer):
        layer_name = layer_names[layer]
        for node_in_layer in range(num_nodes):
            if layer < len(layer_names) - 1:
                node_name = f"{layer_name} {node_in_layer + 1}"
            else:
                node_name = f"{layer_name} {layer + 1} {node_in_layer + 1}"
            G.add_node(node_name, layer=layer, color=colors[layer], layer_name=layer_name)
            node_id += 1
    
    # Create edges
    for layer in range(1, total_layers):
        current_layer_nodes = [n for n, d in G.nodes(data=True) if d['layer'] == layer]
        previous_layer_nodes = [n for n, d in G.nodes(data=True) if d['layer'] == layer - 1]
        
        for node in current_layer_nodes:
            if layer >= clear_cut_layer:
                # Above or at clear cut layer: Connect to multiple parents
                num_parents = np.random.randint(1, min(3, len(previous_layer_nodes) + 1))
                parents = np.random.choice(previous_layer_nodes, num_parents, replace=False)
            else:
                # Below clear cut layer: Connect to a single parent
                parents = [np.random.choice(previous_layer_nodes)]
            
            for parent in parents:
                G.add_edge(parent, node)
    
    return G

# Example usage:
# G = synthesize_graph(total_nodes=100, total_layers=5, distribution='uniform', clear_cut_layer=3, layer_names=['Root', 'Intermediate', 'Advanced', 'Expert'])

In [56]:
def save_graph_to_json(G: nx.DiGraph, filename: str) -> None:
    """
    Save the graph to a JSON file.
    
    :param G: NetworkX DiGraph object
    :param filename: Name of the file to save the JSON data
    """
    data = nx.node_link_data(G)
    with open(filename, 'w') as f:
        json.dump(data, f)

In [57]:
def save_graph_to_csv(G: nx.DiGraph, nodes_filename: str, edges_filename: str) -> None:
    """
    Save the graph to CSV files (one for nodes, one for edges).
    
    :param G: NetworkX DiGraph object
    :param nodes_filename: Name of the file to save node data
    :param edges_filename: Name of the file to save edge data
    """
    # Save nodes
    nodes_df = pd.DataFrame.from_dict(dict(G.nodes(data=True)), orient='index')
    nodes_df.to_csv(nodes_filename)
    
    # Save edges
    edges_df = pd.DataFrame(G.edges(), columns=['source', 'target'])
    edges_df.to_csv(edges_filename, index=False)

In [58]:
def visualize_graph(G: nx.DiGraph, filename: str) -> None:
    """
    Visualize the graph and save it as an image.
    
    :param G: NetworkX DiGraph object
    :param filename: Name of the file to save the visualization
    """
    pos = nx.spring_layout(G)
    plt.figure(figsize=(12, 8))
    nx.draw(G, pos, with_labels=True, node_color='lightblue', 
            node_size=500, arrowsize=20, width=0.5)
    
    # Add layer information to the visualization
    layer_labels = nx.get_node_attributes(G, 'layer')
    nx.draw_networkx_labels(G, pos, labels={node: f"{node}\n(L{layer})" for node, layer in layer_labels.items()})
    
    plt.title("Synthesized Network Visualization")
    plt.axis('off')
    plt.tight_layout()
    plt.savefig(filename, format='png', dpi=300, bbox_inches='tight')
    plt.close()

In [59]:
def create_hierarchical_layout(G: nx.DiGraph) -> Dict[int, Tuple[float, float]]:
    """
    Create a hierarchical layout for the graph.
    
    :param G: NetworkX DiGraph object
    :return: Dictionary of node positions
    """
    layers = nx.get_node_attributes(G, 'layer')
    max_layer = max(layers.values())
    
    # Perform topological sort
    topo_sort = list(nx.topological_sort(G))
    
    # Create layout
    pos = {}
    layer_counts = {i: 0 for i in range(max_layer + 1)}
    layer_widths = {i: sum(1 for n in G.nodes if layers[n] == i) for i in range(max_layer + 1)}
    
    for node in topo_sort:
        layer = layers[node]
        x = layer_counts[layer] - (layer_widths[layer] - 1) / 2
        y = max_layer - layer  # Invert y-coordinate for top-down layout
        pos[node] = (x, y)
        layer_counts[layer] += 1
    
    return pos

In [60]:
import networkx as nx
import matplotlib.pyplot as plt
from matplotlib.colors import to_rgba

def create_hierarchical_layout(G: nx.DiGraph) -> dict:
    """Create a hierarchical layout for the graph."""
    pos = {}
    layers = nx.get_node_attributes(G, 'layer')
    max_layer = max(layers.values())
    nodes_by_layer = {layer: [] for layer in range(max_layer + 1)}
    
    for node, layer in layers.items():
        nodes_by_layer[layer].append(node)
    
    for layer, nodes in nodes_by_layer.items():
        y = max_layer - layer
        width = len(nodes)
        for i, node in enumerate(nodes):
            pos[node] = (i - width / 2, y)
    
    return pos

def visualize_graph_hierarchical(G: nx.DiGraph, filename: str) -> None:
    """
    Visualize the graph with a hierarchical layout and save it as an image.
    
    :param G: NetworkX DiGraph object
    :param filename: Name of the file to save the visualization
    """
    pos = create_hierarchical_layout(G)
    
    plt.figure(figsize=(16, 12))
    
    # Draw edges
    nx.draw_networkx_edges(G, pos, edge_color='gray', arrows=True, arrowsize=10, width=0.5)
    
    # Draw nodes
    node_colors = [data.get('color', '#1f78b4') for _, data in G.nodes(data=True)]  # Default color if 'color' is missing
    nx.draw_networkx_nodes(G, pos, node_color=node_colors, node_size=700, node_shape='o')
    
    # Add node labels
    labels = {node: f"{node}\n({data.get('layer_name', f'Layer {data.get('layer', 0)}')})" for node, data in G.nodes(data=True)}
    nx.draw_networkx_labels(G, pos, labels, font_size=8)
    
    # Add layer lines and labels
    layers = nx.get_node_attributes(G, 'layer')
    max_layer = max(layers.values())
    layer_names = nx.get_node_attributes(G, 'layer_name')
    unique_layer_names = list(dict.fromkeys(layer_names.values()))
    
    for layer in range(max_layer + 1):
        y = max_layer - layer
        plt.axhline(y=y, color='gray', linestyle='--', alpha=0.5)
        layer_name = unique_layer_names[min(layer, len(unique_layer_names) - 1)] if unique_layer_names else f"Layer {layer}"
        plt.text(-1, y, layer_name, verticalalignment='center', fontweight='bold')
    
    plt.title("Hierarchical Network Visualization", fontsize=16, fontweight='bold')
    plt.axis('off')
    plt.tight_layout()
    plt.savefig(filename, format='png', dpi=300, bbox_inches='tight')
    plt.close()

# Example usage:
# G = synthesize_graph(total_nodes=50, total_layers=5, distribution='uniform', clear_cut_layer=3, layer_names=['Root', 'Intermediate', 'Advanced', 'Expert'])
# visualize_graph_hierarchical(G, 'hierarchical_graph.png')

In [61]:
import networkx as nx
import matplotlib.pyplot as plt
from matplotlib.colors import to_rgba
from collections import defaultdict

def minimize_crossings(G, layers):
    """Attempt to minimize edge crossings by reordering nodes within layers."""
    for i in range(1, len(layers)):
        upper_layer = layers[i-1]
        current_layer = layers[i]
        
        # Calculate barycenter for each node in the current layer
        barycenters = {}
        for node in current_layer:
            parents = list(G.predecessors(node))
            if parents:
                barycenters[node] = sum(upper_layer.index(p) for p in parents) / len(parents)
            else:
                barycenters[node] = len(upper_layer) / 2  # Place nodes without parents in the middle
        
        # Sort current layer based on barycenters
        layers[i] = sorted(current_layer, key=lambda n: barycenters[n])
    
    return layers

def create_improved_hierarchical_layout(G: nx.DiGraph) -> dict:
    """Create an improved hierarchical layout for the graph with minimal edge crossings."""
    layers = defaultdict(list)
    for node, data in G.nodes(data=True):
        layers[data['layer']].append(node)
    
    # Sort layers by their key (layer number)
    sorted_layers = [layers[i] for i in sorted(layers.keys())]
    
    # Minimize crossings
    sorted_layers = minimize_crossings(G, sorted_layers)
    
    pos = {}
    max_layer = len(sorted_layers) - 1
    for layer, nodes in enumerate(sorted_layers):
        y = max_layer - layer
        width = len(nodes)
        for i, node in enumerate(nodes):
            pos[node] = (i - width / 2, y)
    
    return pos

def visualize_graph_hierarchical(G: nx.DiGraph, filename: str) -> None:
    """
    Visualize the graph with an improved hierarchical layout and save it as an image.
    
    :param G: NetworkX DiGraph object
    :param filename: Name of the file to save the visualization
    """
    pos = create_improved_hierarchical_layout(G)
    
    plt.figure(figsize=(20, 15))
    
    # Draw edges with reduced opacity
    nx.draw_networkx_edges(G, pos, edge_color='gray', arrows=True, arrowsize=10, width=0.5, alpha=0.3)
    
    # Draw nodes
    node_colors = [data.get('color', '#1f78b4') for _, data in G.nodes(data=True)]
    nx.draw_networkx_nodes(G, pos, node_color=node_colors, node_size=500, node_shape='o')
    
    # Add node labels
    labels = {node: f"{node}" for node, data in G.nodes(data=True)}
    nx.draw_networkx_labels(G, pos, labels, font_size=6)
    
    # Add layer lines and labels
    layers = nx.get_node_attributes(G, 'layer')
    max_layer = max(layers.values())
    layer_names = nx.get_node_attributes(G, 'layer_name')
    unique_layer_names = list(dict.fromkeys(layer_names.values()))
    
    for layer in range(max_layer + 1):
        y = max_layer - layer
        plt.axhline(y=y, color='gray', linestyle='--', alpha=0.5)
        layer_name = unique_layer_names[min(layer, len(unique_layer_names) - 1)] if unique_layer_names else f"Layer {layer}"
        plt.text(-1, y, layer_name, verticalalignment='center', fontweight='bold')
    
    plt.title("Improved Hierarchical Network Visualization", fontsize=16, fontweight='bold')
    plt.axis('off')
    plt.tight_layout()
    plt.savefig(filename, format='png', dpi=300, bbox_inches='tight')
    plt.close()

# Example usage:
# G = synthesize_graph(total_nodes=100, total_layers=12, distribution='uniform', clear_cut_layer=3, layer_names=['Root', 'Intermediate', 'Advanced', 'Expert'])
# visualize_graph_hierarchical(G, 'improved_hierarchical_graph.png')

In [62]:
# Synthesize the graph
G = synthesize_graph(
    total_nodes=100,
    total_layers=12,
    distribution="pos_exp",
    clear_cut_layer=5,
    layer_names=[
        "Organization",
        "Business Group",
        "Product Family",
        "Product Offering",
        "Modules",
        "Parts",
    ],
)

# Save to JSON
save_graph_to_json(G, 'synthesized_graph.json')

# Save to CSV
save_graph_to_csv(G, 'nodes.csv', 'edges.csv')

# Visualize the graph
visualize_graph(G, "graph_visualization.png")

visualize_graph_hierarchical(G, "graph_visualization_hierarchical.png")

  plt.tight_layout()
