Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Justfile
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
# OSA Monorepo Justfile
# Production deployment and development orchestration commands

mod server

default:
@just --list

Expand Down
3 changes: 2 additions & 1 deletion server/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ COPY migrations/ ./migrations/
COPY alembic.ini ./
COPY entrypoint.sh ./
COPY README.md ./
COPY config.demo.yaml ./config.yaml
COPY osa.yaml ./config.yaml
Comment thread
rorybyrne marked this conversation as resolved.

# Install the project
RUN --mount=type=cache,target=/root/.cache/uv \
Expand Down Expand Up @@ -55,6 +55,7 @@ ENV PATH="/app/.venv/bin:$PATH"
# Create data directory
RUN mkdir -p /data && chown appuser:appuser /data
ENV OSA_DATA_DIR=/data
ENV OSA_CONFIG_FILE=/app/config.yaml

COPY --from=builder --chown=appuser:appuser /app/entrypoint.sh /app/entrypoint.sh
RUN chmod +x /app/entrypoint.sh
Expand Down
11 changes: 11 additions & 0 deletions server/osa/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,12 +140,23 @@ def file(self) -> str | None:
return os.environ.get("OSA_LOG_FILE")


class WorkerConfig(BaseModel):
"""Background worker configuration (nested in Config, uses env_nested_delimiter).

Controls the behavior of the outbox polling worker that processes events.
"""

poll_interval: float = 0.5 # Seconds between outbox polls
batch_size: int = 100 # Maximum events to fetch per poll cycle


class Config(BaseSettings):
# These are BaseModel, so env_nested_delimiter handles their env vars
server: Server = Server()
frontend: Frontend = Frontend()
database: DatabaseConfig = DatabaseConfig()
logging: LoggingConfig = LoggingConfig()
worker: WorkerConfig = WorkerConfig() # Background worker settings
indexes: list[IndexConfig] = [] # list of index configs
sources: list[SourceConfig] = [] # list of source configs

Expand Down
5 changes: 5 additions & 0 deletions server/osa/domain/index/event/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
"""Index domain events."""

from osa.domain.index.event.index_record import IndexRecord

__all__ = ["IndexRecord"]
26 changes: 26 additions & 0 deletions server/osa/domain/index/event/index_record.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
"""IndexRecord event - per-backend indexing request for a single record."""

from typing import Any

from osa.domain.shared.event import Event, EventId
from osa.domain.shared.model.srn import RecordSRN


class IndexRecord(Event):
"""Request to index a single record into a specific backend.

This event is created by FanOutToIndexBackends when a RecordPublished
event is received. Each backend gets its own IndexRecord event,
enabling independent retry and failure isolation.

Attributes:
id: Unique event identifier (inherited from Event).
backend_name: Target backend name (e.g., "vector", "keyword").
record_srn: Structured Resource Name of the record.
metadata: Record metadata to index.
"""

id: EventId
backend_name: str
record_srn: RecordSRN
metadata: dict[str, Any]
9 changes: 6 additions & 3 deletions server/osa/domain/index/listener/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
"""Index domain listeners."""

from osa.domain.index.listener.flush_listener import FlushIndexesOnSourceComplete
from osa.domain.index.listener.index_projector import ProjectNewRecordToIndexes
from osa.domain.index.listener.fanout_listener import FanOutToIndexBackends
from osa.domain.index.listener.index_batch_listener import IndexRecordBatch

__all__ = ["FlushIndexesOnSourceComplete", "ProjectNewRecordToIndexes"]
__all__ = [
"FanOutToIndexBackends",
"IndexRecordBatch",
]
41 changes: 41 additions & 0 deletions server/osa/domain/index/listener/fanout_listener.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
"""FanOutToIndexBackends - creates per-backend IndexRecord events from RecordPublished."""

import logging
from uuid import uuid4

from osa.domain.index.event.index_record import IndexRecord
from osa.domain.index.model.registry import IndexRegistry
from osa.domain.record.event.record_published import RecordPublished
from osa.domain.shared.event import EventId, EventListener
from osa.domain.shared.outbox import Outbox

logger = logging.getLogger(__name__)


class FanOutToIndexBackends(EventListener[RecordPublished]):
"""Creates per-backend IndexRecord events from RecordPublished.

When a record is published, this listener creates one IndexRecord event
per registered backend. Each IndexRecord is stored in the outbox,
enabling independent retry and failure isolation per backend.

This replaces the previous pattern where a single RecordPublished event
triggered immediate indexing to all backends in a single transaction.
"""

indexes: IndexRegistry
outbox: Outbox

async def handle(self, event: RecordPublished) -> None:
"""Create IndexRecord events for each registered backend."""
backend_names = list(self.indexes)
logger.debug(f"FanOut: {event.record_srn} -> {len(backend_names)} backends")

for backend_name in backend_names:
index_event = IndexRecord(
id=EventId(uuid4()),
backend_name=backend_name,
record_srn=event.record_srn,
metadata=event.metadata,
)
await self.outbox.append(index_event)
21 changes: 0 additions & 21 deletions server/osa/domain/index/listener/flush_listener.py

This file was deleted.

92 changes: 92 additions & 0 deletions server/osa/domain/index/listener/index_batch_listener.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
"""IndexRecordBatch - batch processes IndexRecord events per backend."""

import logging
from collections import defaultdict

from osa.domain.index.event.index_record import IndexRecord
from osa.domain.index.model.registry import IndexRegistry
from osa.domain.shared.error import SkippedEventsError
from osa.domain.shared.event import BatchEventListener

logger = logging.getLogger(__name__)


