Skip to content

feat: redesign event processing with per-backend indexing and batch support#40

Merged
rorybyrne merged 4 commits into
mainfrom
035-feat-redesign-event
Feb 1, 2026
Merged

feat: redesign event processing with per-backend indexing and batch support#40
rorybyrne merged 4 commits into
mainfrom
035-feat-redesign-event

Conversation

@rorybyrne

Copy link
Copy Markdown
Contributor

Summary

Redesigns the event processing system to address critical reliability issues:

  • Per-backend failure isolation: Replace fan-out-in-service with explicit IndexRecord events per backend, enabling independent retry
  • Crash-safe processing: Remove in-memory buffers; use outbox as durable buffer with event-level batching
  • Efficient batch processing: Add BatchEventListener protocol for batch embedding generation
  • Fair queuing: Round-robin event fetching ensures pipeline stages run in parallel, not waves
  • Better observability: Structured logging with batch summaries at INFO, details at DEBUG

Key Changes

  • Add BatchEventListener protocol and IndexRecord event type
  • Add FanOutToIndexBackends listener (RecordPublished → IndexRecord per backend)
  • Add IndexRecordBatch batch listener (groups by backend, calls ingest_batch)
  • Implement round-robin fair queuing in fetch_pending()
  • Fix source service infinite loop when offset >= limit
  • Add OSA_CONFIG_FILE env var to Dockerfile

Test Plan

  • Unit tests for new listeners and batch processing
  • Integration tests for crash recovery scenarios
  • All 67 tests passing
  • Manual testing with GEO source ingestion

Closes #35

