In [None]:
# Download the NLTK resources ('punkt', 'stopwords', 'wordnet').
# NOTE(review): nothing in the visible notebook appears to use these resources
# beyond the imports that follow — confirm they are still needed before keeping
# the (network-dependent) downloads.
import nltk
nltk.download('punkt')
nltk.download('stopwords')
nltk.download('wordnet')

import numpy as np
import pandas as pd
import json
import re
from transformers import GPT2Tokenizer, BartTokenizer, AutoTokenizer, BartForConditionalGeneration, BartConfig
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer
import string
import os
import torch
from torch.utils.data import Dataset, DataLoader
import json
import time

# Data Preprocessing

In [None]:
# Folder holding the train/val/test JSON splits (placeholder path: point this
# at the local copy of the dataset).
directory = r"Local Directory\eLife"
train_name = os.path.join(directory, "train.json")
val_name = os.path.join(directory, "val.json")
test_name = os.path.join(directory, "test.json")


def _load_json(path):
    """Parse the JSON file at ``path`` and return the decoded object."""
    # Decode explicitly as UTF-8: without `encoding`, open() falls back to the
    # platform locale, which can mis-decode or reject non-ASCII characters.
    with open(path, 'r', encoding='utf-8') as f:
        return json.load(f)


# Open training data
data_train = _load_json(train_name)

# Open validation data
data_val = _load_json(val_name)

# Open test data
data_test = _load_json(test_name)

In [None]:
def process_data(data):
    """Flatten raw article records into plain-text fields.

    Each input record must provide "title", "sections", "headings",
    "abstract", "summary" and "keywords". The returned list holds one dict
    per article with, in order: "abstract", one entry per lower-cased
    heading (the section's sentences joined by spaces), "main_body",
    "complete_text" (title, abstract, main body and keywords) and "summary".
    """
    records = []

    for entry in data:
        # The title gets a trailing dot so it reads as a sentence of its own
        headline = entry["title"] + "."

        # Join every section's sentences and index the result by heading
        raw_sections = entry["sections"]
        lowered_headings = [name.lower() for name in entry["headings"]]
        section_texts = []
        by_heading = {}
        for sentences, name in zip(raw_sections, lowered_headings):
            joined = " ".join(sentences)
            section_texts.append(joined)
            by_heading[f"{name}"] = joined

        # Abstract and summary arrive as sentence lists
        abstract = " ".join(entry["abstract"])
        summary = " ".join(entry["summary"])
        # Keywords are wrapped in dots on both sides
        joined_keywords = " ".join(entry["keywords"])
        keywords = f".{joined_keywords}."

        # Sections separated by single spaces, outer whitespace removed
        main_body = " ".join(section_texts).strip()

        # Assemble the record; insertion order mirrors the output layout
        record = {"abstract": f"{abstract}"}
        record.update(by_heading)
        record["main_body"] = f"{main_body}"
        record["complete_text"] = " ".join(
            (f"{headline}", f"{abstract}", f"{main_body}", f"{keywords}")
        )
        record["summary"] = f"{summary}"
        records.append(record)

    return records

def separate_body_label(processed_data):
    """Split processed records into parallel text and summary lists.

    Returns a tuple ``(bodies, labels)`` where ``bodies[i]`` is the
    "complete_text" and ``labels[i]`` the "summary" of the i-th record.
    """
    # Read both fields in a single pass so the two lists stay aligned
    pairs = [(entry["complete_text"], entry["summary"]) for entry in processed_data]

    body_list = [text for text, _ in pairs]
    label_list = [label for _, label in pairs]

    return body_list, label_list

In [None]:
# Run the preprocessing pipeline on every split
preprocessed_train = process_data(data_train)
preprocessed_val = process_data(data_val)
preprocessed_test = process_data(data_test)

# Training and validation records gathered into a single new list
merged_list = [*preprocessed_train, *preprocessed_val]
print(f"Length of Training: {len(preprocessed_train)}")
print(f"Length of Validation: {len(preprocessed_val)}")
print(f"Length of Merged: {len(merged_list)}")

# Separate each dataset into article texts and their reference summaries
training_body, training_label = separate_body_label(preprocessed_train)
val_body, val_label = separate_body_label(preprocessed_val)
test_body, test_label = separate_body_label(preprocessed_test)
merged_body, merged_label = separate_body_label(merged_list)