class IndexRecordBatch(BatchEventListener[IndexRecord]):
"""Batch processes IndexRecord events by grouping per backend.

The BackgroundWorker groups IndexRecord events and calls handle_batch()
with all events of this type. This listener further groups events by
backend_name and calls ingest_batch() on each backend.

This enables:
- Efficient batch embedding generation
- Crash-safe processing (events remain in outbox until committed)

Note: If a backend is not found (e.g., removed from config), raises
SkippedEventsError so those events are marked as skipped rather than failed.
"""

indexes: IndexRegistry

async def handle_batch(self, events: list[IndexRecord]) -> None:
"""Process a batch of IndexRecord events grouped by backend.

Args:
events: List of IndexRecord events to process

Raises:
SkippedEventsError: If backend not found (events should be skipped)
RuntimeError: If backend fails to index (events will be retried)
"""
if not events:
return

# Group events by backend
by_backend: dict[str, list[IndexRecord]] = defaultdict(list)
for event in events:
by_backend[event.backend_name].append(event)

logger.debug(
f"IndexRecordBatch: grouping {len(events)} events for {len(by_backend)} backends"
)

# Process each backend's batch
for backend_name, backend_events in by_backend.items():
backend = self.indexes.get(backend_name)
if backend is None:
# Backend not found (may have been removed from config)
# Raise SkippedEventsError so events are marked as skipped, not failed
record_srns = [str(e.record_srn) for e in backend_events]
reason = (
f"Backend '{backend_name}' not found (may have been removed). "
f"Skipping {len(backend_events)} events. "
f"Records: {record_srns[:5]}{'...' if len(record_srns) > 5 else ''}"
)
logger.error(reason)
raise SkippedEventsError(
event_ids=[e.id for e in backend_events],
reason=reason,
)

# Prepare records for batch ingestion
records = [(str(event.record_srn), event.metadata) for event in backend_events]

logger.debug(
f"Batch indexing {len(records)} records to backend '{backend_name}' "
f"(batch efficiency: {len(records)} records in single call)"
)

try:
await backend.ingest_batch(records)
logger.debug(f"Indexed {len(records)} records to backend '{backend_name}'")
except Exception as e:
# Enhanced error with backend name and record SRNs (T025, T026)
record_srns = [srn for srn, _ in records]
error_context = (
f"Backend '{backend_name}' failed to index {len(records)} records. "
f"Records: {record_srns[:3]}{'...' if len(record_srns) > 3 else ''}. "
f"Error: {e}"
)
logger.error(error_context)
# Re-raise with context so worker can record in delivery_error
raise RuntimeError(error_context) from e
21 changes: 0 additions & 21 deletions server/osa/domain/index/listener/index_projector.py

This file was deleted.

66 changes: 27 additions & 39 deletions server/osa/domain/index/service/index.py
Original file line number Diff line number Diff line change
@@ -1,60 +1,48 @@
"""IndexService - orchestrates indexing of records into storage backends."""

import logging
from typing import Any

from osa.domain.index.model.registry import IndexRegistry
from osa.domain.shared.model.srn import RecordSRN
from osa.domain.shared.service import Service

logger = logging.getLogger(__name__)


class IndexService(Service):
"""Projects records into configured index backends.
"""Service for index-related operations.

This service encapsulates the business logic for indexing that was previously
embedded in the ProjectNewRecordToIndexes listener. It can be called from
multiple entry points (event listeners, CLI commands, bulk operations).
Note: Direct indexing (index_record, flush_all) has been replaced by the
event-driven approach using FanOutToIndexBackends and IndexRecordBatch
listeners. This service is retained for query operations and future
index management commands.
"""

indexes: IndexRegistry

async def index_record(
self,
record_srn: RecordSRN,
metadata: dict[str, Any],
) -> None:
"""Index a record into all configured backends.
async def get_count(self, backend_name: str) -> int | None:
"""Get the document count for a specific backend.

Args:
record_srn: SRN of the record to index.
metadata: The record metadata to index.
backend_name: Name of the backend to query.

Note:
This method logs errors but does not raise exceptions for individual
backend failures, allowing indexing to continue for other backends.
Returns:
Document count, or None if backend not found.
"""
srn_str = str(record_srn)

for name, backend in self.indexes.items():
try:
await backend.ingest(srn_str, metadata)
logger.debug(f"Buffered {srn_str} for backend '{name}'")
except Exception as e:
logger.error(f"Failed to index {srn_str} into '{name}': {e}")

async def flush_all(self) -> None:
"""Flush all backends to ensure buffered records are persisted.

This should be called:
- When a source run's final chunk completes
- On application shutdown
- After bulk import operations
backend = self.indexes.get(backend_name)
if backend is None:
return None
return await backend.count()

async def check_health(self, backend_name: str) -> bool | None:
"""Check health of a specific backend.

Args:
backend_name: Name of the backend to check.

Returns:
True if healthy, False if unhealthy, None if backend not found.
"""
for name, backend in self.indexes.items():
try:
await backend.flush()
logger.info(f"Flushed index backend '{name}'")
except Exception as e:
logger.error(f"Failed to flush backend '{name}': {e}")
backend = self.indexes.get(backend_name)
if backend is None:
return None
return await backend.health()
18 changes: 18 additions & 0 deletions server/osa/domain/shared/error.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,3 +70,21 @@ class ExternalServiceError(InfrastructureError):

class ConfigurationError(InfrastructureError):
"""System misconfiguration detected."""


# =============================================================================
# Event Processing Errors
# =============================================================================


class SkippedEventsError(Exception):
"""Raised when some events should be skipped (not failed, not delivered).

Used when a backend is intentionally removed and leftover events should
be marked as skipped for clean semantic separation from failures.
"""

def __init__(self, event_ids: list, reason: str) -> None:
self.event_ids = event_ids
self.reason = reason
super().__init__(reason)
Loading
Loading