In [4]:
import pandas as pd
import json
import os
from statistics import mode
from collections import Counter
import requests
import re
import copy
import pprint

In [5]:
def load_config():
    """
    Read ``config.json`` from the project root.

    The project root is taken to be two directories above the current
    working directory.

    Returns
    -------
    tuple
        ``(config, project_root)``: the parsed JSON content and the
        normalized absolute path of the project root.

    Raises
    ------
    FileNotFoundError
        If ``config.json`` does not exist in the project root.
    """
    # Two levels up from wherever the notebook kernel is running.
    root_dir = os.path.normpath(
        os.path.abspath(os.path.join(os.getcwd(), '../..'))
    )
    cfg_file = os.path.join(root_dir, 'config.json')

    if os.path.exists(cfg_file):
        with open(cfg_file, 'r') as handle:
            return json.load(handle), root_dir

    raise FileNotFoundError(f"Config file not found at expected location: {cfg_file}")


In [6]:
# Parse config.json and resolve the project root (two directories above the
# current working directory); later cells build input paths from project_root.
config, project_root = load_config()


In [7]:
# Per-source CVE analysis files, located under
# <project_root>/group_profile_analysis/group_analysis_json_outputs.
mitre_cve_map_file = os.path.normpath(os.path.join(project_root, "group_profile_analysis", "group_analysis_json_outputs", "MITRE_cve_group_analysis.json"))
malpedia_cve_map_file = os.path.normpath(os.path.join(project_root, "group_profile_analysis", "group_analysis_json_outputs", "Malpedia_cve_group_analysis.json"))

# Union profile that this notebook extends with CVEs; it is read from
# intermediate_json_outputs under the current working directory.
updated_union_map_file = os.path.normpath(os.path.join(os.getcwd(), "intermediate_json_outputs", "malpedia_attack_group_software_union_profile.json"))

In [8]:
# Function to normalize group names
def normalize_group_name(name):
    """
    Reduce a threat-group name to a canonical form for name matching.

    Steps, in order: lowercase and trim; drop a trailing " team"; rewrite
    "threat group-"/"threat group " to "tg-"; drop a leading "temp."/"temp ";
    collapse runs of dots and spaces to one space and trim; drop a trailing
    " framework"/" group"; rewrite "confucious" to "confucius"; and join an
    "apt"/"unc"/"g" prefix with optional letters and a 1-4 digit number
    (e.g. "apt 28" -> "apt28", "apt-c-36" -> "aptc36").

    Parameters
    ----------
    name : object
        The raw group name. Anything that is not a ``str`` yields ``''``.

    Returns
    -------
    str
        The normalized name, or ``''`` for non-string input.
    """
    if not isinstance(name, str):
        return ''

    # Convert to lowercase for case-insensitive comparison
    name = name.lower().strip()

    # Remove 'team' from names like 'Sandworm Team'. Only the suffix is cut:
    # str.replace would also delete every earlier ' team' inside the name.
    team_suffix = ' team'
    if name.endswith(team_suffix):
        name = name[:-len(team_suffix)]

    # Replace 'threat group-' with 'tg-' (e.g., 'Threat Group-1314' -> 'tg-1314')
    name = re.sub(r'threat group[- ]', 'tg-', name)

    # Remove 'temp.' or similar prefixes (e.g., 'Temp.Pittytiger' -> 'pittytiger')
    name = re.sub(r'^temp[\. ]+', '', name)

    # Normalize spaces and dots (e.g., 'pitty tiger' == 'pitty.tiger'). The
    # strip keeps a leading/trailing separator from surviving as a space,
    # which would otherwise defeat the '$'-anchored suffix rule below.
    name = re.sub(r'[\. ]+', ' ', name).strip()

    # Remove common suffixes like 'framework' or 'group' (e.g., 'Inception Framework' -> 'inception')
    name = re.sub(r' (framework|group)$', '', name)

    # Standardize 'Confucius' and 'Confucious' to 'confucius'
    name = re.sub(r'confucious', 'confucius', name)

    # Normalize specific known prefixes (apt, unc, g): drop the separators
    # between prefix, optional letters and the trailing number.
    name = re.sub(
        r'\b(apt|unc|g)[\s\.-]*([a-z]*)[\s\.-]*(\d{1,4})\b',
        lambda m: m.group(1) + m.group(2) + m.group(3),
        name
    )

    return name

In [9]:
def load_json_metadata(metadata_path):
    """
    Read a UTF-8 encoded JSON file and return its parsed content.

    Parameters
    ----------
    metadata_path : str
        Location of the JSON file holding the group metadata.

    Returns
    -------
    dict
        The decoded JSON document.
    """
    with open(metadata_path, mode='r', encoding='utf-8') as handle:
        raw_text = handle.read()
    return json.loads(raw_text)

