<a href="https://colab.research.google.com/github/rl-cyber/User-Engaged-Network-Diagnosis/blob/main/LLM_enhanced_extraction_for_spec_conflict.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
from google.colab import files
import pandas as pd
import io

def upload_and_load_csv(prompt):
    """Print `prompt`, open the Colab upload dialog and parse the first upload as CSV.

    Args:
        prompt: Text printed before the upload dialog is shown.

    Returns:
        A DataFrame parsed from the first uploaded file, or None when the
        upload dialog returned nothing.
    """
    print(prompt)
    received = files.upload()

    # Guard clause: nothing came back from the dialog.
    if not received:
        print("No file uploaded.")
        return None

    # Only the first entry is used, even if several files were selected.
    first_name = next(iter(received))
    print(f"Loaded file: {first_name}")
    raw_bytes = received[first_name]
    return pd.read_csv(io.BytesIO(raw_bytes))

# Upload the reference CSV (only one file is requested here).
# NOTE(review): `ref_captions_df` is not read again in the visible cells; the
# next cell re-opens the uploaded file by name from the working directory,
# presumably relying on the copy that the Colab upload saves there -- confirm.
print("Please upload the reference file:")
ref_captions_df = upload_and_load_csv("Upload Reference Captions File:")

Please upload the reference file:
Upload Reference Captions File:


Saving conflict_segments_normalized_extracted.csv to conflict_segments_normalized_extracted.csv
Loaded file: conflict_segments_normalized_extracted.csv


In [None]:
# =============================================
# GPT-Enhanced 3GPP Spec Conflict Field Extractor
# =============================================
# This script loads a CSV of 3GPP spec inconsistencies and uses GPT-4o
# to extract: message, state, and effect fields from free-text explanations
!pip install --upgrade openai

import openai
import pandas as pd
import time
import re

# ---------------------- Setup ----------------------
# Read the key from the environment, or prompt for it without echo, so that no
# credential is ever stored in the notebook source or its saved outputs.
import os
from getpass import getpass

openai.api_key = os.environ.get("OPENAI_API_KEY") or getpass("OpenAI API key: ")

# Load normalized conflict segments with extracted explanation text
input_path = "conflict_segments_normalized_extracted.csv"
df = pd.read_csv(input_path)

# Fail fast, before any paid API call, if a column used further down is absent.
required_columns = {"conflict_id", "conflict_explanation", "message", "state"}
missing_columns = required_columns - set(df.columns)
if missing_columns:
    raise KeyError(f"{input_path} is missing required column(s): {sorted(missing_columns)}")

# Only enhance entries where fields are UNKNOWN
# (.copy() so the *_gpt columns added later do not write into a view of df)
to_enhance = df[(df['message'] == "UNKNOWN") | (df['state'] == "UNKNOWN")].copy()

# ---------------------- Prompt Template ----------------------

def build_prompt(text):
    """Return the two-shot extraction prompt with `text` inserted as the final input.

    The prompt shows two worked examples whose answers have the form
    ``message=..., state=..., effect=...`` and ends with an open ``Output:``
    line for the model to complete.
    """
    prompt_lines = (
        "",
        "You are a cellular protocol analyst. Extract key fields from the following 3GPP inconsistency description.",
        "",
        "Example 1:",
        'Input: "When the UE receives a DETACH REQUEST while in EMM-REGISTERED, the behavior differs: some clauses say reject if unauthenticated, others say allow."',
        "Output: message=DETACH REQUEST, state=EMM-REGISTERED, effect=Possible unauthorized deregistration",
        "",
        "Example 2:",
        'Input: "SERVICE REQUEST may be processed without a valid security context when the UE is idle, depending on clause."',
        "Output: message=SERVICE REQUEST, state=IDLE, effect=May allow unprotected session setup",
        "",
        "Now extract:",
        f"Input: {text}",
        "Output:",
        "",
    )
    # The empty first and last entries reproduce the newline at both ends of the prompt.
    return "\n".join(prompt_lines)

