In [4]:
import os
import sys 
from pprint import pprint 

module_path = os.path.abspath(os.path.join('../'))  
sys.path.insert(0, module_path)
 
print(module_path)
print(os.getcwd())

os.chdir(module_path)
print(os.getcwd())

d:\Programming\Repos\QM\codecartographer\src\codecarto
d:\Programming\Repos\QM\codecartographer\src\codecarto\notebooks
d:\Programming\Repos\QM\codecartographer\src\codecarto


In [63]:
%pip install gravis
%pip install igraph
%pip install networkx
%pip install -r ./requirements.txt

Note: you may need to restart the kernel to use updated packages.
Collecting igraphNote: you may need to restart the kernel to use updated packages.

  Downloading igraph-0.11.8-cp39-abi3-win_amd64.whl.metadata (3.9 kB)
Collecting texttable>=1.6.2 (from igraph)
  Downloading texttable-1.7.0-py2.py3-none-any.whl.metadata (9.8 kB)
Downloading igraph-0.11.8-cp39-abi3-win_amd64.whl (2.0 MB)
   ---------------------------------------- 0.0/2.0 MB ? eta -:--:--
   ---------------------------------------- 2.0/2.0 MB 36.5 MB/s eta 0:00:00
Downloading texttable-1.7.0-py2.py3-none-any.whl (10 kB)
Installing collected packages: texttable, igraph
Successfully installed igraph-0.11.8 texttable-1.7.0

Note: you may need to restart the kernel to use updated packages.


In [64]:
import os
import ast
import networkx as nx
from typing import Union
from collections import defaultdict
from models.source_data import File, Folder

class BaseASTVisitor(ast.NodeVisitor):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.graph = nx.DiGraph()  # Unified DiGraph for all files
        self.current_parent_id = None
        self.current_function_args = {}
        self.current_module_name = ""
        self.functions = []
        
        # Cross-file tracking 
        self.imports = {}  # Track imports {import_name: full_name}
        self.class_map = {}  # Global reference map {class_name: node_id}
        self.function_map = defaultdict(dict)  # {class_name: {method_name: node_id}}


    def parse(self, folder: Folder) -> nx.DiGraph:
        """Parse each file in the folder individually, then link across modules."""
        for file in folder.files:
            self._parse_file(file)
        self._resolve_cross_module_links()
        return self.graph

    def _parse_file(self, file: File):
        """Parse each file and build its AST in the graph."""
        self.current_module_name = os.path.splitext(file.name)[0]
        self.current_parent_id = None
        self.current_function_args = {}
        
        tree = ast.parse(file.raw)
        self.visit(tree)
    
    def create_node(self, label, node_type, module, parent=None):
        """Helper to create a node in the graph with unique ID."""
        node_id = f"{module}.{label}"
        if not self.graph.has_node(node_id):
            self.graph.add_node(
                node_id, 
                label=label, 
                type=node_type, 
                module=module, 
                parent=parent
            )
        if parent:
            self.graph.add_edge(parent, node_id)
        return node_id

    def _resolve_cross_module_links(self):
        """Link imports and resolve method calls across modules."""
        for import_name, full_name in self.imports.items():
            importing_module_id = f"{self.current_module_name}.{self.current_module_name}"
            if full_name in self.graph:
                import_node_id = self.create_node(import_name, "Import", self.current_module_name)
                self.graph.add_edge(importing_module_id, import_node_id)
                self.graph.add_edge(import_node_id, full_name)

