## Libraries

In [None]:
!pip install langchain langchain_core langchain_community langchain-huggingface torch accelerate bitsandbytes docarray unstructured jq openpyxl matplotlib numpy pandas

## Environment Variables and Constants

Set the API keys and environment variables required for running the app

In [None]:
import os

# Credentials read by the libraries used below; fill these in before running.
os.environ.update(
    {
        "HUGGINGFACEHUB_API_TOKEN": "",
        "LANGCHAIN_API_KEY": "",
        "OPENAI_API_KEY": "",
    }
)

# Where downloaded models are stored. Point these at a location with
# sufficient free storage space.
# NOTE(review): the original note says an unset value falls back to a cache
# directory under ${HOME}; an empty string is still a set value - confirm
# how the Hugging Face libraries treat it.
os.environ.update({"HF_HUB_CACHE": "", "HF_HOME": ""})

# Prompt sent to the model. The placeholders `user`, `scenarios`,
# `countermeasures` and `format_instructions` are filled in by the chain.
TEMPLATE = """
You are an assistant in security risk analysis.
You will be provided with risk scenarios that have certain threats and vulnerabilities. For the threats you will also be provided with possible counter measures.
You will be provided with a user scenario and based on that you will be provided with context of related scenarios from you retrieval vector store.
You will also be provided with possible countermeasures for all the retrieved scenarios.
You need to suggest the appropriate countermeasures for the user scenario and give reasoning as to why it is appropriate. There could be multiple appropriate countermeasures so you need to provide a reasoning for each individually.
Answer the question based only on the following context. If the question does not relate with the context, just reply 'I don't know'

User: {user}

Scenarios: {scenarios}

Countermeasures: {countermeasures}

{format_instructions}
"""


## Requirements

Import the modules used throughout the notebook (the packages themselves are installed in the "Libraries" cell above)

In [None]:
import csv
import json
import os
import re
from collections import Counter
from json import JSONDecodeError
from typing import Any, List

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import torch
from langchain_community.document_loaders import (JSONLoader,
                                                  UnstructuredExcelLoader)
from langchain_community.vectorstores import DocArrayInMemorySearch
from langchain_core.documents.base import Document
from langchain_core.exceptions import OutputParserException
from langchain_core.output_parsers import JsonOutputParser, StrOutputParser
from langchain_core.outputs import Generation
from langchain_core.prompts import ChatPromptTemplate, PromptTemplate
from langchain_core.pydantic_v1 import BaseModel, Field
from langchain_core.runnables import RunnablePassthrough
from langchain_core.runnables.base import RunnableLambda, RunnableSequence
from langchain_huggingface import (ChatHuggingFace, HuggingFaceEmbeddings,
                                   HuggingFacePipeline)
from pandas import DataFrame
from transformers import BitsAndBytesConfig


## Data Loaders

Methods used for loading the data

In [None]:
def load_remediations_file(file_path: str) -> DataFrame:
    """
    Read the remediations spreadsheet and return it as a flat table.

    Args:
        * file_path (str): The path to the remediations excel file

    Returns:
        A pandas `DataFrame` with the remediations data, one row per
        countermeasure and the columns renamed to short identifiers
    """

    column_names = [
        "threat_id",
        "threat_desc",
        "vuln_id",
        "vuln_desc",
        "vthe",
        "countermeasure_id",
        "countermeasure_desc",
        "tech_nature",
    ]

    # Row 2 holds the column names. The first five columns are read as a
    # multi-index because the sheet has merged cells; flattening the index
    # again turns them back into ordinary columns.
    frame = pd.read_excel(
        file_path,
        header=2,
        index_col=[0, 1, 2, 3, 4],
    ).reset_index()

    frame.columns = column_names

    # Merged cells can still leave `NaN` threat ids behind; fill them in.
    return fill_remediation_na(frame)

def load_scenarios_file(file_path: str) -> DataFrame:
    """
    Read the scenarios spreadsheet into a pandas dataframe.

    Args:
        * file_path (str): The path to the scenarios excel file

    Returns:
        A pandas `DataFrame` with the scenarios data and the columns
        renamed to short identifiers
    """

    frame = pd.read_excel(file_path)

    # Replace the spreadsheet headers with the names used by the rest of
    # the notebook; the sheet must have exactly these ten columns.
    frame.columns = [
        "scenario_id",
        "scenario_desc",
        "extended",
        "short",
        "details",
        "risk_id",
        "risk_desc",
        "vuln_id",
        "vuln_desc",
        "risk_occur_type",
    ]

    return frame

