In [5]:
import pickle
from collections import defaultdict, deque
from tqdm import tqdm  # For progress bars


def load_pickle_file(filename):
    """Load data from a pickle file."""
    with open(filename, 'rb') as file:
        return pickle.load(file)
    

gnn_input_file = "final_dataset/gnn_input.pkl"
data = load_pickle_file(gnn_input_file)

In [6]:




def build_graph(edges):
    """Build a graph as an adjacency list from the edges."""
    graph = defaultdict(list)
    for edge in edges:
        graph[edge["source"]].append(edge["target"])
        graph[edge["target"]].append(edge["source"])  # Assuming undirected graph
    return graph

def bfs(graph, start, target_type, nodes):
    """Perform BFS to find if a target_type node is reachable."""
    visited = set()
    queue = deque([start])

    while queue:
        current = queue.popleft()
        if current in visited:
            continue
        visited.add(current)

        # Check if the current node matches the target type
        if any(node['id'] == current and node['type'] == target_type for node in nodes):
            return True

        # Add neighbors to the queue
        queue.extend(graph[current])

    return False

def check_cwe_to_capec_reachability(data):
    """Check if all CWE nodes can reach at least one CAPEC node."""
    nodes = data["nodes"]
    edges = data["edges"]

    # Build the graph
    graph = build_graph(edges)

    # Get all CWE nodes
    cwe_nodes = [node["id"] for node in nodes if node["type"] == "CWE"]

    # Track non-connected CWE nodes
    non_connected_nodes = []

    # Check connectivity for each CWE node
    for cwe_node in tqdm(cwe_nodes, desc="Checking CWE -> CAPEC connectivity"):
        if not bfs(graph, cwe_node, "CAPEC", nodes):
            non_connected_nodes.append(cwe_node)
            # tqdm.write(f"Node {cwe_node} is not connected to any CAPEC node.")

    if non_connected_nodes:
        # tqdm.write("The following CWE nodes are not connected to any CAPEC node:")
        for node in non_connected_nodes:
           print(f" - {node}")
    else:
        print("All CWE nodes are connected to at least one CAPEC node.")

    return non_connected_nodes

# Load data from the pickle file


# Perform the reachability check
non_connected_nodes = check_cwe_to_capec_reachability(data)


Checking CWE -> CAPEC connectivity: 100%|██████████| 965/965 [48:32<00:00,  3.02s/it]  

 - CWE-1187
 - CWE-132
 - CWE-1324
 - CWE-216
 - CWE-217
 - CWE-218
 - CWE-225
 - CWE-247
 - CWE-249
 - CWE-292
 - CWE-365
 - CWE-373
 - CWE-423
 - CWE-443
 - CWE-458
 - CWE-516
 - CWE-533
 - CWE-534
 - CWE-542
 - CWE-545
 - CWE-592
 - CWE-596
 - CWE-71
 - CWE-769
 - CWE-92





In [None]:
import pickle
from collections import defaultdict, deque
from tqdm import tqdm  # For progress bars
from concurrent.futures import ThreadPoolExecutor, as_completed

def load_pickle_file(filename):
    """Load data from a pickle file."""
    with open(filename, 'rb') as file:
        return pickle.load(file)

def build_graph(edges):
    """Build a graph as an adjacency list from the edges."""
    graph = defaultdict(list)
    for edge in edges:
        graph[edge["source"]].append(edge["target"])
        graph[edge["target"]].append(edge["source"])  # Assuming undirected graph
    return graph

def bfs(graph, start, target_type, nodes):
    """Perform BFS to find if a target_type node is reachable."""
    visited = set()
    queue = deque([start])

    while queue:
        current = queue.popleft()
        if current in visited:
            continue
        visited.add(current)

        # Check if the current node matches the target type
        if any(node['id'] == current and node['type'] == target_type for node in nodes):
            return True

        # Add neighbors to the queue
        queue.extend(graph[current])

    return False

def check_single_cwe_node(cwe_node, graph, nodes):
    """Check if a single CWE node can reach any CAPEC node."""
    return cwe_node if not bfs(graph, cwe_node, "CAPEC", nodes) else None

def check_cwe_to_capec_reachability(data):
    """Check if all CWE nodes can reach at least one CAPEC node using multithreading."""
    nodes = data["nodes"]
    edges = data["edges"]

    # Build the graph
    graph = build_graph(edges)

    # Get all CWE nodes
    cwe_nodes = [node["id"] for node in nodes if node["type"] == "CWE"]

    # Track non-connected CWE nodes
    non_connected_nodes = []

    # Use ThreadPoolExecutor for parallel processing
    with ThreadPoolExecutor() as executor:
        # Submit BFS checks for each CWE node
        futures = {executor.submit(check_single_cwe_node, cwe_node, graph, nodes): cwe_node for cwe_node in cwe_nodes}

        # Process results with a progress bar
        for future in tqdm(as_completed(futures), total=len(cwe_nodes), desc="Checking CWE -> CAPEC connectivity"):
            result = future.result()
            if result:
                non_connected_nodes.append(result)

    if non_connected_nodes:
        print("The following CWE nodes are not connected to any CAPEC node:")
        for node in non_connected_nodes:
            print(f" - {node}")
    else:
        print("All CWE nodes are connected to at least one CAPEC node.")

    return non_connected_nodes

non_connected_nodes = check_cwe_to_capec_reachability(data)


Checking CWE -> CAPEC connectivity:   5%|▌         | 49/965 [01:50<34:25,  2.26s/it]  
