# **Classification of app reviews for requirements engineering using deep learning models**

## **Data loading and processing**

In [None]:
# Load multi-label dataset from CSV file
from datasets import load_dataset

# Read the labelled reviews from CSV; later cells access the rows through the
# 'train' split of the returned object.
csv_path = 'dataset/gpt_multi_label_16000.csv'
dataset = load_dataset('csv', data_files=csv_path)

# Sanity check: split layout first, then the first raw record.
print(dataset)
first_record = dataset['train'][0]
print(first_record)


In [None]:
# Source columns that make up the multi-label target, in output order
LABEL_COLUMNS = ('feature request', 'bug report', 'rating', 'user experience')

def process_labels(example):
    """Collect the four per-category columns into a single 'label' list.

    The example is modified in place (a 'label' key is added) and the same
    object is returned. The list order is: feature request, bug report,
    rating, user experience.
    """
    example['label'] = [example[column] for column in LABEL_COLUMNS]
    return example

# Run process_labels over the dataset (one example per call, since batched is
# not set) so every record gains the combined 'label' list; `dataset` is rebound
# to the mapped result.
dataset = dataset.map(process_labels)

# Show the first training record again, now including the new 'label' field.
print(dataset['train'][0])


*Random Split*

In [None]:
# Split the dataset into 80% train, 10% test and 10% validation.
# A fixed seed makes the shuffle deterministic: without it every run produces a
# different partition, so results are not reproducible and a checkpoint saved
# by an earlier run could be evaluated on rows it was trained on.
SPLIT_SEED = 42
train_test_split = dataset['train'].train_test_split(test_size=0.2, seed=SPLIT_SEED)

# The 80% portion is the training set
train_dataset = train_test_split['train']

# Halve the held-out 20% into the testing and validation sets
test_val_split = train_test_split['test'].train_test_split(test_size=0.5, seed=SPLIT_SEED)
test_dataset = test_val_split['test']
val_dataset = test_val_split['train']

# Report the resulting split sizes and the training set's schema
print(f"Train size: {len(train_dataset)}")
print(f"Test size: {len(test_dataset)}")
print(f"Validation size: {len(val_dataset)}")
print(train_dataset)

## **BERT Model**

In [None]:
# Data preprocessing
from transformers import BertTokenizer

# Tokenizer matching the 'bert-base-uncased' checkpoint
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')

def preprocess_data(example):
    """Encode reviews with the BERT tokenizer and attach their labels.

    Called through `map(..., batched=True)` below, so `example` is a batch of
    rows. The 'review' text is padded/truncated with max_length=128 and the
    existing 'label' column is copied into the 'labels' key of the result.
    """
    encoding_options = dict(padding='max_length', truncation=True, max_length=128)
    encoded = tokenizer(example['review'], **encoding_options)
    encoded['labels'] = example['label']
    return encoded

# Encode every split in batches; each name is rebound to its encoded dataset
train_dataset = train_dataset.map(preprocess_data, batched=True)
val_dataset = val_dataset.map(preprocess_data, batched=True)
test_dataset = test_dataset.map(preprocess_data, batched=True)

# Expose only the model inputs and labels, as PyTorch tensors
bert_columns = ('input_ids', 'attention_mask', 'labels')
for encoded_split in (train_dataset, val_dataset, test_dataset):
    encoded_split.set_format(type='torch', columns=list(bert_columns))


In [None]:
# Load a pre-trained BERT model
from transformers import BertForSequenceClassification
import torch.nn as nn

# Load BERT with a 4-output classification head (one output per label)
model = BertForSequenceClassification.from_pretrained('bert-base-uncased', num_labels=4)

# Replace the classification head with a freshly initialised linear layer that
# emits one raw logit per label. No Sigmoid is added to the model: the training
# cells use nn.BCEWithLogitsLoss on the logits and apply torch.sigmoid
# explicitly when thresholding predictions.
# The layer sizes come from the loaded config rather than hard-coded numbers
# (768 and 4 for this checkpoint and num_labels setting).
model.classifier = nn.Linear(model.config.hidden_size, model.config.num_labels)


In [None]:
# Define training components
import torch
from torch.optim import AdamW
from torch.utils.data import DataLoader
from transformers import get_linear_schedule_with_warmup
from torch.optim.lr_scheduler import ReduceLROnPlateau

# Multi-label objective: binary cross-entropy computed on raw logits
loss_fn = nn.BCEWithLogitsLoss()

# AdamW over every model parameter
optimizer = AdamW(model.parameters(), weight_decay=1e-5, lr=2e-5)

# Batch the three splits; only the training loader shuffles
batch_size = 32
train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_dataloader = DataLoader(val_dataset, batch_size=batch_size)
test_dataloader = DataLoader(test_dataset, batch_size=batch_size)

# Use the GPU when one is available, otherwise the CPU
device_name = "cuda" if torch.cuda.is_available() else "cpu"
device = torch.device(device_name)
model.to(device)

epochs = 10

# One scheduler step per training batch; the first 10% of steps are warm-up
steps_per_epoch = len(train_dataloader)
total_steps = steps_per_epoch * epochs
warmup_steps = int(0.1 * total_steps)

# Linear warm-up schedule bound to the optimizer above
warmup_scheduler = get_linear_schedule_with_warmup(
    optimizer,
    num_training_steps=total_steps,
    num_warmup_steps=warmup_steps,
)


In [None]:
# Train the model
import time
import os
import sys
import datetime
from sklearn.metrics import classification_report

# Checkpoints are written under ./models/bert, tagged with the run's start time
save_dir = os.path.join(".", "models", "bert")
os.makedirs(save_dir, exist_ok=True)
run_started = datetime.datetime.now()
timestamp = run_started.strftime("%Y%m%d_%H%M%S")
backup_path = os.path.join(save_dir, f"backup_model_{timestamp}.pth")
save_path = os.path.join(save_dir, f"best_model_{timestamp}.pth")
best_model_path = None  # set by the training loop once a checkpoint is written

# Early-stopping and bookkeeping state used by the training loop
patience = 3
best_val_loss = float('inf')
epochs_no_improve = 0
global_step = 0
val_losses = []

def save_model(model, path):
    """Write the model's state_dict to `path` and report the outcome.

    Any exception raised while saving is caught and printed rather than
    propagated, so the caller can fall back to another path.

    Returns:
        True when the checkpoint was written, False when saving failed.
    """
    try:
        weights = model.state_dict()
        torch.save(weights, path)
        print(f"Model saved to {path}")
    except Exception as err:
        print(f"Failed to save the model to {path}: {err}")
        return False
    return True
        
