<a href="https://colab.research.google.com/github/jorge-martinez-gil/colab-notebooks/blob/main/GraphCodeBERT%2BFeatures-out.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# -*- coding: utf-8 -*-
"""
GraphCodeBERT vs GraphCodeBERT+aux hybrid classifier, with visualization.

Test-set metric comparison (accuracy, precision, recall, F1) for:
- Baseline: GraphCodeBERT classifier
- Ours: GraphCodeBERT + additional scalar feature

No wandb. Works on old/new transformers.
Dataset JSON fields: code1, code2, score (0/1), output (float)

Author: Jorge Martinez-Gil
"""

# ⚠️ Requirement: Enable GPU
# To run this notebook efficiently, you must enable GPU acceleration:
# 1. Go to **Runtime** > **Change runtime type**.
# 2. Select **T4 GPU** under Hardware accelerator.
# 3. Click **Save**.

import json
import logging
import os
import gc
from typing import Dict, List, Optional, Union
from urllib.parse import urlparse

import numpy as np
import requests
import torch
import torch.nn as nn
from sklearn.metrics import precision_recall_fscore_support, accuracy_score
from torch.utils.data import Dataset
from transformers import (
    AutoModel,
    AutoModelForSequenceClassification,   # *** NEW ***
    AutoTokenizer,
    EvalPrediction,
    PreTrainedTokenizer,
    Trainer,
    TrainingArguments,
    set_seed,
)
from transformers.modeling_outputs import SequenceClassifierOutput

import matplotlib.pyplot as plt   # *** NEW ***

# ---- disable wandb globally ----
# Set before any Trainer is created so that no Weights & Biases logging is attempted.
# NOTE(review): the captured run output of this notebook reports that WANDB_DISABLED
# is deprecated (removal announced for v5) in favour of `report_to="none"` — confirm
# which transformers versions must stay supported before switching.
os.environ["WANDB_DISABLED"] = "true"

# Logging
# Root logger configuration: timestamp, level, logger name, message; INFO and above.
logging.basicConfig(
    format="%(asctime)s - %(levelname)s - %(name)s - %(message)s",
    datefmt="%m/%d/%Y %H:%M:%S",
    level=logging.INFO,
)
# Module-level logger used by the dataset loader, the plotting helper and main().
logger = logging.getLogger(__name__)


# ------------------------------------------------------
# 1. Models
# ------------------------------------------------------

