## Academic Paper PDF Summarizer


In [12]:
import os
import re
import torch
import pandas as pd
import numpy as np
from tqdm.auto import tqdm
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split

from torch.optim import AdamW
from torch.utils.data import Dataset, DataLoader
from rouge_score import rouge_scorer

### PDF Processing Libraries


In [13]:
import PyPDF2
from PyPDF2 import PdfReader

### NLP and Transformers


In [14]:
import nltk
from nltk.tokenize import sent_tokenize
from transformers import (
    BartTokenizer, 
    BartForConditionalGeneration,
    T5Tokenizer,
    T5ForConditionalGeneration,
    get_linear_schedule_with_warmup
)

## Configuration


In [15]:
# Seed the PyTorch and NumPy global random number generators with the same
# fixed value so repeated runs draw the same random numbers.
for _seed_fn in (torch.manual_seed, np.random.seed):
  _seed_fn(42)

In [16]:
# Seed the RNG of every visible CUDA device (only when CUDA is usable) ...
if torch.cuda.is_available():
  torch.cuda.manual_seed_all(42)

# ... and request deterministic cuDNN algorithms.
torch.backends.cudnn.deterministic = True

In [17]:
# Run on the GPU when PyTorch reports CUDA as available, otherwise on the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
print(f"Using device: {device}")

Using device: cuda


## 1. Data Collection and Preprocessing

### PDF Extraction Functions


In [18]:
from typing import Union
from pathlib import Path

In [19]:
def extract_text_from_pdf(pdf_path: Union[str, Path]) -> str:
  """
  Extract the text of every page of a PDF file using PyPDF2.

  Args:
    pdf_path (Union[str, Path]): Path to the PDF file

  Returns:
    str: Text of all pages, each page followed by a newline. An empty
      string is returned when the file cannot be read or when no page
      yields any text.
  """

  try:
    with open(pdf_path, 'rb') as file:
      reader = PdfReader(file)
      page_texts = []

      # Pages must be read while the file is still open.
      for page in reader.pages:
        page_text = page.extract_text()

        # Skip pages for which the extractor returned nothing.
        if page_text:
          page_texts.append(page_text + "\n")

    # Return only after every page has been visited (returning inside the
    # loop would keep just the first page).
    return "".join(page_texts)

  except Exception as e:
    # Best effort: callers treat "" as "this file could not be processed".
    print(f"Error extracting text from PDF: {e}")
    return ""

### Text Cleaning and Section Extraction


In [20]:
def clean_academic_paper(text: str) -> str:
  """
  Normalise raw text extracted from an academic paper.

  The following steps are applied in order:
    1. lines that hold only a number (e.g. page numbers) are removed;
    2. short lines (< 100 characters) that occur more than twice are
       treated as running headers/footers and removed;
    3. runs of spaces and runs of newlines are collapsed to a single one.

  Args:
    text (str): Extracted text from PDF

  Returns:
    str: Cleaned text
  """

  # A number alone between two newlines is assumed to be a page number.
  text = re.sub(r'\n\s*\d+\s*\n', '\n', text)
  raw_lines = text.split('\n')

  # Count how often each non-empty (stripped) line occurs.
  occurrences = {}
  for stripped in (raw.strip() for raw in raw_lines):
    if stripped:
      occurrences[stripped] = occurrences.get(stripped, 0) + 1

  # Short lines repeated more than twice are considered headers/footers.
  repeated = {
    candidate
    for candidate, seen in occurrences.items()
    if seen > 2 and len(candidate) < 100
  }

  kept = [raw for raw in raw_lines if raw.strip() not in repeated]

  result = "\n".join(kept)
  result = re.sub(r" +", ' ', result)
  return re.sub(r"\n+", '\n', result)

