# PDF Processing Pipeline — Notebook Interface

This notebook is a **thin wrapper** around the `pipeline/` package. All pipeline
code lives in the package modules — this notebook only handles configuration,
Colab environment setup, and orchestration.

It installs any missing dependencies, mounts Google Drive when running in Colab,
finds all PDFs in the folder you specify, and runs the full flow:

**PDF → Markdown → NLP → Chunking → Vector DB → Manifest**

In [None]:
import importlib.util
import subprocess
import sys


def _has(mod: str) -> bool:
    """Return True if the import system can locate module *mod*.

    This is a pure availability probe and must never raise:
    ``importlib.util.find_spec`` imports the parent package of a dotted name
    and raises ``ModuleNotFoundError`` when that parent is missing, and it
    raises ``ValueError`` for an already-imported module whose ``__spec__`` is
    unset. Both cases are reported here as "not available".
    """
    try:
        return importlib.util.find_spec(mod) is not None
    except (ImportError, ValueError):
        return False


# Full dependency list matching pyproject.toml.
# Each entry is (pip requirement, importable module name): the first element is
# what gets handed to pip, the second is what `_has` probes for.
PACKAGES = [
    ("docling", "docling"),
    ("docling-core", "docling_core"),
    ("easyocr", "easyocr"),
    ("google-api-python-client", "googleapiclient"),
    ("google-auth-httplib2", "google_auth_httplib2"),
    ("google-auth-oauthlib", "google_auth_oauthlib"),
    ("nltk", "nltk"),
    ("opencv-python-headless==4.13.0.92", "cv2"),
    ("qdrant-client[fastembed]", "qdrant_client"),
    ("rake-nltk", "rake_nltk"),
    ("scikit-learn", "sklearn"),
    ("sentence-transformers", "sentence_transformers"),
    ("spacy", "spacy"),
    ("torch", "torch"),
    ("tqdm", "tqdm"),
    ("transformers", "transformers"),
    ("vaderSentiment", "vaderSentiment"),
    ("chromadb", "chromadb"),
]

# Install only what cannot already be located, so re-running the cell is cheap.
missing = [requirement for requirement, module in PACKAGES if not _has(module)]
if missing:
    print(f"Installing: {missing}")
    subprocess.check_call([sys.executable, "-m", "pip", "install", "-q", *missing])
    # pip ran in a child process; the import finders of *this* interpreter may
    # still hold cached directory listings, so refresh them before any later
    # probe or import looks for the freshly installed packages.
    importlib.invalidate_caches()
else:
    print("All packages already installed.")

# The spaCy model is distributed as its own importable package, so it is probed
# the same way as the libraries above and fetched through spaCy's downloader.
if not _has("en_core_web_sm"):
    print("Downloading spaCy model: en_core_web_sm")
    subprocess.check_call([sys.executable, "-m", "spacy", "download", "en_core_web_sm"])
    importlib.invalidate_caches()

print("Dependency setup complete.")

In [None]:
import sys
from pathlib import Path

# Mount Google Drive only when the Colab runtime module is already loaded;
# outside Colab this branch is skipped entirely.
if "google.colab" in sys.modules:
    from google.colab import drive

    drive.mount("/content/drive")

# ╔══════════════════════════════════════════════════════════════╗
# ║  USER CONFIGURATION — edit these before running             ║
# ╚══════════════════════════════════════════════════════════════╝
FOLDER_NAME = "output"  # folder name (or absolute path) handed to resolve_folder_by_name
BACKEND = "qdrant"  # "qdrant" or "chroma"
RUN_NLP = True  # False skips the NLP stage
DISABLE_OCR = False  # True builds the converter with enable_ocr=False
FORCE_REPROCESS = False  # True bypasses the unchanged-file skip
REBUILD_INDEX = False  # forwarded as recreate_collection to the Qdrant builder

DRIVE_ROOT = Path("/content/drive/MyDrive")  # root searched for FOLDER_NAME
OUTPUT_BASE = Path("/content/output")  # per-folder output dirs are created under this
QDRANT_PATH = Path("/content/qdrant_data")  # passed as qdrant_path to the Qdrant builder

# Validate with explicit exceptions rather than `assert`: assertions are
# stripped when Python runs with -O, which would let a bad configuration
# through to the expensive stages below.
if not FOLDER_NAME:
    raise ValueError("Set FOLDER_NAME first")
if BACKEND not in {"qdrant", "chroma"}:
    raise ValueError("BACKEND must be 'qdrant' or 'chroma'")