The merged_list will be used in the final fine-tuning of the model after determining the optimal learning rate and number of epochs.

# Hyperparameter Tuning

In [None]:
import torch
from transformers import BartForConditionalGeneration, BartTokenizer, BartConfig
from torch.utils.data import Dataset, DataLoader
import matplotlib.pyplot as plt

# Load the tokenizer of the pretrained BART checkpoint.
# Only the tokenizer is created here; the model itself is instantiated further
# down, once per learning rate.
tokenizer = BartTokenizer.from_pretrained("facebook/bart-large-cnn")

# Define a custom dataset class for your data
class BiomedicalDataset(Dataset):
    """Tokenise (article, summary) pairs on the fly for seq2seq training.

    Args:
        data: sequence of input texts.
        labels: sequence of target summaries, aligned with ``data``.
        tokenizer: callable tokenizer returning "input_ids" and
            "attention_mask" tensors when called with ``return_tensors="pt"``.
        max_length: length every input and target is truncated/padded to.

    Raises:
        ValueError: if ``data`` and ``labels`` differ in length.
    """

    # Label value skipped by the cross-entropy loss of Hugging Face seq2seq models
    IGNORE_INDEX = -100

    def __init__(self, data, labels, tokenizer, max_length):
        if len(data) != len(labels):
            raise ValueError(
                f"data and labels must have the same length, got {len(data)} and {len(labels)}"
            )
        self.data = data
        self.labels = labels
        self.tokenizer = tokenizer
        self.max_length = max_length

    def __len__(self):
        return len(self.data)

    def _encode(self, text):
        """Tokenise one text, truncated and padded to ``max_length``."""
        return self.tokenizer(
            text, max_length=self.max_length, truncation=True, padding='max_length', return_tensors="pt"
        )

    def __getitem__(self, idx):
        """Return the "input_ids", "attention_mask" and "labels" tensors of one example."""
        input_encoding = self._encode(self.data[idx])
        target_encoding = self._encode(self.labels[idx])

        # squeeze(0) removes only the batch axis added by return_tensors="pt";
        # a bare squeeze() would also collapse the sequence axis when it has size 1.
        input_ids = input_encoding["input_ids"].squeeze(0)
        attention_mask = input_encoding["attention_mask"].squeeze(0)
        labels = target_encoding["input_ids"].squeeze(0)

        # Padding positions must not contribute to the loss: replace them with
        # the ignore index instead of leaving the pad token id in the labels.
        target_mask = target_encoding["attention_mask"].squeeze(0)
        labels = labels.masked_fill(target_mask == 0, self.IGNORE_INDEX)

        return {
            "input_ids": input_ids,
            "attention_mask": attention_mask,
            "labels": labels
        }

# Define your datasets and dataloaders
max_len = 800
batch_size = 16

train_dataset = BiomedicalDataset(training_body, training_label, tokenizer, max_length=max_len)
val_dataset = BiomedicalDataset(val_body, val_label, tokenizer, max_length=max_len)
train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_dataloader = DataLoader(val_dataset, batch_size=batch_size)

# Observe the learning rate and number of epochs
learning_rates = [1e-6, 5e-6, 1e-5, 5e-5]
num_epochs = 20
loss_results = {lr: {"train_loss": list(), "val_loss": list()} for lr in learning_rates}

# Resolve the compute device once for the whole search
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

