In [25]:
import json
import re
from pathlib import Path

import PyPDF2
from transformers import pipeline

# Shared Hugging Face token-classification (NER) pipeline, built once at import
# time and reused by extract_entities(). With aggregation_strategy="simple" the
# results are read below through their 'entity_group' and 'word' fields.
NER_MODEL_NAME = "dbmdz/bert-large-cased-finetuned-conll03-english"
ner_pipeline = pipeline(
    "ner",
    model=NER_MODEL_NAME,
    aggregation_strategy="simple",
)

# MITRE ATT&CK lookup tables: human-readable name -> ATT&CK identifier.
# Tactic IDs use the "TA" prefix, technique IDs the "T" prefix
# (sub-techniques carry a ".NNN" suffix).
_MITRE_TACTICS = {
    "Initial Access": "TA0001",
    "Execution": "TA0002",
    "Persistence": "TA0003",
    "Privilege Escalation": "TA0004",
    "Defense Evasion": "TA0005",
    "Credential Access": "TA0006",
    "Discovery": "TA0007",
    "Lateral Movement": "TA0008",
    "Collection": "TA0009",
    "Command and Control": "TA0011",
    "Exfiltration": "TA0010",
    "Impact": "TA0040",
}

_MITRE_TECHNIQUES = {
    "Spear Phishing Attachment": "T1566.001",
    "Spear Phishing Link": "T1566.002",
    "Exploitation of Remote Services": "T1210",
    "Valid Accounts": "T1078",
    "PowerShell": "T1059.001",
    "Windows Command Shell": "T1059.003",
    "Scheduled Task/Job": "T1053",
    "Registry Run Keys/Startup Folder": "T1547.001",
    "Boot or Logon Autostart Execution": "T1547",
    "Exploitation for Privilege Escalation": "T1068",
    "Masquerading": "T1036",
    "Obfuscated Files or Information": "T1027",
    "Impair Defenses": "T1562",
    "Indicator Removal on Host": "T1070",
    "Credential Dumping": "T1003",
    "Network Service Discovery": "T1046",
    "Remote File Copy": "T1105",
    "Remote Services": "T1021",
    "System Information Discovery": "T1082",
    "File and Directory Discovery": "T1083",
    "Data Compressed": "T1560",
    "Encrypted Channel": "T1573",
    "Web Service": "T1102",
    "Data Staged": "T1074",
    "Data from Local System": "T1005",
    "Inhibit System Recovery": "T1490",
    "System Shutdown/Reboot": "T1529",
    "Data Encrypted for Impact": "T1486",
}

# Single combined table used by the extractors; tactics come first, then
# techniques, each in the order declared above.
MITRE_TTPs = {**_MITRE_TACTICS, **_MITRE_TECHNIQUES}

def extract_text_from_pdf(pdf_path):
    """Return the text of every page of a PDF as a single string.

    Args:
        pdf_path: Location of the PDF file; it is opened in binary mode.

    Returns:
        The per-page texts in page order, joined with newlines. A page that
        yields no text contributes an empty string.

    Raises:
        OSError: If the file cannot be opened.
    """
    with open(pdf_path, 'rb') as file:
        reader = PyPDF2.PdfReader(file)
        # Extraction must happen while the file is still open.
        # `or ""` keeps a page without extractable text from breaking the join.
        page_texts = [page.extract_text() or "" for page in reader.pages]
    # Joining with a newline stops the last word of one page from fusing with
    # the first word of the next (which would corrupt keyword and IoC
    # matching), and avoids repeated string concatenation.
    return "\n".join(page_texts)

def extract_iocs(report):
    """Extract network indicators of compromise from report text.

    Args:
        report: Plain text of the threat report.

    Returns:
        A dict with two keys:
          "IP addresses": dotted-quad IPv4 addresses whose octets are all
              in the range 0-255.
          "Domains": domain names ending in ".com" (case-insensitive).
        Each list is de-duplicated and keeps first-occurrence order.
    """
    # Threat reports commonly "defang" indicators by writing "[.]" for ".";
    # undo that so such indicators are still recognised.
    text = report.replace("[.]", ".")

    # The pattern alone accepts any 1-3 digit groups (e.g. 999.1.1.1), so
    # each candidate's octets are range-checked afterwards.
    ip_candidates = re.findall(r'\b(?:\d{1,3}\.){3}\d{1,3}\b', text)
    ips = [
        candidate
        for candidate in ip_candidates
        if all(int(octet) <= 255 for octet in candidate.split("."))
    ]

    # Extract domains and keep only .com ones, regardless of letter case.
    domains = re.findall(r'(?:[a-zA-Z0-9-]+\.)+[a-zA-Z]{2,}', text)
    com_domains = [domain for domain in domains if domain.lower().endswith('.com')]

    # dict.fromkeys drops repeats while preserving first-seen order.
    return {
        "IP addresses": list(dict.fromkeys(ips)),
        "Domains": list(dict.fromkeys(com_domains)),
    }