# Wall-clock timer for the whole run; end_time is set after the loop and the
# difference is printed by the evaluation cell.
start_time = time.time()
# Training and Validation Loop
# Each epoch: one pass over train_dataloader with optimizer updates, one
# no-grad pass over val_dataloader, then checkpoint / early-stop bookkeeping.
for epoch in range(epochs):
    # Training
    model.train()
    total_loss = 0
    for batch in train_dataloader:
        # Move data to the device (everything except 'labels' is fed to the model)
        inputs = {key: val.to(device) for key, val in batch.items() if key != 'labels'}
        labels = batch['labels'].to(device)

        # Forward pass; the loss is computed here on the raw logits, with the
        # labels cast to float as BCEWithLogitsLoss expects
        outputs = model(**inputs)
        loss = loss_fn(outputs.logits, labels.float())
        total_loss += loss.item()

        # Backward pass
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # Update the learning rate scheduler
        # NOTE(review): the scheduler is only stepped during warm-up, so the
        # learning rate stays at whatever warm-up left it for the rest of
        # training. The RoBERTa and BART loops step their scheduler on every
        # batch instead — confirm this difference is intended.
        if global_step < warmup_steps:
            warmup_scheduler.step()
        else:
            pass # After Warm-Up ends, stop using Warm-Up scheduler
        global_step += 1


    # Calculate average training loss (mean of per-batch losses)
    avg_train_loss = total_loss / len(train_dataloader)

    # Validation
    model.eval()
    val_loss = 0
    # NOTE(review): all_labels / all_preds are rebuilt every epoch but are not
    # read anywhere inside this loop.
    all_labels = []
    all_preds = []
    with torch.no_grad():
        for batch in val_dataloader:
            # Move data to the device
            inputs = {key: val.to(device) for key, val in batch.items() if key != 'labels'}
            labels = batch['labels'].to(device)

            # Forward pass
            outputs = model(**inputs)
            loss = loss_fn(outputs.logits, labels.float())
            val_loss += loss.item()

            # Store true labels and predictions for evaluation
            preds = torch.sigmoid(outputs.logits).cpu().numpy() > 0.5  # Multi-label thresholding
            all_labels.extend(labels.cpu().numpy())
            all_preds.extend(preds)
            
    # Calculate average validation loss
    avg_val_loss = val_loss / len(val_dataloader)
    val_losses.append(avg_val_loss) # Append the average validation loss to the list
    
    # Print epoch metrics
    print(f"Epoch {epoch + 1:}")
    print(f"Training Loss:   {avg_train_loss}")
    print(f"Validation Loss: {avg_val_loss}")
    
    # Save the best model based on validation loss
    if avg_val_loss < best_val_loss:
        best_val_loss = avg_val_loss
        epochs_no_improve = 0
        # Save the best model
        saved = save_model(model, save_path)
        if saved:
            best_model_path = save_path
        else:
            # Attempt backup path if saving to primary path fails
            backup_saved = save_model(model, backup_path)
            if backup_saved:
                best_model_path = backup_path
            else:
                # Neither path could be written: abort the run
                print("Failed to save the model to both primary and backup paths. Stopping training.")
                sys.exit(1)
    else:
        # No improvement this epoch; stop once `patience` such epochs occur in a row
        epochs_no_improve += 1
        if epochs_no_improve >= patience:
            print("Early stopping due to no improvement in validation loss.")
            break
end_time = time.time()               

In [None]:
# Draw the validation loss curve
import matplotlib.pyplot as plt

# One point per completed epoch, numbered from 1 on the x-axis
fig, ax = plt.subplots()
ax.plot(range(1, len(val_losses) + 1), val_losses)
ax.set_xlabel('Epoch')
ax.set_ylabel('Validation Loss')
ax.set_title('Validation Loss Over Epochs')
plt.show()

In [None]:
import pandas as pd
# Load the best model
# map_location=device remaps the stored tensors onto the device in use, so a
# checkpoint written on a GPU still loads in a CPU-only session;
# weights_only=True restricts unpickling to tensor data.
if best_model_path is not None and os.path.exists(best_model_path):
    model.load_state_dict(
        torch.load(best_model_path, map_location=device, weights_only=True)
    )
    print(f"Loaded the best model from {best_model_path}")
else:
    print("No best model was saved during training.")
    sys.exit(1)
    
model.to(device)
model.eval()

# Evaluate on the test set
all_labels = []
all_preds = []
with torch.no_grad():
    for batch in test_dataloader:
        # Everything except 'labels' is fed to the model
        inputs = {key: val.to(device) for key, val in batch.items() if key != 'labels'}
        labels = batch['labels'].to(device)

        outputs = model(**inputs)
        # A label is predicted when its sigmoid probability exceeds 0.5
        preds = torch.sigmoid(outputs.logits).cpu().numpy() > 0.5
        all_labels.extend(labels.cpu().numpy())
        all_preds.extend(preds)

print("Test Classification Report:")
# Same order as the 'label' list built by process_labels
target_names = ['feature request', 'bug report', 'rating', 'user experience']
print(classification_report(all_labels, all_preds, target_names=target_names, zero_division=0, digits=4))

# Save report as CSV (the dict form of the same report, one row per entry)
report_dict = classification_report(
    all_labels,
    all_preds,
    target_names=target_names,
    output_dict=True,
    zero_division=0
)
report_df = pd.DataFrame(report_dict).transpose().round(4)

