mooncake-store: cascade Scheduler.reset_connector_cache to remove_all(force=True)#46

Open: aoshen02 wants to merge 6 commits into ivanium:feat/mooncake-store-int from aoshen02:feat/mooncake-clear-hook

@aoshen02
Collaborator

Summary

When verl (or any other RL framework) calls engine.reset_prefix_cache(reset_connector=True) after a weight update, vLLM core already routes the request through Scheduler.reset_connector_cache() -> self.connector.reset_cache(). This patch wires MooncakeStoreConnector.reset_cache() so the cascade actually clears the Mooncake master, dropping all external KV blocks computed against the previous weights before any new rollout request can read them.

This is the RL-correct hard-reset path; without it, the external Mooncake store would silently serve KV computed against stale weights on the next rollout step.

Design

  • SCHEDULER role MooncakeStoreConnector.reset_cache() -> MooncakeStoreScheduler.reset_store() -> LookupKeyClient.reset() (ZMQ send RESET_MAGIC frame).
  • WORKER rank-0 LookupKeyServer recognizes the RESET_MAGIC (0xFFFFFFFF) discriminator frame and calls self.store_worker.store.remove_all(force=True) on the master.
  • The reset path reuses the existing scheduler<->worker rank 0 ZMQ admin channel originally built for prefix lookups, so no new sockets / handshake. A reserved sentinel value (unreachable for any real prompt length) discriminates RESET from a normal lookup so the wire format stays backward compatible.
  • WORKER role reset_cache() is a no-op: reset is driven from the scheduler side via the ZMQ admin channel.
  • Soft failure semantics: MooncakeStoreScheduler.reset_store() returns False (rather than raising) when the RPC errors out, matching the soft-dependency contract verl uses to keep training alive if Mooncake hiccups.
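The delegation chain and the soft-failure contract in the list above can be sketched as follows. This is a minimal illustration, not the patch itself: `Role`, `FakeLookupKeyClient`, `SchedulerSide`, and `Connector` are hypothetical stand-ins for `KVConnectorRole`, `LookupKeyClient`, `MooncakeStoreScheduler`, and `MooncakeStoreConnector`, and the ZMQ round trip is replaced by an in-process call.

```python
from enum import Enum
from typing import Optional


class Role(Enum):
    SCHEDULER = "scheduler"
    WORKER = "worker"


class FakeLookupKeyClient:
    """Hypothetical stand-in for the ZMQ client; `fail` simulates an RPC error."""

    def __init__(self, fail: bool = False) -> None:
        self.fail = fail
        self.reset_calls = 0

    def reset(self) -> bool:
        self.reset_calls += 1
        if self.fail:
            raise ConnectionError("simulated RPC failure")
        return True


class SchedulerSide:
    """Hypothetical stand-in for the scheduler-side half of the connector."""

    def __init__(self, client: FakeLookupKeyClient) -> None:
        self.client = client

    def reset_store(self) -> bool:
        # Soft failure: report False instead of raising, so a caller
        # can keep training when the store is unavailable.
        try:
            return self.client.reset()
        except Exception:
            return False


class Connector:
    def __init__(self, role: Role, scheduler: Optional[SchedulerSide] = None) -> None:
        self.role = role
        self.scheduler = scheduler

    def reset_cache(self) -> Optional[bool]:
        if self.role is Role.SCHEDULER:
            assert self.scheduler is not None
            return self.scheduler.reset_store()
        # WORKER role: no-op, the reset is driven from the scheduler side.
        return None
```

The three outcomes a caller can observe are `True` (store cleared), `False` (RPC failed, reported softly), and `None` (worker-role no-op).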

Comparison vs SGLang

| | SGLang | this patch |
|---|---|---|
| trigger | per-RPC opt-in flush_cache=True flag; forget it once -> silent stale-cache corruption | automatic data-plane cascade via reset_prefix_cache(reset_connector=True) (no flag) |
| multi-rank | each scheduler instance calls remove_all -> idempotent but N redundant RPCs | one ZMQ RPC to rank-0 worker -> one RemoveAll per reset |
| guard consistency | is_fully_idle() silently false on failure, no doc/test | cascades only when the existing scheduler guard passes; covered by the new test_scheduler_reset_prefix_cache_cascades_only_on_success analog |
| RL-correctness | client must remember flag | hard-reset is the default path |

Tests

6 new mock-based unit tests in tests/v1/kv_connector/unit/test_mooncake_store_connector.py:

  • test_reset_cache_scheduler_role_delegates_to_reset_store
  • test_reset_cache_scheduler_role_propagates_failure
  • test_reset_cache_worker_role_returns_none
  • test_scheduler_reset_store_returns_client_reset_result
  • test_scheduler_reset_store_handles_rpc_exception
  • test_lookup_key_client_reset_uses_magic_protocol

Run:

.venv/bin/python -m pytest tests/v1/kv_connector/unit/test_mooncake_store_connector.py -v -k "reset or lookup_key_client_reset"

Result: 6 new + 13 existing connector tests + 18 existing worker tests all pass (37/37).

End-to-end cascade verified inside the verl-bench-aoshen container against a per-run Mooncake master. The master's Admin Metrics confirm the traffic on this path: the smoke run issued 12 puts across 2 rounds plus 3 exist queries, and both RemoveAll calls landed (DelAll=2/2):

PutStart:(Req=12/0/12, Item=12/12)
PutEnd:(Req=12/0/12, Item=12/12)
ExistKey:(Req=3/0/3, Item=18/18)
DelAll=2/2

Test plan

  • 6 new unit tests pass (mock MooncakeDistributedStore)
  • 31 existing mooncake store unit tests still pass (no regressions)
  • End-to-end cascade against real Mooncake master (in container) — connector.reset_cache() -> remove_all(force=True) confirmed via master Admin Metrics
  • Full verl GRPO 5-cycle smoke (Qwen3-0.6B / 1 tray) — recipe authored at recipe_aoshen/phase_b_mooncake.sh in the paired verl PR; deferred for a dedicated bench run

AI assistance

This PR was authored with the help of Claude Opus 4.7 (1M context). Each line was reviewed by me end-to-end; tests were verified locally on a GB200 host with the container verl-bench-aoshen. The cascade-design (re-using the existing LookupKey ZMQ channel with a RESET_MAGIC sentinel discriminator instead of opening a new admin socket) was chosen to minimize new surface area.

Duplicate-work check

No existing PR addresses MooncakeStoreConnector.reset_cache for the RL hard-reset path. KVConnectorBase_V1.reset_cache is the framework-level hook already shipped in vLLM 0.20.1; this PR is the connector-specific implementation.

…nector

When verl (or any RL framework) calls
`engine.reset_prefix_cache(reset_connector=True)` after a weight update,
vLLM core already routes it through
`Scheduler.reset_connector_cache() -> self.connector.reset_cache()`. This
patch wires `MooncakeStoreConnector.reset_cache()` so the cascade actually
clears the Mooncake master:

  SCHEDULER role: connector_scheduler.reset_store() ->
    LookupKeyClient.reset() over ZMQ ->
      LookupKeyServer recognizes RESET_MAGIC frame ->
        store_worker.store.remove_all(force=True) on the master

The reset path reuses the existing ZMQ admin channel between scheduler
and worker rank 0 (originally used for prefix lookups). A reserved
sentinel token_len (0xFFFFFFFF, unreachable for any real prompt length)
discriminates the RESET command from a normal lookup so the wire format
stays backward compatible.
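A minimal sketch of the sentinel discrimination described above. The 4-byte big-endian encoding of `RESET_MAGIC` matches the `reset()` code quoted in the review; the lookup frame layout (a 4-byte `token_len` frame followed by a payload frame) and the helper names are assumptions made for illustration only.

```python
from typing import List

RESET_MAGIC = 0xFFFFFFFF  # sentinel value taken from the PR description


def encode_reset() -> List[bytes]:
    # A reset request is a single frame carrying the sentinel.
    return [RESET_MAGIC.to_bytes(4, "big")]


def encode_lookup(token_len: int, payload: bytes) -> List[bytes]:
    # Assumed layout: frame 0 is token_len, frame 1 is the lookup payload.
    if not 0 <= token_len < RESET_MAGIC:
        raise ValueError("token_len collides with the reset sentinel")
    return [token_len.to_bytes(4, "big"), payload]


def classify(frames: List[bytes]) -> str:
    # The server inspects frame 0 to tell a reset from a normal lookup.
    head = int.from_bytes(frames[0], "big")
    return "reset" if head == RESET_MAGIC else "lookup"
```

Because existing lookup frames never carry `0xFFFFFFFF` as a length, old clients keep working unchanged, which is the backward-compatibility property the message refers to.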

6 new mock-based unit tests cover:
- SCHEDULER role delegates to reset_store
- failure propagation (False on store/RPC failure)
- WORKER role no-op
- LookupKeyClient.reset() uses RESET_MAGIC frame
- scheduler.reset_store handles RPC exception

End-to-end cascade verified inside the verl-bench-aoshen container
against a per-run Mooncake master (PutStart=12, DelAll=2 in master
Admin Metrics matching the smoke flow).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

@gemini-code-assist gemini-code-assist Bot left a comment

Code Review

This pull request implements a reset_cache mechanism for the Mooncake store connector, enabling global cache resets via a ZMQ-based administrative command. This is particularly important for RL workflows to prevent stale KV data from persisting after weight updates. The review feedback identifies several critical improvements: clearing pending KV events during a reset to prevent stale state, addressing a potential race condition where background threads might upload stale data post-reset, and adding a timeout to the blocking ZMQ receive call to prevent the engine from hanging if the worker becomes unresponsive.

Comment on lines +178 to +180
    if self.role == KVConnectorRole.SCHEDULER:
        assert self.connector_scheduler is not None
        return self.connector_scheduler.reset_store()

high

When the cache is reset, any previously collected but not yet taken KV events (self._kv_cache_events) should be cleared. These events represent blocks that were successfully stored in Mooncake, but since the store is being cleared, these events are no longer valid and could lead to incorrect prefix matching or stale state in the consumer if they are processed after the reset.

Suggested change
-if self.role == KVConnectorRole.SCHEDULER:
-    assert self.connector_scheduler is not None
-    return self.connector_scheduler.reset_store()
+if self.role == KVConnectorRole.SCHEDULER:
+    assert self.connector_scheduler is not None
+    self._kv_cache_events = None
+    return self.connector_scheduler.reset_store()

Comment on lines +181 to +184
    # Worker role is a no-op here: reset is driven via the scheduler-side
    # ZMQ admin channel that lands on this worker's LookupKeyServer
    # thread (worker rank 0), which calls `store.remove_all(force=True)`.
    return None

high

There is a potential race condition between the global remove_all(force=True) call and the background KVCacheStoreSendingThread on all worker ranks. Since reset_cache is currently a no-op on the worker side and only triggers a reset via rank 0, other ranks (or even rank 0's background thread) might still be processing their request_queue and performing batch_put operations for stale KV data (computed with old weights) that land in the store after the reset has completed. This would lead to the silent stale-cache corruption that this PR aims to prevent. Consider implementing a mechanism to flush or clear the background sending queues on all workers before performing the global store reset.
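One way to close this kind of race is to tag queued writes with a generation counter and drop any write that was enqueued before the most recent reset. The sketch below is an illustration of that idea only, not what the PR implements: `SendingThread`, `submit`, and `epoch` are hypothetical names, and a local `dict` stands in for the shared Mooncake store, so cross-rank coordination is out of scope here.

```python
import queue
import threading


class SendingThread(threading.Thread):
    """Hypothetical background uploader that ignores pre-reset writes."""

    def __init__(self, store: dict) -> None:
        super().__init__(daemon=True)
        self.request_queue: queue.Queue = queue.Queue()
        self.store = store
        self.epoch = 0  # bumped on every reset
        self._epoch_lock = threading.Lock()

    def submit(self, key: str, value: object) -> None:
        # Record the epoch in which the KV data was produced.
        self.request_queue.put((self.epoch, key, value))

    def run(self) -> None:
        while True:
            item = self.request_queue.get()
            try:
                if item is None:  # shutdown marker
                    return
                epoch, key, value = item
                with self._epoch_lock:
                    # Writes from an older epoch are stale and are dropped.
                    if epoch == self.epoch:
                        self.store[key] = value
            finally:
                self.request_queue.task_done()

    def reset(self) -> None:
        with self._epoch_lock:
            self.epoch += 1
            self.store.clear()
```

In this sketch a write queued before `reset()` never reaches the store, even if the thread only gets to it afterwards.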

Comment on lines +415 to +422
    def reset(self) -> bool:
        """Trigger `store.remove_all(force=True)` on worker rank 0.

        Returns True on success, False on failure.
        """
        self.socket.send_multipart([RESET_MAGIC.to_bytes(4, "big")], copy=False)
        resp = self.socket.recv()
        return int.from_bytes(resp, "big") == RESET_MAGIC

high

The self.socket.recv() call is blocking and lacks a timeout. If the worker rank 0 becomes unresponsive or the ZMQ connection is interrupted, the scheduler (and thus the entire vLLM engine) will hang indefinitely during a prefix cache reset. Given that this is an administrative/control-plane operation, it should have a reasonable timeout and handle the failure gracefully. Note that setting RCVTIMEO on this shared socket will also affect the lookup method, which is likely desirable for robustness.

Suggested change
-def reset(self) -> bool:
-    """Trigger `store.remove_all(force=True)` on worker rank 0.
-    Returns True on success, False on failure.
-    """
-    self.socket.send_multipart([RESET_MAGIC.to_bytes(4, "big")], copy=False)
-    resp = self.socket.recv()
-    return int.from_bytes(resp, "big") == RESET_MAGIC
+def reset(self) -> bool:
+    """Trigger `store.remove_all(force=True)` on worker rank 0.
+    Returns True on success, False on failure.
+    """
+    self.socket.setsockopt(zmq.RCVTIMEO, 10000)
+    try:
+        self.socket.send_multipart([RESET_MAGIC.to_bytes(4, "big")], copy=False)
+        resp = self.socket.recv()
+        return int.from_bytes(resp, "big") == RESET_MAGIC
+    except zmq.Again:
+        return False
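The bounded-wait pattern behind this suggestion can be tried without ZMQ. The sketch below uses a standard-library socket pair as a stand-in for the admin channel, so it shows only the timeout-then-soft-fail control flow; `reset_with_timeout` is a hypothetical helper, not part of the connector.

```python
import socket

RESET_MAGIC = 0xFFFFFFFF


def reset_with_timeout(sock: socket.socket, timeout_s: float) -> bool:
    """Send the reset sentinel and wait at most `timeout_s` for the echo."""
    sock.settimeout(timeout_s)
    try:
        sock.sendall(RESET_MAGIC.to_bytes(4, "big"))
        resp = sock.recv(4)
    except OSError:
        # Covers the receive timeout and connection errors: report a
        # soft failure instead of blocking the caller forever.
        return False
    return int.from_bytes(resp, "big") == RESET_MAGIC
```

With a strict request-reply socket type, a timed-out receive can leave the socket unable to send the next request until it is recreated, so a real client may need more handling than returning False.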

aoshen524 and others added 3 commits May 11, 2026 11:49
test_scheduler_reset_connector_cache_invokes_connector_reset mirrors
the exact call shape from Scheduler.reset_connector_cache
(scheduler.py:1917-1929) — the only relevant lines are

    if self.connector.reset_cache() is False:
        return False
    return True

so a small _StubScheduler exercises the cascade with a real (SCHEDULER
role) MooncakeStoreConnector instance, without dragging in the heavy
KVCacheManager fixtures. Covers both branches (store reset OK -> True,
store reset failed -> False).

This closes the upstream-wiring gap: previously the connector.reset_cache
contract was only mocked; this test confirms the cascade routing through
Scheduler.reset_connector_cache works with the real MooncakeStoreConnector
class.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

Replace the numeric RESET_MAGIC=0xFFFFFFFF sentinel that aliased the
token_len field with a self-describing tag scheme on the LookupKey
ZMQ admin channel:

  - new vllm/.../mooncake/protocol.py is the single source of truth
    for LOOKUP_MSG / RESET_MSG request tags and RESP_OK / RESP_ERR
    response status bytes (mirrors the convention used by the NIXL
    connector, see kv_connector/v1/nixl/metadata.py)
  - LookupKeyServer.process_request dispatches on the named tag at
    frame 0 instead of comparing the token_len field to 0xFFFFFFFF
  - LookupKeyClient.lookup() prepends LOOKUP_MSG to its request frames
  - LookupKeyClient.reset() sends RESET_MSG and parses RESP_OK/RESP_ERR
  - the duplicated RESET_MAGIC constant (previously in both worker.py
    and scheduler.py, kept in sync only by comment) is gone

Why: a numeric sentinel in the data field is fragile (silently breaks
if the constants drift; second admin command needs another magic) and
inconsistent with how the rest of the codebase models RPC envelopes.
The named-tag protocol is extensible (new admin commands just need a
new tag and a new dispatch branch) and self-documenting.

Tests: 20/20 mooncake_store_connector tests pass (was 19/19); the
test_lookup_key_client_reset_uses_admin_protocol case asserts the
new wire format end to end.
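A minimal sketch of tag-based dispatch in the spirit of this commit. The tag and status byte values below are made up for illustration (the real ones live in the new protocol.py and are not shown in this message), and `process_request` here is a free function with injected handlers rather than the actual `LookupKeyServer` method.

```python
from typing import Callable, List

# Hypothetical values; only the names come from the commit message.
LOOKUP_MSG = b"lookup"
RESET_MSG = b"reset"
RESP_OK = b"\x00"
RESP_ERR = b"\x01"


def process_request(
    frames: List[bytes],
    lookup: Callable[[List[bytes]], bytes],
    reset: Callable[[], bool],
) -> bytes:
    """Dispatch on the named tag in frame 0."""
    if not frames:
        return RESP_ERR
    tag, body = frames[0], frames[1:]
    if tag == LOOKUP_MSG:
        return lookup(body)
    if tag == RESET_MSG:
        try:
            return RESP_OK if reset() else RESP_ERR
        except Exception:
            return RESP_ERR
    # Unknown tags are rejected explicitly rather than misread as data.
    return RESP_ERR
```

Adding another admin command then means adding one tag constant and one branch, which is the extensibility argument made above.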

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

When ``AsyncLLM.pause_generation(clear_cache=True)`` runs through
``EngineCore._reset_caches``, the existing
``Scheduler.reset_prefix_cache(reset_connector=...)`` was called with
the default ``reset_connector=False``. That meant RL post-training
callers wanting external KV stores (e.g. ``MooncakeStoreConnector``)
wiped after a weight update had to issue a second
``engine.reset_prefix_cache(reset_connector=True)`` call afterwards,
or silently serve KV computed against the previous policy.

Make ``_reset_caches`` always request a connector reset. ``clear_cache``
already represents the caller's intent to invalidate caches; the
external KV store is part of that cache hierarchy and shouldn't be
opted-in to separately. ``Scheduler.reset_prefix_cache`` already
gracefully handles the no-connector case (warn + return False), so
this is a no-op for engines without a configured KV connector.

Callers that genuinely want a local-only invalidation can still call
``scheduler.reset_prefix_cache(reset_connector=False)`` directly,
bypassing the pause cascade.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

@aoshen02 aoshen02 force-pushed the feat/mooncake-clear-hook branch from 248f3c2 to b38d7b5 Compare May 11, 2026 12:49

Previously Scheduler.reset_connector_cache returned False when no
KV connector was attached, which made
`reset_prefix_cache(reset_connector=True)` flip its return value to
False purely because the caller hadn't configured a connector
(due to `reset_successful = self.reset_connector_cache() and
reset_successful`).

This forced callers (notably verl's RL rollout adapter, which always
wants Mooncake-store to be cleared on weight update *if* it is
attached) to gate the flag with their own "is a KV store enabled?"
boolean. Demote the no-connector case to a debug log + return True
so callers can pass reset_connector=True unconditionally; the
configuration site (engine init) is what decides whether a connector
exists.

No behavior change when a connector is attached: success / failure
of `connector.reset_cache()` is still propagated normally.
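The effect of this change on the combined return value can be sketched as follows. `SchedulerSketch`, `_reset_local`, and `FixedConnector` are hypothetical simplifications; only the `and` combination and the no-connector branch mirror what the message describes.

```python
import logging

logger = logging.getLogger(__name__)


class FixedConnector:
    """Hypothetical connector whose reset outcome is fixed at construction."""

    def __init__(self, result) -> None:
        self.result = result

    def reset_cache(self):
        return self.result


class SchedulerSketch:
    def __init__(self, connector=None) -> None:
        self.connector = connector

    def _reset_local(self) -> bool:
        # Stand-in for the in-engine prefix-cache reset; assumed to succeed.
        return True

    def reset_connector_cache(self) -> bool:
        if self.connector is None:
            # New behavior: nothing to reset is not a failure.
            logger.debug("No KV connector attached; skipping connector reset.")
            return True
        if self.connector.reset_cache() is False:
            return False
        return True

    def reset_prefix_cache(self, reset_connector: bool = False) -> bool:
        reset_successful = self._reset_local()
        if reset_connector:
            reset_successful = self.reset_connector_cache() and reset_successful
        return reset_successful
```

With the old behavior, the first branch returned False and the `and` would flip the overall result even though the local reset succeeded.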

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Owner

@ivanium ivanium left a comment

Left a minor comment, mainly about splitting the core changes into a separate PR to OSS vLLM. It is also worth checking Gemini's review.

Owner

I feel these core changes can be a separate PR to oss vllm

Comment thread vllm/v1/engine/core.py Outdated
    # directly instead of going through this cascade.
    self.reset_prefix_cache(
        reset_running_requests=reset_running_requests,
        reset_connector=True,
Owner

Should we expose this reset_connector as part of the interface?

Address review feedback on PR vllm-project#46: expose reset_connector as a kwarg
on EngineCore._reset_caches instead of hardcoding True inside the
method body. Default is True, so all existing callers (pause_scheduler
cascade) keep the previous behavior; future internal callers that
want a local-only invalidation can pass reset_connector=False without
touching the cascade.

Rationale:
  - matches the sibling reset_running_requests parameter (parameterized
    on the same helper)
  - keeps reset_connector hidden from the user-facing
    AsyncLLM.pause_generation surface, so RL callers still get the
    "clear means clear everything" semantics with one call (no opt-in
    flag they could forget). This intentionally rejects threading the
    kwarg all the way to pause_generation: the user-facing API was the
    footgun-prone layer; the internal helper is not.
  - eases upstreaming this change later: a parameterized helper with
    True default is more conservative than a hardcoded constant.

Reviewer: ivanium (Yifan)
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

Signed-off-by: aoshen524 <aoshen524@gmail.com>
wuxibin89 pushed a commit to verl-project/verl that referenced this pull request May 27, 2026
…ht update (#6373)

### What does this PR do?

Adds `actor_rollout_ref.rollout.kv_store` configuration (`enable`,
`kv_connector`, `kv_role`, `config_path`, `extra_config`, `on_failure`)
and wires the vLLM rollout engine launch with `--kv-transfer-config` so
an external `MooncakeStoreConnector` can pool prefix KV across rollout
replicas.

Propagates `reset_connector=True` on every prefix-cache reset path
(`wake_up`, `clear_kv_cache`, `abort_all_requests` via
`pause_generation`) when `kv_store.enable=true`, so the external
Mooncake master is cleared via `store.remove_all(force=True)` on every
weight update. This is the **RL-correct hard-reset path**: external KV
blocks computed against the previous model weights are dropped before
any new rollout request can read them — matching the in-engine
prefix-cache invalidation that verl already drives via
`abort_all_requests` + `reset_prefix_cache`.

Adds `docs/perf/rollout_kv_offload.md` (under "Performance Tuning Guide"
toctree) documenting when to enable, the full reset cascade, config
schema, env vars, operational notes, and a comparison vs SGLang's
HiCacheStorage flow.

#### Why this matters

verl's existing `update_weights` flow already does the right thing for
vLLM's in-engine prefix cache: `abort_all_requests` drains in-flight,
sleep + NCCL sync the weights, then `wake_up` calls
`engine.reset_prefix_cache()`. The guard in
`BlockPool.reset_prefix_cache` wipes the in-memory hash table because
abort drained everything.

What it does **not** do (yet) is reset the external Mooncake KV store.
Without this PR, the next request's
`MooncakeStoreScheduler.get_num_new_matched_tokens()` would happily
query Mooncake, find a prefix that the previous-weight rollout wrote
there, load it, and let the new weights attend to KV computed against
the old policy — silent correctness loss.

The vLLM-side companion patch implements
`MooncakeStoreConnector.reset_cache()` (paired PR; routes via the
existing ZMQ admin channel to worker rank 0 →
`store.remove_all(force=True)`). This verl-side PR is the wire that
triggers it.

#### Paired vLLM PR

Depends on `MooncakeStoreConnector.reset_cache()` landing in vLLM.
Companion PR: ivanium/vllm#46 (against
`feat/mooncake-store-int` → upstream vllm-project/vllm).

#### Soft-dependency contract

`KVStoreConfig.on_failure` defaults to `"fallback"`: if the Mooncake
master is unreachable when the engine launches, training keeps running
with the connector disabled (warning logged). Set to `"crash"` for
stricter CI use.

### Checklist Before Starting

- [x] Searched for similar PRs:
- `gh pr list --repo verl-project/verl --search "mooncake in:title"
--state all` — only #5176, #5500 (ckpt backend) and #6243 (PD disagg,
different connector). No existing rollout `MooncakeStoreConnector` PR.
- `gh pr list --repo verl-project/verl --search "MooncakeStoreConnector
OR mooncake_store" --state all` — none.
- [x] PR title follows `[rollout] feat: ...` (CI-checked format).

### Test

- **Cascade integration** verified inside the production rollout
container with the paired vLLM patch applied: `connector.reset_cache()`
→ ZMQ admin RPC → `store.remove_all(force=True)` on a per-run Mooncake
master. Master Admin Metrics confirmed `DelAll=2/2` matching the smoke
flow (`PutStart:(Req=12/0/12, Item=12/12)`, `ExistKey:(Req=3/0/3,
Item=18/18)`).
- **Standalone Mooncake `remove_all(force=True)` validation** (put 8
keys → exist → remove_all → exist=0 → re-put → exist=1) passed against
`ivanium/Mooncake yifan/dev` build on aarch64 GB200.
- **Lookup-path regression smoke** (5 cases incl. lookup-after-reset)
verifies the new `RESET_MAGIC` discriminator did not break the existing
ZMQ lookup wire format.
- **Static syntax-check** on the modified verl files via `py_compile`.
- **Config dataclass smoke**: `RolloutConfig(name='vllm')` →
`cfg.kv_store.enable=False`, `.kv_connector="MooncakeStoreConnector"`,
`.get('enable', False)=False` verified against the patched dataclass
(importlib overlay test inside the container's verl env).
- Paired vLLM unit tests: 7/7 reset tests, 37/37 mooncake-store tests
overall.
- ⏳ Full GRPO 5-cycle bench run with `kv_store.enable=true` — needs a
free Ray cluster + ~1 hour bench time; the new doc page lists the
expected acceptance criteria. Will post results once the cluster slot
opens.

### API and Usage Example

```yaml
# config snippet (Hydra)
actor_rollout_ref:
  rollout:
    name: vllm
    kv_store:
      enable: true
      kv_connector: MooncakeStoreConnector
      kv_role: kv_both
      config_path: ${oc.env:MOONCAKE_CONFIG_PATH}
      on_failure: fallback   # or "crash" for strict CI
```

```bash
# launch a per-run Mooncake master alongside verl
export MOONCAKE_CONFIG_PATH=/path/to/mooncake_config.json
bash projects/mooncake-integration/scripts/start_master.sh
# run verl with the config above; engine launch attaches
# --kv-transfer-config '{"kv_connector":"MooncakeStoreConnector","kv_role":"kv_both"}'
```

See `docs/perf/rollout_kv_offload.md` for the full reset cascade, env
vars, and operational notes.

### Design & Code Changes

- `verl/workers/config/rollout.py` — new `KVStoreConfig` dataclass
attached to `RolloutConfig.kv_store`. Six fields (`enable`,
`kv_connector`, `kv_role`, `config_path`, `extra_config`, `on_failure`);
supports `cfg.kv_store.get("enable", False)` mapping access.
- `verl/workers/rollout/vllm_rollout/vllm_async_server.py` — at engine
launch, if `kv_store.enable`, append `--kv-transfer-config '{...}'` to
vllm serve args (`extra_config` is shallow-merged so users can override
knobs). Three reset call sites (`wake_up`, `clear_kv_cache`,
`pause_generation` invoked by `abort_all_requests`) now pass
`reset_connector=True` unconditionally; the paired vLLM patch makes that
a no-op when no connector is attached.
- `docs/perf/rollout_kv_offload.md` + `docs/index.rst` — new doc page
wired into "Performance Tuning Guide" toctree.
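A minimal sketch of how the config dataclass and the launch-argument construction described above could fit together. The field names come from this PR description; the class name `KVStoreConfigSketch`, the helper `kv_transfer_args`, the `kv_role` default, and the exact JSON payload keys beyond `kv_connector` and `kv_role` are assumptions, and how `config_path` is consumed is not shown here.

```python
import json
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional


@dataclass
class KVStoreConfigSketch:
    enable: bool = False
    kv_connector: str = "MooncakeStoreConnector"
    kv_role: str = "kv_both"  # assumed default, taken from the usage example
    config_path: Optional[str] = None
    extra_config: Dict[str, Any] = field(default_factory=dict)
    on_failure: str = "fallback"  # or "crash"

    def get(self, key: str, default: Any = None) -> Any:
        # Mapping-style access, e.g. cfg.get("enable", False).
        return getattr(self, key, default)


def kv_transfer_args(cfg: KVStoreConfigSketch) -> List[str]:
    """Build the extra `vllm serve` arguments, or none when disabled."""
    if not cfg.get("enable", False):
        return []
    payload: Dict[str, Any] = {
        "kv_connector": cfg.kv_connector,
        "kv_role": cfg.kv_role,
    }
    # Shallow merge: keys in extra_config override the defaults above.
    payload.update(cfg.extra_config)
    return ["--kv-transfer-config", json.dumps(payload)]
```

With `enable=False` the helper returns an empty list, so the engine launch is unchanged for users who do not opt in.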

Total diff: 4 files, +254 / −3.

### Checklist Before Submitting

- [x] Read the [Contribute
Guide](https://github.com/verl-project/verl/blob/main/CONTRIBUTING.md).
- [x] Pre-commit checks run locally on the touched files.
- [x] Added documentation (`docs/perf/rollout_kv_offload.md`).
- [ ] Add unit/E2E test in CI — not feasible in this PR because the test
requires a running Mooncake master process; the verl CI image does not
bundle one. The standalone Mooncake `remove_all` validation and the
in-container cascade smoke are documented in the "Test" section. Happy
to add a mocked unit test on a follow-up if reviewers want one.
- [ ] Recipe submodule update — not applicable (no recipe submodule
changes).

### AI assistance disclosure

This PR was authored with the help of Claude Opus 4.7 (1M context).
Every line was reviewed end-to-end by @aoshen524. The config schema +
flag-propagation strategy was deliberately kept minimal (one new
dataclass, three call-site `reset_connector=True` arguments, one CLI
flag for `vllm serve`, one doc page).

### Duplicate-work check

No existing verl PR adds `MooncakeStoreConnector` rollout support. The
related PD-disaggregation work on `feat/verl-vllm-pd-disagg` (PR #6243)
addresses a different connector (`MooncakeConnector` for PD transport,
not `MooncakeStoreConnector` for offload pool) and a different lifecycle
(per-request handshake vs per-update reset). The two could be combined
in a `MultiConnector` config in a follow-up PR.

---------

Signed-off-by: aoshen <aoshen@inferact.ai>
Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>