# Module 2 — Parsing Large PDFs Efficiently

## Objectives
- Extract text from large PDFs using **PyMuPDF** efficiently. 📄
- Implement **smart chunking** with overlap and hierarchical hints (TOC, headings).
- Support multiple PDFs in a directory.
- Store **rich metadata** (doc_id, page number, section title/path, char offsets).
- **Mini Task**: Upload a long PDF → Chunk → Save with metadata. ✅

## Dependencies
- **PyMuPDF (fitz)**: PDF parsing and layout info.
- **tqdm**: progress bars.
- **Optional** (not required): `pytesseract` + `PIL` for OCR fallback when scanned pages are detected. We add a stub that only runs if available.

In [14]:
# import sys
# !{sys.executable} -m ensurepip --upgrade
# !{sys.executable} -m pip install PyMuPDF
# !{sys.executable} -m pip install tqdm
# !{sys.executable} -m pip install -q pillow pytesseract
# !{sys.executable} -m pip install ipywidgets


In [15]:
# If running in a fresh environment, uncomment:
# %pip install -q pymupdf tqdm

# Optional OCR (only if you plan to use it)
# %pip install -q pillow pytesseract

import os
import sys
import json
import math
import time
import glob
import hashlib
from pathlib import Path
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Tuple

import fitz  # PyMuPDF
from tqdm.notebook import tqdm # Use notebook-friendly version of tqdm

try:
    from PIL import Image
    import pytesseract
    OCR_AVAILABLE = True
except Exception:
    OCR_AVAILABLE = False

print(f"Python: {sys.version.split()[0]}, PyMuPDF: {fitz.version[0]}")
print(f"OCR available: {OCR_AVAILABLE}")

Python: 3.12.9, PyMuPDF: 1.26.3
OCR available: True


## Configuration
- `INPUT_DIR`: directory of PDFs or a single PDF path.
- `OUTPUT_DIR`: where chunk JSONL files will be written.
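- `MAX_CHARS`, `OVERLAP`: maximum chunk size and the overlap between consecutive chunks, both in characters.
- `HEADER_FOOTER_MIN_FREQ`, `MIN_HEADER_FOOTER_LEN`: the minimum share of pages on which a line must repeat, and the minimum line length, before it is stripped as a header/footer.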

In [16]:
INPUT_DIR = Path("./data/pdfs")  # set to a folder or a single file path
OUTPUT_DIR = Path("./data/outputs")

# Create directories if they don't exist (skip INPUT_DIR when it points at a single PDF)
if INPUT_DIR.suffix.lower() != ".pdf":
    INPUT_DIR.mkdir(parents=True, exist_ok=True)
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

# Chunking parameters
MAX_CHARS = 1200
OVERLAP = 200

# Header/Footer detection parameters
HEADER_FOOTER_MIN_FREQ = 0.2  # if a line repeats on >=20% of pages, treat as header/footer
MIN_HEADER_FOOTER_LEN = 6

## Utilities
- Safe JSONL writer
- Token estimate (optional, for info)
- Hash helper for deduplication
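
The hash helper is intended for deduplication, which the rest of the notebook does not demonstrate. Below is a minimal sketch, assuming records shaped like this module's chunk dicts with a `text` key. `short_text_hash` mirrors the notebook's `text_hash`; `dedupe_by_text` and the whitespace normalization are illustrative assumptions, not part of the pipeline.

```python
import hashlib
from typing import Any, Dict, List

def short_text_hash(s: str) -> str:
    """16-hex-char SHA-1 prefix, the same scheme as text_hash in this notebook."""
    return hashlib.sha1(s.encode("utf-8")).hexdigest()[:16]

def dedupe_by_text(records: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Keep the first record for each distinct text (hypothetical helper)."""
    seen = set()
    unique = []
    for rec in records:
        # Assumption: collapse whitespace so trivially reflowed copies match.
        key = short_text_hash(" ".join(rec["text"].split()))
        if key not in seen:
            seen.add(key)
            unique.append(rec)
    return unique

demo = [
    {"id": "a", "text": "Page header\nBody text"},
    {"id": "b", "text": "Page header  Body text"},
    {"id": "c", "text": "Something else"},
]
print([r["id"] for r in dedupe_by_text(demo)])
```

Records `a` and `b` differ only in whitespace, so only `a` and `c` survive. A 16-character prefix keeps keys short; for very large corpora the full digest would lower the collision risk further.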

In [None]:
def save_jsonl(records: List[Dict[str, Any]], out_path: Path):
    out_path.parent.mkdir(parents=True, exist_ok=True)
    with out_path.open("w", encoding="utf-8") as f:
        for r in records:
            f.write(json.dumps(r, ensure_ascii=False) + "\n")

def load_jsonl(path: Path) -> List[Dict[str, Any]]:
    items = []
    with path.open("r", encoding="utf-8") as f:
        for line in f:
            if line.strip():
                items.append(json.loads(line))
    return items

def est_tokens_from_chars(n_chars: int) -> int:
    return max(1, n_chars // 4)  # crude approximation

def text_hash(s: str) -> str:
    return hashlib.sha1(s.encode("utf-8")).hexdigest()[:16]

## Building a Table of Contents (TOC) map
- PyMuPDF `get_toc(simple=True)` returns `[level, title, page_num(1-based)]`
- We convert it to a page-indexed map of the nearest section path for each page.
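
To make the conversion concrete, here is a small sketch that runs the same idea on a hand-written TOC list instead of a real document. The function name `toc_to_page_paths` and the toy TOC are assumptions for illustration; the entries follow the `[level, title, page]` shape described above.

```python
from typing import Dict, List, Tuple

def toc_to_page_paths(toc: List[Tuple[int, str, int]], page_count: int) -> Dict[int, List[str]]:
    """Map each 0-based page to the section path active on that page."""
    # Convert 1-based TOC pages to 0-based indices.
    entries = [(lvl, title.strip(), max(0, page - 1)) for lvl, title, page in toc]
    paths: Dict[int, List[str]] = {}
    current: List[str] = []
    i = 0
    for pno in range(page_count):
        # Consume every TOC entry that starts on or before this page.
        while i < len(entries) and entries[i][2] <= pno:
            lvl, title, _ = entries[i]
            # Trim the path back to the parent level, then append the new title.
            current = current[: max(0, lvl - 1)] + [title]
            i += 1
        paths[pno] = list(current)
    return paths

toy_toc = [(1, "Introduction", 1), (2, "Background", 2), (1, "Methods", 4)]
page_paths = toc_to_page_paths(toy_toc, page_count=4)
for pno, path in page_paths.items():
    print(pno + 1, " > ".join(path))
```

Pages 2 and 3 both resolve to `Introduction > Background`, and page 4 resets to `Methods`. When several entries start on the same page, the last one wins for that page, so a chunk from the top of such a page may be labeled with a section that begins further down.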

In [17]:
def build_toc_map(doc: fitz.Document) -> Dict[int, Dict[str, Any]]:
    toc = doc.get_toc(simple=True) or []
    # Normalize to 0-based pages
    norm = [(lvl, title.strip(), max(0, page - 1)) for (lvl, title, page) in toc]
    
    # Build a running section path per page
    page_to_section = {}
    current_path: List[str] = []
    current_page = 0
    i = 0
    
    while current_page < doc.page_count:
        # Advance TOC entries that start on or before current_page
        while i < len(norm) and norm[i][2] <= current_page:
            lvl, title, _ = norm[i]
            # Adjust path to the current level
            current_path = current_path[:max(0, lvl - 1)]
            current_path.append(title)
            i += 1
        
        page_to_section[current_page] = {
            "section_path": current_path.copy(),
            "section_title": current_path[-1] if current_path else None
        }
        current_page += 1
        
    return page_to_section

## Extracting page text
- We use `page.get_text("text")` for performance and reliability on digital PDFs.
- A page is flagged as scanned when it returns no text but contains at least one image.

In [None]:
def extract_page_text(page: fitz.Page) -> Tuple[str, bool]:
    txt = page.get_text("text") or ""
    scanned = False
    if not txt.strip():
        # If no text but images present, likely scanned
        try:
            scanned = len(page.get_images(full=True)) > 0
        except Exception:
            scanned = False
    return txt, scanned

## Optional OCR Fallback
- If a page is scanned and OCR is available, we can rasterize the page and run Tesseract.
- This is optional and can be slow; only used when necessary.

In [18]:
def ocr_page(page: fitz.Page, dpi: int = 200) -> str:
    if not OCR_AVAILABLE:
        return ""
    mat = fitz.Matrix(dpi / 72, dpi / 72)
    pix = page.get_pixmap(matrix=mat, alpha=False)
    img = Image.frombytes("RGB", (pix.width, pix.height), pix.samples)  # size is a (width, height) tuple
    return pytesseract.image_to_string(img)

## Header/Footer detection and removal
- Many long PDFs repeat headers/footers; we can remove them to improve chunk quality.
- **Heuristic**: collect first and last non-empty line of each page; if a line repeats on >= threshold of pages and has sufficient length, treat it as a header/footer and strip it from all pages.
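
The heuristic can be tried on toy pages before running it on a real document. This is a simplified sketch: unlike the notebook's functions, it merges headers and footers into one set, and the name `repeated_edge_lines` and the sample pages are assumptions.

```python
from collections import Counter
from typing import List, Set

def repeated_edge_lines(pages: List[str], min_freq: float, min_len: int) -> Set[str]:
    """Lines that open or close at least min_freq of the pages (toy version)."""
    counts: Counter = Counter()
    for text in pages:
        lines = [ln.strip() for ln in text.splitlines() if ln.strip()]
        if lines:
            # A set avoids double-counting pages that consist of a single line.
            counts.update({lines[0], lines[-1]})
    n_pages = max(1, len(pages))
    return {
        ln for ln, c in counts.items()
        if c / n_pages >= min_freq and len(ln) >= min_len
    }

toy_pages = [
    "Annual Report 2023\nRevenue grew.\n1",
    "Annual Report 2023\nCosts fell.\n2",
    "Annual Report 2023\nOutlook is stable.\n3",
    "Appendix A\nTables.\n4",
]
print(repeated_edge_lines(toy_pages, min_freq=0.5, min_len=6))
```

Only `Annual Report 2023` is returned. The page numbers differ on every page and are shorter than the length cutoff, so exact matching never catches them. With `min_freq=0.2` on this four-page input, `Appendix A` would also qualify because one page out of four is already 25%, which is why the threshold is more meaningful on long documents.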

In [19]:
from collections import Counter

def first_last_lines_per_page(pages_text: List[str]) -> Tuple[Counter, Counter]:
    firsts, lasts = Counter(), Counter()
    for t in pages_text:
        lines = [ln.strip() for ln in t.splitlines() if ln.strip()]
        if lines:
            firsts[lines[0]] += 1
            lasts[lines[-1]] += 1
    return firsts, lasts

def detect_repeated_headers_footers(pages_text: List[str], min_freq: float) -> Tuple[set, set]:
    n_pages = max(1, len(pages_text))
    firsts, lasts = first_last_lines_per_page(pages_text)
    
    headers = {ln for ln, c in firsts.items() 
               if c / n_pages >= min_freq and len(ln) >= MIN_HEADER_FOOTER_LEN}
    footers = {ln for ln, c in lasts.items() 
               if c / n_pages >= min_freq and len(ln) >= MIN_HEADER_FOOTER_LEN}
    
    return headers, footers

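def strip_headers_footers(text: str, headers: set, footers: set) -> str:
    lines = text.splitlines()
    # Detection uses the first/last non-empty line, so skip blank edge lines here too
    first = next((i for i, ln in enumerate(lines) if ln.strip()), None)
    # Strip only if exact match (conservative)
    if first is not None and lines[first].strip() in headers:
        lines = lines[first + 1:]
    last = next((i for i in range(len(lines) - 1, -1, -1) if lines[i].strip()), None)
    if last is not None and lines[last].strip() in footers:
        lines = lines[:last]
    return "\n".join(lines)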

## Hierarchical, overlap-aware chunking
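- Split by **paragraph** first and pack paragraphs into chunks of up to `max_chars`; any chunk that is still too long is hard-wrapped by characters with overlap. `SENTENCE_SPLIT` is defined for a sentence-level pass, but the code below does not use it yet.
- Record `char_start` and `char_end` offsets for traceability. They refer to the page text after header/footer stripping, and they are approximate for chunks that carry overlap or span several paragraphs, because paragraphs are re-joined with `\n\n` rather than their original whitespace.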
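
The character-level fallback with overlap is easiest to follow in isolation. The sketch below is a standalone illustration rather than the notebook's implementation; `sliding_windows` is a hypothetical name, and the tiny `max_chars` and `overlap` values are chosen only so the output is readable.

```python
from typing import List, Tuple

def sliding_windows(text: str, max_chars: int, overlap: int) -> List[Tuple[int, int, str]]:
    """Return (char_start, char_end, piece) windows of at most max_chars."""
    if not 0 <= overlap < max_chars:
        raise ValueError("overlap must satisfy 0 <= overlap < max_chars")
    step = max_chars - overlap  # how far each window advances
    windows = []
    start = 0
    while start < len(text):
        end = min(start + max_chars, len(text))
        windows.append((start, end, text[start:end]))
        if end == len(text):
            break  # the tail is covered; stop before emitting an overlap-only window
        start += step
    return windows

text = "abcdefghijklmnopqrstuvwxyz"
windows = sliding_windows(text, max_chars=10, overlap=3)
for start, end, piece in windows:
    print(start, end, piece)
```

Each window starts `max_chars - overlap` characters after the previous one, so consecutive windows share exactly `overlap` characters, and `text[char_start:char_end]` always reproduces the piece. With the configured defaults (1200 and 200) the stride would be 1000 characters.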

In [20]:
import re

SENTENCE_SPLIT = re.compile(r"(?<=[.!?])\s+")
PARA_SPLIT = re.compile(r"\n\s*\n+")

def split_to_paragraphs(text: str) -> List[Tuple[int, int, str]]:
    spans = []
    start = 0
    for m in PARA_SPLIT.finditer(text):
        end = m.start()
        chunk = text[start:end].strip()
        if chunk:
            spans.append((start, end, chunk))
        start = m.end()
    # Remainder
    if start < len(text):
        chunk = text[start:].strip()
        if chunk:
            spans.append((start, len(text), chunk))
    return spans

def chunk_hierarchical(text: str, max_chars: int = 1200, overlap: int = 200) -> List[Dict[str, Any]]:
    if not text.strip():
        return []

    chunks = []
    paras = split_to_paragraphs(text)
    buf = ""
    buf_start, buf_end = 0, 0

    def flush_buffer():
        nonlocal buf, buf_start, buf_end
        if buf:
            chunks.append({"text": buf, "char_start": buf_start, "char_end": buf_end})
            # Create overlap for the next buffer
            if overlap > 0:
                overlap_text = buf[-overlap:]
                buf = overlap_text
                buf_start = buf_end - len(overlap_text)
            else:
                buf = ""
                buf_start = buf_end
    
    for p_start, p_end, p_text in paras:
        if not buf:
            buf = p_text
            buf_start, buf_end = p_start, p_end
        elif len(buf) + len(p_text) + 2 <= max_chars:
            buf += "\n\n" + p_text
            buf_end = p_end
        else:
            flush_buffer()
            # If the new paragraph is still too large with the overlap, just use the new para
            if len(buf) + len(p_text) + 2 > max_chars:
                buf = p_text
                buf_start, buf_end = p_start, p_end
            else:
                buf += "\n\n" + p_text
                buf_end = p_end

    if buf:  # Flush any remaining buffer
        chunks.append({"text": buf, "char_start": buf_start, "char_end": buf_end})
        
    # Final safeguard for chunks that are still too long (e.g., one very long paragraph)
    final_chunks = []
    for ch in chunks:
        if len(ch["text"]) <= max_chars:
            final_chunks.append(ch)
            continue
        
        # Hard split the oversized chunk
        text_part = ch["text"]
        start_offset = ch["char_start"]
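        # Guard against overlap >= max_chars, which would stall the loop
        step = max(1, max_chars - overlap)
        i = 0
        while i < len(text_part):
            end = i + max_chars
            chunk_text = text_part[i:end]
            final_chunks.append({
                "text": chunk_text,
                "char_start": start_offset + i,
                "char_end": start_offset + i + len(chunk_text)
            })
            if end >= len(text_part):
                break  # tail already covered; avoid an overlap-only chunk
            i += step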
            
    return final_chunks

## End-to-end PDF ingestion and chunking for one file
- Extract per-page text and scanned flag.
- Detect repeated headers/footers and strip.
- Use TOC to assign `section_path`/`section_title`; fallback to `None` if no TOC.
- Produce chunk records with metadata: `doc_id`, `source_path`, `page`, `section_title`/`section_path`, `char_start`/`char_end`, `scanned`. The running chunk index is encoded in the record `id`.

In [21]:
@dataclass
class PDFIngestStats:
    pages_total: int = 0
    pages_scanned: int = 0
    chunks_total: int = 0
    avg_chars_per_chunk: float = 0.0
    duration_s: float = 0.0

def ingest_pdf(path: Path,
             doc_id: Optional[str] = None,
             max_chars: int = MAX_CHARS,
             overlap: int = OVERLAP,
             ocr_scanned: bool = False) -> Tuple[List[Dict[str, Any]], PDFIngestStats]:
    t0 = time.time()
    path = Path(path)
    doc_id = doc_id or path.stem
    chunks_out: List[Dict[str, Any]] = []
    
    with fitz.open(str(path)) as doc:
        toc_map = build_toc_map(doc)
        pages_text = []
        scanned_flags = []
        for pno in tqdm(range(doc.page_count), desc=f"Extracting {path.name}", leave=False):
            page = doc.load_page(pno)
            txt, scanned = extract_page_text(page)
            if scanned and ocr_scanned and OCR_AVAILABLE:
                ocr_txt = ocr_page(page)
                if ocr_txt.strip():
                    txt = ocr_txt
                    scanned = False  # recovered by OCR
            pages_text.append(txt)
            scanned_flags.append(scanned)

        headers, footers = detect_repeated_headers_footers(pages_text, HEADER_FOOTER_MIN_FREQ)

        chunk_idx = 0
        for pno, (txt, scanned) in enumerate(zip(pages_text, scanned_flags)):
            clean_txt = strip_headers_footers(txt, headers, footers)
            page_chunks = chunk_hierarchical(clean_txt, max_chars=max_chars, overlap=overlap)
            section_meta = toc_map.get(pno, {"section_title": None, "section_path": []})

            for ch in page_chunks:
                record = {
                    "id": f"{doc_id}_p{pno+1}_{chunk_idx}",
                    "text": ch["text"],
                    "metadata": {
                        "doc_id": doc_id,
                        "source_path": str(path),
                        "page": pno + 1,  # human-friendly
                        "char_start": ch["char_start"],
                        "char_end": ch["char_end"],
                        "section_title": section_meta.get("section_title"),
                        "section_path": section_meta.get("section_path") or [],
                        "scanned": scanned,
                    }
                }
                chunks_out.append(record)
                chunk_idx += 1

    dur = time.time() - t0
    n_chars = sum(len(c["text"]) for c in chunks_out)  # 0 when no chunks, so the average reports 0.0
    stats = PDFIngestStats(
        pages_total=len(pages_text),
        pages_scanned=sum(1 for s in scanned_flags if s),
        chunks_total=len(chunks_out),
        avg_chars_per_chunk=n_chars / max(1, len(chunks_out)),
        duration_s=round(dur, 2),
    )
    return chunks_out, stats

## Multi-PDF ingestion
- Process all PDFs in a directory (non-recursive), or a single PDF file.
- Optionally writes one JSONL per document (`per_doc_output=True`); the combined JSONL is written by the caller.
- Returns the combined chunk list and a stats summary.

In [23]:
def ingest_directory(input_path: Path,
                     pattern: str = "*.pdf",
                     per_doc_output: bool = False,
                     **ingest_kwargs) -> Tuple[List[Dict[str, Any]], Dict[str, Any]]:
    input_path = Path(input_path)
    files = []
    if input_path.is_file() and input_path.suffix.lower() == ".pdf":
        files = [input_path]
    elif input_path.is_dir():
        files = sorted(input_path.glob(pattern))  # sorted for a deterministic processing order
    
    all_chunks: List[Dict[str, Any]] = []
    summary = {"docs": []}

    for pdf_path in tqdm(files, desc="Processing documents"):
        chunks, stats = ingest_pdf(pdf_path, **ingest_kwargs)
        all_chunks.extend(chunks)
        summary["docs"].append({
            "doc_id": pdf_path.stem,
            "source_path": str(pdf_path),
            "stats": stats.__dict__
        })
        if per_doc_output:
            out_path = OUTPUT_DIR / f"{pdf_path.stem}_chunks.jsonl"
            save_jsonl(chunks, out_path)

    summary["total_chunks"] = len(all_chunks)
    summary["total_docs"] = len(files)
    return all_chunks, summary

## Mini Task
1.  Place a long PDF (100+ pages recommended) in the `data/pdfs` directory or set a direct path.
2.  Run ingestion → chunk → save chunks with metadata to JSONL.
3.  Inspect a few sample chunks.

> **Note**: If you don't have a PDF, you can find many public domain examples online, such as government reports or classic literature from Project Gutenberg.

In [24]:
# To run this cell, make sure INPUT_DIR is a PDF file or a folder with at least one PDF
has_pdfs = INPUT_DIR.is_file() or any(INPUT_DIR.glob("*.pdf"))
if not has_pdfs:
    print(f"⚠️ No PDFs found at '{INPUT_DIR}'. Please add at least one PDF file to run the ingestion task.")
else:
    # Ingest the entire directory
    chunks, summary = ingest_directory(INPUT_DIR, per_doc_output=True, ocr_scanned=False)
    
    # Save the combined results
    out_path = OUTPUT_DIR / "all_chunks.jsonl"
    save_jsonl(chunks, out_path)
    
    print(f"\nWrote {summary['total_chunks']} chunks from {summary['total_docs']} docs to {out_path}")
    print(json.dumps(summary, indent=2))


Wrote 113 chunks from 1 docs to data\outputs\all_chunks.jsonl
{
  "docs": [
    {
      "doc_id": "Efficient_management_and_compliance_check_of_HVAC_",
      "source_path": "data\\pdfs\\Efficient_management_and_compliance_check_of_HVAC_.pdf",
      "stats": {
        "pages_total": 31,
        "pages_scanned": 0,
        "chunks_total": 113,
        "avg_chars_per_chunk": 1025.787610619469,
        "duration_s": 0.23
      }
    }
  ],
  "total_chunks": 113,
  "total_docs": 1
}


## Inspect sample chunks
- View first few chunks with metadata.
- Search for a keyword and view matching chunks (quick sanity check).

In [25]:
def find_chunks(keyword: str, chunk_list: list, limit: int = 5):
    hits = []
    for r in chunk_list:
        if keyword.lower() in r["text"].lower():
            hits.append(r)
        if len(hits) >= limit:
            break
    return hits

if 'chunks' in locals() and chunks:
    sample = chunks[:5]
    print("--- First 5 Chunks ---")
    for r in sample:
        print(f"id={r['id']}, page={r['metadata']['page']}, section='{r['metadata']['section_title']}'")
        print(r["text"][:200].replace("\n", " ") + "...")
        print("-" * 80)
    
    keyword = "agreement" # Change this keyword to search for something relevant in your doc
    hits = find_chunks(keyword, chunks, limit=3)
    print(f"\n--- Found {len(hits)} hits for '{keyword}' ---")
    for r in hits:
        print(f"id={r['id']} | page={r['metadata']['page']} | section='{r['metadata']['section_title']}'")
        print("..." + r["text"][:250].replace("\n", " ") + "...")
        print("-" * 80)
else:
    print("No chunks found. Please run the ingestion cell first.")

--- First 5 Chunks ---
id=Efficient_management_and_compliance_check_of_HVAC__p1_0, page=1, section='A document-centric AEC industry'
Semantic Web -1 (2024) 1–31 1 DOI 10.3233/SW-243595 IOS Press CORRECTED PROOF Efﬁcient management and compliance check of HVAC information in the building design phase using Semantic Web technologies ...
--------------------------------------------------------------------------------
id=Efficient_management_and_compliance_check_of_HVAC__p1_1, page=1, section='A document-centric AEC industry'
 building services and HVAC components. The Flow Systems Ontology was recently proposed to address this need, but it does not include HVAC components’ size and capacity-related properties. Also, despi...
--------------------------------------------------------------------------------
id=Efficient_management_and_compliance_check_of_HVAC__p1_2, page=1, section='A document-centric AEC industry'
d Air Conditioning (HVAC), SHACL, Semantic Web technologies, Linked Data, com

## Efficiency notes for 100+ page PDFs
- Use `page.get_text("text")` for speed; only switch to OCR for scanned pages and only if needed.
- Avoid storing heavy page objects; collect text then release page references.
- Strip headers/footers to reduce repeated noise.
- Chunk with overlap to preserve context across boundaries.
- For very large corpora, write per-document JSONL during processing to limit memory.
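
The last point can be sketched with a writer that consumes an iterator, so chunks never accumulate in a list. This is an assumption-laden illustration: `write_jsonl_stream` and `fake_chunks` are hypothetical names, and a real generator would yield chunks page by page from a PDF.

```python
import json
import tempfile
from pathlib import Path
from typing import Any, Dict, Iterable, Iterator

def write_jsonl_stream(records: Iterable[Dict[str, Any]], out_path: Path) -> int:
    """Write records one line at a time and return how many were written."""
    out_path.parent.mkdir(parents=True, exist_ok=True)
    count = 0
    with out_path.open("w", encoding="utf-8") as f:
        for rec in records:
            f.write(json.dumps(rec, ensure_ascii=False) + "\n")
            count += 1
    return count

def fake_chunks(doc_id: str, n: int) -> Iterator[Dict[str, Any]]:
    """Stand-in generator (hypothetical) that yields n small chunk records."""
    for i in range(n):
        yield {"id": f"{doc_id}_p1_{i}", "text": f"chunk {i}", "metadata": {"doc_id": doc_id}}

with tempfile.TemporaryDirectory() as tmp:
    out = Path(tmp) / "demo_chunks.jsonl"
    written = write_jsonl_stream(fake_chunks("demo", 3), out)
    line_count = len(out.read_text(encoding="utf-8").splitlines())
print(written, line_count)
```

Because only one record is in memory at a time, peak memory depends on the largest chunk rather than on the size of the corpus. The trade-off is that header/footer detection, which needs all pages of a document, still has to run per document before chunks are yielded.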

## Output schema (for Module 3)
Each JSONL line is a chunk with the following structure:
```json
{
  "id": "string",
  "text": "string",
  "metadata": {
    "doc_id": "string",
    "source_path": "string",
    "page": "int",
    "char_start": "int",
    "char_end": "int",
    "section_title": "string | null",
    "section_path": "list[string]",
    "scanned": "bool"
  }
}
```
Module 3 will read these JSONL files, generate embeddings, index with FAISS, and enable semantic search.
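
Before handing files to Module 3, a quick structural check can catch malformed records. The following is a minimal sketch based only on the schema above; `METADATA_TYPES` and `schema_problems` are hypothetical names, and the check covers types only, not value ranges.

```python
from typing import Any, Dict, List

# Expected metadata fields and types, taken from the schema above.
METADATA_TYPES = {
    "doc_id": str,
    "source_path": str,
    "page": int,
    "char_start": int,
    "char_end": int,
    "section_title": (str, type(None)),
    "section_path": list,
    "scanned": bool,
}

def schema_problems(rec: Dict[str, Any]) -> List[str]:
    """Return a list of human-readable problems; an empty list means the record passes."""
    problems = []
    for key in ("id", "text"):
        if not isinstance(rec.get(key), str):
            problems.append(f"{key}: expected str")
    meta = rec.get("metadata")
    if not isinstance(meta, dict):
        return problems + ["metadata: expected object"]
    for key, expected in METADATA_TYPES.items():
        if key not in meta:
            problems.append(f"metadata.{key}: missing")
        # bool is a subclass of int in Python, so reject it explicitly for int fields.
        elif not isinstance(meta[key], expected) or (expected is int and isinstance(meta[key], bool)):
            problems.append(f"metadata.{key}: wrong type")
    return problems

good = {
    "id": "demo_p1_0",
    "text": "Some chunk text.",
    "metadata": {
        "doc_id": "demo", "source_path": "data/pdfs/demo.pdf", "page": 1,
        "char_start": 0, "char_end": 16, "section_title": None,
        "section_path": [], "scanned": False,
    },
}
bad = {"id": "demo_p1_1", "text": "x", "metadata": {**good["metadata"], "page": "1"}}
del bad["metadata"]["scanned"]

print(schema_problems(good))
print(schema_problems(bad))
```

The first call reports no problems; the second flags the string-typed `page` and the missing `scanned` field.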

## Appendix: Troubleshooting
- **If pages come back empty but not scanned**: some PDFs embed text as vector shapes; try `page.get_text("rawdict")` or `"blocks"` to diagnose.
- **If TOC is missing**: `section_title`/`path` will be `None`/`[]`; consider custom heading detection using `page.get_text("dict")` and analyzing font sizes.
- **OCR quality**: install language packs for Tesseract and tune DPI (200–300). Higher DPI → better OCR but slower processing.
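
For the missing-TOC case, font-size analysis can be prototyped without opening a PDF. The sketch below assumes its input has the shape returned by `page.get_text("dict")` (blocks containing lines, lines containing spans, each span with `size` and `text`). The function name `guess_headings` and the 1.2 size ratio are arbitrary assumptions to tune per document.

```python
from collections import Counter
from typing import Any, Dict, List

def guess_headings(page_dict: Dict[str, Any], ratio: float = 1.2) -> List[str]:
    """Return lines whose largest span is at least `ratio` times the body font size."""
    sizes: Counter = Counter()
    lines = []
    for block in page_dict.get("blocks", []):
        for line in block.get("lines", []):  # image blocks carry no "lines"
            spans = [s for s in line.get("spans", []) if s["text"].strip()]
            if not spans:
                continue
            text = "".join(s["text"] for s in spans).strip()
            lines.append((max(s["size"] for s in spans), text))
            for s in spans:
                # Weight each size by character count to find the dominant body size.
                sizes[round(s["size"], 1)] += len(s["text"])
    if not sizes:
        return []
    body_size = sizes.most_common(1)[0][0]
    return [text for size, text in lines if size >= ratio * body_size]

toy_page = {"blocks": [
    {"type": 0, "lines": [{"spans": [{"size": 16.0, "text": "1 Introduction"}]}]},
    {"type": 0, "lines": [
        {"spans": [{"size": 10.0, "text": "Body text makes up most of the page, "}]},
        {"spans": [{"size": 10.0, "text": "so its font size is treated as the baseline."}]},
    ]},
    {"type": 1},  # image block without text lines
]}
print(guess_headings(toy_page))
```

On the toy page only `1 Introduction` is reported. Computing the body size per page is fragile on title pages or pages dominated by tables, so a document-wide baseline may work better in practice.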