## Document Summarization Using Pegasus
This notebook implements a document summarization pipeline using Google's Pegasus model. It extracts text from PDF, DOCX, DOC, TXT, and HTML files, cleans the text and splits it into chunks that fit the model's input limit, and summarizes each chunk with Pegasus.


In [None]:
import torch
device = "mps" if torch.backends.mps.is_available() else "cpu"

Checks whether Apple's Metal Performance Shaders (MPS) backend is available so the model can run on a Mac's GPU; otherwise it falls back to the CPU. CUDA GPUs are not checked in this cell.
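
The same check can be extended to prefer a CUDA GPU when one is present. The sketch below is an assumption about how that might look, not part of the original notebook; `choose_device` is a hypothetical helper that takes the availability flags as plain booleans so the priority order is easy to see.

```python
def choose_device(cuda_available: bool, mps_available: bool) -> str:
    """Return a device string, preferring CUDA, then MPS, then CPU."""
    if cuda_available:
        return "cuda"
    if mps_available:
        return "mps"
    return "cpu"


# In the notebook this would be called with the real checks, e.g.:
#   import torch
#   device = choose_device(torch.cuda.is_available(),
#                          torch.backends.mps.is_available())
print(choose_device(cuda_available=False, mps_available=True))  # prints "mps"
```

Keeping the decision in a small pure function makes the fallback order explicit and easy to change.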

In [None]:
import os
import subprocess
import logging
import PyPDF2
import pytesseract
import docx
from bs4 import BeautifulSoup
from pdf2image import convert_from_path

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger('text_extractor')

class TextExtractor:
    """A utility for extracting text from various document formats."""
    
    SUPPORTED_FORMATS = {
        "pdf": lambda path: extract_text_from_pdf(path).replace("\n", " "),
        "docx": lambda path: extract_text_from_docx(path).replace("\n", " "),
        "doc": lambda path: extract_text_from_doc(path).replace("\n", " "),
        "txt": lambda path: extract_text_from_txt(path).replace("\n", " "),
        "html": lambda path: extract_text_from_html(path).replace("\n", " "),
    }
    
    @classmethod
    def extract(cls, file_path: str, ocr_if_needed: bool = True) -> str:
        if not os.path.exists(file_path):
            raise FileNotFoundError(f"File not found: {file_path}")

        _, ext = os.path.splitext(file_path)
        ext = ext[1:].lower() if ext else ""
        
        if not ext:
            raise ValueError("Cannot determine file type (no extension)")
            
        if ext not in cls.SUPPORTED_FORMATS:
            raise ValueError(
                f"Unsupported file format: {ext}. Supported formats: {', '.join(cls.SUPPORTED_FORMATS.keys())}"
            )
        
        try:
            if ext == "pdf":
                text = extract_text_from_pdf(file_path).replace("\n", " ")
                if not text.strip() and ocr_if_needed:
                    logger.info(f"No text found in PDF, attempting OCR: {file_path}")
                    text = extract_text_from_scanned_pdf(file_path).replace("\n", " ")
            else:
                text = cls.SUPPORTED_FORMATS[ext](file_path)
                
            return text.strip()
            
        except Exception as e:
            logger.error(f"Error extracting text from {file_path}: {e}")
            # Chain the original exception so the root cause stays in the traceback
            raise RuntimeError(f"Failed to extract text: {e}") from e

def extract_text_from_pdf(pdf_path: str) -> str:
    text = ""
    try:
        with open(pdf_path, "rb") as file:
            reader = PyPDF2.PdfReader(file)
            for page in reader.pages:
                extracted_text = page.extract_text()
                if extracted_text:
                    text += extracted_text + " "
    except Exception as e:
        logger.error(f"Error opening PDF file: {str(e)}")
        raise
    
    return text.strip()

def extract_text_from_scanned_pdf(pdf_path: str, dpi: int = 300, lang: str = 'eng') -> str:
    try:
        images = convert_from_path(pdf_path, dpi=dpi)
        text_parts = [pytesseract.image_to_string(img, lang=lang).strip() for img in images]
        return " ".join(text_parts)
    except Exception as e:
        logger.error(f"Error during OCR processing: {str(e)}")
        raise RuntimeError(f"OCR processing failed: {str(e)}")

def extract_text_from_docx(docx_path: str) -> str:
    try:
        doc = docx.Document(docx_path)
        return " ".join(para.text.strip() for para in doc.paragraphs if para.text.strip())
    except Exception as e:
        logger.error(f"Error extracting DOCX content: {str(e)}")
        raise

def extract_text_from_doc(doc_path: str) -> str:
    try:
        result = subprocess.run(["antiword", doc_path], capture_output=True, text=True, check=True)
        return result.stdout.strip().replace("\n", " ")
    except FileNotFoundError:
        raise RuntimeError("antiword is not installed. Install it to process DOC files.")
    except subprocess.CalledProcessError as e:
        raise RuntimeError(f"antiword failed with error code {e.returncode}: {e.stderr}")

def extract_text_from_txt(txt_path: str) -> str:
    # latin-1 accepts any byte sequence, so it goes last as the final fallback
    encodings = ['utf-8', 'windows-1252', 'latin-1']
    for encoding in encodings:
        try:
            with open(txt_path, "r", encoding=encoding) as file:
                return file.read().strip().replace("\n", " ")
        except UnicodeDecodeError:
            continue
    raise RuntimeError(f"Failed to decode file with supported encodings: {encodings}")

def extract_text_from_html(html_path: str) -> str:
    try:
        with open(html_path, "r", encoding="utf-8") as file:
            soup = BeautifulSoup(file, "html.parser")
            return soup.get_text(separator=" ").strip()
    except UnicodeDecodeError:
        with open(html_path, "r", encoding="latin-1") as file:
            soup = BeautifulSoup(file, "html.parser")
            return soup.get_text(separator=" ").strip()

def extract_text(file_path: str) -> str:
    return TextExtractor.extract(file_path)


The cell above defines a text extraction utility for PDF, DOCX, DOC, TXT, and HTML files. It uses PyPDF2 for PDFs, pytesseract with pdf2image for OCR on scanned PDFs, the docx package for Word documents, BeautifulSoup for HTML, and subprocess to call the external antiword tool for legacy .doc files. The TextExtractor class maps file extensions to extraction functions in its SUPPORTED_FORMATS dictionary. Its extract method checks that the file exists, derives the file type from the extension, and dispatches to the matching function; extraction failures are logged and re-raised as RuntimeError. If a PDF yields no extractable text and ocr_if_needed is set, the method falls back to OCR. A missing file raises FileNotFoundError, a missing or unsupported extension raises ValueError, and text files are decoded by trying several encodings in turn, starting with UTF-8.
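
The extension-to-function dispatch described above can be illustrated in isolation. The following is a minimal sketch using only the standard library; `read_txt`, `EXTRACTORS`, and `extract_by_extension` are hypothetical names, and only the TXT branch is implemented so the example runs without third-party packages.

```python
import os
import tempfile


def read_txt(path: str) -> str:
    """Read a text file, trying UTF-8 first and falling back to Latin-1."""
    for encoding in ("utf-8", "latin-1"):
        try:
            with open(path, "r", encoding=encoding) as handle:
                return handle.read().strip().replace("\n", " ")
        except UnicodeDecodeError:
            continue
    raise RuntimeError(f"Could not decode {path}")


# Map lowercase extensions (without the dot) to extractor callables.
EXTRACTORS = {"txt": read_txt}


def extract_by_extension(path: str) -> str:
    """Validate the path, then dispatch on the file extension."""
    if not os.path.exists(path):
        raise FileNotFoundError(f"File not found: {path}")
    ext = os.path.splitext(path)[1][1:].lower()
    if ext not in EXTRACTORS:
        raise ValueError(f"Unsupported file format: {ext!r}")
    return EXTRACTORS[ext](path)


# Usage: write a small file and read it back through the dispatcher.
with tempfile.TemporaryDirectory() as tmp:
    sample = os.path.join(tmp, "note.txt")
    with open(sample, "w", encoding="utf-8") as handle:
        handle.write("first line\nsecond line\n")
    result = extract_by_extension(sample)
    print(result)  # prints "first line second line"
```

Supporting another format in this design only requires adding one entry to the dictionary.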

In [None]:
from transformers import PegasusTokenizer
import re
import nltk
from bs4 import BeautifulSoup
from nltk.tokenize import sent_tokenize

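nltk.download("punkt")
# Newer NLTK releases load the "punkt_tab" resource instead; if sent_tokenize
# raises LookupError, run nltk.download("punkt_tab") as well.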
model_name = "google/pegasus-large"
tokenizer = PegasusTokenizer.from_pretrained(model_name)


def preprocess_for_pegasus(text, max_length=1024):
    """
    Prepares text for Pegasus summarization.
    
    Args:
        text (str): Raw extracted text.
        max_length (int): Maximum token length for Pegasus.
        
    Returns:
        list: List of text chunks that fit within the token limit.
    """
    # Remove HTML tags
    text = BeautifulSoup(text, "html.parser").get_text()
    # Remove URLs
    text = re.sub(r"https?://\S+|www\.\S+", "", text)
    # Normalize whitespace
    text = re.sub(r"\s+", " ", text).strip()
    # Sentence tokenization
    sentences = sent_tokenize(text)
    
    chunks = []
    current_chunk = []
    current_length = 0
    for sentence in sentences:
        # Get token count for the sentence without special tokens
        tokenized_sentence = tokenizer.encode(sentence, add_special_tokens=False)
        token_count = len(tokenized_sentence)
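        # If adding this sentence would exceed the limit, store the current chunk and reset.
        # The current_chunk check avoids emitting an empty chunk when the very first
        # sentence is already longer than max_length.
        if current_chunk and current_length + token_count > max_length: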
            chunks.append(" ".join(current_chunk))
            current_chunk = [sentence]
            current_length = token_count
        else:
            current_chunk.append(sentence)
            current_length += token_count
    # Append the final chunk if non-empty
    if current_chunk:
        chunks.append(" ".join(current_chunk))
    return chunks

[nltk_data] Downloading package punkt to /Users/nelson/nltk_data...
[nltk_data]   Package punkt is already up-to-date!


- Cleans the text (strips HTML tags and URLs, normalizes whitespace) and splits it into sentences with NLTK.
- Greedily packs sentences into chunks so that each chunk stays within the token limit (1024 tokens by default).
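
The greedy packing step can be sketched without loading Pegasus. The example below is an illustration under simplifying assumptions: `count_tokens` is a hypothetical stand-in that counts whitespace-separated words, whereas the notebook counts Pegasus tokens, and the sentences are passed in already split.

```python
from typing import Callable, List


def count_tokens(sentence: str) -> int:
    """Stand-in token counter: one token per whitespace-separated word."""
    return len(sentence.split())


def pack_sentences(sentences: List[str], max_length: int,
                   counter: Callable[[str], int] = count_tokens) -> List[str]:
    """Greedily group sentences into chunks of at most max_length tokens.

    A single sentence longer than max_length still becomes its own chunk;
    truncation is left to the caller.
    """
    chunks, current, current_length = [], [], 0
    for sentence in sentences:
        n = counter(sentence)
        # Close the current chunk only if it already holds something.
        if current and current_length + n > max_length:
            chunks.append(" ".join(current))
            current, current_length = [], 0
        current.append(sentence)
        current_length += n
    if current:
        chunks.append(" ".join(current))
    return chunks


sentences = ["One two three.", "Four five.", "Six seven eight nine.", "Ten."]
print(pack_sentences(sentences, max_length=5))
# prints ['One two three. Four five.', 'Six seven eight nine. Ten.']
```

Splitting on sentence boundaries keeps each chunk readable on its own, at the cost of chunks that are sometimes shorter than the limit.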

In [None]:
from transformers import PegasusForConditionalGeneration, PegasusTokenizer


def summarize_text_with_pegasus(text, tokenizer, model, device,
                                max_input_length=1024,
                                max_output_length=256):
    """
    Summarizes long text using Google Pegasus.

    Args:
        text (str): The full input text.
        tokenizer: The Pegasus tokenizer.
        model: The Pegasus model.
        device: The device to run inference on (e.g., "cuda", "mps", or "cpu").
        max_input_length (int): Maximum input token length.
        max_output_length (int): Maximum output token length.

    Returns:
        str: The concatenated chunk summaries, or "No summary generated." if none were produced.
    """
    text_chunks = preprocess_for_pegasus(text, max_length=max_input_length)
    summaries = []

    for idx, chunk in enumerate(text_chunks):
        if not chunk.strip():
            print(f"Skipping empty chunk {idx}.")
            continue

        # Note: a "summarize:" task prefix is a T5 convention. Pegasus checkpoints are not
        # trained with one, so the model treats it as ordinary input text.
        input_text = "summarize: " + chunk.strip()

        # Tokenize input text with truncation
        inputs = tokenizer(input_text, return_tensors="pt", truncation=True, max_length=max_input_length)
        inputs = {k: v.to(device) for k, v in inputs.items()}

        if inputs["input_ids"].shape[1] == 0:
            print("⚠️ Skipping empty tokenized chunk.")
            continue

        try:
            # Generate summary for the current chunk
            summary_ids = model.generate(
                input_ids=inputs["input_ids"],
                attention_mask=inputs["attention_mask"],
                max_length=max_output_length,
                num_beams=5,
                early_stopping=True
            )
            summary = tokenizer.decode(summary_ids[0], skip_special_tokens=True)
            summaries.append(summary)
        except Exception as e:
            print(f"⚠️ Error processing chunk {idx}: {e}")
            continue

    full_summary = " ".join(summaries) if summaries else "No summary generated."

    return full_summary

1. **Preprocessing the Input Text**
- Calls preprocess_for_pegasus(text, max_length=1024) to split long text into smaller chunks.
- Ensures that each chunk does not exceed the model’s token limit.
- Stores the processed text chunks in text_chunks.
2. **Iterating Through Text Chunks**
- Loops over each chunk using enumerate(text_chunks).
- Skips empty chunks with if not chunk.strip() to avoid processing unnecessary data.
- Prepends a "summarize:" prefix to each chunk. This is a T5-style convention; Pegasus was not trained with task prefixes, so the prefix is optional and is treated as ordinary input text.
3. **Tokenizing and Moving to Device**
- Converts each chunk into tokenized form using tokenizer().
- Uses PyTorch tensors (return_tensors="pt") for model compatibility.
- Moves the tokenized input to the specified device (CPU/GPU).
4. **Generating the Summary**
- Calls model.generate() to produce the summarized text.
- Uses:
	- num_beams=5 for better quality results.
	- max_length=256 to limit the summary size.
	- early_stopping=True to prevent unnecessary generation.
- Decodes and appends the generated summary.
5. **Handling Errors and Returning Output**
- Catches exceptions per chunk so that one failing chunk does not abort the whole run.
- Joins the chunk summaries with spaces into a single final summary.
- Returns the combined summary, or "No summary generated." if no chunk produced one.
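
The control flow in steps 2 and 5 (skip empty chunks, isolate per-chunk failures, join the results) can be shown with a stand-in summarizer. This is a sketch, not the notebook's implementation: `summarize_chunks` and `first_words` are hypothetical names, and `first_words` merely truncates text instead of calling a model.

```python
from typing import Callable, Iterable


def summarize_chunks(chunks: Iterable[str],
                     summarize_fn: Callable[[str], str],
                     fallback: str = "No summary generated.") -> str:
    """Apply summarize_fn to each non-empty chunk and join the results."""
    summaries = []
    for idx, chunk in enumerate(chunks):
        if not chunk.strip():
            continue  # nothing to summarize
        try:
            summaries.append(summarize_fn(chunk.strip()))
        except Exception as exc:  # keep going if a single chunk fails
            print(f"Error processing chunk {idx}: {exc}")
    return " ".join(summaries) if summaries else fallback


def first_words(text: str, n: int = 3) -> str:
    """Toy 'summarizer': keep the first n words (assumption for the demo)."""
    return " ".join(text.split()[:n])


print(summarize_chunks(["alpha beta gamma delta", "   ", "one two"], first_words))
# prints "alpha beta gamma one two"
print(summarize_chunks([], first_words))
# prints "No summary generated."
```

Passing the summarizer in as a callable makes the loop easy to test without loading model weights.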

In [6]:
from textstat import flesch_reading_ease
import evaluate

# Load Pegasus model & tokenizer 
model_name = "google/pegasus-large"
tokenizer = PegasusTokenizer.from_pretrained(model_name)
model = PegasusForConditionalGeneration.from_pretrained(model_name).to(device)

ref = " Seeking a position to apply my comprehensive drink knowledge and service skills to enhance the bar program and guest experience in a professional establishment.Profile Work Experience Bartender"
raw_text = extract_text("Bartender.pdf")

# Generate summary
summary = summarize_text_with_pegasus(
    text=raw_text,
    tokenizer=tokenizer,
    model=model,
    device=device,
    max_input_length=1024,
    max_output_length=256
)

bertscore = evaluate.load("bertscore")
bertscore_results = bertscore.compute(
    predictions=[summary],  # Must be a list
    references=[ref],       # Must be a list
    lang="en"
)

# Compute compression ratio
original_length = len(raw_text.split())
summary_length = len(summary.split())
compression_ratio = original_length / summary_length if summary_length > 0 else 0

# Compute readability score
readability = flesch_reading_ease(summary)

# Print results
print("Evaluation Metrics:")
print(f"BERT Scores:-  Precision: {bertscore_results['precision'][0]:.4f} Recall: {bertscore_results['recall'][0]:.4f} F1_score: {bertscore_results['f1'][0]:.4f}")
print(f"Compression Ratio: {compression_ratio:.2f}")
print(f"Readability Score: {readability:.2f}")

print("Generated Summary:", summary)

2025-03-06 11:06:20,864 - datasets - INFO - PyTorch version 2.6.0 available.
Some weights of PegasusForConditionalGeneration were not initialized from the model checkpoint at google/pegasus-large and are newly initialized: ['model.decoder.embed_positions.weight', 'model.encoder.embed_positions.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.
Some weights of RobertaModel were not initialized from the model checkpoint at roberta-large and are newly initialized: ['roberta.pooler.dense.bias', 'roberta.pooler.dense.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


Evaluation Metrics:
BERT Scores:-  Precision: 0.8837 Recall: 0.9849 F1_score: 0.9316
Compression Ratio: 5.10
Readability Score: 22.75
Generated Summary: Seeking a position to apply my comprehensive drink knowledge and service skills to enhance the bar program and guest experience in a professional establishment.Profile Work Experience Bartender | Forge (Bank) | 26/07/2023 – present Prepared and presented a wide array of cocktails (30+), mocktails, and speciality drinks with precision.


1. **Loads Pegasus Model**
- Imports the required libraries and loads the "google/pegasus-large" tokenizer and model onto the selected device.
2. **Extracts Text**
- Reads text from "Bartender.pdf" using extract_text().
3. **Generates Summary**
- Uses summarize_text_with_pegasus() to summarize the extracted text.
4. **Evaluates Summary**
- Computes BERTScore (precision, recall, and F1) by comparing the summary with a reference text.
5. **Computes Metrics**
- Calculates the compression ratio (word count of the original divided by word count of the summary) and the Flesch Reading Ease score of the summary, then prints all results.
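
For reference, the two simple metrics in step 5 can be written out directly. The sketch below assumes the standard Flesch Reading Ease formula, 206.835 - 1.015 * (words / sentences) - 84.6 * (syllables / words); `compression_ratio` and `flesch_from_counts` are hypothetical helper names, and the counts are supplied by hand because syllable counting is the part that textstat handles internally.

```python
def compression_ratio(original: str, summary: str) -> float:
    """Words in the original divided by words in the summary (0 if empty)."""
    summary_words = len(summary.split())
    return len(original.split()) / summary_words if summary_words else 0.0


def flesch_from_counts(words: int, sentences: int, syllables: int) -> float:
    """Flesch Reading Ease from raw counts; higher means easier to read."""
    return 206.835 - 1.015 * (words / sentences) - 84.6 * (syllables / words)


original = "the quick brown fox jumps over the lazy dog again and again"
summary = "fox jumps over dog"
print(compression_ratio(original, summary))  # prints 3.0
```

A compression ratio of 5.10, as in the output above, means the summary has roughly one fifth as many words as the source. On the Flesch scale, scores below about 30 are conventionally read as very difficult text, which is where the reported 22.75 falls.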