In [10]:
def load_cves_from_json(json_file):
    """
    Load per-group CVE data from a JSON file and flatten it to one set per group.

    The file is expected to map each group ID to an object whose optional
    ``hashes`` entry is a list of objects, each with an optional ``cves`` list.

    Parameters
    ----------
    json_file : str
        Path to the JSON file containing APT group CVE data.

    Returns
    -------
    dict
        Group ID -> ``set`` of CVE identifiers (empty when the group has none).
    """
    # Explicit UTF-8, matching the other JSON readers/writers in this notebook;
    # the platform default encoding is not UTF-8 everywhere.
    with open(json_file, 'r', encoding='utf-8') as file:
        data = json.load(file)

    cve_map = {}
    for group_id, details in data.items():
        # Flatten the CVEs associated with all hashes for each group.
        # `or []` also covers keys that are present but null in the JSON.
        cves = set()
        for item in details.get('hashes') or []:
            cves.update(item.get('cves') or [])
        cve_map[group_id] = cves

    return cve_map

In [11]:
def group_cves_by_threat_group(malpedia_cve_path: str) -> dict:
    """
    Collect the CVEs of every Malpedia threat group that has a relevant URL.

    A group is kept only if at least one of its ``urls`` entries has a
    ``source`` of ``'actor'`` or ``'attribution'``. For a kept group, the CVEs
    of *all* its ``hashes`` entries are collected (they are not filtered per
    URL). The group named ``"Unknown"`` is always skipped.

    Parameters
    ----------
    malpedia_cve_path : str
        Path to the JSON file containing Malpedia CVE data.

    Returns
    -------
    dict
        Threat-group name -> sorted list of unique CVE identifiers. Groups
        without a relevant URL are absent from the result.
    """
    # Explicit UTF-8, matching the other JSON readers/writers in this notebook.
    with open(malpedia_cve_path, 'r', encoding='utf-8') as file:
        data = json.load(file)

    relevant_sources = ('actor', 'attribution')
    grouped_cves = {}

    for group, details in data.items():
        if group == "Unknown":
            continue  # Skip the "Unknown" group

        # Only the existence of a relevant URL matters, so no list is built.
        # .get() treats a URL entry without a 'source' as not relevant
        # instead of aborting the whole load with a KeyError.
        has_relevant_url = any(
            url.get('source') in relevant_sources
            for url in details.get('urls') or []
        )
        if not has_relevant_url:
            continue

        # `or []` also covers keys that are present but null in the JSON.
        cves = set()
        for hash_entry in details.get('hashes') or []:
            cves.update(hash_entry.get('cves') or [])

        grouped_cves[group] = sorted(cves)

    return grouped_cves


In [12]:
# mitre_cve_map_metadata: group ID -> set of CVEs flattened from the MITRE CVE analysis file.
mitre_cve_map_metadata = load_cves_from_json(mitre_cve_map_file)
# union_map_metadata: parsed JSON of the union profile; later cells extend it in place.
union_map_metadata = load_json_metadata(updated_union_map_file)

In [13]:
# Malpedia group name -> sorted list of CVEs, limited to groups that have an
# 'actor' or 'attribution' URL.
malpedia_cve_map_metadata = group_cves_by_threat_group(malpedia_cve_map_file)

In [14]:
# Sanity check: number of groups in the MITRE CVE map, the Malpedia CVE map and the union profile.
len(mitre_cve_map_metadata), len(malpedia_cve_map_metadata), len(union_map_metadata)

(86, 278, 331)

In [16]:
#{k: malpedia_cve_map_metadata[k] for k in list(malpedia_cve_map_metadata)[:2]}

In [17]:
def update_union_with_cves(union_map: dict, cve_map: dict) -> dict:
    """
    Attach ATT&CK CVEs to the union map under the "attack cve" key.

    Groups from ``cve_map`` that are already in ``union_map`` get their
    "attack cve" value replaced by the sorted CVEs; groups that are missing
    are announced on stdout and inserted with an otherwise empty profile.
    Afterwards every entry is guaranteed to carry an "attack cve" list.

    Parameters
    ----------
    union_map : dict
        Group ID -> group metadata. Modified in place.
    cve_map : dict
        Group ID -> iterable (e.g. set) of CVE identifiers.

    Returns
    -------
    dict
        The same ``union_map`` object that was passed in.
    """
    for gid in cve_map:
        ordered_cves = sorted(cve_map[gid])

        if gid not in union_map:
            print(f"New group added to union: {gid}")
            union_map[gid] = {
                "mitre name": "",
                "mitre alias": [],
                "malpedia name": "",
                "malpedia alias": [],
                "attack techniques": [],
                "technique name": [],
                "attack software": [],
                "malpedia techniques": [],
                "malpedia software": [],
                "attack cve": ordered_cves
            }
            continue

        union_map[gid]["attack cve"] = ordered_cves

    # Backfill: groups that received no CVEs still expose the key.
    for profile in union_map.values():
        if "attack cve" not in profile:
            profile["attack cve"] = []

    return union_map


