Skip to content

feat: pull-based event processing with Worker + EventHandler pattern#44

Closed
rorybyrne wants to merge 5 commits into
mainfrom
035-feat-redesign-event
Closed

feat: pull-based event processing with Worker + EventHandler pattern#44
rorybyrne wants to merge 5 commits into
mainfrom
035-feat-redesign-event

Conversation

@rorybyrne

Copy link
Copy Markdown
Contributor

Summary

Complete redesign of event processing from push-based EventListener to pull-based Worker + EventHandler pattern. Workers claim events using FOR UPDATE SKIP LOCKED, enabling concurrent processing without coordination.

Key Changes

  • EventHandler base class with configuration via classvars (__routing_key__, __batch_size__, __poll_interval__, __max_retries__, __claim_timeout__)
  • Simplified Worker that accepts handler_type directly and reads config from classvars
  • WorkerPool.register() for handler-based registration
  • Per-backend indexing via routing keys - FanOutToIndexBackends creates one IndexRecord per backend
  • VectorIndexHandler (batch_size=100) and KeywordIndexHandler (batch_size=1) with routing key filtering
  • Fair queuing - round-robin event fetching prevents high-volume event types from starving others
  • Crash recovery - events remain claimed until explicitly marked delivered/failed
  • Stale claim cleanup - periodic task resets claims older than claim_timeout

Architecture

WorkerPool
├── Worker(TriggerInitialSourceRun)     # ServerStarted → SourceRequested
├── Worker(PullFromSource)              # SourceRequested → pulls from source
├── Worker(ValidateDeposition)          # DepositionSubmitted → ValidationCompleted
├── Worker(AutoApproveCuration)         # ValidationCompleted → DepositionApproved
├── Worker(ConvertDepositionToRecord)   # DepositionApproved → RecordPublished
├── Worker(FanOutToIndexBackends)       # RecordPublished → IndexRecord per backend
├── Worker(VectorIndexHandler)          # IndexRecord[routing_key=vector]
└── Worker(KeywordIndexHandler)         # IndexRecord[routing_key=keyword]

Database Migration

Adds columns to events table:

  • routing_key - for per-backend event targeting
  • retry_count - for tracking retry attempts
  • claimed_at - for stale claim detection
  • updated_at - for state tracking

Plus partial indexes for efficient claiming.

Test Plan

  • Unit tests (108 tests)
  • Integration tests (15 tests for claiming and concurrency)
  • Contract tests (worker lifecycle)
  • Lint and type checks pass
  • Manual test: start server, trigger source ingestion, verify records indexed

…upport (#35)

Replace fan-out-in-service pattern with explicit per-backend IndexRecord events
for failure isolation and crash-safe processing.

Key changes:
- Add BatchEventListener protocol for efficient batch processing
- Create IndexRecord event type for per-backend indexing
- Add FanOutToIndexBackends listener (RecordPublished → IndexRecord per backend)
- Add IndexRecordBatch batch listener (groups IndexRecords by backend)
- Remove in-memory buffer from VectorStorageBackend (crash-safe)
- Update BackgroundWorker to detect and dispatch to batch listeners
- Add configurable batch_size to WorkerConfig

This enables:
- Per-backend failure isolation (retry just failed backend)
- Crash-safe processing (events stay in outbox until committed)
- Efficient batch embedding generation
- Clear failure visibility with backend name and record SRN

Closes #35
Follow-up improvements to event processing redesign:

- Add round-robin fair queuing to fetch_pending() for parallel pipeline stages
- Fix source service infinite loop when offset >= limit (remaining <= 0)
- Add OSA_CONFIG_FILE env var to Dockerfile (config was being ignored)
- Reduce log noise: batch summaries at INFO, per-event details at DEBUG
- Add logging to GEO source for UID fetching and limit edge cases
* feat: migrate to pull-based Worker + EventHandler pattern

Replace push-based EventListener with unified pull-based architecture:

- Add EventHandler base class with config via classvars (__routing_key__,
  __batch_size__, __poll_interval__, __max_retries__, __claim_timeout__)
- Simplify Worker to accept handler_type directly, read config from classvars
- Add WorkerPool.register() for handler-based registration
- Rename listener/ to handler/ across all domains (source, validation,
  curation, record, index)
- Add VectorIndexHandler and KeywordIndexHandler with routing key support
- Add database migration for worker columns (routing_key, retry_count,
  claimed_at, updated_at) and partial indexes for efficient claiming

Workers now claim events using FOR UPDATE SKIP LOCKED, enabling concurrent
processing without coordination. Each handler declares its own batch size
and polling configuration.

Test coverage: 108 unit tests, 15 integration tests for event claiming
and concurrent worker behavior.

* refactor: convert WorkerConfig from dataclass to Pydantic model

Use Pydantic Field constraints (ge, gt) for numeric validation instead of
__post_init__. This provides consistent validation behavior with other
domain models and better error messages.

* ci: run CI on PRs to any branch, not just main
@rorybyrne

Copy link
Copy Markdown
Contributor Author

Closing - will re-PR from 041 branch directly against main since PR #40 already merged the original 035 work.

@rorybyrne rorybyrne closed this Feb 2, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant