# Evaluation

In [1]:
from unsloth import FastLanguageModel
from src.paths import FINAL_VULNERABILITIES_DATA_PATH
from datasets import Dataset
import polars as pl
from transformers import TextStreamer
import re

🦥 Unsloth: Will patch your computer to enable 2x faster free finetuning.
🦥 Unsloth Zoo will now patch everything to make training faster!


In [17]:
MODEL_NAME = "Qwen/Qwen2.5-Coder-7B"

max_seq_length = 8192  # Choose any! We auto support ROPE Scaling internally!
dtype = (
    None  # None for auto detection. Float16 for Tesla T4, V100, Bfloat16 for Ampere+
)
load_in_4bit = True  # Use 4bit quantization to reduce memory usage. Can be False.

model, tokenizer = FastLanguageModel.from_pretrained(
    model_name=MODEL_NAME,
    max_seq_length=max_seq_length,
    dtype=dtype,
    load_in_4bit=load_in_4bit,
)

KeyboardInterrupt: 

In [None]:
FastLanguageModel.for_inference(model) # Enable native 2x faster inference
text_streamer = TextStreamer(tokenizer)

In [2]:
test_dataset = Dataset.from_parquet("data/test_dataset.parquet")
test_dataset.shape

(293, 6)

In [3]:
prompt = """### Instruction:
You are a cybersecurity expert specialized in vulnerability detection. Your task is to analyze the provided source code and determine whether it contains any security vulnerabilities. If vulnerabilities are found, classify them by CWE ID.

### Input:
The following source code is provided for analysis:

---
{}
---

### Instruction:
State whether any vulnerabilities are present. If vulnerabilities exist, list the corresponding CWE IDs.

#### Example Output 1 (No vulnerabilities found):
No security vulnerabilities detected.

#### Example Output 2 (Vulnerabilities found):
Security vulnerabilities detected: CWE-79, CWE-89.

Provide only the response without any additional explanation.

### Output:
"""

CWE_REGEX = r"CWE-\d{1,4}"


def formatting_prompts_func(row):
    codes = row["code"]
    files = row["file"]
    input_data = ""
    for code_file, code_unit in zip(files, codes):
        input_data = (
            f"File name: {code_file}\n```python\n{code_unit.replace('\n\n', '\n')}\n```\n"
        )
    return prompt.format(input_data)

def generate_predictions(model, tokenizer, dataset):
    evaluation_results = []

    for row in dataset:
        inputs = tokenizer(
            formatting_prompts_func(row),
            return_tensors = "pt",
        ).to("cuda")
        response = model.generate(**inputs, max_new_tokens = 128, use_cache = True, min_p = 0.1)
        result = tokenizer.batch_decode(response)[0]
        
        try:
            result = result[result.rfind("### Output:\n") + 12:result.index("<|endoftext|>") - 2]
            
            if "No security vulnerabilities detected" in result:
                is_vulnerability_found = False
                found_vulnerabilities = []
            elif "Security vulnerabilities detected:" in result:
                is_vulnerability_found = True
                found_vulnerabilities = list(re.findall(CWE_REGEX, result))
            else:
                is_vulnerability_found = None
                found_vulnerabilities = []
                
            evaluation_results.append(
                {
                    "is_vulnerability_exists": row["is_vulnerability_exists"][0], 
                    "cwe_id": list(set(cwe_id for _cwe in row["clustered_cwe_id"] for cwe_id in _cwe)), 
                    "response": result,
                    "is_vulnerability_found": is_vulnerability_found,
                    "found_vulnerabilities": found_vulnerabilities
                }
            )
        except Exception as exc:
            print(exc)
        
    return evaluation_results


In [None]:
evaluation_results = generate_predictions(model, tokenizer, test_dataset)

results_without_finetune = pl.DataFrame(evaluation_results).with_columns(pl.col("response").str.strip_chars())
results_without_finetune.head(6)

