# Data Engineering Pipelines and Batch Jobs

Scaling Promethium for production data processing.

**Prerequisites:** Completion of notebooks 01–05

**Topics:** Batch processing, parallel execution, result management

In [None]:
# Uncomment to install the pinned version. Prefer %pip over !pip: %pip installs
# into the environment of the running kernel, whereas !pip may target a
# different Python interpreter on PATH.
# %pip install promethium-seismic==1.0.3

In [None]:
import json
import os
from concurrent.futures import ProcessPoolExecutor, as_completed
from datetime import datetime

import numpy as np
from scipy.ndimage import gaussian_filter1d
from tqdm import tqdm

import promethium
from promethium import (
    generate_synthetic_traces,
    add_noise,
    bandpass_filter,
    evaluate_reconstruction,
    set_seed,
)

# Fix the random seed (via promethium.set_seed) before any stochastic work,
# and print the library version so outputs can be traced to a release.
SEED = 42
set_seed(SEED)
print(f"Promethium {promethium.__version__}")

## 1. Batch Processing Framework

In [None]:
def process_single_dataset(data, config):
    """Bandpass-filter and then smooth every trace of one dataset.

    Parameters
    ----------
    data : array-like
        Iterable of traces; each trace is passed to ``bandpass_filter``
        individually. Presumably shape (n_traces, n_samples) -- TODO confirm
        for non-synthetic inputs.
    config : dict
        Must contain ``'lowcut'``, ``'highcut'`` and ``'fs'`` (forwarded to
        ``bandpass_filter``). May contain ``'sigma'``, the Gaussian smoothing
        width in samples (default 2).

    Returns
    -------
    numpy.ndarray
        The filtered-then-smoothed traces, one row per input trace. A
        zero-size input (no traces, or traces with no samples) is returned
        unchanged instead of being passed to the smoother.
    """
    # Preprocessing: the bandpass is applied one trace at a time.
    processed = np.array([
        bandpass_filter(trace, config['lowcut'], config['highcut'], config['fs'])
        for trace in data
    ])

    # Nothing to smooth; also avoids calling the filter on zero-length rows.
    if processed.size == 0:
        return processed

    # Recovery (simplified): Gaussian smoothing along the sample axis. One
    # call over the last axis is equivalent to smoothing each row separately,
    # without a Python-level loop. gaussian_filter1d is imported at the top
    # of the notebook rather than inside this function.
    return gaussian_filter1d(processed, sigma=config.get('sigma', 2), axis=-1)

def batch_process(datasets, config, output_dir):
    """Run ``process_single_dataset`` on each dataset and save every result.

    Each result is written to ``<output_dir>/result_<index:04d>.npy``. The
    output directory is created first if it does not already exist.

    Parameters
    ----------
    datasets : iterable of array-like
        Inputs, processed in iteration order with a ``tqdm`` progress bar.
        Each item must expose ``.shape``.
    config : dict
        Passed through unchanged to ``process_single_dataset``.
    output_dir : str or path-like
        Destination directory for the ``.npy`` files.

    Returns
    -------
    list of dict
        One record per dataset with keys ``'index'``, ``'input_shape'`` and
        ``'output_path'``.
    """
    os.makedirs(output_dir, exist_ok=True)
    records = []

    for index, dataset in enumerate(tqdm(datasets, desc="Processing")):
        processed = process_single_dataset(dataset, config)

        # Persist immediately so finished work survives a later failure.
        target = os.path.join(output_dir, f'result_{index:04d}.npy')
        np.save(target, processed)

        records.append(dict(index=index, input_shape=dataset.shape, output_path=target))

    return records

## 2. Generate Test Datasets

In [None]:
def _make_noisy_dataset(seed):
    """Generate one synthetic clean dataset and return a noisy copy of it.

    The same ``seed`` is passed to both the generator and the noise call,
    so each dataset is reproducible from its seed alone.
    """
    clean_traces, _ = generate_synthetic_traces(n_traces=50, n_samples=256, seed=seed)
    return add_noise(clean_traces, noise_level=0.3, seed=seed)


# Create multiple test datasets, one distinct seed per dataset.
n_datasets = 5
datasets = [_make_noisy_dataset(42 + k) for k in range(n_datasets)]

print(f"Created {len(datasets)} datasets")
print(f"Each dataset: {datasets[0].shape}")

## 3. Run Batch Processing

In [None]:
# Processing parameters shared by every dataset in the batch: bandpass corners
# and sampling rate for bandpass_filter, plus the Gaussian smoothing width.
config = dict(lowcut=2.0, highcut=80.0, fs=250.0, sigma=2.0)

output_dir = './batch_output'
results = batch_process(datasets, config, output_dir)

print(f"\nProcessed {len(results)} datasets")
for entry in results:
    print(f"  {entry['output_path']}")

## 4. Result Management

In [None]:
# Save a processing manifest next to the results. It records when the batch
# ran, with which parameters and library version, and where each output went.
manifest = {
    # astimezone() attaches the local UTC offset; a naive timestamp is ambiguous
    # once the manifest is read on another machine or in another timezone.
    'timestamp': datetime.now().astimezone().isoformat(),
    # str() keeps the manifest JSON-serialisable whatever type __version__ has.
    'promethium_version': str(promethium.__version__),
    'config': config,
    'n_datasets': len(results),
    # Tuples such as each record's input_shape are written as JSON lists.
    'outputs': results,
}

manifest_path = os.path.join(output_dir, 'manifest.json')
with open(manifest_path, 'w', encoding='utf-8') as f:
    json.dump(manifest, f, indent=2)

print(f"Manifest saved to: {manifest_path}")

In [None]:
# Reload every saved result in processing order and stack them along a new
# leading "dataset" axis.
all_results = [np.load(entry['output_path']) for entry in results]

stacked = np.stack(all_results)
print(f"Aggregated shape: {stacked.shape}")

## 5. Parallel Processing Example

In [None]:
# Note: Parallel processing works best for independent datasets.
# The pattern is printed rather than executed here. ProcessPoolExecutor sends
# the worker function to child processes by reference (pickle). Under the
# "spawn" start method -- the default on Windows and macOS -- functions defined
# in a notebook cell cannot be looked up by the children. Keep the worker in an
# importable .py module and start the pool from a script's __main__ guard.
print("Parallel Processing Pattern:")
print("""
import os
from concurrent.futures import ProcessPoolExecutor, as_completed

import numpy as np

# The worker must live in an importable module (hypothetical name shown);
# functions defined in a notebook are not visible to "spawn" child processes.
from my_pipeline import process_single_dataset

if __name__ == '__main__':
    os.makedirs(output_dir, exist_ok=True)
    with ProcessPoolExecutor(max_workers=4) as executor:
        # Map each future back to its dataset index; results arrive out of order.
        futures = {
            executor.submit(process_single_dataset, data, config): idx
            for idx, data in enumerate(datasets)
        }

        for future in as_completed(futures):
            idx = futures[future]
            result = future.result()  # re-raises any exception from the worker
            # Same layout as batch_process, so manifest/aggregation steps still apply.
            np.save(os.path.join(output_dir, f'result_{idx:04d}.npy'), result)
""")

## Summary

This notebook covered:
1. Batch processing framework
2. Result persistence
3. Manifest management
4. Parallel processing patterns (shown as a reference snippet, not executed in this notebook)