class GraphCodeBERTWithOutput(nn.Module):
    """Encoder-based classifier that fuses an auxiliary numeric feature.

    The first-token hidden state of the pretrained encoder is concatenated with
    a linear projection of the auxiliary feature, passed through dropout, and
    classified by a single linear layer.

    Args:
        model_name: Checkpoint name/path handed to ``AutoModel.from_pretrained``.
        num_labels: Number of target classes.
        output_feature_dim: Number of auxiliary values per example.
    """

    def __init__(self, model_name: str, num_labels: int = 2, output_feature_dim: int = 1):
        super().__init__()
        self.num_labels = num_labels

        self.roberta = AutoModel.from_pretrained(model_name)
        config = self.roberta.config

        # The auxiliary feature is lifted to the encoder's hidden size so both
        # halves of the concatenated vector have the same width.
        self.aux_feature_projection = nn.Linear(output_feature_dim, config.hidden_size)
        self.classifier = nn.Linear(config.hidden_size * 2, num_labels)
        self.dropout = nn.Dropout(config.hidden_dropout_prob)

    def forward(
        self,
        input_ids: torch.Tensor,
        attention_mask: Optional[torch.Tensor] = None,
        labels: Optional[torch.Tensor] = None,
        output_feature: Optional[torch.Tensor] = None,
    ) -> Union[SequenceClassifierOutput, Dict[str, torch.Tensor]]:
        """Compute logits (and the cross-entropy loss when ``labels`` is given).

        Args:
            input_ids: Token ids for the encoder.
            attention_mask: Optional attention mask for the encoder.
            labels: Optional class indices; when present a loss is returned.
            output_feature: Auxiliary feature, either one value per example
                (1-D, ``(batch,)``) or already laid out as ``(batch, dim)``.

        Returns:
            ``SequenceClassifierOutput`` with ``loss`` (or ``None``) and ``logits``.

        Raises:
            ValueError: If ``output_feature`` is ``None``.
        """
        # Fail fast: validate before paying for the encoder forward pass.
        if output_feature is None:
            raise ValueError("output_feature cannot be None")

        outputs = self.roberta(input_ids, attention_mask=attention_mask)
        pooled_output = outputs.last_hidden_state[:, 0, :]  # (batch, hidden)

        # Only add the feature axis when it is missing; an input that is already
        # (batch, dim) must not be unsqueezed again or the concat below breaks.
        aux_input = output_feature
        if aux_input.dim() == 1:
            aux_input = aux_input.unsqueeze(-1)
        # Match the projection's parameter dtype (e.g. a float64 feature tensor).
        aux_input = aux_input.to(self.aux_feature_projection.weight.dtype)
        aux_processed = self.aux_feature_projection(aux_input)

        combined = torch.cat((pooled_output, aux_processed), dim=1)
        combined = self.dropout(combined)

        logits = self.classifier(combined)

        loss = None
        if labels is not None:
            loss_fn = nn.CrossEntropyLoss()
            loss = loss_fn(logits.view(-1, self.num_labels), labels.view(-1))

        return SequenceClassifierOutput(loss=loss, logits=logits)


# *** NEW ***
# Baseline GraphCodeBERT model (no auxiliary feature)
# Uses the standard AutoModelForSequenceClassification.
# Extra keys like "output_feature" from the dataset will be ignored.
def build_baseline_model(model_name: str, num_labels: int = 2):
    """Load the baseline sequence classifier for ``model_name``.

    Delegates to ``AutoModelForSequenceClassification.from_pretrained`` with
    the requested number of labels and returns the resulting model.
    """
    return AutoModelForSequenceClassification.from_pretrained(model_name, num_labels=num_labels)


# ------------------------------------------------------
# 2. Dataset
# ------------------------------------------------------

class HybridCodeDataset(Dataset):
    """Code-pair dataset read from a JSON list (local file or http/https URL).

    Each record is expected to provide ``code1``, ``code2``, ``score`` and
    ``output``; these are the keys read in ``__getitem__``.

    Args:
        data_path: Local path or http(s) URL of the JSON file.
        tokenizer: Tokenizer applied to each ``(code1, code2)`` pair.
        max_length: Length every example is padded/truncated to.
        request_timeout: Seconds to wait for the remote server before giving
            up; only used when ``data_path`` is a URL.
    """

    # Class-level fallback so the loader also works on instances created
    # without going through __init__.
    request_timeout: float = 30.0

    def __init__(
        self,
        data_path: str,
        tokenizer: PreTrainedTokenizer,
        max_length: int = 512,
        request_timeout: float = 30.0,
    ):
        self.tokenizer = tokenizer
        self.max_length = max_length
        self.request_timeout = request_timeout
        self.data = self._load_data(data_path)

    def _load_data(self, path: str) -> List[Dict]:
        """Load the JSON records from ``path``.

        Raises:
            FileNotFoundError: If ``path`` is not a URL and does not exist.
            ValueError: If the JSON top level is not a list.
            requests.HTTPError: If the remote server answers with an error status.
        """
        parsed = urlparse(path)

        if parsed.scheme in ("http", "https"):
            logger.info(f"Downloading dataset from {path}...")
            # Without a timeout requests waits indefinitely on a stalled server.
            resp = requests.get(path, timeout=self.request_timeout)
            resp.raise_for_status()
            data = resp.json()
            origin = "remote URL"
        else:
            if not os.path.exists(path):
                raise FileNotFoundError(path)

            with open(path, "r", encoding="utf-8") as f:
                data = json.load(f)
            origin = "file"

        # __getitem__ indexes by integer position, so anything but a list would
        # only fail later, in the middle of training.
        if not isinstance(data, list):
            raise ValueError(
                f"Expected a JSON list of records in {path}, got {type(data).__name__}"
            )

        logger.info(f"Loaded {len(data)} items from {origin}.")
        return data

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx: int) -> Dict[str, torch.Tensor]:
        """Tokenize record ``idx`` and attach its label and auxiliary feature."""
        item = self.data[idx]

        toks = self.tokenizer(
            text=item["code1"],
            text_pair=item["code2"],
            max_length=self.max_length,
            padding="max_length",
            truncation=True,
            return_tensors="pt",
        )

        # return_tensors="pt" yields a leading batch axis of size 1; drop it so
        # the collator can stack individual examples.
        encoding = {k: v.squeeze(0) for k, v in toks.items()}
        encoding["labels"] = torch.tensor(item["score"], dtype=torch.long)
        encoding["output_feature"] = torch.tensor(item["output"], dtype=torch.float)

        return encoding


