In [1]:
!wget -nc https://raw.githubusercontent.com/mitre/cti/master/enterprise-attack/enterprise-attack.json

File ‘enterprise-attack.json’ already there; not retrieving.



In [135]:
from bs4 import BeautifulSoup
from collections import defaultdict
from enum import Enum
from fake_useragent import UserAgent
import iocextract as ie
from mitreattack.stix20 import MitreAttackData
import nltk
import ollama
import pandas as pd
import pprint
from pydantic import BaseModel
import random
import re
import requests
import sqlite3
import time

In [143]:
# Fetch the NLTK resources used later in the notebook: the tokenizer models
# ('punkt', presumably required by nltk.tokenize.word_tokenize) and the English
# word list ('words', read through nltk.corpus.words).
nltk_download_results = [nltk.download(resource) for resource in ('punkt', 'words')]
# Display the status of the last download, as the cell did before.
nltk_download_results[-1]

[nltk_data] Downloading package punkt to /home/udp/nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package words to /home/udp/nltk_data...
[nltk_data]   Unzipping corpora/words.zip.


True

In [4]:
# Sample public threat reports used to exercise the extraction pipeline below.
# Each one is fetched over HTTP by Report.from_url.
URL1 = "https://www.welivesecurity.com/en/eset-research/deceptivedevelopment-targets-freelance-developers/"
URL2 = "https://research.checkpoint.com/2025/large-scale-exploitation-of-legacy-driver/"
URL3 = "https://www.sentinelone.com/blog/macos-adload-prolific-adware-pivots-just-days-after-apples-xprotect-clampdown/"

In [5]:
# Open the SQLite database file "reports.db" in the working directory
# (sqlite3.connect creates the file if it does not exist yet) and grab a
# cursor for the statements issued in the following cells.
# NOTE(review): the connection is never closed in the visible cells.
conn = sqlite3.connect("reports.db")
cur = conn.cursor()

In [6]:
# Apply the SQL schema stored next to the notebook.
# The file is read first so that a missing or unreadable schema.sql surfaces
# as a normal exception instead of being mixed up with database errors.
with open("schema.sql") as f:
    schema_sql = f.read()

try:
    cur.executescript(schema_sql)
except sqlite3.Error as e:
    # Best-effort: report database errors (e.g. re-running the cell against
    # an already-initialised database) without aborting the notebook. Only
    # sqlite3 errors are caught, so unrelated bugs are no longer swallowed.
    print(f"error applying schema: {e}")

In [7]:
class ReportType(Enum):
    """Format of the original report document."""
    HTML = 1  # used by Report.from_url for pages fetched over HTTP
    PDF = 2
    TEXT = 3

In [8]:
class ReportSource(Enum):
    """Origin of a report; only the UNKNOWN placeholder is defined so far."""
    UNKNOWN = 0

In [9]:
class Answer(BaseModel):
    """Structured true/false reply expected from the LLM.

    The model's JSON schema is passed as the ``format`` argument of
    ``ollama.chat`` and the reply is parsed back with ``model_validate_json``.
    """
    answer: bool

In [10]:
def ioc_validation_prompt(ioc, ioc_type, context):
    """Build the prompt asking whether ``ioc`` really is of kind ``ioc_type``.

    Args:
        ioc: The extracted indicator string to validate.
        ioc_type: Human-readable indicator kind (e.g. "URL").
        context: Report text surrounding the indicator, appended below a
            separator line.

    Returns:
        The prompt text, requesting a "true"/"false" answer.
    """
    template = """
    Based on the following text, answer "true" or "false". Is {ioc} an {ioc_type}?

    -----------
    {context}
    """
    return template.format(ioc=ioc, ioc_type=ioc_type, context=context)

def ioc_relevance_prompt(ioc, ioc_type, context):
    """Build the prompt asking whether ``ioc`` is an indicator of compromise.

    Args:
        ioc: The extracted indicator string.
        ioc_type: Human-readable indicator kind (e.g. "URL").
        context: Report text surrounding the indicator, appended below a
            separator line.

    Returns:
        The prompt text, requesting a "true"/"false" answer.
    """
    template = """
    Based on the following text, answer "true" or "false". Is the {ioc_type} {ioc} an indicator of compromise?

    -----------
    {context}
    """
    return template.format(ioc=ioc, ioc_type=ioc_type, context=context)