In [18]:
# Add the MITRE CVEs under "attack cve"; union_map_metadata is modified in place
# and the same object is returned.
union_map_metadata = update_union_with_cves(union_map_metadata, mitre_cve_map_metadata)


In [19]:
# Number of groups in the union profile after adding the MITRE CVEs.
len(union_map_metadata)
#{k: union_map_metadata[k] for k in list(union_map_metadata)[:2]}

331

In [22]:
def update_union_map_with_malpedia_cves(union_map_metadata, malpedia_cve_metadata):
    """
    Add Malpedia CVEs to the union map under the "malpedia cve" key.

    Each Malpedia group name is normalized with ``normalize_group_name`` and
    looked up against the normalized MITRE/Malpedia names and aliases of the
    existing groups. On a match the group's "malpedia cve" list is set; if
    several Malpedia names resolve to the same group their CVEs are merged
    rather than the last one overwriting the others. Unmatched names are added
    as new groups with the first free ``MALnnn`` ID. Two summary lines are
    printed.

    Parameters
    ----------
    union_map_metadata : dict
        Group ID -> MITRE and Malpedia metadata. Modified in place.
    malpedia_cve_metadata : dict
        Malpedia actor name -> iterable of CVE identifiers.

    Returns
    -------
    dict
        The same ``union_map_metadata`` object, where every entry has a
        "malpedia cve" list (sorted, de-duplicated where it was written here).
    """

    count_cve = 0
    count_new_malpedia = 0

    next_mal_id = 1
    def get_next_id():
        # Hand out MAL001, MAL002, ... skipping IDs that are already taken.
        nonlocal next_mal_id
        while True:
            gid = f"MAL{next_mal_id:03d}"
            next_mal_id += 1
            if gid not in union_map_metadata:
                return gid

    # Build normalized group name index for fast lookups
    group_name_index = {}
    for group_id, group_info in union_map_metadata.items():
        raw_names = [group_info.get('mitre name', ''), group_info.get('malpedia name', '')]
        raw_names += group_info.get('mitre alias', [])
        raw_names += group_info.get('malpedia alias', [])
        for raw_name in raw_names:
            name = normalize_group_name(raw_name)
            # Missing names normalize to ''; indexing them would let an empty
            # lookup key match an arbitrary group.
            if name:
                group_name_index[name] = group_id

        # Ensure the key exists even for groups without Malpedia CVEs
        group_info.setdefault('malpedia cve', [])

    # Group IDs whose "malpedia cve" value was already written during this call
    written_ids = set()

    # Process CVEs
    for malpedia_group, cve_list in malpedia_cve_metadata.items():
        norm_name = normalize_group_name(malpedia_group)
        group_id = group_name_index.get(norm_name) if norm_name else None

        if group_id:
            cves = set(cve_list)
            if group_id in written_ids:
                # A second Malpedia name for the same group: keep both CVE sets.
                cves.update(union_map_metadata[group_id]['malpedia cve'])
            union_map_metadata[group_id]['malpedia cve'] = sorted(cves)
            written_ids.add(group_id)
            count_cve += 1
        else:
            # Add as new entry; "attack cve" is included so new groups expose
            # the same keys as the groups that were already in the union map.
            new_entry = {
                "mitre name": "",
                "mitre alias": [],
                "malpedia name": malpedia_group,
                "malpedia alias": [],
                "attack techniques": [],
                "technique name": [],
                "attack software": [],
                "malpedia techniques": [],
                "malpedia software": [],
                "attack cve": [],
                "malpedia cve": sorted(set(cve_list))
            }
            new_group_id = get_next_id()
            union_map_metadata[new_group_id] = new_entry
            if norm_name:
                group_name_index[norm_name] = new_group_id
            written_ids.add(new_group_id)
            count_new_malpedia += 1  # Increment since we're adding a new one now

    print(f"Total matches with Malpedia CVEs: {count_cve}")
    print(f"Total number of newly added Malpedia groups: {count_new_malpedia}")

    return union_map_metadata