In [21]:
def extract_sections(text: str, max_heading_words: int = 8) -> dict:
  """
  Split an academic paper into its main sections.

  A line is treated as a section heading when it *starts* with one of the
  known section keywords, optionally preceded by numbering such as "1",
  "2.1." or "IV.", and either
    * is short (at most ``max_heading_words`` words), e.g. "3 Methods", or
    * continues with ':', '.', or a dash right after the keyword, e.g.
      "Abstract: We present ..." - the text after the separator is then
      kept as the first line of that section.
  Matching headings only (instead of keywords anywhere in a line) keeps
  ordinary sentences that merely mention "method" or "results" inside the
  section they belong to.

  Args:
    text (str): Cleaned text from academic paper
    max_heading_words (int): Longest line (in words) still accepted as a
      plain heading

  Returns:
    dict: Maps section name to its text. The key 'preamble' is always
      present and holds everything before the first detected heading. A
      section whose heading occurs several times accumulates all of its
      content.
  """

  section_patterns = {
    'abstract': r'abstract',
    'introduction': r'introduction|background',
    'methodology': r'methodology|method|materials and methods|experimental',
    'results': r'results|findings',
    'discussion': r'discussion',
    'conclusion': r'conclusion|summary|future work'
  }

  # Optional arabic/roman numbering, the keyword (plus optional plural "s")
  # as a whole word, then whatever follows on the line.
  numbering = r'^\s*(?:(?:\d+(?:\.\d+)*|[ivxlc]+)[.)]?\s+)?'
  heading_res = {
    name: re.compile(numbering + '(?:' + pattern + r')s?\b(?P<tail>.*)$', re.IGNORECASE)
    for name, pattern in section_patterns.items()
  }

  # A set (not a string) so that an empty tail never counts as a separator.
  inline_separators = {':', '.', '\u2014', '\u2013'}

  sections = {'preamble': []}
  current_section = 'preamble'

  for line in text.split("\n"):
    heading = None

    for section_name, heading_re in heading_res.items():
      match = heading_re.match(line)
      if match is None:
        continue

      tail = match.group('tail').strip()
      if tail[:1] in inline_separators:
        # "Abstract: text ..." - heading with inline content.
        heading = (section_name, tail[1:].strip())
        break

      if len(line.split()) <= max_heading_words:
        heading = (section_name, '')
        break

    if heading is None:
      # Ordinary content: append exactly once to the current section.
      sections[current_section].append(line)
      continue

    current_section, inline_text = heading
    # setdefault keeps content gathered earlier under the same heading.
    bucket = sections.setdefault(current_section, [])
    if inline_text:
      bucket.append(inline_text)

  return {name: '\n'.join(lines) for name, lines in sections.items()}

### Loading PDFs


In [22]:
def load_papers_from_directory(directory_path: Union[str, Path]):
  """
  Load and process academic papers from a directory

  Files are processed in sorted name order so that the resulting list (and
  any seeded train/validation split built from it) does not depend on the
  arbitrary order returned by ``os.listdir``. The ".pdf" extension is
  matched case-insensitively.

  Args:
    directory_path (Union[str, Path]): Path to directory containing PDFs

  Returns:
    list: One dict per PDF that yielded text, with the keys 'filename',
      'full_text' (cleaned text), 'sections' (dict from extract_sections)
      and 'summary' (the detected abstract, or '' when none was found)
  """

  paper_files = sorted(
    f for f in os.listdir(directory_path) if f.lower().endswith('.pdf')
  )
  papers = []

  for paper_file in tqdm(paper_files, desc="Processing PDFs"):
    paper_path = os.path.join(directory_path, paper_file)
    text = extract_text_from_pdf(paper_path)

    # extract_text_from_pdf returns "" on failure; skip such files.
    if not text:
      continue

    cleaned_text = clean_academic_paper(text)
    sections = extract_sections(cleaned_text)

    papers.append({
      'filename': paper_file,
      'full_text': cleaned_text,
      'sections': sections,
      'summary': sections.get('abstract', '')
    })

  return papers

