In [None]:
# Install necessary packages
# This installs the Hugging Face transformers library with support for PyTorch.
# The '-U' flag ensures the package is upgraded to the latest version if it's already installed.
!pip install transformers[torch] -U

In [None]:
# Import necessary libraries
# PyTorch is used for building and training neural networks.
# The Hugging Face transformers library is used for working with transformer models like RoBERTa.
# We also import libraries for handling datasets, managing memory, and evaluating model performance.
import torch
import torch.nn as nn
from torch.utils.data import Dataset
from transformers import RobertaTokenizer, RobertaForSequenceClassification, AutoTokenizer, AutoModel, Trainer, TrainingArguments, EarlyStoppingCallback
from transformers.modeling_outputs import SequenceClassifierOutput
import json
import random
from sklearn.metrics import precision_recall_fscore_support, accuracy_score

# Memory management
import gc  # Python's garbage collector
# Collect unreachable Python objects first: tensors that are only kept alive by
# uncollected garbage still occupy GPU memory, so emptying the cache before the
# collection would not be able to release them.
gc.collect()  # Collect and free unused memory in the RAM
torch.cuda.empty_cache()  # Then return cached, now-unused GPU memory to the driver

In [None]:
# Define the custom model class
# This class defines a custom neural network that wraps the pre-trained GraphCodeBERT
# encoder (a RoBERTa-architecture model) and combines its pooled output with one
# additional numeric input feature before a linear classification head.

class RobertaForSequenceClassificationWithOutput(nn.Module):
    """Sequence classifier that fuses a pre-trained encoder with an extra numeric feature.

    The pooled encoder output is concatenated with a linear projection of
    ``output_feature`` and the result is fed to a linear classification head.

    Args:
        num_labels: Number of target classes (size of the logits' last dimension).
        output_feature_dim: Number of values in the additional feature per example.
        model_name: Checkpoint passed to ``AutoModel.from_pretrained``. Defaults to
            ``'microsoft/graphcodebert-base'``.
    """

    def __init__(self, num_labels=2, output_feature_dim=1, model_name='microsoft/graphcodebert-base'):
        super().__init__()
        self.num_labels = num_labels

        # Load the pre-trained encoder from the Hugging Face model hub.
        self.roberta = AutoModel.from_pretrained(model_name)
        hidden_size = self.roberta.config.hidden_size

        # Projects the additional feature to the encoder's hidden size.
        self.output_feature_layer = nn.Linear(output_feature_dim, hidden_size)

        # The head sees the pooled output and the projected feature side by side,
        # hence twice the hidden size on its input.
        self.classifier = nn.Linear(hidden_size + hidden_size, num_labels)

        # Dropout applied to the concatenated representation.
        self.dropout = nn.Dropout(self.roberta.config.hidden_dropout_prob)

    def forward(self, input_ids, attention_mask=None, labels=None, output_feature=None):
        """Compute logits (and the loss when ``labels`` is given).

        Args:
            input_ids: Token ids forwarded to the encoder.
            attention_mask: Optional attention mask forwarded to the encoder.
            labels: Optional class indices; when given, a cross-entropy loss is returned.
            output_feature: The additional feature. Either one value per example
                (1-D, which is expanded to a trailing feature axis) or an already
                2-D tensor whose last dimension matches ``output_feature_dim``.

        Returns:
            SequenceClassifierOutput with ``loss`` (``None`` without labels) and ``logits``.

        Raises:
            ValueError: If ``output_feature`` is not provided.
        """
        # Fail with a clear message instead of an AttributeError on None.
        if output_feature is None:
            raise ValueError(
                "output_feature is required: the classifier combines it with the encoder output."
            )

        # Pass the input through the encoder to get the pooled output.
        outputs = self.roberta(input_ids, attention_mask=attention_mask)
        pooled_output = outputs.pooler_output

        # Only add the feature axis when it is missing, so both a flat batch of
        # scalars and an already (batch, feature) shaped tensor are accepted.
        if output_feature.dim() == 1:
            output_feature = output_feature.unsqueeze(-1)
        output_feature_processed = self.output_feature_layer(output_feature)

        # Concatenate along the feature axis and regularize.
        combined_features = torch.cat((pooled_output, output_feature_processed), dim=1)
        combined_features = self.dropout(combined_features)

        # Pass the combined features through the classifier to get the logits.
        logits = self.classifier(combined_features)

        # Compute the loss if labels are provided.
        loss = None
        if labels is not None:
            loss_fct = nn.CrossEntropyLoss()
            loss = loss_fct(logits.view(-1, self.num_labels), labels.view(-1))

        # Return the output as a SequenceClassifierOutput object.
        return SequenceClassifierOutput(loss=loss, logits=logits)

