# Web-Scale PDF Processing Pipeline - Educational Example

This notebook is a simplified, educational version of the [Web Scale PDF Processing Pipeline](https://github.com/aisingapore/web_scale_pdf_processing_pipeline), which is used to extract educational web resources for pretraining large language models.

We'll break down the workflow into the following steps:
1. Setup and Environment
2. PDF Collection & Filtering
3. Quality Filtering
4. OCR Text Extraction using Marker
5. Text Post-processing

This simplified version runs on a single GPU, without Spark or any other distributed computing, which keeps it small enough for educational use.

## 1. Setup and Environment

First, let's install the required packages. The main package we'll need is [Marker](https://github.com/VikParuchuri/marker), a GPU-accelerated OCR tool for extracting text from PDFs.

In [None]:
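# Install required packages
# Note: the imports used below (marker.convert, marker.models.load_all_models,
# marker.output.save_markdown) appear to match the pre-1.0 Marker API,
# so marker-pdf is pinned below 1.0 here. Adjust the pin if your setup differs.
!pip install "marker-pdf<1.0" pypdf pandas opencv-python openai transformers torch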

In [None]:
# Import necessary libraries
import os
import re
import glob
import json
import pandas as pd
import numpy as np
from pypdf import PdfReader
from marker.convert import convert_single_pdf
from marker.models import load_all_models
from marker.output import save_markdown
import tempfile
import shutil
import uuid
import time

# Set up directories
input_dir = "./pdf_samples"  # Directory containing your PDF files
output_dir = "./output"      # Directory for outputs

# Create output directory if it doesn't exist
os.makedirs(output_dir, exist_ok=True)

## 2. PDF Collection & Filtering

In this step, we'll find all PDF files in a directory and filter them based on basic criteria like page count.
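
One caveat when collecting files with a `*.pdf` glob pattern: on most Linux filesystems the match is case-sensitive, and the pattern does not descend into subfolders. The following is a minimal sketch of a more forgiving collector, assuming a recursive search is wanted. The helper name `find_pdfs` is hypothetical and not part of the original pipeline.

```python
from pathlib import Path

def find_pdfs(directory):
    """Return sorted paths of files whose extension is .pdf in any letter case."""
    root = Path(directory)
    # rglob("*") walks the whole directory tree; the suffix check ignores case
    return sorted(
        str(p) for p in root.rglob("*")
        if p.is_file() and p.suffix.lower() == ".pdf"
    )
```

Calling `find_pdfs("./pdf_samples")` would return plain string paths, so the result can be passed to the page-count filter in the same way as the output of a glob-based listing.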

In [None]:
def list_pdf_files(directory):
    """List all PDF files in the given directory."""
    pdf_files = glob.glob(os.path.join(directory, "*.pdf"))
    return pdf_files

def get_pdf_page_count(pdf_path):
    """Get the number of pages in a PDF file."""
    try:
        with open(pdf_path, "rb") as file:
            pdf_reader = PdfReader(file)
            return len(pdf_reader.pages)
    except Exception as e:
        print(f"Error in get_pdf_page_count for {pdf_path}: {str(e)}")
        return 0

def filter_pdfs_by_page_count(pdf_files, min_pages=2, max_pages=500):
    """Filter PDFs by page count."""
    filtered_pdfs = []
    for pdf_path in pdf_files:
        page_count = get_pdf_page_count(pdf_path)
        if min_pages <= page_count <= max_pages:
            filtered_pdfs.append((pdf_path, page_count))
    return filtered_pdfs

In [None]:
# Get all PDF files
pdf_files = list_pdf_files(input_dir)
print(f"Found {len(pdf_files)} PDF files")

# Filter PDFs by page count
filtered_pdfs = filter_pdfs_by_page_count(pdf_files)
print(f"After filtering by page count: {len(filtered_pdfs)} PDFs")

# Create a DataFrame with the filtered PDFs
pdf_df = pd.DataFrame(filtered_pdfs, columns=["pdf_path", "page_count"])
pdf_df.head()

## 3. Quality Filtering

This step determines if a PDF contains relevant content for our needs. We'll implement two approaches:

1. **Rule-based filtering**: A simple approach using basic text metrics
2. **LLM-based filtering**: More sophisticated approach using language models
   - API-based models (OpenAI's GPT-4o-mini)
   - Open source models (Llama-3 8B)
   
You can choose which filtering method to use based on your needs and available resources.
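
To make the trade-off of rule-based filtering concrete: a plain substring test treats "unlearning" as a hit for the keyword "learning", which can inflate matches. The sketch below shows a whole-word variant for comparison. The helper `keyword_hits` is a hypothetical illustration, not a function from the original pipeline.

```python
import re

def keyword_hits(text, keywords):
    """Return the keywords that occur in text as whole words (case-insensitive)."""
    hits = []
    for kw in keywords:
        # \b anchors the match at word boundaries; re.escape guards special characters
        if re.search(rf"\b{re.escape(kw)}\b", text, flags=re.IGNORECASE):
            hits.append(kw)
    return hits

sample = "Machine unlearning methods: a case study"
print("substring match for 'learning':", "learning" in sample.lower())
print("whole-word hits:", keyword_hits(sample, ["learning", "study", "research"]))
```

Whether whole-word matching is preferable depends on the keyword list: it avoids accidental hits inside longer words, but it also misses inflected forms such as "studies" unless they are listed explicitly.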

In [None]:
def extract_basic_text(pdf_path):
    """Extract text from the first few pages with pypdf (lower quality than Marker, but faster for initial filtering)."""
    try:
        with open(pdf_path, "rb") as file:
            pdf_reader = PdfReader(file)
            # Only extract from first few pages for quick filtering
            max_pages = min(5, len(pdf_reader.pages))
            text = ""
            for i in range(max_pages):
                # Guard against an empty result, e.g. for image-only pages
                text += (pdf_reader.pages[i].extract_text() or "") + "\n"
        return text
    except Exception as e:
        print(f"Error in extract_basic_text for {pdf_path}: {str(e)}")
        return ""

# 1. RULE-BASED FILTERING

def is_relevant_rule_based(text, keywords=None, min_text_length=100):
    """Simple relevance check based on text length and optional keywords."""
    if len(text) < min_text_length:
        return False
        
    if keywords:
        return any(keyword.lower() in text.lower() for keyword in keywords)
    
    return True

# 2. LLM-BASED FILTERING

# 2.1 OpenAI API Model (GPT-4o-mini)
def is_relevant_openai(text, api_key=None, model="gpt-4o-mini", domain="education"):
    """Use OpenAI's API to determine if a PDF is relevant for a specific domain."""
    try:
        import openai
        
        # Resolve the API key: explicit argument first, then the environment
        api_key = api_key or os.environ.get("OPENAI_API_KEY")
        if not api_key:
            print("Warning: No OpenAI API key provided. Skipping OpenAI filtering.")
            return True  # Default to True if no API key
        
        # Pass the key to the client directly; an explicitly constructed
        # OpenAI() client does not read the module-level openai.api_key
        client = openai.OpenAI(api_key=api_key)
        
        # Truncate the text to avoid excessive token usage
        truncated_text = text[:15000]  # Using first 15k chars, adjust as needed
        
        # Create the prompt based on the domain
        prompt = f"""You are an expert content evaluator. Your task is to determine if the following document is relevant for {domain} content.

Here is a sample of the document:

<document_sample>
{truncated_text}
</document_sample>

Is this document relevant for {domain} purposes? Answer only with 'true' or 'false'."""
        
        # Call the OpenAI API
        response = client.chat.completions.create(
            model=model,
            messages=[
                {"role": "system", "content": "You help determine if documents are relevant for specific domains."},
                {"role": "user", "content": prompt}
            ],
            temperature=0
        )
        
        # Extract the answer
        answer = response.choices[0].message.content.strip().lower()
        is_relevant = "true" in answer
        
        return is_relevant
    
    except Exception as e:
        print(f"Error in is_relevant_openai: {str(e)}")
        return True  # Default to True in case of error

# 2.2 Open Source Model (Llama-3)
_LLAMA_CACHE = {}  # Keeps the loaded tokenizer/model in memory between calls

def _load_llama(model_name):
    """Load the tokenizer and model once and reuse them across calls."""
    if model_name not in _LLAMA_CACHE:
        from transformers import AutoTokenizer, AutoModelForCausalLM
        import torch
        
        tokenizer = AutoTokenizer.from_pretrained(model_name)
        model = AutoModelForCausalLM.from_pretrained(model_name, torch_dtype=torch.float16)
        
        # Move model to GPU if available
        device = "cuda" if torch.cuda.is_available() else "cpu"
        model.to(device)
        model.eval()
        _LLAMA_CACHE[model_name] = (tokenizer, model, device)
    return _LLAMA_CACHE[model_name]

def is_relevant_llama(text, domain="education", model_name="meta-llama/Meta-Llama-3-8B-Instruct"):
    """Use Llama-3 to determine if a PDF is relevant for a specific domain."""
    try:
        import torch
        
        # Truncate the text to avoid excessive token usage
        truncated_text = text[:5000]  # Smaller context for local models
        
        # The default model is gated on Hugging Face (license acceptance required);
        # any other instruct model that ships a chat template can be passed instead
        tokenizer, model, device = _load_llama(model_name)
        
        # Build the prompt with the model's own chat template
        messages = [
            {"role": "system", "content": "You are an expert content evaluator. You determine if documents are relevant for specific domains."},
            {"role": "user", "content": f"Is the following document relevant for {domain} content? Answer only with 'true' or 'false'.\n\n{truncated_text}"},
        ]
        inputs = tokenizer.apply_chat_template(
            messages, add_generation_prompt=True, return_tensors="pt", return_dict=True
        ).to(device)
        
        # Greedy decoding (do_sample=False), so no temperature is needed
        with torch.no_grad():
            output = model.generate(**inputs, max_new_tokens=10, do_sample=False)
        
        # Decode only the newly generated tokens, not the prompt
        new_tokens = output[0][inputs["input_ids"].shape[-1]:]
        assistant_response = tokenizer.decode(new_tokens, skip_special_tokens=True).strip().lower()
        
        is_relevant = "true" in assistant_response
        return is_relevant
    
    except Exception as e:
        print(f"Error in is_relevant_llama: {str(e)}")
        return True  # Default to True in case of error

In [None]:
# Extract text for filtering
pdf_df["ocr_text"] = pdf_df["pdf_path"].apply(extract_basic_text)

# Choose your filtering method
filtering_method = "rule-based"  # Options: "rule-based", "openai", "llama"

# Parameters for filtering
domain = "education"  # Target domain for content
keywords = ["education", "research", "study", "learning"]  # For rule-based filtering

# Apply the selected filtering method
if filtering_method == "rule-based":
    print("Using rule-based filtering...")
    pdf_df["is_relevant"] = pdf_df["ocr_text"].apply(lambda text: is_relevant_rule_based(text, keywords))

elif filtering_method == "openai":
    # You would need to set your API key: os.environ["OPENAI_API_KEY"] = "your-api-key"
    print("Using OpenAI API filtering...")
    # Check a small sample first (comment out for full dataset)
    sample_size = min(3, len(pdf_df))
    pdf_df = pdf_df.head(sample_size).copy()  # .copy() avoids pandas' SettingWithCopyWarning
    pdf_df["is_relevant"] = pdf_df["ocr_text"].apply(lambda text: is_relevant_openai(text, domain=domain))

elif filtering_method == "llama":
    print("Using Llama-3 filtering...")
    # Comment out the next line for the full dataset
    pdf_df = pdf_df.head(min(3, len(pdf_df))).copy()  # Small sample for testing
    pdf_df["is_relevant"] = pdf_df["ocr_text"].apply(lambda text: is_relevant_llama(text, domain=domain))

else:
    # Fail early instead of hitting a missing "is_relevant" column below
    raise ValueError(f"Unknown filtering_method: {filtering_method!r}")

# Filter relevant PDFs
relevant_pdfs = pdf_df[pdf_df["is_relevant"]]
print(f"After relevance check: {len(relevant_pdfs)} PDFs out of {len(pdf_df)} total")
relevant_pdfs.head()

## 4. OCR Text Extraction using Marker

Now we'll use Marker, a GPU-accelerated OCR tool, to extract high-quality text from the PDFs. This is the core of the pipeline and the part that benefits most from GPU acceleration.
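
Since this step depends heavily on the GPU, it can help to confirm what PyTorch sees before loading any models. This is a small sketch using standard PyTorch calls; the helper name `describe_device` is hypothetical, and it falls back to reporting CPU if PyTorch is not importable.

```python
def describe_device():
    """Report which device PyTorch can use, falling back to CPU."""
    try:
        import torch
    except ImportError:
        return "cpu (PyTorch not installed)"
    if torch.cuda.is_available():
        # Index 0 is the first visible CUDA device
        return f"cuda ({torch.cuda.get_device_name(0)})"
    return "cpu"

print(f"PyTorch device available: {describe_device()}")
```

If this reports `cpu`, the extraction below should still run, but it is likely to be much slower.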

In [None]:
# Load all Marker models (only once)
print("Loading Marker models (this may take a while)...")
model_lst = load_all_models()
print("Models loaded!")

In [None]:
def process_pdf_with_marker(pdf_path, model_lst):
    """Process a PDF file using Marker's convert_single_pdf function and return the extracted text."""
    try:
        # Convert PDF to text using Marker
        full_text, images, out_meta = convert_single_pdf(pdf_path, model_lst)
        
        # Generate a unique filename
        unique_filename = f"{uuid.uuid4().hex}_{os.path.basename(pdf_path)}"
        
        # Save the markdown to a temporary directory
        with tempfile.TemporaryDirectory() as temp_dir:
            md_dir = save_markdown(temp_dir, unique_filename, full_text, images, out_meta)
            
            # Construct the path to the .md file inside the created directory
            md_filename = os.path.basename(md_dir) + ".md"
            md_file_path = os.path.join(md_dir, md_filename)
            
            # Read the content of the markdown file
            with open(md_file_path, "r", encoding="utf-8") as md_file:
                md_content = md_file.read()
        
        return md_content
    except Exception as e:
        print(f"Error processing PDF {pdf_path}: {str(e)}")
        return f"Error: Failed to process PDF {pdf_path}"

In [None]:
# Process a sample of PDFs (for quicker execution in this educational example)
# In practice, you might process all relevant PDFs
sample_size = min(5, len(relevant_pdfs))
sample_pdfs = relevant_pdfs.head(sample_size)

# Process each PDF with Marker
start_time = time.time()
results = []

for idx, row in sample_pdfs.iterrows():
    print(f"Processing {row['pdf_path']}...")
    md_content = process_pdf_with_marker(row['pdf_path'], model_lst)
    results.append({
        "pdf_path": row['pdf_path'],
        "page_count": row['page_count'],
        "is_relevant": row['is_relevant'],
        "md_extraction_result": md_content
    })
    
end_time = time.time()
duration = end_time - start_time
print(f"Processed {len(results)} PDFs in {duration:.2f} seconds")

# Create a DataFrame with the results
results_df = pd.DataFrame(results)
results_df.head()

## 5. Text Post-processing

Finally, we'll clean up the extracted text to make it more useful for downstream tasks like language model training.
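
As a small worked example of regex-based cleanup, the sketch below strips heading markers of any level and bold delimiters while keeping the text they wrap, then collapses whitespace. It is a standalone illustration with a hypothetical helper name (`strip_markdown_markup`); it is intentionally narrower than the full cleaning step in this notebook.

```python
import re

def strip_markdown_markup(md):
    """Drop heading markers and bold delimiters while keeping their text."""
    # Headings of any level (# through ######) at the start of a line
    md = re.sub(r"^#{1,6}\s+", "", md, flags=re.MULTILINE)
    # Bold spans: keep the inner text, drop the ** delimiters
    md = re.sub(r"\*\*(.+?)\*\*", r"\1", md)
    # Collapse runs of whitespace (including newlines) into single spaces
    return re.sub(r"\s+", " ", md).strip()

sample = "# Title\n\n### Methods\nWe used **Marker** for OCR.\n"
print(strip_markdown_markup(sample))
# prints: Title Methods We used Marker for OCR.
```

The `re.MULTILINE` flag matters here: without it, `^` only matches at the very start of the string, so only a heading on the first line would be cleaned.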

In [None]:
def extract_meaningful_text(markdown_content):
    """Extract meaningful text from markdown content."""
    # Remove a leading front-matter block (--- ... ---) if present
    content = re.sub(r'^---.*?---', '', markdown_content, flags=re.DOTALL)
    
    # Remove heading markers (# through ######) on every line but keep the heading text
    content = re.sub(r'^#{1,6}[ \t]*(.*?)\n', r'\1\n', content, flags=re.MULTILINE)
    
    # Remove bold spans; note this drops the bolded text itself, not just the ** markers
    content = re.sub(r'\*\*.*?\*\*', '', content)
    
    # Remove inline math delimited by single dollar signs
    content = re.sub(r'\$.*?\$', '', content)
    
    # Remove anything in square brackets or parentheses (citations, references, links);
    # ordinary parenthetical remarks are removed as well
    content = re.sub(r'\[.*?\]', '', content)
    content = re.sub(r'\(.*?\)', '', content)
    
    # Remove Markdown tables
    content = re.sub(r'\|[^\n]*\|(\n\|[-:| ]+\|)?(\n\|[^\n]*\|)*', '', content)
    
    # Remove Keywords section (up to the next blank line)
    content = re.sub(r'Keywords:.*?(?=\n\n)', '', content, flags=re.DOTALL)
    
    # Collapse all whitespace, including newlines, into single spaces
    content = re.sub(r'\s+', ' ', content)
    content = content.strip()
    
    return content

In [None]:
# Apply text post-processing
results_df["extracted_meaningful_text"] = results_df["md_extraction_result"].apply(extract_meaningful_text)

# Save the results to a CSV file
output_file = os.path.join(output_dir, "processed_pdfs.csv")
results_df.to_csv(output_file, index=False)

print(f"Results saved to: {output_file}")

# Display a sample of the extracted text
for idx, row in results_df.head(1).iterrows():
    print(f"Sample extracted text from {os.path.basename(row['pdf_path'])}:")
    print("-" * 80)
    print(row["extracted_meaningful_text"][:500] + "...")
    print("-" * 80)

## Conclusion

This notebook has demonstrated a simplified version of the web-scale PDF processing pipeline, focusing on the core components:

1. **PDF Collection & Filtering**: Finding and filtering PDFs based on page count
2. **Quality Filtering**: Multiple approaches including rule-based and LLM-based filtering (both API and open source models)
3. **OCR Text Extraction**: Using Marker for high-quality text extraction with GPU acceleration
4. **Text Post-processing**: Cleaning the extracted text for downstream use

The full pipeline in the `web_scale_pdf_processing_pipeline` directory includes additional components for distributed processing using Apache Spark and SLURM, which enables scaling to thousands or millions of PDFs.

### Key Differences from Full Pipeline

- **Distribution**: This simplified version runs on a single machine with one GPU, while the full pipeline can distribute work across multiple nodes and GPUs
- **Parallelism**: Our example processes PDFs sequentially, while the full pipeline uses Spark for parallel processing
- **Scale**: This example is designed for educational purposes with a small number of PDFs, while the full pipeline can handle web-scale datasets
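
To illustrate the idea behind distributing work, the sketch below splits a list of PDF paths into per-worker batches in round-robin order. This is only a conceptual illustration with a hypothetical `shard` helper; it is not how the full pipeline partitions data, which relies on Spark.

```python
def shard(items, num_shards):
    """Split items into num_shards lists, assigning them round-robin."""
    if num_shards < 1:
        raise ValueError("num_shards must be at least 1")
    shards = [[] for _ in range(num_shards)]
    for i, item in enumerate(items):
        shards[i % num_shards].append(item)
    return shards

paths = [f"doc_{i}.pdf" for i in range(7)]
for worker_id, batch in enumerate(shard(paths, 3)):
    print(worker_id, batch)
```

Each batch could then be handed to a separate process or GPU, with every worker running the same sequential loop shown earlier in this notebook.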

### Tips for Quality Filtering

- **Rule-based filtering** is fast and requires no API keys or model downloads, but it is less accurate
- **OpenAI API filtering** provides high-quality results but requires an API key and has usage costs
- **Open source model filtering** with Llama-3 offers a balance between quality and cost, but requires GPU resources
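
One way to combine these trade-offs is a cascade: run the cheap rule-based check first and call the model only for documents that pass it. The sketch below is an assumption-laden illustration rather than part of the original pipeline; `cascade_filter` and `fake_llm_check` are hypothetical names, and the stand-in check simply looks for a word instead of calling a model.

```python
def cascade_filter(texts, cheap_check, expensive_check):
    """Run expensive_check only on texts that pass cheap_check."""
    decisions = []
    for text in texts:
        if not cheap_check(text):
            decisions.append(False)  # rejected early, no model call
        else:
            decisions.append(bool(expensive_check(text)))
    return decisions

calls = []  # records which texts reached the expensive check

def fake_llm_check(text):
    """Stand-in for an LLM call; a real check would query a model here."""
    calls.append(text)
    return "lesson" in text.lower()

texts = ["", "Lesson plan for grade 5 science", "Quarterly sales figures for retail"]
decisions = cascade_filter(texts, lambda t: len(t) >= 10, fake_llm_check)
print(decisions, f"expensive calls: {len(calls)}")
```

In this toy run the empty document is rejected by the length check alone, so only two of the three texts reach the stand-in model check.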

For production use, you would want to use the full pipeline with proper distribution and parallelism for efficiency at scale.