In [None]:
!pip install -U langchain-ollama
!pip install pandas

## Create a list of unique scenarios

In [None]:
import pandas as pd

# Input CSV with all scenario rows, and the de-duplicated CSV derived from it.
# Both files are read and written with ';' as the delimiter by the cells below.
scenarios_file = "scenarios.csv"
unique_scenarios_file = "scenarios_unique.csv"

def select_unique_scenarios(input, output):
    """Write one row per scenario ID to a new CSV.

    Reads the ';'-delimited file at ``input``, trims surrounding whitespace
    from the 'Scenario ID' and 'User' columns, keeps the first row seen for
    each scenario ID and saves only those two columns to ``output``
    (';'-delimited, no index column).

    Args:
        input: Path of the source CSV.
        output: Path of the CSV to create.
    """
    key_col, text_col = 'Scenario ID', 'User'

    raw = pd.read_csv(input, delimiter=";")
    # Strip before de-duplicating so that " S1 " and "S1" count as the same ID.
    for col in (key_col, text_col):
        raw[col] = raw[col].str.strip()

    first_per_id = raw.drop_duplicates(subset=[key_col], keep='first')
    first_per_id[[key_col, text_col]].to_csv(output, index=False, sep=";")

    print(f"Unique scenarios saved to {output}")


# Produce scenarios_unique.csv; the prompt-generation cell reads this file.
select_unique_scenarios(scenarios_file, unique_scenarios_file)

In [None]:
# Re-read the raw scenarios and the file written by select_unique_scenarios
original_df = pd.read_csv(scenarios_file, delimiter=";")
result_df = pd.read_csv(unique_scenarios_file, delimiter=";")

# Apply the same whitespace clean-up the function performs on its input
original_df[['Scenario ID', 'User']] = original_df[['Scenario ID', 'User']].apply(
    lambda column: column.str.strip()
)

# Recompute the expected result: first row per scenario ID, two columns only
expected_df = (
    original_df
    .drop_duplicates(subset=['Scenario ID'], keep='first')
    .loc[:, ['Scenario ID', 'User']]
)

# Normalise the saved result the same way before comparing
result_df[['Scenario ID', 'User']] = result_df[['Scenario ID', 'User']].apply(
    lambda column: column.str.strip()
)

# Compare with fresh 0..n-1 indexes so only the contents matter
try:
    pd.testing.assert_frame_equal(
        result_df.reset_index(drop=True),
        expected_df.reset_index(drop=True),
    )
except AssertionError as mismatch:
    print("❌ Test failed:")
    print(mismatch)
else:
    print("✅ Test passed: Output matches expected unique scenarios.")

## Generate prompts

In [None]:
import pandas as pd

