Skip to content

feat(coordination): add cross-instance Coordinator for multi-instance consistency#2363

Open
magiccao wants to merge 3 commits into
volcengine:mainfrom
magiccao:feat/multi-instance-coordinator
Open

feat(coordination): add cross-instance Coordinator for multi-instance consistency#2363
magiccao wants to merge 3 commits into
volcengine:mainfrom
magiccao:feat/multi-instance-coordinator

Conversation

@magiccao
Copy link
Copy Markdown

@magiccao magiccao commented Jun 1, 2026

Description

Add a Coordinator abstraction that unifies several process-local singletons
(EmbeddingTaskTracker, RequestWaitTracker, SemanticQueue coalesce
versions, request stats) behind a shared backend, enabling consistent state
across multiple load-balanced server instances.

Distributed Fault-Tolerance Design

Single-instance deployments are unaffected (the default memory backend is a
thin wrapper over existing dict/lock patterns). For Redis-backed deployments, a
layered self-healing approach prevents queuefs event loss:

1. Owner-side completion poller (EmbeddingTaskTracker)

Under the distributed backend, the final decrement() may land on a
non-owner instance that holds neither the callback nor the event loop it
must run on. The registering instance launches a background poller that
watches the shared remaining counter in the Coordinator. When the counter
reaches zero — regardless of which instance decremented it — the owner
fires the completion callback locally. The poller also holds the deadline
for the distributed-only watchdog timeout, triggering on_timeout for
graceful self-healing when embedding tasks stall.

2. TTL refresh (_touch_reg)

All Coordinator keys carry a configurable TTL so that a crash or restart
does not leave orphaned state. Long-running tasks extend their key TTLs
via a throttled _touch_reg call (refresh interval = ttl_sec / 4)
to prevent premature expiry without generating unbounded Redis traffic.

3. Exponential backoff on transient coordinator errors

Both EmbeddingTaskTracker._watch_until_resolved and
RequestWaitTracker.wait_for_request absorb transient Redis errors and
retry with exponential backoff (0.25 s → 0.5 s → 1.0 s → 2.0 s cap) rather
than propagating exceptions to callers or tight-looping under failure.

4. Two-phase request wait with grace period (RequestWaitTracker)

wait_for_request splits into a processing gate (waits for pending sets to
empty) and a status retrieval phase. On transition to the status phase a
2-second grace window is added on top of any remaining deadline, ensuring
the final queue-status snapshot is readable even when the original timeout
is almost exhausted.

5. Graceful cleanup degradation

cleanup() under the distributed backend catches deletion errors and falls
back to TTL-based expiry with a warning log, avoiding cleanup failures from
blocking the request path.


Configuration

Coordination is configured under the storage.coordination key in ov.conf.
The default memory backend requires no additional configuration.

Redis backend

"storage": {
  "coordination": {
    "backend": "redis",
    "redis": {
      "dsn": "redis://redis:6379/0",
      "key_prefix": "ov:coord:",
      "ttl_sec": 3600
    },
    "embedding_completion_timeout_sec": 1800
  }
}

The redis.dsn field can also be omitted and supplied via the
OPENVIKING_COORD_DSN environment variable (preferred for credential
management).

Custom backend

Set backend to a fully-qualified dotted class path. The class must expose a
from_config(cfg: CoordinationConfig) -> Coordinator classmethod. Arbitrary
extra parameters can be passed through custom_params.

"storage": {
  "coordination": {
    "backend": "mycompany.coordination.CustomCoordinator",
    "custom_params": {
      "option_a": "value_a",
      "option_b": 42
    },
    "embedding_completion_timeout_sec": 1800
  }
}

Key options

Field Default Description
backend "memory" "memory", "redis", or dotted class path
redis.dsn env OPENVIKING_COORD_DSN Redis connection string
redis.key_prefix "ov:coord:" Key namespace for tenant/cluster isolation
redis.ttl_sec 3600 Key TTL; 0 disables expiry
embedding_completion_timeout_sec 1800 Distributed-only embedding watchdog (seconds)
custom_params {} Passed to from_config() for third-party backends

Related Issue

N/A

Type of Change

  • Bug fix (non-breaking change that fixes an issue)
  • New feature (non-breaking change that adds functionality)
  • Breaking change (fix or feature that would cause existing functionality to not work as expected)
  • Documentation update
  • Refactoring (no functional changes)
  • Performance improvement

@github-actions
Copy link
Copy Markdown

github-actions Bot commented Jun 1, 2026

PR Reviewer Guide 🔍

(Review updated until commit ea14930)

Here are some key observations to aid the review process:

⏱️ Estimated effort to review: 4 🔵🔵🔵🔵⚪
🏅 Score: 90
🧪 PR contains tests
🔒 No security concerns identified
✅ No TODO sections
🔀 No multiple PR themes
⚡ No major issues detected