for lr in learning_rates:

    # Drop the previous run's model and optimizer before loading a new
    # checkpoint; otherwise the old weights and optimizer state stay on the
    # device while the next model is moved there.
    model = optimizer = None
    if torch.cuda.is_available():
        torch.cuda.empty_cache()

    # Every learning rate starts from the same pretrained weights
    model = BartForConditionalGeneration.from_pretrained("facebook/bart-large-cnn")
    model.to(device)
    optimizer = torch.optim.AdamW(model.parameters(), lr=lr)

    for epoch in range(num_epochs):
        # Training pass
        model.train()
        total_loss = 0
        for batch in train_dataloader:
            input_ids = batch["input_ids"].to(device)
            attention_mask = batch["attention_mask"].to(device)
            labels = batch["labels"].to(device)

            outputs = model(input_ids=input_ids, attention_mask=attention_mask, labels=labels)
            loss = outputs.loss
            total_loss += loss.item()

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

        avg_train_loss = total_loss / len(train_dataloader)
        loss_results[lr]["train_loss"].append(avg_train_loss)

        # Validation pass (no gradient tracking)
        model.eval()
        val_total_loss = 0
        with torch.no_grad():
            for batch in val_dataloader:
                input_ids = batch["input_ids"].to(device)
                attention_mask = batch["attention_mask"].to(device)
                labels = batch["labels"].to(device)

                outputs = model(input_ids=input_ids, attention_mask=attention_mask, labels=labels)
                val_loss = outputs.loss
                val_total_loss += val_loss.item()

        avg_val_loss = val_total_loss / len(val_dataloader)
        loss_results[lr]["val_loss"].append(avg_val_loss)

        print(f"Learning Rate: {lr}, Epoch {epoch + 1}/{num_epochs}, Avg Train Loss: {avg_train_loss}, Avg Val Loss: {avg_val_loss}")

# Plot the training and validation curves for all learning rates in one plot
plt.figure(figsize=(12, 8))
# One base color per learning rate. The list previously had three entries for
# four learning rates, which raised IndexError on the last curve; the modulo
# keeps the lookup valid if more rates are added later.
colors = ['red', 'green', 'blue', 'orange']
for i, lr in enumerate(learning_rates):
    curve_color = colors[i % len(colors)]
    plt.plot(loss_results[lr]["train_loss"], label=f'Train Loss (LR={lr})', color=curve_color, linestyle='-', alpha=0.7)
    plt.plot(loss_results[lr]["val_loss"], label=f'Val Loss (LR={lr})', color=curve_color, linestyle='--', alpha=0.7)

plt.title('Training and Validation Loss by Learning Rate')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.legend()
plt.grid(True)
plt.show()

# Fine-Tuning

In [None]:
# Define a custom dataset class for your data
# (re-declared in this section so the fine-tuning cells do not depend on the
# hyperparameter-tuning cell having been run)
class BiomedicalDataset(Dataset):
    """Tokenise (article, summary) pairs on the fly for seq2seq training.

    Args:
        data: sequence of input texts.
        labels: sequence of target summaries, aligned with ``data``.
        tokenizer: callable tokenizer returning "input_ids" and
            "attention_mask" tensors when called with ``return_tensors="pt"``.
        max_length: length every input and target is truncated/padded to.

    Raises:
        ValueError: if ``data`` and ``labels`` differ in length.
    """

    # Label value skipped by the cross-entropy loss of Hugging Face seq2seq models
    IGNORE_INDEX = -100

    def __init__(self, data, labels, tokenizer, max_length):
        if len(data) != len(labels):
            raise ValueError(
                f"data and labels must have the same length, got {len(data)} and {len(labels)}"
            )
        self.data = data
        self.labels = labels
        self.tokenizer = tokenizer
        self.max_length = max_length

    def __len__(self):
        return len(self.data)

    def _encode(self, text):
        """Tokenise one text, truncated and padded to ``max_length``."""
        return self.tokenizer(
            text, max_length=self.max_length, truncation=True, padding='max_length', return_tensors="pt"
        )

    def __getitem__(self, idx):
        """Return the "input_ids", "attention_mask" and "labels" tensors of one example."""
        input_encoding = self._encode(self.data[idx])
        target_encoding = self._encode(self.labels[idx])

        # squeeze(0) removes only the batch axis added by return_tensors="pt";
        # a bare squeeze() would also collapse the sequence axis when it has size 1.
        input_ids = input_encoding["input_ids"].squeeze(0)
        attention_mask = input_encoding["attention_mask"].squeeze(0)
        labels = target_encoding["input_ids"].squeeze(0)

        # Padding positions must not contribute to the loss: replace them with
        # the ignore index instead of leaving the pad token id in the labels.
        target_mask = target_encoding["attention_mask"].squeeze(0)
        labels = labels.masked_fill(target_mask == 0, self.IGNORE_INDEX)

        return {
            "input_ids": input_ids,
            "attention_mask": attention_mask,
            "labels": labels
        }

