In [1]:
import feedparser
import requests
import json
import os
import numpy as np
from datetime import datetime
from typing import List, Dict, Any
from dotenv import load_dotenv
from llama_index.embeddings.openai import OpenAIEmbedding, OpenAIEmbeddingModelType
from llama_index.llms.openai import OpenAI
from llama_index.core import VectorStoreIndex, Document, StorageContext
from llama_index.core import Settings
from constants import embed_model, llm_model, INDEX_PERSIST_PATH, ENHANCED_RAG_TEMPLATE, CISA_ICS_RSS_URL, MAX_ADVISORIES, REFINED_MITRE_PROMPT_TEMPLATE

In [2]:
def fetch_cisa_advisories() -> List[Dict[str, Any]]:
    """
    Fetch ICS security advisories from CISA RSS feed
    """
    try:
        # Parse RSS feed
        feed = feedparser.parse(CISA_ICS_RSS_URL)
        
        advisories = []
        for i, entry in enumerate(feed.entries[:MAX_ADVISORIES]):
            advisory = {
                'id': entry.get('id', f'advisory_{i}'),
                'title': entry.get('title', 'No Title'),
                'summary': entry.get('summary', 'No Summary'),
                'link': entry.get('link', ''),
                'published': entry.get('published', str(datetime.now())),
                'content': entry.get('summary', '') + ' ' + entry.get('title', '')
            }
            advisories.append(advisory)
            print(f"Fetched advisory {i+1}: {advisory['title']}")
            
        return advisories
        
    except Exception as e:
        print(f"Error fetching CISA advisories: {e}")
        return []

In [3]:
def create_mitre_embeddings(embed_model):
    """
    Create embeddings for all MITRE ATT&CK techniques
    """
    mitre_embeddings = {}
    with open("assets/mitre-ics.json", "r", encoding="utf-8") as f:
        mitre_data = json.load(f)

    for item in mitre_data:
        # Create searchable text combining name and description
        technique_text = f"{item['name']} {item['description']} {item['tactics']}"
        technique_id = item['Id']
        # Generate embedding
        embedding = embed_model.get_text_embedding(technique_text)
        mitre_embeddings[technique_id] = {
            'embedding': embedding,
            'text': technique_text,
            'details': item
        }
        print(f"Generated embedding for {technique_id}")
    
    return mitre_embeddings

In [4]:
def find_similar_mitre_techniques(advisory_content: str, mitre_embeddings: Dict, top_k: int = 5):
    """
    Find top-k most similar MITRE ATT&CK techniques using embedding similarity
    """
    # Generate embedding for advisory content
    advisory_embedding = embed_model.get_text_embedding(advisory_content)
    
    # Calculate cosine similarity with all MITRE techniques
    similarities = []
    
    for technique_id, data in mitre_embeddings.items():
        mitre_emb = np.array(data['embedding'])
        advisory_emb = np.array(advisory_embedding)
        
        # Cosine similarity
        cosine_sim = np.dot(mitre_emb, advisory_emb) / (
            np.linalg.norm(mitre_emb) * np.linalg.norm(advisory_emb)
        )
        
        similarities.append({
            'technique_id': technique_id,
            'similarity': float(cosine_sim),
            'details': data['details']
        })
    
    # Sort by similarity and return top-k
    similarities.sort(key=lambda x: x['similarity'], reverse=True)
    return similarities[:top_k]