class PythonAST(BaseASTVisitor):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs) 

    def visit_Module(self, node):
        module_id = self.create_node(self.current_module_name, "Module", self.current_module_name)
        self.module = module_id
        previous_parent_id = self.current_parent_id
        self.current_parent_id = module_id
        self.generic_visit(node)
        self.current_parent_id = previous_parent_id
        return module_id  

    def visit_ImportFrom(self, node):
        """Handles 'from module import name' statements."""
        for alias in node.names:
            import_name = alias.name
            full_name = f"{node.module}.{import_name}"
            self.imports[import_name] = full_name

            # Create an import node to track cross-file imports
            import_node_id = self.create_node(full_name, "Import", self.current_module_name, parent=self.current_parent_id)
            self.graph.add_edge(f"{self.current_module_name}.{self.current_module_name}", import_node_id)
            self.graph.add_edge(import_node_id, f"{node.module}.{node.module}")

    def visit_ClassDef(self, node):
        class_id = self.create_node(node.name, "Class", self.current_module_name, parent=self.current_parent_id)
        module_node_id = f"{self.current_module_name}.{self.current_module_name}"
        self.graph.add_edge(module_node_id, class_id)
        
        # Register class in global map
        self.class_map[node.name] = class_id

        previous_parent_id = self.current_parent_id
        self.current_parent_id = class_id

        # Handle inheritance links
        for base in node.bases:
            if isinstance(base, ast.Name) and base.id in self.imports:
                imported_class = self.imports[base.id]
                self.graph.add_edge(class_id, imported_class)
                self.class_map[node.name] = imported_class  # Avoid duplicating HelloWorld in Greetings

        for child in node.body:
            if isinstance(child, ast.FunctionDef):
                func_id = self.create_node(child.name, "Function", self.current_module_name, parent=class_id)
                self.function_map[node.name][child.name] = func_id
            self.visit(child)

        self.current_parent_id = previous_parent_id
        return class_id

    def visit_FunctionDef(self, node):
        func_id = self.create_node(node.name, "Function", self.current_module_name, parent=self.current_parent_id)
        self.functions.append(func_id)
        previous_parent_id = self.current_parent_id
        self.current_parent_id = func_id

        self.current_function_args = {}
        for arg in node.args.args:
            arg_id = self.visit(arg)
            self.current_function_args[arg.arg] = arg_id

        for stmt in node.body:
            self.visit(stmt)

        self.current_function_args = {}
        self.current_parent_id = previous_parent_id
        return func_id

    def visit_Expr(self, node):
        """Track calls within expressions."""
        if isinstance(node.value, ast.Call):
            call_name = ""
            if isinstance(node.value.func, ast.Attribute):
                # Attempt to resolve the class the method belongs to
                base_name = node.value.func.value.id if isinstance(node.value.func.value, ast.Name) else None
                method_name = node.value.func.attr
                call_name = f"{base_name}.{method_name}" if base_name else method_name

                if base_name in self.imports:
                    imported_class_full_name = self.imports[base_name]
                    imported_method_full_name = f"{imported_class_full_name}.{method_name}"

                    if imported_class_full_name in self.class_map:
                        actual_method_id = self.function_map.get(imported_class_full_name, {}).get(method_name)
                        if actual_method_id:
                            self.graph.add_edge(self.current_parent_id, actual_method_id)
                            return actual_method_id

            # Local or unresolved function call
            call_id = self.create_node(call_name, "Call", self.current_module_name, parent=self.current_parent_id)
            previous_parent_id = self.current_parent_id
            self.current_parent_id = call_id
            for arg in node.value.args:
                self.visit(arg)
            self.current_parent_id = previous_parent_id
            return call_id

    def visit_Name(self, node):
        if node.id in self.imports:
            import_id = self.imports[node.id]
            self.graph.add_edge(self.current_parent_id, import_id)
            return import_id
        name_id = self.create_node(node.id, "Name", self.current_module_name, parent=self.current_parent_id)
        return name_id
    
    def visit_arg(self, node):
        arg_id = self.create_node(node.arg, "Argument", self.current_module_name, parent=self.current_parent_id)
        return arg_id
    
    def visit_JoinedStr(self, node):
        joined_str_id = self.create_node("JoinedStr", "JoinedStr", self.current_module_name, parent=self.current_parent_id)
        previous_parent_id = self.current_parent_id
        self.current_parent_id = joined_str_id
        for value in node.values:
            self.visit(value)
        self.current_parent_id = previous_parent_id
        return joined_str_id

    def visit_FormattedValue(self, node):
        formatted_value_id = self.create_node("FormattedValue", "FormattedValue", self.current_module_name, parent=self.current_parent_id)
        previous_parent_id = self.current_parent_id
        self.current_parent_id = formatted_value_id
        self.visit(node.value)
        self.current_parent_id = previous_parent_id
        return formatted_value_id

    def visit_For(self, node):
        for_id = self.create_node("ForLoop", "For", self.current_module_name, parent=self.current_parent_id)
        previous_parent_id = self.current_parent_id
        self.current_parent_id = for_id
        self.visit(node.target)
        self.visit(node.iter)
        for stmt in node.body:
            self.visit(stmt)
        for stmt in node.orelse:
            self.visit(stmt)
        self.current_parent_id = previous_parent_id
        return for_id
 
    def visit_Constant(self, node):
        label = f"Const: {repr(node.value)}"
        const_id = self.create_node(label, "Constant", self.current_module_name, parent=self.current_parent_id)
        return const_id


In [65]:
def get_visitor(folder: Folder , print_graph=False):
    visitor = PythonAST()
    visitor.parse(folder)

    if print_graph:
        # Print the graph nodes and their attributes
        for node_id, data in visitor.graph.nodes(data=True):
            print(f"Node ID: {node_id}, Data: {data}")

        # Print the edges in the graph
        print("Edges:")
        for edge in visitor.graph.edges:
            print(edge)

    return visitor

