# Notebook to run GPT, Gemini, Replicate models

## Install All Required Packages
Run this cell first to install all necessary packages if not already installed.

In [39]:
# Install required packages
#  %pip install openai replicate python-dotenv pandas numpy matplotli google.genai

# Notebook to run GPT, Gemini, Replicate models

In [40]:
from openai import OpenAI
import replicate
""" import vertexai
from vertexai.generative_models import GenerativeModel, GenerationConfig, HarmBlockThreshold, HarmCategory
 """
from google import genai
from google.genai import types
import pandas as pd
import numpy as np
import os
import time
import re
import matplotlib.pyplot as plt

## Setup all APIs

In [41]:
# replicate
import os
from dotenv import load_dotenv

load_dotenv()

True

In [42]:
# ChatGPT
openai_client = OpenAI(api_key=os.environ.get("OPEN_AI_API_KEY"))

In [43]:
# Gemini
# project_id = ""   # add project ID and location
# vertexai.init(project=project_id, location="")

gemini_client = genai.Client(api_key=os.environ.get("GOOGLE_API_KEY"))


## Prediction Params & Method

In [44]:
# set parameters for more deterministic output
temperature = 0
top_p = 1
seed = 42
max_tokens = 2048

In [45]:
sys_prompt = 'You are a cybersecurity expert specializing in cyberthreat intelligence.'

In [46]:
model_mapping = {
    'llama3-70b': 'meta/meta-llama-3-70b-instruct',
    'llama3-8b': 'meta/meta-llama-3-8b-instruct',
    'gemini': 'gemini-2.5-pro-exp-03-25', 
    'gpt3': 'gpt-3.5-turbo',
    'gpt4': 'gpt-4-turbo',
    'gpt-4o-mini': 'gpt-4o-mini',
}

In [47]:
def get_single_prediction(question, model_name):
    if model_name.startswith('llama') or model_name.startswith('mistral'):
        # replicate
        model = model_mapping[model_name]
        prompt = sys_prompt + ' ' + question
        input = {'prompt': prompt, 'max_tokens': max_tokens, 'temperature': temperature, 'top_p': top_p, 'seed': seed}
        output = replicate.run(model, input=input)
        output = "".join(output)
    elif model_name.startswith('gemma'):
        # replicate
        model = model_mapping[model_name]
        prompt = sys_prompt + ' ' + question
        input = {'prompt': prompt, 'max_tokens': max_tokens, 'temperature': 0.01, 'top_p': top_p, 'seed': seed}
        output = replicate.run(model, input=input)
        output = "".join(output)
    elif model_name.startswith('01-ai'):
        # replicate
        model = model_mapping[model_name]
        prompt = sys_prompt + ' ' + question
        input = {'prompt': prompt, 'max_tokens': max_tokens, 'temperature': temperature, 'top_p': top_p, 'seed': seed}
        output = replicate.run(model, input=input)
        output = "".join(output)
    elif model_name.startswith('gpt'):
        # ChatGPT
        model = model_mapping[model_name]
        response = openai_client.chat.completions.create(
            model=model,
            messages=[
                {'role': 'system', 'content': sys_prompt},
                {'role': 'user', 'content': question}
            ],
            temperature=temperature,
            top_p=top_p,
            max_tokens=max_tokens,
            seed=seed
        )
        output = response.choices[0].message.content
    elif model_name.startswith('gemini'):
        # Gemini
        model = model_mapping[model_name]
        prompt = sys_prompt + ' ' + question
        
        # Use gemini_client instead of Vertex AI's GenerativeModel
        safety_settings = {
            types.HarmCategory.HARM_CATEGORY_HARASSMENT: types.HarmBlockThreshold.BLOCK_ONLY_HIGH,
            types.HarmCategory.HARM_CATEGORY_HATE_SPEECH: types.HarmBlockThreshold.BLOCK_ONLY_HIGH,
            types.HarmCategory.HARM_CATEGORY_SEXUALLY_EXPLICIT: types.HarmBlockThreshold.BLOCK_ONLY_HIGH,
            types.HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT: types.HarmBlockThreshold.BLOCK_ONLY_HIGH,
        }
        
        generation_config = {
            "temperature": temperature,
            "top_p": top_p,
            "max_output_tokens": max_tokens,
        }
        
        response = gemini_client.models.generate_content(
            model=model,
            contents=prompt,
            config=generation_config,
        )
        
        output = response.text
        time.sleep(1)   # so it doesn't throw error
        return output