In [5]:
def map_to_mitre_attack(advisory_content: str, mitre_embeddings=None) -> Dict[str, Any]:
    """
    Enhanced MITRE ATT&CK mapping using two-stage approach:
    1. Embedding-based similarity filtering
    2. LLM-based refined analysis
    """

    
    try:
        # Stage 1: Use embeddings to find top candidate techniques
        candidate_techniques = None
        if mitre_embeddings:
            print("Stage 1: Finding similar MITRE techniques using embeddings...")
            candidates = find_similar_mitre_techniques(
                advisory_content, mitre_embeddings, top_k=5
            )
            
            # Prepare candidate techniques for LLM analysis
            candidate_techniques = {
                cand['technique_id']: cand['details'] 
                for cand in candidates
            }
            
            print(f"Top candidates: {list(candidate_techniques.keys())}")
        
        # Stage 2: Use LLM for refined mapping on filtered candidates
        print("Stage 2: LLM-based refined analysis...")

        with open("assets/mitre-ics.json", "r", encoding="utf-8") as f:
            mitre_data = json.load(f)
        
        print("Loaded full MITRE ATT&CK data for LLM analysis.")
        
        # Use candidate techniques if available, otherwise full set
        techniques_to_analyze = candidate_techniques if candidate_techniques else mitre_data
        
        print("Techniques to analyze for LLM:")
        print(json.dumps(techniques_to_analyze, indent=2))

        
        # Enhanced prompt for refined analysis
        refined_prompt = REFINED_MITRE_PROMPT_TEMPLATE.format(
            advisory_content=advisory_content,
            techniques_to_analyze=json.dumps(techniques_to_analyze, indent=2)
        )

        print(f"Refined prompt for LLM:\n{refined_prompt}")
        
        response = llm_model.complete(refined_prompt)
        print(f"LLM response: {response.text}")

        mapping = json.loads(response.text)
        print(f"Final mapping: {mapping}")
        return mapping
        
    except Exception as e:
        print(f"Error in map_to_mitre_attack: {e}")
        return {"mapped_techniques": []}

In [None]:
def create_index(advisories, mitre_embeddings):
    """
    Create a new vector store index with CISA advisories
    """

    documents = []
    processed_advisories = []
    
    for advisory in advisories:
        # Map to MITRE ATT&CK techniques using enhanced two-stage approach
        mitre_mapping = map_to_mitre_attack(
            advisory['content'], 
            mitre_embeddings,
        )
        
        # Create enhanced content with MITRE mapping
        enhanced_content = f"""
        Title: {advisory['title']}
        Summary: {advisory['summary']}
        Published: {advisory['published']}
        Link: {advisory['link']}
        
        MITRE ATT&CK Mapping:
        Techniques: {', '.join(mitre_mapping.get('mapped_techniques', []))}
        Reasoning: {mitre_mapping.get('reasoning', 'N/A')}
        Confidence: {mitre_mapping.get('confidence', 'N/A')}
        
        Full Content: {advisory['content']}
        """

        Settings.chunk_size = 2048
        Settings.chunk_overlap = 100

        print(f"Enhanced Content***: {enhanced_content}")
        
        # Create document with metadata
        doc = Document(
            text=enhanced_content,
                metadata={
                    'id': advisory['id'],
                    'title': advisory['title'],
                    'published': advisory['published'],
                    'link': advisory['link'],
                    'mitre_techniques': mitre_mapping.get('mapped_techniques', []),
                    'confidence': mitre_mapping.get('confidence', 'N/A')
                }
        )
        
        documents.append(doc)
        
        # Store processed advisory data
        advisory_data = advisory.copy()
        advisory_data['mitre_mapping'] = mitre_mapping
        processed_advisories.append(advisory_data)
    
    print("Creating vector store index...")
    os.makedirs(INDEX_PERSIST_PATH, exist_ok=True)
    index = VectorStoreIndex.from_documents(documents, embed_model=embed_model)
    print("Index created successfully.")
    return index


In [7]:
advisories = fetch_cisa_advisories()


Fetched advisory 1: Hitachi Energy MSM
Fetched advisory 2: Hitachi Energy Relion 670/650 and SAM600-IO Series
Fetched advisory 3: Voltronic Power and PowerShield UPS Monitoring Software
Fetched advisory 4: FESTO Hardware Controller, Hardware Servo Press Kit
Fetched advisory 5: FESTO CODESYS
Fetched advisory 6: FESTO Automation Suite, FluidDraw, and Festo Didactic Products
Fetched advisory 7: FESTO Didactic CP, MPS 200, and MPS 400 Firmware
Fetched advisory 8: TrendMakers Sight Bulb Pro
Fetched advisory 9: Mitsubishi Electric Air Conditioning Systems
Fetched advisory 10: MICROSENS NMP Web+


