In [1]:
import json
import os
from pathlib import Path
from transformers import pipeline, AutoTokenizer, AutoModelForCausalLM, BitsAndBytesConfig

import torch

def load_data(input_file: str) -> list:
    """
    Read a JSON document from disk and return its parsed contents.

    A one-line status message is printed in both the success and the failure
    case. Any exception raised while opening, parsing or measuring the data is
    reported on stdout and an empty list is returned instead of propagating.
    """
    try:
        with open(input_file, mode='r', encoding='utf-8') as handle:
            records = json.load(handle)
        record_count = len(records)
        print(f"✅ Loaded {record_count} records from {input_file}")
    except Exception as err:
        # Best-effort loader: callers receive [] rather than an exception.
        print(f"❌ Error loading {input_file}: {err}")
        return []
    return records

# Path to the merged threat-intelligence dataset, relative to the notebook's working directory
data_path ='../data/processed/merged_threat_intelligence.json'
# load_data prints a status line and returns [] if the file cannot be read or parsed
data = load_data(data_path)
# Preview the first ten records as the cell output
data[:10]

✅ Loaded 427 records from ../data/processed/merged_threat_intelligence.json


[{'title': 'FortiGuard Labs Threat Research',
  'content': 'FortiGuard Labs Threat Research FortiGuard Labs analyzes NailaoLocker ransomware, a unique variant using SM2 encryption and a built-in decryption function. Learn how it works, why it matters, and how Fortinet protects against it. ByKuan-Yen LiuandYen-Ting LeeJuly 18, 2025 FortiGuard Labs Threat Research FortiCNAPP Composite Alerts link weak signals into clear timelines—helping security teams detect cloud-native threats earlier and triage them faster. ByDavid AdamsonJuly 17, 2025 FortiGuard Labs Threat Research FortiCNAPP Labs uncovers Lcrypt0rx, a likely AI-generated ransomware variant used in updated H2Miner campaigns targeting cloud resources for Monero mining. ByAkshat PradhanJuly 16, 2025 FortiGuard Labs Threat Research Discover how FortiSandbox 5.0 detects Dark 101 ransomware, even with sandbox evasion tactics. Learn how advanced behavioral analysis blocks file encryption, system tampering, and ransom note deployment. ByB

In [2]:
# Device detection with MPS support for Apple Silicon
import gc

# Probe order: MPS first, then CUDA; anything else runs on the CPU.
device = "cpu"
if torch.backends.mps.is_available():
    device = "mps"  # Apple Silicon GPU (M1/M2/M3)
elif torch.cuda.is_available():
    device = "cuda"  # NVIDIA GPU

print(f"Using device: {device}")
print(f'Torch version: {torch.__version__}')

# Release memory left over from earlier work in this kernel
if device == "cuda":
    torch.cuda.empty_cache()
    torch.cuda.reset_peak_memory_stats()
else:
    gc.collect()
    # empty_cache is only called when this torch build exposes it for MPS
    if device == "mps" and hasattr(torch.mps, 'empty_cache'):
        torch.mps.empty_cache()


Using device: cuda
Torch version: 2.7.1+cu128


In [3]:
import torch
import os
from dotenv import load_dotenv
from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline

# Read settings from a local .env file into the process environment
load_dotenv()

# Hugging Face access token plus the primary / fallback model identifiers
HF_TOKEN = os.environ.get('HF_TOKEN')
DEFAULT_MODEL = os.environ.get('DEFAULT_MODEL', 'Qwen/Qwen2.5-1.5B-Instruct')
FALLBACK_MODEL = os.environ.get('FALLBACK_MODEL', 'gpt2')

if not HF_TOKEN:
    # Warn only; loading is still attempted without a token
    for warning_line in (
        "⚠️  Warning: HF_TOKEN not found in .env file",
        "   Create .env file with: HF_TOKEN=your_token_here",
    ):
        print(warning_line)

# Select the device automatically: CUDA takes precedence here, then MPS, then CPU
if torch.cuda.is_available():
    device = "cuda"
elif torch.backends.mps.is_available():
    device = "mps"
else:
    device = "cpu"

def setup_model(model_name: str = None, hf_token: str = None):
    """
    Load a causal language model from Hugging Face and wrap it in a
    text-generation pipeline.

    Args:
        model_name: Hub identifier of the model; DEFAULT_MODEL is used when empty.
        hf_token: Access token; HF_TOKEN (read from the environment) is used when empty.

    Returns:
        The pipeline on success. If anything raises while loading, the error is
        printed and the result of setup_fallback_model(hf_token) is returned.
    """
    if not model_name:
        model_name = DEFAULT_MODEL
    if not hf_token:
        hf_token = HF_TOKEN

    print(f"🤖 Đang tải mô hình: {model_name}")
    print(f"📱 Thiết bị: {device.upper()}")
    print(f"🔑 Token: {'✅ Found' if hf_token else '❌ Missing'}")

    try:
        # NOTE(review): trust_remote_code=True allows code shipped in the model
        # repository to run at load time — confirm the configured model needs it.
        tokenizer = AutoTokenizer.from_pretrained(
            model_name, token=hf_token, trust_remote_code=True
        )
        # Reuse EOS as the padding token when the tokenizer defines none
        tokenizer.pad_token = tokenizer.pad_token or tokenizer.eos_token

        # Half precision with automatic placement on CUDA; full precision elsewhere
        if device == "cuda":
            torch_dtype, device_map = torch.float16, "auto"
        else:
            torch_dtype, device_map = torch.float32, None

        # NOTE(review): use_cache=False is passed through to the model config;
        # presumably a memory-saving choice — confirm it is intended for inference.
        model = AutoModelForCausalLM.from_pretrained(
            model_name,
            token=hf_token,
            trust_remote_code=True,
            torch_dtype=torch_dtype,
            device_map=device_map,
            use_cache=False,
        )

        pipeline_kwargs = {
            "model": model,
            "tokenizer": tokenizer,
            "torch_dtype": torch_dtype,
            "model_kwargs": {"use_cache": False},
        }
        if device_map is None:
            # Without device_map="auto" the model is moved by hand and the
            # pipeline is given an explicit device index (-1 means CPU).
            if device in ("mps", "cuda"):
                model.to(device)
            pipeline_kwargs["device"] = 0 if device != "cpu" else -1
        # With device_map="auto" no device is passed to the pipeline.
        pipe = pipeline("text-generation", **pipeline_kwargs)

        print(f"✅ Đã tải thành công {model_name} trên {device.upper()}")
        return pipe

    except Exception as err:
        print(f"❌ Lỗi khi tải {model_name}: {err}")
        return setup_fallback_model(hf_token)


def setup_fallback_model(hf_token: str = None):
    """
    Load the fallback model (FALLBACK_MODEL) as a text-generation pipeline.

    Called by setup_model when loading the primary model raises.

    Args:
        hf_token: Access token; HF_TOKEN is used when empty.

    Returns:
        The pipeline, or None (after printing the error) if loading fails.
    """
    fallback_name = FALLBACK_MODEL
    if not hf_token:
        hf_token = HF_TOKEN
    print(f"🔄 Đang tải mô hình dự phòng: {fallback_name}")

    try:
        tokenizer = AutoTokenizer.from_pretrained(fallback_name, token=hf_token)
        # Reuse EOS as the padding token when the tokenizer defines none
        tokenizer.pad_token = tokenizer.pad_token or tokenizer.eos_token

        # The fallback is always loaded in full precision
        load_kwargs = {
            "token": hf_token,
            "torch_dtype": torch.float32,
            "use_cache": False,
        }
        model = AutoModelForCausalLM.from_pretrained(fallback_name, **load_kwargs)

        if device in ("cuda", "mps"):
            model.to(device)

        # Device index handed to the pipeline: -1 means CPU, 0 otherwise
        device_index = 0 if device != "cpu" else -1
        pipe = pipeline(
            "text-generation",
            model=model,
            tokenizer=tokenizer,
            device=device_index,
            model_kwargs={"use_cache": False},
        )

        print(f"✅ {FALLBACK_MODEL} đã sẵn sàng trên {device.upper()}")
        return pipe

    except Exception as err:
        print(f"❌ Lỗi khi tải {FALLBACK_MODEL} fallback: {err}")
        return None


# Build the text-generation pipeline using the token and model name read from the environment.
# NOTE: despite its name, `model` holds whatever setup_model() returns — a pipeline,
# or None when both the primary and the fallback model fail to load.
model = setup_model()

🤖 Đang tải mô hình: Qwen/Qwen2.5-1.5B-Instruct
📱 Thiết bị: CUDA
🔑 Token: ✅ Found


Device set to use cuda:0


✅ Đã tải thành công Qwen/Qwen2.5-1.5B-Instruct trên CUDA


In [4]:
def create_prompt(title: str, content: str, max_content_chars: int = 2000) -> str:
    """
    Create the few-shot prompt used for threat intelligence classification.

    The article is rendered on a single ``INPUT:`` line in the same
    "<title>. <body>" shape as the few-shot examples, followed by an open
    ``OUTPUT:`` line for the model to complete.

    Args:
        title: Article title. It is placed in front of the body unless the
            (truncated) body already starts with it. May be empty or None.
        content: Article body. May be empty or None.
        max_content_chars: Maximum number of body characters kept (default 2000).

    Returns:
        The complete prompt string, ending with ``OUTPUT:``.
    """
    # Newlines are flattened so the whole article stays on one INPUT line.
    title_text = (title or "").replace('\n', ' ').strip()
    content_truncated = (content[:max_content_chars] if content else "").replace('\n', ' ')

    # Previously the title was accepted but never used. Prepend it, skipping
    # bodies that already open with the title to avoid repeating it.
    if title_text and not content_truncated.lstrip().startswith(title_text):
        if content_truncated:
            separator = " " if title_text[-1] in ".!?:" else ". "
            article_text = f"{title_text}{separator}{content_truncated}"
        else:
            article_text = title_text
    else:
        article_text = content_truncated

    # NOTE(review): the "FortiCNAPP Composite Alerts" example below has the same
    # title as a record shown in this notebook's outputs — confirm that using a
    # dataset article as a few-shot example is intended.
    prompt = f"""Determine if this is a threat intelligence related to cyberattacks, vulnerability exploits, malware, etc. based on the input. Please output 'yes' or 'no' and, if yes, output the object of the input on the next line in the format: [Entity Type]: [Name], where [Entity Type] is one of Malware, Vulnerability, Tool, Technique, or Actor, and [Name] is the specific threat name or identifier.

    EXAMPLES:
    INPUT: Lancefly: Group Uses Custom Backdoor to Target Orgs in Government, Aviation, Other Sectors. Merdoor backdoor is low prevalence and used in highly targeted attacks. Lancefly’s custom malware, which we have dubbed Merdoor, is a powerful backdoor that appears to have existed since 2018.
    OUTPUT: yes
    Malware: Merdoor

    INPUT: A smorgasbord for June’s Patch Tuesday. Between its own fixes and a slew of FYI alerts covering Edge, GitHub, and Autodesk, Microsoft piles 94 CVEs onto sysadmins’ plates; Fortinet and Adobe also at the table.
    OUTPUT: no

    INPUT: APT28 Deploys New Malware Zebrocy. The APT28 group has been observed using a new malware called Zebrocy to target government institutions.
    OUTPUT: yes
    Malware: Zebrocy

    INPUT: New Vulnerability in Apache Server. A critical vulnerability CVE-2023-1234 in Apache Server allows remote code execution.
    OUTPUT: yes
    Vulnerability: CVE-2023-1234

    INPUT: PowerShell Used in Recent Attacks. Attackers are leveraging PowerShell to execute malicious scripts.
    OUTPUT: yes
    Tool: PowerShell

    INPUT: Improving Cloud Intrusion Detection and Triage with FortiCNAPP Composite Alerts. FortiCNAPP helps detect intrusions in cloud environments.
    OUTPUT: yes
    Tool: FortiCNAPP

    INPUT: Emotet Returns with New Techniques. Emotet malware has re-emerged using advanced phishing techniques.
    OUTPUT: yes
    Malware: Emotet

    INPUT: Log4Shell Vulnerability Exploited. The Log4Shell vulnerability CVE-2021-44228 is actively exploited in the wild.
    OUTPUT: yes
    Vulnerability: CVE-2021-44228

    INPUT: Cobalt Strike Used in Targeted Attacks. The Cobalt Strike framework is being deployed by attackers.
    OUTPUT: yes
    Tool: Cobalt Strike

    INPUT: Lazarus Group Targets Crypto Firms. The Lazarus Group, a North Korean APT, has launched new campaigns.
    OUTPUT: yes
    Actor: Lazarus Group

    INPUT: {article_text}
    OUTPUT:"""

    return prompt
# Render the prompt for the first record to inspect the exact text passed to the model
create_prompt(data[0]['title'], data[0]['content'])

"Determine if this is a threat intelligence related to cyberattacks, vulnerability exploits, malware, etc. based on the input. Please output 'yes' or 'no' and, if yes, output the object of the input on the next line in the format: [Entity Type]: [Name], where [Entity Type] is one of Malware, Vulnerability, Tool, Technique, or Actor, and [Name] is the specific threat name or identifier.\n\n    EXAMPLES:\n    INPUT: Lancefly: Group Uses Custom Backdoor to Target Orgs in Government, Aviation, Other Sectors. Merdoor backdoor is low prevalence and used in highly targeted attacks. Lancefly’s custom malware, which we have dubbed Merdoor, is a powerful backdoor that appears to have existed since 2018.\n    OUTPUT: yes\n    Malware: Merdoor\n\n    INPUT: A smorgasbord for June’s Patch Tuesday. Between its own fixes and a slew of FYI alerts covering Edge, GitHub, and Autodesk, Microsoft piles 94 CVEs onto sysadmins’ plates; Fortinet and Adobe also at the table.\n    OUTPUT: no\n\n    INPUT: APT2

In [5]:
def _parse_classification(answer: str) -> tuple:
    """Turn the raw model continuation into an (is_threat, main_object) pair."""
    import re  # function-scope import: `re` is not among the notebook's top-level imports

    lines = [line.strip() for line in answer.splitlines() if line.strip()]
    if not lines:
        return False, ""

    first_line = lines[0]
    # The first stand-alone yes/no style word decides. Whole-word matching keeps
    # digits inside identifiers such as "CVE-2021-1234" or letters inside other
    # words from being read as a "yes".
    decision = re.search(r"\b(yes|true|1|no|false|0)\b", first_line, re.IGNORECASE)
    is_threat = decision is not None and decision.group(1).lower() in ("yes", "true", "1")
    if not is_threat:
        return False, ""

    # Preferred location: a "[Entity Type]: [Name]" line following the verdict.
    for line in lines[1:]:
        lowered = line.lower()
        if lowered.startswith("input:"):
            # The model has started inventing another example; nothing after
            # this point describes the article being classified.
            break
        if lowered.startswith("output:"):
            continue
        if ":" in line:
            return True, line

    # Fallback: the object was written on the same line as the verdict,
    # e.g. "yes Malware: Emotet". Original casing is kept.
    inline = first_line[decision.end():].lstrip(" \t,.;:-")
    if ":" in inline:
        return True, inline
    return True, ""


def classify_article(pipe, title: str, content: str) -> dict:
    """
    Classify a single article with the text-generation pipeline.

    The prompt is built with create_prompt, completed greedily by `pipe`, and
    the text generated after the prompt is parsed for a yes/no verdict plus an
    optional "[Entity Type]: [Name]" object.

    Args:
        pipe: Callable pipeline exposing `tokenizer.eos_token_id`; it must return
            a list whose first item has a 'generated_text' key that starts with
            the prompt.
        title: Article title.
        content: Article body.

    Returns:
        A dict with keys "is_threat_report" (bool) and "main_object" (str).
        Any exception raised while prompting, generating or parsing is printed
        and the negative default {"is_threat_report": False, "main_object": ""}
        is returned instead of propagating.
    """
    try:
        prompt = create_prompt(title, content)

        # Greedy decoding keeps the classification deterministic.
        response = pipe(
            prompt,
            max_new_tokens=200,
            do_sample=False,
            pad_token_id=pipe.tokenizer.eos_token_id,
        )

        # The pipeline echoes the prompt; keep only the continuation.
        generated_text = response[0]['generated_text']
        answer = generated_text[len(prompt):].strip()

        is_threat, main_object = _parse_classification(answer)
        if answer:
            print(f'Is threat: {is_threat}\tMain object: {main_object}')

        return {
            "is_threat_report": is_threat,
            "main_object": main_object
        }

    except Exception as e:
        # str() keeps the handler itself from failing when title is None.
        print(f"❌ Error classifying article '{str(title)[:50]}...': {e}")
        return {
            "is_threat_report": False,
            "main_object": ""
        }

# Smoke test on a single article. Note that index 1 is the second record of `data`,
# not the first one as the variable name suggests.

test_classify_first_record = classify_article(model, data[1]['title'], data[1]['content'])
print(test_classify_first_record)

The following generation flags are not valid and may be ignored: ['temperature', 'top_p', 'top_k']. Set `TRANSFORMERS_VERBOSITY=info` for more details.


Is threat: True	Main object: Malware: NailaoLocker
{'is_threat_report': True, 'main_object': 'Malware: NailaoLocker'}


In [6]:
def process_data(data: list, pipe: pipeline, classify_fn: callable = classify_article) -> list:
    """
    Classify every article in `data` and collect one result record per article.

    Args:
        data: Sequence of dict-like articles; 'title', 'link' and 'content' are
            read with an empty-string default.
        pipe: Object forwarded unchanged to `classify_fn` as its first argument.
        classify_fn: Callable taking (pipe, title, content) and returning a
            mapping with 'is_threat_report' and 'main_object'.

    Returns:
        A list of dicts with keys 'title', 'link', 'is_threat_report' and
        'main_object', in input order.
    """
    total = len(data)
    print(f"🔍 Processing {total} articles...")

    results = []
    for position, item in enumerate(data, start=1):
        print(f"Processing {position}/{total}: {item.get('title', 'Unknown')[:50]}...")

        title, link, content = (item.get(key, '') for key in ('title', 'link', 'content'))

        # Run the classifier, then merge its verdict with the article metadata
        classification = classify_fn(pipe, title, content)
        results.append({
            "title": title,
            "link": link,
            "is_threat_report": classification["is_threat_report"],
            "main_object": classification["main_object"],
        })

        # Extra progress marker after every tenth article
        if position % 10 == 0:
            print(f"  Processed {position}/{total} articles")

    return results

# Dry run on the first five records; the returned list is shown as the cell output
process_data(data[:5], model)

The following generation flags are not valid and may be ignored: ['temperature', 'top_p', 'top_k']. Set `TRANSFORMERS_VERBOSITY=info` for more details.


🔍 Processing 5 articles...
Processing 1/5: FortiGuard Labs Threat Research...


The following generation flags are not valid and may be ignored: ['temperature', 'top_p', 'top_k']. Set `TRANSFORMERS_VERBOSITY=info` for more details.


Is threat: True	Main object: Actor: Threat actor
Processing 2/5: NailaoLocker Ransomware’s “Cheese”...


The following generation flags are not valid and may be ignored: ['temperature', 'top_p', 'top_k']. Set `TRANSFORMERS_VERBOSITY=info` for more details.


Is threat: True	Main object: Malware: NailaoLocker
Processing 3/5: Improving Cloud Intrusion Detection and Triage wit...


The following generation flags are not valid and may be ignored: ['temperature', 'top_p', 'top_k']. Set `TRANSFORMERS_VERBOSITY=info` for more details.


Is threat: True	Main object: Tool: FortiCNAPP
Processing 4/5: Old Miner, New Tricks...


The following generation flags are not valid and may be ignored: ['temperature', 'top_p', 'top_k']. Set `TRANSFORMERS_VERBOSITY=info` for more details.


Is threat: True	Main object: Tool: Cobalt Strike
Processing 5/5: How FortiSandbox 5.0 Detects Dark 101 Ransomware D...
Is threat: True	Main object: Tool: FortiSandbox


[{'title': 'FortiGuard Labs Threat Research',
  'link': 'https://www.fortinet.com/blog/threat-research',
  'is_threat_report': True,
  'main_object': 'Actor: Threat actor'},
 {'title': 'NailaoLocker Ransomware’s “Cheese”',
  'link': 'https://www.fortinet.com/blog/threat-research/nailaolocker-ransomware-cheese',
  'is_threat_report': True,
  'main_object': 'Malware: NailaoLocker'},
 {'title': 'Improving Cloud Intrusion Detection and Triage with FortiCNAPP Composite Alerts',
  'link': 'https://www.fortinet.com/blog/threat-research/improving-cloud-intrusion-detection-and-triage-with-forticnapp',
  'is_threat_report': True,
  'main_object': 'Tool: FortiCNAPP'},
 {'title': 'Old Miner, New Tricks',
  'link': 'https://www.fortinet.com/blog/threat-research/old-miner-new-tricks',
  'is_threat_report': True,
  'main_object': 'Tool: Cobalt Strike'},
 {'title': 'How FortiSandbox 5.0 Detects Dark 101 Ransomware Despite Evasion Techniques',
  'link': 'https://www.fortinet.com/blog/threat-research/fo

In [7]:
def _print_classification_statistics(results: list) -> None:
    """Print totals, the threat-report percentage and up to five sample threat reports."""
    total = len(results)
    threat_rows = [r for r in results if r.get('is_threat_report')]
    threat_reports = len(threat_rows)
    # Guard the division: an empty result list is valid and must not raise.
    percentage = (threat_reports / total * 100) if total else 0.0

    print(f"\n📊 CLASSIFICATION STATISTICS:")
    print(f"Total articles: {total}")
    print(f"Threat reports: {threat_reports}")
    print(f"Non-threat reports: {total - threat_reports}")
    print(f"Threat report percentage: {percentage:.1f}%")

    threat_examples = threat_rows[:5]
    if threat_examples:
        print(f"\n🔍 SAMPLE THREAT REPORTS:")
        for i, example in enumerate(threat_examples, 1):
            print(f"  {i}. {str(example.get('title', ''))[:60]}...")
            print(f"     Main object: {example.get('main_object', '')}")


def save_results(results: list, output_file: str = "classified.json"):
    """
    Save classification results to a JSON file and print summary statistics.

    Missing parent directories of `output_file` are created. Nothing is raised:
    a failure while writing is reported on stdout together with the attempted
    path and the current working directory. Statistics are printed only after a
    successful save, and a problem while computing them is reported separately
    so it is never mistaken for a failed save.

    Args:
        results: List of result dicts; 'is_threat_report', 'title' and
            'main_object' are read for the statistics.
        output_file: Destination path (default "classified.json").

    Returns:
        None.
    """
    try:
        # Create directory if it doesn't exist
        output_path = Path(output_file)
        output_path.parent.mkdir(parents=True, exist_ok=True)

        # Convert to absolute path to avoid relative path issues
        absolute_path = output_path.resolve()

        print(f"💾 Saving to: {absolute_path}")

        with open(absolute_path, 'w', encoding='utf-8') as f:
            json.dump(results, f, ensure_ascii=False, indent=2)

        print(f"✅ Successfully saved results to {output_file}")

    except Exception as e:
        print(f"❌ Error saving results: {e}")
        print(f"   Attempted path: {output_file}")
        print(f"   Current working directory: {os.getcwd()}")
        print(f"   Absolute path would be: {Path(output_file).resolve()}")
        return

    # The file is on disk at this point; statistics are informational only.
    try:
        _print_classification_statistics(results)
    except Exception as e:
        print(f"⚠️  Results were saved, but statistics could not be computed: {e}")

In [None]:
import datetime

# Classify the complete dataset with the loaded pipeline
results = process_data(data, model)
today = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")

# Hub ids look like "org/name". Replace the slash so the id stays a single
# file-name component; otherwise the path below would silently gain an extra
# "topic_classification_<org>/" directory level.
# NOTE(review): the file is tagged with DEFAULT_MODEL even if setup_model()
# ended up returning the fallback pipeline — confirm this labelling is acceptable.
model_tag = DEFAULT_MODEL.replace("/", "__")

output_path = f"../data/topic-classification/topic_classification_{model_tag}_test_{today}.json"
save_results(results, output_path)

In [None]:
# revert