report_path = os.path.join(save_dir, f"classification_report_{timestamp}.csv")
report_df.to_csv(report_path, float_format="%.4f")
print(f"Classification report saved to {report_path}")

# Print macro F1 and total time (training-loop wall clock)
macro_f1 = report_dict["macro avg"]["f1-score"]
print(f"\nMacro F1 Score: {macro_f1:.4f}")
print("Total time: {:.2f}s".format(end_time - start_time))

## **RoBERTa Model**

In [None]:
from transformers import RobertaTokenizer, RobertaForSequenceClassification

# RoBERTa's own tokenizer; this rebinds the notebook-wide name `tokenizer`
# that the BERT section assigned earlier.
tokenizer = RobertaTokenizer.from_pretrained('roberta-base')

def preprocess_data_roberta(example):
    """Encode reviews with the RoBERTa tokenizer and attach their labels.

    Called through `map(..., batched=True)` below, so `example` is a batch of
    rows. The 'review' text is padded/truncated with max_length=128 and the
    'label' column is copied into the 'labels' key of the result.
    """
    encoding_options = dict(padding='max_length', truncation=True, max_length=128)
    roberta_inputs = tokenizer(example['review'], **encoding_options)
    roberta_inputs['labels'] = example['label']
    return roberta_inputs

# Encode the three splits with the RoBERTa tokenizer into new *_roberta datasets.
# NOTE(review): the inputs are the splits already encoded and torch-formatted in
# the BERT section; this presumably relies on map() still exposing the raw
# 'review' and 'label' columns — confirm if the earlier formatting changes.
train_dataset_roberta = train_dataset.map(preprocess_data_roberta, batched=True)
val_dataset_roberta = val_dataset.map(preprocess_data_roberta, batched=True)
test_dataset_roberta = test_dataset.map(preprocess_data_roberta, batched=True)

# Expose only the model inputs and labels, as PyTorch tensors
roberta_columns = ('input_ids', 'attention_mask', 'labels')
for roberta_split in (train_dataset_roberta, val_dataset_roberta, test_dataset_roberta):
    roberta_split.set_format(type='torch', columns=list(roberta_columns))

# RoBERTa sequence classifier with four outputs, one per label
model_roberta = RobertaForSequenceClassification.from_pretrained(
    'roberta-base',
    num_labels=4,
)

# Multi-label objective on raw logits
loss_fn = nn.BCEWithLogitsLoss()

# Fresh AdamW bound to the RoBERTa parameters
optimizer = AdamW(model_roberta.parameters(), weight_decay=1e-5, lr=2e-5)

# Loaders over the RoBERTa-encoded splits; only training data is shuffled
roberta_batch_size = 32
train_dataloader = DataLoader(train_dataset_roberta, batch_size=roberta_batch_size, shuffle=True)
val_dataloader = DataLoader(val_dataset_roberta, batch_size=roberta_batch_size)
test_dataloader = DataLoader(test_dataset_roberta, batch_size=roberta_batch_size)

# Use the GPU when one is available, otherwise the CPU
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
model_roberta.to(device)

epochs = 10

# One scheduler step per training batch; the first 10% of steps are warm-up
total_steps = epochs * len(train_dataloader)
warmup_steps = int(0.1 * total_steps)

# Linear warm-up followed by linear decay, stepped once per batch in the loop
scheduler = get_linear_schedule_with_warmup(
    optimizer,
    num_training_steps=total_steps,
    num_warmup_steps=warmup_steps,
)

# Checkpoints are written under ./models/roberta, tagged with the run's start time
save_dir = os.path.join(".", "models", "roberta")
os.makedirs(save_dir, exist_ok=True)
run_started = datetime.datetime.now()
timestamp = run_started.strftime("%Y%m%d_%H%M%S")
backup_path = os.path.join(save_dir, f"backup_model_roberta_{timestamp}.pth")
save_path = os.path.join(save_dir, f"best_model_roberta_{timestamp}.pth")

best_model_path = None  # set by the training loop once a checkpoint is written

# Early-stopping and bookkeeping state, reset for this model
patience = 3
epochs_no_improve = 0
best_val_loss = float('inf')
val_losses = []