In [None]:
# Define a custom dataset class
# This class handles loading and tokenizing data for training and evaluation.

class CodePairDataset(Dataset):
    """Dataset of code pairs loaded from a JSON file and tokenized on access.

    Each record is read through the keys ``"code1"``, ``"code2"``, ``"score"``
    (used as the integer label) and ``"output"`` (used as the extra float feature).
    The file is presumably a JSON array of such records, since items are looked
    up by integer index.

    Args:
        file_path: Path to the JSON file.
        tokenizer: Callable tokenizer invoked with ``text``/``text_pair`` and
            returning a mapping of tensors with a leading batch dimension.
        max_length: Length every pair is truncated/padded to. Defaults to 512.
    """

    def __init__(self, file_path, tokenizer, max_length=512):
        # JSON is read as UTF-8 explicitly so that non-ASCII characters in the
        # source code do not depend on the platform's default encoding.
        with open(file_path, 'r', encoding='utf-8') as file:
            self.data = json.load(file)
        self.tokenizer = tokenizer
        self.max_length = max_length

    def __getitem__(self, idx):
        """Tokenize record ``idx`` and attach its label and extra feature."""
        item = self.data[idx]
        encoding = self.tokenizer(
            text=item["code1"],
            text_pair=item["code2"],
            truncation=True,
            padding="max_length",
            max_length=self.max_length,
            return_tensors="pt"
        )
        # The tokenizer returns a batch of one; drop that batch dimension so the
        # data loader can stack examples itself.
        encoding = {key: val.squeeze(0) for key, val in encoding.items()}

        # Add the labels and additional feature to the encoding.
        encoding['labels'] = torch.tensor(item["score"], dtype=torch.long)
        encoding['output_feature'] = torch.tensor(item["output"], dtype=torch.float)
        return encoding

    def __len__(self):
        """Return the total number of records."""
        return len(self.data)

In [None]:
# Define the evaluation metrics function
# This function calculates the precision, recall, F1 score, and accuracy of the model's predictions.

def compute_metrics(p):
    """Turn an evaluation result into accuracy, F1, precision and recall.

    Args:
        p: A pair that unpacks into ``(logits, labels)``; the predicted class of
            each example is the argmax over the last axis of the logits.

    Returns:
        dict with the keys ``'accuracy'``, ``'f1'``, ``'precision'`` and ``'recall'``,
        the last three computed with binary averaging.
    """
    logits, labels = p
    predicted_classes = logits.argmax(-1)

    # The fourth returned value (support) is not reported.
    prf_scores = precision_recall_fscore_support(labels, predicted_classes, average='binary')
    precision_value = prf_scores[0]
    recall_value = prf_scores[1]
    f1_value = prf_scores[2]

    metrics = {}
    metrics['accuracy'] = accuracy_score(labels, predicted_classes)
    metrics['f1'] = f1_value
    metrics['precision'] = precision_value
    metrics['recall'] = recall_value
    return metrics

In [None]:
# Main function to execute the training and evaluation process
# This function loads the data, prepares the model, and manages the training and evaluation workflow.