## 2. Model Architecture and Dataset Preparation


### 2.1 Custom Dataset Class


In [24]:
class AcademicPaperDataset(Dataset):
  """
  Dataset of (paper text, reference summary) pairs for seq2seq training.

  Each item is tokenized on access: the paper is padded/truncated to
  ``max_input_length`` tokens and the summary to ``max_target_length``
  tokens.
  """

  def __init__(self, papers, summaries, tokenizer, max_input_length: int = 1024, max_target_length: int = 256):
    """
    Args:
      papers: Sequence of paper texts
      summaries: Sequence of reference summaries, aligned with ``papers``
      tokenizer: Callable tokenizer exposing ``pad_token_id``
      max_input_length (int): Token length of the encoded paper
      max_target_length (int): Token length of the encoded summary

    Raises:
      ValueError: If ``papers`` and ``summaries`` differ in length
    """
    if len(papers) != len(summaries):
      raise ValueError(
        f"papers and summaries must have the same length, got {len(papers)} and {len(summaries)}"
      )

    self.papers = papers
    self.summaries = summaries
    self.tokenizer = tokenizer
    self.max_input_length = max_input_length
    self.max_target_length = max_target_length

  def __len__(self):
    return len(self.papers)

  def __getitem__(self, idx: int):
    """
    Returns:
      dict: 'input_ids' and 'attention_mask' of length max_input_length,
        and 'labels' of length max_target_length in which padding
        positions are replaced by -100.
    """
    paper = self.papers[idx]
    summary = self.summaries[idx]

    inputs = self.tokenizer(
      paper,
      max_length=self.max_input_length,
      padding="max_length",
      truncation=True,
      return_tensors="pt"
    )

    # Targets use their own (shorter) length limit, not the input limit.
    targets = self.tokenizer(
      summary,
      max_length=self.max_target_length,
      padding="max_length",
      truncation=True,
      return_tensors="pt"
    )

    # squeeze(0) drops only the batch axis added by return_tensors="pt".
    input_ids = inputs.input_ids.squeeze(0)
    attention_mask = inputs.attention_mask.squeeze(0)
    labels = targets.input_ids.squeeze(0)

    # -100 is the label value the training loss skips, so padding does not
    # contribute to it.
    labels = labels.masked_fill(labels == self.tokenizer.pad_token_id, -100)

    return {
      "input_ids": input_ids,
      "attention_mask": attention_mask,
      "labels": labels
    }

### 2.2 Initialize Model and Tokenizer Function


In [25]:
def initialize_model(model_type="bart", pretrained_model=None):
  """
  Build a summarization model together with its tokenizer.

  The tokenizer always comes from the default checkpoint of the chosen
  family ("facebook/bart-large-cnn" or "t5-base"); the model weights come
  from ``pretrained_model`` when given, otherwise from that same default.
  The model is moved to the module-level ``device`` before it is returned.

  Args:
    model_type (str): Type of model to use ("bart" or "t5"), case-insensitive
    pretrained_model (str): Path to pretrained model (optional)

  Returns:
    tuple: (model, tokenizer)

  Raises:
    ValueError: If ``model_type`` is neither "bart" nor "t5"
  """

  kind = model_type.lower()
  if kind not in ("bart", "t5"):
    raise ValueError(f"Unsupported model type: {model_type}. Use 'bart' or 't5'.")

  if kind == "bart":
    model_name = "facebook/bart-large-cnn"
    tokenizer_cls, model_cls = BartTokenizer, BartForConditionalGeneration
  else:
    model_name = "t5-base"
    tokenizer_cls, model_cls = T5Tokenizer, T5ForConditionalGeneration

  tokenizer = tokenizer_cls.from_pretrained(model_name)

  # Fall back to the default checkpoint when no custom weights are given.
  weights_source = pretrained_model if pretrained_model else model_name
  model = model_cls.from_pretrained(weights_source).to(device)

  return model, tokenizer