# ------------------------------------------------------
# 3. Metrics
# ------------------------------------------------------

def compute_metrics(eval_pred: Union[EvalPrediction, tuple]) -> Dict[str, float]:
    """Turn raw predictions into accuracy, F1, precision and recall.

    Accepts either a plain ``(logits, labels)`` tuple or an object exposing
    ``predictions`` and ``label_ids``. The predicted class is the argmax over
    the last axis; precision/recall/F1 use ``average="binary"``.
    """
    if not isinstance(eval_pred, tuple):
        logits, labels = eval_pred.predictions, eval_pred.label_ids
    else:
        logits, labels = eval_pred

    predicted = np.argmax(logits, axis=-1)

    prf = precision_recall_fscore_support(labels, predicted, average="binary")
    accuracy = accuracy_score(labels, predicted)

    # prf is (precision, recall, f1, support); support is not reported.
    return {
        "accuracy": accuracy,
        "f1": prf[2],
        "precision": prf[0],
        "recall": prf[1],
    }


def clean_memory():
    """Run the garbage collector and, when CUDA is available, empty its cache."""
    gc.collect()
    if not torch.cuda.is_available():
        return
    torch.cuda.empty_cache()


# ------------------------------------------------------
# 4. Training helpers
# ------------------------------------------------------

def train_and_evaluate(
    model: nn.Module,
    train_ds: Dataset,
    val_ds: Dataset,
    test_ds: Dataset,
    output_dir: str,
    seed: int,
    epochs: int,
    batch_size: int,
) -> Dict[str, float]:
    """Train ``model`` with ``Trainer`` and return its evaluation on ``test_ds``.

    Args:
        model: Model to fine-tune.
        train_ds: Training split.
        val_ds: Validation split, registered as the trainer's ``eval_dataset``.
        test_ds: Split passed explicitly to ``trainer.evaluate``.
        output_dir: Directory handed to ``TrainingArguments``.
        seed: Seed handed to ``TrainingArguments``.
        epochs: Number of training epochs.
        batch_size: Per-device batch size for both training and evaluation.

    Returns:
        The dictionary produced by ``trainer.evaluate(test_ds)``.
    """
    # Restricted on purpose to arguments that old transformers releases accept:
    # evaluation_strategy and save_strategy must NOT be used here.
    training_args = TrainingArguments(
        output_dir=output_dir,
        seed=seed,
        num_train_epochs=epochs,
        per_device_train_batch_size=batch_size,
        per_device_eval_batch_size=batch_size,
        logging_steps=100,
    )

    hf_trainer = Trainer(
        model=model,
        args=training_args,
        train_dataset=train_ds,
        eval_dataset=val_ds,
        compute_metrics=compute_metrics,
    )
    hf_trainer.train()

    # Evaluation is triggered by hand (old versions require this).
    return hf_trainer.evaluate(test_ds)