#### Test

In [48]:
question = (
    "Analyze the following CVE description and map it to the appropriate CWE. "
    "Provide a brief justification. The last line of your answer should only contain the CWE ID.\n\n"
    "CVE Description:\n\n"
    "Dell EMC CloudLink 7.1 and all prior versions contain an Improper Input Validation Vulnerability. "
    "A remote low privileged attacker may potentially exploit this vulnerability, "
    "leading to execution of arbitrary files on the server."
)

question2 = ("Analyze the following CVE description and calculate the CVSS v3.1 Base Score. Determine the values for each base metric: AV, AC, PR, UI, S, C, I, and A. Summarize each metric's value and provide the final CVSS v3.1 vector string.   Valid options for each metric are as follows: - **Attack Vector (AV)**: Network (N), Adjacent (A), Local (L), Physical (P) - **Attack Complexity (AC)**: Low (L), High (H) - **Privileges Required (PR)**: None (N), Low (L), High (H) - **User Interaction (UI)**: None (N), Required (R) - **Scope (S)**: Unchanged (U), Changed (C) - **Confidentiality (C)**: None (N), Low (L), High (H) - **Integrity (I)**: None (N), Low (L), High (H) - **Availability (A)**: None (N), Low (L), High (H)  Summarize each metric's value and provide the final CVSS v3.1 vector string. Ensure the final line of your response contains only the CVSS v3 Vector String in the following format:  Example format: CVSS:3.1/AV:N/AC:L/PR:N/UI:N/S:U/C:H/I:H/A:H  CVE Description: The Orbit Fox by ThemeIsle plugin for WordPress is vulnerable to Stored Cross-Site Scripting via the plugin's Pricing Table Elementor Widget in all versions up to, and including, 2.10.27 due to insufficient input sanitization and output escaping on the user supplied link URL. This makes it possible for authenticated attackers with contributor-level and above permissions to inject arbitrary web scripts in pages that will execute whenever a user accesses an injected page.")

##### Are all the APIS working?

In [49]:
# print(get_single_prediction(question, 'gpt-4o-mini'))

In [50]:
print(get_single_prediction(question2, 'gemini'))

Okay, let's analyze the CVE description and determine the CVSS v3.1 Base Score metrics.

**CVE Description Analysis:**

*   **Vulnerability:** Stored Cross-Site Scripting (XSS).
*   **Component:** Orbit Fox plugin for WordPress, specifically the Pricing Table Elementor Widget.
*   **Mechanism:** Insufficient input sanitization and output escaping on a link URL field.
*   **Attacker Requirements:** Authenticated user with contributor-level (or higher) permissions.
*   **Exploitation:** Attacker injects script via the vulnerable widget field. The script


In [32]:
# print(get_single_prediction(question, 'llama3-8b'))

# Run Evaluation for a Dataset

### All formatting comes here
While these captures most output format of the LLMs we studied, we still had to manually collect some responses from the generated response file

In [33]:
def format_rcm(text):
    # Define the regex pattern for CWE ID
    cwe_pattern = r'CWE-\d+'

    # Find all matches in the text
    matches = re.findall(cwe_pattern, text)

    # Return the last match if any match is found, otherwise return the original text
    if matches:
        return matches[-1], True
    else:
        return text, False

def format_vsp(text):
    # Define the regex pattern for CVSS v3.1 vector string
    #cvss_pattern = r'AV:[^/]+?/AC:[^/]+?/PR:[^/]+?/UI:[^/]+?/S:[^/]+?/C:[^/]+?/I:[^/]+?/A:[^/]+?'
    cvss_pattern = r'AV:[A-Za-z]+/AC:[A-Za-z]+/PR:[A-Za-z]+/UI:[A-Za-z]+/S:[A-Za-z]+/C:[A-Za-z]+/I:[A-Za-z]+/A:[A-Za-z]+'


    # Find all matches in the text
    matches = re.findall(cvss_pattern, text)

    # Return the last match if any match is found, otherwise return the original text
    if matches:
        return matches[-1], True
    else:
        return text, False

