In [2]:
import requests

def download_file(url, filename, timeout=30):
    """Stream the resource at ``url`` into the local file ``filename``.

    Progress and failure messages are printed; request-level failures
    (connection problems, timeouts, HTTP 4xx/5xx) are caught and reported
    rather than raised, so the function always returns ``None``.

    Args:
        url: Address of the file to download.
        filename: Local path the response body is written to (binary mode).
        timeout: Seconds to wait on the server before giving up; passed
            straight to ``requests.get``.
    """
    try:
        print(f"Starting download from {url}")
        # Without a timeout a stalled server would block this call forever.
        # The context manager releases the streamed connection even when
        # writing the file fails part-way through.
        with requests.get(url, stream=True, timeout=timeout) as response:
            response.raise_for_status()  # Raise an error for HTTP status codes 4xx/5xx

            with open(filename, 'wb') as file:
                for chunk in response.iter_content(chunk_size=8192):
                    if chunk:  # filter out keep-alive new chunks
                        file.write(chunk)

        print(f"Download completed. File saved as {filename}")
    except requests.exceptions.RequestException as e:
        print(f"An error occurred: {e}")

# Location of the CAPEC STIX file (raw file in the mitre/cti GitHub repository)
# and the relative path it is saved under, i.e. next to this notebook's
# working directory.
url = "https://github.com/mitre/cti/raw/master/capec/2.1/stix-capec.json"
filename = "stix-capec.json"

# Fetch the file; download_file prints its own progress and error messages.
download_file(url, filename)

Starting download from https://github.com/mitre/cti/raw/master/capec/2.1/stix-capec.json
Download completed. File saved as stix-capec.json


In [12]:
import json

try:
    # Parse the STIX file from the working directory
    with open("stix-capec.json", "rb") as file:
        data = json.load(file)

    # A document without an "objects" array simply yields no nodes
    objects = data.get("objects", [])

    # Result container: one node per CAPEC external reference
    output = {"nodes": []}

    for stix_object in objects:
        # Everything except attack patterns is ignored
        if stix_object.get("type") != "attack-pattern":
            continue

        for ext_ref in stix_object.get("external_references", []):
            source_name = ext_ref.get("source_name", "")
            # Only references whose source is CAPEC become nodes
            if source_name.lower() != "capec":
                continue

            capec_id = ext_ref.get("external_id")
            node_type = source_name.upper()
            # Both identifying fields must be non-empty for the node to be kept
            if not (capec_id and node_type):
                continue

            output["nodes"].append({
                "id": capec_id,
                "type": node_type,
                "features": {
                    "description": stix_object.get("description", "")
                }
            })

    # Pretty-printed copy, shown in the cell output for inspection
    formatted_json = json.dumps(output, indent=4)
    print(formatted_json)

    # Persist the nodes for the embedding step
    with open("capec_nodes.json", "w") as file:
        json.dump(output, file, indent=4)

    print("Transformation complete. Only CAPEC nodes saved to 'capec_nodes.json'.")

except FileNotFoundError:
    print("Error: File 'stix-capec.json' not found.")
except json.JSONDecodeError as exc:
    print(f"Error decoding JSON: {exc}")
except Exception as exc:
    # Anything else is reported instead of raised
    print(f"An error occurred: {exc}")