def generate_llm_prompt(scenario_id, scenarios_df, threats_df, vulnerabilities_df):
    """
    Build the threat-identification prompt for a single scenario.

    Args:
        scenario_id (str): The scenario ID to pull the correct description.
        scenarios_df (pd.DataFrame): Scenarios; must provide the 'Scenario ID'
            and 'User' columns ('User' holds the scenario description).
        threats_df (pd.DataFrame): Threat catalogue; must provide the
            'THREAT ID', 'THREAT' and 'DESCRIPTION' columns.
        vulnerabilities_df (pd.DataFrame): Not used by this prompt. The
            parameter is kept so existing callers keep working.

    Returns:
        str | None: The formatted prompt, or None (after printing a message)
        when no row of ``scenarios_df`` matches ``scenario_id``.
    """
    scenario_row = scenarios_df[scenarios_df['Scenario ID'] == scenario_id]
    if scenario_row.empty:
        # Bail out early: without a description the prompt cannot be built.
        print(f"No scenario found with ID: {scenario_id}")
        return None
    scenario_description = scenario_row['User'].iloc[0]

    # One text block per catalogue entry, separated by blank lines.
    threats_text = "\n\n".join(
        f"ThreatID: {row['THREAT ID']}\n"
        f"ThreatName: {row['THREAT']}\n"
        f"ThreatDescription: {row['DESCRIPTION']}\n"
        "---------------------------------"
        for _, row in threats_df.iterrows()
    )

    # Doubled braces ({{ }}) are literal braces inside the f-string.
    # The example object carries commas between its fields so that it is
    # itself valid JSON, matching what the prompt asks the model to produce.
    prompt = f"""
    
    ROLE: You are an assistant specializing in security risk analysis. Your task is to assess the provided scenario and identify any potential threats from the given list.
    Do not generate new threats. Use values that I provide

### **Instructions:**

1. **Read the scenario carefully.** This is the only scenario you need to evaluate. Do not invent any details or make assumptions. Always respond in English.

    - ScenarioID: {scenario_id}
    - Scenario description: "{scenario_description}"

2. **Identify all the applicable threats.** Review the scenario and compare it against the "Threats List" below. If a security threat is present, please explain what the security threat is. Only choose threats that are exact matches from the list. Do not select any threats that are not in the provided list.
    - **Threats List:**
    {threats_text}

    **DO NOT create new threats or mention anything outside of the provided Threats List.** If no threats apply, leave the "Threats" array empty.

3. **Create a new JSON object for every threat you find.** For each identified threat, create a JSON object with the following format:

    {{
        "ScenarioID": "{scenario_id}",
        "Scenario": "{scenario_description}",
        "Threats": [
            {{
                "ThreatID": "<THREAT_ID>",
                "ThreatName": "<THREAT_NAME>",
                "ThreatDescription": "<THREAT_DESCRIPTION>"
            }}
        ]
    }}

    **For Example**
    {{
        "ScenarioID": "{scenario_id}",
        "Scenario": "{scenario_description}",
        "Threats": [
            {{
                "ThreatID": "T2",
                "ThreatName": "Power supply",
                "ThreatDescription": "Power failure to devices which may cause loss or corruption of processed data"
            }}
        ]
    }}


    - Ensure that Threats matches list i provided.

    ### **Important Notes:**
    - **Do not** generate new threats. Use only the values provided in the "Threats List".
    - Ensure that the JSON output is valid and correctly formatted.
    """

    return prompt


def generate_prompts_for_all_scenarios(scenarios_df, threats_df, vulnerabilities_df):
    """
    Build the threat prompt for every entry of the 'Scenario ID' column.

    Args:
        scenarios_df (pd.DataFrame): DataFrame containing scenarios.
        threats_df (pd.DataFrame): DataFrame containing threats.
        vulnerabilities_df (pd.DataFrame): DataFrame containing vulnerabilities.

    Returns:
        list: One generate_llm_prompt result per scenario ID, in column order.
    """
    return [
        generate_llm_prompt(current_id, scenarios_df, threats_df, vulnerabilities_df)
        for current_id in scenarios_df['Scenario ID']
    ]





# Load the de-duplicated scenarios plus the threat and vulnerability catalogues
# (all ';'-delimited).
scenarios_df = pd.read_csv(unique_scenarios_file, delimiter=';')
threats_df = pd.read_csv('threat.csv', delimiter=';')
vulnerabilities_df = pd.read_csv('vulnerability.csv', delimiter=';')

# Header cells may carry stray whitespace; normalise them so the column
# lookups in the prompt builders match.
scenarios_df.columns = scenarios_df.columns.str.strip()
threats_df.columns = threats_df.columns.str.strip()
vulnerabilities_df.columns = vulnerabilities_df.columns.str.strip()

prompts = generate_prompts_for_all_scenarios(scenarios_df, threats_df, vulnerabilities_df)

# Index 0 is the first prompt; the previous index 1 showed the second one and
# raised IndexError when only a single scenario was present.
print(prompts[0])  # Example: Print the first prompt


## Generate answers

In [None]:
from langchain_core.prompts import ChatPromptTemplate
from langchain_ollama.llms import OllamaLLM
import csv

# Upper bound on how many prompts are sent to the model; the generator below
# clamps it to len(prompts).
# NOTE(review): 193 looks like the size of one particular dataset — confirm,
# or derive the limit from the prompts list.
answer_limit = 193
# Raw model answers are appended to this file, one CSV row per answer.
csv_filename = "answers.csv"

# Wrapper placed around every generated prompt before it is sent to the model.
template = """Question: {question}
Answer: Return only json"""
prompt = ChatPromptTemplate.from_template(template)

def generate_answer_for_prompt(question):
    """Send one question to the qwen2.5 Ollama model and return its reply.

    The question is filled into the module-level ``prompt`` template, which is
    piped into a freshly created model instance.
    """
    llm = OllamaLLM(model="qwen2.5")
    return (prompt | llm).invoke({"question": question})

