In [74]:
# Import necessary libraries
import json
import pydot

# Load the graph (this takes time, so we do it once)
def load_callgraph(dot_file):
    """Parse a DOT file with pydot and return the first graph it yields.

    A progress line is printed before parsing, and a summary line with the
    node and edge counts is printed afterwards.

    Args:
        dot_file: Path handed to ``pydot.graph_from_dot_file``.

    Returns:
        Element ``0`` of the value returned by ``pydot.graph_from_dot_file``.
    """
    print("Loading call graph...")
    parsed_graphs = pydot.graph_from_dot_file(dot_file)
    first_graph = parsed_graphs[0]
    node_count = len(first_graph.get_nodes())
    edge_count = len(first_graph.get_edges())
    print(f"Graph loaded with {node_count} nodes and {edge_count} edges.")
    return first_graph

# Load vulnerabilities from JSON
def extract_vulnerable_function_names(json_file):
    """Collect the distinct ``"procedure"`` values found in a JSON report.

    Args:
        json_file: Path of a JSON file whose decoded top-level value is
            iterated record by record; each record is indexed with the key
            ``"procedure"``.

    Returns:
        A set containing every distinct ``"procedure"`` value.
    """
    with open(json_file, 'r') as report:
        records = json.load(report)
    procedure_names = set()
    for record in records:
        procedure_names.add(record["procedure"])
    # The printed figure counts records, not distinct procedure names.
    print(f"Loaded {len(records)} vulnerabilities.")
    return procedure_names

def map_function_names_to_nodes(graph, function_names):
    """Find candidate graph nodes for each function name by label matching.

    A node is a candidate for a function when every ``::``-separated part of
    the function name occurs as a substring of the node's ``label`` attribute.
    Matching is deliberately loose, so the result is meant to be reviewed
    rather than trusted blindly.

    Every function is matched independently: a node whose label satisfies
    several function names is recorded under each of them. (Stopping at the
    first match would make the result depend on the iteration order of
    ``function_names``, which is arbitrary when it is a set.)

    Args:
        graph: Object exposing ``get_nodes()``; each node must expose
            ``get_attributes()`` (a mapping) and ``get_name()``.
        function_names: Iterable of function names, e.g. ``"ns::Class::method"``.

    Returns:
        Dict mapping every function name to the list of matching node names
        (surrounding double quotes removed). Functions without any match map
        to an empty list.
    """
    # Split each name once up front instead of once per node; building the
    # dict also materialises ``function_names`` so one-shot iterables work.
    name_parts = {func_name: func_name.split("::") for func_name in function_names}
    node_mapping = {func_name: [] for func_name in name_parts}

    for node in graph.get_nodes():
        # Treat a missing or empty label as "matches nothing".
        label = node.get_attributes().get('label', '') or ''
        for func_name, parts in name_parts.items():
            if all(part in label for part in parts):
                node_mapping[func_name].append(node.get_name().strip('"'))

    # Report how many functions actually received at least one node; the
    # dict itself always has one key per requested function.
    mapped_count = sum(1 for nodes in node_mapping.values() if nodes)
    print(f"Mapped {mapped_count} of {len(node_mapping)} functions to graph nodes.")
    return node_mapping

# Load manual mappings
def load_mappings(json_file):
    """Read ``json_file`` and return its decoded JSON content unchanged.

    Args:
        json_file: Path of the JSON file to read.

    Returns:
        Whatever ``json.load`` produces for the file's content.
    """
    with open(json_file, 'r') as handle:
        return json.load(handle)

