# Vulnerability Detection

In [1]:
import locale

# Work around the Colab locale error by forcing the reported preferred encoding
# to UTF-8. The replacement keeps the stdlib signature (optional
# ``do_setlocale`` argument) so callers that use
# ``locale.getpreferredencoding(False)`` do not fail with a TypeError.
# If you are still running into issues, please restart the runtime to
# initialize a new environment.
locale.getpreferredencoding = lambda do_setlocale=True: "UTF-8"

In [None]:
# obtain data
!wget https://github.com/uiuc-cs598lmz-s24/hw4/raw/main/d2a.zip

In [None]:
!unzip d2a.zip

In [None]:
import torch

from torch.utils.data import DataLoader
from transformers import AdamW, AutoTokenizer, AutoModelForCausalLM, T5ForConditionalGeneration

def load_codet5_model():
    """Load the CodeT5 defect-detection checkpoint together with its tokenizer.

    The tokenizer is taken from ``Salesforce/codet5-base`` and the weights from
    ``Salesforce/codet5-base-codexglue-defect``.

    Returns:
        A ``(model, tokenizer)`` tuple. The model is placed on the GPU when
        CUDA is available and stays on the CPU otherwise, so the notebook can
        also be run on a CPU-only runtime.
    """
    tokenizer = AutoTokenizer.from_pretrained("Salesforce/codet5-base")
    model = T5ForConditionalGeneration.from_pretrained("Salesforce/codet5-base-codexglue-defect")
    # Do not assume a GPU: an unconditional .cuda() raises on CPU-only machines.
    device = "cuda" if torch.cuda.is_available() else "cpu"
    return model.to(device), tokenizer

# Instantiate the model and tokenizer once; the cells below reuse these globals.
model, tokenizer = load_codet5_model()


In [None]:
# example test using the small model

code_snippets = """
#include <stdio.h>
int main()
{
    return 0;
}
"""

# Put the input on whatever device the model lives on instead of assuming "cuda".
input_tensor = tokenizer.encode(code_snippets, return_tensors='pt').to(model.device)
x = model.generate(input_tensor, max_length=50, temperature=1)
# Decode the generated ids so the example actually shows the model's answer.
print(tokenizer.decode(x[0], skip_special_tokens=True))


In [None]:
import glob
import json

def prepare_vuln_dataset():
    """Collect every sample found under ``d2a/<sample>/``.

    Each sample directory is expected to hold a ``src.c`` file and a
    ``meta_data.json`` file next to it.

    Returns:
        A list of dicts, one per sample: the parsed ``meta_data.json`` content
        with the C source added under the ``'src'`` key. Samples are returned
        in sorted path order so repeated runs see the same ordering.
    """
    source_name = "src.c"
    vuln_dataset = []
    # glob order depends on the file system; sort for reproducible runs.
    for src_path in sorted(glob.glob(f"d2a/*/{source_name}")):
        # Swap only the trailing file name. A plain str.replace would also
        # rewrite a directory component that happens to contain "src.c".
        meta_path = src_path[: -len(source_name)] + "meta_data.json"

        # Source files may contain stray non-UTF-8 bytes (e.g. in comments);
        # replace them rather than aborting the whole load.
        with open(src_path, "r", encoding="utf-8", errors="replace") as f:
            src = f.read()

        with open(meta_path, "r", encoding="utf-8") as f:
            sample = json.load(f)

        sample['src'] = src
        vuln_dataset.append(sample)

    return vuln_dataset


# Load the dataset once; the evaluation cells below all iterate over this list.
vuln_dataset = prepare_vuln_dataset()

In [None]:
import os
from tqdm.auto import tqdm


def predict_vuln(code: str, model, tokenizer) -> bool:
    """Greedily predict whether ``code`` contains a vulnerability.

    Args:
        code: Source code of the sample to classify.
        model: Seq2seq model exposing ``generate`` and ``device``.
        tokenizer: Tokenizer exposing ``encode`` and ``decode``.

    Returns:
        True if the model's generated label is "true" (code contains a
        vulnerability), False otherwise.
    """
    # Truncate long files; assumes the checkpoint's usual 512-token input
    # limit -- TODO confirm against the tokenizer's model_max_length.
    input_ids = tokenizer.encode(
        code, return_tensors="pt", truncation=True, max_length=512
    ).to(model.device)

    # Greedy decoding: no sampling and a single beam. The label is only a
    # token or two long, so a small max_length is enough.
    output_ids = model.generate(input_ids, max_length=8, do_sample=False, num_beams=1)

    # NOTE(review): the defect checkpoint is expected to emit the literal text
    # "true" / "false" -- confirm against the model card.
    label = tokenizer.decode(output_ids[0], skip_special_tokens=True)
    return label.strip().lower() == "true"


