In [None]:
import torch
from torch.utils.data import Dataset, DataLoader
from transformers import RobertaTokenizer, RobertaModel, TrainingArguments, Trainer
import torch.nn as nn
from sklearn.metrics import f1_score, precision_score, recall_score
import numpy as np

In [None]:
class SmellsDataset(Dataset):
    """Dataset of source-code strings, each tokenized into overlapping windows.

    Every item is a dict with two keys:
      * "encodings": a list of windows; each window is a list of exactly
        ``max_length`` token ids laid out as CLS + content + SEP and
        right-padded with the tokenizer's pad token id.
      * "labels": the label stored for that text, returned unchanged.
    """

    def __init__(self, texts, labels, tokenizer, max_length=512, stride=128):
        """
        texts: List of raw source code strings.
        labels: List of labels (e.g., multi-label binary vectors).
        tokenizer: The CodeBERT tokenizer.
        max_length: Maximum tokens per window, including the CLS and SEP tokens.
        stride: Overlap (in tokens) between consecutive windows.

        Raises:
            ValueError: if ``stride`` is negative or so large that consecutive
                windows could not advance through the text.
        """
        # Two slots of every window are reserved for CLS and SEP. The window
        # start advances by (content size - stride), which must stay positive:
        # a zero step makes range() fail and a negative one yields no windows.
        content_size = max_length - 2
        if stride < 0 or content_size - stride <= 0:
            raise ValueError(
                "stride must satisfy 0 <= stride < max_length - 2 "
                f"(got stride={stride}, max_length={max_length})")

        self.texts = texts
        self.labels = labels
        self.tokenizer = tokenizer
        self.max_length = max_length
        self.stride = stride

    def sliding_window_tokenize(self, text):
        """Split ``text`` into overlapping windows of ``max_length`` token ids.

        The text is tokenized once without truncation and without special
        tokens; CLS/SEP are then added to each window by hand and the window is
        right-padded to ``max_length``. At least one window is always returned,
        so an empty text produces a single CLS + SEP + padding window.
        """
        encoding = self.tokenizer(
            text, add_special_tokens=False, truncation=False)
        input_ids = encoding['input_ids']

        effective_window = self.max_length - 2  # room left after CLS and SEP
        step = effective_window - self.stride
        cls_id = self.tokenizer.cls_token_id
        sep_id = self.tokenizer.sep_token_id
        pad_id = self.tokenizer.pad_token_id

        windows = []
        # max(..., 1) guarantees one iteration even when there are no tokens.
        for start in range(0, max(len(input_ids), 1), step):
            chunk = input_ids[start: start + effective_window]
            window = [cls_id] + chunk + [sep_id]
            # Pad the (possibly short) last window to a consistent length.
            window = window + [pad_id] * (self.max_length - len(window))
            windows.append(window)
            # Stop as soon as this window reached the end of the text, so no
            # trailing window made only of already-covered overlap is emitted.
            if start + effective_window >= len(input_ids):
                break
        return windows

    def __len__(self):
        """Number of texts (instances) in the dataset."""
        return len(self.texts)

    def __getitem__(self, idx):
        """Return the windows and the label of the text at position ``idx``."""
        return {
            "encodings": self.sliding_window_tokenize(self.texts[idx]),
            "labels": self.labels[idx],
        }

In [None]:
class SmellCodeBERTClassifier(nn.Module):
    """Multi-label classifier for one instance split into several windows.

    Each window is encoded separately, the first-token (CLS) embeddings of all
    windows are max-pooled into a single vector, and a linear layer maps that
    vector to ``num_labels`` logits.
    """

    def __init__(self, model_name, num_labels):
        """
        model_name: Name or path passed to ``RobertaModel.from_pretrained``.
        num_labels: Number of output logits (one per label).
        """
        super().__init__()
        self.roberta = RobertaModel.from_pretrained(model_name)
        self.classifier = nn.Linear(
            self.roberta.config.hidden_size, num_labels)

    def forward(self, input_ids, attention_mask, labels=None):
        """
        input_ids: Tensor of shape (num_windows, max_length)
        attention_mask: Tensor of shape (num_windows, max_length)
        labels: Optional tensor holding ``num_labels`` binary targets for the
            instance, shaped (num_labels,) or (1, num_labels); any numeric dtype.

        Returns a dict with "logits" of shape (1, num_labels) and, when
        ``labels`` is given, a scalar "loss" (binary cross-entropy with logits).
        """
        outputs = self.roberta(input_ids=input_ids,
                               attention_mask=attention_mask)
        # CLS embedding (first token) of every window: (num_windows, hidden_size)
        cls_embeddings = outputs.last_hidden_state[:, 0, :]
        # Aggregate with element-wise MAX pooling over the window dimension.
        pooled, _ = torch.max(cls_embeddings, dim=0,
                              keepdim=True)  # (1, hidden_size)
        logits = self.classifier(pooled)  # (1, num_labels)
        output = {"logits": logits}
        if labels is not None:
            # BCEWithLogitsLoss needs targets with the logits' shape and a
            # floating dtype; accept (num_labels,) or (1, num_labels) and
            # integer 0/1 labels alike.
            targets = labels.reshape(logits.shape).to(dtype=logits.dtype)
            output["loss"] = nn.BCEWithLogitsLoss()(logits, targets)
        return output