# Wall-clock timer for the whole run; end_time is set after the loop and the
# difference is printed by the evaluation cell.
start_time = time.time()
# Training and Validation Loop
# Each epoch: one pass over train_dataloader with optimizer + scheduler updates,
# one no-grad pass over val_dataloader, then checkpoint / early-stop bookkeeping.
for epoch in range(epochs):
    # Training
    model_roberta.train()
    total_loss = 0
    for batch in train_dataloader:
        # Move data to the device (only these two tensors are fed to the model)
        inputs = {
            'input_ids': batch['input_ids'].to(device),
            'attention_mask': batch['attention_mask'].to(device)
        }
        # Labels are cast to float because BCEWithLogitsLoss compares them with logits
        labels = batch['labels'].to(device).float()

        # Forward pass
        outputs = model_roberta(**inputs)
        logits = outputs.logits

        # Compute loss
        loss = loss_fn(logits, labels)
        total_loss += loss.item()

        # Backward pass; the scheduler advances once per batch, after the optimizer
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        scheduler.step()

    # Calculate average training loss (mean of per-batch losses)
    avg_train_loss = total_loss / len(train_dataloader)

    # Validation
    model_roberta.eval()
    val_loss = 0
    # NOTE(review): all_labels / all_preds are rebuilt every epoch but are not
    # read anywhere inside this loop.
    all_labels = []
    all_preds = []
    with torch.no_grad():
        for batch in val_dataloader:
            # Move data to the device
            inputs = {
                'input_ids': batch['input_ids'].to(device),
                'attention_mask': batch['attention_mask'].to(device)
            }
            labels = batch['labels'].to(device).float()

            # Forward pass
            outputs = model_roberta(**inputs)
            logits = outputs.logits

            # Compute loss
            loss = loss_fn(logits, labels)
            val_loss += loss.item()

            # Store true labels and predictions for evaluation
            # (a label is predicted when its sigmoid probability exceeds 0.5)
            preds = (torch.sigmoid(logits) > 0.5).cpu().numpy()
            all_labels.extend(labels.cpu().numpy())
            all_preds.extend(preds)
            
    # Calculate average validation loss
    avg_val_loss = val_loss / len(val_dataloader)
    val_losses.append(avg_val_loss)  # Append the average validation loss to the list

    # Print epoch metrics
    print(f"Epoch {epoch + 1}/{epochs}")
    print(f"Training Loss:   {avg_train_loss:}")
    print(f"Validation Loss: {avg_val_loss:}")


    # Save the best model based on validation loss
    if avg_val_loss < best_val_loss:
        best_val_loss = avg_val_loss
        epochs_no_improve = 0
        # Save the best model
        saved = save_model(model_roberta, save_path)
        if saved:
            best_model_path = save_path
        else:
            # Attempt backup path if saving to primary path fails
            backup_saved = save_model(model_roberta, backup_path)
            if backup_saved:
                best_model_path = backup_path
            else:
                # Neither path could be written: abort the run
                print("Failed to save the model to both primary and backup paths. Stopping training.")
                sys.exit(1)
    else:
        # No improvement this epoch; stop once `patience` such epochs occur in a row
        epochs_no_improve += 1
        if epochs_no_improve >= patience:
            print("Early stopping due to no improvement in validation loss.")
            break
end_time = time.time()               

In [None]:
# Validation loss per completed RoBERTa epoch, numbered from 1 on the x-axis
epoch_numbers = range(1, len(val_losses) + 1)
plt.plot(epoch_numbers, val_losses)
plt.title('Validation Loss Over Epochs (RoBERTa)')
plt.xlabel('Epoch')
plt.ylabel('Validation Loss')
plt.show()

In [None]:
# Load the best model
# weights_only=True restricts unpickling to tensor data (matching the BERT
# evaluation cell); map_location=device remaps the stored tensors onto the
# device in use, so a GPU-written checkpoint still loads on a CPU-only session.
if best_model_path is not None and os.path.exists(best_model_path):
    model_roberta.load_state_dict(
        torch.load(best_model_path, map_location=device, weights_only=True)
    )
    print(f"Loaded the best model from {best_model_path}")