# Evaluation function
def evaluate(model, dataloader):
    """Return the mean per-batch loss of ``model`` over ``dataloader``.

    Args:
        model: a torch module whose forward accepts ``input_ids``,
            ``attention_mask`` and ``labels`` and returns an object exposing
            ``.loss``.
        dataloader: sized iterable of batches, each a dict with the tensors
            "input_ids", "attention_mask" and "labels".

    Returns:
        The sum of the batch losses divided by the number of batches.
        The model is left in evaluation mode.
    """
    # Take the device from the model's own parameters rather than from a
    # notebook-level `device` variable, which is not defined yet when this
    # function is first called during fine-tuning.
    target_device = next(model.parameters()).device

    model.eval()
    total_loss = 0
    with torch.no_grad():
        for batch in dataloader:
            input_ids = batch["input_ids"].to(target_device)
            attention_mask = batch["attention_mask"].to(target_device)
            labels = batch["labels"].to(target_device)

            outputs = model(input_ids=input_ids, attention_mask=attention_mask, labels=labels)
            loss = outputs.loss
            total_loss += loss.item()

    avg_loss = total_loss / len(dataloader)
    return avg_loss

In [None]:
# Define your datasets and dataloaders
max_len = 800
batch_size = 16

# Define the optimal hyperparameters
learning_rate = 5e-6
num_epochs = 6

# Define the compute device in this cell so that everything run from here on
# (including helpers that look up a notebook-level `device`) works on a freshly
# restarted kernel.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Load the pretrained BART model and tokenizer
tokenizer = BartTokenizer.from_pretrained("facebook/bart-large-cnn")
model = BartForConditionalGeneration.from_pretrained("facebook/bart-large-cnn")
model.to(device)
optimizer = torch.optim.AdamW(model.parameters(), lr=learning_rate)

# Use the merged training and validation sets to train the model
train_dataset = BiomedicalDataset(merged_body, merged_label, tokenizer, max_length=max_len)
train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

# Define the test dataset and dataloader
test_dataset = BiomedicalDataset(test_body, test_label, tokenizer, max_length=max_len)
test_dataloader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

# Start timing the training process
start_time = time.time()

for epoch in range(num_epochs):
    model.train()
    total_loss = 0

    for batch in train_dataloader:
        input_ids = batch["input_ids"].to(device)
        attention_mask = batch["attention_mask"].to(device)
        labels = batch["labels"].to(device)

        outputs = model(input_ids=input_ids, attention_mask=attention_mask, labels=labels)
        loss = outputs.loss
        total_loss += loss.item()

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    # Calculate training and test errors per epoch.
    # NOTE(review): the test split is scored after every epoch purely for
    # monitoring; it should not drive any further hyperparameter choice.
    avg_train_loss = total_loss / len(train_dataloader)
    test_loss = evaluate(model, test_dataloader)
    # The test loss is reported once, together with the training loss
    # (it used to be printed a second time right after this line).
    print(f"Epoch-{epoch+1} >>> Training Loss : {avg_train_loss:.3f} & Test Loss : {test_loss:.3f}")

# End timing the training process
end_time = time.time()
total_time = end_time - start_time
print(f"Total training time: {total_time:.2f} seconds")

# Save the final model and tokenizer
model.save_pretrained("fine_tuned_bart_model")
tokenizer.save_pretrained("fine_tuned_bart_model")

## Check the working directory

In [None]:
import os

# Report the directory the notebook is running from
current_dir = os.getcwd()
print(f"Current Working Directory: {current_dir}")

# Show every entry of that directory
contents = os.listdir(current_dir)
print(f"Contents of {current_dir}: {contents}")

# Folder the fine-tuned model is expected to have been saved into
model_dir = "fine_tuned_bart_model"

# Look for the model folder among the directory entries
if model_dir not in contents:
    print(f"The model directory '{model_dir}' is not found in the current directory. Please check the path.")
else:
    print(f"The model directory '{model_dir}' is found in the current directory.")
    # Show what was saved inside the model folder
    model_path = os.path.join(current_dir, model_dir)
    model_contents = os.listdir(model_path)
    print(f"Contents of {model_dir}: {model_contents}")

# Generate summaries