# ---------------------- GPT Call ----------------------

def gpt_extract_fields(text):
    """Send the extraction prompt for `text` to gpt-4o and return the raw reply text.

    Any exception raised while building the prompt or calling the API is
    printed and replaced by the sentinel string "GPT_ERROR", so the caller's
    loop keeps running.
    """
    try:
        request = {
            "model": "gpt-4o",
            "messages": [{"role": "user", "content": build_prompt(text)}],
            "temperature": 0.3,
            "max_tokens": 100,
        }
        completion = openai.chat.completions.create(**request)
        first_choice = completion.choices[0]
        return first_choice.message.content
    except Exception as err:
        print("Error:", err)
        return "GPT_ERROR"

# ---------------------- Apply to Dataset ----------------------

def _parse_gpt_fields(raw):
    """Split a 'message=..., state=..., effect=...' reply into a 3-tuple.

    Args:
        raw: Reply text from the model; None or "GPT_ERROR" are accepted.

    Returns:
        (message, state, effect); each element is "UNKNOWN" when its key is
        absent from the reply or its value is blank.
    """
    raw = raw or ""  # a reply with no content would otherwise crash re.search

    def _grab(pattern):
        hit = re.search(pattern, raw, flags=re.IGNORECASE)
        value = hit.group(1).strip() if hit else ""
        return value or "UNKNOWN"

    return (
        _grab(r'\bmessage\s*=\s*([^,\n]+)'),
        _grab(r'\bstate\s*=\s*([^,\n]+)'),
        # effect is free text and the last field on the line: read to end of
        # line so a comma inside the sentence does not truncate it.
        _grab(r'\beffect\s*=\s*([^\n]+)'),
    )


messages, states, effects = [], [], []

for _, row in to_enhance.iterrows():
    print(f"Processing conflict_id {row['conflict_id']}...")
    explanation = row['conflict_explanation']

    if pd.isna(explanation) or not str(explanation).strip():
        # Nothing to analyse: skip the API call instead of sending "nan".
        message, state, effect = "UNKNOWN", "UNKNOWN", "UNKNOWN"
    else:
        message, state, effect = _parse_gpt_fields(gpt_extract_fields(explanation))
        time.sleep(1.2)  # prevent rate limit

    messages.append(message)
    states.append(state)
    effects.append(effect)

# Add extracted fields to DataFrame
to_enhance['message_gpt'] = messages
to_enhance['state_gpt'] = states
to_enhance['effect_gpt'] = effects

# Attach the new columns to the full DataFrame by row index. to_enhance keeps
# the index labels of the df rows it was filtered from, so this is a left join
# that cannot multiply rows when conflict_id values repeat (a merge on
# conflict_id would). Rows that were not enhanced get NaN in the *_gpt columns.
df = df.join(to_enhance[['message_gpt', 'state_gpt', 'effect_gpt']])

# Save enhanced CSV
output_path = "conflict_segments_gpt_enhanced.csv"
df.to_csv(output_path, index=False)
print(f"GPT-enhanced extraction complete. Saved to {output_path}")


Processing conflict_id C001...
Processing conflict_id C002...
Processing conflict_id C004...
Processing conflict_id C005...
Processing conflict_id C006...
Processing conflict_id C007...
Processing conflict_id C008...
Processing conflict_id C009...
Processing conflict_id C010...
Processing conflict_id C011...
Processing conflict_id C012...
Processing conflict_id C013...
Processing conflict_id C014...
Processing conflict_id C015...
Processing conflict_id C017...
Processing conflict_id C019...
Processing conflict_id C020...
Processing conflict_id C021...
Processing conflict_id C023...
Processing conflict_id C024...
Processing conflict_id C025...
Processing conflict_id C026...
Processing conflict_id C027...
Processing conflict_id C029...
Processing conflict_id C031...
Processing conflict_id C033...
Processing conflict_id C034...
Processing conflict_id C035...
Processing conflict_id C036...
Processing conflict_id C037...
Processing conflict_id C038...
Processing conflict_id C039...
Processi