## 3. Training the Model


### 3.1 Training Function


In [26]:
def train_model(model, train_dataloader, val_dataloader, tokenizer,
                epochs=3, lr=2e-5, warmup_steps=500,
                eval_steps=100, save_path="model_checkpoints"):
  """
  Train the summarization model

  Runs ``epochs`` full passes over ``train_dataloader``. Every
  ``eval_steps`` optimizer steps the model is scored on ``val_dataloader``
  and, when ROUGE-L improves, its weights are saved as
  ``best_model_step_<step>.pt``. After every epoch the average loss is
  recorded and an epoch checkpoint ``model_epoch_<n>.pt`` is written.

  Args:
        model: The model to train
        train_dataloader: DataLoader for training data
        val_dataloader: DataLoader for validation data
        tokenizer: Tokenizer for decoding predictions
        epochs: Number of training epochs
        lr: Learning rate
        warmup_steps: Number of warmup steps for scheduler
        eval_steps: Evaluate model every eval_steps steps (a falsy value
          disables evaluation during training)
        save_path: Path to save model checkpoints

    Returns:
        tuple: (Trained model, training history). The history maps
          'train_loss' to one (step, average loss) pair per epoch and
          'val_rouge1' / 'val_rouge2' / 'val_rougeL' to one (step, score)
          pair per evaluation.
  """

  os.makedirs(save_path, exist_ok=True)

  optimizer = AdamW(model.parameters(), lr=lr)
  batches_per_epoch = len(train_dataloader)
  total_steps = batches_per_epoch * epochs

  scheduler = get_linear_schedule_with_warmup(
    optimizer,
    num_warmup_steps=warmup_steps,
    num_training_steps=total_steps
  )

  scorer = rouge_scorer.RougeScorer(['rouge1', 'rouge2', 'rougeL'], use_stemmer=True)
  history = {
    'train_loss': [],
    'val_rouge1': [],
    'val_rouge2': [],
    'val_rougeL': []
  }

  global_step = 0
  best_rouge_l = 0.0

  for epoch in range(epochs):
    print(f"Epoch {epoch + 1} / {epochs}")

    model.train()
    epoch_loss = 0.0
    progress_bar = tqdm(train_dataloader, desc=f"Training epoch {epoch + 1}")

    for batch in progress_bar:
      input_ids = batch["input_ids"].to(device)
      attention_mask = batch["attention_mask"].to(device)
      labels = batch["labels"].to(device)

      # Clear previous gradients
      optimizer.zero_grad()

      # Forward pass
      outputs = model(
        input_ids=input_ids,
        attention_mask=attention_mask,
        labels=labels
      )

      loss = outputs.loss
      epoch_loss += loss.item()

      # Updates
      progress_bar.set_postfix({'loss': loss.item()})
      loss.backward()

      # Clip the gradient norm to 1.0 before the optimizer update.
      torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
      optimizer.step()
      scheduler.step()

      global_step += 1

      # Evaluate during training
      if eval_steps and global_step % eval_steps == 0:
        print(f"\nEvaluating at step {global_step}")
        rouge_scores = evaluate_model(model, val_dataloader, tokenizer, scorer)

        # Print scores
        print(f"ROUGE-1: {rouge_scores['rouge1']:.4f}")
        print(f"ROUGE-2: {rouge_scores['rouge2']:.4f}")
        print(f"ROUGE-L: {rouge_scores['rougeL']:.4f}")

        # Update history
        history['val_rouge1'].append((global_step, rouge_scores['rouge1']))
        history['val_rouge2'].append((global_step, rouge_scores['rouge2']))
        history['val_rougeL'].append((global_step, rouge_scores['rougeL']))

        # Save best model
        if rouge_scores["rougeL"] > best_rouge_l:
          best_rouge_l = rouge_scores['rougeL']
          model_path = os.path.join(save_path, f'best_model_step_{global_step}.pt')
          torch.save(model.state_dict(), model_path)
          print(f"Saved best model to {model_path}")

        # evaluate_model switches the model to eval mode; always switch
        # back, not only when a new best model was saved.
        model.train()

    # Per-epoch bookkeeping: runs once after all batches of the epoch.
    avg_loss = epoch_loss / max(batches_per_epoch, 1)
    history['train_loss'].append((global_step, avg_loss))
    print(f"Average training loss: {avg_loss:.4f}")

    # Save epoch checkpoint
    model_path = os.path.join(save_path, f'model_epoch_{epoch+1}.pt')
    torch.save(model.state_dict(), model_path)
    print(f"Saved epoch checkpoint to {model_path}")

  # Return only after every epoch has completed.
  return model, history

