feat: migrate to pull-based Worker + EventHandler pattern #45

Merged
rorybyrne merged 7 commits into main from 041-feat-refactor-pull-clean on Feb 3, 2026

Conversation

@rorybyrne

Contributor

Summary

Migrates from push-based EventListener to pull-based Worker + EventHandler pattern. Workers claim events using FOR UPDATE SKIP LOCKED, enabling concurrent processing without coordination.
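
As a rough illustration of the claiming step, the sketch below builds a PostgreSQL statement that claims pending rows with FOR UPDATE SKIP LOCKED. It is not the PR's actual query: the column names `id`, `event_type`, `status`, `created_at`, and `payload`, the `build_claim` helper, and the choice to fold the SELECT and UPDATE into one statement are all assumptions made for the example.

```python
from __future__ import annotations

from typing import Any

# Assumed schema: only routing_key and claimed_at are named in this PR;
# the other column names are placeholders.
CLAIM_SQL = """
UPDATE events
SET status = 'claimed', claimed_at = now()
WHERE id IN (
    SELECT id
    FROM events
    WHERE status = 'pending'
      AND event_type = ANY(%(event_types)s)
      AND routing_key IS NOT DISTINCT FROM %(routing_key)s
    ORDER BY created_at
    LIMIT %(limit)s
    FOR UPDATE SKIP LOCKED  -- rows locked by another worker are skipped, not waited on
)
RETURNING id, event_type, payload
"""


def build_claim(
    event_types: list[str], limit: int, routing_key: str | None
) -> tuple[str, dict[str, Any]]:
    """Return the statement and named parameters for a pyformat-style driver."""
    if limit < 1:
        raise ValueError("limit must be at least 1")
    params = {"event_types": event_types, "limit": limit, "routing_key": routing_key}
    return CLAIM_SQL, params


sql, params = build_claim(["IndexRecord"], limit=100, routing_key="vector")
```

Because locked rows are skipped rather than waited on, two workers running this statement at the same time should each receive a disjoint set of rows, which is what removes the need for explicit coordination.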

Changes

  • EventHandler base class with configuration via classvars (__routing_key__, __batch_size__, __poll_interval__, __max_retries__, __claim_timeout__)
  • Simplified Worker accepts handler_type directly, reads config from classvars
  • WorkerPool.register() for handler-based registration
  • Renamed listener/ to handler/ across all domains
  • VectorIndexHandler (batch_size=100) and KeywordIndexHandler (batch_size=1) with routing key support
  • WorkerConfig as Pydantic model with Field constraints for validation
  • CI runs on PRs to any branch (not just main)
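
To make the classvar-based configuration concrete, here is a minimal sketch of how such a base class and two handlers might look. The default values, the routing key strings, and the `read_config` helper are hypothetical; only the classvar names and the two batch sizes come from this PR.

```python
from __future__ import annotations

from typing import Any, ClassVar


class EventHandler:
    """Base class; subclasses override only the classvars they need."""

    # Defaults below are illustrative, not the project's real values.
    __routing_key__: ClassVar[str | None] = None
    __batch_size__: ClassVar[int] = 1
    __poll_interval__: ClassVar[float] = 1.0
    __max_retries__: ClassVar[int] = 3
    __claim_timeout__: ClassVar[float] = 300.0

    def handle(self, event: Any) -> None:
        raise NotImplementedError


class VectorIndexHandler(EventHandler):
    __routing_key__ = "vector"  # assumed key name
    __batch_size__ = 100  # batch size stated in the PR


class KeywordIndexHandler(EventHandler):
    __routing_key__ = "keyword"  # assumed key name
    __batch_size__ = 1  # batch size stated in the PR


def read_config(handler_type: type[EventHandler]) -> dict[str, Any]:
    """What a Worker could do: read its settings straight off the handler class."""
    return {
        "routing_key": handler_type.__routing_key__,
        "batch_size": handler_type.__batch_size__,
        "poll_interval": handler_type.__poll_interval__,
        "max_retries": handler_type.__max_retries__,
        "claim_timeout": handler_type.__claim_timeout__,
    }
```

Names that both start and end with double underscores are not name-mangled by Python, so code outside the class can read them directly from the handler type.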

Database Migration

Adds columns to events table: routing_key, retry_count, claimed_at, updated_at with partial indexes.
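
The shape of such a migration can be tried out locally with the standard-library `sqlite3` module, which also supports partial indexes. This is only a stand-in for the real PostgreSQL migration: the base table layout, the column types, the index names, and the indexed columns are assumptions.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    -- Assumed pre-migration table; the real events table will differ.
    CREATE TABLE events (
        id INTEGER PRIMARY KEY,
        event_type TEXT NOT NULL,
        status TEXT NOT NULL DEFAULT 'pending',
        created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
    );

    -- The four worker columns named in the PR.
    ALTER TABLE events ADD COLUMN routing_key TEXT;
    ALTER TABLE events ADD COLUMN retry_count INTEGER NOT NULL DEFAULT 0;
    ALTER TABLE events ADD COLUMN claimed_at TEXT;
    ALTER TABLE events ADD COLUMN updated_at TEXT;

    -- Partial indexes cover only the rows a claim or cleanup query looks at.
    CREATE INDEX ix_events_pending ON events (routing_key, created_at)
        WHERE status = 'pending';
    CREATE INDEX ix_events_claimed ON events (claimed_at)
        WHERE status = 'claimed';
    """
)

columns = [row[1] for row in conn.execute("PRAGMA table_info(events)")]
indexes = dict(conn.execute("SELECT name, sql FROM sqlite_master WHERE type = 'index'"))
```

A partial index stays small because delivered events, which make up most of the table over time, are never included in it.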

Test Plan

  • Unit tests (108 tests)
  • Integration tests (15 tests)
  • Lint and type checks pass
  • Manual test: server starts, ingestion works

Replace push-based EventListener with unified pull-based architecture:

- Add EventHandler base class with config via classvars (__routing_key__,
  __batch_size__, __poll_interval__, __max_retries__, __claim_timeout__)
- Simplify Worker to accept handler_type directly, read config from classvars
- Add WorkerPool.register() for handler-based registration
- Rename listener/ to handler/ across all domains (source, validation,
  curation, record, index)
- Add VectorIndexHandler and KeywordIndexHandler with routing key support
- Add database migration for worker columns (routing_key, retry_count,
  claimed_at, updated_at) and partial indexes for efficient claiming

Workers now claim events using FOR UPDATE SKIP LOCKED, enabling concurrent
processing without coordination. Each handler declares its own batch size
and polling configuration.

Test coverage: 108 unit tests, 15 integration tests for event claiming
and concurrent worker behavior.

Use Pydantic Field constraints (ge, gt) for numeric validation instead of
__post_init__. This provides consistent validation behavior with other
domain models and better error messages.
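
A minimal sketch of what Field-based validation could look like, assuming Pydantic is installed. The field names, defaults, and the `is_valid` helper are hypothetical and are not taken from the project's WorkerConfig.

```python
from pydantic import BaseModel, Field, ValidationError


class WorkerConfig(BaseModel):
    # Hypothetical fields; ge means "greater than or equal", gt means "greater than".
    batch_size: int = Field(default=1, ge=1)
    poll_interval: float = Field(default=1.0, gt=0)
    max_retries: int = Field(default=3, ge=0)


def is_valid(**kwargs) -> bool:
    """Return True when the keyword arguments satisfy every constraint."""
    try:
        WorkerConfig(**kwargs)
    except ValidationError:
        return False
    return True
```

With this approach the constraints sit next to the field declarations, and a violation surfaces as a ValidationError that names the offending field, with no hand-written checks needed.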
@github-actions

github-actions Bot commented Feb 2, 2026

Code Coverage

| Package | Line Rate | Complexity | Health |
| --- | --- | --- | --- |
| . | 70% | 0 | |
| application | 0% | 0 | |
| application.api | 100% | 0 | |
| application.api.rest | 0% | 0 | |
| application.api.v1 | 0% | 0 | |
| application.api.v1.routes | 0% | 0 | |
| application.event | 100% | 0 | |
| cli | 40% | 0 | |
| cli.commands | 18% | 0 | |
| cli.util | 53% | 0 | |
| domain | 100% | 0 | |
| domain.auth | 100% | 0 | |
| domain.auth.adapter | 100% | 0 | |
| domain.auth.command | 100% | 0 | |
| domain.auth.event | 100% | 0 | |
| domain.auth.model | 100% | 0 | |
| domain.auth.port | 100% | 0 | |
| domain.auth.query | 100% | 0 | |
| domain.auth.service | 100% | 0 | |
| domain.curation | 100% | 0 | |
| domain.curation.adapter | 100% | 0 | |
| domain.curation.command | 100% | 0 | |
| domain.curation.event | 0% | 0 | |
| domain.curation.handler | 0% | 0 | |
| domain.curation.model | 100% | 0 | |
| domain.curation.port | 100% | 0 | |
| domain.curation.query | 100% | 0 | |
| domain.curation.service | 100% | 0 | |
| domain.deposition | 100% | 0 | |
| domain.deposition.adapter | 100% | 0 | |
| domain.deposition.command | 0% | 0 | |
| domain.deposition.event | 100% | 0 | |
| domain.deposition.model | 0% | 0 | |
| domain.deposition.port | 0% | 0 | |
| domain.deposition.query | 100% | 0 | |
| domain.deposition.service | 0% | 0 | |
| domain.export | 100% | 0 | |
| domain.export.adapter | 100% | 0 | |
| domain.export.command | 100% | 0 | |
| domain.export.event | 100% | 0 | |
| domain.export.model | 100% | 0 | |
| domain.export.port | 100% | 0 | |
| domain.export.query | 100% | 0 | |
| domain.export.service | 100% | 0 | |
| domain.index | 100% | 0 | |
| domain.index.event | 100% | 0 | |
| domain.index.handler | 76% | 0 | |
| domain.index.model | 84% | 0 | |
| domain.index.service | 100% | 0 | |
| domain.record | 100% | 0 | |
| domain.record.adapter | 100% | 0 | |
| domain.record.command | 100% | 0 | |
| domain.record.event | 100% | 0 | |
| domain.record.handler | 0% | 0 | |
| domain.record.model | 100% | 0 | |
| domain.record.port | 100% | 0 | |
| domain.record.query | 100% | 0 | |
| domain.record.service | 100% | 0 | |
| domain.schema | 100% | 0 | |
| domain.schema.adapter | 100% | 0 | |
| domain.schema.command | 100% | 0 | |
| domain.schema.event | 100% | 0 | |
| domain.schema.model | 100% | 0 | |
| domain.schema.port | 100% | 0 | |
| domain.schema.query | 100% | 0 | |
| domain.schema.service | 100% | 0 | |
| domain.search | 100% | 0 | |
| domain.search.adapter | 100% | 0 | |
| domain.search.command | 100% | 0 | |
| domain.search.event | 100% | 0 | |
| domain.search.model | 100% | 0 | |
| domain.search.port | 100% | 0 | |
| domain.search.query | 100% | 0 | |
| domain.search.service | 100% | 0 | |
| domain.shared | 74% | 0 | |
| domain.shared.model | 90% | 0 | |
| domain.shared.port | 100% | 0 | |
| domain.source | 100% | 0 | |
| domain.source.event | 100% | 0 | |
| domain.source.handler | 0% | 0 | |
| domain.source.model | 76% | 0 | |
| domain.source.schedule | 0% | 0 | |
| domain.source.service | 92% | 0 | |
| domain.validation | 100% | 0 | |
| domain.validation.adapter | 100% | 0 | |
| domain.validation.command | 0% | 0 | |
| domain.validation.event | 0% | 0 | |
| domain.validation.handler | 0% | 0 | |
| domain.validation.model | 0% | 0 | |
| domain.validation.port | 0% | 0 | |
| domain.validation.query | 100% | 0 | |
| domain.validation.service | 0% | 0 | |
| infrastructure | 100% | 0 | |
| infrastructure.event | 66% | 0 | |
| infrastructure.index | 0% | 0 | |
| infrastructure.index.vector | 76% | 0 | |
| infrastructure.messaging | 100% | 0 | |
| infrastructure.oci | 0% | 0 | |
| infrastructure.persistence | 0% | 0 | |
| infrastructure.persistence.adapter | 0% | 0 | |
| infrastructure.source | 0% | 0 | |
| sdk | 100% | 0 | |
| sdk.index | 100% | 0 | |
| sdk.source | 100% | 0 | |
| util | 100% | 0 | |
| util.di | 8% | 0 | |
| Summary | 39% (1230 / 3133) | 0 | |

- Add exponential backoff to event claiming (min(30, 5^retry_count) seconds)
- Rename SkippedEventsError to SkippedEvents (control flow, not error)
- Handlers now raise SkippedEvents when backend unavailable instead of
  silently returning (which incorrectly marked events as delivered)
- Move sleep outside UoW scope to release DB connection during idle time
- Update tests for new handler-based pattern
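
The backoff formula in the first bullet is easy to tabulate. The sketch below computes the delay for a given retry count and checks whether an event is due again; the function names and the idea of measuring from a "last attempt" timestamp are assumptions for illustration.

```python
from datetime import datetime, timedelta, timezone


def backoff_seconds(retry_count: int, base: int = 5, cap: int = 30) -> int:
    """Delay before the next attempt: min(cap, base ** retry_count)."""
    if retry_count < 0:
        raise ValueError("retry_count must be non-negative")
    return min(cap, base**retry_count)


def is_due(last_attempt: datetime, retry_count: int, now: datetime) -> bool:
    """True once the backoff window since the last attempt has elapsed."""
    return now - last_attempt >= timedelta(seconds=backoff_seconds(retry_count))


# Delays for retry counts 0 through 4: 1, 5, 25, 30, 30 seconds.
schedule = [backoff_seconds(n) for n in range(5)]
```

With a base of 5 the cap is reached at the third retry, so from then on every further attempt waits the full 30 seconds.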
@rorybyrne

Contributor Author

@greptile

@greptile-apps

greptile-apps Bot commented Feb 3, 2026

Contributor

Greptile Overview

Greptile Summary

This PR migrates from push-based EventListener to pull-based Worker and EventHandler pattern, enabling concurrent event processing without coordination.

Key Changes

  • Pull-based claiming: Workers claim events using FOR UPDATE SKIP LOCKED, preventing race conditions between concurrent workers
  • EventHandler pattern: Unified base class replaces separate listener types with configuration via classvars (routing key, batch size, etc.)
  • Routing keys: Events can be routed to specific workers for backend-specific processing
  • Retry with backoff: Failed events retry with exponential backoff of min(30, 5^retry_count) seconds, so the delay is capped at 30 seconds
  • Stale claim recovery: Background task resets claims from crashed workers every 60s
  • WorkerConfig validation: Converted to Pydantic model with Field constraints
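
Stale claim recovery can be pictured with a small in-memory model. The sketch below is not the repository code: the `Event` dataclass and the function signature are hypothetical, and the real implementation presumably runs as a single UPDATE against the events table.

```python
from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime, timedelta, timezone


@dataclass
class Event:
    id: int
    status: str
    claimed_at: datetime | None = None


def reset_stale_claims(events: list[Event], timeout: timedelta, now: datetime) -> int:
    """Return claimed events older than the timeout to 'pending'; report how many."""
    cutoff = now - timeout
    reset = 0
    for event in events:
        if event.status == "claimed" and event.claimed_at is not None and event.claimed_at < cutoff:
            event.status = "pending"
            event.claimed_at = None
            reset += 1
    return reset
```

The timeout has to exceed the longest time a healthy handler may hold a claim; otherwise an event still being processed could be handed to a second worker.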

Architecture

The migration enables true concurrent processing:

  1. Multiple workers can claim different events simultaneously
  2. Each backend has its own worker with optimized batch sizes (vector: 100, keyword: 1)
  3. Events are locked at claim time, preventing duplicate processing
  4. Failed events automatically retry with backoff, or become permanently failed after max retries

Database Migration

Adds columns for routing_key, retry_count, claimed_at, updated_at with partial indexes optimized for claiming queries.

Testing

Comprehensive test coverage with 108 unit tests and 15 integration tests, including concurrent worker tests and crash recovery scenarios.

Confidence Score: 4/5

  • Safe to merge with minimal risk - well-tested architectural refactoring with comprehensive test coverage
  • Score reflects solid implementation with thorough testing (123 tests), clear migration path, and proper error handling. One minor style improvement suggested.
  • No files require special attention

Important Files Changed

| Filename | Overview |
| --- | --- |
| server/osa/infrastructure/event/worker.py | Migrated to pull-based Worker pattern with EventHandler delegation, FOR UPDATE SKIP LOCKED claiming, and graceful shutdown |
| server/osa/domain/shared/event.py | Added EventHandler base class with classvars for configuration, WorkerConfig as Pydantic model with validation, and ClaimResult dataclass |
| server/osa/domain/shared/outbox.py | Added claim(), mark_failed_with_retry(), and reset_stale_claims() methods for pull-based event processing |
| server/osa/infrastructure/persistence/repository/event.py | Implemented claim() with FOR UPDATE SKIP LOCKED, exponential backoff logic, and retry handling |
| server/migrations/versions/add_worker_columns.py | Added routing_key, retry_count, claimed_at, updated_at columns with partial indexes for efficient claiming queries |
| server/osa/domain/index/handler/fanout_to_index_backends.py | Creates per-backend IndexRecord events with routing keys for independent retry and failure isolation |

Sequence Diagram

sequenceDiagram
    participant App as FastAPI App
    participant Pool as WorkerPool
    participant Worker as Worker
    participant Outbox as Outbox
    participant Repo as EventRepository
    participant Handler as EventHandler
    participant DB as PostgreSQL

    App->>Pool: start()
    Pool->>Pool: emit ServerStarted event
    Pool->>Worker: start() (per handler)
    
    loop Poll Loop
        Worker->>Worker: _poll_once()
        Worker->>Outbox: claim(event_types, limit, routing_key)
        Outbox->>Repo: claim()
        Repo->>DB: SELECT ... FOR UPDATE SKIP LOCKED
        Note over DB: Locks pending events<br/>matching routing_key<br/>respecting backoff
        DB-->>Repo: locked rows
        Repo->>DB: UPDATE status='claimed', claimed_at=now
        Repo-->>Outbox: ClaimResult(events)
        Outbox-->>Worker: ClaimResult
        
        alt Events claimed
            Worker->>Handler: handle() or handle_batch()
            Handler-->>Worker: success
            Worker->>Outbox: mark_delivered(event_id)
            Outbox->>Repo: update_status('delivered')
            Worker->>DB: commit()
        else Handler raises SkippedEvents
            Worker->>Outbox: mark_skipped(event_id, reason)
            Outbox->>Repo: update_status('skipped')
            Worker->>DB: commit()
        else Handler raises Exception
            Worker->>Outbox: mark_failed_with_retry(event_id, error, max_retries)
            Outbox->>Repo: mark_failed_with_retry()
            alt retry_count < max_retries
                Repo->>DB: UPDATE status='pending', retry_count++
                Note over DB: Event will be retried<br/>after backoff delay
            else retry_count >= max_retries
                Repo->>DB: UPDATE status='failed' (permanent)
            end
            Worker->>DB: commit()
        else No events
            Worker->>Worker: sleep(poll_interval)
        end
    end
    
    loop Stale Claim Cleanup (every 60s)
        Pool->>Outbox: reset_stale_claims(max_timeout)
        Outbox->>Repo: reset_stale_claims()
        Repo->>DB: UPDATE claimed events older than timeout<br/>SET status='pending'
        Note over DB: Recovers events from<br/>crashed workers
    end
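
The three outcome branches in the diagram (delivered, skipped, failed with retry) can be condensed into a small sketch. This models a single claimed event in memory; the `Event` dataclass and the `settle` function are hypothetical and leave out the database, the Outbox, and batching.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Callable


class SkippedEvents(Exception):
    """Control-flow signal: the handler chose not to process the event."""


@dataclass
class Event:
    id: int
    status: str = "claimed"
    retry_count: int = 0
    error: str | None = None


def settle(event: Event, handle: Callable[[Event], None], max_retries: int) -> str:
    """Run the handler once and record the resulting status on the event."""
    try:
        handle(event)
    except SkippedEvents as exc:
        event.status = "skipped"
        event.error = str(exc)
    except Exception as exc:
        event.error = str(exc)
        if event.retry_count < max_retries:
            event.retry_count += 1
            event.status = "pending"  # eligible again after the backoff delay
        else:
            event.status = "failed"  # permanent failure
    else:
        event.status = "delivered"
    return event.status
```

SkippedEvents must be caught before the generic Exception branch, since it is itself an exception subclass and would otherwise be counted as a failure.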

@greptile-apps greptile-apps Bot left a comment

Contributor

6 files reviewed, 1 comment

Comment thread server/osa/domain/index/handler/fanout_to_index_backends.py Outdated
The routing_key was duplicated: set on the IndexRecord payload and
passed to outbox.append(). Only the DB column (via outbox.append())
is used for routing in claim() - the payload field was never read.
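
A minimal sketch of the fan-out described here, with the routing key passed only to `append()` and kept off the payload. The in-memory outbox, the `record_id` field, and the exact `append()` and `claim()` signatures are assumptions for illustration.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass(frozen=True)
class IndexRecord:
    record_id: str  # hypothetical payload field; note there is no routing_key here


@dataclass
class InMemoryOutbox:
    # Each row pairs the routing key "column" with the event payload.
    rows: list[tuple[str | None, IndexRecord]] = field(default_factory=list)

    def append(self, event: IndexRecord, routing_key: str | None = None) -> None:
        self.rows.append((routing_key, event))

    def claim(self, routing_key: str | None) -> list[IndexRecord]:
        """Routing looks only at the stored key, never at the payload."""
        return [event for key, event in self.rows if key == routing_key]


def fanout_to_index_backends(outbox: InMemoryOutbox, record_id: str, backends: list[str]) -> None:
    """Emit one IndexRecord per backend so each can retry and fail independently."""
    for name in backends:
        outbox.append(IndexRecord(record_id=record_id), routing_key=name)
```

Keeping the key in one place avoids the two copies drifting apart, and the payload stays identical across backends.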
@rorybyrne rorybyrne merged commit de534fd into main Feb 3, 2026
6 checks passed
@rorybyrne rorybyrne deleted the 041-feat-refactor-pull-clean branch February 3, 2026 15:17