def fill_remediation_na(df: DataFrame) -> DataFrame:
    """
    Fills the `NaN` values in the `threat_id` column of the remediations data.

    Some `threat_id` cells are empty after loading the spreadsheet. Each
    empty cell receives the closest non-empty `threat_id` above it
    (a forward fill).

    A frame without any `NaN` is returned unchanged, and leading `NaN`
    values (nothing above them to copy) are left as they are.

    Args:
        df (DataFrame): `DataFrame` with the remediations data. Its
            `threat_id` column is updated in place.

    Returns:
        The same `DataFrame` with `NaN` threat ids replaced where possible
    """

    # Forward fill replaces the manual index walk, which failed with an
    # IndexError when no NaN existed and a KeyError when row 0 was NaN.
    df["threat_id"] = df["threat_id"].ffill()

    return df

## Data Formatting

Methods used for formatting and converting the data

In [None]:
class RemediationData(BaseModel):
    # Shape of the JSON answer requested from the model. `setup_rag_chain`
    # hands this class to `CustomJsonOutputParser` as `pydantic_object`, and
    # `set_prompt` places the parser's format instructions into the prompt.
    # NOTE(review): kept as `#` comments rather than a class docstring -
    # pydantic presumably copies a docstring into the generated schema, which
    # would change the prompt text; confirm before converting.
    scenario_id: str = Field(description="The scenario ID")
    threat_id: str = Field(description="The threat id")
    vuln_id: str = Field(description="The vulnerability ID")
    # `remediation_ids` and `reasoning` are paired element by element when
    # the output is written to csv (see `convert_llm_json_output_to_csv`).
    remediation_ids: list = Field(description="The ID's of the most appropriate countermeasures.")
    classification_desc: str = Field(description="The description of classification")
    reasoning: list[str] = Field(description="Your reasoning behind your suggestion of each countermeasure.")

class CustomJsonOutputParser(JsonOutputParser):
    """
    JSON parser for model answers wrapped in `<Thought>` / `<Output>` tags.

    The JSON object inside `<Output>` is parsed and the text inside
    `<Thought>` is added to it under the `thought` key.
    """

    def parse_result(self, result: List[Generation], *, partial: bool = False) -> Any:
        """Parse the result of an LLM call to a JSON object.

        Parsing is best effort: problems are printed instead of raised so
        that one malformed answer does not abort a batch of scenarios.

        Args:
            result: The result of the LLM call. Only the first generation
                is used.
            partial: Accepted for interface compatibility; not used.

        Returns:
            The parsed JSON object with an extra `thought` entry, or `None`
            when the expected tags are missing or the output is not valid
            JSON.
        """

        text = result[0].text.strip()

        # extracting the model thought and output
        match = re.search(
            r"<Thought>\s*(.*?)\s*</Thought>.*?<Output>\s*(\{.*?\})\s*</Output>",
            text,
            re.DOTALL,
        )
        if match is None:
            # The answer does not follow the <Thought>/<Output> layout.
            print("Error Occured")
            return None

        thought, payload = match.groups()

        try:
            output = json.loads(payload)
        except JSONDecodeError:
            # Report through the parser exception type, but do not raise.
            print(OutputParserException(f"Invalid json output: {text}", llm_output=text))
            return None

        output.update({"thought": thought})
        return output

def convert_df_to_dict(df: DataFrame, save_path=None) -> dict:
    """
    Turn a pandas `DataFrame` into plain python data, optionally saving it.

    The frame is serialised to JSON in "records" orientation and parsed
    back, so the result is a list with one dictionary per row.

    Args:
        * df (DataFrame): A pandas `DataFrame` object with required data
        * save_path (str): if given, the records are also written to this
          path as json
    Returns:
        the row records as python data
    """

    records = json.loads(df.to_json(orient="records"))

    if not save_path:
        return records

    with open(save_path, 'w') as json_file:
        json.dump(records, json_file)

    return records