In [None]:
class SmellTrainer(Trainer):
    """Trainer whose dataloaders yield exactly one instance per batch.

    The model consumes all windows of a single instance in one forward pass,
    and the collate function only looks at the first sample of a batch. The
    dataloaders therefore always use ``batch_size=1``: honouring a larger
    ``per_device_*_batch_size`` would silently drop every sample after the
    first in each batch. The ``data_collator`` passed to the constructor is not
    consulted; ``SmellTrainer.custom_collate_fn`` is always used.
    """

    def get_train_dataloader(self):
        """Return a shuffled, one-instance-per-batch loader over the train set."""
        if self.train_dataset is None:
            raise ValueError("Trainer: training requires a train_dataset.")
        return self._build_instance_dataloader(self.train_dataset, shuffle=True)

    def get_eval_dataloader(self, eval_dataset=None):
        """Return an ordered, one-instance-per-batch loader.

        eval_dataset: Dataset to evaluate; falls back to ``self.eval_dataset``.
        """
        eval_dataset = eval_dataset if eval_dataset is not None else self.eval_dataset
        if eval_dataset is None:
            raise ValueError("Trainer: evaluation requires an eval_dataset.")
        return self._build_instance_dataloader(eval_dataset, shuffle=False)

    def _build_instance_dataloader(self, dataset, shuffle):
        """Build a DataLoader that emits one collated instance at a time."""
        return DataLoader(
            dataset,
            batch_size=1,  # one instance (with all its windows) per step
            shuffle=shuffle,
            collate_fn=self.custom_collate_fn,
        )

    @staticmethod
    def custom_collate_fn(batch):
        """Turn a one-sample batch into the tensors the model's forward expects.

        batch: List of samples; only ``batch[0]`` is used. The sample must hold
            "encodings" (list of equal-length token-id windows) and "labels".

        Returns a dict with "input_ids" (num_windows, max_length, long),
        "attention_mask" (same shape, 1 where the token is not padding) and
        "labels" (float tensor).

        Raises:
            ValueError: if the sample has no "encodings" entry.

        Reads the pad token id from the module-level ``tokenizer``, which must
        be defined before the first batch is collated.
        """
        sample = batch[0]
        windows = sample.get("encodings", None)
        if windows is None:
            raise ValueError(
                "The expected key 'encodings' is missing from the sample.")
        # Shape: (num_windows, max_length)
        input_ids = torch.tensor(windows, dtype=torch.long)
        attention_mask = (input_ids != tokenizer.pad_token_id).long()
        labels = torch.as_tensor(sample["labels"], dtype=torch.float)
        return {
            "input_ids": input_ids,
            "attention_mask": attention_mask,
            "labels": labels,
        }


In [None]:

# Shared CodeBERT tokenizer. The collate functions in this notebook read
# `tokenizer.pad_token_id` from this module-level name to build the attention
# mask, so this line must run before any batch is collated.
tokenizer = RobertaTokenizer.from_pretrained("microsoft/codebert-base")


def custom_collate_fn(batch):
    """Turn a one-sample batch into the tensors the model's forward expects.

    batch: List of samples; only ``batch[0]`` is collated because the model
        processes the windows of a single instance per forward pass. A warning
        is issued when more samples are supplied, since they are discarded.
        The sample must hold "encodings" (list of equal-length token-id
        windows) and "labels".

    Returns a dict with "input_ids" (num_windows, max_length, long),
    "attention_mask" (same shape, 1 where the token is not padding) and
    "labels" (float tensor).

    Raises:
        ValueError: if the sample has no "encodings" entry.

    Reads the pad token id from the module-level ``tokenizer``.
    """
    import warnings

    if len(batch) > 1:
        warnings.warn(
            f"custom_collate_fn received {len(batch)} samples but only collates "
            "the first one; use batch_size=1 so no sample is dropped.",
            stacklevel=2,
        )

    sample = batch[0]
    windows = sample.get("encodings", None)
    if windows is None:
        raise ValueError(
            "The expected key 'encodings' is missing from the sample.")
    # Shape: (num_windows, max_length)
    input_ids = torch.tensor(windows, dtype=torch.long)
    attention_mask = (input_ids != tokenizer.pad_token_id).long()
    # as_tensor accepts lists, arrays and tensors without a copy warning.
    labels = torch.as_tensor(sample["labels"], dtype=torch.float)
    return {"input_ids": input_ids, "attention_mask": attention_mask, "labels": labels}