# Find related functions
def find_related_functions(graph, manual_mappings, depth):
    """
    Find related functions (callers and callees) for each manually mapped node
    up to the specified depth, assuming one node per function name.

    The graph is walked breadth-first from each mapped node, following edges
    in both directions. A newly reached node is recorded under "callers" when
    it was reached through an edge pointing at the node being expanded, and
    under "callees" when reached through an edge leaving it. Each node is
    recorded at most once per procedure (first discovery wins) and the start
    node itself is never recorded.

    NOTE(review): for depth > 1 the classification refers to the last edge
    traversed, so e.g. a callee of a caller ends up in "callees" -- confirm
    this is the intended meaning of "related".

    Args:
        graph: Object exposing ``get_edges()``; each edge must expose
            ``get_source()`` and ``get_destination()`` returning strings.
        manual_mappings: Dict mapping a procedure name to its node name.
        depth: Maximum number of edges between the start node and a reported
            node; values <= 0 yield empty results.

    Returns:
        Dict mapping each procedure to ``{"callers": [...], "callees": [...]}``
        where both lists hold node names in sorted order (JSON serializable
        and stable between runs).
    """
    def _node_id(endpoint):
        # Edge endpoints may be quoted and may carry a ":port" suffix;
        # reduce every form to the bare node name.
        return endpoint.strip('"').split(":")[0].strip('"')

    # Index the edges once instead of rescanning the full edge list for
    # every expanded node. Per-node entries keep the original edge order so
    # that "first discovery wins" is decided exactly as a linear scan would.
    incident = {}
    for edge in graph.get_edges():
        caller = _node_id(edge.get_source())
        callee = _node_id(edge.get_destination())
        if caller == callee:
            # A self-call can never reveal a new node.
            continue
        incident.setdefault(callee, []).append(("callers", caller))
        incident.setdefault(caller, []).append(("callees", callee))

    related_functions = {}

    for procedure, node in manual_mappings.items():
        found = {"callers": set(), "callees": set()}
        visited = {node}

        # Level-by-level breadth-first search: `frontier` holds the nodes at
        # distance `level` from the start node.
        frontier = [node]
        level = 0
        while frontier and level < depth:
            next_frontier = []
            for current_node in frontier:
                for kind, neighbour in incident.get(current_node, ()):
                    if neighbour not in visited:
                        visited.add(neighbour)
                        found[kind].add(neighbour)
                        next_frontier.append(neighbour)
            frontier = next_frontier
            level += 1

        # Sorted lists: JSON serializable and deterministic.
        related_functions[procedure] = {
            "callers": sorted(found["callers"]),
            "callees": sorted(found["callees"]),
        }

    return related_functions


# Save node mappings for manual verification (sorted version)
def save_sorted_node_mappings(node_mapping, output_file):
    """Write the node mapping to ``output_file`` as JSON in sorted order.

    Function names are emitted in ascending order and the node list of each
    function is sorted as well; the input mapping is not modified.

    Args:
        node_mapping: Dict mapping a function name to a list of node names.
        output_file: Path of the JSON file to (over)write.
    """
    ordered = {}
    for func_name in sorted(node_mapping):
        ordered[func_name] = sorted(node_mapping[func_name])
    # indent=4 keeps the file readable for manual verification.
    with open(output_file, 'w') as sink:
        json.dump(ordered, sink, indent=4)
    print(f"Sorted node mappings saved to {output_file}")

def save_related_functions(related_functions, output_file):
    # Save the related functions to a file
    with open(output_file, 'w') as f:
        json.dump(related_functions, f, indent=4)
    print(f"Functions saved to {output_file}")

# Explore graph interactively
def explore_graph(graph, function_name):
    """Print some basic information about a specific function in the graph."""
    print(f"Exploring function: {function_name}")
    callers = [
        edge.get_source().strip('"').split(":")[0] for edge in graph.get_edges()
        if edge.get_destination().strip('"') == function_name
    ]
    callees = [
        edge.get_destination().strip('"') for edge in graph.get_edges()
        if edge.get_source().strip('"').split(":")[0] == function_name
    ]
    print(f"Callers: {callers}")
    print(f"Callees: {callees}")