def main(dataset_path='/content/drive/MyDrive/ensemble-codesim/karnalim/ensemble/data2.json', seed=42):
    """Train the custom classifier and report validation and test metrics.

    The dataset is split 80/10/10 into train/validation/test, the model is
    trained with the Hugging Face ``Trainer`` (early stopping, best checkpoint
    selected by F1), and precision/recall/F1 are printed for both held-out splits.

    Args:
        dataset_path: Path to the JSON dataset read by ``CodePairDataset``.
            Defaults to the original Google Drive location; replace with your own.
        seed: Seed used for the dataset split and passed to ``TrainingArguments``
            so that repeated runs use the same splits.

    Returns:
        dict mapping ``"validation"`` and ``"test"`` to the metric dictionaries
        returned by ``trainer.evaluate``.

    Raises:
        ValueError: If the dataset is too small to give every split at least one example.
    """
    # Local import: only needed here to inspect which arguments TrainingArguments accepts.
    import dataclasses

    # Load the tokenizer for the pre-trained model
    tokenizer = AutoTokenizer.from_pretrained('microsoft/graphcodebert-base')

    full_dataset = CodePairDataset(file_path=dataset_path, tokenizer=tokenizer)

    # 80% train; the remainder is split equally into validation and test sets.
    train_size = int(0.8 * len(full_dataset))
    test_val_size = len(full_dataset) - train_size
    val_size = int(0.5 * test_val_size)
    test_size = test_val_size - val_size

    # An empty split would only surface later as an obscure evaluation failure.
    if min(train_size, val_size, test_size) == 0:
        raise ValueError(
            f"Dataset with {len(full_dataset)} examples is too small for an 80/10/10 split."
        )

    # A dedicated, seeded generator makes the random split reproducible
    # without touching the global random state.
    split_generator = torch.Generator().manual_seed(seed)
    train_dataset, val_dataset, test_dataset = torch.utils.data.random_split(
        full_dataset, [train_size, val_size, test_size], generator=split_generator
    )

    # Initialize the custom model
    model = RobertaForSequenceClassificationWithOutput(num_labels=2, output_feature_dim=1)

    # Newer transformers releases name the evaluation schedule argument
    # `eval_strategy`; older ones only know `evaluation_strategy`. Pick whichever
    # the installed version declares so the notebook runs on both.
    known_args = {field.name for field in dataclasses.fields(TrainingArguments)}
    eval_strategy_arg = 'eval_strategy' if 'eval_strategy' in known_args else 'evaluation_strategy'

    # Define training arguments
    training_args = TrainingArguments(
        output_dir='/content/sample_data/results',  # Directory to save model checkpoints and outputs
        num_train_epochs=3,  # Number of training epochs
        per_device_train_batch_size=8,  # Batch size for training
        warmup_steps=500,  # Number of warmup steps for learning rate scheduler
        weight_decay=0.01,  # Strength of weight decay
        logging_dir='./logs',  # Directory to save logs
        eval_steps=500,  # Number of steps between evaluations
        save_strategy="steps",  # Save the model at regular intervals (every `save_steps`)
        save_steps=500,  # Number of steps between saving the model
        load_best_model_at_end=True,  # Load the best model found during training at the end
        metric_for_best_model="f1",  # Metric to use to select the best model
        seed=seed,  # Same seed as the dataset split
        **{eval_strategy_arg: "steps"},  # Evaluate at regular intervals (every `eval_steps`)
    )

    # Initialize the Trainer class for training and evaluation
    trainer = Trainer(
        model=model,  # The model to be trained
        args=training_args,  # Training arguments
        train_dataset=train_dataset,  # Training dataset
        eval_dataset=val_dataset,  # Evaluation dataset
        compute_metrics=compute_metrics,  # Function to compute evaluation metrics
        callbacks=[EarlyStoppingCallback(early_stopping_patience=3)],  # Early stopping callback
    )

    # Train the model
    trainer.train()

    # Evaluate on the validation set first, then on the test set, and print the results.
    all_results = {}
    for result_key, display_name, split in (
        ("validation", "Validation", val_dataset),
        ("test", "Test", test_dataset),
    ):
        split_results = trainer.evaluate(split)
        print(f"{display_name} Precision: {split_results['eval_precision']:.4f}")
        print(f"{display_name} Recall: {split_results['eval_recall']:.4f}")
        print(f"{display_name} F1 Score: {split_results['eval_f1']:.4f}")
        all_results[result_key] = split_results

    return all_results

In [None]:
# Run the main function
# This cell initiates the entire process by calling the main function.

# Entry-point guard: start the training/evaluation workflow only when this code is
# executed as the top-level program, not when it is imported from another module.
if __name__ == "__main__":
    main()