# Processor/Executor - Event Processing with Flow-Based State Tracking

The **Processor/Executor** pattern provides production-grade event processing with:
- **Priority Queue**: Lower priority values processed first
- **Capacity Control**: Rate limiting with refresh intervals
- **Flow-Based State**: O(1) status queries via EventStatus-aligned progressions
- **Background Processing**: Continuous execution with `execute()`
- **Permission Checks**: Override `request_permission()` for custom gating

**Architecture**:
- `Executor`: Flow-based state management (1:1 EventStatus mapping)
- `Processor`: Priority queue + background processing
- `Flow.progressions`: One progression per EventStatus value

In [1]:
# Imports
import asyncio
from typing import Any, ClassVar

from lionpride.core import Event, EventStatus, Executor, Processor


# Define test events
class SimpleEvent(Event):
    """Basic event for testing."""

    task_name: str = "task"
    result: int = 0

    async def _invoke(self):
        await asyncio.sleep(0.01)  # Simulate work
        return self.result


class SimpleProcessor(Processor):
    """Basic processor for SimpleEvent."""

    event_type: ClassVar[type[Event]] = SimpleEvent


class SimpleExecutor(Executor):
    """Basic executor for SimpleEvent."""

    processor_type: ClassVar[type[Processor]] = SimpleProcessor

## 1. Basic Executor Setup

Create executor with processor configuration.

In [2]:
# Create executor with processor config
executor = SimpleExecutor(
    processor_config={
        "queue_capacity": 10,  # Process 10 events per batch
        "capacity_refresh_time": 0.1,  # 100ms between batches
        "concurrency_limit": 5,  # Max 5 concurrent executions
    },
    name="simple_executor",
)

print(f"Executor: {executor}")
print(f"Event type: {executor.event_type}")
print(f"Progressions: {[p.name for p in executor.states.progressions]}")
print(f"\nStatus counts: {executor.status_counts()}")

Executor: Executor(total=0, pending=0, processing=0, completed=0, failed=0, cancelled=0, skipped=0, aborted=0)
Event type: <class '__main__.SimpleEvent'>
Progressions: ['pending', 'processing', 'completed', 'failed', 'cancelled', 'skipped', 'aborted']

Status counts: {'pending': 0, 'processing': 0, 'completed': 0, 'failed': 0, 'cancelled': 0, 'skipped': 0, 'aborted': 0}


## 2. Event Queueing and Priority

Add events with priority values (lower = higher priority).

In [3]:
# Create events with different priorities
events = [
    SimpleEvent(task_name="low_priority", result=1),
    SimpleEvent(task_name="high_priority", result=2),
    SimpleEvent(task_name="medium_priority", result=3),
]

# Add to executor with priorities (lower = processed first)
await executor.append(events[0], priority=10.0)  # Low priority
await executor.append(events[1], priority=1.0)  # High priority
await executor.append(events[2], priority=5.0)  # Medium priority

print(f"Executor after append: {executor}")
print(f"Pending events: {len(executor.pending_events)}")

# Events are in Flow.items pile
for event in executor.pending_events:
    print(f"  - {event.task_name}: {event.id}")

Executor after append: Executor(total=3, pending=3, processing=0, completed=0, failed=0, cancelled=0, skipped=0, aborted=0)
Pending events: 3
  - low_priority: ab8b8904-f441-4cc2-b402-be4f0b23455f
  - high_priority: 8ba64e7e-41b7-4a33-88cf-bfa727ad4ff1
  - medium_priority: d1145746-962b-4348-8145-75cff3aafa2c


## 3. Status Queries (O(1))

Query events by status using Flow progressions.

In [4]:
# Start executor to initialize processor
await executor.start()

# Process events manually (one batch)
await executor.forward()
await asyncio.sleep(0.05)  # Let events complete

# O(1) status queries via Flow progressions
print("Status queries:")
print(f"  Pending: {len(executor.pending_events)}")
print(f"  Processing: {len(executor.processing_events)}")
print(f"  Completed: {len(executor.completed_events)}")
print(f"  Failed: {len(executor.failed_events)}")

# Get events by status
completed = executor.get_events_by_status(EventStatus.COMPLETED)
print("\nCompleted events:")
for event in completed:
    print(f"  - {event.task_name}: result={event.execution.response}")

# Inspect full state
print(f"\n{executor.inspect_state()}")

Status queries:
  Pending: 0
  Processing: 0
  Completed: 3
  Failed: 0

Completed events:
  - low_priority: result=1
  - high_priority: result=2
  - medium_priority: result=3

Executor State (simple_executor):
  pending: 0 events
  processing: 0 events
  completed: 3 events
  failed: 0 events
  cancelled: 0 events
  skipped: 0 events
  aborted: 0 events