def generate_answers_for_all_prompts(prompts, limit, answer_file):
    """Ask the model about the first ``limit`` prompts and store each reply.

    ``limit`` is capped at the number of prompts. Every reply is appended to
    ``answer_file`` as a single-column CSV row as soon as it arrives, and a
    confirmation line is printed after each write.
    """
    capped = min(limit, len(prompts))

    for idx in range(capped):
        reply = generate_answer_for_prompt(prompts[idx])

        # Append mode: rows from earlier iterations (and earlier runs) are kept.
        with open(answer_file, mode="a", newline="", encoding="utf-8") as handle:
            csv.writer(handle).writerow([reply])

        print(f"Answer saved to {answer_file}")


# Query the model for each threat prompt (at most answer_limit of them) and
# append the raw answers to csv_filename. Because the file is opened in append
# mode, re-running this cell adds the answers again.
generate_answers_for_all_prompts(prompts, answer_limit, csv_filename)



In [None]:
import csv
import json
import re

# Raw answers written by the generation cell, and the flattened CSV to create.
# NOTE(review): the module-level name `input` shadows the built-in input() for
# the rest of the kernel session — consider renaming it together with the call below.
input = "answers.csv"
output = "answer_in_csv.csv"

def json_to_csv(input_file, output_file):
    """
    Flatten the fenced JSON answers in ``input_file`` into a ';'-delimited CSV.

    Every ```json ... ``` block found in the file is parsed. Each scenario
    object yields one output row per entry of its "Threats" value, or a single
    row with empty threat columns when there are none. Blocks that are not
    valid JSON are reported and skipped.

    A block may hold one scenario object or an array of them, and "Threats"
    may be an array or a single object. Values that are not JSON objects are
    reported and skipped instead of raising.

    Args:
        input_file: Path of the file with the raw answers.
        output_file: Path of the CSV to write (overwritten if it exists).
    """
    with open(input_file, "r", encoding="utf-8") as file:
        content = file.read().strip()

    json_entries = re.findall(r'```json\n(.*?)\n```', content, re.DOTALL)

    data = []

    for json_str in json_entries:
        # The answers were stored through csv.writer, which doubles every
        # quote inside a quoted field; undo that before parsing.
        json_str = json_str.replace('""', '"')

        try:
            parsed_data = json.loads(json_str)
        except json.JSONDecodeError as e:
            print(f"Skipping invalid JSON: {json_str}")
            print(f"Error: {e}")
            continue

        # The prompt asks for "a new JSON object for every threat", so the
        # model may answer with an array of objects instead of one object.
        records = parsed_data if isinstance(parsed_data, list) else [parsed_data]

        for record in records:
            if not isinstance(record, dict):
                print(f"Skipping non-object JSON value: {record!r}")
                continue

            scenario_id = record.get("ScenarioID", "")
            scenario = record.get("Scenario", "")

            threats = record.get("Threats")
            if isinstance(threats, dict):
                # A single threat returned as a bare object.
                threats = [threats]
            elif not isinstance(threats, list):
                threats = []
            threats = [threat for threat in threats if isinstance(threat, dict)]

            if threats:
                for threat in threats:
                    data.append([
                        scenario_id,
                        scenario,
                        threat.get("ThreatID", ""),
                        threat.get("ThreatName", ""),
                        threat.get("ThreatDescription", "")
                    ])
            else:
                # Keep the scenario in the output even when no threat applies.
                data.append([scenario_id, scenario, "", "", ""])

    with open(output_file, "w", newline="", encoding="utf-8") as file:
        writer = csv.writer(file, delimiter=';')
        writer.writerow(["ScenarioID", "Scenario", "ThreatID", "ThreatName", "ThreatDescription"])
        writer.writerows(data)

    print(f"Formatted CSV file saved as {output_file}")

# Flatten the raw threat answers (answers.csv) into answer_in_csv.csv.
json_to_csv(input, output)

In [None]:
import tempfile
import os
from io import StringIO

