# Tutorial: Building an Event Processing System

This tutorial walks through building a production-ready task execution system with the **Processor/Executor** pattern.

**What You'll Build**: An asynchronous task executor with:
- Priority-based task scheduling
- Rate limiting and quota management
- Failure handling with retry logic
- Monitoring and observability

**Prerequisites**: Familiarity with async Python and lionherd-core basics (Event, Flow, Pile).

## Introduction

**Problem**: You need to execute thousands of API calls with:
- Rate limits (100 requests per minute)
- Priority (critical tasks first)
- Retry logic (transient failures)
- Observability (track status in real-time)

**Solution**: The Processor/Executor pattern provides:
- **Executor**: State management with Flow (O(1) status queries)
- **Processor**: Background processing with priority queue
- **Permission Checks**: Custom rate limiting and quota enforcement
- **Flow Progressions**: 1:1 mapping with EventStatus for observability

In [None]:
# Setup
import asyncio
import time
from typing import Any, ClassVar

from lionherd_core.base import Event, EventStatus, Executor, Processor

## Part 1: Define Your Events

Create a custom `Event` subclass for your domain.

In [None]:
class APICallEvent(Event):
    """Event representing an API call."""

    endpoint: str  # API endpoint to call
    method: str = "GET"  # HTTP method
    payload: dict[str, Any] | None = None  # Request payload
    priority_level: str = "normal"  # Priority: critical, high, normal, low

    async def _invoke(self):
        """Simulate API call."""
        # In production: use httpx or aiohttp
        await asyncio.sleep(0.1)  # Simulate network latency

        # Simulate occasional failures
        if "fail" in self.endpoint:
            raise Exception(f"API error: {self.endpoint}")

        return {"status": 200, "endpoint": self.endpoint, "method": self.method}


# Test event
event = APICallEvent(endpoint="/users/123", method="GET", priority_level="high")
result = await event.invoke()
print(f"Result: {result}")
print(f"Status: {event.status}")
print(f"Duration: {event.execution.duration:.4f}s")
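
The cell above relies on `invoke()` wrapping `_invoke()` and recording the status, duration, and any error. As a rough illustration of that wrapping idea, here is a standard-library sketch. `MiniEvent`, `Echo`, and `Status` are hypothetical names for this example only; they are not lionherd-core classes, and the real `Event` may behave differently.

```python
import asyncio
import time
from enum import Enum


class Status(Enum):
    PENDING = "pending"
    COMPLETED = "completed"
    FAILED = "failed"


class MiniEvent:
    """Hypothetical stand-in for an event; not the lionherd-core Event class."""

    def __init__(self):
        self.status = Status.PENDING
        self.response = None
        self.error = None
        self.duration = 0.0

    async def _invoke(self):
        raise NotImplementedError

    async def invoke(self):
        """Run _invoke(), recording status, duration, and any error."""
        start = time.perf_counter()
        try:
            self.response = await self._invoke()
            self.status = Status.COMPLETED
        except Exception as exc:  # record the failure instead of re-raising
            self.error = str(exc)
            self.status = Status.FAILED
        finally:
            self.duration = time.perf_counter() - start
        return self.response


class Echo(MiniEvent):
    """Simulated API call that fails when the endpoint contains 'fail'."""

    def __init__(self, endpoint):
        super().__init__()
        self.endpoint = endpoint

    async def _invoke(self):
        await asyncio.sleep(0.01)  # simulated network latency
        if "fail" in self.endpoint:
            raise RuntimeError(f"API error: {self.endpoint}")
        return {"status": 200, "endpoint": self.endpoint}


async def demo():
    ok, bad = Echo("/users/123"), Echo("/fail/endpoint")
    await asyncio.gather(ok.invoke(), bad.invoke())
    return ok, bad


ok, bad = asyncio.run(demo())
print(ok.status, ok.response)
print(bad.status, bad.error)
```

Inside a notebook, where an event loop is already running, `await demo()` replaces `asyncio.run(demo())`.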

## Part 2: Create Your Processor

Define a `Processor` subclass to handle your events.

In [None]:
class APIProcessor(Processor):
    """Processor for API call events."""

    event_type: ClassVar[type[Event]] = APICallEvent

    # No custom logic yet - basic processor


# Processor will be created by Executor
print(f"Processor handles: {APIProcessor.event_type}")

## Part 3: Create Your Executor

The Executor manages Flow-based state and the processor lifecycle.

In [None]:
class APIExecutor(Executor):
    """Executor for API calls with Flow-based state tracking."""

    processor_type: ClassVar[type[Processor]] = APIProcessor


# Create executor with processor config
executor = APIExecutor(
    processor_config={
        "queue_capacity": 10,  # Process 10 events per batch
        "capacity_refresh_time": 1.0,  # 1 second between batches
        "concurrency_limit": 5,  # Max 5 concurrent API calls
    },
    name="api_executor",
)

print(f"Executor: {executor}")
print(f"Progressions (status tracking): {[p.name for p in executor.states.progressions]}")
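
The `concurrency_limit` setting caps how many events run at once. The sketch below shows one common way to enforce such a cap with `asyncio.Semaphore`, using only the standard library. `run_batch` and `make_job` are hypothetical helpers, and this is not necessarily how the lionherd-core processor implements its limit.

```python
import asyncio


async def run_batch(jobs, concurrency_limit):
    """Run coroutine factories with at most `concurrency_limit` in flight."""
    semaphore = asyncio.Semaphore(concurrency_limit)
    in_flight = 0
    peak = 0  # highest number of jobs observed running at the same time

    async def guarded(job):
        nonlocal in_flight, peak
        async with semaphore:  # waits here while the limit is reached
            in_flight += 1
            peak = max(peak, in_flight)
            try:
                return await job()
            finally:
                in_flight -= 1

    # gather() returns results in the order the jobs were submitted
    results = await asyncio.gather(*(guarded(job) for job in jobs))
    return results, peak


def make_job(i):
    async def job():
        await asyncio.sleep(0.01)  # simulated network latency
        return i

    return job


results, peak = asyncio.run(
    run_batch([make_job(i) for i in range(10)], concurrency_limit=5)
)
print(f"results={results}, peak concurrency={peak}")
```

Tracking `peak` is only for demonstration; it makes it easy to confirm that the number of simultaneous jobs never exceeds the limit.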

## Part 4: Add and Process Events

Add events to the executor and process them.

In [None]:
# Create API call events
endpoints = [
    "/users/1",
    "/posts/42",
    "/comments/99",
    "/users/2",
    "/posts/43",
]

for endpoint in endpoints:
    event = APICallEvent(endpoint=endpoint, method="GET")
    # No priority specified - defaults to created_at timestamp
    await executor.append(event)

print(f"Executor after adding events: {executor}")
print(f"Pending: {len(executor.pending_events)}")

# Start executor and process
await executor.start()
await executor.forward()  # Process one batch
await asyncio.sleep(0.6)  # Wait for completion

print("\nAfter processing:")
print(f"  Pending: {len(executor.pending_events)}")
print(f"  Completed: {len(executor.completed_events)}")
print(f"  Failed: {len(executor.failed_events)}")

# Inspect completed events
for event in executor.completed_events:
    print(f"  ✓ {event.endpoint}: {event.execution.response['status']}")
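
The fixed `asyncio.sleep(0.6)` above works for a demo but is fragile when latency varies. One alternative is to poll a condition with a timeout. The helper below is a standard-library sketch; `wait_until` is a hypothetical name and not part of lionherd-core.

```python
import asyncio
import time


async def wait_until(predicate, timeout=5.0, interval=0.01):
    """Poll `predicate` until it returns True or `timeout` seconds elapse."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if predicate():
            return True
        await asyncio.sleep(interval)
    return bool(predicate())  # one last check at the deadline


async def demo():
    done = []

    async def worker():
        await asyncio.sleep(0.05)  # simulated work
        done.append("finished")

    task = asyncio.create_task(worker())
    finished = await wait_until(lambda: len(done) == 1, timeout=1.0)
    timed_out = await wait_until(lambda: False, timeout=0.05)
    await task
    return finished, timed_out


finished, timed_out = asyncio.run(demo())
print(f"finished={finished}, timed_out={timed_out}")
```

With an executor, the predicate could compare `len(executor.completed_events) + len(executor.failed_events)` against the number of events appended, assuming every event ends in one of those two progressions.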

## Part 5: Add Rate Limiting

Implement rate limiting with a custom `request_permission()` override.

In [None]:
class RateLimitedAPIProcessor(APIProcessor):
    """API processor with rate limiting."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.request_timestamps: list[float] = []
        self.rate_limit = 3  # Max 3 requests per second
        self.time_window = 1.0  # 1 second window

    async def request_permission(self, **kwargs: Any) -> bool:
        """Check rate limit before processing."""
        now = time.monotonic()  # Monotonic clock: unaffected by system clock changes

        # Remove timestamps outside time window
        self.request_timestamps = [
            ts for ts in self.request_timestamps if now - ts < self.time_window
        ]

        # Check if under rate limit
        if len(self.request_timestamps) >= self.rate_limit:
            print(f"Rate limit exceeded: {len(self.request_timestamps)}/{self.rate_limit}")
            return False

        # Grant permission and record timestamp
        self.request_timestamps.append(now)
        return True


class RateLimitedExecutor(Executor):
    processor_type: ClassVar[type[Processor]] = RateLimitedAPIProcessor


# Test rate limiting
rate_executor = RateLimitedExecutor(
    processor_config={
        "queue_capacity": 10,
        "capacity_refresh_time": 0.5,
    },
    name="rate_limited_api",
)

# Add 6 events (rate limit: 3/sec)
for i in range(6):
    await rate_executor.append(APICallEvent(endpoint=f"/data/{i}"))

await rate_executor.start()
print("Processing with rate limit (3/sec):")
await rate_executor.forward()
await asyncio.sleep(0.4)

print(f"\nFirst batch: {rate_executor.status_counts()}")
print("(Expected: 3 completed, 3 pending due to rate limit)")
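
The sliding-window check inside `request_permission()` can be tested in isolation, without an executor or real waiting. The sketch below pulls the same logic into a small class with an injectable clock. `SlidingWindowLimiter` and `FakeClock` are hypothetical names for this example, and the `deque` is a swap-in for the list used above.

```python
import time
from collections import deque


class SlidingWindowLimiter:
    """Allow at most `rate_limit` calls within any `time_window` seconds."""

    def __init__(self, rate_limit, time_window, clock=time.monotonic):
        self.rate_limit = rate_limit
        self.time_window = time_window
        self.clock = clock  # injectable so tests can control time
        self.timestamps = deque()

    def allow(self):
        now = self.clock()
        # Drop timestamps that have aged out of the window (oldest first)
        while self.timestamps and now - self.timestamps[0] >= self.time_window:
            self.timestamps.popleft()
        if len(self.timestamps) >= self.rate_limit:
            return False
        self.timestamps.append(now)
        return True


class FakeClock:
    """Manually advanced clock for deterministic tests."""

    def __init__(self):
        self.now = 0.0

    def __call__(self):
        return self.now


clock = FakeClock()
limiter = SlidingWindowLimiter(rate_limit=3, time_window=1.0, clock=clock)

first_burst = [limiter.allow() for _ in range(6)]
clock.now = 1.0  # advance one full window
second_burst = [limiter.allow() for _ in range(6)]

print(first_burst)   # [True, True, True, False, False, False]
print(second_burst)  # [True, True, True, False, False, False]
```

Because timestamps arrive in order, evicting from the left of a `deque` avoids rebuilding the whole list on every check.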

## Part 6: Handle Failures

Events that fail are tracked in the `failed_events` progression.

In [None]:
# Create events with some failures
failure_executor = APIExecutor(
    processor_config={"queue_capacity": 10, "capacity_refresh_time": 0.5}
)

endpoints_with_failures = [
    "/users/1",  # Success
    "/fail/endpoint",  # Fail (contains 'fail')
    "/posts/42",  # Success
    "/fail/another",  # Fail
    "/comments/99",  # Success
]

for endpoint in endpoints_with_failures:
    await failure_executor.append(APICallEvent(endpoint=endpoint))

await failure_executor.start()
await failure_executor.forward()
await asyncio.sleep(0.6)

print("Processing results:")
print(f"  Completed: {len(failure_executor.completed_events)}")
print(f"  Failed: {len(failure_executor.failed_events)}")

# Inspect failures
print("\nFailed events:")
for event in failure_executor.failed_events:
    print(f"  ✗ {event.endpoint}: {event.execution.error}")
    print(f"    Retryable: {event.execution.retryable}")

# Retry failed events
print("\nRetrying failed events...")
for failed_event in list(failure_executor.failed_events):  # Snapshot before appending
    if failed_event.execution.retryable:
        # Create a fresh event and queue it for another attempt
        retry_event = failed_event.as_fresh_event()
        await failure_executor.append(retry_event)
        # Note: Will still fail if endpoint contains 'fail'
        print(f"  Retry queued: {retry_event.endpoint}")
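
For transient failures, retries are usually spaced out instead of issued immediately. The following standard-library sketch shows retry with exponential backoff. `TransientError`, `retry`, and `flaky` are hypothetical names for illustration; lionherd-core's own retry facilities, if any, are not shown here.

```python
import asyncio


class TransientError(Exception):
    """Hypothetical marker for failures that are worth retrying."""


async def retry(operation, max_attempts=3, base_delay=0.01):
    """Call `operation` until it succeeds or attempts run out.

    The delay doubles after each failed attempt: base_delay, 2x, 4x, ...
    """
    for attempt in range(1, max_attempts + 1):
        try:
            return await operation()
        except TransientError:
            if attempt == max_attempts:
                raise  # out of attempts: surface the last error
            await asyncio.sleep(base_delay * 2 ** (attempt - 1))


calls = {"count": 0}


async def flaky():
    """Fail twice, then succeed on the third call."""
    calls["count"] += 1
    if calls["count"] < 3:
        raise TransientError("temporary outage")
    return {"status": 200}


result = asyncio.run(retry(flaky))
print(result, f"after {calls['count']} attempts")
```

Only `TransientError` is retried; any other exception propagates on the first attempt, which keeps permanent failures from being retried pointlessly.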

## Part 7: Priority Processing

Process high-priority events first by assigning custom priority values (lower values are processed first).

In [None]:
# Priority mapping
PRIORITY_MAP = {
    "critical": 1.0,
    "high": 5.0,
    "normal": 10.0,
    "low": 20.0,
}

priority_executor = APIExecutor(
    processor_config={"queue_capacity": 10, "capacity_refresh_time": 0.5}
)

# Add events with different priorities
tasks = [
    ("low", "/logs/upload"),
    ("critical", "/alerts/fire"),
    ("normal", "/data/sync"),
    ("high", "/payment/process"),
    ("low", "/cache/cleanup"),
]

for priority_level, endpoint in tasks:
    event = APICallEvent(endpoint=endpoint, priority_level=priority_level)
    priority = PRIORITY_MAP[priority_level]
    await priority_executor.append(event, priority=priority)
    print(f"Queued: {endpoint} (priority={priority})")

await priority_executor.start()
await priority_executor.forward()
await asyncio.sleep(0.6)

print("\nCompletion order (expected to follow priority):")
for event in priority_executor.completed_events:
    print(f"  {event.priority_level:8s} - {event.endpoint}")
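
To see why lower values come out first, here is a standard-library sketch of a min-heap priority queue using `heapq`. It reuses the priority map and tasks from the cell above; the insertion counter used as a tie-breaker is an assumption of this sketch, not a documented detail of the lionherd-core queue.

```python
import heapq
import itertools

PRIORITY_MAP = {"critical": 1.0, "high": 5.0, "normal": 10.0, "low": 20.0}

tasks = [
    ("low", "/logs/upload"),
    ("critical", "/alerts/fire"),
    ("normal", "/data/sync"),
    ("high", "/payment/process"),
    ("low", "/cache/cleanup"),
]

counter = itertools.count()  # tie-breaker: equal priorities keep insertion order
heap = []
for level, endpoint in tasks:
    heapq.heappush(heap, (PRIORITY_MAP[level], next(counter), endpoint))

order = []
while heap:
    priority, _, endpoint = heapq.heappop(heap)  # smallest priority value first
    order.append(endpoint)

print(order)
# ['/alerts/fire', '/payment/process', '/data/sync', '/logs/upload', '/cache/cleanup']
```

The two `low` tasks come out in the order they were queued because the counter breaks the tie.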

## Part 8: Monitoring and Debugging

Use Flow progressions for real-time monitoring.

In [None]:
# Create monitoring executor
monitor_executor = APIExecutor(
    processor_config={
        "queue_capacity": 5,
        "capacity_refresh_time": 0.3,
        "concurrency_limit": 2,
    },
    name="monitored_api",
)

# Add events
for i in range(8):
    await monitor_executor.append(APICallEvent(endpoint=f"/task/{i}"))

await monitor_executor.start()

# Monitor during processing
print("Real-time monitoring:")
for iteration in range(3):
    await monitor_executor.forward()
    await asyncio.sleep(0.15)  # Partial wait

    counts = monitor_executor.status_counts()
    print(f"\nIteration {iteration + 1}:")
    print(f"  Counts: {counts}")
    print(f"  {monitor_executor.inspect_state()}")

    await asyncio.sleep(0.2)  # Wait for completion

# Final state
print("\n" + "=" * 40)
print("Final state:")
print(f"  Total events: {len(monitor_executor.states.items)}")
print(f"  Completed: {len(monitor_executor.completed_events)}")
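completed = list(monitor_executor.completed_events)
if completed:  # Avoid dividing by zero when nothing has completed yet
    avg_duration = sum(e.execution.duration for e in completed) / len(completed)
    print(f"  Average duration: {avg_duration:.4f}s")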
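
Monitoring stays cheap when each status has its own bucket, because counting a status is a length lookup, not a scan over all events. The sketch below illustrates that idea with plain dictionaries. `StatusBoard` and `Status` are hypothetical names, and this is a conceptual model, not the Flow implementation.

```python
from enum import Enum


class Status(Enum):
    PENDING = "pending"
    PROCESSING = "processing"
    COMPLETED = "completed"
    FAILED = "failed"


class StatusBoard:
    """Hypothetical tracker: one insertion-ordered bucket of ids per status."""

    def __init__(self):
        self.buckets = {status: {} for status in Status}
        self.current = {}  # event id -> current status

    def add(self, event_id):
        self.buckets[Status.PENDING][event_id] = None
        self.current[event_id] = Status.PENDING

    def move(self, event_id, new_status):
        """Move an id between buckets using dictionary operations only."""
        old_status = self.current[event_id]
        del self.buckets[old_status][event_id]
        self.buckets[new_status][event_id] = None
        self.current[event_id] = new_status

    def counts(self):
        return {status.value: len(bucket) for status, bucket in self.buckets.items()}


board = StatusBoard()
for event_id in ["a", "b", "c", "d"]:
    board.add(event_id)

board.move("a", Status.PROCESSING)
board.move("a", Status.COMPLETED)
board.move("b", Status.FAILED)

print(board.counts())
# {'pending': 2, 'processing': 0, 'completed': 1, 'failed': 1}
```

Every event lives in exactly one bucket at a time, so the counts always sum to the total number of events.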

## Part 9: Production Patterns

Best practices for production deployments.

In [None]:
# Pattern 1: Cleanup completed events
cleanup_executor = APIExecutor(processor_config={"queue_capacity": 5, "capacity_refresh_time": 0.5})

for i in range(10):
    await cleanup_executor.append(APICallEvent(endpoint=f"/task/{i}"))

await cleanup_executor.start()
await cleanup_executor.forward()
await asyncio.sleep(0.6)

print(f"Before cleanup: {cleanup_executor}")

# Clean up completed/failed events (free memory)
removed = await cleanup_executor.cleanup_events([EventStatus.COMPLETED, EventStatus.FAILED])
print(f"Removed {removed} events")
print(f"After cleanup: {cleanup_executor}")

# Pattern 2: State persistence
print("\n" + "=" * 40)
print("State persistence:")
state_data = cleanup_executor.states.to_dict()
print(
    f"Serialized state: {len(state_data['items'])} items, {len(state_data['progressions'])} progressions"
)

# Pattern 3: Graceful shutdown
print("\n" + "=" * 40)
print("Graceful shutdown:")
shutdown_executor = APIExecutor(
    processor_config={"queue_capacity": 5, "capacity_refresh_time": 0.5}
)

for i in range(5):
    await shutdown_executor.append(APICallEvent(endpoint=f"/shutdown/{i}"))

# Must call start() to create processor
await shutdown_executor.start()

# Start background processing
exec_task = asyncio.create_task(shutdown_executor.processor.execute())

await asyncio.sleep(0.3)  # Let some process

# Graceful stop
await shutdown_executor.stop()
print(f"Stopped: {shutdown_executor.processor.is_stopped()}")
await exec_task  # Wait for cleanup

print(f"Final state: {shutdown_executor}")
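
The graceful-shutdown pattern can be reduced to a cooperative stop flag: the worker finishes its current item, then exits, and unprocessed items stay queued. The sketch below shows this with `asyncio.Event` and `asyncio.Queue` from the standard library. The `worker` function is a hypothetical illustration, not the processor's actual `execute()` loop.

```python
import asyncio


async def worker(queue, stop, processed):
    """Process items until a stop is requested, finishing the current item first."""
    while not stop.is_set():
        try:
            item = queue.get_nowait()
        except asyncio.QueueEmpty:
            await asyncio.sleep(0.01)  # idle briefly, then re-check the stop flag
            continue
        await asyncio.sleep(0.01)  # simulated work; not interrupted by stop
        processed.append(item)


async def demo():
    queue = asyncio.Queue()
    for i in range(5):
        queue.put_nowait(f"/shutdown/{i}")

    stop = asyncio.Event()
    processed = []
    task = asyncio.create_task(worker(queue, stop, processed))

    await asyncio.sleep(0.025)  # let some items process
    stop.set()                  # request a graceful stop
    await task                  # wait for the in-progress item to finish
    return processed, queue.qsize()


processed, remaining = asyncio.run(demo())
print(f"processed={len(processed)}, still queued={remaining}")
```

How many items finish before the stop depends on timing, but none are lost: processed plus still-queued always equals the number submitted.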

## Conclusion

**What You Built**:
- ✅ Custom Event subclass (APICallEvent)
- ✅ Processor with rate limiting (request_permission)
- ✅ Executor with Flow-based state tracking
- ✅ Priority-based scheduling
- ✅ Failure handling and retry logic
- ✅ Real-time monitoring
- ✅ Production patterns (cleanup, persistence, shutdown)

**Key Takeaways**:
1. **Flow Progressions**: 1:1 mapping with EventStatus enables O(1) queries
2. **Permission Checks**: Override `request_permission()` for custom gating
3. **Priority Queue**: Lower values processed first (customizable per event)
4. **Concurrency Control**: Semaphore limits concurrent executions
5. **State Management**: Serialize Flow for persistence and recovery

**Next Steps**:
- See `notebooks/tutorials/flow_state_tracking.ipynb` for deep dive on Flow progressions
- See `notebooks/references/processor_executor.ipynb` for complete API reference
- Check API docs: `docs/api/processor.md`, `docs/api/executor.md`