# *** NEW ***
def visualize_results(baseline_results: Dict[str, float],
                      hybrid_results: Dict[str, float],
                      out_dir: str) -> None:
    """Save a grouped bar chart of baseline vs hybrid test metrics.

    Reads ``eval_accuracy``, ``eval_precision``, ``eval_recall`` and ``eval_f1``
    from both result dictionaries (missing entries become NaN) and writes
    ``graphcodebert_vs_hybrid.png`` into ``out_dir``, creating it if needed.
    """
    metric_names = ["accuracy", "precision", "recall", "f1"]

    def _collect(results: Dict[str, float]) -> List[float]:
        # Trainer-style keys carry an "eval_" prefix.
        return [results.get(f"eval_{name}", np.nan) for name in metric_names]

    baseline_scores = _collect(baseline_results)
    hybrid_scores = _collect(hybrid_results)

    positions = np.arange(len(metric_names))
    bar_width = 0.35

    os.makedirs(out_dir, exist_ok=True)

    fig, ax = plt.subplots(figsize=(6, 4))
    # The two series sit side by side, centred on each tick.
    ax.bar(positions - bar_width / 2, baseline_scores, bar_width, label="GraphCodeBERT")
    ax.bar(positions + bar_width / 2, hybrid_scores, bar_width, label="GraphCodeBERT + aux")
    ax.set_xticks(positions)
    ax.set_xticklabels([name.capitalize() for name in metric_names])
    ax.set_ylim(0.0, 1.0)
    ax.set_ylabel("Score")
    ax.set_title("Test metrics: GraphCodeBERT vs GraphCodeBERT+aux")
    ax.legend()
    fig.tight_layout()

    out_path = os.path.join(out_dir, "graphcodebert_vs_hybrid.png")
    fig.savefig(out_path)
    plt.close(fig)

    logger.info(f"Saved comparison plot to {out_path}")


# ------------------------------------------------------
# 5. Main entrypoint
# ------------------------------------------------------

def _print_final_results(title: str, results: Dict[str, float]) -> None:
    """Print one banner-framed block with the four test metrics of ``results``."""
    print("\n==============================")
    print(title)
    print("==============================")
    print("Precision :", round(results.get("eval_precision", float("nan")), 4))
    print("Recall    :", round(results.get("eval_recall", float("nan")), 4))
    print("F1 Score  :", round(results.get("eval_f1", float("nan")), 4))
    print("Accuracy  :", round(results.get("eval_accuracy", float("nan")), 4))


