# Marker PDF Preprocessing for RAG Pipeline

**Purpose**: Process PDFs with GPU-accelerated Marker for improved table/figure extraction.

**Workflow**:
1. Upload PDFs to Colab
2. Process with Marker (GPU) -> Markdown with tables/figures
3. Chunk markdown for RAG
4. Export as JSON
5. Download for local use

**Why Marker over PyMuPDF?**
- Table extraction: 0% (PyMuPDF) -> ~96% (Marker)
- Figure extraction with captions
- Layout-aware processing

---

## 1. Setup Environment

In [None]:
# Check GPU availability
!nvidia-smi

In [None]:
# Install marker-pdf and langchain text splitter (takes ~2-3 minutes)
# The version spec must be quoted: unquoted, the shell parses ">=1.10.0" as a
# stdout redirect to a file named "=1.10.0" and the version pin is dropped.
!pip install -q "marker-pdf>=1.10.0" langchain-text-splitters

In [None]:
# Setup OpenAI for Vision LLM (figure descriptions)
# Add your API key to Colab secrets: Settings > Secrets > Add OPENAI_API_KEY
!pip install -q openai

import openai
from google.colab import userdata

# Only the secret lookup can fail; on success, install the key on the openai
# module (describe_figure checks openai.api_key before calling the API).
try:
    _colab_openai_key = userdata.get('OPENAI_API_KEY')
except Exception as e:
    print(f"Warning: Could not load OPENAI_API_KEY from secrets: {e}")
    print("Figure descriptions will be skipped. Add key in Settings > Secrets")
else:
    openai.api_key = _colab_openai_key
    print("OpenAI API key loaded from Colab secrets")

In [None]:
import os
import re
import json
from pathlib import Path
from datetime import datetime
from dataclasses import dataclass, field, asdict
from typing import Optional
from google.colab import files

# Reached only if every import in this cell succeeded.
print("Imports successful!")

## 2. Upload PDFs

In [None]:
# Working directories: uploaded PDFs go into INPUT_DIR, JSON exports into OUTPUT_DIR.
INPUT_DIR = Path("/content/pdfs")
OUTPUT_DIR = Path("/content/output")
for _workdir in (INPUT_DIR, OUTPUT_DIR):
    _workdir.mkdir(exist_ok=True)

print(f"Upload PDFs to: {INPUT_DIR}")
print(f"Output will be saved to: {OUTPUT_DIR}")

In [None]:
# Upload PDFs (interactive file picker)
print("Select PDF files to upload...")
uploaded = files.upload()

# Move uploaded files to input directory
for filename, content in uploaded.items():
    # Keep only the base name so the saved file always lands inside INPUT_DIR.
    target_name = Path(filename).name
    if target_name.lower().endswith('.pdf'):
        filepath = INPUT_DIR / target_name
        filepath.write_bytes(content)
        print(f"Saved: {filepath}")
    else:
        print(f"Skipped (not PDF): {filename}")

# List uploaded PDFs. Match the extension case-insensitively, exactly like the
# upload filter above. A case-sensitive glob("*.pdf") would silently skip files
# such as "Report.PDF" that were just saved. Sorted for a deterministic order.
pdf_files = sorted(
    path for path in INPUT_DIR.iterdir()
    if path.is_file() and path.name.lower().endswith('.pdf')
)
print(f"\nTotal PDFs ready for processing: {len(pdf_files)}")
for pdf in pdf_files:
    print(f"  - {pdf.name}")

## 3. Initialize Marker Processor

In [None]:
# Data classes for structured output
@dataclass
class TableInfo:
    """One markdown table, split into header cells and data rows."""

    table_id: str           # "table_<n>", numbered in document order by extract_tables
    headers: list[str]      # whitespace-stripped header cells
    rows: list[list[str]]   # one list of whitespace-stripped cells per data row
    markdown: str           # the matched markdown source of the whole table

@dataclass
class FigureInfo:
    """An image reference found in the markdown, with caption and nearby text."""

    figure_id: str   # "figure_<n>", numbered in document order by extract_figures
    image_key: str   # link target of the markdown image, i.e. path in ![alt](path)
    caption: str     # caption text; extract_figures falls back to the alt text
    alt_text: str    # alt text from ![alt](...), possibly ""
    context_before: str = field(default="")  # nearby text preceding the image
    context_after: str = field(default="")   # nearby text following the image/caption

