# Tutorial 2: Batch Processing with Parallel Nodes

**Difficulty:** Beginner | **Time:** 25 minutes

## Learning Objectives

- Understand batch processing with multiple items
- Choose the right parallel processing strategy
- Implement SequentialNode for ordered processing
- Use MultipleThreadNode for concurrent processing
- Use MultipleProcessNode for CPU-intensive tasks

## Real-World Use Case

Imagine you need to translate a document into multiple languages, process a list of customer reviews, or analyze thousands of images. Doing these tasks one by one would be slow and inefficient. Spark's batch processing nodes allow you to handle multiple items efficiently using different parallelization strategies.

In this tutorial, we'll build a translation service that processes multiple languages simultaneously, demonstrating different approaches to batch processing.

## Core Concepts

### The `process_item()` Method Pattern

Unlike regular nodes that implement `process()`, batch processing nodes implement `process_item()`:

```python
class MyBatchNode(SequentialNode):
    async def process_item(self, item):
        # Process a single item from the batch
        result = await self.process_single_item(item)
        return result
```
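
To make the pattern concrete, here is a minimal sketch of how a batch node might drive `process_item()`. `MiniSequentialNode` and `Doubler` are hypothetical stand-ins written for this illustration; they are not Spark's implementation, and unlike Spark's `do()` (whose result exposes `.content` in the examples in this tutorial) the stand-in simply returns a list.

```python
import asyncio


class MiniSequentialNode:
    """Hypothetical stand-in for a batch node; not Spark's SequentialNode."""

    async def process_item(self, item):
        raise NotImplementedError("subclasses implement process_item()")

    async def do(self, items):
        # Call process_item() once per item, in input order.
        results = []
        for item in items:
            results.append(await self.process_item(item))
        return results


class Doubler(MiniSequentialNode):
    async def process_item(self, item):
        return item * 2


async def main():
    return await Doubler().do([1, 2, 3])


if __name__ == "__main__":
    # In a notebook, use `await main()` instead of asyncio.run().
    print(asyncio.run(main()))  # prints [2, 4, 6]
```

The subclass only describes what happens to one item; the base class decides how the batch is walked.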

### Three Batch Processing Strategies

1. **SequentialNode**: Process items one after another (ordered)
2. **MultipleThreadNode**: Process items concurrently using threads
3. **MultipleProcessNode**: Process items in parallel using separate processes

### When to Use Each Strategy

- **SequentialNode**: When order matters or when dealing with limited resources
- **MultipleThreadNode**: For I/O-bound tasks (API calls, file operations)
- **MultipleProcessNode**: For CPU-intensive tasks (image processing, data analysis)
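
The I/O-bound case can be illustrated with the standard library alone. The sketch below is an analogy rather than Spark code: `fake_io_call` is a hypothetical blocking function, and `ThreadPoolExecutor` plays the role that a thread-based node would play.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fake_io_call(item):
    """Hypothetical blocking call; time.sleep stands in for network latency."""
    time.sleep(0.05)
    return item.upper()


def run_sequential(items):
    # One call at a time: total time is roughly len(items) * 0.05 s.
    return [fake_io_call(item) for item in items]


def run_threaded(items, max_workers=4):
    # executor.map yields results in input order, even if calls finish out of order.
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        return list(executor.map(fake_io_call, items))


if __name__ == "__main__":
    items = ["a", "b", "c", "d"]
    for runner in (run_sequential, run_threaded):
        start = time.perf_counter()
        result = runner(items)
        print(runner.__name__, result, f"{time.perf_counter() - start:.2f}s")
```

Swapping in `concurrent.futures.ProcessPoolExecutor` gives the process-based analogue, with the added requirement that the function and its arguments be picklable.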

## Setup

Let's import the necessary classes and set up our environment.

In [None]:
# Import the batch processing node types
from spark.nodes.nodes import SequentialNode, MultipleThreadNode, MultipleProcessNode
from spark.utils import arun
import asyncio
import time
from pathlib import Path
import os

## Example 1: Sequential Processing

Let's start with a simple text processing example using SequentialNode. This processes items one by one, maintaining order.

In [None]:
class SimpleTextProcessor(SequentialNode):
    """Process text items sequentially, maintaining order."""
    
    async def process_item(self, item):
        text = item['text']
        delay = item.get('delay', 0.1)  # Simulate processing time
        
        # Simulate some text processing work
        await asyncio.sleep(delay)
        
        # Process the text (convert to uppercase and add length)
        processed_text = text.upper()
        result = {
            'original': text,
            'processed': processed_text,
            'length': len(text),
            'processing_time': delay
        }
        
        print(f"Processed: '{text}' -> '{processed_text}'")
        return result

# Test data
text_data = [
    {'text': 'hello world', 'delay': 0.1},
    {'text': 'spark framework', 'delay': 0.1},
    {'text': 'batch processing', 'delay': 0.1},
    {'text': 'parallel execution', 'delay': 0.1}
]

# Create and run the sequential processor
processor = SimpleTextProcessor()
start_time = time.time()

print("=== Sequential Processing ===")
results = await processor.do(text_data)

end_time = time.time()
print(f"\nTotal time: {end_time - start_time:.2f} seconds")
print(f"Processed {len(results.content)} items")
print(f"Results: {results.content}")

## Example 2: Multi-Thread Processing

Now let's use MultipleThreadNode to process items concurrently. This is ideal for I/O-bound tasks like API calls or file operations.

In [None]:
class ConcurrentTextProcessor(MultipleThreadNode):
    """Process text items concurrently using multiple threads."""
    
    async def process_item(self, item):
        text = item['text']
        delay = item.get('delay', 0.1)
        
        # Simulate I/O-bound work (like an API call)
        await asyncio.sleep(delay)
        
        # Process the text
        processed_text = text.title()  # Title case instead of uppercase
        result = {
            'original': text,
            'processed': processed_text,
            'length': len(text),
            'thread_id': id(asyncio.current_task()),  # identifies the asyncio task, not an OS thread
            'processing_time': delay
        }
        
        print(f"Thread processed: '{text}' -> '{processed_text}'")
        return result

# Create and run the concurrent processor
concurrent_processor = ConcurrentTextProcessor()
start_time = time.time()

print("=== Multi-Thread Processing ===")
thread_results = await concurrent_processor.do(text_data)

end_time = time.time()
print(f"\nTotal time: {end_time - start_time:.2f} seconds")
print(f"Processed {len(thread_results.content)} items concurrently")
print(f"Results: {thread_results.content}")
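
The `thread_id` field in the example is derived from the current asyncio task, so it is worth seeing how task identity and thread identity differ. The sketch below uses plain asyncio, not Spark; how `MultipleThreadNode` maps items to threads is not shown in this tutorial, so treat this only as background on the two identifiers.

```python
import asyncio
import threading


async def describe(item):
    await asyncio.sleep(0.01)
    return {
        "item": item,
        "task_id": id(asyncio.current_task()),  # differs per concurrent task
        "thread_id": threading.get_ident(),     # thread running this coroutine
    }


async def main():
    # Three tasks scheduled on one event loop.
    return await asyncio.gather(*(describe(i) for i in range(3)))


if __name__ == "__main__":
    for row in asyncio.run(main()):
        print(row)
```

With a single event loop, the three rows report different task IDs but the same thread ID.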

## Example 3: Translation Service (Real-World Application)

Let's build a practical translation service that processes multiple languages. We'll simulate translation since we don't want to make actual API calls in this tutorial.

In [None]:
# Mock translation function (simulates API call)
async def mock_translate(text, target_language):
    """Simulate translation by adding language prefix."""
    # Simulate API call delay
    await asyncio.sleep(0.2)
    
    translations = {
        'Spanish': f'[ES] {text}',
        'French': f'[FR] {text}',
        'German': f'[DE] {text}',
        'Chinese': f'[ZH] {text}',
        'Japanese': f'[JA] {text}',
        'Korean': f'[KO] {text}'
    }
    
    return translations.get(target_language, f'[{target_language[:2].upper()}] {text}')

class TranslationProcessor(MultipleThreadNode):
    """Translate text to multiple languages concurrently."""
    
    async def process_item(self, item):
        text = item['text']
        language = item['language']
        
        print(f"Translating to {language}...")
        
        # Translate the text
        translated_text = await mock_translate(text, language)
        
        result = {
            'original': text,
            'language': language,
            'translated': translated_text,
            'timestamp': time.time()
        }
        
        print(f"Translation complete for {language}")
        return result

# Translation data
translation_data = [
    {'text': 'Hello, how are you?', 'language': 'Spanish'},
    {'text': 'Hello, how are you?', 'language': 'French'},
    {'text': 'Hello, how are you?', 'language': 'German'},
    {'text': 'Hello, how are you?', 'language': 'Chinese'},
    {'text': 'Hello, how are you?', 'language': 'Japanese'},
    {'text': 'Hello, how are you?', 'language': 'Korean'}
]

# Create and run the translation service
translator = TranslationProcessor()
start_time = time.time()

print("=== Translation Service (Multi-Thread) ===")
translation_results = await translator.do(translation_data)

end_time = time.time()
print("\nTranslation complete!")
print(f"Total time: {end_time - start_time:.2f} seconds")
print(f"Translated to {len(translation_results.content)} languages")

# Display results nicely
print("\nTranslation Results:")
for result in translation_results.content:
    print(f"  {result['language']}: {result['translated']}")
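
Because `translation_data` repeats the same text for every language, the batch can be generated rather than typed out. The two helpers below are hypothetical conveniences, not part of Spark: one builds the input list, the other indexes result dictionaries (shaped like those returned by `TranslationProcessor.process_item()`) by language.

```python
def build_translation_batch(text, languages):
    """Return one {'text', 'language'} dict per target language."""
    return [{"text": text, "language": language} for language in languages]


def group_by_language(results):
    """Map each result's 'language' to its 'translated' string."""
    return {result["language"]: result["translated"] for result in results}


if __name__ == "__main__":
    batch = build_translation_batch("Hello, how are you?", ["Spanish", "French"])
    print(batch)
```

`build_translation_batch(...)` produces a list that can be passed to `translator.do(...)` in place of the hand-written `translation_data`.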

## Example 4: Performance Comparison

Let's compare the performance of sequential and multi-thread processing on a single workload: eight items that each take 0.1 seconds.

In [None]:
class BenchmarkProcessor:
    """Utility class to compare different processing strategies."""
    
    class Sequential(SequentialNode):
        async def process_item(self, item):
            await asyncio.sleep(item['delay'])
            return {'item': item['id'], 'processed': True}
    
    class Threaded(MultipleThreadNode):
        async def process_item(self, item):
            await asyncio.sleep(item['delay'])
            return {'item': item['id'], 'processed': True}
    
    @staticmethod
    async def run_benchmark(name, processor_class, data):
        print(f"\n=== {name} ===")
        processor = processor_class()
        start_time = time.time()
        results = await processor.do(data)
        end_time = time.time()
        print(f"Time: {end_time - start_time:.2f}s")
        print(f"Results: {len(results.content)} items processed")
        return end_time - start_time

# Create test data: eight items, each with a 0.1 s delay
test_data = [
    {'id': i, 'delay': 0.1} for i in range(8)
]

print("Performance Comparison: Sequential vs Multi-Thread")
print(f"Test data: {len(test_data)} items, each with 0.1s delay")

# Run benchmarks
sequential_time = await BenchmarkProcessor.run_benchmark(
    "Sequential Processing", 
    BenchmarkProcessor.Sequential, 
    test_data
)

threaded_time = await BenchmarkProcessor.run_benchmark(
    "Multi-Thread Processing", 
    BenchmarkProcessor.Threaded, 
    test_data
)

print("\n=== Summary ===")
print(f"Sequential time: {sequential_time:.2f}s")
print(f"Multi-thread time: {threaded_time:.2f}s")
speedup = sequential_time / threaded_time if threaded_time > 0 else float('inf')
print(f"Speedup: {speedup:.1f}x faster")
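
The numbers this benchmark should produce can be estimated in advance. The sketch below is a back-of-the-envelope model, not a measurement: it ignores scheduling overhead, and `max_workers` is an assumed parameter, since the tutorial does not state how many workers `MultipleThreadNode` uses.

```python
import heapq


def estimate_times(delays, max_workers):
    """Return (sequential, concurrent) wall-clock estimates in seconds."""
    sequential = sum(delays)
    # Greedy scheduling: each item goes to the worker that frees up first.
    workers = [0.0] * max_workers
    heapq.heapify(workers)
    for delay in delays:
        earliest = heapq.heappop(workers)
        heapq.heappush(workers, earliest + delay)
    concurrent = max(workers)
    return sequential, concurrent


if __name__ == "__main__":
    seq, conc = estimate_times([0.1] * 8, max_workers=8)
    print(f"{seq:.1f}s sequential, {conc:.1f}s concurrent, {seq / conc:.0f}x")
    # prints: 0.8s sequential, 0.1s concurrent, 8x
```

With only four workers the same workload would take about 0.2 seconds, so the measured speedup depends on how much concurrency is actually available.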

## Example 5: Error Handling and Failure Strategies

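Batch processing nodes accept a `failure_strategy` argument (here `'skip_failed'`). In this example, `process_item()` also catches exceptions itself and returns a `'failed'` record, so every item yields a result and nothing reaches the node-level strategy.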

In [None]:
class RobustProcessor(MultipleThreadNode):
    """A processor that demonstrates error handling."""

    async def process_item(self, item):
        try:
            text = item['text']
            
            # Simulate an error for specific items
            if 'error' in text.lower():
                raise ValueError(f"Simulated error processing: {text}")
            
            # Normal processing
            await asyncio.sleep(0.1)
            return {
                'original': text,
                'processed': text.upper(),
                'status': 'success'
            }
            
        except Exception as e:
            # Use item.get(): `text` is unbound if item['text'] itself raised
            original = item.get('text')
            print(f"Error processing '{original}': {e}")
            return {
                'original': original,
                'error': str(e),
                'status': 'failed'
            }

# Test data with some items that will fail
test_data_with_errors = [
    {'text': 'normal item 1'},
    {'text': 'item with error'},  # This will fail
    {'text': 'normal item 2'},
    {'text': 'another ERROR'},     # This will fail
    {'text': 'normal item 3'}
]

# Test with error handling
robust_processor = RobustProcessor(failure_strategy='skip_failed')

print("=== Error Handling in Batch Processing ===")
results_with_errors = await robust_processor.do(test_data_with_errors)

print("\nResults summary:")
successful = [r for r in results_with_errors.content if r.get('status') == 'success']
failed = [r for r in results_with_errors.content if r.get('status') == 'failed']

print(f"Successful: {len(successful)} items")
print(f"Failed: {len(failed)} items")
print(f"Total processed: {len(results_with_errors.content)} items")

print("\nDetailed results:")
for i, result in enumerate(results_with_errors.content, 1):
    status = result.get('status', 'unknown')
    original = result.get('original', 'unknown')
    if status == 'success':
        print(f"  {i}. {original} -> {result['processed']}")
    else:
        print(f"  {i}. {original} - Error: {result['error']}")
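
For contrast with catching errors inside `process_item()`, the sketch below shows two batch-level policies using plain `asyncio.gather`. It illustrates the general idea only: the function names are hypothetical, and this is not how Spark implements `failure_strategy`, whose other accepted values are not covered in this tutorial.

```python
import asyncio


async def risky(item):
    """Hypothetical worker that fails on items containing 'error'."""
    await asyncio.sleep(0.01)
    if "error" in item.lower():
        raise ValueError(f"cannot process: {item}")
    return item.upper()


async def run_skip_failed(items):
    # return_exceptions=True collects exceptions alongside results instead of raising.
    outcomes = await asyncio.gather(
        *(risky(item) for item in items), return_exceptions=True
    )
    return [outcome for outcome in outcomes if not isinstance(outcome, Exception)]


async def run_fail_fast(items):
    # Default gather behaviour: the first exception propagates to the caller.
    return await asyncio.gather(*(risky(item) for item in items))


if __name__ == "__main__":
    data = ["normal item 1", "item with error", "normal item 2"]
    print(asyncio.run(run_skip_failed(data)))
```

The skip policy silently drops failed items, so it suits cases where partial output is acceptable; returning a `'failed'` record, as `RobustProcessor` does, keeps a trace of what went wrong.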

## Key Takeaways

### When to Use Each Node Type

**SequentialNode**
- Use when order matters
- Good for debugging and testing
- Lower resource usage
- Predictable execution time

**MultipleThreadNode**
- Best for I/O-bound tasks (API calls, database operations, file I/O)
- Lower overhead than MultipleProcessNode
- Shares memory between workers, so shared state must be thread-safe
- Great for network operations

**MultipleProcessNode**
- Best for CPU-intensive tasks (data processing, image processing, machine learning)
- True parallelism (each process has its own interpreter, so the GIL is not shared)
- Isolated memory (safer)
- Higher overhead
- Requires picklable objects
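
The picklability requirement can be checked up front. The helper below is a hypothetical utility built on the standard `pickle` module; it assumes the process-based node moves items between processes with pickle, as Python's `multiprocessing` does.

```python
import pickle
import threading


def is_picklable(obj):
    """Return True if obj survives pickle.dumps, False otherwise."""
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, AttributeError, TypeError):
        return False


if __name__ == "__main__":
    print(is_picklable({"text": "hello"}))   # plain data
    print(is_picklable(lambda x: x))         # anonymous function
    print(is_picklable(threading.Lock()))    # OS-level resource
```

Plain dictionaries, lists, strings, and numbers pass; lambdas, locks, and open file handles do not, so batch items for a process-based node are best kept as simple data.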

### Performance Tips

1. **Match the strategy to the workload**: I/O-bound → threads, CPU-bound → processes
2. **Consider overhead**: Don't use parallel processing for very small tasks
3. **Handle errors gracefully**: Use `try`/`except` in `process_item()`
4. **Monitor resource usage**: Don't create too many threads/processes
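
One common way to cap concurrency in asyncio code is a semaphore. The sketch below is a generic pattern, not a Spark API: `bounded_gather` and its `limit` parameter are hypothetical names, and whether Spark's nodes expose a comparable setting is not covered in this tutorial.

```python
import asyncio


async def bounded_gather(worker, items, limit):
    """Run worker(item) for every item with at most `limit` in flight."""
    semaphore = asyncio.Semaphore(limit)

    async def guarded(item):
        # Only `limit` coroutines can hold the semaphore at once.
        async with semaphore:
            return await worker(item)

    # gather preserves input order in its result list.
    return await asyncio.gather(*(guarded(item) for item in items))


async def square(value):
    await asyncio.sleep(0.01)  # simulated I/O wait
    return value * value


if __name__ == "__main__":
    print(asyncio.run(bounded_gather(square, range(6), limit=2)))
```

A small limit protects rate-limited APIs and keeps memory use predictable, at the cost of a longer total run time.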

### Common Patterns

```python
# Pattern 1: API processing
class APIProcessor(MultipleThreadNode):
    async def process_item(self, item):
        result = await api_call(item)
        return result

# Pattern 2: File processing  
class FileProcessor(MultipleProcessNode):
    async def process_item(self, file_path):
        data = load_and_process_file(file_path)  # CPU-intensive
        return data

# Pattern 3: Data transformation
class DataTransformer(SequentialNode):
    async def process_item(self, row):
        transformed = transform_row(row)
        return transformed
```
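
The three patterns call helpers (`api_call`, `load_and_process_file`, `transform_row`) that this tutorial does not define. To experiment with the patterns, stand-ins along the following lines could be used; their behaviour is entirely hypothetical and chosen only so the classes above have something to call.

```python
import asyncio
from pathlib import Path


async def api_call(item):
    """Hypothetical async API stub: waits briefly, then echoes the item."""
    await asyncio.sleep(0.01)
    return {"request": item, "status": "ok"}


def load_and_process_file(file_path):
    """Hypothetical file job: count the words in a UTF-8 text file."""
    text = Path(file_path).read_text(encoding="utf-8")
    return {"path": str(file_path), "words": len(text.split())}


def transform_row(row):
    """Hypothetical row transform: strip whitespace from string values."""
    return {
        key: value.strip() if isinstance(value, str) else value
        for key, value in row.items()
    }


if __name__ == "__main__":
    print(asyncio.run(api_call("ping")))
    print(transform_row({"name": "  Ada  ", "age": 36}))
```

Note that only `api_call` is awaitable; the other two are ordinary synchronous functions, matching how the patterns above call them.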

## Hands-On Exercises

### Exercise 1: Create a Simple Batch Processor

Create a batch processor that calculates the square of each number (the solution also returns the cube). Use SequentialNode first, then modify it to use MultipleThreadNode.

```python
# Your code here
numbers = [{'value': i} for i in range(1, 11)]

class SquareCalculator(SequentialNode):
    async def process_item(self, item):
        # Implement this
        pass
```

In [None]:
# Exercise 1 Solution
class SquareCalculator(SequentialNode):
    async def process_item(self, item):
        value = item['value']
        await asyncio.sleep(0.05)  # Simulate work
        result = {
            'original': value,
            'square': value ** 2,
            'cube': value ** 3
        }
        return result

# Test it
numbers = [{'value': i} for i in range(1, 6)]
calculator = SquareCalculator()
result = await calculator.do(numbers)
print("Square Calculator Results:")
for r in result.content:
    print(f"  {r['original']}^2 = {r['square']}, {r['original']}^3 = {r['cube']}")

### Exercise 2: Convert to Multi-Thread Processing

Modify the SquareCalculator to use MultipleThreadNode and compare performance.

In [None]:
# Exercise 2 Solution
class ThreadedSquareCalculator(MultipleThreadNode):
    async def process_item(self, item):
        value = item['value']
        await asyncio.sleep(0.05)  # Simulate work
        result = {
            'original': value,
            'square': value ** 2,
            'cube': value ** 3
        }
        return result

# Performance comparison
more_numbers = [{'value': i} for i in range(1, 11)]

print("Performance Comparison:")

# Sequential
seq_calc = SquareCalculator()
start = time.time()
seq_results = await seq_calc.do(more_numbers)
seq_time = time.time() - start
print(f"Sequential: {seq_time:.2f}s")

# Threaded
thread_calc = ThreadedSquareCalculator()
start = time.time()
thread_results = await thread_calc.do(more_numbers)
thread_time = time.time() - start
print(f"Threaded: {thread_time:.2f}s")

print(f"Speedup: {seq_time/thread_time:.1f}x")

### Exercise 3: Build a Sentiment Analyzer

Create a batch processor that analyzes sentiment of text messages. Include error handling for malformed input.

In [None]:
# Exercise 3 Solution
class SentimentAnalyzer(MultipleThreadNode):
    """Analyze sentiment of text messages."""
    
    async def process_item(self, item):
        try:
            text = item.get('text', '')
            message_id = item.get('id', 'unknown')
            
            # Validate input
            if not text or not isinstance(text, str):
                raise ValueError(f"Invalid text: {text}")
            
            # Simulate sentiment analysis API call
            await asyncio.sleep(0.1)
            
            # Simple sentiment analysis (mock)
            positive_words = ['good', 'great', 'excellent', 'amazing', 'love', 'happy', 'wonderful']
            negative_words = ['bad', 'terrible', 'awful', 'hate', 'sad', 'horrible', 'worst']
            
            text_lower = text.lower()
            positive_count = sum(1 for word in positive_words if word in text_lower)
            negative_count = sum(1 for word in negative_words if word in text_lower)
            
            if positive_count > negative_count:
                sentiment = 'positive'
                score = positive_count / max(positive_count + negative_count, 1)
            elif negative_count > positive_count:
                sentiment = 'negative'
                score = negative_count / max(positive_count + negative_count, 1)
            else:
                sentiment = 'neutral'
                score = 0.5
            
            return {
                'id': message_id,
                'text': text,
                'sentiment': sentiment,
                'confidence': round(score, 2),
                'word_count': len(text.split())
            }
            
        except Exception as e:
            return {
                'id': item.get('id', 'unknown'),
                'text': item.get('text', ''),
                'error': str(e),
                'status': 'failed'
            }

# Test the sentiment analyzer
messages = [
    {'id': 1, 'text': 'I love this amazing product!'},
    {'id': 2, 'text': 'This is terrible and awful'},
    {'id': 3, 'text': 'It is good but not great'},
    {'id': 4, 'text': ''},  # This should fail
    {'id': 5, 'text': 'Wonderful experience, happy with the result'},
    {'id': 6, 'text': 'Bad bad bad'}
]

analyzer = SentimentAnalyzer()
sentiment_results = await analyzer.do(messages)

print("Sentiment Analysis Results:")
# Use a distinct loop variable so the batch result is not shadowed
for entry in sentiment_results.content:
    if 'error' in entry:
        print(f"  ID {entry['id']}: ERROR - {entry['error']}")
    else:
        print(f"  ID {entry['id']}: {entry['sentiment'].upper()} (confidence: {entry['confidence']})")
        print(f"    Text: {entry['text']}")
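
The mock analysis in this solution tests `word in text_lower`, which is a substring check: "badge" would count as "bad", and a word repeated three times counts once. A whole-word variant is sketched below as one possible refinement; the function name and the tokenizing regular expression are choices made for this illustration, using the same word lists as the solution.

```python
import re

POSITIVE = {"good", "great", "excellent", "amazing", "love", "happy", "wonderful"}
NEGATIVE = {"bad", "terrible", "awful", "hate", "sad", "horrible", "worst"}


def count_sentiment_words(text):
    """Return (positive, negative) counts using whole-word matches."""
    # Split into lowercase word tokens so 'badge' is not mistaken for 'bad'.
    words = re.findall(r"[a-z']+", text.lower())
    positive = sum(1 for word in words if word in POSITIVE)
    negative = sum(1 for word in words if word in NEGATIVE)
    return positive, negative


if __name__ == "__main__":
    print(count_sentiment_words("Bad bad bad"))           # prints (0, 3)
    print(count_sentiment_words("The badge looks fine"))  # prints (0, 0)
```

Counting every occurrence changes the confidence scores, so results from this variant are not directly comparable with the solution's output.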

## Summary & Next Steps

### What You Learned

- **Batch Processing Fundamentals**: Understand how to process multiple items efficiently
- **Three Processing Strategies**: SequentialNode, MultipleThreadNode, MultipleProcessNode
- **Real-World Application**: Built a translation service using concurrent processing
- **Performance Optimization**: Compared sequential vs concurrent processing
- **Error Handling**: Implemented robust error handling in batch operations
- **Practical Patterns**: Created sentiment analyzer and other useful processors

### Key Concepts Mastered

- `process_item()` method pattern for batch processing
- Choosing the right strategy for your workload
- Performance considerations and trade-offs
- Error handling and failure strategies
- Real-world applications (translation, sentiment analysis, data processing)

### Next Tutorial

In **Tutorial 3: Simple Flows and Graph Basics**, you'll learn how to:
- Connect multiple nodes into workflows
- Create edges between nodes
- Build and execute graphs
- Pass data between nodes using ExecutionContext

### Continue Your Learning

- Try building your own batch processors for tasks you encounter
- Experiment with different failure strategies
- Compare performance with your own datasets

---

**Congratulations!** You've mastered batch processing with Spark's parallel nodes. You can now efficiently process multiple items concurrently, choose the right processing strategy, and handle errors gracefully. This foundation will be crucial as we move on to building more complex workflows in the next tutorial!