#Replication package for "Beyond Words: On Large Language Models Actionability in Mission-Critical Risk Analysis"

## Instructions
This Python notebook facilitates the replication of the findings presented in the manuscript. Please run the scripts in the order in which they are presented. Interactive prompts will request any files needed for fine-tuning the model, creating the RAG-assisted models, testing them, and preparing the evaluations for the human reviewers.

## Outputs
All script outputs are automatically downloaded through your browser, including the full responses generated by all the models.

## Prerequisites
- OpenAI API key saved as a secret with the name "openaikey" in your environment (the Secrets panel on the left side of this notebook).
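- Installation of the openai Python library: a 1.x release that provides the OpenAI client class and whose beta Assistants API accepts the "retrieval" tool and the file_ids argument used below.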

#Auxiliary functions and setup
In this section, we provide all the auxiliary functions needed to replicate our study, together with the constant values that the other functions expect to find in the global namespace.


##Libraries

In [None]:
!pip install openai

##Imports and Constants

In [None]:
import csv
import json
import os
import re

import openai
import pandas as pd
from google.colab import files, userdata
from openai import OpenAI
from sklearn.metrics import confusion_matrix, accuracy_score, precision_recall_fscore_support


file_encoding = 'utf-8-sig'
csv_delimiter =";"
csv_header = [
    "ID Scenario",
    "Scenario Description",
    "Short",
    "Extended",
    "RiskID",
    "RiskDesc",
    "VulnID",
    "VulnDesc",
    "RiskType"
]

system_message='''You are an assistant in security risk analysis.
      You need to determine if the current user message contains a security threat.
      If a security threat is present, please explain what the security threat is.
      You must reply with \"more\"  in the \"Short\" field if you think additional details should be provided along with the vulnerability already discovered
      You must reply with \"no\"  in the \"Short\" field  if you think NO vulnerabilities are present
      You must reply with \"yes\"  in the \"Short\" field  if you think there is at least one vulnerability
      You must NEVER HALLUCINATE
      You always have to reply in ITALIAN.
      Always respond with an array of valid JSON output, for each vulnerability you find, create an item as the following and put into an array of json:
      {
      \"Extended\": \"[Extended description]\",
      \"Short\":	 \"[Vulnerability Present: YES/NO/MORE]\",
      \"Details\": \"[Vulnerability Description]\",
      \"RiskID\":	\"[Risk ID]\",
      \"RiskDesc\": \"[Risk Description]\",
      \"VulnID\":	\"[Vulnerability ID]\",
      \"VulnDesc\": \"[Vulnerability Description]\",
      \"RiskType\": \"[Reale/Potenziale]\"
      },'''

system_message_rag='''You are an assistant in security risk analysis.
      You need to determine if the current user message contains a security threat.
      If a security threat is present, please explain what the security threat is.
      You must reply with \"more\" if you think additional details should be provided along with the vulnerability already discovered
      You must reply with \"no\"  if you think NO vulnerabilities are present
      You must reply with \"yes\"  if you think there is at least one vulnerability
      You must NEVER HALLUCINATE
      You always have to reply in ITALIAN.
      If you think "yes" or "more" You MUST list the identified vulnerability (vulnearbilità) and threat (minaccia) with the appropriate Identifiers refer to the document in your retrieval vectorstore
      For the acronym  refer to the document in your retrieval vectorstore
      Give all the information without asking the user more input
      '''

system_message_reformat='''You are an assistant in security risk analysis.
      You need to format the user message as follows
      If a security threat is present, please explain what the security threat is.
      You must reply with \"more\"  in the \"Short\" field if you think additional details should be provided along with the vulnerability already discovered
      You must reply with \"no\"  in the \"Short\" field  if you think NO vulnerabilities are present
      You must reply with \"yes\"  in the \"Short\" field  if you think there is at least one vulnerability
      You must NEVER HALLUCINATE
      You always have to reply in ITALIAN.
      If the message ONLY contain "si"/"yes" simply say in the short field "yes", and for the rest leave blank
      If the message ONLY contain "no" simply say in the short field "no", and for the rest leave blank
      Always respond with an array of valid JSON output, for each vulnerability/threat you find, create an item as the following and put into an array of json:
      {
      \"Extended\": \"[Extended description]\",
      \"Short\":	 \"[Vulnerability Present: YES/NO/MORE]\",
      \"Details\": \"[Vulnerability Description]\",
      \"RiskID\":	\"[Risk ID]\",
      \"RiskDesc\": \"[Risk Description]\",
      \"VulnID\":	\"[Vulnerability ID]\",
      \"VulnDesc\": \"[Vulnerability Description]\",
      \"RiskType\": \"[Reale/Potenziale]\"
      },'''


os.environ["OPENAI_API_KEY"] = userdata.get('openaikey')
openai.api_key = os.environ["OPENAI_API_KEY"]
client = OpenAI()

##Data Management Handlers

In [None]:

def csv_to_dict(csv_file):
    """
    Converts a CSV file into a dictionary format.

    Args:
        csv_file (str): The path to the CSV file.

    Returns:
        dict: A dictionary representing the data from the CSV file.
    """
    data_dict = {}
    with open(csv_file, 'r',encoding=file_encoding) as file:
        csv_reader = csv.DictReader(file, delimiter=csv_delimiter)
        for row in csv_reader:
            scenario_id = row['ID Scenario'].replace("'","")
            user_message = row['User'].replace("'","")
            result=row['Assistant - Short'].replace("'","")
            if(result=="No"):
              assistant_message = {
                'Extended':row['Assistant - Extended'].replace("'",""),
                'Short':row['Assistant - Short'].replace("'",""),
              }
            else:

              assistant_message = {
                  'Extended':row['Assistant - Extended'].replace("'",""),
                  'Short':row['Assistant - Short'].replace("'",""),
                  'Details': row['Assistant - Details'].replace("'",""),
                  'RiskID': row['Assistant - Risk ID'].replace("'",""),
                  'RiskDesc': row['Assistant - Risk description'].replace("'",""),
                  'VulnID': row['Assistant - Vulnerability ID'].replace("'",""),
                  'VulnDesc': row['Assistant - Vulnerability description'].replace("'",""),
                  'RiskType': row['Assistant - Risk occurrence type'].replace("'","")
              }

            if scenario_id not in data_dict:
                data_dict[scenario_id] = {'UserMessage': user_message, 'AssistantMessages': [], 'TotalAssistantMessages' : 0}
            data_dict[scenario_id]['AssistantMessages'].append(assistant_message)
    return data_dict



def upload_file_and_get_path():
    """
    Uploads a file and returns its path.

    Returns:
        str: The path of the uploaded file.
    """
    uploaded = files.upload()
    for filename in uploaded.keys():
        return filename



def convert_rawcsv_to_json_dict(csv_file):
    """
    Converts a raw CSV file to a JSON dictionary.

    Args:
        csv_file (str): The path to the raw CSV file.

    Returns:
        dict: A JSON dictionary containing the converted data.
    """
    result = csv_to_dict(csv_file)
    result = add_total_assistant_messages(result)
    return result

def store_json_file(destination, content):
    """
    Stores JSON content to a file.

    Args:
        destination (str): The path to save the JSON file.
        content (dict): The JSON content to be stored.
    """
    with open(destination, 'w', encoding=file_encoding) as json_file:
      json.dump(content, json_file, indent=4, ensure_ascii=False)

def load_json_file(source):
    """
    Loads JSON content from a file.

    Args:
        source (str): The path of the JSON file.

    Returns:
        dict: The loaded JSON content.
    """
    with open(source, 'r', encoding=file_encoding) as json_file:
        data = json.load(json_file)
    return data


def max_assistant_messages(json_data):
    """
    Finds the maximum number of assistant messages among scenarios.

    Args:
        json_data (dict): JSON data containing scenario information.

    Returns:
        int: The maximum number of assistant messages.
    """
    max_messages = 0
    for scenario_data in json_data.values():
        assistant_messages_count = scenario_data['TotalAssistantMessages']
        if assistant_messages_count > max_messages:
            max_messages = assistant_messages_count
    return max_messages


def add_total_assistant_messages(json_data):
    """
    Adds total assistant message count to each scenario in JSON data.

    Args:
        json_data (dict): JSON data containing scenario information.

    Returns:
        dict: JSON data with total assistant message counts added.
    """
    for scenario_id, scenario_data in json_data.items():
        total_messages = len(scenario_data['AssistantMessages'])
        scenario_data['TotalAssistantMessages'] = total_messages
    return json_data


def create_questionaire(json_data):
  """
  Creates a questionnaire CSV file based on JSON data.

  Args:
      json_data (dict): JSON data containing scenario information.
  """
  # Create a list to hold the rows of the CSV
  csv_rows = []

  # Append header to the CSV rows
  header = [
    "ID Scenario",
    "Scenario Description",
    "Result",
    "Risk ID",
    "Vulnerability ID",
    "Risk occurrence type"
  ]

  csv_rows.append(header)

  # Iterate through the scenarios in the JSON data
  for scenario_id, scenario_data in json_data.items():
      # Extract Scenario ID and User Message
      scenario_info = [scenario_id, scenario_data['UserMessage'],"","","",""]
      # Append the extracted info to the CSV rows
      csv_rows.append(scenario_info)

  # Write CSV
  with open('questionaire.csv', 'w', newline='', encoding=file_encoding) as csv_file:
      csv_writer = csv.writer(csv_file,delimiter=csv_delimiter)
      csv_writer.writerows(csv_rows)

  # Print message
  print("questionaire CSV file has been created.")



def print_risk_type_set(json_data):
    """
    Prints a set of unique risk types found in the JSON data.

    Args:
        json_data (dict): JSON data containing scenario information.
    """
    # Set used to collect the unique risk types
    risk_types = set()

    # Iterate over the scenarios to extract the unique risk types
    for scenario_data in json_data.values():
        for assistant_message in scenario_data['AssistantMessages']:
            if('RiskType' in assistant_message):
              risk_types.add(assistant_message['RiskType'])

    # Print the set of unique risk types
    print("Set of unique risk types:")
    for risk_type in risk_types:
        print(risk_type)








def write_string_to_file_and_download(string_data, filename):
    """
    Writes a string to a file and downloads it.

    Args:
        string_data (str): The string data to be written to the file.
        filename (str): The name of the file.

    """
    with open(filename, 'w', encoding=file_encoding) as file:
        file.write(string_data)
    files.download(filename)





def splitTrainingAndValidation(jsonl_data):
  """
    Splits scenario data into training and validation sets (70/30, in
    insertion order) and writes them to JSONL files.

    Args:
        jsonl_data (dict): Scenario dictionary keyed by scenario ID.
  """
  # Calculate the number of entries for training; the rest go to validation
  total_entries = len(jsonl_data)
  training_entries = int(0.7 * total_entries)

  # Slice the original dictionary to create training and validation dictionaries
  training_part = dict(list(jsonl_data.items())[:training_entries])
  validation_part = dict(list(jsonl_data.items())[training_entries:])

  # Convert each part to JSONL and download it
  training = getJSONL(training_part)
  write_string_to_file_and_download(training,"training.jsonl")
  validation = getJSONL(validation_part)
  write_string_to_file_and_download(validation,"validation.jsonl")

def dump_array_to_json_and_download(data_array, filename):
    # Write JSON data to a file
    store_json_file(filename,data_array)

    # Provide download link
    files.download(filename)





def json_to_csv(json_data, csv_header,model,isAssistant):
    # Open a CSV file in write mode
    with open(f'output_{model}.csv', 'w', newline='', encoding=file_encoding) as csvfile:
        writer = csv.writer(csvfile,delimiter=csv_delimiter)

        # Write the CSV header
        writer.writerow(csv_header)
        #pdb.set_trace()
        # Iterate over each JSON entry
        for key,value in json_data.items():
            if key:
                if(isAssistant):

                    for item in value:
                        row = []
                        if(item.strip("\n")==testing_dict[key]["UserMessage"]):
                          continue
                        for header in csv_header:
                            # Stripping double and single quotes from header
                            stripped_header = header.strip('"').strip("'")
                            if(stripped_header=="ID Scenario"):
                              row.append(key)
                            elif(stripped_header=="Scenario Description"):
                              row.append(testing_dict[key]["UserMessage"])
                            elif(stripped_header=="Extended"):
                              row.append(item)
                            else:
                              row.append("")
                        # Write the row to the CSV file
                        writer.writerow(row)
                else:
                  for item in value:
                    # Extract values for each key in the CSV header
                    row = [item.get(header.strip('"').strip("'"), '') for header in csv_header]

                    # Write the row to the CSV file
                    writer.writerow(row)
            else:
                # If the JSON entry is empty, write an empty row with only scenario ID and description
                writer.writerow(['', ''])
    files.download(f'output_{model}.csv')

##Fine Tuning and Result Handlers

In [1]:
def startFineTunedModelTestingWithDict(model, testing_dict,assistant):
    results = {}
    full_responses = []
    errors = {}

    number_of_scenarios= len(testing_dict)
    current = 1
    for key, value in testing_dict.items():
        current,full_responses,results,errors = extracted(key,value,current,number_of_scenarios,assistant,full_responses,results,errors)
    dump_array_to_json_and_download(results, f"results_{model}.json")
    dump_array_to_json_and_download(full_responses, f"full_responses_{model}.json")


def runLLM(model,userMessages,assistant):

  if(assistant):

   return create_thread_and_send_message(model,userMessages);

  else:
    stream = client.chat.completions.create(
                  model=model,
                  temperature=0.35,
                  max_tokens=4096,
                  top_p=0.75,
                  frequency_penalty=0,
                  presence_penalty=0,
                  messages=[
                      {"role": "system", "content": system_message},
                      {"role": "user", "content": userMessages}],
                  stream=True,
              )

    result_content =""
    for chunk in stream:
        if chunk.choices[0].delta.content is not None:
            result_content+=chunk.choices[0].delta.content
    return result_content

def extracted(key,value,current,number_of_scenarios,assistant,full_responses,results,errors):
  # Do something with the current message (print in this case)
      userMessages = value["UserMessage"]
      print(f"Sending Scenario #{current}/{number_of_scenarios}: {key} - {userMessages}")
      current+=1

      result_content = runLLM(model, userMessages,assistant)

      if(assistant):

          print(f"Response: {result_content} \n\n")
          full_responses.append(result_content)
          # Now, 'parsed_dict' contains the data as a Python dictionary
          results[key]=result_content
          return current,full_responses,results,errors
      else:
        # Extract only the content from the OpenAI response
        result_content = result_content.replace("```json\n","").replace("```","").replace("\\n","")


        # Parse the JSON-like data using the custom decoder
        try:
          jsonArray = json.loads(result_content)
          final_array = []
          for item in jsonArray:
            item["ID Scenario"] = key
            item["Scenario Description"] = value["UserMessage"]
            final_array.append(item)

          print(f"Response: {final_array} \n\n")
          full_responses.append(final_array)

          # Now, 'parsed_dict' contains the data as a Python dictionary
          results[key]=final_array
          return current,full_responses,results,errors
        except Exception as e:
          print(f"There was an error with {key}... Let's Retry")
          errors[key] = result_content
          # current was already incremented above; undo that so the retry keeps the same scenario number
          return extracted(key,value,current-1,number_of_scenarios,assistant,full_responses,results,errors)
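
The non-assistant branch of `extracted` removes Markdown code-fence markers from the reply and then parses what is left as a JSON array. The following is a minimal standalone sketch of that clean-up step, not the notebook's own code: `parse_model_reply` is a hypothetical helper name, the sample reply is invented, and wrapping a single JSON object into a list is an addition that the notebook does not perform.

```python
import json

FENCE = "`" * 3  # the three-backtick marker models often wrap JSON in


def parse_model_reply(raw_reply):
    """Strip code-fence markers and parse the reply as a JSON array.

    Hypothetical helper mirroring the clean-up done in `extracted`;
    raises json.JSONDecodeError when the reply is not valid JSON.
    """
    cleaned = (
        raw_reply.replace(FENCE + "json\n", "")
        .replace(FENCE, "")
        .replace("\\n", "")
        .strip()
    )
    parsed = json.loads(cleaned)
    # Addition for illustration: accept a single object as a one-item list.
    return parsed if isinstance(parsed, list) else [parsed]


# Invented reply, for illustration only.
sample_reply = FENCE + 'json\n[{"Short": "no", "Extended": "Nessuna minaccia rilevata."}]\n' + FENCE
items = parse_model_reply(sample_reply)
print(items[0]["Short"])
```

Keeping the parsing in one small function makes it easy to check a problematic reply by hand before re-running a whole scenario.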



### Test JSONL Format [Optional]

In [None]:
def testJSONLFile(filename):
  from collections import defaultdict
  # Load the dataset: each non-empty line is one JSON object
  with open(filename, 'r', encoding=file_encoding) as f:
      dataset = []
      for line in f:
        if not line.strip():
          continue
        decoded_json = json.loads(line)  # Decode JSON string
        dataset.append(decoded_json)  # Append decoded JSON to dataset

  # Initial dataset stats
  print("Num examples:", len(dataset))
  print("First example:")
  for message in dataset[0]["messages"]:
      print(message)

  # Format error checks
  format_errors = defaultdict(int)

  for ex in dataset:
      if not isinstance(ex, dict):
          format_errors["data_type"] += 1
          continue

      messages = ex.get("messages", None)
      if not messages:
          format_errors["missing_messages_list"] += 1
          continue

      for message in messages:
          if "role" not in message or "content" not in message:
              format_errors["message_missing_key"] += 1

          if any(k not in ("role", "content", "name", "function_call", "weight") for k in message):
              format_errors["message_unrecognized_key"] += 1

          if message.get("role", None) not in ("system", "user", "assistant", "function"):
              format_errors["unrecognized_role"] += 1

          content = message.get("content", None)
          function_call = message.get("function_call", None)

          if (not content and not function_call) or not isinstance(content, str):
              format_errors["missing_content"] += 1

      if not any(message.get("role", None) == "assistant" for message in messages):
          format_errors["example_missing_assistant_message"] += 1

  if format_errors:
      print("Found errors:")
      for k, v in format_errors.items():
          print(f"{k}: {v}")
      return False
  else:
      print("No errors found")
      return True

## Assistant API Auxiliary Functions
[Documentation on Assistant-Knowledge Retrieval](https://platform.openai.com/docs/assistants/tools/knowledge-retrieval)

In [None]:
def handleAssistantCreation(model,file_ids):
  assistant = client.beta.assistants.create(
    instructions=system_message_rag,
    model=model,
    tools=[{"type": "retrieval"}],
    file_ids=file_ids
  )
  return assistant

def addFileForRetrieval(filename):
  # Upload a file with an "assistants" purpose; the handle is closed after the upload
  with open(filename, "rb") as file_handle:
    file = client.files.create(
      file=file_handle,
      purpose='assistants'
    )
  return file

def uploadAndGetList():
  table_vuln = upload_file_and_get_path()
  table_risk = upload_file_and_get_path()
  glossary = upload_file_and_get_path()

  table_vuln_id = addFileForRetrieval(table_vuln).id
  table_risk_id = addFileForRetrieval(table_risk).id
  glossary_id = addFileForRetrieval(glossary).id
  file_ids=[table_vuln_id,table_risk_id,glossary_id]
  return file_ids


import time

def create_thread_and_send_message(assistant, message_content):
  thread = client.beta.threads.create()
  try:
    message = client.beta.threads.messages.create(
      thread_id=thread.id,
      role="user",
      content=message_content
    )
    run = client.beta.threads.runs.create(
      thread_id=thread.id,
      assistant_id=assistant,
      instructions=system_message_rag
    )

    # Poll until the run leaves its in-flight states
    while run.status in ['queued', 'in_progress', 'cancelling']:
      time.sleep(1) # Wait for 1 second
      run = client.beta.threads.runs.retrieve(
        thread_id=thread.id,
        run_id=run.id
      )

    if run.status == 'completed':
      retrieved_message = client.beta.threads.messages.list(
        thread_id=thread.id
      )
      print(retrieved_message.data[0].content[0].text.value)

      results= []
      for assistant_message in retrieved_message.data:
        results.append(extract_message_and_annotations(assistant_message))
      return results
    else:
      print("ERROR!")
      print(run.status)
      # Return an empty list so callers can still iterate over the result
      return []
  finally:
    # Always delete the thread, including after a successful run
    client.beta.threads.delete(thread.id)

def extract_message_and_annotations(message):
  # Extract the message content
  message_content = message.content[0].text
  annotations = message_content.annotations
  citations = []

  # Iterate over the annotations and add footnotes
  for index, annotation in enumerate(annotations):
      # Replace the text with a footnote
      message_content.value = message_content.value.replace(annotation.text, f' [{index}]')

      # Gather citations based on annotation attributes
      if (file_citation := getattr(annotation, 'file_citation', None)):
          cited_file = client.files.retrieve(file_citation.file_id)
          citations.append(f'[{index}] {file_citation.quote} from {cited_file.filename}')
      elif (file_path := getattr(annotation, 'file_path', None)):
          cited_file = client.files.retrieve(file_path.file_id)
          citations.append(f'[{index}] Click <here> to download {cited_file.filename}')
          # Note: File download functionality not implemented above for brevity
    # Add footnotes to the end of the message before displaying to user
  message_content.value += '\n' + '\n'.join(citations)
  return message_content.value
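
The polling loop in `create_thread_and_send_message` waits for as long as the run stays queued or in progress. The sketch below shows one possible way to bound that wait. It is an illustration under stated assumptions: `wait_for_terminal_status` and its parameters are hypothetical, and `fetch_status` stands in for a call that would wrap `client.beta.threads.runs.retrieve` in the notebook.

```python
import time


def wait_for_terminal_status(fetch_status, poll_seconds=1.0, timeout_seconds=300.0):
    """Poll `fetch_status()` until it leaves the in-flight states or time runs out.

    `fetch_status` is a hypothetical zero-argument callable that returns the
    current run status as a string.
    """
    in_flight = {"queued", "in_progress", "cancelling"}
    deadline = time.monotonic() + timeout_seconds
    status = fetch_status()
    while status in in_flight:
        if time.monotonic() >= deadline:
            raise TimeoutError(f"run still '{status}' after {timeout_seconds} s")
        time.sleep(poll_seconds)
        status = fetch_status()
    return status


# Stand-in for the API: a canned sequence of statuses.
fake_statuses = iter(["queued", "in_progress", "completed"])
final_status = wait_for_terminal_status(lambda: next(fake_statuses), poll_seconds=0.0)
print(final_status)
```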

##Training and Testing set creation

The following cells require two CSV files containing the training and the testing scenarios.
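
As a rough guide to the expected input, the sketch below lists the column names that `csv_to_dict` looks up and checks a header against them. The column names and the ";" delimiter are taken from the code above; `missing_columns` is a hypothetical helper and the sample rows are invented.

```python
import csv
import io

# Column names taken from the lookups in `csv_to_dict`.
EXPECTED_COLUMNS = [
    "ID Scenario", "User",
    "Assistant - Short", "Assistant - Extended", "Assistant - Details",
    "Assistant - Risk ID", "Assistant - Risk description",
    "Assistant - Vulnerability ID", "Assistant - Vulnerability description",
    "Assistant - Risk occurrence type",
]


def missing_columns(csv_text, delimiter=";"):
    """Return the expected columns absent from the CSV header (hypothetical helper)."""
    reader = csv.DictReader(io.StringIO(csv_text), delimiter=delimiter)
    present = set(reader.fieldnames or [])
    return [name for name in EXPECTED_COLUMNS if name not in present]


# Invented two-line file: header plus one scenario with no vulnerability.
sample_csv = ";".join(EXPECTED_COLUMNS) + "\nS1;Testo dello scenario;No;Nessun rischio;;;;;;\n"
print(missing_columns(sample_csv))
print(missing_columns("ID Scenario;User\nS1;x\n"))
```

Running such a check on both files before conversion can surface a wrong delimiter or a renamed column early, instead of a `KeyError` partway through parsing.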

In [None]:
training_raw = upload_file_and_get_path()
testing_raw = upload_file_and_get_path()

We translate the raw scenarios into dictionaries with the Scenario ID as key and the assistant message fully assembled from its raw csv version.
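
To illustrate the resulting structure, the following sketch groups a few invented rows by scenario ID in the same shape that `csv_to_dict` and `add_total_assistant_messages` produce. The keys `UserMessage`, `AssistantMessages` and `TotalAssistantMessages` come from the code above; the scenario IDs, texts and risk identifiers are made up.

```python
import json

# Invented rows standing in for parsed CSV lines:
# (scenario ID, user text, one assistant item).
rows = [
    ("S1", "Scenario uno", {"Short": "Yes", "RiskID": "R1", "VulnID": "V1"}),
    ("S1", "Scenario uno", {"Short": "Yes", "RiskID": "R2", "VulnID": "V3"}),
    ("S2", "Scenario due", {"Short": "No"}),
]

scenarios = {}
for scenario_id, user_message, assistant_item in rows:
    # One entry per scenario; rows sharing an ID accumulate in AssistantMessages.
    entry = scenarios.setdefault(
        scenario_id,
        {"UserMessage": user_message, "AssistantMessages": [], "TotalAssistantMessages": 0},
    )
    entry["AssistantMessages"].append(assistant_item)
    entry["TotalAssistantMessages"] = len(entry["AssistantMessages"])

print(json.dumps(scenarios["S1"], indent=2, ensure_ascii=False))
```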


In [None]:
training_dict = convert_rawcsv_to_json_dict(training_raw)
testing_dict = convert_rawcsv_to_json_dict(testing_raw)

We save the JSON files locally.

In [None]:
store_json_file("training_dict.json", training_dict)
store_json_file("testing_dict.json", testing_dict)

We translate the training dictionary into JSONL format for fine-tuning.
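
For orientation, each line of the JSONL file is a single JSON object with a `messages` list holding a system, a user and an assistant turn, which is the layout `getJSONL` builds. The sketch below assembles one such line with placeholder texts; the notebook uses `system_message` and the scenario data instead.

```python
import json

# Placeholder texts, for illustration only.
example = {
    "messages": [
        {"role": "system", "content": "You are an assistant in security risk analysis."},
        {"role": "user", "content": "Descrizione dello scenario"},
        {
            "role": "assistant",
            # The assistant turn is itself a JSON array serialised to a string.
            "content": json.dumps([{"Short": "No", "Extended": "Nessun rischio"}], ensure_ascii=False),
        },
    ]
}

jsonl_line = json.dumps(example, ensure_ascii=False)
print(jsonl_line)

# Each line must parse on its own, and the assistant content parses a second time.
decoded = json.loads(jsonl_line)
assistant_items = json.loads(decoded["messages"][2]["content"])
print(assistant_items[0]["Short"])
```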

##Fine-Tuning Data Preparation

We split the training data into training and validation sets with a 70-30 ratio. We use OpenAI's online GUI to perform the fine-tuning process.
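
The split is positional: the first 70% of the scenarios, in dictionary order, form the training set and the remainder the validation set, with no shuffling. A minimal sketch of that behaviour on invented data follows; `split_in_order` is a hypothetical name and not a function of this notebook.

```python
def split_in_order(scenarios, training_fraction=0.7):
    """Split a dict into (training, validation) by insertion order, without shuffling."""
    items = list(scenarios.items())
    cut = int(training_fraction * len(items))
    return dict(items[:cut]), dict(items[cut:])


# Ten invented scenario IDs.
toy = {f"S{i}": {"UserMessage": f"scenario {i}"} for i in range(1, 11)}
training_part, validation_part = split_in_order(toy)
print(len(training_part), len(validation_part))
print(list(validation_part))
```

Because the order of the rows in the source CSV determines which scenarios end up in each set, reordering the file changes the split.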

In [None]:
def getJSONL(json_data):
    """
    Converts JSON data into a format suitable for fine-tuning.

    Args:
        json_data (dict): JSON data containing scenario information.

    Returns:
        str: A string containing the JSON-Lines formatted data.
    """
    jsonl = []
    for scenario_data in json_data.values():
        user_message = scenario_data['UserMessage'].replace("'","")#.replace("'", "\'")
        messages = [
            {"role": "system", "content": system_message},
            {"role": "user", "content": user_message},
            {"role": "assistant", "content": json.dumps(scenario_data['AssistantMessages'],ensure_ascii=False)}]
        item_to_append = {"messages":messages}

        #print(item_to_append)
        jsonl.append(item_to_append)
    ret = ""
    for line in jsonl:
      # json.dumps already emits valid JSON; rewriting apostrophes as double quotes would corrupt it
      jsonl_line= json.dumps(line,ensure_ascii=False)
      ret += f"{jsonl_line}\n"

    return ret

In [None]:
rest = getJSONL(training_dict)
write_string_to_file_and_download(rest,"training.jsonl")


In [None]:
splitTrainingAndValidation(training_dict)


#Questionnaire Creation
The following cell creates a CSV file, questionaire.csv, in the notebook's working directory. It contains the scenarios for human experts to analyze. The scenarios are the same for human experts and LLMs.

In [None]:
create_questionaire(testing_dict)

#Model Testing


Below, you can test the models on the provided data. Use the Python functions defined above (uploadAndGetList and handleAssistantCreation) to create the assistants and obtain their IDs. Similarly, use OpenAI's GUI to fine-tune the model and obtain its ID.
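
The `isAssistant` flag has to match the kind of ID stored in `model`: an assistant ID goes through the Assistants API, while a model name goes through chat completions. The sketch below derives the flag from the ID. It assumes that assistant IDs returned by the API start with "asst_"; `uses_assistants_api` is a hypothetical helper and the IDs are placeholders.

```python
def uses_assistants_api(model_id):
    """Guess whether an ID names an assistant rather than a chat model.

    Assumption: assistant IDs start with "asst_"; anything else is
    treated as a chat-completions model name.
    """
    return model_id.startswith("asst_")


# Placeholder IDs, for illustration only.
for candidate in ["gpt-3.5-turbo-1106", "gpt-4-turbo-preview", "asst_placeholder"]:
    route = "Assistants API" if uses_assistants_api(candidate) else "chat completions"
    print(candidate, "->", route)
```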

In [None]:
base = "gpt-3.5-turbo-1106"
base_fine_tuned = "xxxxxxxxxx"
base_four_turbo = "gpt-4-turbo-preview"
assistant_base = "xxxxxxxxxx"
asssistant_base_four = "xxxxxxxxxx"

In [None]:
file_ids = uploadAndGetList()
assistant_base = handleAssistantCreation(base,file_ids).id
asssistant_base_four = handleAssistantCreation(base_four_turbo,file_ids).id

In [None]:
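model = asssistant_base_four
# model holds an assistant ID here, so calls must go through the Assistants API;
# set isAssistant=False when model is a chat model name such as base or base_fine_tuned
isAssistant=True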

In [None]:
startFineTunedModelTestingWithDict(model, testing_dict,isAssistant)

#Output Preparation for Human Reviewers

In [None]:
test_result = upload_file_and_get_path()
json_data = load_json_file(test_result)
json_to_csv(json_data, csv_header,model,isAssistant)
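
For the non-assistant case, `json_to_csv` writes one row per item found for a scenario and leaves a cell empty when the item lacks that key. The following sketch reproduces that flattening in memory with invented results. The header is copied from `csv_header`; the scenario IDs and values are made up.

```python
import csv
import io

# Header copied from the notebook's `csv_header` constant.
header = ["ID Scenario", "Scenario Description", "Short", "Extended",
          "RiskID", "RiskDesc", "VulnID", "VulnDesc", "RiskType"]

# Invented results in the shape of results_<model>.json (non-assistant case).
results = {
    "S1": [{"ID Scenario": "S1", "Scenario Description": "Scenario uno",
            "Short": "yes", "RiskID": "R1", "VulnID": "V1", "RiskType": "Reale"}],
    "S2": [{"ID Scenario": "S2", "Scenario Description": "Scenario due", "Short": "no"}],
}

buffer = io.StringIO()
writer = csv.writer(buffer, delimiter=";")
writer.writerow(header)
for items in results.values():
    for item in items:
        # Missing keys become empty cells, as with item.get(header, '') in json_to_csv.
        writer.writerow([item.get(column, "") for column in header])

print(buffer.getvalue())
```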