This notebook fine-tunes and evaluates BERT, XLNet, RoBERTa, and ALBERT on both Gold and Premo datasets for sentiment analysis.

It was written based on the approach used in the following paper: Zhang, T., Xu, B., Thung, F., Haryono, S. A., Lo, D., & Jiang, L. (2020, September). Sentiment analysis for software engineering: How far can pre-trained transformer models go? In 2020 IEEE International Conference on Software Maintenance and Evolution (ICSME) (pp. 70-80). IEEE.

## Import Libraries

Import all necessary libraries for data processing, model training, and evaluation.

In [None]:
import json
import csv
import random
import time
import numpy as np
import pandas as pd
import torch
from pathlib import Path

from transformers import (
    BertTokenizer, BertForSequenceClassification,
    XLNetTokenizer, XLNetForSequenceClassification,
    RobertaTokenizer, RobertaForSequenceClassification,
    AlbertTokenizer, AlbertForSequenceClassification
)

from torch.optim import AdamW
from torch.utils.data import TensorDataset, DataLoader, RandomSampler, SequentialSampler
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, precision_score, recall_score, f1_score
from tqdm import tqdm, trange

## Configuration and Hyperparameters

Define hyperparameters and model configurations.

In [None]:
# Hyperparameters
MAX_LEN = 256  # max_length handed to the tokenizer; texts are padded/truncated to it
BATCH_SIZE = 16  # batch size of both the training and the test DataLoader
EPOCHS = 4  # number of passes over the training data in train_model
LEARNING_RATE = 2e-5  # learning rate handed to AdamW
TEST_SIZE = 0.2  # test_size handed to train_test_split
RANDOM_STATE = 42  # seed for train_test_split and seed_torch

# Model configurations
# Each entry is a tuple of
#   (model class, tokenizer class, pretrained checkpoint name, short model name).
# The short name is used in console output and in the result file name.
MODELS = [
    (BertForSequenceClassification, BertTokenizer, 'bert-base-cased', 'bert'),
    (XLNetForSequenceClassification, XLNetTokenizer, 'xlnet-base-cased', 'xlnet'),
    (RobertaForSequenceClassification, RobertaTokenizer, 'roberta-base', 'roberta'),
    (AlbertForSequenceClassification, AlbertTokenizer, 'albert-base-v1', 'albert')
]

## Setup Device and Seed

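Select the device (GPU if available, otherwise CPU) and define the helper that seeds all random number generators for reproducibility. The helper is called at the start of every training run.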

In [None]:
def seed_torch(seed):
    """Seed every random number generator the pipeline relies on.

    Seeds Python's ``random`` module, NumPy and PyTorch (CPU and all CUDA
    devices) and configures cuDNN for deterministic behaviour, so that two
    runs with the same seed draw the same random numbers.

    Args:
        seed: Integer seed applied to all generators.
    """
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    # Seed every CUDA device, not only the current one. The call is a no-op
    # when CUDA is unavailable.
    torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True
    # The cuDNN autotuner may select different kernels from run to run, so it
    # is disabled as well.
    torch.backends.cudnn.benchmark = False

# Use CUDA when it is available, otherwise fall back to the CPU. Batches are
# moved to this device in train_model and evaluate_model.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# Number of CUDA devices reported by PyTorch.
# NOTE(review): n_gpu is not referenced anywhere else in the visible code.
n_gpu = torch.cuda.device_count()

# Report which device the run will use.
if torch.cuda.is_available():
    print(f"Using GPU: {torch.cuda.get_device_name(0)}")
else:
    print("Using CPU")

## Data Loading Functions

Functions to load the Gold dataset (CSV) and Premo dataset (JSON).

In [None]:
def load_gold_dataset(path="data/github_gold.csv"):
    """Load the Gold dataset from a semicolon-delimited CSV file.

    A row is kept only when its ``Text`` field is non-blank and its
    ``Polarity`` field is present. Rows that are too short to contain one of
    these fields (``csv.DictReader`` fills missing fields with ``None``) are
    skipped instead of crashing the loader.

    Args:
        path: Location of the CSV file. Defaults to the path used by the
            notebook.

    Returns:
        A ``(texts, labels)`` tuple of two parallel lists holding the raw
        ``Text`` and ``Polarity`` strings of the kept rows.

    Raises:
        KeyError: If the header has a ``Text`` column but no ``Polarity``
            column.
    """
    print("Loading Gold dataset...")
    texts = []
    labels = []

    with open(path, newline='', encoding='utf-8') as csvfile:
        reader = csv.DictReader(csvfile, delimiter=';')
        for row in reader:
            text = row.get('Text')
            # A missing field comes back as None, which has no .strip().
            if text is None or not text.strip():
                continue
            polarity = row['Polarity']
            if polarity is None:
                # Truncated row: there is no label to train on.
                continue
            texts.append(text)
            labels.append(polarity)

    print(f"Loaded {len(texts)} samples from Gold dataset")
    return texts, labels

def load_premo_dataset(path="data/dataset.json"):
    """Load the Premo dataset from a JSON file.

    The file is expected to contain a list of message objects. For each
    message the label is taken from ``part2_aggregate.polarity`` unless that
    value is missing or equal to ``"undefined"``; in that case
    ``discussion_polarity`` is used. Messages with neither label are skipped.

    Args:
        path: Location of the JSON file. Defaults to the path used by the
            notebook.

    Returns:
        A ``(texts, labels)`` tuple of two parallel lists holding each kept
        message's ``raw_message`` and its label.

    Raises:
        KeyError: If a message has no ``raw_message`` field.
    """
    print("Loading Premo dataset...")

    # Read as UTF-8 explicitly so the result does not depend on the platform's
    # default encoding (the Gold loader does the same).
    with open(path, encoding='utf-8') as file:
        dataset = json.load(file)

    texts = []
    labels = []

    for message in dataset:
        text = message["raw_message"]

        has_aggregate_polarity = (
            "part2_aggregate" in message
            and "polarity" in message["part2_aggregate"]
        )
        if has_aggregate_polarity and message["part2_aggregate"]["polarity"] != "undefined":
            label = message["part2_aggregate"]["polarity"]
        elif "discussion_polarity" in message:
            # Fallback when the aggregate label is missing or "undefined".
            label = message["discussion_polarity"]
        else:
            continue

        texts.append(text)
        labels.append(label)

    print(f"Loaded {len(texts)} samples from Premo dataset")
    return texts, labels

## Data Encoding

Function to tokenize and encode text data for transformer models.

In [None]:
def encode_data(texts, labels, tokenizer):
    """Tokenize texts and turn string labels into tensors.

    Labels are mapped to integers with neutral=0, positive=1, negative=2.
    Each text is converted with ``str()`` and tokenized on its own, padded
    and truncated to ``MAX_LEN``.

    Args:
        texts: Iterable of input texts.
        labels: Sequence of label strings ('positive', 'negative', 'neutral').
        tokenizer: Callable tokenizer that returns a mapping with
            'input_ids' and 'attention_mask' tensors.

    Returns:
        ``(input_ids, attention_masks, labels_tensor)`` where the first two
        are the per-text tensors concatenated along dimension 0.

    Raises:
        KeyError: If a label is not one of the three known names.
    """
    label_to_id = {'positive': 1, 'negative': 2, 'neutral': 0}
    # Labels are converted first, so an unknown label fails before any
    # tokenization work is done.
    label_ids = [label_to_id[name] for name in labels]

    print("Encoding data...")
    encodings = [
        tokenizer(
            str(text),
            add_special_tokens=True,
            max_length=MAX_LEN,
            padding='max_length',
            return_attention_mask=True,
            return_tensors='pt',
            truncation=True
        )
        for text in tqdm(texts)
    ]

    input_ids = torch.cat([enc['input_ids'] for enc in encodings], dim=0)
    attention_masks = torch.cat([enc['attention_mask'] for enc in encodings], dim=0)

    return input_ids, attention_masks, torch.tensor(label_ids)

## Training Function

Function to train the model for the specified number of epochs.

In [None]:
def train_model(model, train_dataloader, optimizer):
    """Fine-tune ``model`` in place for ``EPOCHS`` passes over the data.

    Args:
        model: Sequence classification model; its first output must be the
            loss when ``labels`` are supplied.
        train_dataloader: Iterable yielding (input ids, attention mask,
            labels) tensor batches.
        optimizer: Optimizer that updates the model's parameters.

    The mean training loss is printed after every epoch.
    """
    print(f"Training for {EPOCHS} epochs...")
    model.train()

    for epoch_idx in trange(EPOCHS, desc="Epoch"):
        running_loss = 0
        batches_seen = 0

        for batch in train_dataloader:
            # Move the whole batch to the training device before unpacking.
            input_ids, input_mask, targets = (tensor.to(device) for tensor in batch)

            optimizer.zero_grad()

            result = model(
                input_ids,
                token_type_ids=None,
                attention_mask=input_mask,
                labels=targets,
            )
            batch_loss = result[0]

            batch_loss.backward()
            optimizer.step()

            running_loss += batch_loss.item()
            batches_seen += 1

        mean_loss = running_loss / batches_seen
        print(f"Epoch {epoch_idx + 1} - Train loss: {mean_loss:.4f}")

## Evaluation Function

Function to evaluate the model on test data and return predictions.

In [None]:
def evaluate_model(model, test_dataloader):
    """Run the model over the test data and collect predictions.

    Args:
        model: Sequence classification model; its first output must be the
            logits when no labels are supplied.
        test_dataloader: Iterable yielding (input ids, attention mask,
            labels) tensor batches.

    Returns:
        ``(predictions, true_labels)`` as NumPy arrays, where each prediction
        is the index of the largest logit for that sample.
    """
    print("Evaluating model...")
    model.eval()
    logit_chunks = []
    label_chunks = []

    for batch in test_dataloader:
        input_ids, input_mask, targets = (tensor.to(device) for tensor in batch)

        # Inference only: no gradients are needed.
        with torch.no_grad():
            result = model(input_ids, token_type_ids=None,
                           attention_mask=input_mask)
            batch_logits = result[0]

        logit_chunks.append(batch_logits.detach().cpu().numpy())
        label_chunks.append(targets.to('cpu').numpy())

    all_logits = np.concatenate(logit_chunks, axis=0)
    predicted = np.argmax(all_logits, axis=1)
    expected = np.concatenate(label_chunks, axis=0)

    return predicted, expected

## Metrics Calculation

Functions to calculate, display, and save evaluation metrics.

In [None]:
def calculate_metrics(true_labels, predictions):
    """Compute per-class, macro and micro precision, recall and F1.

    Class ids 0, 1, 2 are reported as 'neutral', 'positive', 'negative'.
    Undefined scores are reported as 0 (``zero_division=0``).

    Args:
        true_labels: Ground-truth class ids.
        predictions: Predicted class ids.

    Returns:
        A dict with the keys 'per_class', 'macro' and 'micro'. Each maps
        'precision', 'recall' and 'f1_score' to a score; under 'per_class'
        each score is itself a dict keyed by class name.
    """
    class_ids = [0, 1, 2]
    class_names = ['neutral', 'positive', 'negative']
    # Insertion order fixes the key order of every sub-dict in the result.
    scorers = {
        'precision': precision_score,
        'recall': recall_score,
        'f1_score': f1_score,
    }

    per_class = {}
    for metric_name, scorer in scorers.items():
        values = scorer(true_labels, predictions, labels=class_ids,
                        average=None, zero_division=0)
        per_class[metric_name] = dict(zip(class_names, values))

    def averaged(kind):
        # One score per metric for the given averaging mode.
        return {
            metric_name: scorer(true_labels, predictions, average=kind, zero_division=0)
            for metric_name, scorer in scorers.items()
        }

    return {
        'per_class': per_class,
        'macro': averaged('macro'),
        'micro': averaged('micro'),
    }

def print_metrics(metrics, model_name, dataset_name):
    """Print a metrics report for one model/dataset pair.

    Args:
        metrics: Dict with 'per_class', 'macro' and 'micro' entries, each
            holding 'precision', 'recall' and 'f1_score'.
        model_name: Name shown in the report header.
        dataset_name: Dataset name shown in the report header.
    """
    banner = '=' * 60
    print(f"\n{banner}")
    print(f"Results for {model_name} on {dataset_name} dataset")
    print(banner)

    per_class = metrics['per_class']
    print("\nPer-class metrics:")
    print(f"{'Class':<12} {'Precision':<12} {'Recall':<12} {'F1-Score':<12}")
    print("-" * 48)
    for class_name in ('neutral', 'positive', 'negative'):
        class_precision = per_class['precision'][class_name]
        class_recall = per_class['recall'][class_name]
        class_f1 = per_class['f1_score'][class_name]
        print(f"{class_name:<12} {class_precision:<12.4f} "
              f"{class_recall:<12.4f} "
              f"{class_f1:<12.4f}")

    # The macro and micro sections share the same layout.
    for heading, key in (("Macro", 'macro'), ("Micro", 'micro')):
        averages = metrics[key]
        print(f"\n{heading} averages:")
        print(f"Precision: {averages['precision']:.4f}")
        print(f"Recall:    {averages['recall']:.4f}")
        print(f"F1-Score:  {averages['f1_score']:.4f}")

def save_results(metrics, predictions, true_labels, model_name, dataset_name, output_dir):
    """Write the metrics and raw predictions of one run to a JSON file.

    The file is named ``<model_name>_<dataset_name>.json`` and is placed in
    ``output_dir``, which is created (with parents) when it does not exist.

    Args:
        metrics: JSON-serializable metrics dict.
        predictions: Array-like with a ``tolist()`` method.
        true_labels: Array-like with a ``tolist()`` method.
        model_name: Model name; stored in the file and used in its name.
        dataset_name: Dataset name; stored in the file and used in its name.
        output_dir: ``pathlib.Path`` of the target directory.
    """
    output_dir.mkdir(parents=True, exist_ok=True)

    payload = dict(
        model=model_name,
        dataset=dataset_name,
        metrics=metrics,
        predictions=predictions.tolist(),
        true_labels=true_labels.tolist(),
    )

    target = output_dir / f"{model_name}_{dataset_name}.json"
    with target.open('w') as handle:
        json.dump(payload, handle, indent=2)

    print(f"Results saved to {target}")

## Complete Training Pipeline

Function that orchestrates the complete training and evaluation process for a single model-dataset combination.

In [None]:
def _make_dataloader(inputs, masks, label_tensor, shuffle):
    """Wrap encoded tensors in a DataLoader, shuffled for training only."""
    data = TensorDataset(inputs, masks, label_tensor)
    sampler = RandomSampler(data) if shuffle else SequentialSampler(data)
    return DataLoader(data, sampler=sampler, batch_size=BATCH_SIZE)


def _build_optimizer(model):
    """Create AdamW with weight decay disabled for bias and LayerNorm parameters."""
    # Names are compared in lower case so that both the `LayerNorm.weight` and
    # the `layer_norm.weight` spellings are matched. 'gamma' and 'beta' are
    # kept for checkpoints that still use the legacy LayerNorm names.
    no_decay_markers = ('bias', 'layernorm.weight', 'layer_norm.weight', 'gamma', 'beta')
    decay_params = []
    no_decay_params = []
    for name, param in model.named_parameters():
        lowered = name.lower()
        if any(marker in lowered for marker in no_decay_markers):
            no_decay_params.append(param)
        else:
            decay_params.append(param)

    # torch.optim.AdamW reads the per-group option `weight_decay`. An unknown
    # key such as `weight_decay_rate` is ignored, and the default decay would
    # then apply to every parameter.
    grouped_parameters = [
        {'params': decay_params, 'weight_decay': 0.01},
        {'params': no_decay_params, 'weight_decay': 0.0},
    ]
    return AdamW(grouped_parameters, lr=LEARNING_RATE)


def train_and_evaluate(model_config, texts, labels, dataset_name, output_dir):
    """Fine-tune one model on one dataset, evaluate it and save the results.

    Steps: seed the RNGs, make a stratified train/test split, tokenize both
    parts, fine-tune the pretrained model, predict on the test part, then
    print and save the metrics.

    Args:
        model_config: Tuple of (model class, tokenizer class, pretrained
            checkpoint name, short model name).
        texts: List of input texts.
        labels: List of label strings, parallel to ``texts``.
        dataset_name: Dataset name used in console output and the result
            file name.
        output_dir: ``pathlib.Path`` of the directory for the result file.

    Any exception raised by a step propagates to the caller. The model and
    optimizer references are dropped and the CUDA cache is emptied whether
    the run succeeds or fails.
    """
    model_class, tokenizer_class, pretrained_name, model_name = model_config

    print(f"\n{'#'*70}")
    print(f"# Training {model_name.upper()} on {dataset_name} dataset")
    print(f"{'#'*70}\n")

    seed_torch(RANDOM_STATE)

    train_texts, test_texts, train_labels, test_labels = train_test_split(
        texts, labels, test_size=TEST_SIZE, random_state=RANDOM_STATE, stratify=labels
    )

    print(f"Train size: {len(train_texts)}, Test size: {len(test_texts)}")

    print(f"Loading tokenizer: {pretrained_name}")
    # NOTE(review): do_lower_case=True is passed for every checkpoint,
    # including the cased ones ('bert-base-cased', 'xlnet-base-cased'). It is
    # left unchanged because it may be intentional; confirm against the
    # reference setup.
    tokenizer = tokenizer_class.from_pretrained(pretrained_name, do_lower_case=True)

    train_inputs, train_masks, train_labels_tensor = encode_data(train_texts, train_labels, tokenizer)
    test_inputs, test_masks, test_labels_tensor = encode_data(test_texts, test_labels, tokenizer)

    train_dataloader = _make_dataloader(train_inputs, train_masks, train_labels_tensor, shuffle=True)
    test_dataloader = _make_dataloader(test_inputs, test_masks, test_labels_tensor, shuffle=False)

    print(f"Loading model: {pretrained_name}")
    model = model_class.from_pretrained(pretrained_name, num_labels=3)
    optimizer = None

    try:
        # Use the same device that train_model/evaluate_model move batches to.
        model.to(device)

        optimizer = _build_optimizer(model)

        start_time = time.time()
        train_model(model, train_dataloader, optimizer)
        train_time = time.time() - start_time
        print(f"Training completed in {train_time:.2f} seconds")

        start_time = time.time()
        predictions, true_labels = evaluate_model(model, test_dataloader)
        eval_time = time.time() - start_time
        print(f"Evaluation completed in {eval_time:.2f} seconds")

        metrics = calculate_metrics(true_labels, predictions)

        print_metrics(metrics, model_name, dataset_name)

        save_results(metrics, predictions, true_labels, model_name, dataset_name, output_dir)
    finally:
        # Drop both references before emptying the cache: the optimizer also
        # holds the parameters, and a failed run must not keep GPU memory
        # allocated for the next model.
        model = None
        optimizer = None
        if torch.cuda.is_available():
            torch.cuda.empty_cache()

## Main Execution

In [None]:
# Entry point: load both datasets, then fine-tune and evaluate every model in
# MODELS on each of them.
print("="*70)
print("Transformer Fine-tuning and Evaluation")
print("="*70)

output_dir = Path("output_transformers")

gold_texts, gold_labels = load_gold_dataset()
premo_texts, premo_labels = load_premo_dataset()

datasets = [
    ("gold", gold_texts, gold_labels),
    ("premo", premo_texts, premo_labels)
]

print("\n" + "="*70)
print("FINE-TUNED EVALUATION")
print("="*70)
# (model name, dataset name) of every combination that raised an error.
failed_runs = []
for dataset_name, texts, labels in datasets:
    for model_config in MODELS:
        try:
            train_and_evaluate(model_config, texts, labels, dataset_name, output_dir)
        except Exception as e:
            # One failing combination must not abort the whole sweep. The
            # exception type is reported too, because str(e) alone can be
            # uninformative (for a KeyError it is only the quoted key).
            print(f"\nError training {model_config[3]} on {dataset_name}: "
                  f"{type(e).__name__}: {e}")
            failed_runs.append((model_config[3], dataset_name))

print("\n" + "="*70)
if failed_runs:
    failed_summary = ", ".join(f"{model} on {dataset}" for model, dataset in failed_runs)
    print(f"Experiments finished with {len(failed_runs)} failed run(s): {failed_summary}")
else:
    print("All experiments completed!")
print("="*70)