<a href="https://colab.research.google.com/github/saivenkateshparuchuri/SESD_Project/blob/main/SESD_Project.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# Mount Google Drive at /content/drive (Colab only). train() reads
# train.csv and val.csv from /content/drive/MyDrive, so this must run first.
from google.colab import drive
drive.mount('/content/drive')


Mounted at /content/drive


In [14]:
# Ask the user to upload a file into the Colab runtime (Colab only).
# NOTE(review): `uploaded` is not referenced by any later cell visible here --
# the CSVs are loaded from Google Drive instead -- so this cell looks like a
# leftover; confirm before relying on or removing it.
from google.colab import files
uploaded = files.upload()


Saving data.zip to data (1).zip


In [16]:
import os
import pandas as pd
import torch
from torch.utils.data import Dataset, DataLoader
from torch.optim import AdamW
from transformers import AutoTokenizer, AutoModel
import torch.nn as nn
from sklearn.metrics import precision_recall_fscore_support, roc_auc_score
from tqdm import tqdm

In [17]:

# ------------------------------
# Config
# ------------------------------
# Hugging Face model id; used for both the tokenizer and the encoder in train().
MODEL_NAME = "microsoft/codebert-base"
# Token length every sample is truncated/padded to in CodeDataset.__getitem__.
MAX_LEN = 256
# Batch size for both the training and the validation DataLoader.
BATCH_SIZE = 4
# Number of full passes over the training set; validation runs after each pass.
EPOCHS = 3
# Learning rate handed to AdamW.
LR = 2e-5
# Use the GPU when PyTorch can see one, otherwise fall back to the CPU.
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Acceptable alternate column names (helps robustness).
# CodeDataset compares these against the lower-cased, whitespace-stripped CSV
# column names; candidates are tried in list order and the first hit is used.
POSSIBLE_CODE_COLS = ['code', 'source', 'snippet', 'function_code', 'code_snippet']
POSSIBLE_LABEL_COLS = ['label', 'y', 'is_vuln']

In [18]:
# ------------------------------
# Data loader utilities
# ------------------------------

def load_df(path: str) -> pd.DataFrame:
    """Robust CSV loader: tries common encodings and separators and prints detected columns.

    Every (encoding, separator) combination is attempted in order. The first
    parse that yields at least two columns is returned. A parse that yields a
    single column usually means the separator was wrong (the whole row ended
    up in one field), so it is only remembered as a fallback and the remaining
    separators are still tried.

    Args:
        path: Location of the CSV file.

    Returns:
        The parsed DataFrame. For a file that genuinely has one column, the
        first successful single-column parse is returned.

    Raises:
        RuntimeError: If none of the attempts (including a final read with
            pandas defaults) can parse the file.
    """
    seps = [',', ';', '\t']
    encodings = ['utf-8', 'utf-8-sig']
    last_err = None
    single_col_df = None  # first successful parse that produced only one column
    for enc in encodings:
        for sep in seps:
            try:
                df = pd.read_csv(path, encoding=enc, sep=sep, engine='python')
            except Exception as e:
                last_err = e
                continue
            # quick sanity: a correctly split file must have >= 2 columns
            if df.shape[1] >= 2:
                print(f"[load_df] Loaded '{path}' with encoding={enc} sep='{sep}' -> columns: {list(df.columns)}")
                return df
            if single_col_df is None:
                single_col_df = df

    # fallback 1: no separator split the file, so treat it as single-column data
    if single_col_df is not None:
        print(f"[load_df] Fallback read -> columns: {list(single_col_df.columns)}")
        return single_col_df

    # fallback 2: every explicit attempt raised; let pandas try with its defaults
    try:
        df = pd.read_csv(path, engine='python')
        print(f"[load_df] Fallback read -> columns: {list(df.columns)}")
        return df
    except Exception as e:
        raise RuntimeError(f"Failed to read '{path}'. Last error: {last_err} | fallback error: {e}") from e