if not DRIVE_ROOT.exists():
    raise FileNotFoundError(f"Drive root not found: {DRIVE_ROOT}")

In [None]:
import json
import logging
import os
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import Any

# logging.basicConfig() does nothing when the root logger already has handlers,
# and notebook kernels commonly attach one before user code runs. force=True
# replaces any pre-existing root handlers so the INFO-level pipeline messages
# are actually emitted in the format below (and re-running the cell stays
# idempotent).
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s | %(levelname)-8s | %(name)s | %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
    force=True,
)
log = logging.getLogger("pipeline")

# Ensure the project root (containing pipeline/) is on sys.path.
# In Colab: clone the repo first, then set PROJECT_ROOT to the clone path.
# Locally: defaults to the notebook's working directory.
PROJECT_ROOT = os.environ.get("PROJECT_ROOT", os.getcwd())
# Insert at the front so this checkout takes precedence over other sys.path
# entries; the membership test avoids duplicates when the cell is re-run.
if PROJECT_ROOT not in sys.path:
    sys.path.insert(0, PROJECT_ROOT)

# Import everything from the pipeline package
from pipeline import (
    COLLECTION_NAME,
    EMBEDDING_DIM,
    EMBEDDING_MODEL,
    MAX_TOKENS,
    DocRecord,
    analyze_sentiment,
    build_qdrant_collection,
    chunk_documents,
    classify_document_type,
    convert_single_pdf,
    create_chroma_collection,
    create_chunker,
    create_ocr_converter,
    discover_pdfs,
    ensure_output_dirs,
    extract_entities,
    extract_keywords_rake,
    extract_tfidf_topics,
    extractive_summary,
    init_nlp,
    insert_chunks,
    run_nlp_analysis,
    save_manifest,
)
from pipeline.utils import (
    file_fingerprint,
    is_unchanged_file,
    load_pipeline_state,
    save_pipeline_state,
)

# Confirmation output once the imports above have succeeded. The first line
# echoes the configured PROJECT_ROOT; it does not inspect where the `pipeline`
# package was actually resolved from.
print(f"Pipeline loaded from: {PROJECT_ROOT}")
print("Pipeline imports loaded.")

In [None]:
def resolve_folder_by_name(drive_root: Path, folder_name: str) -> Path:
    """Locate the directory identified by *folder_name*.

    Lookup order (first hit wins):

    1. ``folder_name`` itself, when it starts with ``/`` and is an existing
       directory.
    2. ``drive_root / folder_name``, when that is an existing directory.
    3. A recursive search below ``drive_root`` for directories whose final
       path component equals ``folder_name``. Matches are sorted and the first
       is returned; if there are several, up to ten are printed so the
       ambiguity is visible.

    Args:
        drive_root: Directory under which the search takes place.
        folder_name: Directory name, relative path, or absolute path.

    Returns:
        The resolved directory path.

    Raises:
        FileNotFoundError: No matching directory exists.
    """
    if folder_name.startswith("/"):
        absolute = Path(folder_name)
        # is_dir() is already False for a missing path, so no exists() check.
        if absolute.is_dir():
            return absolute

    direct = drive_root / folder_name
    if direct.is_dir():
        return direct

    # Compare the name first: it is a pure string test, whereas is_dir() hits
    # the filesystem. This way only entries with a matching name are stat'ed.
    matches = sorted(
        p for p in drive_root.rglob("*") if p.name == folder_name and p.is_dir()
    )
    if not matches:
        raise FileNotFoundError(
            f"No folder named '{folder_name}' found under {drive_root}"
        )

    if len(matches) > 1:
        print("Multiple folders found with same name; using first match:")
        for idx, path in enumerate(matches[:10], start=1):
            print(f"  {idx}. {path}")

    return matches[0]


# --- Step 1: Resolve folder, create output dirs, discover PDFs ---
target_folder = resolve_folder_by_name(DRIVE_ROOT, FOLDER_NAME)
output_dir = OUTPUT_BASE / target_folder.name
md_dir, db_dir = ensure_output_dirs(output_dir)
# NLP results get their own sub-directory alongside the other outputs.
nlp_dir = output_dir / "nlp"
nlp_dir.mkdir(parents=True, exist_ok=True)

pdf_files = discover_pdfs(target_folder)
if not pdf_files:
    raise RuntimeError("No PDFs found in selected folder")