## 4. Custom Permission Checks

Override `request_permission()` for rate limiting, auth, quotas.

In [5]:
# Custom processor with rate limiting
class RateLimitedProcessor(SimpleProcessor):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.request_count = 0
        self.max_requests = 2  # Allow only 2 requests

    async def request_permission(self, **kwargs: Any) -> bool:
        # Custom permission check
        if self.request_count >= self.max_requests:
            print(f"Rate limit exceeded: {self.request_count}/{self.max_requests}")
            return False
        self.request_count += 1
        print(f"Permission granted: {self.request_count}/{self.max_requests}")
        return True


class RateLimitedExecutor(Executor):
    processor_type: ClassVar[type[Processor]] = RateLimitedProcessor


# Test rate limiting
rate_executor = RateLimitedExecutor(
    processor_config={
        "queue_capacity": 10,
        "capacity_refresh_time": 0.1,
    }
)

# Add 4 events (only 2 will be processed)
for i in range(4):
    await rate_executor.append(SimpleEvent(task_name=f"task_{i}", result=i))

await rate_executor.start()
await rate_executor.forward()
await asyncio.sleep(0.05)

print(f"\nFinal state: {rate_executor}")
print(f"Completed: {len(rate_executor.completed_events)}")
print(f"Pending: {len(rate_executor.pending_events)}")
# Note: Denied events are requeued with backoff (3 attempts before ABORTED)

Permission granted: 1/2
Permission granted: 2/2
Rate limit exceeded: 2/2

Final state: Executor(total=4, pending=2, processing=0, completed=2, failed=0, cancelled=0, skipped=0, aborted=0)
Completed: 2
Pending: 2


## 5. Concurrency Control

Control concurrent executions with semaphore.

In [6]:
# Track concurrent executions
active_count = 0
max_concurrent = 0


class ConcurrencyTrackingEvent(Event):
    async def _invoke(self):
        global active_count, max_concurrent
        active_count += 1
        max_concurrent = max(max_concurrent, active_count)
        await asyncio.sleep(0.02)  # Simulate work
        active_count -= 1
        return "done"


class ConcurrencyProcessor(Processor):
    event_type: ClassVar[type[Event]] = ConcurrencyTrackingEvent


class ConcurrencyExecutor(Executor):
    processor_type: ClassVar[type[Processor]] = ConcurrencyProcessor


# Test with concurrency_limit=2
conc_executor = ConcurrencyExecutor(
    processor_config={
        "queue_capacity": 10,
        "capacity_refresh_time": 0.1,
        "concurrency_limit": 2,  # Max 2 concurrent
    }
)

# Add 5 events
for _ in range(5):
    await conc_executor.append(ConcurrencyTrackingEvent())

await conc_executor.start()
await conc_executor.forward()
await asyncio.sleep(0.15)  # Wait for completion

print(f"Max concurrent executions: {max_concurrent} (limit was 2)")
print(f"Completed: {len(conc_executor.completed_events)}")

Max concurrent executions: 2 (limit was 2)
Completed: 5


## 6. Streaming Events

Process streaming events with async generators.

In [7]:
# Streaming event
class StreamingEvent(Event):
    chunk_count: int = 3
    streaming: bool = True  # Enable streaming mode

    async def _invoke(self):
        # This won't be called for streaming events
        return "not_used"

    async def _stream(self):
        for i in range(self.chunk_count):
            await asyncio.sleep(0.01)
            yield f"chunk_{i}"


class StreamingProcessor(Processor):
    event_type: ClassVar[type[Event]] = StreamingEvent


class StreamingExecutor(Executor):
    processor_type: ClassVar[type[Processor]] = StreamingProcessor


# Test streaming
stream_executor = StreamingExecutor(
    processor_config={
        "queue_capacity": 5,
        "capacity_refresh_time": 0.1,
    }
)

stream_event = StreamingEvent(chunk_count=3)
await stream_executor.append(stream_event)

await stream_executor.start()
await stream_executor.forward()
await asyncio.sleep(0.05)  # Let stream complete

print(f"Streaming event status: {stream_event.status}")
print(f"Chunks streamed: {stream_event.chunk_count}")
print(f"Completed: {len(stream_executor.completed_events)}")

Streaming event status: pending
Chunks streamed: 3
Completed: 0


  async for _ in event.stream():


## 7. Manual vs Background Processing

Choose between manual batch processing or continuous background execution.

In [8]:
# Manual processing (call forward() explicitly)
manual_executor = SimpleExecutor(
    processor_config={"queue_capacity": 5, "capacity_refresh_time": 0.1}
)

for i in range(3):
    await manual_executor.append(SimpleEvent(task_name=f"manual_{i}"))