class CodeDataset(Dataset):
    """Dataset of (code string, label) pairs that tokenizes each sample on access.

    The code and label columns are located by comparing the lower-cased,
    whitespace-stripped column names of ``df`` with POSSIBLE_CODE_COLS and
    POSSIBLE_LABEL_COLS; candidates are tried in list order.

    Raises:
        RuntimeError: If no code column, or no label column, can be found.
    """

    def __init__(self, df: pd.DataFrame, tokenizer):
        original_names = list(df.columns)

        # normalized name -> first original column that carries it
        by_normalized = {}
        for name in original_names:
            by_normalized.setdefault(name.lower().strip(), name)

        code_col = next(
            (by_normalized[cand] for cand in POSSIBLE_CODE_COLS if cand in by_normalized),
            None,
        )
        label_col = next(
            (by_normalized[cand] for cand in POSSIBLE_LABEL_COLS if cand in by_normalized),
            None,
        )

        # A missing code column is reported before a missing label column.
        if code_col is None:
            raise RuntimeError(
                "Could not find a code column in the CSV. Detected columns: {}. Expected one of: {}".format(original_names, POSSIBLE_CODE_COLS)
            )
        if label_col is None:
            raise RuntimeError(
                "Could not find a label column in the CSV. Detected columns: {}. Expected one of: {}".format(original_names, POSSIBLE_LABEL_COLS)
            )

        self.tokenizer = tokenizer

        # Missing code cells become empty strings; everything is coerced to str.
        code_series = df[code_col].fillna('')
        self.codes = code_series.astype(str).tolist()

        # Missing labels are treated as 0; all labels are coerced to int.
        label_series = df[label_col].fillna(0)
        self.labels = label_series.astype(int).tolist()

    def __len__(self):
        """Number of samples."""
        return len(self.codes)

    def __getitem__(self, idx):
        """Tokenize sample ``idx`` and return its tensors plus a float ``labels`` tensor."""
        encoded = self.tokenizer(
            self.codes[idx],
            truncation=True,
            padding='max_length',
            max_length=MAX_LEN,
            return_tensors='pt',
        )
        sample = {}
        for key, tensor in encoded.items():
            # drop the leading batch dimension introduced by return_tensors='pt'
            sample[key] = tensor.squeeze(0)
        # float dtype because the training loss is BCEWithLogitsLoss
        sample['labels'] = torch.tensor(self.labels[idx], dtype=torch.float)
        return sample



In [19]:
# ------------------------------
# Model
# ------------------------------
class VulnClassifier(nn.Module):
    """Pretrained transformer encoder followed by a two-layer MLP head.

    ``forward`` returns one raw logit per input sequence (no sigmoid applied).
    """

    def __init__(self, model_name: str):
        """Load the encoder named ``model_name`` and build the classification head."""
        super().__init__()
        self.encoder = AutoModel.from_pretrained(model_name)
        width = self.encoder.config.hidden_size
        # Layer order is kept so the state_dict keys stay classifier.0 / classifier.3.
        head_layers = [
            nn.Linear(width, 256),
            nn.ReLU(),
            nn.Dropout(0.1),
            nn.Linear(256, 1),
        ]
        self.classifier = nn.Sequential(*head_layers)

    def forward(self, input_ids, attention_mask):
        """Encode the batch and map the first token's hidden state to a logit."""
        encoded = self.encoder(input_ids=input_ids, attention_mask=attention_mask)
        # [CLS] pooling: take the hidden state at sequence position 0
        first_token = encoded.last_hidden_state[:, 0]
        scores = self.classifier(first_token)
        # drop the trailing size-1 dimension produced by the final Linear(256, 1)
        return scores.squeeze(-1)

In [20]:
# ------------------------------
# Training loop
# ------------------------------

def _evaluate(model, loader):
    """Score ``model`` on ``loader`` and return (precision, recall, f1, auc).

    Predictions are thresholded at a sigmoid probability of 0.5. AUC is
    reported as NaN when roc_auc_score raises ValueError, i.e. when
    scikit-learn considers the score undefined for the given labels
    (presumably a validation set containing a single class -- confirm against
    the installed scikit-learn version).
    """
    model.eval()
    ys, preds, probs = [], [], []
    with torch.no_grad():
        for batch in loader:
            input_ids = batch['input_ids'].to(DEVICE)
            attention_mask = batch['attention_mask'].to(DEVICE)

            logits = model(input_ids=input_ids, attention_mask=attention_mask)
            probs_batch = torch.sigmoid(logits).cpu().numpy()
            preds_batch = (probs_batch >= 0.5).astype(int)

            # Labels never need to visit the device here; collect them as ints
            # so they have the same type as the thresholded predictions.
            ys.extend(batch['labels'].long().tolist())
            preds.extend(preds_batch.tolist())
            probs.extend(probs_batch.tolist())

    p, r, f, _ = precision_recall_fscore_support(ys, preds, average='binary', zero_division=0)
    try:
        auc = roc_auc_score(ys, probs)
    except ValueError as e:
        # 0.0 would read as "perfectly inverted ranking"; NaN says "undefined".
        print(f"[train] AUC is undefined for this validation set: {e}")
        auc = float('nan')
    return p, r, f, auc