### 3.2 Evaluation Function


In [27]:
def evaluate_model(model, dataloader, tokenizer, scorer=None):
  """
  Evaluate the model using ROUGE scores.

  Summaries are generated with beam search for every batch and compared
  with the reference summaries decoded from the batch labels. The model is
  put into eval mode and is left in eval mode on return.

  Args:
      model: The model to evaluate
      dataloader: DataLoader with evaluation data
      tokenizer: Tokenizer for decoding predictions
      scorer: Rouge scorer (if None, will create a new one)

  Returns:
      dict: Average F-measure under the keys 'rouge1', 'rouge2' and
        'rougeL'. All three are 0.0 when the dataloader yields no examples.
  """

  model.eval()
  if scorer is None:
    scorer = rouge_scorer.RougeScorer(['rouge1', 'rouge2', 'rougeL'], use_stemmer=True)

  rouge_scores = []
  with torch.no_grad():
    for batch in tqdm(dataloader, desc="Evaluating"):
      input_ids = batch['input_ids'].to(device)
      attention_mask = batch["attention_mask"].to(device)
      # Labels are only decoded back to text, so they are not moved to the
      # model's device.
      labels = batch["labels"]

      summary_ids = model.generate(
        input_ids=input_ids,
        attention_mask=attention_mask,
        max_length=150,
        min_length=40,
        length_penalty=2.0,
        num_beams=4,
        early_stopping=True
      )

      # Convert to text
      predictions = [tokenizer.decode(g, skip_special_tokens=True) for g in summary_ids]

      # Get reference summaries: put the pad token back where the dataset
      # wrote -100 so that the ids can be decoded.
      reference_ids = labels.masked_fill(labels == -100, tokenizer.pad_token_id)
      references = [tokenizer.decode(r, skip_special_tokens=True) for r in reference_ids]

      # Calculate ROUGE scores
      rouge_scores.extend(
        scorer.score(ref, pred) for pred, ref in zip(predictions, references)
      )

  metric_names = ('rouge1', 'rouge2', 'rougeL')

  # Nothing was scored: report zeros instead of dividing by zero.
  if not rouge_scores:
    return {metric: 0.0 for metric in metric_names}

  # Calculate avg scores
  return {
    metric: sum(score[metric].fmeasure for score in rouge_scores) / len(rouge_scores)
    for metric in metric_names
  }

### 3.3 Long Document Handling Function


In [28]:
def _summarize_text(text, model, tokenizer, max_input_length, max_length, min_length):
  """Generate one summary for a text, truncating it to max_input_length tokens."""
  # max_length must be passed by keyword: the tokenizer's second positional
  # parameter is the optional second text of a pair, not a length.
  inputs = tokenizer(
    text,
    max_length=max_input_length,
    truncation=True,
    return_tensors="pt"
  ).to(device)

  # Inference only, so no gradients are needed.
  with torch.no_grad():
    summary_ids = model.generate(
      input_ids=inputs["input_ids"],
      attention_mask=inputs["attention_mask"],
      max_length=max_length,
      min_length=min_length,
      length_penalty=2.0,
      num_beams=4,
      early_stopping=True
    )

  return tokenizer.decode(summary_ids[0], skip_special_tokens=True)