In [166]:
class Report:
    """A threat report together with the indicators extracted from its text.

    Construct directly from already-available text, or use ``from_url`` to
    download and parse an HTML page. ``parse_iocs`` and ``parse_mitre`` fill
    the indicator containers from ``self.body``.
    """

    # CVE identifiers such as CVE-2024-12345 (matched case-insensitively).
    cve_re = re.compile(r"\bCVE-\d{4}-\d{4,}\b", re.IGNORECASE)

    def __init__(self, contents, title, body, report_type, source, url=None):
        """Store the report fields and create empty indicator containers.

        Args:
            contents: Original, unprocessed report contents.
            title: Report title.
            body: Plain text that the parsers operate on.
            report_type: Format of the report (a ``ReportType`` in this notebook).
            source: Origin of the report (a ``ReportSource`` in this notebook).
            url: Optional URL the report was fetched from.
        """
        self.contents = contents  # original contents
        self.title = title
        self.body = body
        self.url = url
        self.report_type = report_type
        self.source = source

        # Derived; one timestamp is taken so both dates are identical.
        now = time.time()
        self.upload_date = now
        self.creation_date = now

        # Indicator containers are created per instance. As class attributes
        # they were shared, so results written for one report (notably into
        # the ``mitre`` mapping) leaked into every other report.
        self.ipv4s = set()
        self.ipv6s = set()
        self.sha256s = set()
        self.md5s = set()
        self.sha1s = set()
        self.domains = set()
        self.yara_rules = set()
        self.cves = set()
        self.mitre = defaultdict(set)

    @classmethod
    def from_url(cls, url, source, timeout=30):
        """Download an HTML report and build a report object from it.

        Args:
            url: Address of the report page.
            source: Origin of the report, stored unchanged.
            timeout: Seconds to wait for the server before giving up; without
                a timeout ``requests`` can block indefinitely.

        Returns:
            A new instance of ``cls`` with ``report_type`` set to HTML.

        Raises:
            requests.HTTPError: If the server answers with an error status.
            requests.Timeout: If the server does not answer within ``timeout``.
        """
        headers = {
            "User-Agent": UserAgent().random,
        }
        resp = requests.get(url, headers=headers, timeout=timeout)
        resp.raise_for_status()

        soup = BeautifulSoup(resp.text, 'html.parser')
        body = soup.find('body')
        title = soup.find('title')

        # ``find`` returns None when the tag is absent; fall back instead of
        # failing with AttributeError on ``None.text``.
        title_text = title.text if title is not None else ""
        body_text = body.text if body is not None else soup.get_text()

        # ``cls`` (not a hard-coded ``Report``) so subclasses get their own type.
        return cls(resp.text, title_text, body_text, ReportType.HTML, source, url=url)

    def parse_iocs(self):
        """Extract network, hash and CVE indicators from ``self.body``.

        Every container is replaced by a fresh set. CVE identifiers are
        upper-cased so differently-cased mentions collapse into one entry.
        """
        self.domains = set(ie.extract_urls(self.body))
        self.ipv4s = set(ie.extract_ipv4s(self.body))
        self.ipv6s = set(ie.extract_ipv6s(self.body))
        self.cves = {cve.upper() for cve in self.cve_re.findall(self.body)}
        self.sha1s = set(ie.extract_sha1_hashes(self.body))
        self.md5s = set(ie.extract_md5_hashes(self.body))
        self.sha256s = set(ie.extract_sha256_hashes(self.body))

    def parse_mitre(self, attack_data_path="enterprise-attack.json"):
        """Find MITRE ATT&CK identifiers and names mentioned in ``self.body``.

        The body is tokenized, tokens that are ordinary English words are
        dropped, and the remainder is intersected with each ATT&CK vocabulary.
        Results are stored in ``self.mitre`` under one key per vocabulary.

        Args:
            attack_data_path: Path to the ATT&CK STIX bundle to load.
        """
        def get_external_ids(elems):
            # Keep only the ATT&CK identifier among each object's references.
            return [r.external_id for e in elems for r in e.external_references if r.source_name == 'mitre-attack']

        def get_names(elems):
            return [e.name for e in elems]

        # Load mitre attack data
        mitre_attack_data = MitreAttackData(attack_data_path)

        tactics = mitre_attack_data.get_tactics()
        techniques = mitre_attack_data.get_techniques()
        mitigations = mitre_attack_data.get_mitigations()
        groups = mitre_attack_data.get_groups()
        software = mitre_attack_data.get_software()
        campaigns = mitre_attack_data.get_campaigns()
        datasources = mitre_attack_data.get_datasources()

        # "mitigations" was referenced without ever being computed, and
        # "software_names" was computed but left out; both are wired in here.
        mitre_data = {
            "tactics": set(get_external_ids(tactics)),
            "techniques": set(get_external_ids(techniques)),
            "mitigations": set(get_external_ids(mitigations)),
            "group_names": set(get_names(groups)),
            "group_ids": set(get_external_ids(groups)),
            "software_names": set(get_names(software)),
            "software_ids": set(get_external_ids(software)),
            "campaign_names": set(get_names(campaigns)),
            "campaign_ids": set(get_external_ids(campaigns)),
            "datasources": set(get_external_ids(datasources)),
        }

        # Tokenize contents
        # NOTE(review): matching is per token, so names made of several words
        # presumably never match; confirm whether phrase matching is wanted.
        words = set(nltk.corpus.words.words())
        tokens = set(nltk.tokenize.word_tokenize(self.body)).difference(words)
        for key, values in mitre_data.items():
            self.mitre[key] = tokens.intersection(values)