@dataclass
class ChunkInfo:
    """A single retrievable chunk: stable id, text content and free-form metadata."""

    id: str                      # e.g. "<source>_chunk_<i>", "<source>_table_<n>", "<source>_figure_<n>"
    content: str                 # the text that is embedded / retrieved
    metadata: dict[str, object]  # every builder in this notebook sets "content_type"

@dataclass
class ProcessedDocument:
    """Everything produced for one PDF; serialized via asdict() for JSON export."""

    source: str               # PDF file name
    processor: str            # converter name ("marker" in this notebook)
    processed_date: str       # ISO-8601 timestamp string
    markdown: str             # full markdown rendered by Marker
    chunks: list[dict]        # ChunkInfo records already converted with asdict()
    tables: list[dict]        # TableInfo records already converted with asdict()
    figures: list[dict]       # FigureInfo records already converted with asdict()
    stats: dict[str, object]  # per-document counters and timing

In [None]:
# Markdown pipe table: header row, separator row, then one or more data rows.
# The separator's character class uses [ \t] rather than \s. \s also matches
# "\n", which let the separator absorb a following all-blank row ("|   |   |"),
# so that row silently disappeared from the parsed rows.
_TABLE_PATTERN = re.compile(r"(\|[^\n]+\|\n)(\|[-: \t|]+\|\n)((?:\|[^\n]+\|\n?)+)")


def _split_table_row(line: str) -> list:
    """Split a "| a | b |" row into its stripped cells (outer pipes dropped)."""
    return [cell.strip() for cell in line.split("|")[1:-1]]


def extract_tables(markdown: str) -> list:
    """Extract structured table information from markdown.

    Args:
        markdown: Markdown text, as rendered by Marker.

    Returns:
        A list of TableInfo, one per pipe table found, with ids "table_1",
        "table_2", ... in document order. Header and data cells are
        whitespace-stripped strings. Blank data rows are kept as rows of
        empty cells.
    """
    tables = []
    for i, match in enumerate(_TABLE_PATTERN.finditer(markdown)):
        try:
            headers = _split_table_row(match.group(1).strip())

            rows = []
            for row_line in match.group(3).strip().split("\n"):
                if not row_line.strip():
                    continue
                cells = _split_table_row(row_line)
                if cells:
                    rows.append(cells)

            tables.append(TableInfo(
                table_id=f"table_{i + 1}",
                headers=headers,
                rows=rows,
                markdown=match.group(0),
            ))
        except Exception as e:
            # Best effort: one malformed table must not abort the whole document.
            print(f"Warning: Failed to parse table {i + 1}: {e}")

    return tables


# An image reference: ![alt text](path)
_FIGURE_IMAGE_PATTERN = re.compile(r"!\[([^\]]*)\]\(([^)]+)\)")
# First non-blank line after an image. Only matches when nothing but whitespace
# follows the image on its own line.
_FIGURE_NEXT_LINE_PATTERN = re.compile(r"[^\S\n]*\n\s*([^\n]+)")
# Decoration at the start of a caption line: optional bold markers and an
# optional "Figure N:" / "Fig. N." label, e.g. "**Figure 3.** Results" -> "Results".
_FIGURE_LABEL_PATTERN = re.compile(
    r"^\*{0,2}\s*(?:Fig(?:ure)?\.?\s*\d+[.:]?)?\s*\*{0,2}\s*", re.IGNORECASE
)
# Number of characters scanned on each side of a figure for context text.
_FIGURE_CONTEXT_CHARS = 150


def _first_text_line(lines) -> str:
    """Return the first non-blank line that is not an image reference, stripped ("" if none)."""
    for line in lines:
        stripped = line.strip()
        if stripped and not stripped.startswith("!["):
            return stripped
    return ""


def _clean_caption(line: str) -> str:
    """Strip bold markers and a leading "Figure N:" label from a caption line."""
    caption = _FIGURE_LABEL_PATTERN.sub("", line, count=1).strip()
    if line.startswith("*"):
        # Drop the closing marker of "**...**" captions.
        caption = caption.rstrip("*").strip()
    # A label-only line ("Figure 1") is still better than no caption at all.
    return caption or line.strip("*").strip()