def convert_llm_json_output_to_csv(llm_output: list[dict], file_path: str) -> None:
    """
    Takes the llms output as json and converts to csv.

    Every entry may suggest several remediations; each remediation and its
    reasoning is written to its own row. Entries that are not usable (for
    example `None` from a failed parse, or a missing key) are reported
    and skipped as a whole.

    Args:
        llm_output (list[dict]): `list` of json entries of multiple scenario outputs
        file_path (str): name of the csv file where the data will be saved
    """

    # Fixed header that matches the row layout written below. It does not
    # depend on any particular entry, which might be `None` or missing.
    header = [
        "scenario_id",
        "threat_id",
        "vuln_id",
        "remediation_id",
        "classification_desc",
        "reasoning",
        "thought",
    ]

    # newline="" is what the csv module expects for files it writes to.
    with open(file_path, "w", newline="", encoding="utf-8") as outfile:
        csv_writer = csv.writer(outfile)
        csv_writer.writerow(header)

        for index, data in enumerate(llm_output):
            # Build all rows of an entry first so that a broken entry
            # never ends up partially written.
            try:
                rows = [
                    [
                        data["scenario_id"],
                        data["threat_id"],
                        data["vuln_id"],
                        remediation,
                        data["classification_desc"],
                        reason,
                        data["thought"],
                    ]
                    for remediation, reason in zip(data["remediation_ids"], data["reasoning"])
                ]
            except (KeyError, TypeError):
                print(f"Error occured at index{index}")
                continue

            csv_writer.writerows(rows)

## RAG Functions

These are methods required to setup the RAG pipeline

In [None]:
def set_prompt(template, parser) -> PromptTemplate:
    """
    Set up the prompt to be used with the model

    Args:
        * template (str): The prompt template to use. It is expected to
          contain the placeholders `user`, `scenarios`, `countermeasures`
          and `format_instructions`
        * parser: the parser to use; its format instructions are filled
          into the `format_instructions` placeholder
    Returns:
        `PromptTemplate` object
    """

    prompt = PromptTemplate(
        template=template,
        # These names match the template placeholders and the keys the
        # chain supplies (`countermeasures`, not `remediations`).
        input_variables=["user", "scenarios", "countermeasures"],
        partial_variables={"format_instructions": parser.get_format_instructions()},
    )
    return prompt

def load_lrm_model_from_hf(model_id) -> ChatHuggingFace:
    """
    Build a `ChatHuggingFace` chat model around a hugging face model.

    The model is loaded through a text-generation pipeline with 4-bit
    quantization and sampling switched off.

    Args:
        model_id (str): the hugging face identifier of the model

    Returns:
        A `ChatHuggingFace` model
    """

    generation_settings = {
        "max_new_tokens": 10000,
        "do_sample": False,
        "repetition_penalty": 1.03,
        # only the newly generated text is returned, not the prompt
        "return_full_text": False,
    }
    four_bit_config = BitsAndBytesConfig(load_in_4bit=True)

    text_pipeline = HuggingFacePipeline.from_model_id(
        model_id=model_id,
        task="text-generation",
        pipeline_kwargs=generation_settings,
        model_kwargs={"quantization_config": four_bit_config},
        device_map="auto",
    )

    return ChatHuggingFace(llm=text_pipeline)

def create_retrievar_from_vector_store(docs: list):
    """
    Build a retriever over the given documents.

    The documents are embedded with the `all-MiniLM-L6-v2` model, stored
    in an in-memory vector store, and the store is exposed as a retriever.

    Args:
        docs (list): A list of `Documents` which need to be embedded.

    Returns:
        the retriever created from the vector store
    """

    embedding_model = HuggingFaceEmbeddings(model_name="all-MiniLM-L6-v2")
    return DocArrayInMemorySearch.from_documents(docs, embedding_model).as_retriever()

def remediation_lookup(scenario_docs: list[Document]) -> list[dict]:
    """
    Get the list of remediations knowing the threat and
    vulnerability
    The method performs a lookup to get possible countermeasures
    to provided threat and vulnerability.

    Args:
        scenario_docs (list[Document]): retrieved scenario documents whose
            `page_content` is a json object with `risk_id` and `vuln_id`

    Returns:
        list with one dictionary per matching countermeasure row; documents
        without a threat or vulnerability id contribute nothing
    """

    countermeasures = []
    remediations_df = load_remediations_file("./data/Remediations.xlsx")

    for doc in scenario_docs:
        scen_dict = json.loads(doc.page_content)

        threat_id = scen_dict.get("risk_id")
        vuln_id = scen_dict.get("vuln_id")
        # Empty spreadsheet cells arrive as null; without both ids there
        # is nothing to look up.
        if threat_id is None or vuln_id is None:
            continue

        same_threat = remediations_df["threat_id"] == threat_id.strip()
        same_vuln = remediations_df["vuln_id"] == vuln_id.strip()

        countermeasures.extend(convert_df_to_dict(remediations_df[same_threat & same_vuln]))

    return countermeasures