In [66]:
import gravis as gv
from models.graph_data import GraphBase
from services.plotter_service import PlotterService, PlotOptions
from services.palette_service import DefaultPalette
from services.position_service import Positions

async def gravisify(graph_name: str, graph: nx.DiGraph, title: str = "Spectral", type: str = "d3"):
    # make possible multigraphs into a digraph
    ntxGraph = nx.DiGraph(graph)
    graphBase = GraphBase()
    graphBase.graph = ntxGraph
    root = "notebooks/"

    # Set and scale up the postiions
    plot = PlotterService()
    plot.plot_graph(graphBase, DefaultPalette, PlotOptions())
    pos = Positions().get_node_positions(
        graph=graph, layout_name=f"{title.lower()}_layout"
    )
    spread = 100
    if title == "Spectral":
        spread = 500
    for id, (x, y) in pos.items():
        node = graph.nodes[id]
        node["x"] = float(x) * spread
        node["y"] = float(y) * spread

    # Scale nodes based on edges
    for node, data in graph.nodes(data=True):
        # Set size based on the number of outgoing edges
        data["size"] = (
            1 + (len(graph.out_edges(node)) * 10) + (len(graph.in_edges(node)) * 10)
        )

    # Convert the graph to gJGF for the notebook
    return gv.convert.any_to_gjgf(graph)

def gravis(gJFG): 
    return gv.d3(
        data = gJFG, 
        zoom_factor = 1.5, 

        node_size_factor = 1,
        node_label_size_factor = 0.5,
        node_hover_neighborhood = True, 
        use_node_size_normalization = True,
        node_size_normalization_max = 30,  
        node_label_data_source = 'label',

        links_force_strength = 0.5, 
        links_force_distance = 55,
        edge_label_data_source = 'label',
        use_edge_size_normalization = True,
        
        layout_algorithm_active = False,
        use_centering_force = False, 
        
    )

In [67]:
source_code = '''
class HelloWorld:
    def SayHello(self, their_name: str, my_name: str):
        for i in range(10):
            print(f"Hello {their_name}, my name is {my_name}") 
'''

source_code2 = '''
from for_hello import HelloWorld
class Greetings(HelloWorld):
    def Hola(self, his_name: str, her_name: str):
        self.SayHello(his_name, her_name)
'''

file = File(name="for_hello.py", size=100, raw=source_code)
file2 = File(name="greetings.py", size=100, raw=source_code2)
folder = Folder(name="root", files=[file, file2])
visitor = get_visitor(folder)
graph = visitor.graph
gJFG = await gravisify("ParserService", graph=graph, title="Spiral")
#gravis(gJFG)


In [None]:
import igraph as ig

def assign_properties(g):
    # Centrality calculation
    node_centralities = g.betweenness()
    edge_centralities = g.edge_betweenness()

    # Community detection
    communities = g.community_fastgreedy().as_clustering().membership

    # Graph properties
    g['node_border_size'] = 1.5
    g['node_border_color'] = 'black'
    g['edge_opacity'] = 0.75 

    # Node properties: Size by centrality, color by community
    colors = ['red', 'blue', 'green', 'orange', 'pink', 'brown', 'yellow', 'cyan', 'magenta', 'violet']
    g.vs['size'] = [10.0 + val / 50.0 for val in node_centralities]
    g.vs['color'] = [colors[community_index % len(colors)] for community_index in communities]

    # Edge properties: Size by centrality, color by community (within=community color, between=black)
    g.es['size'] = [0.5 + val / 100.0 for val in edge_centralities]
    g.es['color'] = [colors[communities[i] % len(colors)] if communities[i] == communities[j] else 'black'
                     for i, j in g.get_edgelist()]

# Create a graph with a generator function
g = ig.Graph.GRG(200, 0.14)
g.from_networkx(graph)

# Scale the coordinates provided by this particular graph generator (alternative: delete them)
g.vs['x'] = [val * 2000 - 1000 for val in g.vs['x']]  # del g.vs['x']
g.vs['y'] = [val * 2000 - 1000 for val in g.vs['y']]  # del g.vs['y'] 

# Assign properties
assign_properties(g)

# Plot it
gv.d3(g, zoom_factor=0.2, node_label_data_source = "g.cls.graph_attrs['label']")


In [99]:

graph = nx.DiGraph()
graph.add_node(1, label="root")
g = ig.Graph()
g.from_networkx(graph)
pprint(g.vs["label"])
pprint(g.vs.attributes())

KeyError: 'Attribute does not exist'