def process_long_document(document, model, tokenizer, max_length=1024, overlap=256):
  """
  Process a long document by splitting it into chunks with overlap

  A document that fits into ``max_length`` tokens is summarized directly.
  Otherwise it is cut into windows of ``max_length`` tokens that start
  every ``max_length - overlap`` tokens; each window is summarized on its
  own, and the concatenated window summaries are summarized once more
  (truncated to ``max_length`` tokens) to produce the final result.

  Args:
    document (str): Text to summarize
    model: The summarization model
    tokenizer: Tokenizer matching the model
    max_length (int): Maximum number of input tokens per model call
    overlap (int): Number of tokens shared by consecutive windows

  Returns:
    str: The generated summary

  Raises:
    ValueError: If ``overlap`` is negative or not smaller than ``max_length``
  """

  if not 0 <= overlap < max_length:
    raise ValueError(
      f"overlap must satisfy 0 <= overlap < max_length, got overlap={overlap}, max_length={max_length}"
    )

  model.eval()
  tokens = tokenizer.encode(document)

  if len(tokens) <= max_length:
    return _summarize_text(document, model, tokenizer, max_length, 150, 40)

  stride = max_length - overlap
  chunks = []
  for start in range(0, len(tokens), stride):
    chunk = tokens[start : start + max_length]
    chunks.append(tokenizer.decode(chunk, skip_special_tokens=True))

    # This window already reaches the end of the document; further windows
    # would only repeat part of its tail.
    if start + max_length >= len(tokens):
      break

  # Shorter per-chunk summaries keep the combined text compact.
  chunk_summaries = [
    _summarize_text(chunk, model, tokenizer, max_length, 100, 20)
    for chunk in chunks
  ]

  combined_summary = " ".join(chunk_summaries)
  return _summarize_text(combined_summary, model, tokenizer, max_length, 150, 40)

## 4. Visualizing Training Results


In [29]:
def plot_training_history(history, save_path=None):
  """
  Plot the training loss and the validation ROUGE scores.

  Args:
    history (dict): Maps 'train_loss', 'val_rouge1', 'val_rouge2' and
      'val_rougeL' to lists of (step, value) pairs. Missing or empty
      series are skipped (for example when training finished before the
      first evaluation step was reached).
    save_path (str): Where to save the figure (optional)
  """
  plt.figure(figsize=(12, 8))

  # Plot training loss
  plt.subplot(2, 1, 1)
  loss_points = history.get('train_loss', [])
  if loss_points:
    steps, losses = zip(*loss_points)
    # Markers keep a series with a single point visible.
    plt.plot(steps, losses, marker='o')
  plt.title('Training Loss')
  plt.xlabel('Steps')
  plt.ylabel('Loss')

  # Plot ROUGE scores
  plt.subplot(2, 1, 2)
  rouge_series = (
    ('val_rouge1', 'ROUGE-1'),
    ('val_rouge2', 'ROUGE-2'),
    ('val_rougeL', 'ROUGE-L'),
  )
  plotted_any = False
  for key, label in rouge_series:
    points = history.get(key, [])
    # zip(*[]) yields nothing to unpack, so empty series must be skipped.
    if not points:
      continue
    steps, scores = zip(*points)
    plt.plot(steps, scores, marker='o', label=label)
    plotted_any = True

  plt.title('ROUGE Scores during Training')
  plt.xlabel('Steps')
  plt.ylabel('Score')
  # Only draw a legend when at least one labelled line exists.
  if plotted_any:
    plt.legend()

  plt.tight_layout()

  if save_path:
    plt.savefig(save_path)

  plt.show()

## 5. Main Training Workflow