In [8]:
mitre_embeddings = create_mitre_embeddings(embed_model)

Generated embedding for T0800
Generated embedding for T0830
Generated embedding for T0878
Generated embedding for T0802
Generated embedding for T0895
Generated embedding for T0803
Generated embedding for T0804
Generated embedding for T0805
Generated embedding for T0806
Generated embedding for T0892
Generated embedding for T0858
Generated embedding for T0807
Generated embedding for T0885
Generated embedding for T0884
Generated embedding for T0879
Generated embedding for T0809
Generated embedding for T0811
Generated embedding for T0893
Generated embedding for T0812
Generated embedding for T0813
Generated embedding for T0814
Generated embedding for T0815
Generated embedding for T0868
Generated embedding for T0816
Generated embedding for T0817
Generated embedding for T0871
Generated embedding for T0819
Generated embedding for T0820
Generated embedding for T0890
Generated embedding for T0866
Generated embedding for T0822
Generated embedding for T0823
Generated embedding for T0891
Generated 

In [2]:
result = REFINED_MITRE_PROMPT_TEMPLATE.format(
    advisory_content="Sample advisory content for testing",
    techniques_to_analyze="sample techniques to analyze")

print(result)


You are a cybersecurity expert analyzing an ICS (Industrial Control Systems) security advisory.
Your task is to map this advisory to the most relevant MITRE ATT&CK techniques.

IMPORTANT: You have been provided with pre-filtered candidate techniques that are most likely relevant.
Focus your analysis on these candidates, but you may also suggest if none are truly appropriate.

Advisory Content:
Sample advisory content for testing

Candidate MITRE ATT&CK Techniques (pre-filtered for relevance):
sample techniques to analyze

Instructions:
1. Analyze the advisory for attack vectors, vulnerabilities, and potential impacts
2. Map to 1-3 most relevant techniques from the candidates provided