def extract_figures(markdown: str) -> list:
    """Extract figure information with captions from markdown.

    Every ``![alt](path)`` reference becomes one FigureInfo, with ids
    "figure_1", "figure_2", ... in document order. The caption is the first
    non-blank line after the image, with its "Figure N:" label and bold markers
    stripped. That line is only used when the image ends its own line, and
    another image or a heading is never taken as a caption. Without a caption,
    the alt text is used instead.

    ``context_before`` / ``context_after`` hold the nearest non-blank,
    non-image line within ~150 characters before the image and after the image
    (or its caption). Near the window edge, this may be a fragment of a longer
    line.
    """
    figures = []
    for i, image in enumerate(_FIGURE_IMAGE_PATTERN.finditer(markdown)):
        alt_text = image.group(1) or ""
        image_path = image.group(2) or ""

        caption = ""
        end = image.end()
        next_line = _FIGURE_NEXT_LINE_PATTERN.match(markdown, end)
        if next_line:
            line = next_line.group(1).strip()
            # Leave a following image for the next iteration (it is its own
            # figure), and never treat a heading as a caption.
            if line and not line.startswith(("![", "#")):
                caption = _clean_caption(line)
                end = next_line.end()
        caption = caption or alt_text

        # Context windows. Images normally sit on their own line, so take the
        # nearest non-blank line rather than the (empty) remainder of this line.
        window_start = max(0, image.start() - _FIGURE_CONTEXT_CHARS)
        before_lines = markdown[window_start:image.start()].split("\n")
        after_lines = markdown[end:end + _FIGURE_CONTEXT_CHARS].split("\n")

        figures.append(FigureInfo(
            figure_id=f"figure_{i + 1}",
            image_key=image_path,
            caption=caption.strip(),
            alt_text=alt_text,
            context_before=_first_text_line(reversed(before_lines)),
            context_after=_first_text_line(after_lines),
        ))

    return figures

print("Extraction functions defined.")

In [None]:
import base64
import io
import time

def _png_base64(image) -> str:
    """Encode a PIL image as base64 PNG text; return "" if it cannot be encoded."""
    buffer = io.BytesIO()
    try:
        image.save(buffer, format='PNG')
    except OSError:
        # PNG cannot store some PIL modes (e.g. CMYK, common for images embedded
        # in PDFs): Pillow raises OSError("cannot write mode CMYK as PNG").
        # Retry once after converting to RGB.
        buffer = io.BytesIO()
        try:
            image.convert('RGB').save(buffer, format='PNG')
        except Exception as e:
            print(f"  Warning: Could not encode figure image as PNG: {e}")
            return ""
    return base64.b64encode(buffer.getvalue()).decode('utf-8')


def describe_figure(image, caption: str = "", max_retries: int = 3) -> str:
    """
    Generate description of figure using GPT-4o-mini vision.

    Args:
        image: PIL Image object from Marker output
        caption: Figure caption for context
        max_retries: Maximum attempts when the API reports a rate limit

    Returns:
        AI-generated description of the visual content. Returns "" when no
        API key is set, the image cannot be encoded, the request fails, or
        every attempt was rate limited.
    """
    if not openai.api_key:
        return ""  # Skip if no API key

    # Encode once, outside the retry loop. A failure here skips this figure
    # instead of raising and failing the whole PDF in process_pdf.
    base64_image = _png_base64(image)
    if not base64_image:
        return ""

    for attempt in range(max_retries):
        try:
            response = openai.chat.completions.create(
                model="gpt-4o-mini",
                messages=[{
                    "role": "user",
                    "content": [
                        {
                            "type": "text",
                            "text": f"Describe this academic figure in 2-3 sentences. Include: axes labels, data trends, key findings. Caption context: {caption}"
                        },
                        {
                            "type": "image_url",
                            "image_url": {"url": f"data:image/png;base64,{base64_image}"}
                        }
                    ]
                }],
                max_tokens=150
            )
            # content may be None (e.g. a refusal); treat that as "no description".
            return (response.choices[0].message.content or "").strip()

        except openai.RateLimitError:
            if attempt == max_retries - 1:
                break  # last attempt: don't sleep just to give up afterwards
            # Exponential backoff: 1s, 2s, 4s, ...
            wait_time = 2 ** attempt
            print(f"  Rate limit hit, retrying in {wait_time}s... (attempt {attempt + 1}/{max_retries})")
            time.sleep(wait_time)

        except Exception as e:
            print(f"  Warning: Vision API failed: {e}")
            return ""

    print(f"  Warning: Max retries ({max_retries}) exceeded for vision API")
    return ""