In [30]:
def train_academic_summarizer(data_path, output_path, model_type='bart', epochs=3, batch_size=2, max_length=1024):
  """
  Complete workflow for training an academic paper summarizer.

  Loads the PDFs, uses each paper's detected abstract as its reference
  summary, splits the papers 80/20 into training and validation sets,
  trains the model, plots the history and writes the final ROUGE scores to
  ``evaluation_results.txt`` inside ``output_path``.

  Args:
      data_path (str): Path to directory containing PDF files
      output_path (str): Path to save model and results
      model_type (str): Type of model to use ('bart' or 't5')
      epochs (int): Number of training epochs
      batch_size (int): Batch size for training
      max_length (int): Maximum input sequence length

  Returns:
      tuple: (trained model, tokenizer, history), or None when no paper
        could be loaded or fewer than two papers have a usable abstract.
  """

  os.makedirs(output_path, exist_ok=True)
  papers = load_papers_from_directory(data_path)

  if not papers:
    print("No papers found or processed. Check the data path.")
    return

  # A paper without a detected abstract has an empty reference summary and
  # gives the model nothing to learn from, so it is left out.
  usable_papers = [paper for paper in papers if paper['summary'].strip()]
  skipped = len(papers) - len(usable_papers)
  if skipped:
    print(f"Skipping {skipped} paper(s) without a detected abstract.")

  # A train/validation split needs at least one paper on each side.
  if len(usable_papers) < 2:
    print("Need at least two papers with a detected abstract to train. Check the data path.")
    return

  # Prepare data for training
  # NOTE(review): 'full_text' still contains the abstract that is used as
  # the target summary - confirm whether it should be removed from the input.
  paper_texts = [paper['full_text'] for paper in usable_papers]
  paper_summaries = [paper['summary'] for paper in usable_papers]

  train_texts, val_texts, train_summaries, val_summaries = train_test_split(
    paper_texts, paper_summaries, test_size=0.2, random_state=42
  )

  print(f"Training set: {len(train_texts)} papers")
  print(f"Validation set: {len(val_texts)} papers")

  # Initialize the model and tokenizer
  model, tokenizer = initialize_model(model_type=model_type)

  # Create datasets and dataloaders
  train_dataset = AcademicPaperDataset(
    train_texts, train_summaries, tokenizer, max_input_length=max_length
  )

  val_dataset = AcademicPaperDataset(
    val_texts, val_summaries, tokenizer, max_input_length=max_length
  )

  train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
  val_dataloader = DataLoader(val_dataset, batch_size=batch_size)

  # Train the model
  model_save_path = os.path.join(output_path, 'model_checkpoints')
  trained_model, history = train_model(
    model=model,
    train_dataloader=train_dataloader,
    val_dataloader=val_dataloader,
    tokenizer=tokenizer,
    epochs=epochs,
    save_path=model_save_path
  )

  # Plot history
  history_plot_path = os.path.join(output_path, 'training_history.png')
  plot_training_history(history, save_path=history_plot_path)

  print("Final evaluation on validation set...")
  final_scores = evaluate_model(trained_model, val_dataloader, tokenizer)
  print(f"Final ROUGE-1: {final_scores['rouge1']:.4f}")
  print(f"Final ROUGE-2: {final_scores['rouge2']:.4f}")
  print(f"Final ROUGE-L: {final_scores['rougeL']:.4f}")

  # Save evaluation results
  results_path = os.path.join(output_path, 'evaluation_results.txt')
  with open(results_path, 'w', encoding='utf-8') as f:
    f.write(f"Model type: {model_type.upper()}\n")
    f.write(f"Training epochs: {epochs}\n")
    f.write(f"Training set size: {len(train_texts)}\n")
    f.write(f"Validation set size: {len(val_texts)}\n")
    f.write(f"Final ROUGE-1: {final_scores['rouge1']:.4f}\n")
    f.write(f"Final ROUGE-2: {final_scores['rouge2']:.4f}\n")
    f.write(f"Final ROUGE-L: {final_scores['rougeL']:.4f}\n")

  print(f"Training completed. Model saved to {model_save_path}")
  print(f"Results saved to {output_path}")

  return trained_model, tokenizer, history

