# Batch Processing Guide - Parallel Document Ingestion

This notebook demonstrates how to process multiple documents efficiently using parallel processing optimizations introduced in v4.0.

## What You'll Learn

- ‚úÖ Batch processing multiple documents in parallel
- ‚úÖ Performance optimization settings
- ‚úÖ Environment variable configuration
- ‚úÖ Monitoring batch progress
- ‚úÖ Error handling in batch operations

## Prerequisites

- Azure services configured (Search, Document Intelligence, OpenAI)
- Multiple documents to process (PDFs, DOCX, PPTX)
- `.env` file with credentials

## Step 1: Environment Setup

Load environment variables from `.env` file:

In [None]:
import os
from pathlib import Path
from dotenv import load_dotenv

# Load .env file
env_path = Path("../../.env")
if env_path.exists():
    load_dotenv(dotenv_path=env_path)
    print(f"‚úÖ Loaded environment from: {env_path.absolute()}")
else:
    load_dotenv()
    print("‚úÖ Loaded environment from default location")

# Verify required variables
required_vars = [
    "AZURE_SEARCH_SERVICE",
    "AZURE_SEARCH_INDEX",
    "AZURE_DOCUMENT_INTELLIGENCE_ENDPOINT",
    "AZURE_OPENAI_ENDPOINT"
]

missing_vars = [var for var in required_vars if not os.getenv(var)]
if missing_vars:
    print(f"\n‚ö†Ô∏è  Missing: {missing_vars}")
else:
    print(f"‚úÖ All required variables set")

## Step 2: Import Ingestor

In [None]:
from ingestor import run_pipeline, create_config, Pipeline
import time

print("‚úÖ Ingestor imported successfully")

## Step 3: Basic Batch Processing

Process multiple documents with default settings:

In [None]:
# Process all PDFs in a directory
start = time.time()

status = await run_pipeline(
    input_glob="../../documents/**/*.pdf"  # Adjust path as needed
)

elapsed = time.time() - start

print(f"\nüìä Batch Processing Results:")
print(f"Documents processed: {status.successful_documents}")
print(f"Documents failed: {status.failed_documents}")
print(f"Total chunks: {status.total_chunks_indexed}")
print(f"Total time: {elapsed:.2f}s")
print(f"Avg time per document: {elapsed / status.successful_documents:.2f}s")

## Step 4: Optimized Batch Processing (RECOMMENDED)

Enable parallel processing for maximum throughput:

In [None]:
# Optimized batch processing with parallel execution
start = time.time()

status = await run_pipeline(
    input_glob="../../documents/**/*.pdf",
    
    # === PARALLEL PROCESSING SETTINGS ===
    performance_max_workers=4,          # Process 4 documents in parallel
    
    # === CONCURRENCY SETTINGS ===
    azure_openai_max_concurrency=10,    # Parallel embedding batches
    azure_di_max_concurrency=5,         # Parallel DI requests
    
    # === OPTIMIZATION FLAGS ===
    use_integrated_vectorization=True   # Server-side embeddings (fastest!)
)

elapsed = time.time() - start

print(f"\nüöÄ OPTIMIZED Batch Processing Results:")
print(f"Documents processed: {status.successful_documents}")
print(f"Documents failed: {status.failed_documents}")
print(f"Total chunks: {status.total_chunks_indexed}")
print(f"Total time: {elapsed:.2f}s")
print(f"Avg time per document: {elapsed / status.successful_documents:.2f}s")
print(f"Throughput: {status.total_chunks_indexed / elapsed:.2f} chunks/sec")

# Show per-document results
print(f"\nPer-document breakdown:")
for result in status.results:
    status_icon = "‚úÖ" if result.success else "‚ùå"
    print(f"  {status_icon} {result.filename}")
    print(f"     Time: {result.processing_time_seconds:.2f}s")
    print(f"     Chunks: {result.chunks_indexed}")
    if not result.success:
        print(f"     Error: {result.error_message}")

## Step 5: Configure from Environment Variables

Set optimization settings in your `.env` file for consistent behavior:

In [None]:
# Create .env file with these settings:
"""
# Batch Processing Optimizations
AZURE_MAX_WORKERS=4
AZURE_OPENAI_MAX_CONCURRENCY=10
AZURE_DI_MAX_CONCURRENCY=5
AZURE_USE_INTEGRATED_VECTORIZATION=true
"""