print("Vision description function defined (GPT-4o-mini with retry logic).")

In [None]:
from langchain_text_splitters import RecursiveCharacterTextSplitter

def create_text_splitter(chunk_size: int = 1000, chunk_overlap: int = 200):
    """Build the markdown-aware recursive splitter used for text chunks.

    Separators are tried coarsest first: section headers, then paragraphs,
    lines, sentences, words and finally single characters. The default
    200/1000 overlap (20%) matches the local marker_processor.py.
    """
    markdown_separators = [
        "\n## ",   # H2 headers
        "\n### ",  # H3 headers
        "\n#### ", # H4 headers
        "\n\n",    # Paragraphs
        "\n",      # Lines
        ". ",      # Sentences
        " ",       # Words
        "",        # Characters (last resort)
    ]
    return RecursiveCharacterTextSplitter(
        separators=markdown_separators,
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
    )

# A header row immediately followed by a separator row. Spaces and alignment
# colons are allowed in the separator ("| --- | :-: |"), as extract_tables()
# also allows, so spaced tables are flagged too.
_CHUNK_TABLE_PATTERN = re.compile(r"\|[^\n]*\|[^\n]*\n[ \t]*\|(?:[ \t]*:?-+:?[ \t]*\|)+")
# A markdown image reference: ![alt](path)
_CHUNK_IMAGE_PATTERN = re.compile(r"!\[[^\]]*\]\([^)]+\)")


def chunk_markdown(
    markdown: str,
    source: str,
    chunk_size: int = 1000,
    chunk_overlap: int = 200,
) -> list:
    """
    Chunk markdown text using LangChain's RecursiveCharacterTextSplitter.

    Uses markdown-aware separators and proper overlap between chunks.
    Default: 1000 chars with 200 char overlap (20%).

    Returns:
        A list of ChunkInfo with ids "<source>_chunk_<i>". Each metadata dict
        has content_type "text", the chunk index, has_table/has_figure flags
        and the character count of the stored (stripped) content.
    """
    splitter = create_text_splitter(chunk_size, chunk_overlap)

    chunks = []
    for i, raw_chunk in enumerate(splitter.split_text(markdown)):
        content = raw_chunk.strip()
        chunks.append(ChunkInfo(
            id=f"{source}_chunk_{i}",
            content=content,
            metadata={
                "chunk_index": i,
                "content_type": "text",
                "has_table": bool(_CHUNK_TABLE_PATTERN.search(content)),
                "has_figure": bool(_CHUNK_IMAGE_PATTERN.search(content)),
                # Count what is actually stored in `content`.
                "char_count": len(content),
            }
        ))

    return chunks

print("Chunking function defined (using RecursiveCharacterTextSplitter with 20% overlap).")

In [None]:
# Initialize Marker converter (loads models - takes ~1-2 minutes first time)
print("Loading Marker models (this may take 1-2 minutes)...")

import torch
from marker.converters.pdf import PdfConverter
from marker.models import create_model_dict
from marker.output import text_from_rendered

# Prefer the GPU. On a runtime without CUDA (e.g. the Colab runtime type was
# not switched to GPU), fall back to CPU with a loud warning instead of
# crashing: processing still works, just much more slowly.
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"
    print("WARNING: No CUDA GPU detected - running Marker on CPU (much slower). "
          "In Colab: Runtime > Change runtime type > GPU.")

model_dict = create_model_dict(device=device)
converter = PdfConverter(artifact_dict=model_dict)

print("Marker models loaded successfully!")

## 4. Process PDFs

In [None]:
def _build_table_chunks(tables: list, source: str) -> list:
    """Create one dedicated "table" ChunkInfo per extracted table."""
    return [
        ChunkInfo(
            id=f"{source}_{table.table_id}",
            content=table.markdown,
            metadata={
                "content_type": "table",
                "table_id": table.table_id,
                "headers": table.headers,
                "row_count": len(table.rows),
            }
        )
        for table in tables
    ]