## 6. Inference Function


In [None]:
def load_trained_model(model_path, model_type='bart'):
  """
  Restore a trained summarization model from a saved state dict.

  A fresh model/tokenizer pair of the requested type is created first and
  the saved weights are then loaded into it. The returned model is on the
  module-level ``device`` and in eval mode.

  Args:
      model_path (str): Path to the saved model
      model_type (str): Type of model ('bart' or 't5')

  Returns:
      tuple: (model, tokenizer)
  """

  model, tokenizer = initialize_model(model_type)

  # NOTE(review): torch.load unpickles the file - only load checkpoints
  # that come from a trusted source.
  state_dict = torch.load(model_path, map_location=device)
  model.load_state_dict(state_dict)

  model.to(device)
  model.eval()

  return model, tokenizer

In [None]:
def summarize_paper(pdf_path, model, tokenizer):
  """
  Produce a summary for one academic paper stored as a PDF.

  Args:
      pdf_path (str): Path to the PDF file
      model: The summarization model
      tokenizer: The tokenizer

  Returns:
      dict: On success the keys 'filename', 'full_text' (cleaned text),
        'sections' and 'summary'; when no text could be extracted, a dict
        with the single key 'error'.
  """
  raw_text = extract_text_from_pdf(pdf_path)

  # Guard clause: nothing to summarize without extracted text.
  if not raw_text:
    return {"error": "Failed to extract text from PDF"}

  cleaned_text = clean_academic_paper(raw_text)
  paper_sections = extract_sections(cleaned_text)
  paper_summary = process_long_document(cleaned_text, model, tokenizer)

  result = {
    "filename": os.path.basename(pdf_path),
    "full_text": cleaned_text,
    "sections": paper_sections,
    "summary": paper_summary
  }
  return result

In [None]:
def batch_summarize_papers(pdf_directory, model, tokenizer, output_directory=None):
  """
  Summarize multiple papers in a directory.

  PDFs are processed in sorted name order and the ".pdf" extension is
  matched case-insensitively, so the order of the returned list does not
  depend on the arbitrary order returned by ``os.listdir``.

  Args:
      pdf_directory (str): Path to directory containing PDFs
      model: The summarization model
      tokenizer: The tokenizer
      output_directory (str): Path to save summaries (optional)

  Returns:
      list: One result dict per PDF as returned by summarize_paper
        (including the {'error': ...} dicts of papers that failed)
  """
  if output_directory:
    os.makedirs(output_directory, exist_ok=True)

  pdf_files = sorted(
    f for f in os.listdir(pdf_directory) if f.lower().endswith('.pdf')
  )
  summaries = []

  for pdf_file in tqdm(pdf_files, desc="Summarizing papers"):
    pdf_path = os.path.join(pdf_directory, pdf_file)

    # Summarize the paper
    result = summarize_paper(pdf_path, model, tokenizer)
    summaries.append(result)

    # Only successful results are written, and only when an output
    # directory was requested.
    if not output_directory or 'error' in result:
      continue

    parts = [
      f"# Summary of {pdf_file}\n\n",
      result['summary'],
      "\n\n# Extracted Sections\n\n",
    ]
    for section_name, section_text in result['sections'].items():
      # Sections without any visible text are left out of the report.
      if section_text.strip():
        parts.append(f"## {section_name.capitalize()}\n\n")
        parts.append(section_text)
        parts.append("\n\n")

    output_file = os.path.join(output_directory, f"{os.path.splitext(pdf_file)[0]}_summary.txt")
    with open(output_file, 'w', encoding='utf-8') as f:
      f.write("".join(parts))

  return summaries