…upport (#35)

Replace fan-out-in-service pattern with explicit per-backend IndexRecord events
for failure isolation and crash-safe processing.

Key changes:
- Add BatchEventListener protocol for efficient batch processing
- Create IndexRecord event type for per-backend indexing
- Add FanOutToIndexBackends listener (RecordPublished → IndexRecord per backend)
- Add IndexRecordBatch batch listener (groups IndexRecords by backend)
- Remove in-memory buffer from VectorStorageBackend (crash-safe)
- Update BackgroundWorker to detect and dispatch to batch listeners
- Add configurable batch_size to WorkerConfig

This enables:
- Per-backend failure isolation (retry just failed backend)
- Crash-safe processing (events stay in outbox until committed)
- Efficient batch embedding generation
- Clear failure visibility with backend name and record SRN

Closes #35
Follow-up improvements to event processing redesign:

- Add round-robin fair queuing to fetch_pending() for parallel pipeline stages
- Fix source service infinite loop when offset >= limit (remaining <= 0)
- Add OSA_CONFIG_FILE env var to Dockerfile (config was being ignored)
- Reduce log noise: batch summaries at INFO, per-event details at DEBUG
- Add logging to GEO source for UID fetching and limit edge cases
@github-actions

github-actions Bot commented Feb 1, 2026

Copy link
Copy Markdown

Code Coverage

Package Line Rate Complexity Health
. 70% 0
application 0% 0
application.api 100% 0
application.api.rest 0% 0
application.api.v1 0% 0
application.api.v1.routes 0% 0
application.event 0% 0
cli 40% 0
cli.commands 18% 0
cli.util 53% 0
domain 100% 0
domain.auth 100% 0
domain.auth.adapter 100% 0
domain.auth.command 100% 0
domain.auth.event 100% 0
domain.auth.model 100% 0
domain.auth.port 100% 0
domain.auth.query 100% 0
domain.auth.service 100% 0
domain.curation 100% 0
domain.curation.adapter 100% 0
domain.curation.command 100% 0
domain.curation.event 0% 0
domain.curation.listener 0% 0
domain.curation.model 100% 0
domain.curation.port 100% 0
domain.curation.query 100% 0
domain.curation.service 100% 0
domain.deposition 100% 0
domain.deposition.adapter 100% 0
domain.deposition.command 0% 0
domain.deposition.event 100% 0
domain.deposition.model 0% 0
domain.deposition.port 0% 0
domain.deposition.query 100% 0
domain.deposition.service 0% 0
domain.export 100% 0
domain.export.adapter 100% 0
domain.export.command 100% 0
domain.export.event 100% 0
domain.export.model 100% 0
domain.export.port 100% 0
domain.export.query 100% 0
domain.export.service 100% 0
domain.index 100% 0
domain.index.event 100% 0
domain.index.listener 100% 0
domain.index.model 84% 0
domain.index.service 100% 0
domain.record 100% 0
domain.record.adapter 100% 0
domain.record.command 100% 0
domain.record.event 100% 0
domain.record.listener 0% 0
domain.record.model 100% 0
domain.record.port 100% 0
domain.record.query 100% 0
domain.record.service 100% 0
domain.schema 100% 0
domain.schema.adapter 100% 0
domain.schema.command 100% 0
domain.schema.event 100% 0
domain.schema.model 100% 0
domain.schema.port 100% 0
domain.schema.query 100% 0
domain.schema.service 100% 0
domain.search 100% 0
domain.search.adapter 100% 0
domain.search.command 100% 0
domain.search.event 100% 0
domain.search.model 100% 0
domain.search.port 100% 0
domain.search.query 100% 0
domain.search.service 100% 0
domain.shared 64% 0
domain.shared.model 90% 0
domain.shared.port 100% 0
domain.source 100% 0
domain.source.event 100% 0
domain.source.listener 0% 0
domain.source.model 76% 0
domain.source.schedule 0% 0
domain.source.service 92% 0
domain.validation 0% 0
domain.validation.adapter 100% 0
domain.validation.command 0% 0
domain.validation.event 0% 0
domain.validation.listener 0% 0
domain.validation.model 0% 0
domain.validation.port 0% 0
domain.validation.query 100% 0
domain.validation.service 0% 0
infrastructure 100% 0
infrastructure.event 0% 0
infrastructure.index 0% 0
infrastructure.index.vector 76% 0
infrastructure.messaging 100% 0
infrastructure.oci 0% 0
infrastructure.persistence 0% 0
infrastructure.persistence.adapter 0% 0
infrastructure.source 0% 0
sdk 100% 0
sdk.index 100% 0
sdk.source 100% 0
util 100% 0
util.di 0% 0
Summary 32% (971 / 3008) 0

Copilot AI left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This is a significant architectural redesign of the event processing system to address critical reliability issues. The PR introduces per-backend failure isolation, crash-safe processing, and efficient batch operations. It's well-designed and comprehensively tested with 67 passing tests.

Changes:

  • Adds BatchEventListener protocol for batch event processing
  • Introduces IndexRecord event type for per-backend indexing requests
  • Replaces direct indexing with event-driven fan-out (FanOutToIndexBackendsIndexRecordBatch)
  • Removes in-memory buffers from VectorStorageBackend, making it stateless
  • Implements round-robin fair queuing in event repository
  • Fixes infinite loop in source service when offset >= limit
  • Adds worker configuration and environment variable for Docker

Reviewed changes

Copilot reviewed 28 out of 29 changed files in this pull request and generated 4 comments.

Show a summary per file
File Description
server/osa/domain/shared/event.py Added BatchEventListener protocol with metaclass support
server/osa/domain/index/event/index_record.py New event type for per-backend indexing
server/osa/domain/index/listener/fanout_listener.py Fan-out listener creates IndexRecord per backend
server/osa/domain/index/listener/index_batch_listener.py Batch processes IndexRecord events by backend
server/osa/infrastructure/event/worker.py Enhanced to support batch event dispatch
server/osa/infrastructure/persistence/repository/event.py Added round-robin fair queuing
server/osa/infrastructure/index/vector/backend.py Removed in-memory buffer, added ingest_batch
server/osa/domain/index/service/index.py Simplified to query-only operations
server/osa/domain/source/service/source.py Fixed infinite loop when offset >= limit
server/sources/geo_entrez/source.py Added early exit for empty effective_limit
server/osa/config.py Added WorkerConfig for poll settings
server/Dockerfile Updated config file reference and added OSA_CONFIG_FILE
server/pyproject.toml Added aarch64 platform support
server/osa/sdk/index/backend.py Added ingest_batch to protocol, deprecated flush
server/tests/* Comprehensive test coverage for new features

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread server/sources/geo_entrez/source.py Outdated
Comment thread server/Dockerfile
Comment thread server/pyproject.toml
Comment thread server/osa/domain/index/listener/index_batch_listener.py Outdated
@rorybyrne rorybyrne merged commit 5484889 into main Feb 1, 2026
6 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

feat: redesign event processing with per-backend indexing and batch support

2 participants