def format_mcq(text):
    last_line = text.split('\n')[-1].rstrip()
    if last_line.startswith('A)') or last_line.startswith('B)') or last_line.startswith('C)') or last_line.startswith('D)'):
        return last_line[0]
    if last_line.endswith('A') or last_line.endswith('B') or last_line.endswith('C') or last_line.endswith('D'):
        return last_line[-1]
    if last_line.endswith('**'):
        return last_line[-3]
    if len(last_line) == 0:
        last_line = text.split('\n')[-2].rstrip()
        if last_line.startswith('A)') or last_line.startswith('B)') or last_line.startswith('C)') or last_line.startswith('D)'):
            return last_line[0]
        if last_line.endswith('A') or last_line.endswith('B') or last_line.endswith('C') or last_line.endswith('D'):
            return last_line[-1]
        if last_line.endswith('**'):
            return last_line[-3]
    return ' '.join(text.split('\n'))

def format_taa(text):
    # need to manually extract the attribution
    return ' '.join(text.split('\n'))

In [34]:
def run_evaluation(file_path, task, model_name):
    # Keep track of time and total #chars generated
    start_time = time.time()
    count_chars = 0
    instructions_failed = 0
    
    data = pd.read_csv(file_path, encoding='utf-8', sep='\t')

    # response contain the entire response, result the formatted result
    all_responses = []
    all_results = []
    
    for index, row in data.iterrows():
        prompt = row['Prompt']
        try:
            output = get_single_prediction(prompt, model_name)
            
            count_chars += len(output)
            all_responses.append(output)
            if task == 'rcm':
                answer, success = format_rcm(output)
                if not success:
                    instructions_failed += 1
            elif task == 'vsp':
                answer, success = format_vsp(output)
                if not success:
                    instructions_failed += 1      
            elif task == 'mcq':
                answer = format_mcq(output)
            elif task == 'taa':
                answer = format_taa(output)
            else:
                raise ValueError('Task unknown!')
        except Exception as e:
            answer = 'Error'
            all_responses.append(answer)
            print('Exception at row ', index+1)
            print(e)
        all_results.append(answer)
        print(index+1, answer)
        # print(index+1)


    time_taken = time.time() - start_time
    print('Time taken:', time_taken)
    print('#Characters generated:', count_chars)
    print('#Instructions failed:', instructions_failed)

    # Create responses-new directory if it doesn't exist
    output_dir = 'responses-new'
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)

    # Save all the responses & results to the responses-new folder
    out_response = os.path.join(output_dir, 'SENG402_' + os.path.basename(file_path).split('.')[0] + '_' + model_name + '_response.txt')
    out_result = os.path.join(output_dir, 'SENG402_' + os.path.basename(file_path).split('.')[0] + '_' + model_name + '_result.txt')

    with open(out_response, 'w', encoding='utf-8') as f:
        out_str = ''
        for i in range(len(all_responses)):
            out_str += '#####' + str(i+1) + '#####\n'
            out_str += all_responses[i]
            out_str += '\n\n'
        f.write(out_str)
    with open(out_result, 'w', encoding='utf-8') as f:
        f.write('\n'.join(all_results))

    print('------- Done --------')

In [35]:
# run_evaluation('datasets/cti-mcq.tsv', 'mcq', 'gpt3')

In [36]:
# run_evaluation('datasets/cti-rcm.tsv', 'rcm', 'gpt3')

In [53]:
run_evaluation('../new-data/cti-vsp-only-2024-and-2025-SMALL.tsv', 'vsp', 'gpt-4o-mini')

Exception at row  1
object of type 'NoneType' has no len()
1 Error
Time taken: 12.952844619750977
#Characters generated: 0
#Instructions failed: 0
------- Done --------


In [32]:
# run_evaluation('datasets/cti-taa.tsv', 'taa', 'gpt3')