In [23]:
# Add the Malpedia CVEs. The function modifies union_map_metadata in place and
# returns it, so final_metadata and union_map_metadata are the same object.
final_metadata = update_union_map_with_malpedia_cves(union_map_metadata, malpedia_cve_map_metadata )

Total matches with Malpedia CVEs: 191
Total number of newly added Malpedia groups: 87


In [24]:
# Number of groups after the newly added Malpedia-only groups.
len(final_metadata)

418

In [25]:
def dump_to_json(data, file_path):
    """
    Serialize ``data`` as indented UTF-8 JSON into ``file_path``.

    Non-ASCII characters are written verbatim (``ensure_ascii=False``) with a
    4-space indent. Any exception is reported on stdout instead of being
    raised.

    Parameters
    ----------
    data : dict
        The data to be written to the JSON file.
    file_path : str
        Full path of the output file; its directory must already exist.

    Returns
    -------
    bool
        True if the data was successfully written to the file, False otherwise.
    """
    written = False
    try:
        with open(file_path, 'w', encoding='utf-8') as sink:
            json.dump(data, sink, ensure_ascii=False, indent=4)
        written = True
    except Exception as err:
        print(f"Error writing to file {file_path}: {err}")
    return written


In [27]:
# Output location: intermediate_json_outputs under the current working directory.
base_dir = os.getcwd()
json_output_data_dir = os.path.join(base_dir, "intermediate_json_outputs")
# Ensure the output folder exists
os.makedirs(json_output_data_dir, exist_ok=True)
filename = "malpedia_attack_group_software_cve_profile.json"
# Full path to the output file
file_path = os.path.join(json_output_data_dir, filename)

In [28]:
# Write the combined profile; the cell displays True on success, False on failure.
dump_to_json(final_metadata, file_path)

True

In [54]:
def find_unmapped_groups(malpedia_cve_map: dict, union_map_metadata: dict) -> dict:
    """
    Return the Malpedia groups whose name is unknown to the union map.

    A Malpedia group counts as mapped when its normalized name equals the
    normalized "mitre name", "malpedia name" or any "malpedia alias" of some
    union-map entry.

    Parameters
    ----------
    malpedia_cve_map : dict
        Malpedia group name -> CVEs.
    union_map_metadata : dict
        Group ID -> group metadata from the union map.

    Returns
    -------
    dict
        The subset of ``malpedia_cve_map`` (same keys and values) whose names
        were not found in ``union_map_metadata``.
    """
    # Normalize every union-map name once, instead of re-normalizing the whole
    # union map for each Malpedia group.
    # NOTE(review): "mitre alias" is not consulted here, although
    # update_union_map_with_malpedia_cves does match on it - confirm intended.
    known_names = set()
    for group_data in union_map_metadata.values():
        known_names.add(normalize_group_name(group_data.get("mitre name", "")))
        known_names.add(normalize_group_name(group_data.get("malpedia name", "")))
        known_names.update(
            normalize_group_name(alias) for alias in group_data.get("malpedia alias", [])
        )

    return {
        malpedia_name: cves
        for malpedia_name, cves in malpedia_cve_map.items()
        if normalize_group_name(malpedia_name) not in known_names
    }


In [56]:
# NOTE(review): no cell in this notebook assigns unmapped_groups, so on a fresh
# kernel this cell raised NameError. The call below is a reconstruction - confirm.
# The union profile is re-read from disk because union_map_metadata has already
# been extended in place with the new Malpedia groups by this point.
unmapped_groups = find_unmapped_groups(
    malpedia_cve_map_metadata,
    load_json_metadata(updated_union_map_file),
)
len(unmapped_groups)

115

In [None]:
def _compare_item_sets(sets_by_group):
    """Return (common, unique_by_group, union, jaccard) for a group ID -> set mapping."""
    group_sets = list(sets_by_group.values())
    if group_sets:
        # Both calls build new sets, so no input set is modified.
        common = set.intersection(*group_sets)
        union = set().union(*group_sets)
    else:
        common, union = set(), set()

    unique_by_group = {gid: items - common for gid, items in sets_by_group.items()}
    jaccard = len(common) / len(union) if union else 0
    return common, unique_by_group, union, jaccard


