# Large Document Splitter

This notebook splits large documents (>100MB or >500 pages) that exceed the limits of Snowflake's `AI_PARSE_DOCUMENT` function.

## Process Flow:
1. Query the `large_document_registry` table for pending documents
2. Download each large document from the external stage
3. Split PDFs into smaller parts (max 400 pages each)
4. **Upload split parts back to the SAME external stage**
5. The existing event-driven pipeline automatically processes the split parts
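
The page arithmetic behind step 3 can be sketched in isolation. The helper name `part_page_ranges` is hypothetical (it is not part of this notebook); it returns 1-based inclusive page ranges using the same ceiling division the splitter applies.

```python
from typing import List, Tuple


def part_page_ranges(total_pages: int, max_pages: int = 400) -> List[Tuple[int, int]]:
    """Return 1-based inclusive (first_page, last_page) ranges, one per part."""
    if total_pages < 0 or max_pages < 1:
        raise ValueError("total_pages must be >= 0 and max_pages must be >= 1")
    # Ceiling division: number of parts needed to hold every page
    num_parts = (total_pages + max_pages - 1) // max_pages
    ranges = []
    for part in range(num_parts):
        first = part * max_pages + 1
        last = min((part + 1) * max_pages, total_pages)
        ranges.append((first, last))
    return ranges


print(part_page_ranges(1050))  # [(1, 400), (401, 800), (801, 1050)]
```

Only the last part can be shorter than `max_pages`, so a 1,050-page document yields two full parts and one 250-page remainder.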

## How It Works:
- Split parts uploaded to S3 trigger the directory table auto-refresh
- The stream captures the new files
- Tasks automatically run: Parse → Classify → Extract → Chunk
- No manual intervention needed after splitting!

## Requirements:
- Snowflake Container Runtime with Python 3.10+
- pypdf package for PDF splitting
- Write access to the external S3 stage
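
The first two requirements can be checked up front with a small sketch like the one below. The function name `check_requirements` is hypothetical, and it covers only the Python version and package availability; write access to the stage is not something it can verify.

```python
import importlib.util
import sys


def check_requirements(min_python=(3, 10), packages=("pypdf",)):
    """Return a list of human-readable problems; an empty list means all checks passed."""
    problems = []
    if sys.version_info < min_python:
        found = ".".join(str(n) for n in sys.version_info[:3])
        wanted = ".".join(str(n) for n in min_python)
        problems.append(f"Python {wanted}+ required, found {found}")
    for name in packages:
        # find_spec returns None when a top-level package cannot be located
        if importlib.util.find_spec(name) is None:
            problems.append(f"Missing package: {name}")
    return problems


for problem in check_requirements():
    print(problem)
```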


In [None]:
# ============================================
# CELL 1: Import Libraries and Initialize Session
# ============================================

import io
import os
import json
import tempfile
from datetime import datetime
from typing import List, Dict, Tuple, Optional

# Snowpark imports
from snowflake.snowpark import Session
from snowflake.snowpark.context import get_active_session
from snowflake.snowpark.functions import col, lit, current_timestamp
from snowflake.snowpark.types import StructType, StructField, StringType, IntegerType, BinaryType

# PDF processing
try:
    from pypdf import PdfReader, PdfWriter
    print("✅ pypdf imported successfully")
except ImportError:
    print("⚠️ pypdf not found, installing...")
    import subprocess
    import sys
    # Install with the interpreter running this kernel so the package lands in the right environment
    subprocess.check_call([sys.executable, '-m', 'pip', 'install', 'pypdf'])
    from pypdf import PdfReader, PdfWriter
    print("✅ pypdf installed and imported")

# Get active Snowflake session (Container Runtime)
session = get_active_session()
print("✅ Connected to Snowflake")
print(f"   Current Database: {session.get_current_database()}")
print(f"   Current Schema: {session.get_current_schema()}")
print(f"   Current Warehouse: {session.get_current_warehouse()}")


In [None]:
# ============================================
# CELL 2: Configuration Constants
# ============================================

# Processing limits (stay under AI_PARSE_DOCUMENT limits)
MAX_PAGES_PER_PART = 400  # AI_PARSE_DOCUMENT limit is 500 pages
MAX_SIZE_BYTES = 100 * 1024 * 1024  # 100MB
MAX_SIZE_PER_PART_MB = 90  # Stay under 100MB limit per part

# Database objects
DATABASE = "document_db"
SCHEMA = "s3_documents"
EXTERNAL_STAGE = f"@{DATABASE}.{SCHEMA}.document_stage"

# Tables
LARGE_DOC_REGISTRY = f"{DATABASE}.{SCHEMA}.large_document_registry"
SPLIT_PARTS_TABLE = f"{DATABASE}.{SCHEMA}.document_split_parts"

# Folder for split documents (within the same external stage)
SPLIT_FOLDER = "split_documents"

print("📋 Configuration:")
print(f"   Max pages per part: {MAX_PAGES_PER_PART}")
print(f"   Max size per part: {MAX_SIZE_PER_PART_MB}MB")
print(f"   External stage: {EXTERNAL_STAGE}")
print(f"   Split folder: {SPLIT_FOLDER}/")
print()
print("💡 Split parts will be uploaded to the SAME external stage")
print("   The event-driven pipeline will automatically process them!")


In [None]:
# ============================================
# CELL 3: Helper Functions - PDF Operations
# ============================================

def get_pdf_info(pdf_bytes: bytes) -> Dict:
    """
    Get information about a PDF file.
    
    Args:
        pdf_bytes: Binary content of the PDF
    
    Returns:
        Dictionary with page count, size, and metadata
    """
    try:
        pdf_stream = io.BytesIO(pdf_bytes)
        reader = PdfReader(pdf_stream)
        
        return {
            'page_count': len(reader.pages),
            'size_bytes': len(pdf_bytes),
            'size_mb': round(len(pdf_bytes) / (1024 * 1024), 2),
            'is_encrypted': reader.is_encrypted,
            'metadata': dict(reader.metadata) if reader.metadata else {}
        }
    except Exception as e:
        return {'error': str(e)}


def split_pdf(pdf_bytes: bytes, max_pages: int = MAX_PAGES_PER_PART) -> List[Tuple[bytes, Dict]]:
    """
    Split a PDF into smaller parts.
    
    Args:
        pdf_bytes: Binary content of the PDF
        max_pages: Maximum pages per split part
    
    Returns:
        List of tuples containing (part_bytes, part_metadata)
    """
    pdf_stream = io.BytesIO(pdf_bytes)
    reader = PdfReader(pdf_stream)
    total_pages = len(reader.pages)
    
    # Calculate number of parts needed
    num_parts = (total_pages + max_pages - 1) // max_pages
    
    parts = []
    
    for part_num in range(num_parts):
        start_page = part_num * max_pages
        end_page = min((part_num + 1) * max_pages, total_pages)
        
        # Create a new PDF writer for this part
        writer = PdfWriter()
        
        for page_idx in range(start_page, end_page):
            writer.add_page(reader.pages[page_idx])
        
        # Write part to bytes
        output_stream = io.BytesIO()
        writer.write(output_stream)
        part_bytes = output_stream.getvalue()
        
        part_metadata = {
            'part_number': part_num + 1,
            'total_parts': num_parts,
            'page_start': start_page + 1,
            'page_end': end_page,
            'total_pages': total_pages,
            'part_size_bytes': len(part_bytes),
            'part_size_mb': round(len(part_bytes) / (1024 * 1024), 2)
        }
        
        parts.append((part_bytes, part_metadata))
        print(f"   Created part {part_num + 1}/{num_parts}: pages {start_page + 1}-{end_page}, size: {part_metadata['part_size_mb']}MB")
    
    return parts

print("✅ PDF helper functions defined")

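`split_pdf` caps only the page count, so a part made of image-heavy pages could still exceed `MAX_SIZE_PER_PART_MB`. The sketch below shows one possible way to enforce a size cap as well: any range that renders too large is halved until it fits. The names `plan_parts` and `make_pypdf_renderer` are hypothetical and not part of this notebook.

```python
from typing import Callable, List, Tuple


def plan_parts(total_pages: int, max_pages: int, max_bytes: int,
               render: Callable[[int, int], bytes]) -> List[Tuple[int, int, bytes]]:
    """Return (start, end, data) parts with 0-based start and exclusive end.

    Ranges start out capped at max_pages; a range whose rendered size exceeds
    max_bytes is halved until it fits or shrinks to a single page.
    """
    if max_pages < 1:
        raise ValueError("max_pages must be >= 1")
    parts = []
    # Pending ranges are stored in reverse so that pop() yields them in page order
    pending = [(s, min(s + max_pages, total_pages)) for s in range(0, total_pages, max_pages)]
    pending.reverse()
    while pending:
        start, end = pending.pop()
        data = render(start, end)
        if len(data) > max_bytes and end - start > 1:
            mid = (start + end) // 2
            # Push the second half first so the first half is processed next
            pending.append((mid, end))
            pending.append((start, mid))
        else:
            # Either the part fits, or it is one page and cannot be split further
            parts.append((start, end, data))
    return parts


def make_pypdf_renderer(pdf_bytes: bytes):
    """Build a render(start, end) callable backed by pypdf (imported lazily)."""
    import io
    from pypdf import PdfReader, PdfWriter

    reader = PdfReader(io.BytesIO(pdf_bytes))

    def render(start: int, end: int) -> bytes:
        writer = PdfWriter()
        for page_idx in range(start, end):
            writer.add_page(reader.pages[page_idx])
        buffer = io.BytesIO()
        writer.write(buffer)
        return buffer.getvalue()

    return len(reader.pages), render
```

With `total_pages, render = make_pypdf_renderer(pdf_bytes)`, a call such as `plan_parts(total_pages, 400, 90 * 1024 * 1024, render)` would produce the parts. Each halving renders the pages again, so this trades extra CPU time for a size guarantee; a single page larger than the cap is returned as-is.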

In [None]:
# ============================================
# CELL 4: Helper Functions - Snowflake Stage Operations
# ============================================

def download_file_from_stage(file_path: str) -> bytes:
    """
    Download a file from the external stage.
    
    Args:
        file_path: Relative path to the file in the stage
    
    Returns:
        Binary content of the file
    """
    with tempfile.TemporaryDirectory() as tmp_dir:
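        # Use GET command to download file
        # NOTE: Snowflake documents GET as supported for internal stages only. If this
        # statement fails against an external stage, download the file with the cloud
        # provider's own tooling instead.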
        get_query = f"GET {EXTERNAL_STAGE}/{file_path} file://{tmp_dir}/"
        session.sql(get_query).collect()
        
        # Read the downloaded file
        file_name = os.path.basename(file_path)
        local_path = os.path.join(tmp_dir, file_name)
        
        # Handle potential .gz compression from GET
        if os.path.exists(local_path + '.gz'):
            import gzip
            with gzip.open(local_path + '.gz', 'rb') as f:
                return f.read()
        elif os.path.exists(local_path):
            with open(local_path, 'rb') as f:
                return f.read()
        else:
            raise FileNotFoundError(f"Downloaded file not found: {local_path}")


def upload_part_to_external_stage(part_bytes: bytes, part_name: str, subfolder: Optional[str] = None) -> str:
    """
    Upload a split part back to the SAME external stage.
    The event-driven pipeline will automatically pick it up!
    
    Args:
        part_bytes: Binary content of the part
        part_name: Name for the uploaded file
        subfolder: Optional subfolder within the stage
    
    Returns:
        Stage path of the uploaded file
    """
    with tempfile.TemporaryDirectory() as tmp_dir:
        # Write bytes to temp file
        local_path = os.path.join(tmp_dir, part_name)
        with open(local_path, 'wb') as f:
            f.write(part_bytes)
        
        # Build upload path - same external stage, optional subfolder
        if subfolder:
            upload_path = f"{EXTERNAL_STAGE}/{subfolder}/"
        else:
            upload_path = f"{EXTERNAL_STAGE}/{SPLIT_FOLDER}/"
        
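        # Upload to external stage (AUTO_COMPRESS=FALSE to keep PDF intact)
        # NOTE: Snowflake documents PUT as supported for internal stages only. If this
        # statement fails against an external stage, upload the parts with the cloud
        # provider's own tooling instead.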
        put_query = f"PUT file://{local_path} {upload_path} AUTO_COMPRESS=FALSE OVERWRITE=TRUE"
        session.sql(put_query).collect()
        
        stage_path = f"{upload_path}{part_name}"
        return stage_path


def remove_original_large_file(file_path: str) -> bool:
    """
    Optionally remove the original large file after successful splitting.
    
    Args:
        file_path: Relative path to the file in the stage
    
    Returns:
        True if removed successfully, False otherwise
    """
    try:
        remove_query = f"REMOVE {EXTERNAL_STAGE}/{file_path}"
        session.sql(remove_query).collect()
        return True
    except Exception as e:
        print(f"   ⚠️ Could not remove original file: {str(e)[:50]}")
        return False

print("✅ Snowflake stage helper functions defined")


In [None]:
# ============================================
# CELL 5: Query Pending Large Documents
# ============================================

def get_pending_large_documents() -> list:
    """
    Query the large_document_registry for documents pending processing.
    
    Returns:
        List of pending document records
    """
    query = f"""
    SELECT 
        registry_id,
        original_file_path,
        original_file_name,
        original_file_size,
        estimated_pages,
        split_status,
        created_timestamp
    FROM {LARGE_DOC_REGISTRY}
    WHERE split_status IN ('pending', 'pending_split', 'requires_external_processing')
      AND split_required = TRUE
    ORDER BY created_timestamp ASC
    """
    
    results = session.sql(query).collect()
    return results

# Get pending documents
pending_docs = get_pending_large_documents()

print(f"\n📋 Found {len(pending_docs)} large documents pending processing:")
print("-" * 80)

for doc in pending_docs:
    size_mb = round(doc['ORIGINAL_FILE_SIZE'] / (1024 * 1024), 2) if doc['ORIGINAL_FILE_SIZE'] else 'N/A'
    print(f"  • {doc['ORIGINAL_FILE_NAME']}")
    print(f"    Registry ID: {doc['REGISTRY_ID']}")
    print(f"    Size: {size_mb} MB")
    print(f"    Status: {doc['SPLIT_STATUS']}")
    print()

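The main processing function in the next cell calls `update_registry_status` and `insert_split_part_record`, which are not defined anywhere in this notebook. The following is a minimal sketch of what they might look like, not the original implementation. It assumes a Snowpark version whose `Session.sql` accepts qmark bind parameters through `params`. In the parts table, only `registry_id` and `part_size` appear elsewhere in this notebook; every other column name is an assumption and should be adjusted to the real schema.

```python
from typing import Any, Dict, List, Optional, Tuple

# Assumed defaults; in the notebook these correspond to LARGE_DOC_REGISTRY and SPLIT_PARTS_TABLE
REGISTRY_TABLE = "document_db.s3_documents.large_document_registry"
PARTS_TABLE = "document_db.s3_documents.document_split_parts"


def build_registry_update(registry_id: str, status: str,
                          estimated_pages: Optional[int] = None,
                          split_count: Optional[int] = None,
                          table: str = REGISTRY_TABLE) -> Tuple[str, List[Any]]:
    """Build a parameterized UPDATE that sets only the fields that were provided."""
    assignments = ["split_status = ?"]
    params: List[Any] = [status]
    if estimated_pages is not None:
        assignments.append("estimated_pages = ?")
        params.append(estimated_pages)
    if split_count is not None:
        assignments.append("split_count = ?")
        params.append(split_count)
    params.append(registry_id)
    sql = f"UPDATE {table} SET {', '.join(assignments)} WHERE registry_id = ?"
    return sql, params


def build_part_insert(registry_id: str, part_meta: Dict[str, Any], stage_path: str,
                      table: str = PARTS_TABLE) -> Tuple[str, List[Any]]:
    """Build a parameterized INSERT for one split part."""
    # ASSUMPTION: all column names except registry_id and part_size are hypothetical
    columns = ["registry_id", "part_number", "total_parts",
               "page_start", "page_end", "part_size", "part_file_path"]
    params = [registry_id, part_meta["part_number"], part_meta["total_parts"],
              part_meta["page_start"], part_meta["page_end"],
              part_meta["part_size_bytes"], stage_path]
    placeholders = ", ".join("?" for _ in columns)
    sql = f"INSERT INTO {table} ({', '.join(columns)}) VALUES ({placeholders})"
    return sql, params


def update_registry_status(registry_id, status, estimated_pages=None,
                           split_count=None, sf_session=None) -> None:
    """Update the registry row; falls back to the notebook-global `session`."""
    sql, params = build_registry_update(registry_id, status, estimated_pages, split_count)
    active = sf_session if sf_session is not None else session  # global from Cell 1
    active.sql(sql, params=params).collect()


def insert_split_part_record(registry_id, part_meta, stage_path, sf_session=None) -> None:
    """Record one uploaded part; falls back to the notebook-global `session`."""
    sql, params = build_part_insert(registry_id, part_meta, stage_path)
    active = sf_session if sf_session is not None else session  # global from Cell 1
    active.sql(sql, params=params).collect()
```

Bind parameters keep error messages and file paths containing quotes from breaking the statement, which matters because the error handler writes exception text into `split_status`.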

In [None]:
# ============================================
# CELL 7: Main Processing Function - Split & Upload
# ============================================

# Configuration: Whether to remove original large file after splitting
REMOVE_ORIGINAL_AFTER_SPLIT = False  # Set to True to delete originals

def split_large_document(doc_record: dict) -> Dict:
    """
    Split a large document and upload parts back to the external stage.
    The event-driven pipeline will automatically process the split parts!
    
    Args:
        doc_record: Document record from the registry
    
    Returns:
        Dictionary with processing results
    """
    registry_id = doc_record['REGISTRY_ID']
    file_path = doc_record['ORIGINAL_FILE_PATH']
    file_name = doc_record['ORIGINAL_FILE_NAME']
    
    print(f"\n{'='*60}")
    print(f"Splitting: {file_name}")
    print(f"Registry ID: {registry_id}")
    print(f"{'='*60}")
    
    result = {
        'registry_id': registry_id,
        'file_name': file_name,
        'status': 'pending',
        'parts_created': 0,
        'parts_uploaded': 0
    }
    
    try:
        # Mark the document as in progress
        update_registry_status(registry_id, 'splitting')
        
        # Step 1: Download the file from external stage
        print(f"\n📥 Step 1: Downloading file from stage...")
        pdf_bytes = download_file_from_stage(file_path)
        print(f"   Downloaded: {len(pdf_bytes) / (1024*1024):.2f} MB")
        
        # Step 2: Get PDF info
        print(f"\n📊 Step 2: Analyzing PDF...")
        pdf_info = get_pdf_info(pdf_bytes)
        
        if 'error' in pdf_info:
            raise Exception(f"PDF analysis failed: {pdf_info['error']}")
        
        print(f"   Pages: {pdf_info['page_count']}")
        print(f"   Size: {pdf_info['size_mb']} MB")
        print(f"   Encrypted: {pdf_info['is_encrypted']}")
        
        # Update estimated pages
        update_registry_status(registry_id, 'splitting', 
                               estimated_pages=pdf_info['page_count'])
        
        # Check if document actually needs splitting
        needs_split = (pdf_info['page_count'] > MAX_PAGES_PER_PART or 
                       pdf_info['size_mb'] > MAX_SIZE_PER_PART_MB)
        
        if not needs_split:
            print(f"\n✅ Document is within limits - no split needed!")
            print(f"   The existing pipeline can process this file directly.")
            update_registry_status(registry_id, 'no_split_needed', split_count=0)
            result['status'] = 'no_split_needed'
            return result
        
        # Step 3: Split PDF
        print(f"\n✂️ Step 3: Splitting PDF into parts...")
        parts = split_pdf(pdf_bytes, MAX_PAGES_PER_PART)
        result['parts_created'] = len(parts)
        print(f"   Created {len(parts)} part(s)")
        
        # Step 4: Upload parts back to external stage
        print(f"\n📤 Step 4: Uploading parts to external stage...")
        print(f"   (Event-driven pipeline will process them automatically!)")
        
        for part_bytes, part_meta in parts:
            part_num = part_meta['part_number']
            total_parts = part_meta['total_parts']
            
            # Generate part filename
            base_name = file_name.rsplit('.', 1)[0]
            part_name = f"{base_name}_part_{part_num}_of_{total_parts}.pdf"
            
            print(f"\n   Uploading part {part_num}/{total_parts}: {part_name}")
            
            # Upload part to EXTERNAL stage (same as source!)
            stage_path = upload_part_to_external_stage(part_bytes, part_name)
            print(f"   📤 Uploaded to: {stage_path}")
            
            # Insert part record for tracking
            insert_split_part_record(registry_id, part_meta, stage_path)
            result['parts_uploaded'] += 1
        
        # Step 5: Optionally remove the original large file
        if REMOVE_ORIGINAL_AFTER_SPLIT:
            print(f"\n🗑️ Step 5: Removing original large file...")
            if remove_original_large_file(file_path):
                print(f"   ✅ Original file removed: {file_path}")
            else:
                print(f"   ⚠️ Could not remove original file")
        
        # Step 6: Update registry status
        update_registry_status(registry_id, 'split_uploaded', split_count=len(parts))
        
        result['status'] = 'success'
        
        print(f"\n✅ Successfully split: {file_name}")
        print(f"   → {len(parts)} parts uploaded to external stage")
        print(f"   → Event-driven pipeline will process them automatically!")
        
    except Exception as e:
        error_msg = str(e)[:500]
        print(f"\n❌ Error splitting {file_name}: {error_msg}")
        
        # Update registry with error status
        update_registry_status(registry_id, f'error: {error_msg[:100]}')
        
        result['status'] = 'error'
        result['error'] = error_msg
    
    return result

print("✅ Main split function defined")


In [None]:
# ============================================
# CELL 8: Split All Pending Large Documents
# ============================================

# Process all pending documents
print(f"\n{'#'*60}")
print(f"# SPLITTING LARGE DOCUMENTS")
print(f"# Documents to split: {len(pending_docs)}")
print(f"{'#'*60}")

splitting_results = []

for i, doc in enumerate(pending_docs, 1):
    print(f"\n[{i}/{len(pending_docs)}] Splitting document...")
    result = split_large_document(doc)
    splitting_results.append(result)

# Summary
print(f"\n\n{'='*60}")
print(f"SPLITTING SUMMARY")
print(f"{'='*60}")

success_count = sum(1 for r in splitting_results if r['status'] == 'success')
no_split_count = sum(1 for r in splitting_results if r['status'] == 'no_split_needed')
error_count = sum(1 for r in splitting_results if r['status'] == 'error')
total_parts = sum(r.get('parts_uploaded', 0) for r in splitting_results)

print(f"\nTotal Documents: {len(splitting_results)}")
print(f"  ✅ Split Successfully: {success_count}")
print(f"  ⏭️  No Split Needed: {no_split_count}")
print(f"  ❌ Errors: {error_count}")
print(f"  📄 Total Parts Uploaded: {total_parts}")

if splitting_results:
    print(f"\nDetails:")
    for result in splitting_results:
        if result['status'] == 'success':
            status_icon = '✅'
        elif result['status'] == 'no_split_needed':
            status_icon = '⏭️'
        else:
            status_icon = '❌'
        
        print(f"  {status_icon} {result['file_name']}")
        if result['status'] == 'success':
            print(f"      → {result['parts_created']} parts uploaded to external stage")
        elif result['status'] == 'error':
            print(f"      Error: {result.get('error', 'Unknown')[:80]}...")

print(f"\n{'='*60}")
print(f"🚀 Event-driven pipeline will automatically process the split parts!")
print(f"   Check parsed_documents table in a few minutes for results.")
print(f"{'='*60}")


In [None]:
# ============================================
# CELL 9: Check Pipeline Progress (Optional)
# ============================================

# The event-driven pipeline runs automatically!
# This cell lets you check the progress of split parts through the pipeline.

print("\n" + "="*60)
print("EVENT-DRIVEN PIPELINE STATUS")
print("="*60)

print("\n💡 Split parts were uploaded to the external stage.")
print("   The following happens automatically:")
print("   1. Directory table auto-refresh detects new files")
print("   2. Stream captures the new split parts")
print("   3. parse_documents_task triggers and parses the parts")
print("   4. classify_documents_task classifies them")
print("   5. extract_documents_task extracts attributes")
print("   6. chunk_documents_task creates searchable chunks")
print("   7. Cortex Search Service updates (within 2 minutes)")

# Check if any split parts have been processed yet
try:
    check_query = f"""
    SELECT 
        'Stream' as stage,
        COUNT(*) as count
    FROM {DATABASE}.{SCHEMA}.new_documents_stream
    WHERE relative_path LIKE '%{SPLIT_FOLDER}%'
       OR relative_path LIKE '%_part_%'
    """
    stream_check = session.sql(check_query).collect()
    
    print(f"\n📊 Current Status:")
    print(f"   Split parts in stream (pending): {stream_check[0]['COUNT']}")
    
    # Check parsed documents for split parts
    parsed_query = f"""
    SELECT COUNT(*) as count
    FROM {DATABASE}.{SCHEMA}.parsed_documents
    WHERE file_path LIKE '%{SPLIT_FOLDER}%'
       OR file_name LIKE '%_part_%'
    """
    parsed_check = session.sql(parsed_query).collect()
    print(f"   Split parts already parsed: {parsed_check[0]['COUNT']}")
    
except Exception as e:
    print(f"\n⚠️ Could not check pipeline status: {str(e)[:50]}")

print("\n🕐 Check back in a few minutes for processing to complete.")


In [None]:
# ============================================
# CELL 10: Verify Splitting Results
# ============================================

print("\n" + "="*60)
print("VERIFICATION: LARGE DOCUMENT REGISTRY STATUS")
print("="*60)

# Check registry status
status_query = f"""
SELECT 
    split_status,
    COUNT(*) as count
FROM {LARGE_DOC_REGISTRY}
GROUP BY split_status
ORDER BY count DESC
"""

status_df = session.sql(status_query).to_pandas()
print("\nRegistry Status Summary:")
print(status_df.to_string(index=False))

# Check split parts tracking
print("\n" + "="*60)
print("VERIFICATION: SPLIT PARTS UPLOADED")
print("="*60)

parts_query = f"""
SELECT 
    registry_id,
    COUNT(*) as parts_count,
    SUM(part_size) as total_size_bytes
FROM {SPLIT_PARTS_TABLE}
GROUP BY registry_id
ORDER BY registry_id
"""

try:
    parts_df = session.sql(parts_query).to_pandas()
    if len(parts_df) > 0:
        print("\nSplit Parts by Document:")
        print(parts_df.to_string(index=False))
    else:
        print("\nNo split parts recorded yet.")
except Exception as e:
    print(f"  Could not query split parts: {str(e)[:50]}")

# Check files in external stage split folder
print("\n" + "="*60)
print("VERIFICATION: FILES IN EXTERNAL STAGE")
print("="*60)

try:
    stage_query = f"""
    SELECT relative_path, size, last_modified
    FROM DIRECTORY({EXTERNAL_STAGE})
    WHERE relative_path LIKE '%{SPLIT_FOLDER}%'
       OR relative_path LIKE '%_part_%'
    ORDER BY last_modified DESC
    LIMIT 20
    """
    stage_df = session.sql(stage_query).to_pandas()
    
    if len(stage_df) > 0:
        print(f"\nSplit files in stage ({len(stage_df)} found):")
        print(stage_df.to_string(index=False))
    else:
        print("\nNo split files found in stage yet.")
except Exception as e:
    print(f"  Could not query stage directory: {str(e)[:50]}")


In [None]:
# ============================================
# CELL 11: Optional - Remove Original Large Files
# ============================================

# After splitting is complete and verified, you can optionally remove
# the original large files from the external stage.
# 
# ⚠️ WARNING: Only do this AFTER verifying split parts were uploaded successfully!

REMOVE_ORIGINALS_ENABLED = False  # Set to True to enable removal

if REMOVE_ORIGINALS_ENABLED:
    print("\n" + "="*60)
    print("CLEANUP: REMOVING ORIGINAL LARGE FILES")
    print("="*60)
    
    # Get documents that were successfully split
    completed_query = f"""
    SELECT registry_id, original_file_path, original_file_name, split_count
    FROM {LARGE_DOC_REGISTRY}
    WHERE split_status = 'split_uploaded'
      AND split_count > 0
    """
    
    completed = session.sql(completed_query).collect()
    
    removed_count = 0
    for record in completed:
        registry_id = record['REGISTRY_ID']
        file_path = record['ORIGINAL_FILE_PATH']
        file_name = record['ORIGINAL_FILE_NAME']
        
        print(f"\n  Removing: {file_name}")
        
        if remove_original_large_file(file_path):
            removed_count += 1
            print(f"    ✅ Removed successfully")
            
            # Update registry status
            update_query = f"""
            UPDATE {LARGE_DOC_REGISTRY}
            SET split_status = 'original_removed'
            WHERE registry_id = '{registry_id}'
            """
            session.sql(update_query).collect()
        else:
            print(f"    ⚠️ Could not remove")
    
    print(f"\n✅ Cleanup completed: {removed_count} original files removed")
else:
    print("\nℹ️ Original file removal is disabled.")
    print("   Set REMOVE_ORIGINALS_ENABLED = True to remove original large files")
    print("   after verifying split parts were uploaded successfully.")


# Splitting Complete! 🎉

## Summary

This notebook has **split large documents** and uploaded the parts back to the **same external stage**. The event-driven pipeline handles everything else automatically!

### What This Notebook Did:
1. ✅ Downloaded large documents from the external stage
2. ✅ Analyzed PDF page counts and sizes
3. ✅ Split documents exceeding 400 pages into smaller parts
4. ✅ **Uploaded split parts back to the SAME external stage**

### What Happens Automatically Next:
5. 🔄 Directory table auto-refresh detects new files
6. 🔄 Stream captures the new split parts
7. 🔄 `parse_documents_task` parses each part with AI_PARSE_DOCUMENT
8. 🔄 `classify_documents_task` classifies them
9. 🔄 `extract_documents_task` extracts attributes
10. 🔄 `chunk_documents_task` creates searchable chunks
11. 🔄 Cortex Search Service updates (within 2 minutes)

## No Manual Intervention Needed!

The split parts are now in the external stage. The existing event-driven pipeline will:
- Detect them automatically
- Parse, classify, extract, and chunk them
- Make them searchable via Cortex Search

## Scheduling

To run this notebook on a schedule in Snowflake Container Runtime:

### Option 1: Snowflake Notebooks Scheduling
Use the built-in scheduling feature in Snowflake Notebooks UI.

### Option 2: Execute via Stored Procedure
Create a stored procedure that wraps the notebook logic for task scheduling.

## Monitoring

Check the pipeline processing status:

```sql
-- Check large document registry status
SELECT * FROM document_db.s3_documents.large_document_status;

-- Check split parts in the stage
SELECT relative_path, size, last_modified
FROM DIRECTORY(@document_db.s3_documents.document_stage)
WHERE relative_path LIKE '%split_documents%'
   OR relative_path LIKE '%_part_%'
ORDER BY last_modified DESC;

-- Check if split parts have been parsed
SELECT file_name, status, parse_timestamp
FROM document_db.s3_documents.parsed_documents
WHERE file_name LIKE '%_part_%'
ORDER BY parse_timestamp DESC;
```
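
One caveat about the `LIKE '%_part_%'` filters used above: in SQL `LIKE`, an underscore matches any single character, so the pattern also matches unrelated names such as `department_summary.pdf`. When results are post-processed in Python, a stricter check against the naming scheme `<base>_part_<n>_of_<total>.pdf` could look like the sketch below; `parse_part_name` is a hypothetical helper, not part of this notebook.

```python
import re

# Matches names produced as "<base>_part_<n>_of_<total>.pdf"
PART_NAME = re.compile(r"^(?P<base>.+)_part_(?P<num>\d+)_of_(?P<total>\d+)\.pdf$")


def parse_part_name(file_name: str):
    """Return (base, part_number, total_parts), or None if the name is not a split part."""
    match = PART_NAME.match(file_name)
    if match is None:
        return None
    return match["base"], int(match["num"]), int(match["total"])


print(parse_part_name("annual_report_part_2_of_5.pdf"))  # ('annual_report', 2, 5)
print(parse_part_name("department_summary.pdf"))         # None
```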

## Troubleshooting

If splitting fails:
1. Check the `split_status` column in `large_document_registry` for error messages
2. Review the `document_split_parts` table for tracking info
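3. Re-run the notebook. It picks up only documents whose `split_status` is `pending`, `pending_split`, or `requires_external_processing`, so a document that failed (status beginning with `error:`) must be reset to one of those values before it is retried.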
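
Because the error handler stores failures as `error: <message>` in `split_status`, resetting them for a retry is a single `UPDATE`. The sketch below only builds the statement; `build_retry_update` is a hypothetical helper, and running it assumes a Snowpark `Session.sql` that accepts qmark bind parameters.

```python
from typing import Any, List, Optional, Tuple

# Assumed table name, matching LARGE_DOC_REGISTRY in the configuration cell
REGISTRY_TABLE = "document_db.s3_documents.large_document_registry"


def build_retry_update(registry_id: Optional[str] = None,
                       table: str = REGISTRY_TABLE) -> Tuple[str, List[Any]]:
    """Build an UPDATE that moves failed documents back to 'pending'.

    With registry_id=None every failed document is reset; otherwise only that one.
    """
    sql = f"UPDATE {table} SET split_status = 'pending' WHERE split_status LIKE 'error:%'"
    params: List[Any] = []
    if registry_id is not None:
        sql += " AND registry_id = ?"
        params.append(registry_id)
    return sql, params


sql, params = build_retry_update()
print(sql)
# In the notebook: session.sql(sql, params=params).collect()
```

A retried document is split and uploaded again from the start, so any parts that reached the stage before the failure are overwritten and may leave duplicate tracking rows behind.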

If split parts aren't being processed:
1. Check the stream: `SELECT * FROM document_db.s3_documents.new_documents_stream`
2. Check task status: `SHOW TASKS IN SCHEMA document_db.s3_documents`
3. Ensure tasks are resumed: `ALTER TASK parse_documents_task RESUME`