def _build_figure_chunks(figures: list, images, source: str) -> tuple:
    """Create one "figure" ChunkInfo per figure; return (chunks, figures_described)."""
    chunks = []
    figures_described = 0
    for figure in figures:
        vision_description = ""
        # NOTE(review): `images` is assumed to be keyed by the same path the
        # markdown image link uses. Its values are handed to describe_figure,
        # which documents a PIL Image (not raw bytes).
        if images and figure.image_key in images:
            vision_description = describe_figure(images[figure.image_key], figure.caption)
            if vision_description:
                figures_described += 1

        # Paragraphs: [context before], caption, [vision description], [context after]
        parts = []
        if figure.context_before:
            parts.append(figure.context_before)
        parts.append(f"Figure: {figure.caption}")
        if vision_description:
            parts.append(f"Visual Description: {vision_description}")
        if figure.context_after:
            parts.append(figure.context_after)

        chunks.append(ChunkInfo(
            id=f"{source}_{figure.figure_id}",
            content="\n\n".join(parts),
            metadata={
                "content_type": "figure",
                "figure_id": figure.figure_id,
                "caption": figure.caption,
                "image_key": figure.image_key,
                "has_vision_description": bool(vision_description),
            }
        ))
    return chunks, figures_described


def process_pdf(pdf_path: Path, converter) -> ProcessedDocument:
    """
    Process a single PDF with Marker and return structured output.

    The resulting chunk list holds, in order:
      - the text chunks from chunk_markdown,
      - one dedicated chunk per extracted table,
      - one dedicated chunk per figure, including a Vision LLM description
        when one could be generated.
    """
    from time import perf_counter

    print(f"\nProcessing: {pdf_path.name}")
    # perf_counter is monotonic, so the measured duration cannot go negative
    # or jump if the wall clock is adjusted mid-run (datetime.now() can).
    start = perf_counter()

    # Convert PDF to markdown
    rendered = converter(str(pdf_path))
    markdown_text, _, images = text_from_rendered(rendered)

    # Native image count from Marker (more accurate than regex)
    native_image_count = len(images) if images else 0

    # Structured elements used to build dedicated chunks
    tables = extract_tables(markdown_text)
    figures = extract_figures(markdown_text)

    chunks = chunk_markdown(markdown_text, pdf_path.name)
    chunks.extend(_build_table_chunks(tables, pdf_path.name))
    figure_chunks, figures_described = _build_figure_chunks(figures, images, pdf_path.name)
    chunks.extend(figure_chunks)

    elapsed = perf_counter() - start

    doc = ProcessedDocument(
        source=pdf_path.name,
        processor="marker",
        processed_date=datetime.now().isoformat(),
        markdown=markdown_text,
        chunks=[asdict(c) for c in chunks],
        tables=[asdict(t) for t in tables],
        figures=[asdict(f) for f in figures],
        stats={
            "total_chunks": len(chunks),
            "text_chunks": sum(1 for c in chunks if c.metadata.get("content_type") == "text"),
            "tables_extracted": len(tables),
            "images_extracted": native_image_count,  # Native Marker count
            "figure_chunks_created": len(figures),   # Regex-based chunks
            "figures_with_vision": figures_described,  # Vision LLM descriptions
            "markdown_chars": len(markdown_text),
            "processing_time_seconds": round(elapsed, 2),
        }
    )

    print(f"  Completed in {elapsed:.1f}s")
    print(f"  - {len(chunks)} total chunks")
    print(f"  - {len(tables)} tables extracted")
    print(f"  - {native_image_count} images extracted (Marker native)")
    print(f"  - {figures_described}/{len(figures)} figures with vision descriptions")

    return doc

In [None]:
# Run every uploaded PDF through Marker. A failure is reported and skipped so
# that one bad file does not abort the whole batch.
rule = "=" * 50
processed_docs = []

print(f"Processing {len(pdf_files)} PDF(s)...")
print(rule)

for pdf in pdf_files:
    try:
        processed_docs.append(process_pdf(pdf, converter))
    except Exception as err:
        print(f"ERROR processing {pdf.name}: {err}")

print("\n" + rule)
print(f"Processed {len(processed_docs)}/{len(pdf_files)} PDFs successfully")

## 5. Export Results