In [None]:
# Complete summarization
def generate_complete(text_list, model, tokenizer, n_beams, min_len, max_len):
    """Summarise each article in a single generation pass.

    Args:
        text_list: iterable of article texts. Preprocessed article records
            (dicts) are accepted too; their "complete_text" entry is used.
        model: seq2seq model exposing ``generate`` and ``device``.
        tokenizer: tokenizer matching ``model``.
        n_beams: number of beams for beam search.
        min_len: minimum length of a generated summary, in tokens.
        max_len: maximum length of a generated summary, in tokens.

    Returns:
        List with one decoded summary per input article, in input order.
    """
    # Follow the model's own device instead of a notebook-level variable
    target_device = model.device

    summary_list = list()

    for article in text_list:

        # The tokenizer needs plain text, so unwrap preprocessed records
        new_article = article["complete_text"] if isinstance(article, dict) else article

        # Tokenize the article (truncated to 1024 tokens) and move the tensors to the device
        inputs = tokenizer(new_article, max_length=1024, truncation=True, return_tensors="pt")
        inputs = {key: value.to(target_device) for key, value in inputs.items()}

        # Generate Summary
        summary_ids = model.generate(
            inputs["input_ids"],
            attention_mask=inputs["attention_mask"],
            num_beams=n_beams,
            min_length=min_len,
            max_length=max_len,
        )

        # Decode Summary
        summary = tokenizer.batch_decode(summary_ids, skip_special_tokens=True, clean_up_tokenization_spaces=False)[0]

        summary_list.append(summary)

    return summary_list

# Step-by-step summarization
def _sbs_summarize_text(text, model, tokenizer, n_beams, min_len, max_len):
    """Generate and decode one summary for a single piece of text."""
    inputs = tokenizer(text, max_length=1024, truncation=True, return_tensors="pt")
    inputs = {key: value.to(model.device) for key, value in inputs.items()}
    summary_ids = model.generate(
        inputs["input_ids"],
        attention_mask=inputs["attention_mask"],
        num_beams=n_beams,
        min_length=min_len,
        max_length=max_len,
    )
    return tokenizer.batch_decode(summary_ids, skip_special_tokens=True, clean_up_tokenization_spaces=False)[0]


def generate_sbs(text_list, model, tokenizer, n_beams, min_len, max_len):
    """Summarise articles section by section, then summarise the result.

    For every record, the abstract (when present) and the summary of each
    section are joined into an intermediate text, which is summarised once
    more to obtain the final summary.

    The ``tokenizer`` parameter is part of the signature because the notebook
    calls this function with the same arguments as ``generate_complete``;
    without it those calls raise TypeError.

    Args:
        text_list: iterable of preprocessed article records (dicts). Every key
            other than "abstract", "main_body", "complete_text" and "summary"
            is treated as a section to summarise.
        model: seq2seq model exposing ``generate`` and ``device``.
        tokenizer: tokenizer matching ``model``.
        n_beams: number of beams for beam search.
        min_len: minimum length of each generated summary, in tokens.
        max_len: maximum length of each generated summary, in tokens.

    Returns:
        List with one final summary per record, in input order.
    """
    # Keys of a record that are not article sections
    excluded_keys = ('abstract', 'main_body', 'complete_text', 'summary')

    summary_list = list()

    for text_dict in text_list:

        parts = []

        # Start from the abstract if it exists
        if text_dict.get('abstract'):
            parts.append(text_dict['abstract'])

        # Summarize each section except for the excluded keys
        for section_key, section_value in text_dict.items():
            if section_key in excluded_keys:
                continue
            # An empty section has nothing to summarise; generating from it
            # would only add unrelated text to the intermediate body.
            if not section_value.strip():
                continue
            parts.append(_sbs_summarize_text(section_value, model, tokenizer, n_beams, min_len, max_len))

        # Separate the pieces with a space so the last word of one piece is
        # not fused with the first word of the next.
        summarized_body = " ".join(parts)

        # Generate a final summary for summarized_body
        final_summary = _sbs_summarize_text(summarized_body, model, tokenizer, n_beams, min_len, max_len)

        summary_list.append(final_summary)

    return summary_list

## 1) Generate Summaries Using The Pre-Trained Model