{
    "nodes": [
        {
            "id": "CAPEC-1",
            "type": "CAPEC",
            "features": {
                "description": "In applications, particularly web applications, access to functionality is mitigated by an authorization framework. This framework maps Access Control Lists (ACLs) to elements of the application's functionality; particularly URL's for web apps. In the case that the administrator failed to specify an ACL for a particular element, an attacker may be able to access it with impunity. An attacker with the ability to access functionality not properly constrained by ACLs can obtain sensitive information and possibly compromise the entire application. Such an attacker can access resources that must be available only to users at a higher privilege level, can access management sections of the application, or can run queries for data that they otherwise not supposed to."
            }
        },
        {
            "id": "CAPEC-10",
            "type": "

In [14]:
import json
import torch
from transformers import BertTokenizer, BertModel

# Tokenizer and encoder are loaded from one checkpoint name so they cannot
# drift apart.
BERT_CHECKPOINT = 'bert-base-uncased'
tokenizer = BertTokenizer.from_pretrained(BERT_CHECKPOINT)
model = BertModel.from_pretrained(BERT_CHECKPOINT)

# Function to create BERT embedding for a given text
def create_bert_embedding(text):
    """Return a mean-pooled BERT embedding of ``text`` as a plain Python list.

    The text is tokenized with the module-level ``tokenizer`` (truncated to at
    most 512 tokens) and run through the module-level ``model`` with gradient
    tracking disabled. The last hidden state is averaged over dimension 1
    (presumably the token axis of a (batch, tokens, hidden) tensor; confirm
    against the model documentation) and squeezed before conversion.
    """
    encoded = tokenizer(text, return_tensors='pt', truncation=True, max_length=512)
    with torch.no_grad():
        hidden_states = model(**encoded).last_hidden_state
    # Collapse the per-token vectors into one vector for the whole text
    pooled = hidden_states.mean(dim=1)
    return pooled.squeeze().tolist()

# Read the CAPEC nodes written by the extraction cell
with open('capec_nodes.json', 'r', encoding='utf-8') as json_file:
    data = json.load(json_file)

# Swap every node's free-text description for its embedding vector
for node in data['nodes']:
    features = node['features']
    text = features['description']
    features['description_embedding'] = create_bert_embedding(text)
    # The raw text is dropped only after the embedding has been stored
    features.pop('description')

# Persist the embedded nodes
with open('capec_nodes_emb.json', 'w', encoding='utf-8') as json_file:
    json.dump(data, json_file, indent=4)

In [1]:
import json

try:
    # Read the STIX file from the working directory
    with open("stix-capec.json", "rb") as file:
        data = json.load(file)

    # A missing "objects" array is treated as an empty document
    objects = data.get("objects", [])

    # Result container for CWE->CAPEC and CAPEC->technique edges
    output = {"edges": []}

    for pattern in objects:
        # Only attack patterns carry the references of interest
        if pattern.get("type") != "attack-pattern":
            continue

        refs = pattern.get("external_references", [])

        # The first CAPEC reference determines the pattern's id; patterns
        # without one (or with an empty id) contribute no edges.
        capec_id = next(
            (
                ref.get("external_id")
                for ref in refs
                if ref.get("source_name", "").lower() == "capec"
            ),
            None,
        )
        if not capec_id:
            continue

        # CWE -> CAPEC edges, in reference order
        output["edges"].extend(
            {
                "source": ref.get("external_id"),
                "target": capec_id,
                "relationship": "cwe_to_capec"
            }
            for ref in refs
            if ref.get("source_name", "").lower() == "cwe"
        )

        # CAPEC -> technique edges, emitted after the CWE edges of the same pattern
        output["edges"].extend(
            {
                "source": capec_id,
                "target": ref.get("external_id"),
                "relationship": "capec_to_technique"
            }
            for ref in refs
            if ref.get("source_name", "").lower() == "attack"
        )

    # Pretty-printed copy, shown in the cell output for inspection
    formatted_json = json.dumps(output, indent=4)
    print(formatted_json)

    # NOTE(review): the CAPEC-to-CAPEC cell in this notebook writes to the
    # same file name, so whichever cell runs last wins.
    with open("capec_edges.json", "w") as file:
        json.dump(output, file, indent=4)

    print("Transformation complete. Edges saved to 'capec_edges.json'.")

except FileNotFoundError:
    print("Error: File 'stix-capec.json' not found.")
except json.JSONDecodeError as exc:
    print(f"Error decoding JSON: {exc}")
except Exception as exc:
    # Anything else is reported instead of raised
    print(f"An error occurred: {exc}")

Error: File 'stix-capec.json' not found.


In [5]:
import json

try:
    # Open and parse the STIX file
    with open("original_data/stix-capec.json", "rb") as file:
        data = json.load(file)

    # A missing "objects" array is treated as an empty document
    objects = data.get("objects", [])

    # Map each attack-pattern's STIX id to its CAPEC id (first CAPEC
    # external reference wins), so relationship refs can be translated.
    capec_lookup = {}
    for item in objects:
        if item.get("type") == "attack-pattern":
            for reference in item.get("external_references", []):
                if reference.get("source_name", "").lower() == "capec":
                    capec_lookup[item["id"]] = reference.get("external_id")
                    break

    # (STIX property listing related attack-pattern ids, edge label).
    # The order here is the order edges are emitted for each pattern.
    relationship_fields = [
        ("x_capec_can_precede_refs", "can_precede"),
        ("x_capec_child_of_refs", "child_of"),
        ("x_capec_parent_of_refs", "parent_of"),
    ]

    # Initialize the output structure
    output = {"edges": []}

    for item in objects:
        # Only process items of type "attack-pattern"
        if item.get("type") != "attack-pattern":
            continue

        # The lookup already holds this pattern's CAPEC id; patterns without
        # one (or with an empty id) contribute no edges.
        capec_id = capec_lookup.get(item.get("id"))
        if not capec_id:
            continue

        for field, relationship in relationship_fields:
            for related_ref in item.get(field, []):
                output["edges"].append({
                    "source": capec_id,
                    # A ref with no known CAPEC id falls back to the raw
                    # STIX id so the edge is not silently dropped.
                    "target": capec_lookup.get(related_ref, related_ref),
                    "relationship": relationship
                })

    # Convert back to JSON for output
    formatted_json = json.dumps(output, indent=4)

    # Print to verify the output
    print(formatted_json)

    # NOTE(review): the CWE/technique edge cell in this notebook writes to the
    # same file name, so whichever cell runs last wins.
    with open("capec_edges.json", "w") as file:
        json.dump(output, file, indent=4)

    print("Transformation complete. Edges saved to 'capec_edges.json'.")

except FileNotFoundError as e:
    # Report the path that actually failed; the previous hard-coded message
    # named a different file than the one this cell opens.
    print(f"Error: File '{e.filename}' not found.")
except json.JSONDecodeError as e:
    print(f"Error decoding JSON: {e}")
except Exception as e:
    print(f"An error occurred: {e}")

{
    "edges": [
        {
            "source": "CAPEC-1",
            "target": "CAPEC-17",
            "relationship": "can_precede"
        },
        {
            "source": "CAPEC-1",
            "target": "CAPEC-122",
            "relationship": "child_of"
        },
        {
            "source": "CAPEC-1",
            "target": "CAPEC-58",
            "relationship": "parent_of"
        },
        {
            "source": "CAPEC-1",
            "target": "CAPEC-679",
            "relationship": "parent_of"
        },
        {
            "source": "CAPEC-1",
            "target": "CAPEC-680",
            "relationship": "parent_of"
        },
        {
            "source": "CAPEC-1",
            "target": "CAPEC-681",
            "relationship": "parent_of"
        },
        {
            "source": "CAPEC-10",
            "target": "CAPEC-100",
            "relationship": "child_of"
        },
        {
            "source": "CAPEC-100",
            "target": "CAPEC-123",
 

CWE to CWE


In [17]:
import xml.etree.ElementTree as ET
import json

# Input CWE catalogue (XML) and the JSON file the nodes are written to
input_xml_file = "cwec_v4.16.xml"
output_json_file = "cwe_nodes.json"

# Parse the XML file
tree = ET.parse(input_xml_file)
root = tree.getroot()

# Namespace prefix used by every XPath query below
namespace = {"cwe": "http://cwe.mitre.org/cwe-7"}

# Initialize the output structure
output = {"nodes": []}

# Build one node per Weakness element
for weakness in root.findall(".//cwe:Weaknesses/cwe:Weakness", namespace):
    cwe_id = weakness.get("ID")
    cwe_description = weakness.find("cwe:Description", namespace)

    # `.text` is None for an element without text content (e.g. an empty
    # <Description/>), so guard it as well as a missing element; calling
    # .strip() on None would abort the whole run.
    if cwe_description is not None and cwe_description.text is not None:
        description_text = cwe_description.text.strip()
    else:
        description_text = "No description available"

    # Construct the node
    node = {
        "id": f"CWE-{cwe_id}",
        "type": "CWE",
        "features": {
            "description": description_text
        }
    }

    # Add the node to the output
    output["nodes"].append(node)

# Save the output to a JSON file
with open(output_json_file, "w") as json_file:
    json.dump(output, json_file, indent=4)

print(f"Transformation complete. Output saved to '{output_json_file}'.")


Transformation complete. Output saved to 'cwe_nodes.json'.


In [18]:
import json
import torch
from transformers import BertTokenizer, BertModel

# Tokenizer and encoder are loaded from one checkpoint name so they cannot
# drift apart.
BERT_CHECKPOINT = 'bert-base-uncased'
tokenizer = BertTokenizer.from_pretrained(BERT_CHECKPOINT)
model = BertModel.from_pretrained(BERT_CHECKPOINT)

# Function to create BERT embedding for a given text
def create_bert_embedding(text):
    """Return a mean-pooled BERT embedding of ``text`` as a plain Python list.

    The text is tokenized with the module-level ``tokenizer`` (truncated to at
    most 512 tokens) and run through the module-level ``model`` with gradient
    tracking disabled. The last hidden state is averaged over dimension 1
    (presumably the token axis of a (batch, tokens, hidden) tensor; confirm
    against the model documentation) and squeezed before conversion.

    NOTE(review): this notebook defines the same function in the CAPEC
    embedding cell as well; consider keeping a single definition.
    """
    encoded = tokenizer(text, return_tensors='pt', truncation=True, max_length=512)
    with torch.no_grad():
        hidden_states = model(**encoded).last_hidden_state
    # Collapse the per-token vectors into one vector for the whole text
    pooled = hidden_states.mean(dim=1)
    return pooled.squeeze().tolist()

# Read the CWE nodes written by the extraction cell
with open('cwe_nodes.json', 'r', encoding='utf-8') as json_file:
    data = json.load(json_file)

# Swap every node's free-text description for its embedding vector
for node in data['nodes']:
    features = node['features']
    text = features['description']
    features['description_embedding'] = create_bert_embedding(text)
    # The raw text is dropped only after the embedding has been stored
    features.pop('description')

# Persist the embedded nodes
with open('cwe_nodes_emb.json', 'w', encoding='utf-8') as json_file:
    json.dump(data, json_file, indent=4)

In [19]:
import xml.etree.ElementTree as ET
import json

# Input CWE catalogue (XML) and the JSON file the edges are written to
input_xml_file = "cwec_v4.16.xml"
output_json_file = "cwe_edges.json"

# Load the catalogue
tree = ET.parse(input_xml_file)
root = tree.getroot()

# Namespace prefix used by every XPath query below
namespace = {"cwe": "http://cwe.mitre.org/cwe-7"}

# Result container: one edge per Related_Weakness entry
output = {"edges": []}

for weakness in root.findall(".//cwe:Weaknesses/cwe:Weakness", namespace):
    source_id = f"CWE-{weakness.get('ID')}"

    # Each related weakness becomes an edge labelled with its lower-cased
    # "Nature" attribute ("unknown" when the attribute is absent).
    output["edges"].extend(
        {
            "source": source_id,
            "target": f"CWE-{related.get('CWE_ID')}",
            "relationship": related.get("Nature", "Unknown").lower()
        }
        for related in weakness.findall(
            "cwe:Related_Weaknesses/cwe:Related_Weakness", namespace
        )
    )

# Persist the edges
with open(output_json_file, "w") as json_file:
    json.dump(output, json_file, indent=4)

print(f"Transformation complete. Edges saved to '{output_json_file}'.")


Transformation complete. Edges saved to 'cwe_edges.json'.


Measure missing CWE and CAPEC links

In [14]:
import json 

# Report how many nodes the completed CWE embedding file contains
with open("../nodes/cwe_nodes_emb_complete.json", "rb") as file:
    output = json.load(file)

node_total = len(output["nodes"])
print(node_total)

965


CWE

In [29]:
import json
from collections import Counter

# Load the JSON file
with open('../edges/capec_edges_complete.json') as f:
    data = json.load(f)

# Access the edges list inside the data dictionary
edges = data.get("edges", [])

# Relationship type measured by this cell
target_relationship = "cwe_to_capec"

# NOTE: the counter names say "capec_to_technique" because this cell was
# copied from the CAPEC cell; here they hold the numbers for
# `target_relationship` ("cwe_to_capec"). The names are kept so anything
# reading them keeps working.
count_capec_to_technique = 0
count_capec_to_technique_dupl = 0

if not edges:
    # Report and skip the counting. The previous `exit()` call is avoided
    # because in a notebook it asks the kernel to shut down rather than
    # just ending the cell.
    print("No edges found in the data.")
else:
    # Number of matching edges per source node
    source_count = Counter(
        item.get("source")
        for item in edges
        if item.get("relationship") == target_relationship
    )

    # Total number of matching edges
    count_capec_to_technique = sum(source_count.values())

    # Edges beyond the first one for each source, i.e. total minus the
    # number of distinct sources
    count_capec_to_technique_dupl = count_capec_to_technique - len(source_count)

    # Print the results
    print(count_capec_to_technique)  # Total number of "cwe_to_capec" edges
    print(count_capec_to_technique_dupl)  # Edges whose source already appeared in another "cwe_to_capec" edge


1214
878


CAPEC

In [28]:
import json
from collections import Counter

# Load the JSON file
with open('../edges/capec_edges_complete.json') as f:
    data = json.load(f)

# Access the edges list inside the data dictionary
edges = data.get("edges", [])

# Relationship type measured by this cell
target_relationship = "capec_to_technique"

# Initialize counters
count_capec_to_technique = 0
count_capec_to_technique_dupl = 0

if not edges:
    # Report and skip the counting. The previous `exit()` call is avoided
    # because in a notebook it asks the kernel to shut down rather than
    # just ending the cell.
    print("No edges found in the data.")
else:
    # Number of matching edges per source node
    source_count = Counter(
        item.get("source")
        for item in edges
        if item.get("relationship") == target_relationship
    )

    # Total number of matching edges
    count_capec_to_technique = sum(source_count.values())

    # Edges beyond the first one for each source, i.e. total minus the
    # number of distinct sources
    count_capec_to_technique_dupl = count_capec_to_technique - len(source_count)

    # Print the results
    print(count_capec_to_technique)  # Total number of "capec_to_technique" edges
    print(count_capec_to_technique_dupl)  # Edges whose source already appeared in another "capec_to_technique" edge


272
95
