## Model: DeepSeek-R1-Distill-Qwen-14B-W4A16
#### This analysis was performed to gain insights into CVE data. This model was chosen because an enterprise OpenAI account was not readily available.
#### About 4.6% of the analyzed CVEs were found to be code-related.

In [1]:
import os
import time
import json
import requests
import re
import pandas as pd
from dotenv import load_dotenv
from typing import List, Dict

# Load environment variables
load_dotenv()

# Constants for the self-hosted model
API_URL = "https://deepseek-r1-qwen-14b-w4a16-maas-apicast-production.apps.prod.rhoai.rh-aiservices-bu.com:443"
API_KEY = os.getenv("LLM_API_KEY")  # Read from .env; "LLM_API_KEY" is an assumed variable name

# NVD API constants
NVD_API_URL = "https://services.nvd.nist.gov/rest/json/cves/2.0"
NVD_API_KEY = os.getenv("NVD_API_KEY")
MAX_CVE_PER_BATCH = 3  # Limit number of CVEs per batch to manage token count
MAX_DESCRIPTION_LENGTH = 200  # Limit length of descriptions
MAX_PACKAGES_SHOWN = 3  # Limit number of affected packages shown

In [2]:
# Custom LLM class to replace ChatOpenAI
class SelfHostedLLM:
    def __init__(self, api_url, api_key, model="r1-qwen-14b-w4a16", temperature=0.2):
        self.api_url = api_url
        self.api_key = api_key
        self.model = model
        self.temperature = temperature
    
    def invoke(self, prompt_data):
        prompt_text = prompt_data["cve_details"]
        
        messages = [
            {"role": "system", "content": """You are an expert cybersecurity analyst specializing in vulnerability assessment. 
             Your task is to analyze CVEs and identify which ones are code-related based on specific criteria."""},
            {"role": "user", "content": prompt_text}
        ]
        
        try:
            response = requests.post(
                url=self.api_url + '/v1/chat/completions',
                json={
                    "model": self.model,
                    "messages": messages,
                    "max_tokens": 4000,
                    "temperature": self.temperature
                },
                headers={'Authorization': 'Bearer ' + self.api_key},
                timeout=300  # Avoid hanging forever if the endpoint stops responding
            )
            
            if response.status_code == 200:
                completion = response.json()
                return type('obj', (object,), {
                    'content': completion.get('choices', [{}])[0].get('message', {}).get('content', '')
                })
            else:
                print(f"Error: API returned status code {response.status_code}")
                print(f"Response: {response.text}")
                return type('obj', (object,), {'content': f"Error: {response.status_code}"})
        
        except Exception as e:
            print(f"Exception during API call: {e}")
            return type('obj', (object,), {'content': f"Error: {str(e)}"})
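
DeepSeek-R1 distilled models typically emit their reasoning inside a `<think>...</think>` block before the final answer, so the `content` returned by `invoke` is rarely pure JSON. The following is a minimal sketch of one way to pull a JSON array out of such a response; `extract_json_array` is a hypothetical helper, not part of the notebook, and it assumes the answer contains a well-formed array.

```python
import json
import re


def extract_json_array(text):
    """Return the first JSON array found in `text`, or None."""
    # Remove the reasoning block, if present, so brackets inside it are ignored
    text = re.sub(r"<think>.*?</think>", "", text, flags=re.DOTALL)
    decoder = json.JSONDecoder()
    # Try to decode a JSON value at every '[' until one parses as a list
    for match in re.finditer(r"\[", text):
        try:
            value, _ = decoder.raw_decode(text, match.start())
        except json.JSONDecodeError:
            continue
        if isinstance(value, list):
            return value
    return None
```

Using `raw_decode` lets the JSON parser decide where the array ends, which avoids hand-written bracket-matching patterns.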



In [3]:
def fetch_cves(limit=10000):
    base_url = "https://access.redhat.com/labs/securitydataapi/cve.json"
    cves = []
    page = 1
    per_page = 100  # Number of CVEs per page (max allowed by the API)

    while len(cves) < limit:
        url = f"{base_url}?page={page}&per_page={per_page}"
        response = requests.get(url)
        
        if response.status_code != 200:
            print(f"Failed to fetch data: {response.status_code}")
            break
        
        data = response.json()
        if not data:
            break  # No more data available
        
        cves.extend(data)
        page += 1

        print(f"Fetched {len(cves)} CVEs so far...")

        if len(data) < per_page:
            break  # No more pages available

    return cves[:limit]

In [4]:
def extract_year_from_cve(cve_id):
    """
    Extracts the year from a CVE identifier.

    :param cve_id: A string representing the CVE ID (e.g., "CVE-2022-49043").
    :return: The year as an integer (e.g., 2022).
    """
    # Split the CVE ID by '-' and get the second part (the year)
    try:
        year = int(cve_id.split('-')[1])
        return year
    except (IndexError, ValueError) as e:
        print(f"Error extracting year from CVE ID: {e}")
        return None



In [5]:
def fetch_cve_details(cve_ids: List[str]) -> List[Dict]:
    """
    Fetch CVE details from the NVD API for a given list of CVE IDs using an API key.
    """
    cve_details = []

    for cve_id in cve_ids:
        try:
            # Make the API request with the API key
            headers = {"apiKey": NVD_API_KEY} if NVD_API_KEY else {}
            response = requests.get(f"{NVD_API_URL}?cveId={cve_id}", headers=headers)
            response.raise_for_status()  # Raise an exception for HTTP errors

            # Parse the response
            data = response.json()
            vulnerabilities = data.get("vulnerabilities", [])

            if not vulnerabilities:
                print(f"No data found for CVE: {cve_id}")
                continue

            # Extract relevant details
            for vuln in vulnerabilities:
                cve_data = vuln.get("cve", {})
                
                # Get the description
                descriptions = cve_data.get("descriptions", [])
                description = next((desc["value"] for desc in descriptions if desc.get("lang") == "en"), 
                                  "No description available")
                
                # Get metrics information including CVSS score
                metrics = cve_data.get("metrics", {})
                severity_level = "Unknown"
                cvss_score = "Unknown"
                
                # Try to extract CVSS metrics in order of preference
                for metric_key in ("cvssMetricV31", "cvssMetricV30", "cvssMetricV2"):
                    if metrics.get(metric_key):
                        metric = metrics[metric_key][0]
                        cvss_data = metric.get("cvssData", {})
                        # CVSS v2 entries carry baseSeverity on the metric itself, not inside cvssData
                        severity_level = cvss_data.get("baseSeverity") or metric.get("baseSeverity", "Unknown")
                        cvss_score = str(cvss_data.get("baseScore", "Unknown"))
                        break
                
                # Get affected packages and potential attack vectors
                affected_packages = []
                
                configurations = cve_data.get("configurations", [])
                for config in configurations:
                    nodes = config.get("nodes", [])
                    for node in nodes:
                        cpe_matches = node.get("cpeMatch", [])
                        for cpe_match in cpe_matches:
                            affected_packages.append(cpe_match.get("criteria", "Unknown"))
                            
                # Get weakness information that might indicate vulnerability type
                weakness_info = []
                weaknesses = cve_data.get("weaknesses", [])
                for weakness in weaknesses:
                    for desc in weakness.get("description", []):
                        weakness_info.append(desc.get("value", ""))
                
                cve_details.append({
                    "cve_id": cve_id,
                    "description": description,
                    "severity": {
                        "level": severity_level,
                        "score": cvss_score
                    },
                    "affected_packages": affected_packages,
                    "weakness_info": weakness_info
                })

        except requests.exceptions.RequestException as e:
            print(f"Error fetching data for CVE {cve_id}: {e}")

    return cve_details
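
`fetch_cve_details` issues one NVD request per CVE ID with no pause between calls. NVD documents rolling-window rate limits (at the time of writing, 5 requests per 30 seconds without an API key and 50 with one), so a long run may start receiving error responses. Below is a minimal sketch of a client-side throttle under those assumed limits; `RateLimiter` and its parameters are hypothetical names, not part of the notebook.

```python
import time
from collections import deque


class RateLimiter:
    """Allow at most `max_calls` calls within any rolling `period` seconds."""

    def __init__(self, max_calls, period, clock=time.monotonic, sleep=time.sleep):
        self.max_calls = max_calls
        self.period = period
        self.clock = clock    # injectable for testing
        self.sleep = sleep    # injectable for testing
        self.calls = deque()  # timestamps of recent calls

    def _expire(self, now):
        # Forget calls that have left the rolling window
        while self.calls and now - self.calls[0] >= self.period:
            self.calls.popleft()

    def wait(self):
        now = self.clock()
        self._expire(now)
        if len(self.calls) >= self.max_calls:
            # Sleep until the oldest recorded call leaves the window
            self.sleep(self.period - (now - self.calls[0]))
            now = self.clock()
            self._expire(now)
        self.calls.append(now)
```

Calling `limiter.wait()` immediately before each `requests.get` would keep the request rate under the configured limit, for example with `limiter = RateLimiter(50, 30)` when an API key is set.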

In [6]:
def truncate_text(text, max_length):
    """Truncate text to a maximum length and add ellipsis if truncated."""
    if len(text) <= max_length:
        return text
    return text[:max_length] + "..."

In [7]:
def format_cve_details_for_llm(cve_details: List[Dict]) -> str:
    """
    Format CVE details in a token-efficient way for LLM analysis.
    """
    formatted_details = []
    
    for cve in cve_details:
        # Truncate description to control token count
        description = truncate_text(cve['description'], MAX_DESCRIPTION_LENGTH)
        
        cve_str = f"CVE ID: {cve['cve_id']}\n"
        cve_str += f"Description: {description}\n"
        cve_str += f"Severity: {cve['severity']['level']} (Score: {cve['severity']['score']})\n"
        
        # Format affected packages more efficiently
        if cve['affected_packages']:
            # Only include a limited number of packages
            packages = cve['affected_packages'][:MAX_PACKAGES_SHOWN]
            # Extract key components from CPE strings to save tokens
            simplified_packages = []
            for pkg in packages:
                # CPE 2.3 layout: cpe:2.3:<part>:<vendor>:<product>:<version>:...
                parts = pkg.split(':')
                if len(parts) > 4 and parts[0] == "cpe":
                    vendor, product = parts[3], parts[4]
                    simplified_packages.append(f"{vendor}/{product}")
                else:
                    # Fall back to the raw string if it is not a CPE 2.3 name
                    simplified_packages.append(pkg)
            
            cve_str += "Affected Components: " + ", ".join(simplified_packages)
            if len(cve['affected_packages']) > MAX_PACKAGES_SHOWN:
                cve_str += f" (+{len(cve['affected_packages']) - MAX_PACKAGES_SHOWN} more)"
            cve_str += "\n"
        
        # Include only the most relevant weakness info
        if cve['weakness_info']:
            # Just use the first weakness to save tokens
            cve_str += f"Weakness: {truncate_text(cve['weakness_info'][0], 100)}\n"
                
        formatted_details.append(cve_str)
    prompt_template = """
You are an expert cybersecurity analyst specializing in vulnerability assessment. Your task is to analyze a list of CVE (Common Vulnerabilities and Exposures) identifiers and identify which ones are specifically code-related.

## What Makes a CVE Code-Related
A code-related CVE involves vulnerabilities directly attributable to software implementation issues in specific programming languages rather than configuration, physical security, or policy problems. A vulnerability is ONLY considered code-related if it explicitly involves one of these languages:

1. Go
2. Python 
3. Dockerfile
4. Java
5. TypeScript
6. JavaScript

If the vulnerability is in any other language or technology, it should NOT be classified as code-related, even if it matches the categories listed below.

The following categories are only considered code-related if they specifically involve one of the six languages listed above:

1. Remote Code Execution (RCE)
2. Code/Command Injection
3. SQL Injection
4. Cross-Site Scripting (XSS)
5. Deserialization vulnerabilities
6. Buffer Overflows
7. Format String vulnerabilities
8. Memory corruption issues
9. Path traversal that could lead to code execution
10. Arbitrary file inclusion
11. Logic flaws in application code
12. Implementation flaws in libraries, frameworks, or programming languages
13. Race conditions affecting code execution
14. Improper cryptographic implementations
15. Client-side/server-side request forgery
16. Coding errors leading to memory corruption
17. Input validation failures
18. Insecure API implementations or calls
19. Authentication/authorization bypass due to code flaws
20. Use-after-free vulnerabilities
21. Integer overflow/underflow
22. Double-free vulnerabilities
23. Null pointer dereference
24. Uninitialized memory usage
25. Out-of-bounds read/write
26. DOM-based XSS
27. Prototype pollution
28. XML External Entity (XXE) processing
29. Cross-Site Request Forgery (CSRF)
30. HTTP request smuggling

## CVE Details to Analyze:
"""
    # Put a separator line before each CVE entry
    separator = "-" * 20 + "\n"
    prompt_template += "\n" + separator + separator.join(formatted_details)
    
    prompt_template += """

## Response Format:
Return a JSON array of objects with the following format:
{
  "cve_id": "The CVE identifier",
  "code_related": true/false,
  "exclusion_reason": "If not code-related, specify the reason or indicate that it doesn't involve one of the six specified languages. Leave blank if code-related."
}

## Exclusion Criteria (if not code-related):
1. Not in specified languages (Go, Python, Dockerfile, Java, TypeScript, JavaScript)
2. Configuration errors only
3. Physical security issues
4. Social engineering without code exploits
5. Default credentials
6. Policy/procedure violations
7. Expired certificates
8. Denial of Service that doesn't involve code manipulation
9. Hardware-based issues with no software component
10. Documentation errors or insufficient warnings
"""
    
    return prompt_template
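
The "Affected Components" line is derived from CPE 2.3 names, which are colon-separated with fields at fixed positions: `cpe:2.3:<part>:<vendor>:<product>:<version>:...`. A minimal sketch of position-based parsing follows; `parse_cpe` is a hypothetical helper, and the example name is illustrative rather than taken from the run.

```python
def parse_cpe(cpe_name):
    """Return (vendor, product) from a CPE 2.3 name, or None if it does not fit."""
    parts = cpe_name.split(":")
    # Expected layout: cpe:2.3:<part>:<vendor>:<product>:<version>:...
    # Caveat: this simple split does not handle escaped colons inside a field.
    if len(parts) < 5 or parts[0] != "cpe" or parts[1] != "2.3":
        return None
    return parts[3], parts[4]


print(parse_cpe("cpe:2.3:a:mozilla:firefox:*:*:*:*:*:*:*:*"))  # ('mozilla', 'firefox')
```

Indexing by position matters because the first two colon-delimited fields after `cpe` are the version (`2.3`) and the part (`a`, `o`, or `h`), not the vendor and product.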


In [8]:
def chunk_cve_ids(cve_ids, batch_size=MAX_CVE_PER_BATCH):
    """Split CVE IDs into smaller batches for processing."""
    for i in range(0, len(cve_ids), batch_size):
        yield cve_ids[i:i + batch_size]

def analyze_code_related_cves(cve_ids: List[str]) -> Dict:
    """
    Analyze CVEs in batches to manage token limits.
    """
    all_results = []
    all_cve_statuses = []  # List to store status of all CVEs for Excel
    total_analyzed = 0
    
    # Initialize the LLM with the self-hosted model
    llm = SelfHostedLLM(api_url=API_URL, api_key=API_KEY)
    
    # Process CVEs in batches to manage token limits
    batches = list(chunk_cve_ids(cve_ids))
    print(f"Processing {len(cve_ids)} CVEs in {len(batches)} batches...")
    
    for i, batch in enumerate(batches):
        print(f"\nProcessing batch {i+1}/{len(batches)} ({len(batch)} CVEs)...")
        
        # Fetch details for this batch
        cve_details = fetch_cve_details(batch)
        total_analyzed += len(cve_details)
        
        if not cve_details:
            print("No details found for this batch")
            continue
        
        # Format the CVE details for the LLM
        formatted_details = format_cve_details_for_llm(cve_details)

        # **Debug Statement: Print Input to LLM**
        print("\n=== Input to LLM ===")
        print(formatted_details[:500] + "..." if len(formatted_details) > 500 else formatted_details)
        print("====================\n")
        
        # Invoke the LLM with the CVE details
        print(f"Analyzing batch {i+1}...")
        result = llm.invoke({"cve_details": formatted_details}).content

        # **Debug Statement: Print Raw LLM Output**
        print("\n=== Raw LLM Output ===")
        print(result[:500] + "..." if len(result) > 500 else result)
        print("======================\n")
        
        # Try to parse the result as JSON
        try:
            # Drop the model's <think>...</think> reasoning so brackets in it are not mistaken for JSON
            answer = re.sub(r'<think>.*?</think>', '', result, flags=re.DOTALL)
            # Look for a JSON array of objects in the remaining text
            json_match = re.search(r'\[\s*\{.*?\}\s*\]', answer, re.DOTALL)
            if json_match:
                parsed_result = json.loads(json_match.group(0))
            else:
                # Last resort: try to parse the whole response
                parsed_result = json.loads(answer)
            # Normalize a single object into a one-element list
            if isinstance(parsed_result, dict):
                parsed_result = [parsed_result]
                
            # Store all CVE results for Excel
            all_cve_statuses.extend(parsed_result)
            
            # Store code-related CVEs for summary output
            code_related = [cve for cve in parsed_result if isinstance(cve, dict) and cve.get('code_related') is True]
            all_results.extend(code_related)
            
            print(f"Found {len(code_related)} code-related CVEs in this batch")
            
        except json.JSONDecodeError:
            print("Warning: Could not parse result as JSON.")
            print("Raw output (first 100 chars):", result[:100] + "...")
            # Add entries with error status for the current batch
            for cve_id in batch:
                all_cve_statuses.append({
                    "cve_id": cve_id,
                    "code_related": "Error",
                    "exclusion_reason": "Failed to parse LLM response"
                })
    
    # Validate that every status entry is a dict before writing to Excel
    validated_statuses = []
    for item in all_cve_statuses:
        if isinstance(item, dict):
            validated_statuses.append(item)
        else:
            # Log the problematic item
            print(f"Warning: Found non-dictionary item in all_cve_statuses: {item}")
            # Record a placeholder entry for the invalid item
            validated_statuses.append({
                "cve_id": "unknown",
                "code_related": "Error",
                "exclusion_reason": f"Invalid data type: {type(item)}"
            })
    # Write results to Excel
    write_to_excel(validated_statuses)
    
    return {
        "code_related_cves": all_results,
        "total_analyzed": total_analyzed,
        "total_code_related": len(all_results)
    }
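
With `MAX_CVE_PER_BATCH = 3`, each LLM call covers at most three CVEs, so the number of calls grows as ceil(n / 3). The sketch below mirrors `chunk_cve_ids` with placeholder IDs (the `chunk` name and the IDs are illustrative) to show the arithmetic for 2000 inputs, which is consistent with the 667 batches reported in the run log.

```python
import math


def chunk(items, batch_size):
    """Yield consecutive slices of `items` with at most `batch_size` elements."""
    for i in range(0, len(items), batch_size):
        yield items[i:i + batch_size]


ids = [f"id-{n}" for n in range(2000)]  # placeholder IDs, not real CVEs
batches = list(chunk(ids, 3))
print(len(batches))      # 667, i.e. math.ceil(2000 / 3)
print(len(batches[-1]))  # 2, the final batch is short
```

A larger batch size reduces the number of calls but lengthens each prompt, so the value is a trade-off against the model's context and output limits.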


In [9]:
def write_to_excel(cve_statuses: List[Dict]):
    """
    Write CVE analysis results to an Excel file.
    """
    # Create DataFrame
    df = pd.DataFrame(cve_statuses)
    
    # Ensure column order and handle missing columns
    columns = ["cve_id", "code_related", "exclusion_reason"]
    for col in columns:
        if col not in df.columns:
            df[col] = ""
    
    # Reorder columns
    df = df[columns]
    
    # Fill empty exclusion reasons for code-related CVEs with "-"
    df.loc[df["code_related"] == True, "exclusion_reason"] = "-"
    
    # Write to Excel
    file_name = "cve_analysis_results_r1_qwen.xlsx"
    df.to_excel(file_name, index=False, sheet_name="CVE Analysis")
    
    print(f"\nResults written to '{file_name}'")
    print(f"Total CVEs processed: {len(df)}")
    print(f"Code-related CVEs: {sum(df['code_related'] == True)}")
    print(f"Non-code-related CVEs: {sum(df['code_related'] == False)}")
    print(f"Error processing: {sum(df['code_related'] == 'Error')}")
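
The header reports the share of code-related CVEs, but the notebook does not show how that figure was calculated. One way to derive such a percentage from the status list is sketched below, assuming that entries marked `"Error"` are excluded from the denominator; `code_related_share` is a hypothetical helper and the sample data is illustrative only.

```python
def code_related_share(statuses):
    """Percentage of successfully classified CVEs flagged as code-related."""
    # Keep only entries with a real boolean verdict (skips "Error" rows)
    classified = [s for s in statuses if isinstance(s.get("code_related"), bool)]
    if not classified:
        return 0.0
    flagged = sum(1 for s in classified if s["code_related"])
    return 100.0 * flagged / len(classified)


# Illustrative data only, not results from the run
sample = [
    {"cve_id": "CVE-0000-0001", "code_related": True},
    {"cve_id": "CVE-0000-0002", "code_related": False},
    {"cve_id": "CVE-0000-0003", "code_related": False},
    {"cve_id": "CVE-0000-0004", "code_related": "Error"},
]
print(f"{code_related_share(sample):.1f}%")  # 33.3%
```

Whether error rows belong in the denominator changes the result, so the choice should be stated alongside any reported percentage.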


In [10]:
# Fetch CVEs
print("Fetching CVEs from Red Hat Security Data API...")
cves = fetch_cves(limit=2000)  # Start with a small number for testing

# Extract CVE IDs
final_cves = [cve['CVE'] for cve in cves]
print(f"Extracted {len(final_cves)} CVE IDs")

# Analyze code-related CVEs
print("Starting analysis of code-related CVEs...")
analysis_results = analyze_code_related_cves(final_cves)

# Print results
print("\n----- ANALYSIS RESULTS -----")
print(f"Total CVEs analyzed: {analysis_results.get('total_analyzed', 0)}")

if 'total_code_related' in analysis_results:
    print(f"Code-related CVEs found: {analysis_results['total_code_related']}")
    
    # Print each code-related CVE in a readable format
    for i, cve in enumerate(analysis_results['code_related_cves'], 1):
        print(f"\n{i}. {cve['cve_id']}")
        print(f"   Code-related: {cve.get('code_related', True)}")
        # Only print other fields if they exist
        if 'vulnerability_type' in cve:
            print(f"   Vulnerability Type: {cve.get('vulnerability_type', 'Unknown')}")
        if 'affected_component' in cve:
            print(f"   Affected Component: {cve.get('affected_component', 'Unknown')}")
        if 'requires_code_change' in cve:
            print(f"   Requires Code Change: {cve.get('requires_code_change', True)}")
        if 'severity' in cve:
            print(f"   Severity: {cve.get('severity', {}).get('level', 'Unknown')} ({cve.get('severity', {}).get('score', 'Unknown')})")
else:
    print("No structured results available or parsing error occurred.")


Fetching CVEs from Red Hat Security Data API...
Fetched 100 CVEs so far...
Fetched 200 CVEs so far...
Fetched 300 CVEs so far...
Fetched 400 CVEs so far...
Fetched 500 CVEs so far...
Fetched 600 CVEs so far...
Fetched 700 CVEs so far...
Fetched 800 CVEs so far...
Fetched 900 CVEs so far...
Fetched 1000 CVEs so far...
Fetched 1100 CVEs so far...
Fetched 1200 CVEs so far...
Fetched 1300 CVEs so far...
Fetched 1400 CVEs so far...
Fetched 1500 CVEs so far...
Fetched 1600 CVEs so far...
Fetched 1700 CVEs so far...
Fetched 1800 CVEs so far...
Fetched 1900 CVEs so far...
Fetched 2000 CVEs so far...
Extracted 2000 CVE IDs
Starting analysis of code-related CVEs...
Processing 2000 CVEs in 667 batches...

Processing batch 1/667 (3 CVEs)...
No data found for CVE: CVE-2024-36347

=== Input to LLM ===

You are an expert cybersecurity analyst specializing in vulnerability assessment. Your task is to analyze a list of CVE (Common Vulnerabilities and Exposures) identifiers and identify which ones are specifically code-related.

## What Makes a CVE Code-Related
A code-related CVE involves vulnerabilities directly attributable to software implementation issues in specific programming languages rather than configuration, physical security, or policy problems. A vulnerability is ONLY considered code...

Analyzing batch 2...

=== Raw LLM Output ===
<think>
Alright, I need to analyze these three CVEs to determine if they're code-related based on the given criteria. Let's go through each one step by step.

Starting with CVE-2025-1943. The description mentions memory safety bugs in Firefox and Thunderbird, leading to memory corruption. The weakness is CWE-122, which is about improper handling of memory, often leading to buffer overflows or other memory issues. Firefox is 