is_vulnerability_exists,cwe_id,response,is_vulnerability_found,found_vulnerabilities
bool,object,str,bool,list[null]
False,{'CWE-22'},"""No security vulnerabilities de…",False,[]
False,{'CWE-79'},"""No security vulnerabilities de…",False,[]
True,{'CWE-539'},"""No security vulnerabilities de…",False,[]
True,{'CWE-20'},"""No security vulnerabilities de…",False,[]
False,{'CWE-203'},"""No security vulnerabilities de…",False,[]
True,{'CWE-94'},"""No security vulnerabilities de…",False,[]


## Generate predictions after fine tuning

In [4]:
FINE_TUNED_MODEL_NAME = "pretrained_vulnerability_searcher"

max_seq_length = 8192  # Choose any! We auto support ROPE Scaling internally!
dtype = (
    None  # None for auto detection. Float16 for Tesla T4, V100, Bfloat16 for Ampere+
)
load_in_4bit = True  # Use 4bit quantization to reduce memory usage. Can be False.

model, tokenizer = FastLanguageModel.from_pretrained(
    model_name=FINE_TUNED_MODEL_NAME,
    max_seq_length=max_seq_length,
    dtype=dtype,
    load_in_4bit=load_in_4bit,
)

==((====))==  Unsloth 2025.1.8: Fast Qwen2 patching. Transformers: 4.48.2.
   \\   /|    GPU: NVIDIA GeForce RTX 3060. Max memory: 12.0 GB. Platform: Windows.
O^O/ \_/ \    Torch: 2.6.0+cu124. CUDA: 8.6. CUDA Toolkit: 12.4. Triton: 3.1.0
\        /    Bfloat16 = TRUE. FA [Xformers = 0.0.29.post3. FA2 = False]
 "-____-"     Free Apache license: http://github.com/unslothai/unsloth
Unsloth: Fast downloading is enabled - ignore downloading bars which are red colored!


  self.register_buffer("cos_cached", emb.cos().to(dtype=dtype, device=device, non_blocking=True), persistent=False)
Unsloth 2025.1.8 patched 28 layers with 28 QKV layers, 28 O layers and 28 MLP layers.


In [5]:
FastLanguageModel.for_inference(model) # Enable native 2x faster inference