In [170]:
# Fetch the first sample report over HTTP and extract its IOCs from the page text.
r1 = Report.from_url(URL1, source=ReportSource.UNKNOWN)
r1.parse_iocs()

In [171]:
# Fetch the second sample report over HTTP and extract its IOCs from the page text.
r2 = Report.from_url(URL2, source=ReportSource.UNKNOWN)
r2.parse_iocs()

In [172]:
# Fetch the third sample report over HTTP and extract its IOCs from the page text.
r3 = Report.from_url(URL3, source=ReportSource.UNKNOWN)
r3.parse_iocs()

In [168]:
# Fill r1.mitre with the ATT&CK identifiers/names found in the report body.
# NOTE(review): this cell's execution count is lower than that of the cells
# creating r1 above, so the notebook was run out of order; re-run top to bottom.
r1.parse_mitre()

In [173]:
# Inspect what the URL extractor found for the first report. As the output
# shows, this holds defanged IP addresses and a templated placeholder URL as
# well as real domains, which is why the LLM validation below is needed.
r1.domains

{'135.125.248[.]56',
 '147.124.214[.]129',
 '147.124.214[.]237',
 '185.235.241[.]208',
 '23.106.253[.]194',
 '45.61.131[.]218',
 '67.203.7[.]171',
 '95.164.17[.]24',
 'http://<C&C_IP>:<C&C_port>/anydesk.exe',
 'http://ip-api.com/json',
 'ipcheck[.]cloud',
 'mirotalk[.]net'}

In [174]:
# Pick one suspicious match and cut out the report text surrounding it, to
# give the LLM enough context to judge the indicator.
ioc = 'http://<C&C_IP>:<C&C_port>/anydesk.exe'
# Number of characters of report text kept on each side of the IOC.
CONTEXT_CHARS = 1000
idx = r1.body.index(ioc)  # raises ValueError if the IOC is not in the body
# Clamp the start at 0: a negative start index would make the slice count
# from the end of the string and return the wrong (usually empty) window
# whenever the IOC sits within the first CONTEXT_CHARS characters.
context = r1.body[max(0, idx - CONTEXT_CHARS):idx + len(ioc) + CONTEXT_CHARS]
context

'they need to be decrypted before exfiltration. The encryption keys used for this are obtained based on the operating system in use. On Windows, they are extracted from the browser’s Local\xa0State file, on Linux they are obtained through the secretstorage package, and on macOS they are obtained through the security utility, as illustrated in Figure 15.\n\nFigure 15. Extracting the encryption keys for browser databases on Windows, Linux, and macOS\n\nThe collected information (see Figure 16) is then sent to the C&C server via an HTTP POST request to the /keys API endpoint.\n\nFigure 16. Information submitted by the browser module to the C&C server\n\nAnyDesk module\nThe adc module is the only persistence mechanism found in this compromise chain, setting up AnyDesk access to the victim’s computer using a configuration file containing hardcoded login credentials.\nOn Windows, it checks whether the C:/Program Files (x86)/AnyDesk/AnyDesk.exe exists. If\xa0not, it downloads anydesk.exe from

In [175]:
%%time
resp = ollama.chat(
    model='qwen2.5:14b',
    messages=[
        {
            'role': 'user',
            'content': ioc_validation_prompt(ioc, "URL", context),
        },
    ],
    format=Answer.model_json_schema(),
)
answer = Answer.model_validate_json(resp.message.content)
answer

CPU times: user 20.8 ms, sys: 4.12 ms, total: 25 ms
Wall time: 28.2 s


Answer(answer=True)

In [176]:
%%time
resp = ollama.chat(
    model='qwen2.5:14b',
    messages=[
        {
            'role': 'user',
            'content': ioc_relevance_prompt(ioc, "URL", context),
        },
    ],
    format=Answer.model_json_schema(),
)
answer = Answer.model_validate_json(resp.message.content)
answer

CPU times: user 6 ms, sys: 5.78 ms, total: 11.8 ms
Wall time: 19.7 s


Answer(answer=True)