In [None]:
# Export each document as JSON
for doc in processed_docs:
    output_file = OUTPUT_DIR / f"{Path(doc.source).stem}_processed.json"
    with open(output_file, 'w', encoding='utf-8') as f:
        json.dump(asdict(doc), f, indent=2, ensure_ascii=False)
    print(f"Saved: {output_file}")

# Also create a combined output file
combined_output = {
    "processed_date": datetime.now().isoformat(),
    "processor": "marker",
    "total_documents": len(processed_docs),
    "documents": [asdict(doc) for doc in processed_docs],
    "summary": {
        "total_chunks": sum(doc.stats["total_chunks"] for doc in processed_docs),
        "total_tables": sum(doc.stats["tables_extracted"] for doc in processed_docs),
        "total_images": sum(doc.stats["images_extracted"] for doc in processed_docs),
    }
}

combined_file = OUTPUT_DIR / "all_documents_processed.json"
with open(combined_file, 'w', encoding='utf-8') as f:
    json.dump(combined_output, f, indent=2, ensure_ascii=False)

print(f"\nCombined output saved: {combined_file}")
print(f"\nSummary:")
print(f"  Total chunks: {combined_output['summary']['total_chunks']}")
print(f"  Total tables: {combined_output['summary']['total_tables']}")
print(f"  Total images: {combined_output['summary']['total_images']} (Marker native count)")

## 6. Download Results

In [None]:
# Download individual files
# One browser download per JSON file currently in OUTPUT_DIR.
print("Downloading JSON files...\n")

json_outputs = list(OUTPUT_DIR.glob("*.json"))
for json_path in json_outputs:
    print(f"Downloading: {json_path.name}")
    files.download(str(json_path))

In [None]:
# Alternative: Download as ZIP
import shutil

zip_path = "/content/marker_processed_output"
archive_file = f"{zip_path}.zip"

# make_archive adds the ".zip" extension to zip_path itself.
shutil.make_archive(zip_path, 'zip', OUTPUT_DIR)

print(f"Created: {archive_file}")
files.download(archive_file)

## 7. Preview Sample Output

In [None]:
# Preview up to three tables of the first document (markdown capped at 500 chars).
if processed_docs:
    doc = processed_docs[0]
    print(f"\n=== Tables from {doc.source} ===")
    for table in doc.tables[:3]:
        table_md = table['markdown']
        if len(table_md) > 500:
            table_md = table_md[:500] + "..."
        print(f"\n{table['table_id']}:")
        print(f"Headers: {table['headers']}")
        print(f"Rows: {len(table['rows'])}")
        print(table_md)

In [None]:
# Preview the first three chunks of the first document (content capped at 300 chars).
if processed_docs:
    doc = processed_docs[0]
    print(f"\n=== Sample Chunks from {doc.source} ===")
    for chunk in doc.chunks[:3]:
        content = chunk['content']
        # Only add an ellipsis when something was actually cut off.
        preview = content[:300] + "..." if len(content) > 300 else content
        print(f"\n--- {chunk['id']} ---")
        print(f"Type: {chunk['metadata'].get('content_type', 'text')}")
        print(f"Content preview: {preview}")

---

## Usage Instructions

1. **Run all cells** in order
2. **Upload your PDFs** when the file picker appears in Section 2 (Upload PDFs)
3. **Wait for processing** (typically 1-3 minutes per PDF with GPU)
4. **Download the JSON files** from the download cells
5. **Place JSON files** in your local `data/preprocessed/` folder

### Local Loading (Future Integration)

```python
import json
from langchain_core.documents import Document

def load_preprocessed_chunks(json_path: str) -> list[Document]:
    """Load one per-document export (<stem>_processed.json) as LangChain Documents.

    The combined all_documents_processed.json nests documents under a
    "documents" key and is not handled by this function.
    """
    # The notebook writes UTF-8 with ensure_ascii=False. Open explicitly as
    # UTF-8 so loading also works where the platform default is not UTF-8
    # (e.g. Windows).
    with open(json_path, encoding='utf-8') as f:
        data = json.load(f)

    return [
        Document(
            page_content=chunk['content'],
            metadata={
                'id': chunk['id'],
                'source': data['source'],
                'processor': 'marker',
                **chunk['metadata'],
            },
        )
        for chunk in data['chunks']
    ]
```