In [2]:
import pandas as pd
import json
from stix2 import MemoryStore, parse
# Install the stix2 library if you haven't already
# !pip install stix2

In [3]:
# Location of the locally saved MITRE ATT&CK Enterprise STIX JSON bundle.
# Point this at your own copy if it is stored under a different path or name.
MITRE_JSON_PATH = "enterprise-attack-17.1.json"

def load_mitre_stix(file_path):
    """Read a STIX bundle from disk and wrap its objects in a MemoryStore.

    Args:
        file_path: Path to a UTF-8 encoded STIX JSON bundle.

    Returns:
        A ``MemoryStore`` built from the bundle's top-level ``'objects'`` list.

    Raises:
        KeyError: If the parsed JSON has no ``'objects'`` key.
    """
    with open(file_path, mode='r', encoding='utf-8') as bundle_file:
        bundle = json.load(bundle_file)

    # Only the object list is handed over; the bundle envelope itself is dropped.
    return MemoryStore(stix_data=bundle['objects'])

# Build the in-memory STIX store from the local ATT&CK bundle as soon as this cell runs.
# NOTE(review): none of the later cells visible here read `mitre_store` (the next cell
# re-parses the JSON directly) — confirm this store is still needed.
mitre_store = load_mitre_stix(MITRE_JSON_PATH)

In [8]:
import json
import os

# --- Configuration ---
# Filename of the MITRE ATT&CK Enterprise STIX bundle, resolved relative to the
# working directory; adjust it if the file lives elsewhere.
MITRE_JSON_PATH = "enterprise-attack-17.1.json"

def _mitre_external_id(stix_obj, default=None):
    """Return the first non-empty 'mitre-attack' external ID of a STIX object, else `default`."""
    for ref in stix_obj.get('external_references') or []:
        if isinstance(ref, dict) and ref.get('source_name') == 'mitre-attack' and ref.get('external_id'):
            return ref['external_id']
    return default


def _is_inactive(stix_obj):
    """Tell whether a STIX object is flagged `revoked` or `x_mitre_deprecated`."""
    return bool(stix_obj.get('revoked') or stix_obj.get('x_mitre_deprecated'))


def load_and_extract_mitre_data_raw(file_path, include_inactive=True):
    """
    Loads the MITRE STIX JSON and extracts data using direct dictionary access
    and Python filtering, bypassing the stix2.MemoryStore query model.

    One document is produced per ``attack-pattern`` object that carries a
    'mitre-attack' external ID. Its text holds the technique ID, name, tactics
    and description, followed by a "MITIGATION STRATEGIES" section listing every
    object that a ``mitigates`` relationship links to the technique.

    Args:
        file_path: Path to the UTF-8 encoded STIX bundle (JSON).
        include_inactive: When True (default, the historical behaviour) every
            object is considered. When False, objects flagged ``revoked`` or
            ``x_mitre_deprecated`` (techniques, relationships and mitigations
            alike) are dropped before processing.

    Returns:
        A list of dicts shaped ``{'text': str, 'metadata': {'id', 'name',
        'source', 'type'}}``, in the order the techniques appear in the bundle.

    Raises:
        ValueError: If the JSON has no top-level ``'objects'`` list.
        OSError / json.JSONDecodeError: Propagated from reading or parsing the file.
    """
    print("--- Loading and extracting data using raw JSON processing ---")

    # 1. Load the raw JSON file
    with open(file_path, 'r', encoding='utf-8') as f:
        data = json.load(f)

    if not isinstance(data, dict) or not isinstance(data.get('objects'), list):
        raise ValueError("MITRE STIX JSON does not contain a valid 'objects' list.")

    # Index objects by STIX ID for fast lookups. Entries without an 'id' cannot be
    # referenced by any relationship, so they are skipped instead of raising KeyError.
    stix_objects = {
        obj['id']: obj
        for obj in data['objects']
        if isinstance(obj, dict) and 'id' in obj
    }
    if not include_inactive:
        stix_objects = {
            stix_id: obj for stix_id, obj in stix_objects.items() if not _is_inactive(obj)
        }

    # 2. Separate all Techniques and 'mitigates' relationships
    techniques = [obj for obj in stix_objects.values() if obj.get('type') == 'attack-pattern']
    relationships = [
        obj for obj in stix_objects.values()
        if obj.get('type') == 'relationship' and obj.get('relationship_type') == 'mitigates'
    ]

    print(f"Found {len(techniques)} ATT&CK Techniques and {len(relationships)} Mitigation Relationships.")

    # Group relationships by the technique they target once, so each technique does a
    # dictionary lookup instead of rescanning the whole relationship list.
    rels_by_target = {}
    for rel in relationships:
        rels_by_target.setdefault(rel.get('target_ref'), []).append(rel)

    mitre_documents = []

    # 3. Process Techniques and link relevant mitigations
    for tech in techniques:
        tech_id = _mitre_external_id(tech)
        if not tech_id:
            continue  # Skip if no MITRE ID found

        # Tactic names: 'defense-evasion' -> 'Defense Evasion'
        tactics_list = [
            phase['phase_name'].replace('-', ' ').title()
            for phase in tech.get('kill_chain_phases') or []
            if phase.get('kill_chain_name') == 'mitre-attack' and phase.get('phase_name')
        ]

        core_text = (
            f"MITRE ATT&CK Technique ID: {tech_id}. "
            f"Name: {tech.get('name', 'N/A')}. "
            f"Tactic(s): {', '.join(tactics_list)}. "
            f"Description: {tech.get('description', 'No description available.')}"
        )

        mitigation_text = []
        for rel in rels_by_target.get(tech['id'], []):
            # Resolve the mitigation through the relationship's source_ref; dangling
            # references (object absent from the bundle or filtered out) are ignored.
            mitigation_obj = stix_objects.get(rel.get('source_ref'))
            if mitigation_obj:
                mitigation_id = _mitre_external_id(mitigation_obj, 'N/A')
                mitigation_text.append(
                    f" - Mitigation ({mitigation_id}): {mitigation_obj.get('description', 'No details.')}"
                )

        if mitigation_text:
            core_text += "\n\n**MITIGATION STRATEGIES:**\n" + "\n".join(mitigation_text)

        # 4. Create the final structured RAG document
        mitre_documents.append({
            'text': core_text,
            'metadata': {
                'id': tech_id,
                'name': tech.get('name', 'N/A'),
                'source': 'MITRE-ATTACK',
                'type': 'Technique-Mitigation',
            }
        })

    print("Raw JSON processing complete.")
    return mitre_documents