def setup_rag_chain(model_id: str, template: str) -> RunnableSequence:
    """
    Create the rag chain.
    Creates the chain to be used for the RAG process

    Args:
        model_id (str): the name of the lrm model to use
        template (str): the template to provide to the lrm. Needs to have
                        the keys `(user, scenarios, countermeasures)`

    Returns:
        a `RunnableSequence` chain that implements the RAG workflow
    """
    scenarios_df = load_scenarios_file("./data/Scenarios.xlsx")
    # The loader below reads from disk, so the scenario records are written
    # to a json file first; the returned records are not needed here.
    convert_df_to_dict(scenarios_df, save_path="./data/scen.json")
    loader_scen = JSONLoader(file_path="./data/scen.json", jq_schema='.[]', text_content=False)
    scenarios_doc = loader_scen.load()
    scenarios = create_retrievar_from_vector_store(scenarios_doc)

    json_parser = CustomJsonOutputParser(pydantic_object=RemediationData)

    # Use the template passed by the caller rather than the global default.
    prompt = set_prompt(template, parser=json_parser)

    model = load_lrm_model_from_hf(model_id=model_id)

    chain = (
        {
            # the raw user scenario text
            "user": RunnablePassthrough(),
            # similar known scenarios from the vector store
            "scenarios": scenarios,
            # countermeasures looked up for those retrieved scenarios
            "countermeasures": scenarios | RunnableLambda(remediation_lookup),
        }
        | prompt
        | model
        | json_parser
    )

    return chain

def execute_scenarios(scenarios: list[str], chain) -> list[str]:
    """
    Run the chain once for every user scenario.

    Args:
        scenarios (list[str]): list of multiple user scenarios
        chain: the rag chain; its `invoke` method is called with each
            scenario in turn

    Returns:
        the chain outputs, in the same order as the scenarios
    """

    return [chain.invoke(user_scenario) for user_scenario in scenarios]

def update_chain_prompt_template(
    chain: RunnableSequence,
    parser=CustomJsonOutputParser(pydantic_object=RemediationData),
    template: str = TEMPLATE,
) -> RunnableSequence:
    """
    Update the prompt template for the chain sequence without having to create
    the chain again.

    The steps of the existing chain (retrieval, model, output parser) are
    reused as they are; only the prompt step is swapped for a prompt built
    from `template`.

    Args:
        chain (RunnableSequence): The chain which needs to be updated
        parser: the parser whose format instructions go into the prompt.
            Defaults to a `CustomJsonOutputParser`. It does not replace the
            parser step at the end of the chain.
        template (str): the new prompt template. Defaults to `TEMPLATE`
    Returns:
        a new chain that uses the new prompt

    Raises:
        ValueError: if the chain has no prompt step that could be replaced
    """

    prompt = set_prompt(template, parser)

    # `chain.assign(...)` would only add a key to the chain output and
    # returns a new runnable, so the sequence is rebuilt from its steps.
    steps = list(chain.steps)
    for position, step in enumerate(steps):
        if isinstance(step, (PromptTemplate, ChatPromptTemplate)):
            steps[position] = prompt
            break
    else:
        raise ValueError("The chain does not contain a prompt step to replace")

    return RunnableSequence(*steps)
    

## RAG Workflow

This is where the RAG workflow begins

First load the user scenario data for input

In [None]:
# define user scenarios

# scenarios = [
#     "Only authorized users can open the cabinets containing classified documents and no tracking is required.",
#     "Computers and servers are delivered to the equipment maintenance technicians for repair with the mass storage media inserted."
# ]

# One user scenario per line of the text file. The empty list is only a
# placeholder; it is replaced by the lines read below.
scenarios = []
with open("./data/scen.txt") as f:
    scenarios = f.read().splitlines()

Then create the RAG chain

In [None]:
# Create the RAG chain. This loads the scenario data, builds the retriever
# and loads the model (see `setup_rag_chain`), so it only needs to run once.

chain = setup_rag_chain(model_id="O1-OPEN/OpenO1-LLama-8B-v0.1", template=TEMPLATE)