PeftModelForCausalLM(
  (base_model): LoraModel(
    (model): Qwen2ForCausalLM(
      (model): Qwen2Model(
        (embed_tokens): Embedding(152064, 3584, padding_idx=151665)
        (layers): ModuleList(
          (0-27): 28 x Qwen2DecoderLayer(
            (self_attn): Qwen2Attention(
              (q_proj): lora.Linear4bit(
                (base_layer): Linear4bit(in_features=3584, out_features=3584, bias=True)
                (lora_dropout): ModuleDict(
                  (default): Identity()
                )
                (lora_A): ModuleDict(
                  (default): Linear(in_features=3584, out_features=32, bias=False)
                )
                (lora_B): ModuleDict(
                  (default): Linear(in_features=32, out_features=3584, bias=False)
                )
                (lora_embedding_A): ParameterDict()
                (lora_embedding_B): ParameterDict()
                (lora_magnitude_vector): ModuleDict()
              )
              (k_proj): lora

In [None]:
evaluation_results = generate_predictions(model, tokenizer, test_dataset)

results_with_finetune = pl.DataFrame(evaluation_results).with_columns(pl.col("response").str.strip_chars())
results_with_finetune.describe()

statistic,is_vulnerability_exists,cwe_id,response,is_vulnerability_found,found_vulnerabilities
str,f64,f64,str,f64,f64
"""count""",293.0,293.0,"""293""",293.0,293.0
"""null_count""",0.0,0.0,"""0""",0.0,0.0
"""mean""",0.498294,,,0.477816,
"""std""",,,,,
"""min""",0.0,,"""No security vulnerabilities de…",0.0,
"""25%""",,,,,
"""50%""",,,,,
"""75%""",,,,,
"""max""",1.0,,"""Security vulnerabilities detec…",1.0,


In [7]:
def calculate_metrics(df: pl.DataFrame):
    TP = df.filter(
        (pl.col("is_vulnerability_exists") == True) &
        (pl.col("is_vulnerability_found") == True)
    ).shape[0]

    FP = df.filter(
        (pl.col("is_vulnerability_exists") == False) &
        (pl.col("is_vulnerability_found") == True)
    ).shape[0]

    TN = df.filter(
        (pl.col("is_vulnerability_exists") == False) &
        (pl.col("is_vulnerability_found") == False)
    ).shape[0]

    FN = df.filter(
        (pl.col("is_vulnerability_exists") == True) &
        (pl.col("is_vulnerability_found") == False)
    ).shape[0]

    # Compute metrics
    precision = TP / (TP + FP) if (TP + FP) > 0 else 0.0
    recall = TP / (TP + FN) if (TP + FN) > 0 else 0.0
    f1_score = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0.0
    fpr = FP / (FP + TN) if (FP + TN) > 0 else 0.0

    return precision, recall, f1_score, fpr


In [8]:
precision, recall, f1_score, fpr = calculate_metrics(results_with_finetune)

print(f"Precision: {precision:.4f}")
print(f"Recall:    {recall:.4f}")
print(f"F1 Score:  {f1_score:.4f}")
print(f"FPR:       {fpr:.4f}")

Precision: 0.5643
Recall:    0.5411
F1 Score:  0.5524
FPR:       0.4150


In [9]:
metrics_per_cwe = []
results_with_finetune_exploded = results_with_finetune.explode("cwe_id")

for cwe_val in results_with_finetune.select("cwe_id").explode("cwe_id").unique("cwe_id").to_series().to_list():
    # True Positive: actual is cwe_val, predicted includes cwe_val
    TP = results_with_finetune_exploded.filter(
        (pl.col("cwe_id") == cwe_val)
        & (pl.col("found_vulnerabilities").list.contains(cwe_val))
    ).shape[0]

    # False Negative: actual is cwe_val, predicted does not include cwe_val
    FN = results_with_finetune_exploded.filter(
        (pl.col("cwe_id") == cwe_val)
        & (~pl.col("found_vulnerabilities").list.contains(cwe_val))
    ).shape[0]

    # False Positive: actual is not cwe_val, but predicted includes cwe_val
    FP = results_with_finetune_exploded.filter(
        (pl.col("cwe_id") != cwe_val)
        & (pl.col("found_vulnerabilities").list.contains(cwe_val))
    ).shape[0]

    # True Negative: actual is not cwe_val, and predicted does not include cwe_val
    TN = results_with_finetune_exploded.filter(
        (pl.col("cwe_id") != cwe_val)
        & (~pl.col("found_vulnerabilities").list.contains(cwe_val))
    ).shape[0]

    # Compute metrics safely (avoid zero-division)
    precision_val = TP / (TP + FP) if (TP + FP) > 0 else 0.0
    recall_val = TP / (TP + FN) if (TP + FN) > 0 else 0.0
    f1_val = (
        2 * precision_val * recall_val / (precision_val + recall_val)
        if (precision_val + recall_val) > 0
        else 0.0
    )
    fpr_val = FP / (FP + TN) if (FP + TN) > 0 else 0.0

    metrics_per_cwe.append({
        "cwe_id": cwe_val,
        "TP": TP,
        "FP": FP,
        "FN": FN,
        "TN": TN,
        "precision": precision_val,
        "recall": recall_val,
        "f1_score": f1_val,
        "false_positive_rate": fpr_val
    })

results_df = pl.DataFrame(metrics_per_cwe)
results_df


cwe_id,TP,FP,FN,TN,precision,recall,f1_score,false_positive_rate
str,i64,i64,i64,i64,f64,f64,f64,f64
"""CWE-664""",8,6,40,265,0.571429,0.166667,0.258065,0.02214
"""CWE-79""",19,46,21,233,0.292308,0.475,0.361905,0.164875
"""CWE-200""",4,7,30,278,0.363636,0.117647,0.177778,0.024561
"""CWE-610""",12,21,30,256,0.363636,0.285714,0.32,0.075812
"""CWE-284""",3,3,35,278,0.5,0.078947,0.136364,0.010676
"""CWE-22""",12,4,32,271,0.75,0.272727,0.4,0.014545
"""CWE-707""",4,3,69,243,0.571429,0.054795,0.1,0.012195