else:
    print("No best model was saved during training.")
    sys.exit(1)
    
# Evaluate on the test set    
model_roberta.to(device)
model_roberta.eval()
all_labels = []
all_preds = []
with torch.no_grad():
    for batch in test_dataloader:
        # Only these two tensors are fed to the model
        inputs = {
            'input_ids': batch['input_ids'].to(device),
            'attention_mask': batch['attention_mask'].to(device)
        }
        labels = batch['labels'].cpu().numpy()
        outputs = model_roberta(**inputs)
        logits = outputs.logits
        # A label is predicted when its sigmoid probability exceeds 0.5
        preds = (torch.sigmoid(logits) > 0.5).cpu().numpy()

        all_labels.extend(labels)
        all_preds.extend(preds)

print("Test Classification Report:")
# Same order as the 'label' list built by process_labels
target_names = ['feature request', 'bug report', 'rating', 'user experience']
print(classification_report(all_labels, all_preds, target_names=target_names, zero_division=0, digits=4))

# Save report as CSV (the dict form of the same report, one row per entry)
report_dict = classification_report(
    all_labels,
    all_preds,
    target_names=target_names,
    output_dict=True,
    zero_division=0
)
report_df = pd.DataFrame(report_dict).transpose().round(4)

report_path = os.path.join(save_dir, f"classification_report_{timestamp}.csv")
report_df.to_csv(report_path, float_format="%.4f")
print(f"Classification report saved to {report_path}")

# Print macro F1 and total time (training-loop wall clock)
macro_f1 = report_dict["macro avg"]["f1-score"]
print(f"\nMacro F1 Score: {macro_f1:.4f}")
print("Total time: {:.2f}s".format(end_time - start_time))

## **BART Model**

In [None]:
from transformers import BartTokenizer, BartForSequenceClassification
import torch.nn as nn
import torch
# AdamW comes from torch.optim, as in the BERT and RoBERTa sections. Importing
# the deprecated transformers.AdamW here would rebind the notebook-wide name
# and train BART with a different optimizer implementation than the other models.
from torch.optim import AdamW
from torch.utils.data import DataLoader
from transformers import get_linear_schedule_with_warmup
import os
import sys
import time  # this cell calls time.time() around the training loop
import datetime
from sklearn.metrics import classification_report
import matplotlib.pyplot as plt

# Tokenizer matching the 'facebook/bart-base' checkpoint
bart_tokenizer = BartTokenizer.from_pretrained('facebook/bart-base')

def preprocess_data_bart(example):
    """Encode reviews with the BART tokenizer and attach their labels.

    Called through `map(..., batched=True)` below, so `example` is a batch of
    rows. The 'review' text is padded/truncated with max_length=128 and the
    'label' column is copied into the 'labels' key of the result.
    """
    encoding_options = dict(padding='max_length', truncation=True, max_length=128)
    bart_inputs = bart_tokenizer(example['review'], **encoding_options)
    bart_inputs['labels'] = example['label']
    return bart_inputs

# Encode the three splits with the BART tokenizer into new *_bart datasets.
# NOTE(review): the inputs are the splits already encoded and torch-formatted in
# the BERT section; this presumably relies on map() still exposing the raw
# 'review' and 'label' columns — confirm if the earlier formatting changes.
train_dataset_bart = train_dataset.map(preprocess_data_bart, batched=True)
val_dataset_bart = val_dataset.map(preprocess_data_bart, batched=True)
test_dataset_bart = test_dataset.map(preprocess_data_bart, batched=True)

# Expose only the model inputs and labels, as PyTorch tensors
bart_columns = ('input_ids', 'attention_mask', 'labels')
for bart_split in (train_dataset_bart, val_dataset_bart, test_dataset_bart):
    bart_split.set_format(type='torch', columns=list(bart_columns))

# BART sequence classifier with four outputs, configured as multi-label
bart_model_kwargs = dict(num_labels=4, problem_type='multi_label_classification')
model_bart = BartForSequenceClassification.from_pretrained('facebook/bart-base', **bart_model_kwargs)