In [None]:
"""You can use this method to update the prompt for the chain
    without having to create the chain again
"""

# csv_parser = CustomCSVParser()
# chain = update_chain_prompt_template(chain)

Execute the user scenarios and store the json output

In [None]:
# Execute all user scenarios; `outputs` holds one chain result per scenario
outputs = execute_scenarios(scenarios, chain)


# or invoke the chain yourself for a single scenario
# chain.invoke("The processing center is located in the basement. A sewer system runs under the building. The walls of the room that houses the processing center are not reinforced.")

Convert the model json output to a csv

In [None]:
# Convert the outputs to a human-readable csv, one row per suggested
# remediation
convert_llm_json_output_to_csv(outputs, "./data/output.csv")

Also save the output as json for analysis

In [None]:
# Save the raw outputs as json; the analysis below reads this file back
with open("./data/output.json", "w") as f:
    json.dump(outputs, f)

# Analysis

This is the code used for analysis

In [None]:
with open("./data/output.json") as f:
    model_output = json.load(f)
# NOTE(review): this file is expected to carry a `remediation_id` column,
# while `load_remediations_file` names that column `countermeasure_id` -
# confirm how remediations.json is produced.
with open("./data/remediations.json") as f:
    remediation_data = json.load(f)

# Loading the data and cleaning it. Entries that are not dictionaries
# (failed parses) are dropped before building the frame.
df_output = pd.DataFrame([row for row in model_output if isinstance(row, dict)])
df_remediations = pd.DataFrame(remediation_data)
df_output_clean = df_output.dropna(subset=['scenario_id', 'vuln_id', 'remediation_ids'])
df_remediations_clean = df_remediations.dropna(subset=['threat_id', 'vuln_id', 'remediation_id'])

# Creating the lookups. The mapping gets its own name so that it does not
# overwrite the `remediation_lookup` function used by the RAG chain.
expected_remediations = df_remediations_clean.groupby(['threat_id', 'vuln_id'])['remediation_id'].apply(set).to_dict()
all_known_remediations = set(df_remediations_clean['remediation_id'].unique())

# Variables used for the evaluation
tp = fp = fn = 0
total_expected = total_covered = 0
actionability_scores = []
hidden_risks = []
false_positives = 0
anomaly_types = Counter()
case_records = []

# This is the main evaluation loop: one model answer per iteration
for _, row in df_output_clean.iterrows():
    scenario_id = row['scenario_id']
    threat_id = row['threat_id']
    vuln_id = row['vuln_id']
    predicted = set(row['remediation_ids'])

    # Compare the suggestions against the remediations listed for the same
    # (threat, vulnerability) pair; an unknown pair expects nothing.
    key = (threat_id, vuln_id)
    expected = expected_remediations.get(key, set())
    matched = predicted & expected
    extras = predicted - expected
    missed = expected - predicted

    tp += len(matched)
    fp += len(extras)
    fn += len(missed)
    total_expected += len(expected)
    total_covered += len(matched)

    # Actionability: 5 = expected for this pair, 3 = a known remediation
    # listed for another pair, 1 = not in the remediation table at all
    for remediation in predicted:
        if remediation in matched:
            score = 5
        elif remediation in all_known_remediations:
            score = 3
        else:
            score = 1
        actionability_scores.append(score)

        case_records.append({
            "scenario_id": scenario_id,
            "threat_id": threat_id,
            "vuln_id": vuln_id,
            "remediation_id": remediation,
            "actionability": score,
            "matched": remediation in expected,
            "hidden_risk": remediation not in expected and remediation not in all_known_remediations
        })

    # Unexpected suggestions: ids missing from the table are collected as
    # hidden risks, known ids count as false positives
    for r in extras:
        if r not in all_known_remediations:
            hidden_risks.append(r)
        else:
            false_positives += 1

    # Each anomaly type is counted at most once per answer
    if any(r not in all_known_remediations for r in extras):
        anomaly_types['Hallucination'] += 1
    if any(r in all_known_remediations for r in extras):
        anomaly_types['Incorrect Prioritization'] += 1
    if missed:
        anomaly_types['Omission'] += 1

# Calculating the metrics
def _safe_ratio(numerator, denominator):
    # A ratio with an empty denominator is reported as 0
    return numerator / denominator if denominator else 0