def train(train_path: str = '/content/drive/MyDrive/train.csv',
          val_path: str = '/content/drive/MyDrive/val.csv',
          save_path: str = 'models/codebert_vuln.pt') -> None:
    """Fine-tune VulnClassifier on the training CSV and validate after every epoch.

    Uses the module-level configuration (MODEL_NAME, BATCH_SIZE, EPOCHS, LR,
    DEVICE). After each epoch, precision, recall, F1 and ROC-AUC on the
    validation set are printed. When all epochs are done the model's
    state_dict is written to ``save_path``.

    Args:
        train_path: CSV with the training samples (read via load_df).
        val_path: CSV with the validation samples (read via load_df).
        save_path: Destination file for the trained weights; its parent
            directory is created if necessary.

    Raises:
        RuntimeError: Propagated from load_df / CodeDataset when a CSV cannot
            be read or lacks a recognizable code or label column.
    """
    tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)

    train_df = load_df(train_path)
    val_df = load_df(val_path)

    train_ds = CodeDataset(train_df, tokenizer)
    val_ds = CodeDataset(val_df, tokenizer)

    train_loader = DataLoader(train_ds, batch_size=BATCH_SIZE, shuffle=True)
    val_loader = DataLoader(val_ds, batch_size=BATCH_SIZE)

    model = VulnClassifier(MODEL_NAME).to(DEVICE)
    optimizer = AdamW(model.parameters(), lr=LR)
    # The model emits raw logits, so the sigmoid is folded into the loss.
    criterion = nn.BCEWithLogitsLoss()

    for epoch in range(EPOCHS):
        model.train()
        pbar = tqdm(train_loader, desc=f"Train {epoch+1}")
        for batch in pbar:
            input_ids = batch['input_ids'].to(DEVICE)
            attention_mask = batch['attention_mask'].to(DEVICE)
            labels = batch['labels'].to(DEVICE)

            logits = model(input_ids=input_ids, attention_mask=attention_mask)
            loss = criterion(logits, labels)

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            pbar.set_postfix({'loss': float(loss.item())})

        # evaluation
        p, r, f, auc = _evaluate(model, val_loader)
        print(f"Epoch {epoch+1} Val Precision: {p:.3f} Recall: {r:.3f} F1: {f:.3f} AUC: {auc:.3f}")

    # save model
    save_dir = os.path.dirname(save_path)
    if save_dir:
        os.makedirs(save_dir, exist_ok=True)
    torch.save(model.state_dict(), save_path)
    print(f'Model saved to {save_path}')


# Start training when this code is executed directly. Inside a notebook kernel
# __name__ is also '__main__', so running this cell launches train() immediately.
if __name__ == '__main__':
    train()


[load_df] Loaded '/content/drive/MyDrive/train.csv' with encoding=utf-8 sep=',' -> columns: ['id', 'code', 'label']
[load_df] Loaded '/content/drive/MyDrive/val.csv' with encoding=utf-8 sep=',' -> columns: ['id', 'code', 'label']


Train 1: 100%|██████████| 2/2 [00:22<00:00, 11.42s/it, loss=0.718]


Epoch 1 Val Precision: 0.333 Recall: 1.000 F1: 0.500 AUC: 0.500


Train 2: 100%|██████████| 2/2 [00:23<00:00, 11.95s/it, loss=0.657]


Epoch 2 Val Precision: 0.333 Recall: 1.000 F1: 0.500 AUC: 0.500


Train 3: 100%|██████████| 2/2 [00:20<00:00, 10.23s/it, loss=0.598]


Epoch 3 Val Precision: 0.333 Recall: 1.000 F1: 0.500 AUC: 0.500
Model saved to models/codebert_vuln.pt