# Multi-label objective on raw logits (the loop computes the loss itself)
loss_fn = nn.BCEWithLogitsLoss()

# Fresh AdamW bound to the BART parameters
optimizer = AdamW(model_bart.parameters(), weight_decay=1e-5, lr=2e-5)

# Loaders over the BART-encoded splits; only training data is shuffled
bart_batch_size = 32
train_dataloader = DataLoader(train_dataset_bart, batch_size=bart_batch_size, shuffle=True)
val_dataloader = DataLoader(val_dataset_bart, batch_size=bart_batch_size)
test_dataloader = DataLoader(test_dataset_bart, batch_size=bart_batch_size)

# Use the GPU when one is available, otherwise the CPU
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
model_bart.to(device)

# One scheduler step per training batch; the first 10% of steps are warm-up
epochs = 10
total_steps = epochs * len(train_dataloader)
warmup_steps = int(0.1 * total_steps)

# Learning rate scheduler (linear warm-up + linear decay)
scheduler = get_linear_schedule_with_warmup(
    optimizer,
    num_training_steps=total_steps,
    num_warmup_steps=warmup_steps,
)

# Checkpoints are written under ./models/bart, tagged with the run's start time
save_dir = os.path.join(".", "models", "bart")
os.makedirs(save_dir, exist_ok=True)
run_started = datetime.datetime.now()
timestamp = run_started.strftime("%Y%m%d_%H%M%S")
backup_path = os.path.join(save_dir, f"backup_model_bart_{timestamp}.pth")
save_path = os.path.join(save_dir, f"best_model_bart_{timestamp}.pth")

# Early-stopping and bookkeeping state, reset for this model
patience = 3
epochs_no_improve = 0
best_val_loss = float('inf')
val_losses = []
best_model_path = None  # set by the training loop once a checkpoint is written

def save_model(model, path):
    """Write the model's state_dict to `path`; return True on success, False on failure.

    Exceptions raised while saving are caught and printed rather than
    propagated, which lets the training loop retry with a backup path.
    (This repeats the helper of the same name defined in the BERT training cell.)
    """
    try:
        torch.save(model.state_dict(), path)
        print(f"Model saved to {path}")
    except Exception as exc:
        print(f"Failed to save the model to {path}: {exc}")
        return False
    else:
        return True
        
# Wall-clock timer for the whole run; end_time is set after the loop and the
# difference is printed by the evaluation cell.
# NOTE(review): `time` is not imported in this cell; it relies on the
# `import time` executed in the BERT training cell.
start_time = time.time()
# Training and validation loop
# Each epoch: one pass over train_dataloader with optimizer + scheduler updates,
# one no-grad pass over val_dataloader, then checkpoint / early-stop bookkeeping.
for epoch in range(epochs):
    # Training
    model_bart.train()
    total_loss = 0
    for batch in train_dataloader:
        # Only these two tensors are moved to the device and fed to the model
        inputs = {
            'input_ids': batch['input_ids'].to(device),
            'attention_mask': batch['attention_mask'].to(device)
        }
        # Labels are cast to float because BCEWithLogitsLoss compares them with logits
        labels = batch['labels'].to(device).float()

        # Forward pass; the loss is computed here on the raw logits
        outputs = model_bart(**inputs)
        logits = outputs.logits
        loss = loss_fn(logits, labels)
        total_loss += loss.item()

        # Backward pass; the scheduler advances once per batch, after the optimizer
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        scheduler.step()

    # Mean of the per-batch training losses for this epoch
    avg_train_loss = total_loss / len(train_dataloader)

    # Validation
    model_bart.eval()
    val_loss = 0
    # NOTE(review): all_labels / all_preds are rebuilt every epoch but are not
    # read anywhere inside this loop.
    all_labels = []
    all_preds = []
    with torch.no_grad():
        for batch in val_dataloader:
            inputs = {
                'input_ids': batch['input_ids'].to(device),
                'attention_mask': batch['attention_mask'].to(device)
            }
            labels = batch['labels'].to(device).float()

            outputs = model_bart(**inputs)
            logits = outputs.logits
            loss = loss_fn(logits, labels)
            val_loss += loss.item()

            # A label is predicted when its sigmoid probability exceeds 0.5
            preds = (torch.sigmoid(logits) > 0.5).cpu().numpy()
            all_labels.extend(labels.cpu().numpy())
            all_preds.extend(preds)

    # Mean of the per-batch validation losses; kept for the loss curve
    avg_val_loss = val_loss / len(val_dataloader)
    val_losses.append(avg_val_loss)

    print(f"Epoch {epoch + 1}/{epochs}")
    print(f"Training Loss:   {avg_train_loss}")
    print(f"Validation Loss: {avg_val_loss}")

    # Save the best model
    if avg_val_loss < best_val_loss:
        best_val_loss = avg_val_loss
        epochs_no_improve = 0
        saved = save_model(model_bart, save_path)
        if saved:
            best_model_path = save_path
        else:
            # Primary path failed: try the backup path before giving up
            backup_saved = save_model(model_bart, backup_path)
            if backup_saved:
                best_model_path = backup_path
            else:
                # Neither path could be written: abort the run
                print("Failed to save the model to both primary and backup paths. Stopping training.")
                sys.exit(1)
    else:
        # No improvement this epoch; stop once `patience` such epochs occur in a row
        epochs_no_improve += 1
        if epochs_no_improve >= patience:
            print("Early stopping due to no improvement in validation loss.")
            break