# --- Resume check: skip unchanged PDFs ---
# Saved state is keyed by each PDF's resolved absolute path.
state = load_pipeline_state(output_dir)
state_files: dict[str, dict[str, Any]] = state.setdefault("files", {})
pdf_files_to_process: list[Path] = []
skipped_unchanged = 0

for pdf_path in pdf_files:
    previous = state_files.get(str(pdf_path.resolve()))
    md_file = md_dir / f"{pdf_path.stem}.md"
    nlp_file = nlp_dir / f"{pdf_path.stem}_nlp.json"

    # The Markdown file is always required; the NLP JSON only when NLP is on.
    if RUN_NLP:
        has_output_artifacts = md_file.exists() and nlp_file.exists()
    else:
        has_output_artifacts = md_file.exists()

    # A PDF is (re)processed when forced, when an expected artifact is
    # missing, or when the saved state does not report it as unchanged.
    must_process = (
        FORCE_REPROCESS
        or not has_output_artifacts
        or not is_unchanged_file(pdf_path, previous)
    )
    if must_process:
        pdf_files_to_process.append(pdf_path)
    else:
        skipped_unchanged += 1

print(
    f"Resolved folder: {target_folder}",
    f"PDF count:       {len(pdf_files)}",
    f"Skipped cached:  {skipped_unchanged}",
    f"To process:      {len(pdf_files_to_process)}",
    f"Output dir:      {output_dir}",
    sep="\n",
)

In [None]:
from tqdm import tqdm

# --- Step 2: Convert PDFs to Markdown ---
# docling_docs maps record.filename -> converted document object;
# records collects one DocRecord per attempted PDF, successful or not.
docling_docs: dict[str, object] = {}
records: list[DocRecord] = []

if not pdf_files_to_process:
    log.info("No new/changed PDFs detected; conversion stage skipped.")
else:
    log.info("Initializing Docling OCR converter...")
    converter, ocr_engine = create_ocr_converter(enable_ocr=not DISABLE_OCR)
    log.info(f"Converter OCR profile: {ocr_engine}")

    for pdf_path in tqdm(pdf_files_to_process, desc="Converting PDFs"):
        record, doc = convert_single_pdf(converter, pdf_path)
        records.append(record)
        if doc is None:
            # No document object came back: keep the record, write nothing.
            continue
        docling_docs[record.filename] = doc
        markdown_target = md_dir / f"{pdf_path.stem}.md"
        markdown_target.write_text(record.markdown, encoding="utf-8")

success = [rec for rec in records if rec.status == "success"]
failed = [rec for rec in records if rec.status == "error"]

# ocr_engine only exists when the converter was built in this run.
engine_label = ocr_engine if pdf_files_to_process else "n/a"
print(f"OCR engine:  {engine_label}")
print(f"Conversion:  {len(success)} success, {len(failed)} failed")

# --- Step 3: NLP Analysis ---
# nlp_results maps record.filename -> analysis dict for documents analysed now.
nlp_results: dict[str, dict] = {}
if not RUN_NLP:
    log.info("NLP stage disabled via RUN_NLP=False")
elif not success:
    log.info("No successful new/changed docs; NLP stage skipped.")
else:
    log.info("Running NLP analysis...")
    init_nlp()

    # Only successfully converted records with non-empty Markdown are analysed.
    successful = [r for r in records if r.status == "success" and r.markdown]
    all_texts = [r.markdown for r in successful]

    # Batch TF-IDF (needs all texts at once)
    log.info("Computing TF-IDF topics...")
    all_tfidf = extract_tfidf_topics(all_texts)

    # Convenience fields: output key -> entity label copied out of the
    # named-entity mapping.
    entity_shortcuts = (
        ("people", "PERSON"),
        ("organizations", "ORG"),
        ("dates", "DATE"),
        ("amounts", "MONEY"),
    )

    # Per-record NLP with progress bar
    for idx, record in enumerate(tqdm(successful, desc="NLP analysis")):
        text = record.markdown
        entities = extract_entities(text)
        analysis = {
            "keywords_rake": extract_keywords_rake(text),
            "named_entities": entities,
            "summary": extractive_summary(text),
            "sentiment": analyze_sentiment(text),
            # Guard against the TF-IDF result being shorter than the input.
            "tfidf_topics": all_tfidf[idx] if idx < len(all_tfidf) else [],
            "document_type": classify_document_type(text),
            "word_count": len(text.split()),
            "char_count": len(text),
        }
        for out_key, label in entity_shortcuts:
            analysis[out_key] = entities.get(label, [])
        nlp_results[record.filename] = analysis

        # Persist one JSON file per document; default=str stringifies any
        # value json cannot serialise natively.
        nlp_file = nlp_dir / f"{Path(record.filename).stem}_nlp.json"
        with nlp_file.open("w", encoding="utf-8") as handle:
            json.dump(analysis, handle, indent=2, default=str)