# File paths (update as necessary)
# Input: DOT call graph passed to load_callgraph.
dot_file = "callgraph.dot"
# Input: JSON report passed to extract_vulnerable_function_names.
vulnerabilities_file = "report.json"
# NOTE(review): not referenced by any cell shown in this notebook -- confirm it is still needed.
output_file = "annotated_vulnerabilities.json"

In [6]:
# Start of the notebook workflow
# Parse the call graph once; the resulting `callgraph` object is reused by the cells below.
callgraph = load_callgraph(dot_file)

Loading call graph...
Graph loaded with 3196 nodes and 38449 edges.


In [46]:
# Read the names of the vulnerable functions from the report.
# Despite its name, `vulnerabilities` holds the set of function names returned
# by the helper, not the raw report records.
vulnerabilities = extract_vulnerable_function_names(vulnerabilities_file)

Loaded 51 vulnerabilities.


In [None]:
# Match each vulnerable function name against the node labels of the call graph,
# then write the candidate nodes to mappings.json in sorted order.
node_mapping = map_function_names_to_nodes(callgraph, vulnerabilities)
save_sorted_node_mappings(node_mapping, "mappings.json")

Mapped 31 functions to graph nodes.
Sorted node mappings saved to mappings.json


In [76]:
# Print the direct callers and callees of a single node, identified by its node name.
explore_graph(callgraph, "Node0x55eb7455a290")

Exploring function: Node0x55eb7455a290
Callers: []
Callees: ['Node0x55eb74646980', 'Node0x55eb74646840', 'Node0x55eb7453adc0', 'Node0x55eb7453adc0', 'Node0x55eb746460c0', 'Node0x55eb746460c0', 'Node0x55eb74646ac0', 'Node0x55eb746460c0', 'Node0x55eb7464b980', 'Node0x55eb7453aed0', 'Node0x55eb74648f00', 'Node0x55eb74646ac0', 'Node0x55eb7453adc0', 'Node0x55eb746460c0', 'Node0x55eb746465c0', 'Node0x55eb74647d80', 'Node0x55eb7454d840', 'Node0x55eb7464b840', 'Node0x55eb7454dac0', 'Node0x55eb7454dc00', 'Node0x55eb746460c0', 'Node0x55eb746460c0', 'Node0x55eb74646840', 'Node0x55eb74647ec0', 'Node0x55eb7453adc0', 'Node0x55eb74645e40', 'Node0x55eb74646c00', 'Node0x55eb7454d980', 'Node0x55eb7453aed0', 'Node0x55eb74645bc0', 'Node0x55eb746460c0', 'Node0x55eb7453adc0', 'Node0x55eb7453adc0', 'Node0x55eb74648f00', 'Node0x55eb7464b700']


In [78]:
# Load the function-name -> node-name mapping from manual_mappings.json and
# display it as the cell output.
mappings = load_mappings("manual_mappings.json")
mappings

{'light_create_default_file_info': 'Node0x55eb7464b700',
 'light_create_file_info': 'Node0x55eb7464cd80',
 'light_pcapng_open_read': 'Node0x55eb74649cc0',
 'light_pcapng_open_write': 'Node0x55eb7464b840',
 'pcpp::IDnsResource::decodeName': 'Node0x55eb745b1450',
 'pcpp::IDnsResource::encodeName': 'Node0x55eb745b0e10',
 'pcpp::IPFilter::convertToIPAddressWithLen': 'Node0x55eb74562130',
 'pcpp::IPv4Layer::parseNextLayer': 'Node0x55eb7456b270',
 'pcpp::IPv6Layer::parseExtensions': 'Node0x55eb745740f0'}

In [83]:
# Collect callers and callees at depth 1 for every mapped function, then show
# how many callees were found for one of them.
related_functions = find_related_functions(callgraph, mappings, 1)
len(related_functions['pcpp::IPv4Layer::parseNextLayer']['callees'])

60

In [None]:
# Write the caller/callee lists to function_relations.json.
save_related_functions(related_functions, "function_relations.json")