# Then just load from environment:
from ingestor import PipelineConfig

config = PipelineConfig.from_env()  # Loads all settings from .env

print(f"‚úÖ Configuration loaded from environment:")
print(f"   Max workers: {config.performance.max_workers}")
print(f"   OpenAI concurrency: {config.azure_openai.max_concurrency}")
print(f"   DI concurrency: {config.document_intelligence.max_concurrency}")
print(f"   Integrated vectorization: {config.use_integrated_vectorization}")

# Use the config
pipeline = Pipeline(config)
status = await pipeline.run()
await pipeline.close()

print(f"\n‚úÖ Processed {status.successful_documents} documents using env config")

## Step 6: Different Environment Configurations

Use different `.env` files for different scenarios:

In [None]:
from dotenv import load_dotenv

# Load development settings (conservative)
load_dotenv(dotenv_path="../../.env.development", override=True)
dev_config = PipelineConfig.from_env()
print(f"Development config: max_workers={dev_config.performance.max_workers}")

# Load production settings (optimized)
load_dotenv(dotenv_path="../../.env.production", override=True)
prod_config = PipelineConfig.from_env()
print(f"Production config: max_workers={prod_config.performance.max_workers}")

# Example .env.development:
"""
AZURE_MAX_WORKERS=2
AZURE_OPENAI_MAX_CONCURRENCY=5
AZURE_DI_MAX_CONCURRENCY=3
"""

# Example .env.production:
"""
AZURE_MAX_WORKERS=8
AZURE_OPENAI_MAX_CONCURRENCY=15
AZURE_DI_MAX_CONCURRENCY=8
AZURE_USE_INTEGRATED_VECTORIZATION=true
"""

## Step 7: Performance Comparison

Compare sequential vs parallel processing:

In [None]:
import pandas as pd

# Test different configurations
test_configs = [
    {
        "name": "Sequential (Old)",
        "max_workers": 1,
        "openai_concurrency": 1,
        "integrated_vectorization": False
    },
    {
        "name": "Parallel Batching",
        "max_workers": 1,
        "openai_concurrency": 10,
        "integrated_vectorization": False
    },
    {
        "name": "Parallel Documents",
        "max_workers": 4,
        "openai_concurrency": 10,
        "integrated_vectorization": False
    },
    {
        "name": "Full Optimization",
        "max_workers": 4,
        "openai_concurrency": 10,
        "integrated_vectorization": True
    }
]

results = []

for test in test_configs:
    print(f"\nTesting: {test['name']}...")
    
    start = time.time()
    status = await run_pipeline(
        input_glob="../../sample_documents/*.pdf",  # Small test set
        performance_max_workers=test["max_workers"],
        azure_openai_max_concurrency=test["openai_concurrency"],
        use_integrated_vectorization=test["integrated_vectorization"]
    )
    elapsed = time.time() - start
    
    results.append({
        "Configuration": test["name"],
        "Time (s)": f"{elapsed:.2f}",
        "Documents": status.successful_documents,
        "Chunks": status.total_chunks_indexed,
        "Throughput": f"{status.total_chunks_indexed / elapsed:.2f} chunks/s"
    })
    
    print(f"  ‚úÖ Completed in {elapsed:.2f}s")

# Display comparison
df = pd.DataFrame(results)
print(f"\nüìä Performance Comparison:")
df

## Step 8: Monitoring Batch Progress

Monitor processing in real-time:

In [None]:
from IPython.display import display, HTML
import time

# Create config with progress tracking
config = create_config(
    input_glob="../../documents/**/*.pdf",
    performance_max_workers=4,
    azure_openai_max_concurrency=10,
    use_integrated_vectorization=True
)

# Process with pipeline (gives more control)
pipeline = Pipeline(config)

print("üîÑ Processing documents in parallel...\n")
start = time.time()

status = await pipeline.run()

elapsed = time.time() - start

# Display results
print(f"\n‚úÖ Batch processing complete!")
print(f"   Total time: {elapsed:.2f}s")
print(f"   Successful: {status.successful_documents}")
print(f"   Failed: {status.failed_documents}")
print(f"   Total chunks: {status.total_chunks_indexed}")