def test_json_to_csv_creates_files_in_current_folder():
    """Run json_to_csv on a small fixture and check the resulting rows.

    The fixture mimics the raw answers file: CSV-quoted fields with doubled
    quotes, one scenario with a threat, one without, and a trailing
    unterminated fence. Both the fixture and the CSV are written to the
    current working directory and left there.
    """
    # Files live next to the notebook
    fixture_file = "test_input.json.txt"
    csv_file = "test_output.csv"

    # Simulated LLM-like input
    raw_answers = """"```json
{
    ""ScenarioID"": ""S1"",
    ""Scenario"": ""The processing center is located in the basement. A sewer system runs under the building. The walls of the room that houses the processing center are not reinforced."",
    ""Threats"": [
        {
            ""ThreatID"": ""T3"",
            ""ThreatName"": ""Flooding"",
            ""ThreatDescription"": ""Flooding of the rooms where the systems and/or storage media are located""
        }
    ]
}
```"
"```json
{
    ""ScenarioID"": ""S2"",
    ""Scenario"": ""Confidential documents are stored in an archive constantly protected by armed guards, with three levels of biometric protection. The room that houses the archive is reinforced and burglar-proof."",
    ""Threats"": [
    ]
}
```"
"```json"""

    # Materialise the fixture
    with open(fixture_file, "w", encoding="utf-8") as handle:
        handle.write(raw_answers)

    # Convert it
    json_to_csv(fixture_file, csv_file)

    # Inspect the generated CSV
    parsed = pd.read_csv(csv_file, delimiter=';')
    threat_columns = ["ThreatID", "ThreatName", "ThreatDescription"]

    assert len(parsed) == 2
    with_threat, without_threat = parsed.loc[0], parsed.loc[1]
    assert with_threat["ScenarioID"] == "S1"
    assert with_threat["ThreatID"] == "T3"
    assert without_threat["ScenarioID"] == "S2"
    # Empty threat cells are read back as NaN
    assert without_threat[threat_columns].isna().all()


    print("✅ Test passed: CSV output correctly extracted threat data and saved to current folder.")

# 🔥 Run the test
test_json_to_csv_creates_files_in_current_folder()

In [None]:
import pandas as pd

def _format_vulnerabilities(vulnerabilities_df):
    """Render the vulnerability catalogue as the text block embedded in the prompt."""
    return "\n\n".join(
        f"VulnID: {row['ID']}\n"
        f"VulnerabilityName: {row['VULNERABILITY']}\n"
        f"VulnDescription: {row['DESCRIPTION']}\n"
        "---------------------------------"
        for _, row in vulnerabilities_df.iterrows()
    )


def _render_vulnerability_prompt(scenario_id, scenario_description, threat_id,
                                 threat_name, threat_description, vulnerabilities_text):
    """Fill the vulnerability-identification prompt for one scenario/threat pair."""
    # Doubled braces ({{ }}) are literal braces inside the f-string.
    # "ThreatDescription" is followed by a comma in both JSON samples so the
    # format shown to the model is itself valid JSON.
    prompt = f"""
    ROLE: You are an assistant specializing in security risk analysis. Your task is to assess the provided scenario and identify any applicable **vulnerabilities** from the given list.
    The threats have already been identified.

### **Instructions:**

1. **Read the scenario carefully.** This is the only scenario you need to evaluate. Do not invent any details or make assumptions. Always respond in English.

    - ScenarioID: {scenario_id}
    - Scenario description: "{scenario_description}"

2. **This is the already identified threat for this scenario:**
    ThreatID: {threat_id}
    ThreatName: {threat_name}
    ThreatDescription: {threat_description}

3. **Identify all the applicable vulnerabilities.** Review the scenario and compare it against the "Vulnerabilities List" below. If a vulnerability is present, explain why it applies. Only select vulnerabilities that match exactly from the provided list.
    - **Vulnerabilities List:**
    {vulnerabilities_text}

    **DO NOT create new vulnerabilities or mention anything outside of the provided list.** If no vulnerabilities apply, leave the "Vulnerabilities" array empty.

4. **Create a new JSON object for every identified vulnerability**, using the following format:

    {{
        "ScenarioID": "{scenario_id}",
        "Scenario": "{scenario_description}",    
        "ThreatID": "{threat_id}",
        "ThreatName": "{threat_name}",
        "ThreatDescription": "{threat_description}",
        "Vulnerabilities": [
            {{
                "VulnID": "<VULN_ID>",
                "VulnerabilityName": "<VULNERABILITY_NAME>",
                "VulnDescription": "<VULN_DESCRIPTION>"
            }}
        ]
    }}

    **Example Output**
    {{
        "ScenarioID": "{scenario_id}",
        "Scenario": "{scenario_description}",    
        "ThreatID": "{threat_id}",
        "ThreatName": "{threat_name}",
        "ThreatDescription": "{threat_description}",
        "Vulnerabilities": [
            {{
                "VulnID": "V1",
                "VulnerabilityName": "Communication channels not adequately protected",
                "VulnDescription": "Channels (carrying classified/sensitive corporate data) not subject to security procedures (encryption devices) or not physically inaccessible (e.g. connections with accessible data cables, to which interception tools can be connected)"
            }}
        ]
    }}

    - Ensure that **vulnerabilities match exactly** from the provided list.

### **Important Notes:**
- **Do not** generate new vulnerabilities. Use only the values provided in the "Vulnerabilities List".
- Ensure that the JSON output is valid and correctly formatted.
"""

    return prompt