def extract_ttps(report):
    """Detect MITRE ATT&CK tactics and techniques mentioned in a report.

    Detection is a case-insensitive substring search for a fixed set of
    keywords. Some techniques also imply a tactic, which is added even when
    the tactic's own keyword does not occur in the text.

    Args:
        report: Plain text of the threat report.

    Returns:
        A dict with two keys, each a list of (ATT&CK ID, name) tuples:
          "Tactics": unique tactics, sorted by ID so the output is stable
              from run to run.
          "Techniques": unique techniques in keyword-table order.
    """
    # Keyword (lower-case) -> (category, name as it appears in MITRE_TTPs).
    keywords_mapping = {
        # Tactics
        "initial access": ("Tactics", "Initial Access"),
        "execution": ("Tactics", "Execution"),
        "persistence": ("Tactics", "Persistence"),
        "privilege escalation": ("Tactics", "Privilege Escalation"),
        "defense evasion": ("Tactics", "Defense Evasion"),
        "credential access": ("Tactics", "Credential Access"),
        "discovery": ("Tactics", "Discovery"),
        "lateral movement": ("Tactics", "Lateral Movement"),
        "collection": ("Tactics", "Collection"),
        "command and control": ("Tactics", "Command and Control"),
        "exfiltration": ("Tactics", "Exfiltration"),
        "impact": ("Tactics", "Impact"),

        # Techniques
        "spear-phishing": ("Techniques", "Spear Phishing Attachment"),
        # Alternative spellings of the same term.
        "spear phishing": ("Techniques", "Spear Phishing Attachment"),
        "spearphishing": ("Techniques", "Spear Phishing Attachment"),
        "phishing link": ("Techniques", "Spear Phishing Link"),
        "exploitation of remote services": ("Techniques", "Exploitation of Remote Services"),
        "valid accounts": ("Techniques", "Valid Accounts"),
        "powershell": ("Techniques", "PowerShell"),
        "command shell": ("Techniques", "Windows Command Shell"),
        "scheduled task": ("Techniques", "Scheduled Task/Job"),
        "registry run keys": ("Techniques", "Registry Run Keys/Startup Folder"),
        "remote file copy": ("Techniques", "Remote File Copy"),
        "network discovery": ("Techniques", "Network Service Discovery"),
        "data compressed": ("Techniques", "Data Compressed"),
    }

    # Technique -> tactic that its presence implies.
    technique_to_tactic = {
        "Spear Phishing Attachment": "Initial Access",
        "Spear Phishing Link": "Initial Access",
        "PowerShell": "Execution",
        "Windows Command Shell": "Execution",
        "Remote File Copy": "Lateral Movement",
        "Network Service Discovery": "Discovery",
    }

    # Lower-case the report once rather than once per keyword.
    report_lower = report.lower()

    tactics = set()  # a set, because several keywords can imply one tactic
    techniques = []

    for keyword, (category, description) in keywords_mapping.items():
        if keyword not in report_lower:
            continue

        entry = (MITRE_TTPs[description], description)
        if category == "Tactics":
            tactics.add(entry)
            continue

        # Several spellings map to the same technique; report it once.
        if entry not in techniques:
            techniques.append(entry)

        inferred_tactic = technique_to_tactic.get(description)
        if inferred_tactic is not None:
            tactics.add((MITRE_TTPs[inferred_tactic], inferred_tactic))

    return {
        # Sorting removes the arbitrary iteration order of the set.
        "Tactics": sorted(tactics),
        "Techniques": techniques,
    }

def _split_into_chunks(text, max_chars):
    """Split text into stripped, non-empty pieces of at most max_chars characters, cutting at whitespace when possible."""
    chunks = []
    start = 0
    total = len(text)
    while start < total:
        end = min(start + max_chars, total)
        if end < total:
            # Prefer to cut at the last whitespace inside the window so a
            # word (and therefore an entity name) is not split in two.
            cut = max(text.rfind(ws, start, end) for ws in (" ", "\n", "\t"))
            if cut > start:
                end = cut
        piece = text[start:end].strip()
        if piece:
            chunks.append(piece)
        start = end
    return chunks