Only respond with a valid JSON object in this exact format and nothig else
{
  "type": "object",
  "properties": {
    "mapped_techniques": {
      "type": "array",
      "description": "List of MITRE ATT&CK technique IDs that apply the MOST to the advisory. This could be zero, one or more techniques but

In [25]:
index = create_index(advisories, mitre_embeddings)

Stage 1: Finding similar MITRE techniques using embeddings...
Top candidates: ['T0862', 'T0883', 'T0817', 'T0866', 'T0819']
Stage 2: LLM-based refined analysis...
Loaded full MITRE ATT&CK data for LLM analysis.
Techniques to analyze for LLM:
{
  "T0862": {
    "Id": "T0862",
    "name": "Supply Chain Compromise",
    "description": "Adversaries may perform supply chain compromise to gain control systems environment access by means of infected products, software, and workflows. Supply chain compromise is the manipulation of products, such as devices or software, or their delivery mechanisms before receipt by the end consumer. Adversary compromise of these products and mechanisms is done for the goal of data or system compromise, once infected products are introduced to the target environment. \n\nSupply chain compromise can occur at all stages of the supply chain, from manipulation of development tools and environments to manipulation of developed products and tools distribution mechani

In [12]:
index.storage_context.persist("index/")

In [13]:
def _extract_advisory_from_text(document_text: str) -> Dict[str, Any]:
        """Extract all advisory data from document text using regex patterns"""
        import re
        
        data = {}
        
        # Extract title
        title_match = re.search(r'Title: (.*?)\n', document_text)
        data['title'] = title_match.group(1).strip() if title_match else ''
        
        # Extract summary  
        summary_match = re.search(r'Summary: (.*?)\nPublished:', document_text, re.DOTALL)
        data['summary'] = summary_match.group(1).strip() if summary_match else ''
        
        # Extract published date
        published_match = re.search(r'Published: (.*?)\n', document_text)
        data['published'] = published_match.group(1).strip() if published_match else ''
        
        # Extract link
        link_match = re.search(r'Link: (.*?)\n', document_text)
        data['link'] = link_match.group(1).strip() if link_match else ''
        
        # Extract MITRE techniques
        techniques_match = re.search(r'Techniques: (.*?)\n', document_text)
        if techniques_match:
            techniques_str = techniques_match.group(1).strip()
            # Split by comma and clean up
            techniques = [t.strip() for t in techniques_str.split(',') if t.strip() and t.strip() != 'N/A']
            data['mitre_techniques'] = techniques
        else:
            data['mitre_techniques'] = []
        
        # Extract MITRE reasoning
        reasoning_match = re.search(r'Reasoning: (.*?)\nConfidence:', document_text, re.DOTALL)
        data['mitre_reasoning'] = reasoning_match.group(1).strip() if reasoning_match else 'N/A'
        
        # Extract confidence
        confidence_match = re.search(r'Confidence: (.*?)\n', document_text)
        data['confidence'] = confidence_match.group(1).strip() if confidence_match else 'N/A'
        
        # Extract original content (everything after "Full Content:")
        content_match = re.search(r'Full Content: (.*?)$', document_text, re.DOTALL)
        data['content'] = content_match.group(1).strip() if content_match else ''
        
        return data

In [20]:
def get_advisories_data() -> List[Dict[str, Any]]:
        """
        Get the processed advisories data from vector store
        """
        try:
            # if not self.index:
            #     self._load_or_create_index()
            
            # Query the vector store to get all advisory documents
            # We'll use a broad query to retrieve all documents
            retriever = index.as_retriever(similarity_top_k=100)
            
            # Use a generic query to get all documents
            nodes = retriever.retrieve("security advisory vulnerability ICS SCADA")
            
            advisories_data = []
            seen_ids = set()
            
            for node in nodes:
                metadata = node.metadata
                print(f"Processing node with data: {node.text}")
                
                # Avoid duplicates
                advisory_id = metadata.get('id')
                if advisory_id in seen_ids:
                    continue
                seen_ids.add(advisory_id)
                
                # Extract ALL data from document text
                extracted_data = _extract_advisory_from_text(node.text)
                
                # Convert metadata MITRE techniques back to list if needed
                metadata_techniques = metadata.get('mitre_techniques', '')
                if isinstance(metadata_techniques, str) and metadata_techniques:
                    metadata_techniques = [t.strip() for t in metadata_techniques.split(',') if t.strip()]
                else:
                    metadata_techniques = []
                
                # Reconstruct advisory data - prefer extracted data over metadata
                advisory_data = {
                    'id': advisory_id,
                    'title': extracted_data.get('title') or metadata.get('title', ''),
                    'summary': extracted_data.get('summary', ''),  # Always from document text
                    'published': extracted_data.get('published') or metadata.get('published', ''),
                    'link': extracted_data.get('link', ''),
                    'content': extracted_data.get('content', ''),
                    'mitre_mapping': {
                        'mapped_techniques': extracted_data.get('mitre_techniques') or metadata_techniques,
                        'confidence': extracted_data.get('confidence') or metadata.get('confidence', 'N/A'),
                        'reasoning': extracted_data.get('mitre_reasoning', 'N/A')
                    }
                }
                advisories_data.append(advisory_data)
            
            # Sort by published date (newest first)
            advisories_data.sort(
                key=lambda x: x.get('published', ''), 
                reverse=True
            )
            
            return advisories_data
            
        except Exception as e:
            print(f"Error getting advisories data from vector store: {e}")
            return []

In [21]:
data = get_advisories_data()




Processing node with data: MITIGATIONS</h2>
<p>FESTO recommends users enable password protection at login in case no password is set at the controller. Please note the password configuration file is not covered by the default FFT backup and restore mechanism. The related file must be selected manually.</p>
<p>For more information see the associated Festo SE security advisory FSA-202406: Several Codesys Gateway v2 vulnerabilities in Codesys provided by Festo <a href="https://media.festo.com/assets/attachment-files/1c01ee9debea5f6ad8ae2c1d5d966b3a100d164c2223f27589afcebb36a9631fbb278d40dfaadb7f6eaa886575a00917709edf1ac848b114918b97886fed2f5d/fsa-202406_Several_Codesys_Gateway_v2_vulnerabilities_in_Codesys_provided_by_Festo.pdf" target="_blank">PDF</a> or <a href="https://certvde.com/en/advisories/VDE-2024-059" target="_blank">VDE-2024-059: Several Codesys Gateway v2 vulnerabilities in Codesys provided by Festo</a>.</p>
<p>CISA recommends users take defensive measures to minimize the risk

In [26]:
for advisory in data:
    print(advisory)

{'id': '/node/23548', 'title': 'MICROSENS NMP Web+', 'summary': '', 'published': 'Tue, 24 Jun 25 12:00:00 +0000', 'link': '', 'content': '', 'mitre_mapping': {'mapped_techniques': [], 'confidence': 'N/A', 'reasoning': 'N/A'}}
{'id': '/node/23576', 'title': 'FESTO CODESYS', 'summary': '', 'published': 'Tue, 01 Jul 25 12:00:00 +0000', 'link': '', 'content': '', 'mitre_mapping': {'mapped_techniques': [], 'confidence': 'N/A', 'reasoning': 'N/A'}}
{'id': '/node/23579', 'title': 'Hitachi Energy Relion 670/650 and SAM600-IO Series', 'summary': '', 'published': 'Tue, 01 Jul 25 12:00:00 +0000', 'link': '', 'content': '', 'mitre_mapping': {'mapped_techniques': [], 'confidence': 'N/A', 'reasoning': 'N/A'}}
{'id': '/node/23580', 'title': 'Hitachi Energy MSM', 'summary': '', 'published': 'Tue, 01 Jul 25 12:00:00 +0000', 'link': 'https://www.cisa.gov/news-events/ics-advisories/icsa-25-182-07', 'content': '<p><a href="https://github.com/cisagov/CSAF" target="_blank"><strong>View CSAF</strong></a></p>

In [33]:
for advisory in advisories:
    print(advisory)

{'id': '/node/23580', 'title': 'Hitachi Energy MSM', 'summary': '<p><a href="https://github.com/cisagov/CSAF" target="_blank"><strong>View CSAF</strong></a></p>\n<h2 id="1-executive-summary">1. EXECUTIVE SUMMARY</h2>\n<ul>\n<li><strong>CVSS v4 5.3</strong></li>\n<li><strong>ATTENTION</strong>: Exploitable remotely/low attack complexity</li>\n<li><strong>Vendor</strong>: Hitachi Energy</li>\n<li><strong>Equipment</strong>: Modular Switchgear Monitoring (MSM)</li>\n<li><strong>Vulnerability</strong>: Improper Neutralization of Input During Web Page Generation (\'Cross-site Scripting\')</li>\n</ul>\n<h2 id="2-risk-evaluation">2. RISK EVALUATION</h2>\n<p>Successful exploitation of this vulnerability could allow attackers to execute untrusted code, potentially leading to unauthorized actions or system compromise.</p>\n<h2 id="3-technical-details">3. TECHNICAL DETAILS</h2>\n<h3 id="31-affected-products">3.1 AFFECTED PRODUCTS</h3>\n<p>Hitachi Energy reports the following products are affected

In [None]:
processed_advisories = []
# Process each advisory to map to MITRE ATT&CK techniques
for advisory in advisories:
    mitre_mapping = map_to_mitre_attack(
        advisory['content'], 
        mitre_embeddings,
        )
    print(advisory)
    advisory_data = advisory.copy()
    advisory_data['mitre_mapping'] = mitre_mapping
    processed_advisories.append(advisory_data)
    
metadata_path = os.path.join(INDEX_PERSIST_PATH, "advisories_metadata.json")
os.makedirs(INDEX_PERSIST_PATH, exist_ok=True)   
with open(metadata_path, 'w') as f:
    json.dump(self.advisories_data, f, indent=2)