precision = _safe_ratio(tp, tp + fp)
recall = _safe_ratio(tp, tp + fn)
f1 = _safe_ratio(2 * precision * recall, precision + recall)
comprehensiveness = _safe_ratio(total_covered, total_expected)
avg_actionability = np.mean(actionability_scores) if actionability_scores else 0
# Hidden risks are counted once per distinct id, false positives per occurrence
valid_hidden = len(set(hidden_risks))
hrdr = _safe_ratio(valid_hidden, valid_hidden + false_positives)

# Printing the output summary
print("Summary:")
for label, value in (
    ("Precision", precision),
    ("Recall", recall),
    ("F1 Score", f1),
    ("Comprehensiveness", comprehensiveness),
):
    print(f"{label}: {value:.4f}")
print(f"Avg. Actionability: {avg_actionability:.2f}/5")
print(f"Hidden Risk Discovery Rate: {hrdr * 100:.2f}%")
print("Anomalies:", dict(anomaly_types))

# Preparing the data for visualizations
df_case = pd.DataFrame(case_records)
# A threat/vulnerability pair is identified by "<threat_id>_<vuln_id>"
df_case['key'] = df_case['threat_id'] + "_" + df_case['vuln_id']
# Share of matched suggestions per scenario
accuracy_by_case = df_case.groupby('scenario_id')['matched'].mean().reset_index()

# Number of remediations listed per pair, next to the number the model matched
expected_counts = df_remediations_clean.groupby(['threat_id', 'vuln_id'])['remediation_id'].count().reset_index()
expected_counts['key'] = expected_counts['threat_id'] + "_" + expected_counts['vuln_id']
matched_counts = df_case[df_case['matched']].groupby('key')['remediation_id'].count().reset_index(name='matched')
# Left merge keeps pairs without any match; their missing count becomes 0
comprehensiveness_df = pd.merge(expected_counts, matched_counts, on='key', how='left').fillna(0)
comprehensiveness_df['comprehensiveness'] = comprehensiveness_df['matched'] / comprehensiveness_df['remediation_id']
comprehensiveness_df['scenario'] = comprehensiveness_df['key']

# Actionability Score Distribution
plt.figure(figsize=(8, 5))
# Integer bin edges with left alignment put each bar on its score tick
df_case['actionability'].hist(bins=[1, 2, 3, 4, 5, 6], rwidth=0.9, align='left')
plt.title("Distribution of Actionability Scores")
plt.xlabel("Actionability Score (1–5)")
plt.ylabel("Number of Remediation Suggestions")
plt.xticks([1, 2, 3, 4, 5])
plt.grid(True)
plt.tight_layout()
plt.show()

# Hidden Risks Per Scenario
# Count, per scenario, the suggestions that are absent from the remediation table
hidden_risk_counts = df_case[df_case['hidden_risk']].groupby('scenario_id').size()
if hidden_risk_counts.empty:
    # Nothing to draw when the model suggested no unknown remediation;
    # a bar plot of an empty Series fails, so the chart is skipped.
    print("No hidden risks identified; skipping the per-scenario chart.")
else:
    plt.figure(figsize=(10, 5))
    hidden_risk_counts.plot(kind='bar')
    plt.title("Hidden Risks Identified Per Scenario")
    plt.xlabel("Scenario ID")
    plt.ylabel("Count")
    plt.xticks(rotation=45, ha='right')
    plt.grid(True)
    plt.tight_layout()
    plt.show()

# Normalized Metric Overview
# All values are on a 0-1 scale; the average actionability (1-5) is divided by 5
normalized_metrics = {
    "Precision": precision,
    "Recall": recall,
    "F1 Score": f1,
    "Comprehensiveness": comprehensiveness,
    "Actionability": avg_actionability / 5,
    "HRDR": hrdr
}
plt.figure(figsize=(10, 6))
plt.bar(normalized_metrics.keys(), normalized_metrics.values())
plt.title("Normalized Evaluation Metrics")
plt.ylabel("Score (0–1)")
plt.ylim(0, 1)
plt.grid(True)
plt.tight_layout()
plt.show()

# Anomaly Breakdown Pie Chart
# Drawn only when at least one anomaly was counted
if anomaly_types:
    plt.figure(figsize=(7, 7))
    plt.pie(anomaly_types.values(), labels=anomaly_types.keys(), autopct='%1.1f%%', startangle=140)
    plt.title("Anomaly Type Distribution")
    plt.tight_layout()
    plt.show()