# --- Execution ---
# Build the MITRE corpus and print one representative document as a sanity check.
try:
    mitre_corpus = load_and_extract_mitre_data_raw(MITRE_JSON_PATH)

    if mitre_corpus:
        print(f"\nSuccessfully created RAG corpus with {len(mitre_corpus)} MITRE documents.")
        print("\n--- Sample of a Structured MITRE Document ---")

        # Prefer a document that carries the mitigation section; fall back to the first
        # document so a corpus without any mitigations does not raise StopIteration
        # (which the generic handler below would report with an empty message).
        sample_doc = next(
            (doc for doc in mitre_corpus if "MITIGATION STRATEGIES" in doc['text']),
            mitre_corpus[0],
        )

        print(json.dumps(sample_doc, indent=2))
    else:
        # Make the empty case visible instead of finishing silently.
        print("\nNo MITRE documents were produced from the input file.")
except FileNotFoundError:
    print(f"\nERROR: MITRE JSON file not found at {MITRE_JSON_PATH}.")
except Exception as e:
    print(f"\nAn unrecoverable error occurred during processing: {e}")


--- Loading and extracting data using raw JSON processing ---
Found 823 ATT&CK Techniques and 1421 Mitigation Relationships.
Raw JSON processing complete.

Successfully created RAG corpus with 823 MITRE documents.

