<a href="https://colab.research.google.com/github/ingridguza3103/Automating-SE/blob/main/Automation_AI_(3)_(1).ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# ============================ 1. Setup ======================================
!pip -q install \
    "transformers==4.40.2" \
    "peft==0.5.0" \
    "datasets==2.18.0" \
    "evaluate==0.4.1" \
    "accelerate" \
    "scikit-learn" -U

In [None]:
from pathlib import Path
import urllib.request, zipfile, json, os, random, torch
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.metrics import precision_recall_fscore_support
from sklearn.metrics import accuracy_score

from datasets import Dataset , load_metric
import evaluate
from transformers import (AutoTokenizer,
                          AutoModelForSequenceClassification,
                          TrainingArguments, Trainer,
                          DataCollatorWithPadding)

from tqdm.auto import tqdm

# reproducibility: seed Python's, NumPy's and PyTorch's random generators
seed = 42
random.seed(seed)
np.random.seed(seed)
torch.manual_seed(seed)

In [None]:
# ============================ 2. Download Big-Vul ============================
zip_url = "https://raw.githubusercontent.com/Meerschwein/Automating-SE/refs/heads/main/Big-Vul-dataset.zip"
zip_path = Path("Big-Vul-dataset.zip")
data_dir = Path("Big-Vul-dataset")

if not zip_path.exists():
    print("Downloading Big-Vul …")
    # Download under a temporary name and rename only on success. Otherwise an
    # interrupted transfer would leave a truncated archive at `zip_path`, and
    # the existence check above would skip the download on every later run.
    partial_path = zip_path.with_name(zip_path.name + ".part")
    try:
        urllib.request.urlretrieve(zip_url, partial_path)
    except BaseException:
        # Also covers a manual cell interrupt; remove the fragment and re-raise.
        if partial_path.exists():
            partial_path.unlink()
        raise
    partial_path.replace(zip_path)

if not data_dir.exists():
    print("Unpacking …")
    # Extract into `data_dir` itself so the target cannot drift from the path
    # that is checked above and used to build `json_path` below.
    with zipfile.ZipFile(zip_path) as z:
        z.extractall(data_dir)

json_path = data_dir / "data.json"            # <- original file name
assert json_path.exists(), "data.json not found in the ZIP!"

In [None]:
# ============================ 3. Load & light-clean ==========================
print("Loading JSON …")

# --- 1)  Robustly read data.json  ------------------------------------------
# Big-Vul dumps appear in **two** formats:
#   • ordinary JSON array:     [ { … }, { … }, … ]
#   • ND-JSON / JSONL:         { … }\n{ … }\n…
# The layout is detected from the first non-blank character instead of from
# "whichever pandas call does not raise": a JSON array stored on a single line
# can be parsed in line mode without an error, yielding a malformed frame.


def _first_nonblank_char(path):
    """Return the first non-whitespace character of the file at *path*.

    Reads in small chunks so that a file consisting of one very long line is
    not loaded just for sniffing. A UTF-8 byte-order mark is skipped. Returns
    an empty string when the file holds nothing but whitespace.
    """
    with open(path, encoding="utf-8-sig") as fh:
        while True:
            chunk = fh.read(4096)
            if not chunk:
                return ""
            chunk = chunk.lstrip()
            if chunk:
                return chunk[0]


is_json_array = _first_nonblank_char(json_path) == "["

# Try the detected layout first, then the other one, keeping the first attempt
# that parses into a non-empty frame.
df = pd.DataFrame()
for as_lines in (not is_json_array, is_json_array):
    try:
        df = pd.read_json(json_path, lines=as_lines)
    except ValueError:
        continue
    if not df.empty:
        break

# If pandas could not produce any rows, go fully manual (extremely rare).
# A file that is not valid JSON at all raises here instead of going unnoticed.
if df.empty:
    raw = json_path.read_text(encoding="utf-8").strip()
    if is_json_array:                                    # array
        records = json.loads(raw)
    else:                                                # ND-JSON
        records = [json.loads(l) for l in raw.splitlines() if l.strip()]
    df = pd.DataFrame(records)

print("Columns in data.json:", list(df.columns))

# --- 2)  Harmonise column names --------------------------------------------
# The label column goes by several names; the first candidate (in the order
# listed) that is present in the frame is renamed to "label".
label_col_candidates = ["target", "targets", "label", "vul", "vulnerable"]
found_label_col = next(
    (name for name in label_col_candidates if name in df.columns), None
)
if found_label_col is None:
    raise ValueError(
        "❌ Could not find a label column!\nAvailable columns: %s" % list(df.columns)
    )
df = df.rename(columns={found_label_col: "label"})

# The function source must end up in "code"; "func" is the only accepted alias.
if "code" not in df.columns:
    if "func" not in df.columns:
        raise ValueError(
            "❌ Could not find a code column!\nAvailable columns: %s" % list(df.columns)
        )
    df = df.rename(columns={"func": "code"})