In [None]:
# =============================================
# CVE User Symptom Prediction using GPT-4o
# =============================================


# ========= SETUP OPENAI API FOR GPT-4o =========

!pip install --upgrade openai  # Install OpenAI library if not installed

df = pd.read_csv("/content/cve_dataset_with_descriptions.csv")

import requests
from openai import OpenAI

# Set up your OpenAI API key
api_key = "XXXXXXX"  # Replace with your actual API key
client = OpenAI(api_key=api_key)

# Few-shot material: failure class -> three example user-visible symptoms.
# Built from ordered pairs so the example numbering follows this order.
failure_to_symptoms = dict([
    ("memory corruption", [
        "Users report sudden phone restarts when receiving a call.",
        "Unexpected call drops occur, especially during VoLTE calls.",
        "Device randomly shuts down while connected to mobile data.",
    ]),
    ("baseband crash", [
        "Users experience 'No Service' messages without warning.",
        "Phone gets stuck on 'Emergency Calls Only' mode until restarted.",
        "LTE connection disappears after switching between 4G and 5G.",
    ]),
    ("denial of service", [
        "Calls fail to connect intermittently, even with full signal bars.",
        "Internet speeds slow down dramatically during peak hours.",
        "Users experience delays in receiving SMS/MMS messages.",
    ]),
    ("integer overflow", [
        "Phone battery drains unusually fast even in idle mode.",
        "Device heats up when connected to Wi-Fi and cellular at the same time.",
        "Unstable network behavior reported after an OTA update.",
    ]),
    ("remote code execution", [
        "Suspicious calls appear in call history without user action.",
        "Data usage spikes unexpectedly, possibly due to unauthorized access.",
        "Users report apps opening or closing automatically.",
    ]),
    ("handshake failure", [
        "VoLTE calls fail to establish while regular calls work fine.",
        "VPN connections randomly disconnect over mobile data.",
        "Secure apps fail to authenticate or timeout frequently.",
    ]),
    ("buffer overflow", [
        "Internet connection randomly drops despite strong signal.",
        "Users report delayed message sending or failed SMS transmissions.",
        "Device performance becomes sluggish during heavy network activity.",
    ]),
])

# Render every (failure class, symptoms) pair as a numbered example block and
# glue the blocks together with blank lines.
def _format_example(number, failure, symptoms):
    """Return one example block; symptoms are joined with ', ' on a single line."""
    return (
        f"### Example {number}\n"
        f"**CVE Description:** {failure}\n"
        f"**Real User Symptoms:** {', '.join(symptoms)}"
    )


few_shot_examples = "\n\n".join(
    _format_example(number, failure, symptoms)
    for number, (failure, symptoms) in enumerate(failure_to_symptoms.items(), start=1)
)

def generate_prompt(cve_id, description):
    """Build the symptom-prediction prompt for a single CVE.

    The module-level `few_shot_examples` string is placed between the
    instructions and the CVE details; the prompt ends with the
    Mild / Moderate / Severe skeleton the model is asked to fill in.

    Args:
        cve_id: Identifier inserted after "CVE ID".
        description: Text inserted after "CVE Description".

    Returns:
        The complete prompt string.
    """
    template = """
### Role:
You are a **cybersecurity analyst** with expertise in cellular networks and user experience analysis.
Your task is to predict possible **real-world user symptoms** based on a given CVE (Common Vulnerabilities and Exposures).

### Instructions:
1. **Analyze the CVE description** and determine how it impacts a cellular network or mobile device.
2. **Identify affected components** (e.g., cellular modem, baseband firmware, authentication system, OS kernel).
3. **Simulate real-world scenarios** where users might experience the issue.
4. **Categorize symptoms** into **three levels**:
   - **Mild (Minor inconvenience, e.g., slow data speeds)**
   - **Moderate (Disruptive, e.g., call drops, connectivity loss)**
   - **Severe (Critical failure, e.g., device crash, security breach, data loss)**
5. **Provide a structured response** in the format below.

{few_shot_examples}

### **CVE Information:**
- **CVE ID**: {cve_id}
- **CVE Description**: {description}

### **Predicted User-Experienced Symptoms:**
#### **Mild:**
- [List minor symptoms here]
#### **Moderate:**
- [List disruptive symptoms here]
#### **Severe:**
- [List critical failure symptoms here]
"""
    # Substituted values are inserted verbatim; braces inside them are not re-parsed.
    return template.format(
        few_shot_examples=few_shot_examples,
        cve_id=cve_id,
        description=description,
    )

