In [None]:
import os
import logging
import torch
import torch.nn as nn
import pandas as pd
import matplotlib.pyplot as plt

from datasets import load_dataset
from sklearn.metrics import precision_recall_fscore_support, accuracy_score
from transformers import (
    RobertaTokenizerFast,
    RobertaModel,
    Trainer,
    TrainingArguments,
    DataCollatorWithPadding,
    PreTrainedModel,
    RobertaConfig,
    EarlyStoppingCallback
)
from transformers.modeling_outputs import SequenceClassifierOutput
# Notebook-wide logging: INFO level, each record prefixed with timestamp and level.
logging.basicConfig(
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger("bug_localiser")

# Sets TOKENIZERS_PARALLELISM to "false" before any tokenisation happens
# (presumably to avoid the tokenizers fork/parallelism warning — confirm).
os.environ["TOKENIZERS_PARALLELISM"] = "false"

# Use the GPU when torch can see one, otherwise fall back to the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
print(f"Using device: {device}")


Using device: cuda


In [9]:
data_path = "../../Data/SSTUBS_ENHANCED_23MAR"

# One JSON file per split, all located under data_path.
split_files = {
    "train": f"{data_path}/train.json",
    "validation": f"{data_path}/val.json",
    "test": f"{data_path}/test.json",
}
dataset = load_dataset("json", data_files=split_files)


In [10]:
# Checkpoint identifier shared by the tokenizer and by the encoder loaded inside
# CustomRobertaClassifier.
model_name = "microsoft/codebert-base"
tokenizer = RobertaTokenizerFast.from_pretrained(model_name)

# Marker strings registered as additional special tokens on the tokenizer.
# NOTE(review): presumably these delimit sections of the "text" field — confirm
# against the dataset builder. Registering them changes len(tokenizer), which is
# why the model resizes its token embeddings to len(tokenizer).
special_tokens = {
    "additional_special_tokens": ["[CONTEXT]", "[SNIPPET]", "[COMMIT]", "[PARENT]"]
}
tokenizer.add_special_tokens(special_tokens)

def preprocess(dataset):
    """Tokenise the ``"text"`` field of a batch with the notebook-level tokenizer.

    Args:
        dataset: A batch mapping (as passed by ``Dataset.map(batched=True)``)
            that exposes a ``"text"`` entry.

    Returns:
        The tokenizer output for the batch. Sequences are truncated but not
        padded here; padding is left to the data collator at batch time.
    """
    # NOTE(review): no max_length is given, so the truncation limit comes from
    # the tokenizer's own configuration — confirm it matches the encoder.
    texts = dataset["text"]
    return tokenizer(texts, padding=False, truncation=True)


# Apply the tokenisation to every split, one batch at a time.
tokenised_dataset = dataset.map(preprocess, batched=True)


In [None]:
class CustomRobertaClassifier(PreTrainedModel):
    """RoBERTa encoder with a two-layer MLP classification head.

    The encoder is loaded from the notebook-level ``model_name`` checkpoint and
    its token embeddings are resized to ``len(tokenizer)``. Only encoder
    parameters whose name contains ``"embeddings"`` remain trainable; all other
    encoder parameters are frozen. The head maps the encoder's pooled output to
    ``config.num_labels`` logits.
    """

    def __init__(self, config):
        """Build the encoder and head.

        Args:
            config: Model configuration; ``hidden_size`` and ``num_labels`` are
                read to size the classification head.
        """
        super().__init__(config)

        encoder = RobertaModel.from_pretrained(model_name)
        encoder.resize_token_embeddings(len(tokenizer))
        # Freeze all bar embeddings: a parameter trains only if its name
        # contains "embeddings".
        for param_name, weight in encoder.named_parameters():
            weight.requires_grad = "embeddings" in param_name
        self.roberta = encoder

        head_layers = [
            nn.Linear(config.hidden_size, 512),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(512, config.num_labels),
        ]
        self.classifier = nn.Sequential(*head_layers)

    def forward(self, input_ids=None, attention_mask=None, labels=None):
        """Run the encoder and head, optionally computing the loss.

        Args:
            input_ids: Token ids forwarded to the encoder.
            attention_mask: Attention mask forwarded to the encoder.
            labels: Optional targets; when given, a cross-entropy loss between
                the logits and these labels is returned as well.

        Returns:
            SequenceClassifierOutput carrying ``loss`` (``None`` without
            labels), ``logits``, and the encoder's ``hidden_states`` and
            ``attentions``.
        """
        encoded = self.roberta(input_ids=input_ids, attention_mask=attention_mask)
        logits = self.classifier(encoded.pooler_output)

        loss = nn.CrossEntropyLoss()(logits, labels) if labels is not None else None

        return SequenceClassifierOutput(
            loss=loss,
            logits=logits,
            hidden_states=encoded.hidden_states,
            attentions=encoded.attentions,
        )
# Binary classifier: start from the checkpoint's config with two output labels,
# then move the assembled model to the selected device.
model_config = RobertaConfig.from_pretrained(model_name, num_labels=2)
model = CustomRobertaClassifier(model_config).to(device)

In [12]:
class MultiEvalTrainer(Trainer):
    """Trainer that scores several evaluation datasets at every evaluation.

    Each dataset in ``eval_datasets`` is run through ``predict`` and its
    accuracy, precision, recall, F1 and loss are logged under a per-dataset
    prefix. The first two datasets use the prefixes ``eval_train_val`` and
    ``eval_val``; any further datasets get ``eval_ds<N>`` (1-based).
    """

    # Metric prefixes for the first two evaluation datasets, in order.
    DEFAULT_EVAL_PREFIXES = ("eval_train_val", "eval_val")

    def __init__(self, *args, eval_datasets=None, **kwargs):
        """Create the trainer.

        Args:
            *args: Forwarded to ``Trainer``.
            eval_datasets: Optional sequence of evaluation datasets. When it is
                non-empty, its first element is also passed to ``Trainer`` as
                ``eval_dataset``.
            **kwargs: Forwarded to ``Trainer``.
        """
        if eval_datasets:
            kwargs["eval_dataset"] = eval_datasets[0]
        super().__init__(*args, **kwargs)
        self.eval_datasets = eval_datasets or []

    def _prefix_for(self, index):
        """Return the metric prefix for the evaluation dataset at ``index``."""
        if index < len(self.DEFAULT_EVAL_PREFIXES):
            return self.DEFAULT_EVAL_PREFIXES[index]
        # Fallback so that a third or later dataset does not raise IndexError.
        return f"eval_ds{index + 1}"

    def _compute_metrics_with_prefix(self, pred, prefix):
        """Compute binary classification metrics keyed by ``prefix``.

        Args:
            pred: Prediction output exposing ``label_ids`` and ``predictions``
                (class scores; the arg-max over the last axis is the predicted
                class).
            prefix: String prepended to every metric name.

        Returns:
            Dict with ``{prefix}_accuracy``, ``{prefix}_precision``,
            ``{prefix}_recall`` and ``{prefix}_f1``.
        """
        labels = pred.label_ids
        preds = pred.predictions.argmax(-1)
        precision, recall, f1, _ = precision_recall_fscore_support(labels, preds, average='binary')
        acc = accuracy_score(labels, preds)
        return {
            f"{prefix}_accuracy": acc,
            f"{prefix}_precision": precision,
            f"{prefix}_recall": recall,
            f"{prefix}_f1": f1,
        }

    def evaluate(self, eval_dataset=None, ignore_keys=None, metric_key_prefix="eval"):
        """Evaluate all registered datasets, or defer to ``Trainer.evaluate``.

        When no explicit ``eval_dataset`` is given and ``eval_datasets`` is
        non-empty, every registered dataset is evaluated and the merged metric
        dict is returned (``metric_key_prefix`` is not used on this path).
        Otherwise the call is forwarded unchanged to ``Trainer.evaluate``.

        Returns:
            Dict of metrics.
        """
        if eval_dataset is None and self.eval_datasets:
            results = {}
            for i, eval_ds in enumerate(self.eval_datasets):
                prefix = self._prefix_for(i)
                logger.info(f" Evaluating dataset {i + 1} with prefix '{prefix}'...")
                eval_output = self.predict(eval_ds, ignore_keys=ignore_keys)
                metrics = self._compute_metrics_with_prefix(eval_output, prefix)
                # predict() reports its loss under its default "test" prefix.
                metrics[f"{prefix}_loss"] = eval_output.metrics["test_loss"]
                self.log(metrics)
                results.update(metrics)
            # predict() only dispatches on_predict. Callbacks that react to
            # evaluation results (e.g. EarlyStoppingCallback) listen on
            # on_evaluate, so dispatch it here with the merged metrics.
            self.control = self.callback_handler.on_evaluate(
                self.args, self.state, self.control, results
            )
            return results
        return super().evaluate(eval_dataset, ignore_keys, metric_key_prefix)


In [None]:
output_dir = "../../FINAL_CODEBERT_FINETUNED"
log_dir = f"{output_dir}/logs"

# Build the two monitoring splits: at most 500 examples each, drawn after a
# seeded shuffle. "train_val" is sampled from the training split, while
# "validation" is replaced in place by its own subsample, so the full
# validation split is no longer available after this cell runs.
for _source_split, _target_split in (("train", "train_val"), ("validation", "validation")):
    _full_split = tokenised_dataset[_source_split]
    _subset_size = min(500, len(_full_split))
    tokenised_dataset[_target_split] = _full_split.shuffle(seed=42).select(range(_subset_size))

In [14]:
training_args = TrainingArguments(
    # --- outputs and reporting ---
    output_dir=output_dir,
    logging_dir=log_dir,
    report_to="none",
    push_to_hub=False,
    disable_tqdm=False,
    # --- optimisation ---
    learning_rate=2e-5,
    weight_decay=0.01,
    per_device_train_batch_size=8,
    per_device_eval_batch_size=16,
    seed=42,
    # --- training length ---
    # NOTE(review): both a step budget and an epoch count are set. Per the
    # TrainingArguments documentation a positive max_steps takes precedence
    # over num_train_epochs — confirm which limit is intended.
    max_steps=100000,
    num_train_epochs=20,
    # --- evaluate, log and checkpoint on the same 1000-step cadence ---
    evaluation_strategy="steps",
    eval_steps=1000,
    logging_strategy="steps",
    logging_steps=1000,
    save_strategy="steps",
    save_steps=1000,
    save_total_limit=2,
    # --- model selection: highest F1 reported under the "eval_val" prefix ---
    load_best_model_at_end=True,
    metric_for_best_model="eval_val_f1",
    greater_is_better=True,
)

In [None]:
# Pads each batch with the tokenizer at collation time.
padding_collator = DataCollatorWithPadding(tokenizer)

# Early stopping with a patience of 5 evaluations.
# NOTE(review): EarlyStoppingCallback reacts to the on_evaluate callback event —
# confirm that MultiEvalTrainer.evaluate dispatches it, otherwise the patience
# counter is never checked.
early_stopping = EarlyStoppingCallback(early_stopping_patience=5)

# Order matters: MultiEvalTrainer assigns its metric prefixes by position.
monitoring_sets = [tokenised_dataset["train_val"], tokenised_dataset["validation"]]

trainer = MultiEvalTrainer(
    model=model,
    args=training_args,
    train_dataset=tokenised_dataset["train"],
    eval_datasets=monitoring_sets,
    tokenizer=tokenizer,
    data_collator=padding_collator,
    callbacks=[early_stopping],
)


In [None]:
try:
    logger.info("Training started with early stopping...")
    trainer.train()
finally:
    # Always persist whatever the trainer has logged so far, whether training
    # finished, failed, or was interrupted.
    log_path = f"{output_dir}/final_log.csv"
    history_frame = pd.DataFrame(trainer.state.log_history)
    history_frame.to_csv(log_path, index=False)
    logger.info(f" Logs saved to {log_path}")

2025-03-26 19:18:00,780 - INFO - Training started with early stopping...
You're using a RobertaTokenizerFast tokenizer. Please note that with a fast tokenizer, using the `__call__` method is faster than using a method to encode the text followed by a call to the `pad` method to get a padded encoding.


Step,Training Loss,Validation Loss