# --- 3)  Keep only what we need & light cleaning ---------------------------
# Two columns only, no missing values, integer labels, one row per distinct
# function body.
df = (
    df.loc[:, ["code", "label"]]
      .dropna()
      .astype({"label": int})
      .drop_duplicates(subset="code")
)

# Discard over-long blobs: keep functions with fewer than 300 newline characters.
newline_counts = df["code"].str.count(r"\n")
df = df.loc[newline_counts < 300].reset_index(drop=True)

# --- 4)  Show class balance -------------------------------------------------
print("\nClass distribution (0 = benign, 1 = vulnerable):")
print(df["label"].value_counts())


In [None]:
# ============================ 4. Train / Val / Test split ====================
# Two stratified cuts: 15 % of all rows are held out for testing, then 10 % of
# the remaining rows become the validation set.
trainval_df, test_df = train_test_split(
    df, test_size=0.15, stratify=df["label"], random_state=seed
)
train_df, val_df = train_test_split(
    trainval_df, test_size=0.1, stratify=trainval_df["label"], random_state=seed
)


def to_hf(ds):
    """Convert a pandas DataFrame into a Hugging Face ``Dataset``, dropping its index."""
    hf_dataset = Dataset.from_pandas(ds, preserve_index=False)
    return hf_dataset


train_ds, val_ds, test_ds = (to_hf(part) for part in (train_df, val_df, test_df))

In [None]:
# ============================ 5. Tokenisation ===============================
model_name = "microsoft/codebert-base"
tok = AutoTokenizer.from_pretrained(model_name)


def tokenize(batch):
    """Tokenise a batch of source-code strings.

    Parameters
    ----------
    batch : mapping
        Must provide the texts under the key "code" (this is what
        ``Dataset.map(..., batched=True)`` passes in below).

    Returns
    -------
    The tokenizer output, truncated to at most 512 tokens and deliberately
    left unpadded. Padding to a fixed 512 here would make every batch as wide
    as the longest allowed input and leave the ``DataCollatorWithPadding``
    created below with nothing to do; without it, each batch is padded only to
    the length of its own longest example.
    """
    return tok(batch["code"], truncation=True, max_length=512)


# Same transformation for all three splits; the raw text column is dropped
# once it has been tokenised.
train_ds, val_ds, test_ds = (
    split.map(tokenize, batched=True, remove_columns=["code"])
    for split in (train_ds, val_ds, test_ds)
)

# Pads each batch dynamically at collation time.
data_collator = DataCollatorWithPadding(tok)

In [None]:

# ============================ 6. Model ======================================
# Pretrained encoder plus a sequence-classification head with 2 output logits
# (benign / vulnerable). The checkpoint is taken from `model_name`, the same
# variable the tokenizer was loaded from, so the two cannot drift apart.

model = AutoModelForSequenceClassification.from_pretrained(
    model_name, num_labels=2
)

# Optional: uncomment to freeze the encoder and train only the head.
#for p in model.base_model.parameters():
#    p.requires_grad = False

# ---------------------------------------------------------------------------
# Class-imbalance weights forwarded to Trainer
# ---------------------------------------------------------------------------
# `sort_index()` orders the counts by label value, so `neg` is the count of
# label 0 and `pos` the count of label 1. Class 1 is weighted by neg / pos,
# class 0 keeps weight 1. The unpacking raises ValueError if the training
# split does not contain exactly two distinct labels.
neg, pos = train_df.label.value_counts().sort_index().values
class_weights = torch.tensor([1.0, neg / pos], dtype=torch.float)




In [None]:
# ============================ 7. Trainer setup ==============================
class WeightedTrainer(Trainer):
    """Trainer whose loss is a class-weighted cross-entropy.

    Parameters
    ----------
    class_weights : torch.Tensor or None, optional
        One weight per class, handed to the cross-entropy loss as ``weight``.
        ``None`` (the default) gives an unweighted cross-entropy.
    **kwargs
        Forwarded unchanged to ``Trainer.__init__``.
    """

    def __init__(self, class_weights=None, **kwargs):
        super().__init__(**kwargs)
        # Stored as given: the declared default is None, so it must not be
        # dereferenced here. Device placement happens in compute_loss.
        self.class_weights = class_weights

    def compute_loss(self, model, inputs, return_outputs=False, num_items_in_batch=None):
        """Compute the weighted cross-entropy between the logits and ``inputs["labels"]``.

        Parameters
        ----------
        model : the model to run.
        inputs : mapping of model inputs; must contain "labels".
        return_outputs : when true, return ``(loss, outputs)`` instead of ``loss``.
        num_items_in_batch : unused. Accepted so the override keeps working
            with newer Trainer releases that pass this keyword (assumption
            based on the upstream API, not on this notebook's pinned version).

        Raises
        ------
        KeyError
            If ``inputs`` has no "labels" entry.
        """
        labels = inputs["labels"]
        # Run the model without the labels so it does not additionally compute
        # its own (unweighted) loss; the caller's mapping is left untouched.
        model_inputs = {key: value for key, value in inputs.items() if key != "labels"}
        outputs = model(**model_inputs)
        logits = outputs["logits"]

        weight = self.class_weights
        if weight is not None and weight.device != logits.device:
            # Move once and cache, so later steps find it on the right device.
            weight = self.class_weights = weight.to(logits.device)

        loss = torch.nn.functional.cross_entropy(logits, labels, weight=weight)

        return (loss, outputs) if return_outputs else loss