def get_gpt4o_response(prompt, max_tokens=512):
    """Send `prompt` to gpt-4o and return the reply text.

    Args:
        prompt: User message content.
        max_tokens: Upper bound on reply length. The prompt asks for three
            bulleted sections (Mild / Moderate / Severe); the previous fixed
            cap of 70 tokens is too small for that format, hence the larger
            default.

    Returns:
        The text of the first choice, or the sentinel string "GPT_ERROR" when
        the request fails. A failure is printed rather than raised so that one
        bad request does not abort a long batch and discard the results
        collected so far.
    """
    try:
        response = client.chat.completions.create(
            model="gpt-4o",
            messages=[
                {"role": "system", "content": "You are a cybersecurity expert analyzing CVEs for real-world user impact."},
                {"role": "user", "content": prompt}
            ],
            temperature=0.3,
            max_tokens=max_tokens
        )
        return response.choices[0].message.content
    except Exception as e:
        print("Error:", e)
        return "GPT_ERROR"

# ========= PROCESS CVE DESCRIPTIONS USING GPT-4o =========

# One request per row, in row order; the reply text is collected as-is.
predicted_symptoms = []

for cve_id, description in zip(df["cve_id"], df["detailed_description"]):
    print(f"Processing {cve_id}...")  # Progress tracking
    predicted_symptoms.append(get_gpt4o_response(generate_prompt(cve_id, description)))

# Store the replies next to the rows they were generated from.
df["predicted_user_symptoms"] = predicted_symptoms

# ========= SAVE & DOWNLOAD ENHANCED DATASET =========

# Write the enriched table to disk, then hand it to the browser for download.
output_filename = "cve_dataset_with_gpt4o_symptoms.csv"
df.to_csv(output_filename, index=False)
files.download(output_filename)

print(f"Process complete! Download your enhanced dataset: {output_filename}")


Processing CVE-2016-11028...
Processing CVE-2018-5383...
Processing CVE-2019-20558...
Processing CVE-2019-20572...
Processing CVE-2019-20596...
Processing CVE-2020-10835...
Processing CVE-2020-25054...
Processing CVE-2021-0619...
Processing CVE-2021-0620...
Processing CVE-2021-0621...
Processing CVE-2021-0622...
Processing CVE-2021-0623...
Processing CVE-2021-0624...
Processing CVE-2021-0672...
Processing CVE-2021-0674...
Processing CVE-2021-0675...
Processing CVE-2021-22495...
Processing CVE-2021-25477...
Processing CVE-2021-31889...
Processing CVE-2021-40148...
Processing CVE-2022-20012...
Processing CVE-2022-20015...
Processing CVE-2022-20017...
Processing CVE-2022-20018...
Processing CVE-2022-20019...
Processing CVE-2022-20021...
Processing CVE-2022-20022...
Processing CVE-2022-20023...
Processing CVE-2022-20024...
Processing CVE-2022-20030...
Processing CVE-2022-20031...
Processing CVE-2022-20032...
Processing CVE-2022-20033...
Processing CVE-2022-20034...
Processing CVE-2022-2003

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

Process complete! Download your enhanced dataset: cve_dataset_with_gpt4o_symptoms.csv