print(f"NLP analysis: {len(nlp_results)} document(s)")

In [None]:
# --- Step 4: Chunk and store in vector DB ---
log.info("Chunking and building vector database...")
step4_t0 = time.perf_counter()
chunker = create_chunker()

if BACKEND != "qdrant":
    # Any non-"qdrant" value takes the ChromaDB path: chunk first, then insert
    # the chunks into a collection created under db_dir.
    log.info("Chunking documents...")
    chunks = chunk_documents(chunker, records, docling_docs, nlp_results)
    log.info(f"Created {len(chunks)} chunks, inserting into ChromaDB...")
    _client, collection = create_chroma_collection(db_dir)
    vector_count = insert_chunks(collection, chunks)
    log.info(f"ChromaDB: {vector_count} vectors stored")
else:
    # Qdrant path: a single call receives the chunker together with the
    # records, documents and NLP results.
    log.info("Building Qdrant collection (chunking + embedding + upsert)...")
    qdrant_kwargs = {
        "qdrant_path": QDRANT_PATH,
        "chunker": chunker,
        "records": records,
        "docling_docs": docling_docs,
        "nlp_results": nlp_results,
        "recreate_collection": REBUILD_INDEX,
    }
    vector_count = build_qdrant_collection(**qdrant_kwargs)
    log.info(f"Qdrant: {vector_count} vectors stored")

# Wall-clock duration of the whole vector stage, chunker creation included.
step4_elapsed = time.perf_counter() - step4_t0
log.info(f"Vector stage completed in {step4_elapsed:.1f}s")

# --- Step 5: Save manifest ---
manifest_path = save_manifest(output_dir, records, nlp_results)

# --- Step 6: Save pipeline state ---
# Drop state entries for PDFs that are no longer present in the discovered
# set. state_files is mutated in place because it is the same dict object
# stored under state["files"].
current_keys = {str(path.resolve()) for path in pdf_files}
for key in list(state_files):
    if key not in current_keys:
        state_files.pop(key, None)

# One shared UTC timestamp for every record written by this run.
processed_at = datetime.now(timezone.utc).isoformat()
for record in records:
    file_path = Path(record.filepath)
    state_key = str(file_path.resolve())
    entry: dict[str, Any] = {
        "filename": record.filename,
        "status": record.status,
        "processed_at": processed_at,
    }
    if file_path.exists():
        try:
            entry.update(file_fingerprint(file_path))
        except OSError as exc:
            # Best effort: still record the entry, but say why it carries no
            # fingerprint instead of failing silently.
            # NOTE(review): presumably a fingerprint-less entry makes the file
            # count as changed on the next run — confirm in is_unchanged_file.
            log.warning(
                "Could not fingerprint %s (%s); state entry saved without it",
                file_path,
                exc,
            )
    if record.status == "error" and record.error:
        # Cap stored error text so the state file stays small.
        entry["error"] = record.error[:500]
    state_files[state_key] = entry

state_path = save_pipeline_state(output_dir, state)

# --- Summary ---
# Sum of the per-record conversion times for documents handled in this run.
total_time = sum(r.conversion_time_s for r in records)
print(
    "=" * 60,
    "PIPELINE COMPLETE",
    f"  PDFs discovered: {len(pdf_files)}",
    f"  Processed now:   {len(records)}",
    f"  Skipped cached:  {skipped_unchanged}",
    f"  Succeeded:       {len(success)}",
    f"  Failed:          {len(failed)}",
    f"  Vectors stored:  {vector_count}",
    f"  Markdown dir:    {md_dir}",
    f"  Manifest:        {manifest_path}",
    f"  State:           {state_path}",
    f"  Vector time:     {step4_elapsed:.1f}s",
    f"  Total conv time: {total_time:.1f}s",
    sep="\n",
)

In [None]:
# Report conversion failures from this run, one block per failed record.
if not failed:
    print("All files processed successfully.")
else:
    print("\nFailed files:")
    for bad_record in failed:
        print(f"- {bad_record.filename}")
        # Fall back to a placeholder when no message was recorded, and cap
        # the printed text at 1000 characters.
        detail = bad_record.error or "unknown error"
        print(detail[:1000])
        print("-" * 80)