In [None]:
# Check if CUDA is available and set the device accordingly
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Device: {device}")

# Load the pre-trained (not yet fine-tuned) BART model and tokenizer
model_name = "facebook/bart-large-cnn"
model = BartForConditionalGeneration.from_pretrained(model_name).to(device)
tokenizer = BartTokenizer.from_pretrained(model_name)

# Define model parameters
n_beams = 5
min_len = 600
max_len = 800

# Complete summarization takes the article texts (test_body); passing the
# record dicts of preprocessed_test would hand a dict to the tokenizer.
pretrained_complete = generate_complete(test_body, model, tokenizer, n_beams, min_len, max_len)
# Step-by-step summarization needs the per-section records
pretrained_sbs = generate_sbs(preprocessed_test, model, tokenizer, n_beams, min_len, max_len)

## 2) Generate Summaries Using The Fine-Tuned Model

In [None]:
# Check if CUDA is available and set the device accordingly
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Device: {device}")

# Load the fine-tuned BART model and tokenizer from the local save directory
model_name = "fine_tuned_bart_model"
model = BartForConditionalGeneration.from_pretrained(model_name).to(device)
tokenizer = BartTokenizer.from_pretrained(model_name)

# Define model parameters
n_beams = 5
min_len = 600
max_len = 800

# Complete summarization takes the article texts (test_body); passing the
# record dicts of preprocessed_test would hand a dict to the tokenizer.
finetuned_complete = generate_complete(test_body, model, tokenizer, n_beams, min_len, max_len)
# Step-by-step summarization needs the per-section records
finetuned_sbs = generate_sbs(preprocessed_test, model, tokenizer, n_beams, min_len, max_len)

# Summary Evaluation

## 1) Factuality Score

In [None]:
import torch
import torch.nn as nn
import traceback
from transformers import BartTokenizer, BartForConditionalGeneration
from typing import List
import numpy as np