def main(data_path, output_dir="./results", seed=42, epochs=3, batch_size=8):
    """Train baseline and hybrid models on the same split and compare them.

    Args:
        data_path: Local path or http(s) URL of the JSON dataset.
        output_dir: Root directory; each model trains into its own subfolder
            (``baseline`` / ``hybrid``) and the comparison plot is saved here.
        seed: Seed applied via ``set_seed`` and passed to both training runs.
        epochs: Number of training epochs per model.
        batch_size: Per-device batch size per model.
    """
    set_seed(seed)
    clean_memory()

    model_name = "microsoft/graphcodebert-base"
    tokenizer = AutoTokenizer.from_pretrained(model_name)

    full_dataset = HybridCodeDataset(data_path, tokenizer)

    # 60% train; the remainder is halved into validation and test, with the
    # odd example (if any) going to the test split.
    total = len(full_dataset)
    train_size = int(0.6 * total)
    remaining = total - train_size
    val_size = remaining // 2
    test_size = remaining - val_size

    train_ds, tmp = torch.utils.data.random_split(full_dataset, [train_size, remaining])
    val_ds, test_ds = torch.utils.data.random_split(tmp, [val_size, test_size])

    # ----- Baseline GraphCodeBERT (no aux feature) -----
    logger.info("Training baseline GraphCodeBERT (no auxiliary feature)...")
    baseline_model = build_baseline_model(model_name=model_name, num_labels=2)

    baseline_results = train_and_evaluate(
        model=baseline_model,
        train_ds=train_ds,
        val_ds=val_ds,
        test_ds=test_ds,
        output_dir=os.path.join(output_dir, "baseline"),
        seed=seed,
        epochs=epochs,
        batch_size=batch_size,
    )

    logger.info(f"Baseline results on test set: {baseline_results}")

    # Drop the last reference first: while the name is still bound the model
    # cannot be collected, so clean_memory() would have nothing to release.
    del baseline_model
    clean_memory()

    # ----- Hybrid GraphCodeBERT + aux feature -----
    logger.info("Training GraphCodeBERT + auxiliary feature...")
    hybrid_model = GraphCodeBERTWithOutput(model_name=model_name)

    hybrid_results = train_and_evaluate(
        model=hybrid_model,
        train_ds=train_ds,
        val_ds=val_ds,
        test_ds=test_ds,
        output_dir=os.path.join(output_dir, "hybrid"),
        seed=seed,
        epochs=epochs,
        batch_size=batch_size,
    )

    logger.info(f"Hybrid results on test set: {hybrid_results}")

    # Same reasoning as for the baseline: release the model before returning.
    del hybrid_model
    clean_memory()

    # ----- Print final comparison -----
    _print_final_results("FINAL TEST RESULTS - BASELINE (GraphCodeBERT)", baseline_results)
    _print_final_results("FINAL TEST RESULTS - HYBRID (GraphCodeBERT + aux)", hybrid_results)
    print("==============================\n")

    # ----- Visualization -----
    visualize_results(
        baseline_results=baseline_results,
        hybrid_results=hybrid_results,
        out_dir=output_dir,
    )


if __name__ == "__main__":
    # Script entry point: run the comparison on the remote JSON dataset,
    # leaving every other setting at main()'s defaults.
    dataset_url = "https://www.jorgemar.com/data/data2.json"
    main(dataset_url)


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


tokenizer_config.json:   0%|          | 0.00/25.0 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/539 [00:00<?, ?B/s]

vocab.json: 0.00B [00:00, ?B/s]

merges.txt: 0.00B [00:00, ?B/s]

special_tokens_map.json:   0%|          | 0.00/772 [00:00<?, ?B/s]

pytorch_model.bin:   0%|          | 0.00/499M [00:00<?, ?B/s]

Some weights of RobertaForSequenceClassification were not initialized from the model checkpoint at microsoft/graphcodebert-base and are newly initialized: ['classifier.dense.bias', 'classifier.dense.weight', 'classifier.out_proj.bias', 'classifier.out_proj.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.
Using the `WANDB_DISABLED` environment variable is deprecated and will be removed in v5. Use the --report_to flag to control the integrations used for logging result (for instance --report_to none).


model.safetensors:   0%|          | 0.00/499M [00:00<?, ?B/s]

Step,Training Loss
100,0.2822


Some weights of RobertaModel were not initialized from the model checkpoint at microsoft/graphcodebert-base and are newly initialized: ['pooler.dense.bias', 'pooler.dense.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.
Using the `WANDB_DISABLED` environment variable is deprecated and will be removed in v5. Use the --report_to flag to control the integrations used for logging result (for instance --report_to none).


Step,Training Loss
100,0.2053



FINAL TEST RESULTS - BASELINE (GraphCodeBERT)
Precision : 0.9714
Recall    : 1.0
F1 Score  : 0.9855
Accuracy  : 0.9783

FINAL TEST RESULTS - HYBRID (GraphCodeBERT + aux)
Precision : 0.9855
Recall    : 1.0
F1 Score  : 0.9927
Accuracy  : 0.9891

