# Batch Indexing and Archive Processing

This notebook demonstrates production-ready workflows:
- **Batch indexing** directories with many files
- **Archive processing** (.zip, .tar.gz extraction)
- **Performance optimization** for large datasets
- **Error handling** and validation
- **Incremental updates** to existing indexes

## Why Batch Processing?

Real scientific datasets:
- Often contain hundreds to thousands of files
- Frequently distributed as archives (Zenodo, Figshare)
- Need robust error handling
- Require progress tracking

## Workflow

```
Data Directory → Validation → Extraction → Companion Discovery → Indexing → Search
     ↓
Archives (.zip) → Extract → Find Data Files → Process Each → Add to Index
```

In [1]:
# Setup
import sys
from pathlib import Path
import time
sys.path.insert(0, str(Path.cwd().parent))

from search_engine import FAIRSearchEngine
from archive_handler import ArchiveHandler, ArchiveAwareIndexer
from file_validator import FileValidator
from metadata_extractors import MetadataExtractor
from companion_finder import CompanionDocFinder

## Example 1: Create Test Dataset with Archives

In [2]:
import netCDF4
import numpy as np
import zipfile
import shutil

# Create test directory
test_dir = Path("batch_test_data")
test_dir.mkdir(exist_ok=True)

# Create multiple NetCDF files
datasets = [
    {'name': 'ocean_temp_jan_2023.nc', 'var': 'temperature', 'value': 15},
    {'name': 'ocean_temp_feb_2023.nc', 'var': 'temperature', 'value': 16},
    {'name': 'ocean_temp_mar_2023.nc', 'var': 'temperature', 'value': 17},
    {'name': 'wind_speed_jan_2023.nc', 'var': 'wind', 'value': 10},
    {'name': 'wind_speed_feb_2023.nc', 'var': 'wind', 'value': 12},
]

print("Creating test dataset...")
for ds_info in datasets:
    filepath = test_dir / ds_info['name']
    
    with netCDF4.Dataset(filepath, 'w') as ds:
        ds.title = f"Test {ds_info['var']} data"
        ds.institution = "Demo Lab"
        
        ds.createDimension('time', 10)
        ds.createDimension('lat', 30)
        ds.createDimension('lon', 40)
        
        var = ds.createVariable(ds_info['var'], 'f4', ('time', 'lat', 'lon'))
        var[:] = np.random.randn(10, 30, 40) + ds_info['value']
    
    print(f"  ✓ {ds_info['name']}")

# Create README
with open(test_dir / "README.md", 'w') as f:
    f.write("# Test Dataset Collection\n\nOcean and atmospheric data for 2023.")

print(f"\n✓ Created {len(datasets)} files in {test_dir}/")

Creating test dataset...
  ✓ ocean_temp_jan_2023.nc
  ✓ ocean_temp_feb_2023.nc
  ✓ ocean_temp_mar_2023.nc
  ✓ wind_speed_jan_2023.nc
  ✓ wind_speed_feb_2023.nc

✓ Created 5 files in batch_test_data/


## Example 2: Create Archive

In [3]:
# Create a .zip archive
archive_path = Path("research_data_2023.zip")

with zipfile.ZipFile(archive_path, 'w', zipfile.ZIP_DEFLATED) as zf:
    # Add all .nc files
    for nc_file in test_dir.glob("*.nc"):
        zf.write(nc_file, f"data/{nc_file.name}")
    
    # Add README
    readme = test_dir / "README.md"
    if readme.exists():
        zf.write(readme, "README.md")

print(f"✓ Created archive: {archive_path}")
print(f"  Size: {archive_path.stat().st_size / 1024:.1f} KB")

# Inspect archive structure
handler = ArchiveHandler()
structure = handler.get_archive_structure(archive_path)

print(f"\nArchive contents:")
print(f"  Total files: {len(structure['files'])}")
print(f"  Data files: {len(structure['data_files'])}")
print(f"  Documentation: {len(structure['companion_files'])}")
print(f"\nData files in archive:")
for df in structure['data_files']:
    print(f"    - {df}")

✓ Created archive: research_data_2023.zip
  Size: 200.7 KB

Archive contents:
  Total files: 6
  Data files: 5
  Documentation: 1

Data files in archive:
    - data/ocean_temp_feb_2023.nc
    - data/wind_speed_feb_2023.nc
    - data/ocean_temp_jan_2023.nc
    - data/wind_speed_jan_2023.nc
    - data/ocean_temp_mar_2023.nc


## Example 3: Batch Validate Before Indexing

In [4]:
# Always validate before indexing
validator = FileValidator()

print("Validating files...")
results = validator.validate_directory(test_dir)

print("\nValidation Results:")
print("=" * 60)
print(f"Total files: {results['total_files']}")
print(f"✓ Valid: {len(results['valid'])}")
print(f"✗ Invalid: {len(results['invalid'])}")

if results['invalid']:
    print("\nInvalid files:")
    for inv in results['invalid']:
        print(f"  - {Path(inv['filepath']).name}")
        print(f"    Issues: {inv['issues']}")
else:
    print("\n✓ All files passed validation!")

Validating files...

Validation Results:
Total files: 5
✓ Valid: 5
✗ Invalid: 0

✓ All files passed validation!
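
What a signature check might look like for NetCDF files can be sketched as follows. This is not `FileValidator`'s implementation: the function name `check_netcdf_signature` and the `MIN_SIZE_BYTES` threshold are assumptions. The magic bytes themselves are standard: classic NetCDF files begin with `CDF` followed by a version byte, and NetCDF-4 files are HDF5 containers.

```python
import tempfile
from pathlib import Path

NETCDF_CLASSIC_PREFIX = b"CDF"
NETCDF_CLASSIC_VERSIONS = {1, 2, 5}  # classic, 64-bit offset, CDF-5
HDF5_SIGNATURE = b"\x89HDF\r\n\x1a\n"  # NetCDF-4 is stored as HDF5
MIN_SIZE_BYTES = 32  # hypothetical threshold, not FileValidator's actual value


def check_netcdf_signature(path):
    """Return a list of issues; an empty list means the header looks plausible."""
    path = Path(path)
    size = path.stat().st_size
    if size < MIN_SIZE_BYTES:
        return [f"File too small ({size} bytes)"]
    with open(path, "rb") as fh:
        header = fh.read(8)
    is_classic = (
        header[:3] == NETCDF_CLASSIC_PREFIX
        and header[3] in NETCDF_CLASSIC_VERSIONS
    )
    # Only offset 0 is checked; HDF5 files with a user block are not handled here.
    is_hdf5 = header == HDF5_SIGNATURE
    if is_classic or is_hdf5:
        return []
    return ["Unrecognized file signature"]


with tempfile.TemporaryDirectory() as tmp:
    text_file = Path(tmp) / "not_netcdf.nc"
    text_file.write_bytes(b"x" * 64)
    classic_file = Path(tmp) / "classic.nc"
    classic_file.write_bytes(b"CDF\x01" + b"\x00" * 60)
    text_issues = check_netcdf_signature(text_file)
    classic_issues = check_netcdf_signature(classic_file)

print(text_issues)
print(classic_issues)
```

A header check like this only rules out obviously wrong files. A file can pass it and still be truncated or internally corrupt, so errors during metadata extraction still need handling.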


## Example 4: Batch Index Directory

In [5]:
# Initialize search engine
print("Initializing search engine...")
engine = FAIRSearchEngine(load_existing=False)

# Batch index the directory
print(f"\nIndexing directory: {test_dir}")
print("=" * 60)

start_time = time.time()

result = engine.index_directory(
    test_dir,
    validate=True,
    include_companions=True,
    extract_archives=False,  # Just files, not archives yet
    show_progress=True
)

elapsed = time.time() - start_time

print("\nIndexing Results:")
print("=" * 60)
print(f"✓ Successfully indexed: {result['indexed']}")
print(f"✗ Errors: {result['errors']}")
print(f"⏱️  Time: {elapsed:.2f} seconds")
print(f"📊 Speed: {result['indexed'] / elapsed:.1f} files/second")

if result['errors'] > 0:
    print("\nErrors:")
    for error in result['details']['errors'][:5]:
        print(f"  - {error}")

Initializing search engine...
Loading embedding model: sentence-transformers/all-MiniLM-L6-v2
Model loaded. Embedding dimension: 384
Loaded embedding cache: 10 entries
Creating new index...
Initialized FAISS index (dim=384)

Indexing directory: batch_test_data


Indexing files: 100%|█| 5/5 [00:00<00:00, 42.


Indexing Results:
✓ Successfully indexed: 5
✗ Errors: 0
⏱️  Time: 0.15 seconds
📊 Speed: 34.4 files/second
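
The cell above reads `indexed`, `errors`, and `details['errors']` from the result. A minimal sketch of a batch loop that produces a result of that shape is shown below, assuming a per-file callable. `index_many` and `fake_index` are hypothetical names, and the real `index_directory` may work differently.

```python
import time
from pathlib import Path


def index_many(paths, index_one):
    """Apply index_one to each path, collecting failures instead of raising."""
    result = {"indexed": 0, "errors": 0, "details": {"errors": []}}
    start = time.perf_counter()
    for path in paths:
        try:
            index_one(path)
            result["indexed"] += 1
        except Exception as exc:  # broad on purpose: one bad file must not stop the batch
            result["errors"] += 1
            result["details"]["errors"].append(f"{Path(path).name}: {exc}")
    elapsed = time.perf_counter() - start
    result["elapsed"] = elapsed
    # Guard against a zero elapsed time on very small batches
    result["files_per_second"] = (
        result["indexed"] / elapsed if elapsed > 0 else float("inf")
    )
    return result


def fake_index(path):
    # Stand-in for the real indexing call; rejects one file to show error capture
    if "corrupt" in str(path):
        raise ValueError("not a valid NetCDF file")


demo_result = index_many(["a.nc", "corrupt.nc", "b.nc"], fake_index)
print(demo_result["indexed"], demo_result["errors"])
print(demo_result["details"]["errors"])
```

Recording the file name with each error makes it possible to retry only the failed files later rather than rerunning the whole batch.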





## Example 5: Index Archive (Auto-Extract)

In [6]:
# Index the archive - it will auto-extract
print(f"Indexing archive: {archive_path}")
print("=" * 60)

# Create archive-aware indexer
indexer = ArchiveAwareIndexer(
    metadata_extractor=MetadataExtractor(),
    companion_finder=CompanionDocFinder()
)

# Index the archive
archive_results = indexer.index_path(archive_path, extract_archives=True)

print("\nArchive Indexing Results:")
print("=" * 60)
print(f"Archives processed: {len(archive_results['archives_processed'])}")
print(f"Files indexed: {len(archive_results['indexed_files'])}")
print(f"Errors: {len(archive_results['errors'])}")

print("\nIndexed files from archive:")
for meta in archive_results['indexed_files'][:5]:  # Show first 5
    filename = Path(meta['filepath']).name
    print(f"  - {filename}")
    if 'archive_context' in meta:
        ctx = meta['archive_context']
        print(f"    From: {ctx['from_archive']}")
        print(f"    Path: {ctx['relative_path']}")

# Add to main index
for meta in archive_results['indexed_files']:
    searchable_text = engine.metadata_extractor.create_searchable_text(meta)
    embedding = engine.embedding_generator.encode_single(searchable_text)
    engine.vector_index.add(embedding.reshape(1, -1), [meta])

print(f"\n✓ Added {len(archive_results['indexed_files'])} files from archive to index")

Indexing archive: research_data_2023.zip

Archive Indexing Results:
Archives processed: 1
Files indexed: 5
Errors: 0

Indexed files from archive:
  - ocean_temp_feb_2023.nc
    From: research_data_2023.zip
    Path: data/wind_speed_jan_2023.nc
  - ocean_temp_jan_2023.nc
    From: research_data_2023.zip
    Path: data/wind_speed_jan_2023.nc
  - ocean_temp_mar_2023.nc
    From: research_data_2023.zip
    Path: data/wind_speed_jan_2023.nc
  - wind_speed_feb_2023.nc
    From: research_data_2023.zip
    Path: data/wind_speed_jan_2023.nc
  - wind_speed_jan_2023.nc
    From: research_data_2023.zip
    Path: data/wind_speed_jan_2023.nc

✓ Added 5 files from archive to index
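
The example above uses a `.zip` file, but the notebook also lists `.tar.gz` among the supported formats. The sketch below shows one cautious way to unpack a tar archive with the standard library, rejecting members whose paths would land outside the destination. `safe_extract_tar` is a hypothetical helper and is not taken from `ArchiveHandler`.

```python
import io
import tarfile
import tempfile
from pathlib import Path


def safe_extract_tar(archive_path, dest):
    """Extract regular files from a tar archive, refusing paths that escape dest."""
    dest = Path(dest).resolve()
    extracted = []
    with tarfile.open(archive_path, mode="r:*") as tf:
        for member in tf.getmembers():
            if not member.isfile():
                continue  # skip directories, links, and device entries
            target = (dest / member.name).resolve()
            if dest not in target.parents:
                raise ValueError(f"Unsafe path in archive: {member.name}")
            target.parent.mkdir(parents=True, exist_ok=True)
            with tf.extractfile(member) as src, open(target, "wb") as out:
                out.write(src.read())
            extracted.append(target)
    return extracted


with tempfile.TemporaryDirectory() as tmp:
    tmp = Path(tmp)
    archive_file = tmp / "demo.tar.gz"
    payload = b"placeholder bytes"
    with tarfile.open(archive_file, "w:gz") as tf:
        info = tarfile.TarInfo("data/sample.nc")
        info.size = len(payload)
        tf.addfile(info, io.BytesIO(payload))
    out_dir = tmp / "extracted"
    out_dir.mkdir()
    extracted_names = [
        p.relative_to(out_dir.resolve()).as_posix()
        for p in safe_extract_tar(archive_file, out_dir)
    ]

print(extracted_names)
```

Archives downloaded from public repositories are untrusted input, so checking each member path before writing is a reasonable default even for research data.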


## Example 6: Search Across All Indexed Data

In [7]:
# Test searches
queries = [
    "ocean temperature",
    "wind data",
    "january 2023"
]

for query in queries:
    print(f"\n{'='*60}")
    print(f"Query: '{query}'")
    print('='*60)
    
    results = engine.search(query, top_k=3)
    
    if results:
        for i, result in enumerate(results, 1):
            print(f"\n{i}. {Path(result['filepath']).name}")
            print(f"   Score: {result['similarity_score']:.3f}")
            print(f"   Title: {result.get('title', 'N/A')}")
            
            # Show if from archive
            if 'archive_context' in result:
                ctx = result['archive_context']
                print(f"   📦 From: {ctx['from_archive']}")
    else:
        print("No results found")


Query: 'ocean temperature'

1. ocean_temp_feb_2023.nc
   Score: 0.548
   Title: Test temperature data
   📦 From: research_data_2023.zip

2. ocean_temp_jan_2023.nc
   Score: 0.529
   Title: Test temperature data
   📦 From: research_data_2023.zip

3. ocean_temp_mar_2023.nc
   Score: 0.524
   Title: Test temperature data
   📦 From: research_data_2023.zip

Query: 'wind data'

1. wind_speed_feb_2023.nc
   Score: 0.581
   Title: Test wind data
   📦 From: research_data_2023.zip

2. wind_speed_jan_2023.nc
   Score: 0.558
   Title: Test wind data
   📦 From: research_data_2023.zip

3. wind_speed_feb_2023.nc
   Score: 0.540
   Title: Test wind data

Query: 'january 2023'

1. ocean_temp_feb_2023.nc
   Score: 0.285
   Title: Test temperature data

2. ocean_temp_jan_2023.nc
   Score: 0.272
   Title: Test temperature data

3. ocean_temp_feb_2023.nc
   Score: 0.240
   Title: Test temperature data
   📦 From: research_data_2023.zip
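
Because the same five files were indexed once from the directory and once from the archive, a file name can appear twice in the top results, as `wind_speed_feb_2023.nc` does for 'wind data'. One possible post-processing step is sketched below. `dedupe_by_filename` is a hypothetical helper, the sample file paths are made up, and matching on file name assumes names are unique across sources.

```python
from pathlib import PurePath


def dedupe_by_filename(results):
    """Keep the highest-scoring hit per file name, ordered by descending score."""
    best = {}
    for hit in results:
        name = PurePath(hit["filepath"]).name
        if name not in best or hit["similarity_score"] > best[name]["similarity_score"]:
            best[name] = hit
    return sorted(best.values(), key=lambda h: h["similarity_score"], reverse=True)


# Scores taken from the 'wind data' query above; the paths are illustrative only
sample_hits = [
    {"filepath": "extracted/data/wind_speed_feb_2023.nc", "similarity_score": 0.581},
    {"filepath": "extracted/data/wind_speed_jan_2023.nc", "similarity_score": 0.558},
    {"filepath": "batch_test_data/wind_speed_feb_2023.nc", "similarity_score": 0.540},
]

unique_hits = dedupe_by_filename(sample_hits)
unique_names = [PurePath(h["filepath"]).name for h in unique_hits]
print(unique_names)
```

When duplicates are expected, it can help to request a larger `top_k` than needed and trim after deduplication, so that the final list still has the desired length.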


## Example 7: Incremental Updates

Add new files to an existing index without rebuilding it.

In [8]:
# Save current index
print("Saving index...")
engine.save()

stats_before = engine.get_stats()
print(f"Index contains {stats_before['total_vectors']} datasets")

# Create a new file
new_file = test_dir / "ocean_temp_apr_2023.nc"
with netCDF4.Dataset(new_file, 'w') as ds:
    ds.title = "April 2023 Temperature"
    ds.institution = "Demo Lab"
    
    ds.createDimension('time', 10)
    ds.createDimension('lat', 30)
    ds.createDimension('lon', 40)
    
    var = ds.createVariable('temperature', 'f4', ('time', 'lat', 'lon'))
    var[:] = np.random.randn(10, 30, 40) + 18

print(f"\n✓ Created new file: {new_file.name}")

# Load existing index and add new file
print("\nLoading existing index...")
engine2 = FAIRSearchEngine(load_existing=True)

print("Adding new file to index...")
result = engine2.index_file(new_file)

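if result.get('success'):
    print(f"✓ Added: {new_file.name}")
else:
    # Report the failure instead of continuing silently
    print(f"✗ Failed to add {new_file.name}: {result.get('error')}")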

# Save updated index
engine2.save()

stats_after = engine2.get_stats()
print(f"\nIndex now contains {stats_after['total_vectors']} datasets")
print(f"Added: {stats_after['total_vectors'] - stats_before['total_vectors']} new file(s)")

Saving index...
Index saved to /home/vastdata/vast-fair-stack/lib/indexes/faiss_index.bin
Index contains 10 datasets

✓ Created new file: ocean_temp_apr_2023.nc

Loading existing index...
Loading embedding model: sentence-transformers/all-MiniLM-L6-v2
Model loaded. Embedding dimension: 384
Loaded embedding cache: 22 entries
Loading existing index...
Initialized FAISS index (dim=384)
Index loaded: {'total_vectors': 10, 'total_metadata': 10, 'unique_files': 10, 'embedding_dim': 384, 'index_type': 'IndexFlatIP'}
Adding new file to index...
✓ Added: ocean_temp_apr_2023.nc
Index saved to /home/vastdata/vast-fair-stack/lib/indexes/faiss_index.bin

Index now contains 11 datasets
Added: 1 new file(s)
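
The example adds one known file by hand. For ongoing projects, deciding which files still need indexing is usually the harder part. One approach, sketched here under the assumption that a small JSON manifest of content hashes is kept next to the index, is to compare each file's digest against the manifest. The names `load_manifest`, `find_pending`, and `manifest.json` are hypothetical and not part of `FAIRSearchEngine`.

```python
import hashlib
import json
import tempfile
from pathlib import Path


def file_digest(path, chunk_size=1 << 20):
    """SHA-256 of a file, read in chunks so large files do not fill memory."""
    digest = hashlib.sha256()
    with open(path, "rb") as fh:
        for chunk in iter(lambda: fh.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


def load_manifest(manifest_path):
    manifest_path = Path(manifest_path)
    return json.loads(manifest_path.read_text()) if manifest_path.exists() else {}


def find_pending(data_dir, manifest, pattern="*.nc"):
    """Return {file name: digest} for files that are new or changed."""
    pending = {}
    for path in sorted(Path(data_dir).glob(pattern)):
        digest = file_digest(path)
        if manifest.get(path.name) != digest:
            pending[path.name] = digest
    return pending


with tempfile.TemporaryDirectory() as tmp:
    data_dir = Path(tmp)
    manifest_file = data_dir / "manifest.json"

    (data_dir / "jan.nc").write_bytes(b"january")
    first_pass = find_pending(data_dir, load_manifest(manifest_file))
    # Record files only after they have been indexed successfully
    manifest_file.write_text(json.dumps(first_pass))

    (data_dir / "feb.nc").write_bytes(b"february")
    second_pass = find_pending(data_dir, load_manifest(manifest_file))

print(sorted(first_pass), sorted(second_pass))
```

Hashing content rather than comparing modification times costs a full read of each file, but it is not fooled by files that were copied or touched without changing.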


## Example 8: Performance Benchmarks

In [9]:
# Benchmark search speed
print("Benchmarking search performance...")
print("=" * 60)

query = "ocean temperature data"
num_searches = 100

start = time.perf_counter()  # monotonic, high-resolution clock suited to short intervals
for _ in range(num_searches):
    engine2.search(query, top_k=10)
elapsed = time.perf_counter() - start

avg_time_ms = (elapsed / num_searches) * 1000
searches_per_sec = num_searches / elapsed

print(f"Ran {num_searches} searches in {elapsed:.2f} seconds")
print(f"\nPerformance:")
print(f"  Average search time: {avg_time_ms:.2f} ms")
print(f"  Searches per second: {searches_per_sec:.0f}")
print(f"  Target: <200ms per search - {'✓ PASS' if avg_time_ms < 200 else '✗ FAIL'}")

# Index size
stats = engine2.get_stats()
print(f"\nIndex Statistics:")
print(f"  Total datasets: {stats['total_vectors']}")
print(f"  Unique files: {stats['unique_files']}")
print(f"  Cache size: {stats['cache_size']} embeddings")

Benchmarking search performance...
Ran 100 searches in 0.02 seconds

Performance:
  Average search time: 0.16 ms
  Searches per second: 6130
  Target: <200ms per search - ✓ PASS

Index Statistics:
  Total datasets: 11
  Unique files: 11
  Cache size: 24 embeddings
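
The benchmark above reports a single average over 100 runs of one query. An average can hide slow outliers, and repeating an identical query may benefit from the embedding cache, so a per-call measurement with percentiles gives a fuller picture. The helper below is a generic sketch. `benchmark` is a hypothetical name, and a cheap stand-in function is timed in place of `engine2.search`.

```python
import statistics
import time


def benchmark(fn, runs=100, warmup=5):
    """Time fn per call and summarize latency in milliseconds."""
    for _ in range(warmup):
        fn()  # warm caches before measuring
    samples_ms = []
    for _ in range(runs):
        start = time.perf_counter()
        fn()
        samples_ms.append((time.perf_counter() - start) * 1000)
    cuts = statistics.quantiles(samples_ms, n=20)  # 19 cut points in 5% steps
    return {
        "runs": runs,
        "mean_ms": statistics.fmean(samples_ms),
        "median_ms": statistics.median(samples_ms),
        "p95_ms": cuts[18],  # last cut point is the 95th percentile
        "max_ms": max(samples_ms),
    }


# Stand-in workload; in the notebook this would wrap a search call
report = benchmark(lambda: sum(range(1000)), runs=50)
print(report)
```

To exercise the uncached path, the same helper could be fed a function that cycles through distinct queries instead of repeating one.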


## Example 9: Error Handling and Recovery

In [10]:
# Create a corrupt file to test error handling
corrupt_file = test_dir / "corrupt_data.nc"
with open(corrupt_file, 'w') as f:
    f.write("This is not a valid NetCDF file")

print("Testing error handling with corrupt file...")
print("=" * 60)

# Try to index - should handle gracefully
result = engine2.index_file(corrupt_file, validate=True)

if result.get('error'):
    print(f"✓ Error detected correctly: {result['error']}")
    print("✓ System continues running (no crash)")
else:
    print("⚠️  File was indexed (should have been rejected)")

# Clean up
corrupt_file.unlink()
print("\n✓ Error handling test complete")

Testing error handling with corrupt file...
✓ Error detected correctly: Invalid: File too small (31 bytes)
✓ System continues running (no crash)

✓ Error handling test complete


## Example 10: Production Workflow Summary

In [11]:
# Summary of production workflow
print("Production Workflow Summary")
print("=" * 60)
print("""
1. VALIDATE
   └─ Check file signatures before processing
   └─ Identify corrupt/invalid files early
   
2. EXTRACT METADATA
   └─ Handle minimal metadata gracefully
   └─ Find companion documentation
   
3. PROCESS ARCHIVES
   └─ Auto-extract .zip, .tar.gz files
   └─ Preserve archive context
   
4. BATCH INDEX
   └─ Process directories efficiently
   └─ Show progress for long operations
   └─ Handle errors without crashing
   
5. INCREMENTAL UPDATES
   └─ Add new files without rebuilding
   └─ Fast updates for ongoing projects
   
6. VERIFY
   └─ Test search functionality
   └─ Benchmark performance
   └─ Save index for reuse
""")

final_stats = engine2.get_stats()
print("Final Index State:")
print(f"  📊 Total datasets: {final_stats['total_vectors']}")
print(f"  📁 Unique files: {final_stats['unique_files']}")
print(f"  🤖 Model: {final_stats['model']}")
print(f"  💾 Cache: {final_stats['cache_size']} embeddings")

Production Workflow Summary

1. VALIDATE
   └─ Check file signatures before processing
   └─ Identify corrupt/invalid files early

2. EXTRACT METADATA
   └─ Handle minimal metadata gracefully
   └─ Find companion documentation

3. PROCESS ARCHIVES
   └─ Auto-extract .zip, .tar.gz files
   └─ Preserve archive context

4. BATCH INDEX
   └─ Process directories efficiently
   └─ Show progress for long operations
   └─ Handle errors without crashing

5. INCREMENTAL UPDATES
   └─ Add new files without rebuilding
   └─ Fast updates for ongoing projects

6. VERIFY
   └─ Test search functionality
   └─ Benchmark performance
   └─ Save index for reuse

Final Index State:
  📊 Total datasets: 11
  📁 Unique files: 11
  🤖 Model: sentence-transformers/all-MiniLM-L6-v2
  💾 Cache: 24 embeddings


## Best Practices for Production

### ✅ Do This
1. **Always validate first** - Catch bad files early
2. **Use progress bars** - Monitor long operations
3. **Handle errors gracefully** - Log but don't crash
4. **Incremental updates** - Don't rebuild everything
5. **Test search** - Verify quality after indexing

### ⚠️ Avoid This
1. Indexing without validation
2. Processing archives without checking contents
3. Ignoring error messages
4. Rebuilding index for small updates
5. Not testing search quality

## Performance Expectations

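| Operation | Speed | Notes |
|-----------|-------|-------|
| Validation | 1000+ files/sec | Checks file signatures |
| Indexing | 5-15 files/min | Depends on file size; the five small test files above indexed at about 34 files/sec |
| Archive extraction | Varies | Depends on archive size |
| Search | <200 ms per query | Fast even with 10k files; 0.16 ms measured on the 11-dataset index above |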

## Cleanup (Optional)

In [None]:
# Uncomment to clean up test files
# import shutil
# shutil.rmtree(test_dir)
# archive_path.unlink()
# print("✓ Test files cleaned up")

## Next Steps

- **Notebook 06**: Optional LLM enrichment for enhanced metadata
- **Notebook 99**: Complete end-to-end workflows

## Command-Line Usage

For production, use the CLI tools:

```bash
# Batch index
python fair_index.py index /path/to/data --extract-archives

# With validation
python fair_index.py validate /path/to/data
python fair_index.py index /path/to/data

# Check index statistics
python fair_index.py stats
```
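
For readers who want to wrap their own pipeline in a similar interface, the command shapes shown above can be reproduced with `argparse`. This sketch is not the source of `fair_index.py`. It only mirrors the three subcommands and the `--extract-archives` flag that appear in the examples, and the help strings are assumptions.

```python
import argparse


def build_parser():
    """Parser that mirrors the command shapes shown above (illustrative only)."""
    parser = argparse.ArgumentParser(prog="fair_index.py")
    sub = parser.add_subparsers(dest="command", required=True)

    index_cmd = sub.add_parser("index", help="Index a data directory")
    index_cmd.add_argument("path")
    index_cmd.add_argument("--extract-archives", action="store_true")

    validate_cmd = sub.add_parser("validate", help="Validate files without indexing")
    validate_cmd.add_argument("path")

    sub.add_parser("stats", help="Show index statistics")
    return parser


args = build_parser().parse_args(["index", "/path/to/data", "--extract-archives"])
print(args.command, args.path, args.extract_archives)
```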