class BARTScorer:
    """Score target texts by their likelihood under a BART model.

    For each (source, target) pair the score is the negated token-level
    negative log-likelihood of the target given the source, summed over the
    target and divided by the number of attended target tokens. Scores are
    therefore at most 0, and higher means more likely.
    """

    def __init__(self, device='cuda:0', max_length=1024, checkpoint='facebook/bart-large-cnn'):
        """Load tokenizer and model from ``checkpoint`` and move the model to ``device``."""
        # Set up model
        self.device = device
        self.max_length = max_length
        self.tokenizer = BartTokenizer.from_pretrained(checkpoint)
        self.model = BartForConditionalGeneration.from_pretrained(checkpoint)
        self.model.eval()
        self.model.to(device)

        # Set up loss: per-token NLL, with padding positions contributing zero
        self.loss_fct = nn.NLLLoss(reduction='none', ignore_index=self.model.config.pad_token_id)
        self.lsm = nn.LogSoftmax(dim=1)

    def load(self, path=None):
        """ Load model from paraphrase finetuning """
        if path is None:
            path = 'models/bart.pth'
        self.model.load_state_dict(torch.load(path, map_location=self.device))

    def score(self, srcs, tgts, batch_size=4):
        """Score a batch of examples.

        Args:
            srcs: list of source texts the model is conditioned on.
            tgts: list of target texts to score, aligned with ``srcs``.
            batch_size: number of pairs scored per forward pass.

        Returns:
            List of floats, one score per pair, in input order.

        Raises:
            RuntimeError: re-raised from the forward pass after the offending
                batch has been printed.
        """
        score_list = []
        for i in range(0, len(srcs), batch_size):
            src_list = srcs[i: i + batch_size]
            tgt_list = tgts[i: i + batch_size]
            try:
                with torch.no_grad():
                    encoded_src = self.tokenizer(
                        src_list,
                        max_length=self.max_length,
                        truncation=True,
                        padding=True,
                        return_tensors='pt'
                    )
                    encoded_tgt = self.tokenizer(
                        tgt_list,
                        max_length=self.max_length,
                        truncation=True,
                        padding=True,
                        return_tensors='pt'
                    )
                    src_tokens = encoded_src['input_ids'].to(self.device)
                    src_mask = encoded_src['attention_mask'].to(self.device)

                    tgt_tokens = encoded_tgt['input_ids'].to(self.device)
                    tgt_mask = encoded_tgt['attention_mask']
                    # Number of attended target tokens per example, used to
                    # length-normalise the summed loss
                    tgt_len = tgt_mask.sum(dim=1).to(self.device)

                    output = self.model(
                        input_ids=src_tokens,
                        attention_mask=src_mask,
                        labels=tgt_tokens
                    )
                    # Flatten to (batch * seq, vocab) for the per-token loss,
                    # then restore the (batch, seq) layout
                    logits = output.logits.view(-1, self.model.config.vocab_size)
                    loss = self.loss_fct(self.lsm(logits), tgt_tokens.view(-1))
                    loss = loss.view(tgt_tokens.shape[0], -1)
                    loss = loss.sum(dim=1) / tgt_len
                    curr_score_list = [-x.item() for x in loss]
                    score_list += curr_score_list

            except RuntimeError:
                # Show the offending batch, then let the error propagate.
                # Calling exit(0) here would terminate the interpreter (the
                # kernel, in a notebook) while reporting success.
                print(f'source: {src_list}')
                print(f'target: {tgt_list}')
                raise
        return score_list

    def multi_ref_score(self, srcs, tgts: List[List[str]], agg="mean", batch_size=4):
        """Score each source against several references and aggregate.

        Args:
            srcs: list of source texts.
            tgts: one list of reference texts per source; every source must
                have the same number of references.
            agg: "mean" or "max" over the references of a source.
            batch_size: forwarded to ``score``.

        Returns:
            List with one aggregated score per source (empty when ``tgts`` is
            empty).

        Raises:
            Exception: if the number of references differs between samples.
            NotImplementedError: for any other ``agg`` value.
        """
        # Nothing to score; also avoids indexing tgts[0] on an empty list
        if not tgts:
            return []

        # Assert we have the same number of references
        ref_nums = [len(x) for x in tgts]
        if len(set(ref_nums)) > 1:
            raise Exception("You have different number of references per test sample.")

        ref_num = len(tgts[0])
        score_matrix = []
        for i in range(ref_num):
            # i-th reference of every sample
            curr_tgts = [x[i] for x in tgts]
            scores = self.score(srcs, curr_tgts, batch_size)
            score_matrix.append(scores)
        if agg == "mean":
            score_list = np.mean(score_matrix, axis=0)
        elif agg == "max":
            score_list = np.max(score_matrix, axis=0)
        else:
            raise NotImplementedError
        return list(score_list)

    def test(self, batch_size=3):
        """Print the scores of three hard-coded source/target pairs (smoke test)."""
        src_list = [
            'This is a very good idea. Although simple, but very insightful.',
            'Can I take a look?',
            'Do not trust him, he is a liar.'
        ]

        tgt_list = [
            "That's stupid.",
            "What's the problem?",
            'He is trustworthy.'
        ]

        print(self.score(src_list, tgt_list, batch_size))

In [None]:
max_len = 800

# Score on the GPU when one is present, otherwise fall back to the CPU
scorer_device = 'cuda:0' if torch.cuda.is_available() else 'cpu'

# 1) Pre-trained summaries
pre_model = 'facebook/bart-large-cnn'
# Create an instance of BARTScorer
bart_scorer = BARTScorer(device=scorer_device, max_length=max_len, checkpoint=pre_model)

# Score the generated summaries against the target summaries
# (score() returns one value per summary, so the lists are kept as they are)
pretrained_complete_scores = bart_scorer.score(pretrained_complete, list(test_label))
pretrained_sbs_scores = bart_scorer.score(pretrained_sbs, list(test_label))
# Report the average over the test set: a list cannot be formatted with ':.3f'
print("Pre-trained Factuality Scores:")
print(f"Complete Summaries: {np.mean(pretrained_complete_scores):.3f}")
print(f"Step-by-step Summaries: {np.mean(pretrained_sbs_scores):.3f}")

# 2) Fine-tuned summaries
tuned_model = 'fine_tuned_bart_model'
# Create an instance of BARTScorer
bart_scorer = BARTScorer(device=scorer_device, max_length=max_len, checkpoint=tuned_model)