def compare_apt_groups_detailed(techniques_map, software_map, cve_map, group_ids):
    """
    Compare APT groups based on their techniques, software, and CVEs.

    For each of the three categories the function computes the items shared by
    all selected groups, the items of each group that are not shared by all,
    the union over the groups, and the Jaccard index ``|common| / |union|``
    (0 when the union is empty). The input maps are never modified.

    Parameters
    ----------
    techniques_map : dict
        Group ID -> mapping whose 'items' entry lists the techniques used.
    software_map : dict
        Group ID -> mapping whose 'items' entry lists the software used.
    cve_map : dict
        Group ID -> iterable (set or list) of CVEs. Groups may be absent.
    group_ids : list
        The APT group IDs to compare. An empty list yields empty results.

    Returns
    -------
    dict
        A dictionary containing:
        - common_techniques / common_software / common_cves: lists of the
          items present in every selected group.
        - unique_techniques / unique_software / unique_cves: group ID -> set
          of that group's items that are not common to all selected groups.
        - union_techniques / union_software / union_cves: lists of all items
          of the selected groups.
        - jaccard_index_techniques / jaccard_index_software /
          jaccard_index_cves: the Jaccard index per category.

    Raises
    ------
    ValueError
        If a group ID is missing from ``techniques_map`` or ``software_map``.
    """
    techniques_by_group = {}
    software_by_group = {}
    cves_by_group = {}

    for group_id in group_ids:
        if group_id not in techniques_map or group_id not in software_map:
            raise ValueError(f"Group ID {group_id} not found in techniques_map or software_map")

        techniques_by_group[group_id] = set(techniques_map[group_id].get('items', []))
        software_by_group[group_id] = set(software_map[group_id].get('items', []))
        # Copy into a new set: a group may have no CVEs, the caller may pass
        # lists, and the caller's own sets must not be touched.
        cves_by_group[group_id] = set(cve_map.get(group_id, ()))

    # The comparison happens only after all groups are collected, so "unique"
    # is measured against the final intersection, not a partial one.
    common_techniques, unique_techniques, union_techniques, jaccard_index_techniques = \
        _compare_item_sets(techniques_by_group)
    common_software, unique_software, union_software, jaccard_index_software = \
        _compare_item_sets(software_by_group)
    common_cves, unique_cves, union_cves, jaccard_index_cves = \
        _compare_item_sets(cves_by_group)

    # Prepare the result
    result = {
        'common_techniques': list(common_techniques),
        'common_software': list(common_software),
        'common_cves': list(common_cves),
        'unique_techniques': unique_techniques,
        'unique_software': unique_software,
        'unique_cves': unique_cves,
        'union_techniques': list(union_techniques),
        'union_software': list(union_software),
        'union_cves': list(union_cves),
        'jaccard_index_techniques': jaccard_index_techniques,
        'jaccard_index_software': jaccard_index_software,
        'jaccard_index_cves': jaccard_index_cves
    }

    return result


In [None]:
# Hand-maintained lookup: lowercase actor name -> list of group IDs.
# Presumably these are Malpedia actors that correspond to more than one MITRE
# ATT&CK group - TODO confirm. Each ID list is passed as `group_ids` to
# compare_apt_groups_detailed by the loop in the following cell.
malpedia_mitre_map = {
    'apt17': ['G0025', 'G0001'],
    'apt19': ['G0073', 'G0009'],
    'apt30': ['G0013', 'G0030'],
    'lazarus': ['G0082', 'G0138', 'G0032'],
    'apt41': ['G0096', 'G0044'],
    'earth lusca': ['G0143', 'G1006'],
    'fin7': ['G0008', 'G0046'],
    'dragonok': ['G0017', 'G0002'],
    'mustang panda': ['G1014', 'G0129']
}

In [None]:
# TODO: techniques_map, software_map and cve_map are not defined in the cells
# above. Per the original note they are to be added from combining the group
# profile and reading the MITRE Excel datasheet. Until they exist this cell
# fails with NameError, which the `except ValueError` below does not catch.

# For every actor, compare all of its group IDs and print a short summary.
for actor, group_ids in malpedia_mitre_map.items():
    try:
        result = compare_apt_groups_detailed(techniques_map, software_map, cve_map, group_ids)

        print(f"Comparison for {actor} ({', '.join(group_ids)})")
        print(f"  - Common Techniques: {len(result['common_techniques'])}")
        print(f"  - Common Software: {len(result['common_software'])}")
        print(f"  - Common CVEs: {len(result['common_cves'])}")
        print(f"  - Jaccard (Techniques): {result['jaccard_index_techniques']:.2f}")
        print(f"  - Jaccard (Software): {result['jaccard_index_software']:.2f}")
        print(f"  - Jaccard (CVEs): {result['jaccard_index_cves']:.2f}")

    except ValueError as e:
        # Raised by compare_apt_groups_detailed when a group ID is missing
        # from techniques_map or software_map; the actor is skipped.
        print(f"Skipping {actor} due to error: {e}")