def generate_llm_prompt1(scenario_id, scenarios_df, vulnerabilities_df):
    """
    Generates a formatted LLM prompt based on the given scenario, its
    pre-identified threat, and the list of possible vulnerabilities.

    When several rows share ``scenario_id`` (one row per threat), the first
    matching row is used. Use generate_prompts_for_all_scenarios1 to get one
    prompt per row.

    Args:
        scenario_id (str): The scenario ID to pull the correct description.
        scenarios_df (pd.DataFrame): Scenario/threat rows with the columns
            'ScenarioID', 'Scenario', 'ThreatID', 'ThreatName' and
            'ThreatDescription'.
        vulnerabilities_df (pd.DataFrame): Vulnerability catalogue with the
            columns 'ID', 'VULNERABILITY' and 'DESCRIPTION'.

    Returns:
        str | None: The prompt, or None (after printing a message) when no
        row matches ``scenario_id``.
    """
    scenario_row = scenarios_df[scenarios_df['ScenarioID'] == scenario_id]
    if scenario_row.empty:
        print(f"No scenario found with ID: {scenario_id}")
        return None

    first_match = scenario_row.iloc[0]
    return _render_vulnerability_prompt(
        scenario_id,
        first_match['Scenario'],
        first_match['ThreatID'],
        first_match['ThreatName'],
        first_match['ThreatDescription'],
        _format_vulnerabilities(vulnerabilities_df),
    )


def generate_prompts_for_all_scenarios1(scenarios_df, vulnerabilities_df):
    """
    Generate one LLM prompt for every row of ``scenarios_df``.

    Each row is a scenario/threat pair, so a scenario with several threats
    produces one prompt per threat. Prompts are built from the row itself
    rather than by looking the scenario ID up again, which would always pick
    the first threat of that scenario.

    Args:
        scenarios_df (pd.DataFrame): DataFrame containing scenarios and threats.
        vulnerabilities_df (pd.DataFrame): DataFrame containing vulnerabilities.

    Returns:
        list: A list of prompts, one per row, in row order.
    """
    # The catalogue text is identical for every prompt; build it once.
    vulnerabilities_text = _format_vulnerabilities(vulnerabilities_df)

    row_values = zip(
        scenarios_df['ScenarioID'],
        scenarios_df['Scenario'],
        scenarios_df['ThreatID'],
        scenarios_df['ThreatName'],
        scenarios_df['ThreatDescription'],
    )
    return [
        _render_vulnerability_prompt(
            scenario_id, scenario_description, threat_id,
            threat_name, threat_description, vulnerabilities_text,
        )
        for scenario_id, scenario_description, threat_id, threat_name, threat_description
        in row_values
    ]



# Rebinds scenarios_df: from here on it holds the scenario/threat rows
# (ScenarioID, Scenario, Threat* columns) rather than the unique-scenario frame.
# NOTE(review): the conversion cell above writes "answer_in_csv.csv", but this
# cell reads "formatted_answer.csv" — confirm that file is produced outside
# this notebook, or align the two names.
scenarios_df = pd.read_csv("formatted_answer.csv", delimiter=';')
vulnerabilities_df = pd.read_csv("vulnerability.csv", delimiter=';')


# Strip stray whitespace from the header names before the column lookups.
scenarios_df.columns = scenarios_df.columns.str.strip()
vulnerabilities_df.columns = vulnerabilities_df.columns.str.strip()