# Score the generated summaries against the target summaries
finetuned_complete_scores = bart_scorer.score(finetuned_complete, list(test_label))
finetuned_sbs_scores = bart_scorer.score(finetuned_sbs, list(test_label))
# This header belongs to the fine-tuned model's results
print("Fine-tuned Factuality Scores:")
print(f"Complete Summaries: {np.mean(finetuned_complete_scores):.3f}")
print(f"Step-by-step Summaries: {np.mean(finetuned_sbs_scores):.3f}")

## 2) Relevance and Readability

In [None]:
from rouge_score import rouge_scorer
from bert_score import score as bert_score
from readability import Readability

def relevance_readibility(generated_list, target_list):
    """Compute relevance and readability metrics for generated summaries.

    Args:
        generated_list: generated summaries.
        target_list: reference summaries, aligned with ``generated_list``.

    Returns:
        DataFrame with one row per summary pair and the columns
        "Rouge-1 F-measure", "Rouge-2 F-measure", "Rouge-L F-measure",
        "Bert Score F-measure" and "Readibility Difference" (absolute gap
        between the Flesch-Kincaid grade levels of the generated and the
        reference summary). Empty input yields an empty DataFrame.

    Raises:
        ValueError: if the two lists differ in length.
    """
    if len(generated_list) != len(target_list):
        raise ValueError(
            f"generated_list and target_list must have the same length, "
            f"got {len(generated_list)} and {len(target_list)}"
        )

    text_results = list()
    if len(generated_list) == 0:
        return pd.DataFrame(text_results)

    # Build the ROUGE scorer once; it does not depend on the summary pair
    rouge = rouge_scorer.RougeScorer(['rouge1', 'rouge2', 'rougeL'], use_stemmer=True)

    # Score all pairs in one BERTScore call (candidates first, references
    # second) instead of invoking it separately for every pair.
    _, _, bert_f1 = bert_score(list(generated_list), list(target_list), lang='en', model_type='roberta-large')
    bert_fmeasures = bert_f1.tolist()

    # The references come from the target_list argument, not from a
    # notebook-level variable.
    for generated_sum, target_sum, bert_fmeasure in zip(generated_list, target_list, bert_fmeasures):

        # 1) Relevance
        # RougeScorer.score takes the reference first and the prediction second
        rouge_scores = rouge.score(target_sum, generated_sum)

        # Extract F-measures for ROUGE-1, ROUGE-2, and ROUGE-L
        rouge1_fmeasure = rouge_scores['rouge1'].fmeasure
        rouge2_fmeasure = rouge_scores['rouge2'].fmeasure
        rougel_fmeasure = rouge_scores['rougeL'].fmeasure

        # 2) Readability
        # NOTE(review): the readability package is understood to reject very
        # short texts — confirm all summaries meet its minimum word count.
        readability_target = Readability(target_sum)
        fkgl_target = readability_target.flesch_kincaid().score

        # Calculate FKGL for generated summaries
        readability_generated = Readability(generated_sum)
        fkgl_generated = readability_generated.flesch_kincaid().score

        # Compare readability of generated summaries with target summary
        # Calculate absolute difference
        diff_A = abs(fkgl_generated - fkgl_target)

        result_dict = {
            "Rouge-1 F-measure": rouge1_fmeasure,
            "Rouge-2 F-measure": rouge2_fmeasure,
            "Rouge-L F-measure": rougel_fmeasure,
            "Bert Score F-measure": bert_fmeasure,
            "Readibility Difference": diff_A,
        }

        # Collect the metrics of this pair
        text_results.append(result_dict)

    # Convert the list of dictionaries to a DataFrame
    evaluation_df = pd.DataFrame(text_results)

    return evaluation_df

In [None]:
# Evaluate every set of generated summaries against the reference test summaries
pretrained_complete_df = relevance_readibility(
    generated_list=pretrained_complete, target_list=list(test_label)
)
pretrained_sbs_df = relevance_readibility(
    generated_list=pretrained_sbs, target_list=list(test_label)
)
finetuned_complete_df = relevance_readibility(
    generated_list=finetuned_complete, target_list=list(test_label)
)
finetuned_sbs_df = relevance_readibility(
    generated_list=finetuned_sbs, target_list=list(test_label)
)

# END