@github-actions
Copy link
Copy Markdown

github-actions Bot commented Jun 1, 2026

PR Code Suggestions ✨

No code suggestions found for the PR.

dongcao added 2 commits June 2, 2026 10:20
… consistency

Several process-local singletons (EmbeddingTaskTracker, RequestWaitTracker,
SemanticQueue coalesce versions, request stats) were backed by plain
dict + threading.Lock. On a single machine this is fine; across multiple
load-balanced instances the state silently diverges.

This PR introduces a Coordinator abstraction that unifies these behind a
small set of generic KV primitives:

- InProcessCoordinator (default): zero new dependencies, behaviour identical
  to the existing singletons. Single-machine deployments are unaffected.
- RedisCoordinator (opt-in): maps each primitive onto an atomic Redis command
  (INCRBY, SET NX EX, SADD, RPUSH, etc.) for cross-instance consistency.
  Requires `pip install 'openviking[coordination]'`.

Configuration (ov.conf):

    storage:
      coordination:
        backend: redis          # or "memory" (default)
        dsn: redis://host:6379  # or env OPENVIKING_COORD_DSN
        key_prefix: "ov:coord:"
        ttl_sec: 3600

Bug fix: atomic memory semantic dedupe (volcengine#769)
The previous get_int → check → set_int pattern in SemanticQueue was a
TOCTOU race: two instances could both read 0 and both enqueue the same
memory semantic task. Replaced with a single atomic set_if_absent()
(Redis SET NX EX; in-process monotonic-deadline check under lock).

Memory safety: amortized claim pruning
_claim_deadlines in InProcessCoordinator is swept in bulk once the map
exceeds a threshold, so one-shot mem_dedupe:* keys that are never
revisited cannot grow the map unboundedly.

Refactor: extract RequestQueueStats / RequestStatsAccumulator
The inline stats dict+lock duplicated across SemanticProcessor and
collection_schemas is unified into openviking/telemetry/request_queue_stats.py.
…d class path

- Introduce coordinator_factory.py with create_coordinator(config),
  extracting the routing logic from StorageConfig.build_coordinator().
  build_coordinator() is now a one-liner delegating to the factory,
  mirroring the vectordb_adapters/factory.py pattern.

- Expand CoordinationConfig:
  - backend: str now accepts a full dotted class path
    (e.g. mycompany.module.CredisCoordinator) in addition to the
    built-in "memory" and "redis" values, enabling custom coordinator
    plugins without modifying OpenViking source.
  - Nested redis sub-config (RedisCoordinationConfig) groups dsn,
    key_prefix, and ttl_sec under [storage.coordination.redis].
  - custom_params: Dict[str, Any] escape hatch passes arbitrary
    configuration to third-party from_config() implementations.

- RedisCoordinator.__init__ now accepts a pre-built client object in
  addition to a DSN string, so callers can inject any Redis-compatible
  client (e.g. a proprietary SDK) without subclassing.

- Add 20 config/factory tests in tests/test_coordinator_config.py
  and 3 client-injection tests in tests/misc/test_coordinator.py.
@magiccao magiccao force-pushed the feat/multi-instance-coordinator branch from 82d9a3a to ff57950 Compare June 2, 2026 05:21
@magiccao magiccao closed this Jun 2, 2026
@github-project-automation github-project-automation Bot moved this from Backlog to Done in OpenViking project Jun 2, 2026
… delivery

Prevent queuefs event loss in multi-instance / Redis coordinator scenarios
through a layered self-healing approach:

* EmbeddingTaskTracker: owner-side poller watches shared remaining counter in
  Coordinator so the completion callback fires even when the final decrement
  lands on a different instance; exponential-backoff retry on transient
  coordinator errors; TTL refresh keeps keys alive during long-running tasks;
  deadline-triggered on_timeout callback for graceful self-healing.

* RequestWaitTracker: two-phase wait (processing gate → status retrieval) with
  per-phase deadline; exponential backoff (0.25 s → 2 s cap) on coordinator
  errors; _touch_reg throttled to default_ttl_sec/4 to cap Redis call volume;
  cleanup() degrades gracefully to TTL-based expiry under distributed backend.

* Adds embedding_completion_timeout_sec config knob (default 1800 s) for the
  distributed-only watchdog timeout.

Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
@magiccao magiccao reopened this Jun 2, 2026
@github-actions
Copy link
Copy Markdown

github-actions Bot commented Jun 2, 2026

Persistent review updated to latest commit ea14930

@github-actions
Copy link
Copy Markdown

github-actions Bot commented Jun 2, 2026

PR Code Suggestions ✨

No code suggestions found for the PR.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

Status: Done

Development

Successfully merging this pull request may close these issues.

1 participant