# The later answer-generation cell reads this name exactly as spelled here.
prompts_vulnerabilties = generate_prompts_for_all_scenarios1(scenarios_df, vulnerabilities_df)


# Preview the first vulnerability prompt.
print(prompts_vulnerabilties[0])  


In [None]:
from langchain_core.prompts import ChatPromptTemplate
from langchain_ollama.llms import OllamaLLM
import csv


# Process every vulnerability prompt; the generator clamps the limit to
# len(prompts) in any case.
answer_limit = len(prompts_vulnerabilties)  
# Raw answers of the vulnerability stage are appended to this file.
csv_filename = "answers1.csv"  

# NOTE(review): this rebinds the module-level `prompt`, which
# generate_answer_for_prompt from the earlier cell also reads; re-running that
# function after this cell uses this template instead of the original one.
template = """Question: {question}
Answer: Return only JSON"""
prompt = ChatPromptTemplate.from_template(template)

def generate_answer_for_prompt1(question):
    """
    Run a single question through the template-plus-model chain.

    A new qwen2.5 Ollama model is created on every call and combined with the
    module-level ``prompt`` template; the model's reply is returned unchanged.
    """
    llm = OllamaLLM(model="qwen2.5")
    pipeline = prompt | llm
    return pipeline.invoke({"question": question})

def generate_answers_for_all_prompts1(prompts, limit, answer_file):
    """
    Collect model responses for the leading prompts and persist them.

    At most ``limit`` prompts (never more than are available) are processed
    in order. Each response is appended to ``answer_file`` as one CSV row
    right after it is received, followed by a printed confirmation.
    """
    how_many = min(limit, len(prompts))

    for position in range(how_many):
        current_prompt = prompts[position]
        response = generate_answer_for_prompt1(current_prompt)

        # Re-opened in append mode on every pass so earlier rows are preserved.
        with open(answer_file, mode="a", newline="", encoding="utf-8") as out:
            row_writer = csv.writer(out)
            row_writer.writerow([response])

        print(f"Answer saved to {answer_file}")

# Query the model for every vulnerability prompt and append the raw answers to
# csv_filename. The file is opened in append mode, so re-running this cell adds
# the answers again.
generate_answers_for_all_prompts1(prompts_vulnerabilties, answer_limit, csv_filename)


In [None]:
import csv
import json
import re

# Raw vulnerability answers, and the flattened CSV to create from them.
# NOTE(review): the module-level name `input` shadows the built-in input() —
# consider renaming it together with the call below.
input = "answers1.csv" 
output = "final_formatted_answers_to_csv.csv"

def json_to_csv2(input_file, output_file):
    """
    Flatten the fenced JSON answers of the vulnerability stage into a CSV.

    Every ```json ... ``` block in ``input_file`` is parsed. Each scenario
    object yields one ';'-delimited row per entry of its "Vulnerabilities"
    value, repeating the scenario and threat columns, or a single row with
    empty vulnerability columns when there are none. Blocks that are not
    valid JSON are reported and skipped.

    A block may hold one object or an array of objects, and "Vulnerabilities"
    may be an array or a single object. Values that are not JSON objects are
    reported and skipped instead of raising.

    Args:
        input_file: Path of the file with the raw answers.
        output_file: Path of the CSV to write (overwritten if it exists).
    """
    with open(input_file, "r", encoding="utf-8") as file:
        content = file.read().strip()

    json_entries = re.findall(r'```json\n(.*?)\n```', content, re.DOTALL)

    data = []

    for json_str in json_entries:
        # The answers were stored through csv.writer, which doubles every
        # quote inside a quoted field; undo that before parsing.
        json_str = json_str.replace('""', '"')

        try:
            parsed_data = json.loads(json_str)
        except json.JSONDecodeError as e:
            print(f"Skipping invalid JSON: {json_str}")
            print(f"Error: {e}")
            continue

        # The prompt asks for "a new JSON object for every identified
        # vulnerability", so the model may answer with an array of objects.
        records = parsed_data if isinstance(parsed_data, list) else [parsed_data]

        for record in records:
            if not isinstance(record, dict):
                print(f"Skipping non-object JSON value: {record!r}")
                continue

            # Columns shared by every row of this scenario/threat pair.
            shared = [
                record.get("ScenarioID", ""),
                record.get("Scenario", ""),
                record.get("ThreatID", ""),
                record.get("ThreatName", ""),
                record.get("ThreatDescription", ""),
            ]

            vulns = record.get("Vulnerabilities")
            if isinstance(vulns, dict):
                # A single vulnerability returned as a bare object.
                vulns = [vulns]
            elif not isinstance(vulns, list):
                vulns = []
            vulns = [vuln for vuln in vulns if isinstance(vuln, dict)]

            if vulns:
                for vuln in vulns:
                    data.append(shared + [
                        vuln.get("VulnID", ""),
                        vuln.get("VulnerabilityName", ""),
                        vuln.get("VulnDescription", "")
                    ])
            else:
                # Keep the scenario/threat pair even without vulnerabilities.
                data.append(shared + ["", "", ""])

    with open(output_file, "w", newline="", encoding="utf-8") as file:
        writer = csv.writer(file, delimiter=';')
        writer.writerow([
            "ScenarioID", "Scenario", "ThreatID", "ThreatName", "ThreatDescription",
            "VulnID", "VulnerabilityName", "VulnDescription"
        ])
        writer.writerows(data)

    print(f"Formatted CSV file saved as {output_file}")