# Create dataset.
# NOTE(review): `texts` and `lables` are not defined in the visible cells;
# `lables` looks like a misspelling of `labels` - confirm against the cell that
# defines them before renaming.
dataset = SmellsDataset(texts, lables, tokenizer,
                           max_length=512, stride=128)

# Create DataLoader that yields one instance (with its windows) at a time.
dataloader = DataLoader(dataset, batch_size=1, collate_fn=custom_collate_fn)

# Initialize custom model.
num_labels = 2  # Adjust based on your task
model = SmellCodeBERTClassifier("microsoft/codebert-base", num_labels)

# Define Training Arguments.
training_args = TrainingArguments(
    output_dir="./results",
    num_train_epochs=1,
    # Each "sample" (instance) is processed individually, and the collate
    # function keeps only the first sample of a batch. Both batch sizes must
    # therefore be 1; the eval one would otherwise fall back to the library
    # default and most eval samples would be dropped.
    per_device_train_batch_size=1,
    per_device_eval_batch_size=1,
    logging_dir="./logs",
    logging_steps=10,
    # NOTE(review): recent transformers releases rename this argument to
    # `eval_strategy` - confirm against the installed version.
    evaluation_strategy="epoch",
    save_strategy="epoch",
    load_best_model_at_end=True
)

# NOTE(review): the same dataset is used for training and evaluation, so the
# reported metrics measure fit to the training data, not generalization.
trainer = SmellTrainer(
    model=model,
    args=training_args,
    train_dataset=dataset,
    eval_dataset=dataset,
    data_collator=custom_collate_fn,  # Our custom collate function
    # Optionally, add compute_metrics if you want evaluation metrics.
)

trainer.train()


def compute_metrics(eval_pred):
    """Compute micro-averaged multi-label F1, precision and recall.

    eval_pred: Pair ``(logits, labels)``.
        logits: array-like whose last dimension is ``num_labels``; normally one
            row per evaluated instance.
        labels: array-like of 0/1 targets. It may be shaped
            (num_instances, num_labels) or be the flattened concatenation of
            per-instance label vectors; it is reshaped to one row per instance.

    If there is exactly one label row but several logit rows, the rows are
    treated as per-window predictions of that single instance and their
    probabilities are averaged before thresholding.

    Returns a dict with "f1", "precision" and "recall".

    Raises:
        ValueError: if labels cannot be arranged into rows of ``num_labels``
            entries, or the number of label rows and prediction rows disagree.
    """
    logits, labels = eval_pred

    logits_arr = np.asarray(logits, dtype=np.float32)
    num_labels = logits_arr.shape[-1]
    probs = torch.sigmoid(torch.from_numpy(logits_arr)).numpy()
    probs = probs.reshape(-1, num_labels)

    label_arr = np.asarray(labels)
    if label_arr.size % num_labels != 0:
        raise ValueError(
            f"labels with {label_arr.size} entries cannot be split into rows "
            f"of {num_labels} labels")
    # One row per instance, so every prediction row lines up with its targets.
    label_arr = label_arr.reshape(-1, num_labels).astype(int)

    if probs.shape[0] != label_arr.shape[0]:
        if label_arr.shape[0] != 1:
            raise ValueError(
                f"got {probs.shape[0]} prediction rows for "
                f"{label_arr.shape[0]} label rows")
        # Per-window predictions for a single instance: aggregate by taking the
        # mean across the window dimension.
        probs = probs.mean(axis=0, keepdims=True)

    predictions = (probs > 0.5).astype(int)

    # 2-D indicator arrays make scikit-learn score this as a multi-label
    # problem; zero_division=0 keeps the score at 0 when nothing is predicted
    # positive.
    return {
        "f1": f1_score(label_arr, predictions, average="micro", zero_division=0),
        "precision": precision_score(label_arr, predictions, average="micro", zero_division=0),
        "recall": recall_score(label_arr, predictions, average="micro", zero_division=0),
    }


# Attach the metric function to the existing trainer. The trainer was built
# without compute_metrics and trainer.train() has already run above, so only
# evaluations started from here on report these metrics.
trainer.compute_metrics = compute_metrics

# Then evaluate; as the last expression of the cell, the value returned by
# evaluate() is shown as the cell output.
trainer.evaluate()