def extract_entities(report, max_chunk_chars=1000):
    """Extract threat actors and targeted locations with the NER pipeline.

    The report is processed in chunks rather than in one call: BERT
    checkpoints accept a limited number of tokens per input (512 for this
    model family), so text beyond that limit in a single call would not be
    analysed.

    Args:
        report: Plain text of the threat report.
        max_chunk_chars: Upper bound on the size of each chunk passed to the
            pipeline, in characters. Must be at least 1.

    Returns:
        A dict with two keys, each a sorted list of unique strings:
          "Threat Actor(s)": ORG entities whose text contains "APT", or
              "group"/"actor" in any letter case.
          "Targeted Entities": every LOC entity.

    Raises:
        ValueError: If max_chunk_chars is smaller than 1.
    """
    if max_chunk_chars < 1:
        raise ValueError("max_chunk_chars must be at least 1")

    threat_actors = set()
    targeted_entities = set()

    for chunk in _split_into_chunks(report, max_chunk_chars):
        for entity in ner_pipeline(chunk):
            word = entity['word']
            group = entity['entity_group']

            if group == 'ORG':  # Organization (threat actors)
                lowered = word.lower()
                if "APT" in word or "group" in lowered or "actor" in lowered:
                    threat_actors.add(word)
            elif group == 'LOC':  # Location (targeted geopolitical entities)
                targeted_entities.add(word)

    # Sorted so the output does not depend on set iteration order.
    return {
        "Threat Actor(s)": sorted(threat_actors),
        "Targeted Entities": sorted(targeted_entities)
    }

def extract_malware(report):
    """Look up known malware families that are named in the report.

    A family counts as present when its name occurs anywhere in the report,
    ignoring letter case.

    Args:
        report: Plain text of the threat report.

    Returns:
        A list with one dict per detected family, holding the keys "Name",
        "md5", "sha1", "sha256", "ssdeep", "TLSH" and "tags". The list is
        empty when no known family is mentioned.
    """
    # Catalogue of known families (extend as needed).
    # NOTE(review): the hash values below look like sample/placeholder data;
    # confirm them against a real source before treating them as indicators.
    known_malware = {
        "Shamoon": {
            "md5": "5a105e8b9d40e1329780d62ea2265d8a",
            "sha1": "2fd4e1c67a2d28fced849ee1bb76e7391b93eb12",
            "sha256": "3a7bd3e2360a4dfafad47e17d0c4a3d7252e98d6e3c8c3f3ed0a792d8fa45b4b",
            "ssdeep": "3:ai6hn3nP:BBin",
            "TLSH": "54D15E7E17F9D81AF5E131BEFAE13E1A5E2F",
            "tags": "destructive, data_wiper"
        },
    }

    # Fields copied from the catalogue into each result, in output order.
    detail_fields = ("md5", "sha1", "sha256", "ssdeep", "TLSH", "tags")
    haystack = report.lower()

    return [
        {"Name": family, **{field: info[field] for field in detail_fields}}
        for family, info in known_malware.items()
        if family.lower() in haystack
    ]

def extract_threat_intelligence(pdf_path):
    """Run every extractor on one PDF report and combine the results.

    Args:
        pdf_path: Location of the PDF report to analyse.

    Returns:
        A dict with the keys "IoCs", "TTPs", "Threat Actor(s)", "Malware"
        and "Targeted Entities", in that order.
    """
    text = extract_text_from_pdf(pdf_path)

    # Each extractor works on the same plain text, independently of the rest.
    indicators = extract_iocs(text)
    tactics_and_techniques = extract_ttps(text)
    named_entities = extract_entities(text)
    malware_hits = extract_malware(text)

    # Built key by key to keep the output order; the two entity lists are
    # lifted to the top level of the result.
    intelligence = {"IoCs": indicators, "TTPs": tactics_and_techniques}
    intelligence["Threat Actor(s)"] = named_entities["Threat Actor(s)"]
    intelligence["Malware"] = malware_hits
    intelligence["Targeted Entities"] = named_entities["Targeted Entities"]
    return intelligence

# Example PDF input. The location is built from the current user's home
# directory instead of a hard-coded, user-specific absolute path, so the cell
# also runs under another account; change DATA_DIR if the data lives elsewhere.
DATA_DIR = Path.home() / "Downloads" / "C3i_HACKATHON_FINAL_ROUND_Q1_DATA"
pdf_path = DATA_DIR / "Mandiant_LIGHTSHOW-2-LIGHTSHIFT-and-LIGHTSHOW(03-09-2023).pdf"

# Run extraction on the PDF
result = extract_threat_intelligence(pdf_path)

# Print results as indented JSON (tuples are rendered as JSON arrays)
print(json.dumps(result, indent=4))

Some weights of the model checkpoint at dbmdz/bert-large-cased-finetuned-conll03-english were not used when initializing BertForTokenClassification: ['bert.pooler.dense.bias', 'bert.pooler.dense.weight']
- This IS expected if you are initializing BertForTokenClassification from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing BertForTokenClassification from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).
Device set to use mps:0


{
    "IoCs": {
        "IP addresses": [],
        "Domains": [
            "mandiant.com"
        ]
    },
    "TTPs": {
        "Tactics": [
            [
                "TA0002",
                "Execution"
            ]
        ],
        "Techniques": []
    },
    "Threat Actor(s)": [],
    "Malware": [],
    "Targeted Entities": [
        "North Korea"
    ]
}