await pipeline.close()

## Step 9: Error Handling in Batch Operations

Handle failures gracefully in batch processing:

In [None]:
# Process batch and handle errors
status = await run_pipeline(
    input_glob="../../documents/**/*.pdf",
    performance_max_workers=4,
    azure_openai_max_concurrency=10
)

# Check for failures
if status.failed_documents > 0:
    print(f"‚ö†Ô∏è  {status.failed_documents} document(s) failed:\n")
    
    for result in status.results:
        if not result.success:
            print(f"‚ùå {result.filename}")
            print(f"   Error: {result.error_message}")
            print(f"   Time spent: {result.processing_time_seconds:.2f}s\n")
    
    # Retry failed documents
    print("üîÑ Retrying failed documents...")
    failed_files = [r.filename for r in status.results if not r.success]
    
    # Process failed documents individually with more conservative settings
    for failed_file in failed_files:
        try:
            retry_status = await run_pipeline(
                input_glob=failed_file,
                performance_max_workers=1,
                azure_openai_max_concurrency=3  # Reduce concurrency
            )
            if retry_status.successful_documents > 0:
                print(f"   ‚úÖ Retry succeeded: {failed_file}")
            else:
                print(f"   ‚ùå Retry failed: {failed_file}")
        except Exception as e:
            print(f"   ‚ùå Retry error for {failed_file}: {e}")
else:
    print(f"‚úÖ All {status.successful_documents} documents processed successfully!")

## Step 10: Recommended .env Configuration

Add these settings to your `.env` file for optimal batch processing:

In [None]:
# Recommended .env settings for batch processing
recommended_env = """
# === Azure Services ===
AZURE_SEARCH_SERVICE=your-service
AZURE_SEARCH_INDEX=documents-index
AZURE_SEARCH_KEY=your-key

AZURE_DOCUMENT_INTELLIGENCE_ENDPOINT=https://your-di.cognitiveservices.azure.com/
AZURE_DOCUMENT_INTELLIGENCE_KEY=your-key

AZURE_OPENAI_ENDPOINT=https://your-openai.openai.azure.com
AZURE_OPENAI_EMBEDDING_DEPLOYMENT=text-embedding-ada-002
AZURE_OPENAI_KEY=your-key

# === Batch Processing Optimizations (NEW!) ===
AZURE_MAX_WORKERS=4                          # Process 4 documents in parallel
AZURE_OPENAI_MAX_CONCURRENCY=10              # Parallel embedding batches
AZURE_DI_MAX_CONCURRENCY=5                   # Parallel DI requests
AZURE_USE_INTEGRATED_VECTORIZATION=true      # Server-side embeddings (fastest!)

# === Input/Output ===
AZURE_INPUT_MODE=local                       # or 'blob'
AZURE_LOCAL_GLOB=documents/**/*.pdf          # Recursive glob
AZURE_ARTIFACTS_MODE=blob                    # or 'local'
"""

print("Copy this to your .env file:")
print(recommended_env)

## Summary

You've successfully learned:

‚úÖ **Batch processing** with parallel document execution  
‚úÖ **Performance optimization** with `max_workers` and concurrency settings  
‚úÖ **Environment configuration** with `.env` files  
‚úÖ **Progress monitoring** and error handling  
‚úÖ **Performance comparison** between configurations  

## Performance Improvements

With optimizations enabled:
- **Single 100-page PDF**: 50-60% faster
- **4 x 100-page PDFs**: 75-85% faster
- **Parallel batching**: 3-5x faster embedding generation
- **Integrated vectorization**: Eliminates client-side embedding time

## Next Steps

- **07_performance_tuning.ipynb**: Deep dive into performance optimization
- **06_troubleshooting.ipynb**: Debug rate limits and errors
- **03_advanced_features.ipynb**: Explore advanced configuration options

## Related Documentation

- [QUICK_START_OPTIMIZATIONS.md](../../QUICK_START_OPTIMIZATIONS.md)
- [PARALLEL_OPTIMIZATIONS_APPLIED.md](../../docs/PARALLEL_OPTIMIZATIONS_APPLIED.md)
- [PERFORMANCE_SUMMARY.md](../../docs/PERFORMANCE_SUMMARY.md)