--- Sample of a Structured MITRE Document ---
{
  "text": "MITRE ATT&CK Technique ID: T1055.011. Name: Extra Window Memory Injection. Tactic(s): Defense Evasion, Privilege Escalation. Description: Adversaries may inject malicious code into process via Extra Window Memory (EWM) in order to evade process-based defenses as well as possibly elevate privileges. EWM injection is a method of executing arbitrary code in the address space of a separate live process. \n\nBefore creating a window, graphical Windows-based processes must prescribe to or register a windows class, which stipulate appearance and behavior (via windows procedures, which are functions that handle input/output of data).(Citation: Microsoft Window Classes) Registration of new windows classes can include a reques

In [11]:
import pandas as pd
import json
import os
from stix2 import MemoryStore
from typing import List, Dict, Any

# --- CORRECTED LangChain Imports ---
from langchain_community.vectorstores import Chroma
from langchain_community.embeddings import SentenceTransformerEmbeddings
from langchain_core.documents import Document # <-- FIX: Moved to langchain_core
from langchain.text_splitter import RecursiveCharacterTextSplitter

# --- CONFIGURATION ---
# Input data sources handed to load_and_process_data().
MITRE_JSON_PATH = 'enterprise-attack-17.1.json'
CVE_CSV_PATH = 'cve_raw_data.csv'
# Embedding model name and persistence directory handed to index_corpus().
EMBEDDING_MODEL_NAME = 'all-MiniLM-L6-v2'
VECTOR_DB_PATH = './chroma_db_v_rag'

# --- 1. DATA UNIFICATION & FORMATTING ---

def load_and_process_data(cve_path: str, mitre_path: str) -> List[Document]:
    """Builds the unified CVE + MITRE corpus as a list of Document objects.

    NOTE: both data sets are currently *simulated* with hard-coded samples.
    ``cve_path`` is never read, and ``mitre_path`` only appears in a log message.

    Args:
        cve_path: Intended location of the CVE CSV (unused by the simulation).
        mitre_path: Intended location of the MITRE STIX JSON (only logged).

    Returns:
        CVE documents followed by MITRE documents. Each CVE document has
        ``page_content`` of the form ``"VULNERABILITY ID: <id>. Full Context: <description>"``
        and metadata keys ``id``, ``source`` and ``type``.
    """

    # --- 1a. SIMULATED CVE DATA PROCESSING ---
    print("1a. Processing CVE data from CSV...")

    # --- SIMULATION START (Based on your image) ---
    raw_cve_data = {
        'outputs': [
            "CVE:CVE-2020-13909\nDescription:The Ignition component before 2.0.5 for Laravel mishandles globals. Mitigation: Update to 2.0.5. published:2020-06-07T20:15:19.140",
            "CVE:CVE-2021-3002\nDescription:See Panel 4.8.0 allows reflected XSS via the email parameter. Mitigation: Patch to 4.8.1. published:2021-01-01T19:15:11.077",
            "CVE:CVE-2024-55555\nDescription:Critical zero-day RCE in Windows Print Spooler. Mitigation: Disable the Spooler service. published:2024-10-15T00:00:00.000"
        ]
    }
    cve_df = pd.DataFrame(raw_cve_data)
    # --- SIMULATION END ---

    cve_corpus = []
    skipped_records = 0
    # Iterate the column directly; iterrows() builds a Series per row for no benefit here.
    for raw_output in cve_df['outputs']:
        try:
            # First line is "CVE:<id>"; the description sits between the
            # "Description:" and "published:" markers.
            cve_id = raw_output.split('\n')[0].replace('CVE:', '').strip()
            description = raw_output.split('Description:')[1].split('published:')[0].strip()
        except (AttributeError, IndexError):
            # Non-string cell (e.g. NaN) or missing "Description:" marker: keep the
            # best-effort skip, but count it so dropped records are not invisible.
            skipped_records += 1
            continue

        text_body = (
            f"VULNERABILITY ID: {cve_id}. "
            f"Full Context: {description}"
        )

        cve_corpus.append(Document(
            page_content=text_body,
            metadata={
                'id': cve_id,
                'source': 'CVE-DB',
                'type': 'Vulnerability-Record',
            }
        ))

    if skipped_records:
        print(f"    Skipped {skipped_records} malformed CVE record(s).")

    # --- 1b. SIMULATED MITRE DATA PROCESSING ---
    print(f"1b. Processing MITRE data from {mitre_path} (Simulated)...")

    # --- SIMULATION START ---
    # This represents the successful list of dicts from your MITRE JSON processor
    mitre_corpus_dicts = [
        {
            "text": "MITRE ATT&CK Technique ID: T1055.011. Name: Extra Window Memory Injection... MITIGATION STRATEGIES: - Mitigation (M1040): Behavior Prevention on Endpoint...",
            "metadata": {"id": "T1055.011", "source": "MITRE-ATTACK", "type": "Technique-Mitigation"}
        },
        {
            "text": "MITRE ATT&CK Technique ID: T1566.001. Name: Spearphishing Attachment. Tactic(s): Initial Access. Description: Adversaries may send a spearphishing attachment...",
            "metadata": {"id": "T1566.001", "source": "MITRE-ATTACK", "type": "Technique"}
        }
    ]
    mitre_corpus = [Document(page_content=d['text'], metadata=d['metadata']) for d in mitre_corpus_dicts]
    # --- SIMULATION END ---


    # --- 1c. Final Merge ---
    final_corpus = cve_corpus + mitre_corpus
    print(f"Total documents loaded for indexing: {len(final_corpus)}")
    return final_corpus

# --- 2. CHUNKING (TEXT SPLITTING) ---

def split_documents(documents: List[Document]) -> List[Document]:
    """Break documents into overlapping chunks using a recursive character splitter.

    Args:
        documents: Documents to split.

    Returns:
        The chunk documents produced by ``RecursiveCharacterTextSplitter.split_documents``.
    """
    print("2. Splitting documents into manageable chunks...")

    splitter_settings = {
        "chunk_size": 900,        # upper bound on chunk length, measured with len()
        "chunk_overlap": 150,     # characters shared between neighbouring chunks
        "length_function": len,
        # Tried in order: paragraph break, line break, space, then single characters.
        "separators": ["\n\n", "\n", " ", ""],
    }
    splitter = RecursiveCharacterTextSplitter(**splitter_settings)
    chunks = splitter.split_documents(documents)

    initial_count, chunk_count = len(documents), len(chunks)
    print(f"Initial documents: {initial_count}. Final chunks created: {chunk_count}")
    return chunks

# --- 3. EMBEDDING AND INDEXING ---

def index_corpus(documents: List[Document], db_path: str, model_name: str):
    """Generates embeddings and indexes the chunks into ChromaDB.

    Args:
        documents: Chunked documents to embed and store.
        db_path: Directory passed to Chroma as ``persist_directory``.
        model_name: Model name passed to ``SentenceTransformerEmbeddings``.

    Returns:
        The Chroma vector store created by ``Chroma.from_documents``, so callers can
        query it without reopening the directory. Earlier versions returned None, so
        callers that ignore the result are unaffected.

    NOTE(review): the captured cell output shows a warning pointing at the
    ``SentenceTransformerEmbeddings(...)`` line — presumably a deprecation notice;
    confirm and plan a migration.
    NOTE(review): re-running against an existing ``db_path`` presumably adds the same
    chunks again — confirm whether the directory should be cleared between runs.
    """
    print(f"3. Initializing Embedding Model: {model_name}...")

    # 3a. Initialize the embedding model
    embedding_function = SentenceTransformerEmbeddings(model_name=model_name)

    print(f"3b. Indexing {len(documents)} chunks into Vector DB at {db_path}...")

    # 3c. Create the Chroma vector store index
    vector_store = Chroma.from_documents(
        documents=documents,
        embedding=embedding_function,
        persist_directory=db_path
    )

    # The check mark was previously stored as mojibake (UTF-8 bytes decoded as cp1252).
    print("\n✅ PHASE 1 COMPLETE: Knowledge Base Indexed Successfully.")
    print(f"Vector Database stored locally at: {db_path}")
    return vector_store

# --- MAIN EXECUTION FLOW ---
# Pipeline driver: load -> chunk -> embed/index. Any failure is reported rather than raised
# so the notebook cell finishes with a readable message.
if __name__ == "__main__":
    try:
        # Step 1: Load and Unify
        corpus = load_and_process_data(CVE_CSV_PATH, MITRE_JSON_PATH)

        if not corpus:
            # The stop sign was previously stored as mojibake (UTF-8 bytes decoded as cp1252).
            print("🛑 Cannot proceed: Corpus is empty after loading/processing.")
        else:
            # Step 2: Chunking
            chunks = split_documents(corpus)

            # Step 3: Embedding and Indexing
            index_corpus(chunks, VECTOR_DB_PATH, EMBEDDING_MODEL_NAME)

    except Exception as e:
        print("\n--- FATAL ERROR ---")
        print(f"An error prevented indexing: {e}")
        print("Please ensure all dependencies are installed and accessible.")


1a. Processing CVE data from CSV...
1b. Processing MITRE data from enterprise-attack-17.1.json (Simulated)...
Total documents loaded for indexing: 5
2. Splitting documents into manageable chunks...
Initial documents: 5. Final chunks created: 5
3. Initializing Embedding Model: all-MiniLM-L6-v2...


  embedding_function = SentenceTransformerEmbeddings(model_name=model_name)
  from .autonotebook import tqdm as notebook_tqdm
To support symlinks on Windows, you either need to activate Developer Mode or to run Python as an administrator. In order to activate developer mode, see this article: https://docs.microsoft.com/en-us/windows/apps/get-started/enable-your-device-for-development
Xet Storage is enabled for this repo, but the 'hf_xet' package is not installed. Falling back to regular HTTP download. For better performance, install the package with: `pip install huggingface_hub[hf_xet]` or `pip install hf_xet`


3b. Indexing 5 chunks into Vector DB at ./chroma_db_v_rag...

âœ… PHASE 1 COMPLETE: Knowledge Base Indexed Successfully.
Vector Database stored locally at: ./chroma_db_v_rag
