# Convert EIS Documents to Text using PyMuPDF

This notebook converts PDF documents to text using `pymupdf` (fitz), which provides fast extraction suitable for large document corpora.

**Key Features:**
- Mirrors directory structure from `documents/` to `text_conversions/`
- Very fast: processes hundreds of pages per second
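- Extracts embedded text from digital PDFs; documents with little or no extractable text (typically scans) are flagged with `has_text = False` for later OCR — this notebook does not run OCR itself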
- Tracks conversion progress so interrupted runs can resume (PDFs whose `.txt` output already exists are skipped)
- Parallel processing support

**Output format:** Plain text files (`.txt`) with a `--- PAGE n ---` marker before each page's text.

This replaces the text extraction previously done in `make_filter_text_tables.R`.

In [None]:
# Install required packages if needed
# !pip install pymupdf pandas pyarrow tqdm

In [None]:
import os
import re
import pandas as pd
import fitz  # pymupdf
from pathlib import Path
from tqdm.notebook import tqdm
import logging
from datetime import datetime
from concurrent.futures import ProcessPoolExecutor, as_completed
import multiprocessing

# Configure root logging for the notebook session and grab a module logger.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

In [None]:
# Configuration
# All paths hang off the repository root, taken to be the parent of the
# notebook's working directory.
REPO_ROOT = Path("..").resolve()
METADATA_DIR = REPO_ROOT / "metadata"
DOCUMENTS_DIR = REPO_ROOT / "documents"        # Source PDFs (may be symlink to Box)
OUTPUT_DIR = REPO_ROOT / "text_conversions"    # Extracted .txt files, mirroring documents/

# Input records and the per-attachment conversion status log.
DOC_RECORD_FILE = METADATA_DIR / "eis_document_record_api.parquet"
CONVERSION_STATUS_FILE = METADATA_DIR / "text_conversion_status.pkl"

# Worker processes for parallel extraction: one per CPU, capped at 8.
# Adjust based on your machine.
NUM_WORKERS = min(multiprocessing.cpu_count(), 8)

print(f"Repository root: {REPO_ROOT}")
print(f"Documents directory: {DOCUMENTS_DIR}")
print(f"Output directory: {OUTPUT_DIR}")
print(f"Parallel workers: {NUM_WORKERS}")

documents_is_symlink = DOCUMENTS_DIR.is_symlink()
print(f"Is documents symlink: {documents_is_symlink}")
if documents_is_symlink:
    print(f"Symlink target: {DOCUMENTS_DIR.resolve()}")

## Helper Functions

In [48]:
def sanitize_filename(filename: str) -> str:
    """
    Clean an attachment filename so it matches the existing on-disk convention.

    Steps, applied in order:
    1. Delete the characters ( ) & , ~ and forward slash (backslash is kept).
    2. Collapse every run of whitespace and/or underscores into one underscore.
    3. Lower-case a trailing ``.PDF`` extension (any casing).
    4. Collapse a trailing doubled ``.pdf.pdf`` into a single ``.pdf``.
    5. Trim leading/trailing underscores.
    """
    rules = (
        (r'[()&,~\/]', '', 0),
        (r'[\s_]+', '_', 0),
        (r'\.PDF$', '.pdf', re.IGNORECASE),
        (r'\.pdf\.pdf$', '.pdf', re.IGNORECASE),
    )
    result = filename
    for pattern, replacement, flags in rules:
        result = re.sub(pattern, replacement, result, flags=flags)
    return result.strip('_')


def build_local_filename(ceq_number, original_filename: str) -> str:
    """
    Return the local filename for an attachment.

    The result is ``"<ceq_number>_<sanitized original filename>"``, where the
    cleaning rules are those of ``sanitize_filename``.
    """
    return "{}_{}".format(ceq_number, sanitize_filename(original_filename))


def get_year_from_ceq(ceq_number) -> str:
    """Return the first four characters of the CEQ number (its year prefix) as a string."""
    as_text = str(ceq_number)
    return as_text[0:4]

In [None]:
def extract_text_from_pdf(pdf_path: Path, min_chars_per_page: int = 50) -> tuple[str, int, bool]:
    """
    Extract text from a PDF using pymupdf.

    Each page's text is preceded by a ``--- PAGE n ---`` marker (1-based), so
    page boundaries survive in the plain-text output.

    Args:
        pdf_path: Path to the PDF file.
        min_chars_per_page: Average number of characters per page (each page's
            text stripped of leading/trailing whitespace) that the document
            must exceed for ``has_text`` to be True. Documents below it are
            likely image-only scans that need OCR.

    Returns:
        Tuple of (extracted_text, num_pages, has_text). A zero-page document
        yields ``("", 0, False)``. On any error the text is
        ``"ERROR: <message>"``, num_pages is 0 and has_text is False; callers
        detect failure via the ``"ERROR:"`` prefix.
    """
    try:
        # The context manager closes the document even if a page fails to
        # parse mid-way; the previous explicit close() was skipped on errors,
        # leaking a file handle per bad PDF.
        with fitz.open(pdf_path) as doc:
            num_pages = len(doc)
            if num_pages == 0:
                return "", 0, False

            text_parts = []
            total_chars = 0
            for page_num, page in enumerate(doc, start=1):
                page_text = page.get_text()
                total_chars += len(page_text.strip())
                # Add page marker and text
                text_parts.append(f"\n\n--- PAGE {page_num} ---\n\n")
                text_parts.append(page_text)
    except Exception as e:
        return f"ERROR: {e}", 0, False

    full_text = "".join(text_parts)
    has_text = total_chars > num_pages * min_chars_per_page
    return full_text, num_pages, has_text

In [None]:
def _status_record(ceq_number, attachment_id, pdf_path, *, output_file=None,
                   converted=False, num_pages=0, has_text=False, error=None) -> dict:
    """Build one conversion-status row with the keys used by the status file."""
    return {
        "ceqNumber": ceq_number,
        "attachmentId": attachment_id,
        "source_file": str(pdf_path),
        "output_file": output_file,
        "converted": converted,
        "num_pages": num_pages,
        "has_text": has_text,
        "error": error,
        "timestamp": datetime.now().isoformat(),
    }


def convert_single_pdf(args: tuple) -> dict:
    """
    Convert a single PDF to text. Designed to work with ProcessPoolExecutor.

    The text is first written to a per-process temporary file next to the
    target, then moved into place with ``os.replace``. An interrupted run
    therefore never leaves a truncated ``.txt`` behind. This matters because
    an existing output file is treated as "already converted" and skipped.

    Args:
        args: Tuple of (pdf_path, output_path, ceq_number, attachment_id);
            the paths may be str or Path.

    Returns:
        Dict with conversion status (ceqNumber, attachmentId, source_file,
        output_file, converted, num_pages, has_text, error, timestamp).
        Failures are reported in ``error`` (truncated to 500 characters)
        rather than raised.
    """
    pdf_path, output_path, ceq_number, attachment_id = args
    pdf_path = Path(pdf_path)
    output_path = Path(output_path)

    try:
        text, num_pages, has_text = extract_text_from_pdf(pdf_path)

        # extract_text_from_pdf reports failure via an "ERROR:" prefix
        # instead of raising.
        if text.startswith("ERROR:"):
            return _status_record(ceq_number, attachment_id, pdf_path, error=text[:500])

        # Ensure output directory exists
        output_path.parent.mkdir(parents=True, exist_ok=True)

        # PID in the temp name keeps concurrent workers from sharing a temp file.
        tmp_path = output_path.with_name(f"{output_path.name}.{os.getpid()}.part")
        try:
            with open(tmp_path, 'w', encoding='utf-8') as f:
                f.write(text)
            os.replace(tmp_path, output_path)
        finally:
            # No-op after a successful replace; removes the partial file
            # when writing failed.
            tmp_path.unlink(missing_ok=True)

        return _status_record(
            ceq_number, attachment_id, pdf_path,
            output_file=str(output_path),
            converted=True,
            num_pages=num_pages,
            has_text=has_text,
        )

    except Exception as e:
        return _status_record(ceq_number, attachment_id, pdf_path, error=str(e)[:500])

## Load Document Records

In [None]:
def load_document_records():
    """
    Return the document-record table read from DOC_RECORD_FILE (parquet).

    Raises:
        FileNotFoundError: if the parquet file is missing; the message points
            to fetch_eis_records_api.ipynb, which is expected to produce it.
    """
    if not DOC_RECORD_FILE.exists():
        raise FileNotFoundError(
            f"Document records not found at {DOC_RECORD_FILE}.\n"
            f"Run fetch_eis_records_api.ipynb first."
        )
    return pd.read_parquet(DOC_RECORD_FILE)


def load_conversion_status(path=None):
    """
    Load existing conversion status tracking.

    Args:
        path: Status pickle to read; defaults to CONVERSION_STATUS_FILE.

    Returns:
        The stored status DataFrame. If the file does not exist, returns an
        empty frame with the status columns. In either case the
        ``converted``/``has_text`` flags are bool dtype. With object dtype,
        ``~status['converted']`` would apply Python's bitwise NOT
        (True -> -2) instead of producing a boolean mask.
    """
    status_path = CONVERSION_STATUS_FILE if path is None else Path(path)
    if status_path.exists():
        # pickle: only load status files this notebook wrote (never untrusted input).
        status = pd.read_pickle(status_path)
        for flag in ('converted', 'has_text'):
            if flag in status.columns:
                # eq(True) maps True -> True and anything else (False, None, NaN) -> False.
                status[flag] = status[flag].eq(True)
        return status
    return pd.DataFrame({
        'ceqNumber': pd.Series(dtype=object),
        'attachmentId': pd.Series(dtype=object),
        'source_file': pd.Series(dtype=object),
        'output_file': pd.Series(dtype=object),
        'converted': pd.Series(dtype=bool),
        'num_pages': pd.Series(dtype='int64'),
        'has_text': pd.Series(dtype=bool),
        'error': pd.Series(dtype=object),
        'timestamp': pd.Series(dtype=object),
    })


def save_conversion_status(status_df: pd.DataFrame, path=None):
    """
    Save conversion status tracking.

    The pickle (the file that resuming depends on) is written to a temporary
    sibling and then moved over the target with ``os.replace``. An
    interruption during a periodic checkpoint therefore cannot leave a
    half-written, unloadable status file. A CSV copy with the same stem is
    also written for easy inspection.

    Args:
        status_df: Status table to persist.
        path: Target pickle path; defaults to CONVERSION_STATUS_FILE. The CSV
            goes next to it with a ``.csv`` suffix.
    """
    pkl_path = Path(CONVERSION_STATUS_FILE if path is None else path)
    tmp_path = pkl_path.with_name(pkl_path.name + ".tmp")
    status_df.to_pickle(tmp_path)
    os.replace(tmp_path, pkl_path)
    # Also save CSV for easy inspection
    status_df.to_csv(pkl_path.with_suffix(".csv"), index=False)

In [None]:
# Load document records
doc_df = load_document_records()
print(f"Loaded {len(doc_df)} document records")


def _pick_source_name(row):
    """Return the first truthy of `name`, `fileNameForDownload`, else '<attachmentId>.pdf'."""
    # NOTE(review): `or` only falls back on falsy values (None, ""). A NaN or
    # pd.NA in these columns would not fall back as intended, so confirm how
    # the parquet encodes missing names.
    return row['name'] or row['fileNameForDownload'] or f"{row['attachmentId']}.pdf"


# Year folder = first four characters of the CEQ number.
doc_df['year'] = doc_df['ceqNumber'].astype(str).str[:4]
doc_df['localFilename'] = doc_df.apply(
    lambda row: build_local_filename(row['ceqNumber'], _pick_source_name(row)),
    axis=1,
)

# Source PDF: DOCUMENTS_DIR/<year>/<localFilename>. Text output: the mirrored
# path under OUTPUT_DIR with '.pdf' swapped for '.txt'.
# NOTE(review): str.replace swaps every '.pdf' occurrence, not only the
# suffix. verify_conversions() and retry_failed_conversions() build
# outputPath the same way, so change all three together if this is tightened.
doc_df['sourcePath'] = doc_df.apply(
    lambda row: DOCUMENTS_DIR / row['year'] / row['localFilename'],
    axis=1,
)
doc_df['outputPath'] = doc_df.apply(
    lambda row: OUTPUT_DIR / row['year'] / row['localFilename'].replace('.pdf', '.txt').replace('.PDF', '.txt'),
    axis=1,
)

display(doc_df[['ceqNumber', 'year', 'localFilename', 'sourcePath', 'outputPath']].head())

In [54]:
# Flag records whose source PDF is present on disk.
doc_df['sourceExists'] = doc_df['sourcePath'].apply(lambda src: src.exists())
print(f"Source files found: {doc_df['sourceExists'].sum()} / {len(doc_df)}")

# A record counts as already converted when its .txt output exists; this is
# what lets an interrupted run pick up where it stopped.
doc_df['alreadyConverted'] = doc_df['outputPath'].apply(lambda out: out.exists())
print(f"Already converted: {doc_df['alreadyConverted'].sum()}")

# Work queue: source present and no output written yet.
needs_conversion = doc_df['sourceExists'] & ~doc_df['alreadyConverted']
to_convert = doc_df.loc[needs_conversion].copy()
print(f"\nDocuments to convert: {len(to_convert)}")

print("\nBy year:")
print(to_convert['year'].value_counts().sort_index())

Source files found: 45559 / 45704
Already converted: 0

Documents to convert: 45559

By year:
year
1987       2
1988       1
1990       2
1991       4
1992       3
1993       3
1994      10
1995      20
1996      59
1997      82
1998      58
1999     202
2000     369
2001     481
2002     596
2003     562
2004     631
2005     755
2006     712
2007     912
2008     911
2009     894
2010     661
2011     406
2012     991
2013    3007
2014    2918
2015    3991
2016    3651
2017    2697
2018    3239
2019    2865
2020    3280
2021    2309
2022    2206
2023    1633
2024    2498
2025    1938
Name: count, dtype: int64


## Conversion Settings

In [None]:
# --------------------------------------------
# Conversion settings: edit these before building the queue.
# --------------------------------------------

# Years to convert as a list, e.g. [2024, 2025]; None converts every year.
YEAR_FILTER = None

# Upper bound on files converted in this run (e.g. 100 for a test pass);
# None means no limit.
MAX_CONVERSIONS = None

print("Settings:")
print(f"  YEAR_FILTER: {YEAR_FILTER}")
print(f"  MAX_CONVERSIONS: {MAX_CONVERSIONS}")
print(f"  NUM_WORKERS: {NUM_WORKERS}")

In [57]:
# Apply filters to conversion queue.
# Both settings use explicit `is not None` checks, matching their documented
# "None = no filter" meaning. With plain truthiness, MAX_CONVERSIONS = 0 was
# silently treated as "convert everything".
conversion_queue = to_convert.copy()

if YEAR_FILTER is not None:
    # Queue years are strings (first 4 chars of ceqNumber), so compare as str.
    year_filter_str = [str(y) for y in YEAR_FILTER]
    conversion_queue = conversion_queue[conversion_queue['year'].isin(year_filter_str)]
    print(f"Filtered to years {YEAR_FILTER}: {len(conversion_queue)} documents")

if MAX_CONVERSIONS is not None and len(conversion_queue) > MAX_CONVERSIONS:
    conversion_queue = conversion_queue.head(MAX_CONVERSIONS)
    print(f"Limited to {MAX_CONVERSIONS} conversions")

print(f"\nFinal conversion queue: {len(conversion_queue)} files")

Filtered to years [2024, 2025]: 4436 documents
Limited to 10 conversions

Final conversion queue: 10 files


## Create Output Directory Structure

In [59]:
# Mirror the documents/<year>/ layout under OUTPUT_DIR (text_conversions/).
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

# Only purely numeric child directories of documents/ are year folders.
year_dirs = [
    child for child in DOCUMENTS_DIR.iterdir()
    if child.is_dir() and child.name.isdigit()
]
print(f"Found {len(year_dirs)} year directories in documents/")

# One matching year folder under OUTPUT_DIR for each year in documents/.
for year_dir in year_dirs:
    (OUTPUT_DIR / year_dir.name).mkdir(exist_ok=True)

print(f"Created directory structure in {OUTPUT_DIR}")

Found 38 year directories in documents/
Created directory structure in /Users/admin-tascott/Documents/GitHub/eis_documents/enepa_repository/marker_conversions


## Run Conversions

In [None]:
def run_conversions(queue: pd.DataFrame, num_workers: int = NUM_WORKERS,
                    save_every: int = 500) -> pd.DataFrame:
    """
    Run text extraction on all files in the queue using parallel processing.

    Args:
        queue: DataFrame with sourcePath, outputPath, ceqNumber and
            attachmentId columns.
        num_workers: Number of worker processes.
        save_every: Merge the results so far into the status file after every
            this many completed files. 0 or None disables intermediate saves.

    Returns:
        DataFrame with one row per queued file and the status columns
        (ceqNumber, attachmentId, source_file, output_file, converted,
        num_pages, has_text, error, timestamp). The columns are present even
        for an empty queue, so callers can index them safely.

    NOTE(review): ProcessPoolExecutor workers must be able to import
    convert_single_pdf from __main__. With the "spawn" start method (the
    default on macOS and Windows), functions defined in a notebook may not be
    importable. If every file fails here with an AttributeError or pickling
    error, move the helpers into a .py module and import them.
    """
    status_columns = [
        "ceqNumber", "attachmentId", "source_file", "output_file",
        "converted", "num_pages", "has_text", "error", "timestamp",
    ]

    # Prepare arguments for parallel processing (plain str paths pickle cheaply).
    args_list = [
        (str(row['sourcePath']), str(row['outputPath']), row['ceqNumber'], row['attachmentId'])
        for _, row in queue.iterrows()
    ]

    results = []
    if not args_list:
        # Nothing to do: don't spin up worker processes at all.
        return pd.DataFrame(results, columns=status_columns)

    with ProcessPoolExecutor(max_workers=num_workers) as executor:
        futures = {executor.submit(convert_single_pdf, args): args for args in args_list}

        for future in tqdm(as_completed(futures), total=len(futures), desc="Converting"):
            try:
                results.append(future.result())
            except Exception as e:
                # convert_single_pdf catches its own errors, so an exception
                # here comes from the executor itself (e.g. a broken pool).
                pdf_path, _, ceq_number, attachment_id = futures[future]
                results.append({
                    "ceqNumber": ceq_number,
                    "attachmentId": attachment_id,
                    "source_file": pdf_path,
                    "output_file": None,
                    "converted": False,
                    "num_pages": 0,
                    "has_text": False,
                    "error": str(e)[:500],
                    "timestamp": datetime.now().isoformat(),
                })

            # Periodic checkpoint; the newest row per attachment wins.
            if save_every and len(results) % save_every == 0:
                combined = pd.concat(
                    [load_conversion_status(), pd.DataFrame(results)],
                    ignore_index=True,
                )
                combined = combined.drop_duplicates(subset=['attachmentId'], keep='last')
                save_conversion_status(combined)
                logger.info("Progress saved: %d files processed", len(results))

    return pd.DataFrame(results, columns=status_columns)

In [None]:
# Run the conversions (skipped entirely when the queue is empty).
if len(conversion_queue) == 0:
    print("No files to convert. All files already converted or queue is empty.")
else:
    import time

    print(f"Starting conversion of {len(conversion_queue)} files with {NUM_WORKERS} workers...")
    print("Progress is saved every 500 files.")

    start_time = time.time()
    conversion_results = run_conversions(conversion_queue)
    elapsed = time.time() - start_time

    # Fold this run into the persisted status; the newest row per attachment wins.
    existing_status = load_conversion_status()
    combined_status = (
        pd.concat([existing_status, conversion_results], ignore_index=True)
        .drop_duplicates(subset=['attachmentId'], keep='last')
    )
    save_conversion_status(combined_status)

    # Summary statistics for this run only.
    converted_mask = conversion_results['converted']
    success_count = converted_mask.sum()
    fail_count = len(conversion_results) - success_count
    total_pages = conversion_results['num_pages'].sum()
    # Converted, but below extract_text_from_pdf's has_text threshold.
    low_text_count = (converted_mask & ~conversion_results['has_text']).sum()

    print("\n=== Conversion Summary ===")
    print(f"Time elapsed: {elapsed/60:.1f} minutes")
    print(f"Documents processed: {len(conversion_results)}")
    print(f"Successful: {success_count}")
    print(f"Failed: {fail_count}")
    print(f"Total pages: {total_pages:,}")
    print(f"Low/no text (may need OCR): {low_text_count}")
    print(f"Rate: {len(conversion_results)/elapsed:.1f} docs/sec, {total_pages/elapsed:.1f} pages/sec")

    if fail_count > 0:
        print("\nFailed conversions:")
        failed_rows = conversion_results.loc[~converted_mask, ['ceqNumber', 'source_file', 'error']]
        display(failed_rows.head(20))

In [88]:
conversion_results.loc[0, 'source_file']  # source PDF path of the first result row

'/Users/admin-tascott/Documents/GitHub/eis_documents/enepa_repository/documents/2025/20250186_LoMo_FRR_Comprehensive_Study_Draft_Report.pdf'

## Verify Conversions

In [None]:
def verify_conversions():
    """
    Compare expected conversions against existing output files.

    The source/output paths for every document record are rebuilt with the
    same expressions as the loading cell. The function then checks which
    files exist, prints overall and per-year progress (counting only
    documents whose source PDF exists), and returns the annotated records.

    Returns:
        The full document-record DataFrame with year, localFilename,
        sourcePath, outputPath, sourceExists and outputExists columns.
    """
    doc_df_full = load_document_records()

    # Build expected output paths
    doc_df_full['year'] = doc_df_full['ceqNumber'].astype(str).str[:4]
    doc_df_full['localFilename'] = doc_df_full.apply(
        lambda row: build_local_filename(
            row['ceqNumber'],
            row['name'] or row['fileNameForDownload'] or f"{row['attachmentId']}.pdf"
        ),
        axis=1
    )
    doc_df_full['sourcePath'] = doc_df_full.apply(
        lambda row: DOCUMENTS_DIR / row['year'] / row['localFilename'],
        axis=1
    )
    doc_df_full['outputPath'] = doc_df_full.apply(
        lambda row: OUTPUT_DIR / row['year'] / (row['localFilename'].replace('.pdf', '.txt').replace('.PDF', '.txt')),
        axis=1
    )

    # Check existence
    doc_df_full['sourceExists'] = doc_df_full['sourcePath'].apply(lambda p: p.exists())
    doc_df_full['outputExists'] = doc_df_full['outputPath'].apply(lambda p: p.exists())

    # Only count documents where source exists
    with_source = doc_df_full[doc_df_full['sourceExists']]

    total = len(with_source)
    converted = with_source['outputExists'].sum()
    remaining = total - converted

    def pct(count):
        # With no source PDFs (e.g. the documents/ symlink is not mounted),
        # report 0% instead of dividing by zero.
        return 100 * count / total if total else 0.0

    print(f"=== Conversion Verification ===")
    print(f"Total source documents: {total}")
    print(f"Converted: {converted} ({pct(converted):.1f}%)")
    print(f"Remaining: {remaining} ({pct(remaining):.1f}%)")

    print(f"\nBy year:")
    summary = with_source.groupby('year').agg(
        total=('outputExists', 'count'),
        converted=('outputExists', 'sum')
    )
    summary['remaining'] = summary['total'] - summary['converted']
    # Each group has at least one row, so this division is safe.
    summary['pct_complete'] = (100 * summary['converted'] / summary['total']).round(1)
    display(summary)

    return doc_df_full

verification_df = verify_conversions()

## Retry Failed Conversions

In [None]:
def retry_failed_conversions():
    """
    Retry any previously failed conversions.

    Failed attachments are read from the persisted status. Their source and
    output paths are rebuilt from the current document records, extraction
    is re-run, and the old status rows for those attachments are replaced
    with the new results.
    """
    status_df = load_conversion_status()
    # `.eq(True)` rather than `~status_df['converted']`: if the column is
    # object dtype, `~` applies Python's bitwise NOT to the bools (True -> -2)
    # instead of producing a boolean mask. Missing/None flags count as failed.
    failed = status_df[~status_df['converted'].eq(True)]

    if len(failed) == 0:
        print("No failed conversions to retry.")
        return

    print(f"Retrying {len(failed)} failed conversions...")

    # Rebuild queue from failed records
    doc_df_full = load_document_records()
    retry_queue = doc_df_full[doc_df_full['attachmentId'].isin(failed['attachmentId'])].copy()

    if len(retry_queue) == 0:
        # Without this guard an empty queue reaches run_conversions, whose
        # result may lack the 'attachmentId' column used below.
        print("None of the failed attachments are in the current document records; nothing to retry.")
        return

    # Add required columns (same path rules as the main loading cell)
    retry_queue['year'] = retry_queue['ceqNumber'].astype(str).str[:4]
    retry_queue['localFilename'] = retry_queue.apply(
        lambda row: build_local_filename(
            row['ceqNumber'],
            row['name'] or row['fileNameForDownload'] or f"{row['attachmentId']}.pdf"
        ),
        axis=1
    )
    retry_queue['sourcePath'] = retry_queue.apply(
        lambda row: DOCUMENTS_DIR / row['year'] / row['localFilename'],
        axis=1
    )
    retry_queue['outputPath'] = retry_queue.apply(
        lambda row: OUTPUT_DIR / row['year'] / (row['localFilename'].replace('.pdf', '.txt').replace('.PDF', '.txt')),
        axis=1
    )

    # Run conversions
    retry_results = run_conversions(retry_queue)

    # Update status: drop the old rows for retried attachments, append the new ones
    status_df = status_df[~status_df['attachmentId'].isin(retry_results['attachmentId'])]
    combined_status = pd.concat([status_df, retry_results], ignore_index=True)
    save_conversion_status(combined_status)

    success_count = retry_results['converted'].sum()
    print(f"Retry complete: {success_count}/{len(retry_results)} successful")

# Uncomment to retry failed conversions:
# retry_failed_conversions()

## Sample Output Inspection

In [None]:
# Inspect a sample converted file
def show_sample_output(n_chars: int = 2000, output_dir=None):
    """
    Print the beginning of a randomly chosen converted text file.

    Args:
        n_chars: Maximum number of characters to print.
        output_dir: Directory searched recursively for ``.txt`` files;
            defaults to OUTPUT_DIR.
    """
    import random

    root = OUTPUT_DIR if output_dir is None else Path(output_dir)
    txt_files = list(root.glob("**/*.txt"))

    if not txt_files:
        print("No converted files found yet.")
        return

    sample_file = random.choice(txt_files)

    print(f"Sample file: {sample_file}")
    print(f"File size: {sample_file.stat().st_size:,} bytes")
    print("=" * 60)

    with open(sample_file, 'r', encoding='utf-8') as f:
        content = f.read(n_chars)
        # Peek one more character so truncation is reported only when text
        # actually remains. The old `len(content) == n_chars` check also fired
        # for files exactly n_chars long.
        truncated = bool(f.read(1))

    print(content)
    if truncated:
        print(f"\n... [truncated at {n_chars} chars]")

# Uncomment to see a sample:
# show_sample_output()