# Flatten the raw vulnerability answers (answers1.csv) into
# final_formatted_answers_to_csv.csv.
json_to_csv2(input, output)


In [None]:
def test_json_to_csv2_creates_file_with_vulns_and_no_vulns():
    """Check json_to_csv2 on one answer with vulnerabilities and one without.

    The fixture mimics the raw answers file (CSV-quoted fields with doubled
    quotes). The first answer lists two vulnerabilities and must produce two
    rows; the second has an empty "Vulnerabilities" array and must produce a
    single row whose vulnerability columns are empty. Both the fixture and
    the CSV are written to the current working directory.
    """
    input_path = "test_answers.json.txt"
    output_path = "test_formatted_output.csv"

    json_content = """"```json
{
    ""ScenarioID"": ""S1"",
    ""Scenario"": ""The processing center is located in the basement. A sewer system runs under the building. The walls of the room that houses the processing center are not reinforced."",    
    ""ThreatID"": ""T3"",
    ""ThreatName"": ""Flooding"",
    ""ThreatDescription"": ""Flooding of the rooms where the systems and/or storage media are located"",
    ""Vulnerabilities"": [
        {
            ""VulnID"": ""V26"",
            ""VulnerabilityName"": ""Inadequate fire protection"",
            ""VulnDescription"": ""Lack of a specific fire protection system to safeguard the system and the data contained within it (e.g. fireproof safes and automatic shutdown system)""
        },
        {
            ""VulnID"": ""V27"",
            ""VulnerabilityName"": ""Inadequate flood protection"",
            ""VulnDescription"": ""Lack of a specific anti-flooding system to safeguard the system and the data contained within it (e.g. watertight bulkheads)""
        }
    ]
}
```"
"```json
{
    ""ScenarioID"": ""S2"",
    ""Scenario"": ""The processing center is located on the top floor and is protected by watertight bulkheads."",
    ""ThreatID"": ""T3"",
    ""ThreatName"": ""Flooding"",
    ""ThreatDescription"": ""Flooding of the rooms where the systems and/or storage media are located"",
    ""Vulnerabilities"": [
    ]
}
```"""

    with open(input_path, "w", encoding="utf-8") as f:
        f.write(json_content)

    json_to_csv2(input_path, output_path)

    df = pd.read_csv(output_path, delimiter=';')

    # Two rows for S1 (one per vulnerability) plus one placeholder row for S2.
    assert len(df) == 3
    assert df.loc[0, "ScenarioID"] == "S1"
    assert df.loc[0, "VulnID"] == "V26"
    assert df.loc[1, "VulnID"] == "V27"

    # The no-vulnerability case: scenario and threat kept, vulnerability cells
    # empty (read back as NaN).
    assert df.loc[2, "ScenarioID"] == "S2"
    assert df.loc[2, "ThreatID"] == "T3"
    assert df.loc[2, ["VulnID", "VulnerabilityName", "VulnDescription"]].isna().all()

    print("✅ Test passed: json_to_csv2 correctly parsed vulnerabilities and empty cases.")

test_json_to_csv2_creates_file_with_vulns_and_no_vulns()