def codet5_vuln_detection(model, tokenizer, vuln_dataset) -> float:
    """Evaluate the model on ``vuln_dataset`` and return its F1 score.

    Args:
        model: Model passed through to ``predict_vuln``.
        tokenizer: Tokenizer passed through to ``predict_vuln``.
        vuln_dataset: Iterable of dicts with at least the keys ``'src'``
            (source code) and ``'vulnerable'`` (ground-truth label).

    Returns:
        F1 score of the "vulnerable" class, i.e. ``2*TP / (2*TP + FP + FN)``.
        Returns 0.0 when there are no positives at all (empty dataset, or no
        vulnerable sample and no positive prediction).
    """
    true_positives = 0
    false_positives = 0
    false_negatives = 0

    for data in tqdm(vuln_dataset):
        code = data['src']
        # Assumes 'vulnerable' is a bool or 0/1 flag -- TODO confirm against
        # the meta_data.json schema.
        gt_prediction = bool(data['vulnerable'])

        prediction = predict_vuln(code, model, tokenizer)

        if prediction and gt_prediction:
            true_positives += 1
        elif prediction:
            false_positives += 1
        elif gt_prediction:
            false_negatives += 1

    denominator = 2 * true_positives + false_positives + false_negatives
    if denominator == 0:
        return 0.0
    return 2 * true_positives / denominator

# F1 score of codet5 on the binary (vulnerable / not vulnerable) task,
# evaluated over every sample in vuln_dataset
f1_score = codet5_vuln_detection(model, tokenizer, vuln_dataset)

# Prompt-based Vulnerability Detection

In [None]:
import os

import google.generativeai as genai

# Prefer a key supplied through the GOOGLE_API_KEY environment variable so the
# secret does not have to be pasted into (and accidentally shared with) the
# notebook. The literal placeholder remains as the fallback, so replacing it
# in place still works.
genai.configure(api_key=os.environ.get("GOOGLE_API_KEY", "YOUR_API_KEY_HERE"))
gemini = genai.GenerativeModel("models/gemini-pro")

In [None]:
import signal
import time
from google.generativeai import GenerationConfig

# Safety settings sent with every Gemini request: each listed harm category
# gets the "BLOCK_NONE" threshold (google is afraid of pretty much everything
# i think).
safety_settings = [
    {"category": category, "threshold": "BLOCK_NONE"}
    for category in (
        "HARM_CATEGORY_DANGEROUS",
        "HARM_CATEGORY_HARASSMENT",
        "HARM_CATEGORY_HATE_SPEECH",
        "HARM_CATEGORY_SEXUALLY_EXPLICIT",
        "HARM_CATEGORY_DANGEROUS_CONTENT",
    )
]


def create_gemini_config(
    max_tokens: int,
    temperature: float = 1,
    batch_size: int = 1,
):
    """Build the ``GenerationConfig`` used for a Gemini request.

    Args:
        max_tokens: Forwarded as ``max_output_tokens``.
        temperature: Forwarded as ``temperature``.
        batch_size: Forwarded as ``candidate_count``.

    Returns:
        The constructed ``GenerationConfig`` instance.
    """
    return GenerationConfig(
        temperature=temperature,
        max_output_tokens=max_tokens,
        candidate_count=batch_size,
    )


def handler(signum, frame):
    """Signal handler that always raises ``Exception("end of time")``.

    Both arguments are required by the signal-handler calling convention and
    are intentionally ignored.
    """
    del signum, frame  # unused
    timeout_error = Exception("end of time")
    raise timeout_error