end_time = time.time()     

In [None]:
# Validation loss per completed BART epoch, numbered from 1 on the x-axis
epoch_numbers = range(1, len(val_losses) + 1)
plt.plot(epoch_numbers, val_losses)
plt.title('Validation Loss Over Epochs (BART)')
plt.xlabel('Epoch')
plt.ylabel('Validation Loss')
plt.show()


In [None]:
# Load the best model
# weights_only=True restricts unpickling to tensor data (matching the BERT
# evaluation cell); map_location=device remaps the stored tensors onto the
# device in use, so a GPU-written checkpoint still loads on a CPU-only session.
if best_model_path is not None and os.path.exists(best_model_path):
    model_bart.load_state_dict(
        torch.load(best_model_path, map_location=device, weights_only=True)
    )
    print(f"Loaded the best model from {best_model_path}")
else:
    print("No best model was saved during training.")
    sys.exit(1)

model_bart.to(device)
model_bart.eval()

# Test evaluation
all_labels = []
all_preds = []
with torch.no_grad():
    for batch in test_dataloader:
        # Only these two tensors are fed to the model
        inputs = {
            'input_ids': batch['input_ids'].to(device),
            'attention_mask': batch['attention_mask'].to(device)
        }
        labels = batch['labels'].cpu().numpy()
        outputs = model_bart(**inputs)
        logits = outputs.logits
        # A label is predicted when its sigmoid probability exceeds 0.5
        preds = (torch.sigmoid(logits) > 0.5).cpu().numpy()

        all_labels.extend(labels)
        all_preds.extend(preds)

print("Test Classification Report:")
# Same order as the 'label' list built by process_labels
target_names = ['feature request', 'bug report', 'rating', 'user experience']
print(classification_report(all_labels, all_preds, target_names=target_names, zero_division=0, digits=4))

# Save report as CSV (the dict form of the same report, one row per entry)
report_dict = classification_report(
    all_labels,
    all_preds,
    target_names=target_names,
    output_dict=True,
    zero_division=0
)
report_df = pd.DataFrame(report_dict).transpose().round(4)

report_path = os.path.join(save_dir, f"classification_report_{timestamp}.csv")
report_df.to_csv(report_path, float_format="%.4f")
print(f"Classification report saved to {report_path}")

# Print macro F1 and total time (training-loop wall clock)
macro_f1 = report_dict["macro avg"]["f1-score"]
print(f"\nMacro F1 Score: {macro_f1:.4f}")
print("Total time: {:.2f}s".format(end_time - start_time))