# --- metrics ---------------------------------------------------------------
def compute_metrics(eval_pred):
    """Turn ``(logits, labels)`` into accuracy, precision, recall and F1.

    The predicted class is the arg-max over the last axis of the logits.
    Precision, recall and F1 are the binary scores for class 1, reported as 0
    where they would otherwise be undefined.
    """
    scores, gold = eval_pred
    predicted = np.asarray(scores).argmax(axis=-1)

    precision, recall, f_score, _support = precision_recall_fscore_support(
        gold, predicted, pos_label=1, average="binary", zero_division=0
    )

    return {
        "accuracy": accuracy_score(gold, predicted),
        "precision": precision,
        "recall": recall,
        "f1": f_score,
    }


args = TrainingArguments(
    output_dir="results",
    per_device_train_batch_size=16,
    per_device_eval_batch_size=16,
    num_train_epochs=3,
    # Mixed precision only when a CUDA device is present: requesting fp16 on a
    # CPU-only runtime makes TrainingArguments raise at construction time.
    fp16=torch.cuda.is_available(),
    learning_rate=2e-5,
    weight_decay=0.01,
    # Evaluate and checkpoint once per epoch; the two strategies have to match
    # for `load_best_model_at_end` to work.
    evaluation_strategy="epoch",
    save_strategy="epoch",
    load_best_model_at_end=True,
    metric_for_best_model="f1",      # the "f1" entry returned by compute_metrics
    seed=seed,
    report_to="none"
)

trainer = WeightedTrainer(
    model=model,
    args=args,
    train_dataset=train_ds,
    eval_dataset=val_ds,
    tokenizer=tok,
    data_collator=data_collator,
    compute_metrics=compute_metrics,
    class_weights=class_weights          # <-- passed to our subclass
)

In [None]:
# ============================ 8. Training ====================================
# Runs fine-tuning with the `trainer` built in the previous cell. As the last
# expression of the cell, the value returned by train() is shown as its output.
trainer.train()

In [None]:
# ============================ 9. Final evaluation ============================
print("\n=== Test set metrics ===")
metrics = trainer.evaluate(test_ds)

# Only entries whose key starts with "eval_" are printed, with that prefix
# stripped from the displayed name.
metric_prefix = "eval_"
for metric_name, metric_value in metrics.items():
    if not metric_name.startswith(metric_prefix):
        continue
    print(f"{metric_name[len(metric_prefix):]} : {metric_value:.4f}")

In [None]:
# ============================ 10. Save model ================================
# Model and tokenizer are written to the same directory.
export_dir = "codebert-bigvul-func"
trainer.save_model(export_dir)
tok.save_pretrained(export_dir)

In [None]:
# ============================ 11. Inspect sample predictions ===============
import textwrap, torch, random

k = 5                                          # number of samples to display
sample_rows = test_df.sample(k, random_state=seed).reset_index(drop=True)

heavy_rule = "=" * 80
light_rule = "-" * 80
max_shown_lines = 40                           # longer snippets are cut off

model.eval()                                   # just in case
for sample_no, sample in enumerate(sample_rows.itertuples(index=False), start=1):
    code_snippet = sample.code
    true_label = int(sample.label)

    # tokenize one snippet and run a gradient-free forward pass
    encoded = tok(code_snippet, return_tensors="pt", truncation=True, max_length=512)
    encoded = encoded.to(model.device)
    with torch.no_grad():
        logits = model(**encoded).logits
    prob = logits.softmax(dim=-1)[0, 1].item()   # probability of class 1
    pred = int(prob > 0.5)

    truth_text = "VULNERABLE" if true_label else "BENIGN"
    pred_text = "VULNERABLE" if pred else "BENIGN"

    # report
    print(heavy_rule)
    print(f"SAMPLE {sample_no}")
    print(f"Ground-truth:  {truth_text}")
    print(f"Model pred.:  {pred_text}  (prob = {prob:.2f})")
    print(light_rule)

    # numbered listing of at most the first 40 lines
    shown = code_snippet.splitlines()
    if len(shown) > max_shown_lines:
        shown = shown[:max_shown_lines] + ["    … (truncated) …"]
    print("\n".join(f"{n:>3}: {text}" for n, text in enumerate(shown, start=1)))
    print()  # blank line between samples