def request_gemini_engine(model, message, config, max_retries=None, timeout=100, retry_delay=20):
    """Send ``message`` to Gemini, retrying until a readable response arrives.

    Each attempt is guarded by a SIGALRM-based timeout. Any exception (timeout,
    API error, or a response whose ``.text`` cannot be accessed) is reported
    and followed by a pause and another attempt.

    Args:
        model: Object exposing ``generate_content``.
        message: Prompt forwarded to ``generate_content``.
        config: Generation config forwarded as ``generation_config``.
        max_retries: Maximum number of retries after a failed attempt. ``None``
            (the default) retries forever, which is the original behavior.
        timeout: Seconds allowed per attempt before the alarm fires.
        retry_delay: Seconds to sleep between attempts.

    Returns:
        The first response whose ``.text`` could be read.

    Raises:
        Exception: The last failure, re-raised once ``max_retries`` is
            exhausted (never when ``max_retries`` is None).
    """
    # Install the timeout handler once and remember what was there before so
    # it can be restored when this function exits.
    previous_handler = signal.signal(signal.SIGALRM, handler)
    failures = 0
    try:
        while True:
            try:
                signal.alarm(timeout)
                ret = model.generate_content(
                    message, generation_config=config, safety_settings=safety_settings
                )
                _ = ret.text  # check if response can be accessed.
                signal.alarm(0)
                return ret
            except Exception as exc:
                # NOTE this exception handling is needed since sometimes gemini
                # will refuse to answer due to safety reason (even if all
                # blockers are set off); we simply catch this and retry.
                # Cancel the pending alarm first so it cannot fire during the
                # sleep below.
                signal.alarm(0)
                failures += 1
                if max_retries is not None and failures > max_retries:
                    raise
                # Report the failure instead of swallowing it: a persistent
                # error (e.g. an invalid API key) would otherwise loop
                # forever without any visible output.
                print(f"Gemini request failed ({exc!r}); retrying in {retry_delay}s")
                time.sleep(retry_delay)
    finally:
        # signal.signal returns None when the previous handler was not
        # installed from Python; there is nothing to restore in that case.
        if previous_handler is not None:
            signal.signal(signal.SIGALRM, previous_handler)

In [None]:
def predict_vuln_gemini(code: str, gemini) -> bool:
    """Ask Gemini (temperature = 1) whether ``code`` contains a vulnerability.

    Args:
        code: Source code of the sample to classify.
        gemini: Gemini model handle passed to ``request_gemini_engine``.

    Returns:
        True if the first "yes"/"no" word in the reply is "yes" (code contains
        a vulnerability). False if it is "no" or if the reply contains neither
        word.
    """
    import re  # local import: keeps this block independent of the file's import cells

    config = create_gemini_config(
        max_tokens=100,
        temperature=1,
        batch_size=1,
    )

    # Constrain the answer to a single word so it can be parsed reliably.
    formatted_input = (
        "You are a security expert reviewing C code.\n"
        "Does the following code contain a security vulnerability?\n"
        "Answer with exactly one word: YES or NO.\n\n"
        f"```c\n{code}\n```\n"
    )

    ret = request_gemini_engine(gemini, formatted_input, config)
    gemini_response = ret.text

    # Take the first standalone yes/no; anything unparseable counts as "no".
    verdict = re.search(r"\b(yes|no)\b", gemini_response.lower())
    prediction = verdict is not None and verdict.group(1) == "yes"
    return prediction


def gemini_vuln_detection(gemini, vuln_dataset) -> float:
    """Evaluate Gemini's binary predictions on ``vuln_dataset`` and return F1.

    Args:
        gemini: Gemini model handle passed through to ``predict_vuln_gemini``.
        vuln_dataset: Iterable of dicts with at least the keys ``'src'``
            (source code) and ``'vulnerable'`` (ground-truth label).

    Returns:
        F1 score of the "vulnerable" class, i.e. ``2*TP / (2*TP + FP + FN)``.
        Returns 0.0 when there are no positives at all (empty dataset, or no
        vulnerable sample and no positive prediction).
    """
    true_positives = 0
    false_positives = 0
    false_negatives = 0

    for data in tqdm(vuln_dataset):
        code = data['src']
        # Assumes 'vulnerable' is a bool or 0/1 flag -- TODO confirm against
        # the meta_data.json schema.
        gt_prediction = bool(data['vulnerable'])

        prediction = predict_vuln_gemini(code, gemini)

        if prediction and gt_prediction:
            true_positives += 1
        elif prediction:
            false_positives += 1
        elif gt_prediction:
            false_negatives += 1

    denominator = 2 * true_positives + false_positives + false_negatives
    if denominator == 0:
        return 0.0
    return 2 * true_positives / denominator