await manual_executor.start()
print("Before forward():")
print(f"  Pending: {len(manual_executor.pending_events)}")
print(f"  Completed: {len(manual_executor.completed_events)}")

await manual_executor.forward()  # Process batch manually
await asyncio.sleep(0.05)

print("\nAfter forward():")
print(f"  Pending: {len(manual_executor.pending_events)}")
print(f"  Completed: {len(manual_executor.completed_events)}")

# Background processing (continuous execution)
bg_executor = SimpleExecutor(processor_config={"queue_capacity": 5, "capacity_refresh_time": 0.05})

for i in range(3):
    await bg_executor.append(SimpleEvent(task_name=f"bg_{i}"))

# Must call start() to create processor
await bg_executor.start()

# Start background execution
exec_task = asyncio.create_task(bg_executor.processor.execute())

await asyncio.sleep(0.2)  # Let background processor run
await bg_executor.stop()
await exec_task  # Wait for cleanup

print("\nBackground execution:")
print(f"  Completed: {len(bg_executor.completed_events)}")
print(f"  Processor stopped: {bg_executor.processor.is_stopped()}")

Before forward():
  Pending: 3
  Completed: 0

After forward():
  Pending: 0
  Completed: 3

Background execution:
  Completed: 3
  Processor stopped: True


## 8. State Serialization

Serialize executor state for persistence and recovery.

In [9]:
# Create executor with events
serializable_executor = SimpleExecutor(
    processor_config={"queue_capacity": 5, "capacity_refresh_time": 0.1},
    name="serializable_exec",
)

# Add events and process some
for _ in range(5):
    await serializable_executor.append(SimpleEvent(task_name=f"task_{i}", result=i * 10))

await serializable_executor.start()
await serializable_executor.forward()
await asyncio.sleep(0.05)

print("Original executor:")
print(f"  {serializable_executor}")
print(f"  Status counts: {serializable_executor.status_counts()}")

# Serialize Flow state (events + progressions)
state_data = serializable_executor.states.to_dict()
print(f"\nSerialized state keys: {list(state_data.keys())}")
print(f"  Items: {len(state_data['items'])}")
print(f"  Progressions: {len(state_data['progressions'])}")

# Restore from serialized state
from lionpride.core import Flow

restored_flow = Flow.from_dict(state_data)
print("\nRestored Flow:")
print(f"  Name: {restored_flow.name}")
print(f"  Items: {len(restored_flow.items)}")
print(f"  Progressions: {[p.name for p in restored_flow.progressions]}")

# Verify progression integrity
for status in EventStatus:
    prog = restored_flow.get_progression(status.value)
    events = [restored_flow.items[uid] for uid in prog.order]
    print(f"  {status.value}: {len(events)} events")

Original executor:
  Executor(total=5, pending=0, processing=0, completed=5, failed=0, cancelled=0, skipped=0, aborted=0)
  Status counts: {'pending': 0, 'processing': 0, 'completed': 5, 'failed': 0, 'cancelled': 0, 'skipped': 0, 'aborted': 0}

Serialized state keys: ['id', 'created_at', 'metadata', 'name', 'items', 'progressions']
  Items: 6
  Progressions: 6

Restored Flow:
  Name: serializable_exec
  Items: 5
  Progressions: ['pending', 'processing', 'completed', 'failed', 'cancelled', 'skipped', 'aborted']
  pending: 0 events
  processing: 0 events
  completed: 5 events
  failed: 0 events
  cancelled: 0 events
  skipped: 0 events
  aborted: 0 events


## Summary

**Processor/Executor Pattern**:

1. **Basic Setup**: Executor with processor_config (queue_capacity, capacity_refresh_time, concurrency_limit)
2. **Priority Queue**: Lower priority values processed first (default: created_at timestamp)
3. **O(1) Status Queries**: Flow progressions map 1:1 with EventStatus enum
4. **Permission Checks**: Override `request_permission()` for rate limiting, auth, quotas
5. **Concurrency Control**: Semaphore limits concurrent executions (default: 100)
6. **Streaming Support**: Events with `streaming=True` consume async generators
7. **Processing Modes**: Manual (`forward()`) or background (`execute()`)
8. **Serialization**: Flow.to_dict() captures full state (events + progressions)

**Key Invariants**:
- Events stored in Flow.items (single source of truth)
- Progressions track event status (1:1 mapping with EventStatus)
- Permission denials trigger 3 retries with backoff before ABORTED
- Capacity resets after each batch (controlled by capacity_refresh_time)

**Production Patterns**:
- Use `cleanup_events()` to remove completed/failed events
- Monitor with `status_counts()` and `inspect_state()`
- Implement custom Processor for domain-specific permission checks
- Serialize state periodically for crash recovery