# F1 score of gemini on the binary (vulnerable / not vulnerable) task,
# evaluated over every sample in vuln_dataset
gemini_f1_score = gemini_vuln_detection(gemini, vuln_dataset)

In [None]:
def predict_vuln_gemini_multi(code: str, gemini) -> str:
    """Ask Gemini (temperature = 1) which vulnerability type ``code`` contains.

    Args:
        code: Source code of the sample to classify.
        gemini: Gemini model handle passed to ``request_gemini_engine``.

    Returns:
        One of 'Integer-Overflow', 'Buffer-Overflow',
        'Null-Pointer-Dereference', 'Resource-Exhaustion-Error' or
        'No-Vulnerability'. When the reply mentions several types the one
        mentioned first wins; when it mentions none, 'No-Vulnerability' is
        returned.
    """
    vulnerability_types = [
        'Integer-Overflow',
        'Buffer-Overflow',
        'Null-Pointer-Dereference',
        'Resource-Exhaustion-Error',
        'No-Vulnerability',
    ]

    config = create_gemini_config(
        max_tokens=100,
        temperature=1,
        batch_size=1,
    )

    options = "\n".join(f"- {name}" for name in vulnerability_types)
    formatted_input = (
        "You are a security expert reviewing C code.\n"
        "Classify the following code into exactly one of these categories:\n"
        f"{options}\n"
        "Answer with the category name only.\n\n"
        f"```c\n{code}\n```\n"
    )

    ret = request_gemini_engine(gemini, formatted_input, config)
    gemini_response = ret.text

    # Normalize separators so "Buffer Overflow" / "buffer_overflow" still match.
    normalized = gemini_response.lower().replace("_", "-").replace(" ", "-")

    prediction = 'No-Vulnerability'
    earliest = len(normalized)
    for name in vulnerability_types:
        position = normalized.find(name.lower())
        if position != -1 and position < earliest:
            earliest = position
            prediction = name
    return prediction


def gemini_vuln_detection_multi(gemini, vuln_dataset) -> list:
    """Measure how accurately Gemini classifies each vulnerability type.

    Args:
        gemini: Gemini model handle passed through to
            ``predict_vuln_gemini_multi``.
        vuln_dataset: Iterable of dicts with at least the keys ``'src'``
            (source code) and ``'vulnerability_type'`` (ground-truth type).

    Returns:
        A list of ``(vulnerability_type, accuracy)`` tuples, where accuracy is
        the fraction of samples with that ground-truth type that were
        predicted correctly (0.0 for a type without samples). The five known
        types come first, in a fixed order; any other type encountered in the
        dataset is appended in order of first appearance.
    """
    # pick from these types
    known_types = [
        'Integer-Overflow',
        'Buffer-Overflow',
        'Null-Pointer-Dereference',
        'Resource-Exhaustion-Error',
        'No-Vulnerability',
    ]
    totals = {name: 0 for name in known_types}
    correct = {name: 0 for name in known_types}

    for data in tqdm(vuln_dataset):
        code = data['src']
        # Assumes the ground truth is one of the strings above -- TODO confirm
        # against the meta_data.json schema. Unknown values are still counted.
        vulnerability_type = data['vulnerability_type']

        # Use the multi-class predictor; the binary one returns a bool that
        # can never equal a vulnerability-type string.
        prediction = predict_vuln_gemini_multi(code, gemini)

        totals[vulnerability_type] = totals.get(vulnerability_type, 0) + 1
        if prediction == vulnerability_type:
            correct[vulnerability_type] = correct.get(vulnerability_type, 0) + 1

    return [
        (name, correct.get(name, 0) / count if count else 0.0)
        for name, count in totals.items()
    ]

# Per-vulnerability-type classification results of gemini over vuln_dataset
gemini_multi_class_vulnerability = gemini_vuln_detection_multi(gemini, vuln_dataset)