[controller][admin-tool][compat] Implement ingestion pause mode with region filter#2754
Conversation
Activate ingestionPauseMode (v42/v97) schemas and implement the full controller-side wiring: IngestionPauseMode enum (NOT_PAUSED, NON_CURRENT_VERSION, ALL_VERSIONS), Store/ZKStore/ReadOnlyStore metadata, UpdateStoreQueryParams, admin tool CLI flags (--ingestion-pause-mode, --ingestion-paused-regions), AdminExecutionTask admin channel handling, VeniceHelixAdmin persistence (auto-clears regions on NOT_PAUSED), and version creation guard in CreateVersion that blocks pushes while paused. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
There was a problem hiding this comment.
Pull request overview
Implements controller-side support for a store-level ingestion pause mode (with optional region scoping) and exposes the new settings via the admin API/CLI, while also guarding version creation when ingestion is paused.
Changes:
- Activates AdminOperation v97 and StoreMetaValue v42 and wires new pause fields through store metadata, controller admin message consumption, and controller API query params.
- Adds
IngestionPauseModeplus store getters/setters and propagation intoStoreInfo/ cloning code paths. - Adds a version-creation guard in
CreateVersionand corresponding unit tests and CLI flags.
Reviewed changes
Copilot reviewed 19 out of 20 changed files in this pull request and generated 7 comments.
Show a summary per file
| File | Description |
|---|---|
| services/venice-controller/src/test/java/com/linkedin/venice/controller/server/CreateVersionTest.java | Adds tests for push rejection/allowance based on pause mode. |
| services/venice-controller/src/main/java/com/linkedin/venice/controller/server/CreateVersion.java | Blocks new version creation when store ingestion is paused. |
| services/venice-controller/src/main/java/com/linkedin/venice/controller/kafka/consumer/AdminExecutionTask.java | Maps new pause fields from UPDATE_STORE admin messages into UpdateStoreQueryParams. |
| services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java | Applies pause mode + region filter in internalUpdateStore (parent stores full state; child applies region filter). |
| internal/venice-common/src/test/java/com/linkedin/venice/meta/TestZKStore.java | Tests default/set/get/clone behavior for the new pause fields. |
| internal/venice-common/src/test/java/com/linkedin/venice/meta/IngestionPauseModeTest.java | Adds unit tests for enum mapping/validation. |
| internal/venice-common/src/test/java/com/linkedin/venice/controllerapi/UpdateStoreQueryParamsTest.java | Tests query param round-trip for pause mode + paused regions. |
| internal/venice-common/src/main/java/com/linkedin/venice/serialization/avro/AvroProtocolDefinition.java | Bumps pinned protocol versions to activate v97/v42. |
| internal/venice-common/src/main/java/com/linkedin/venice/meta/ZKStore.java | Persists/reads the new pause fields in ZK-backed store properties. |
| internal/venice-common/src/main/java/com/linkedin/venice/meta/SystemStore.java | Exposes new getters and blocks setters for system stores. |
| internal/venice-common/src/main/java/com/linkedin/venice/meta/StoreInfo.java | Includes pause fields in controller client-facing store representation. |
| internal/venice-common/src/main/java/com/linkedin/venice/meta/Store.java | Adds pause mode + region list to the Store interface. |
| internal/venice-common/src/main/java/com/linkedin/venice/meta/ReadOnlyStore.java | Adds pause fields to property cloning and read-only delegation. |
| internal/venice-common/src/main/java/com/linkedin/venice/meta/IngestionPauseMode.java | Introduces the new enum and int mapping. |
| internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/UpdateStoreQueryParams.java | Adds query param setters/getters for pause mode + paused regions. |
| internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/ControllerApiConstants.java | Adds API param keys for pause mode + paused regions. |
| clients/venice-admin-tool/src/main/java/com/linkedin/venice/Command.java | Allows new args for --update-store. |
| clients/venice-admin-tool/src/main/java/com/linkedin/venice/Arg.java | Adds CLI flags for pause mode + paused regions. |
| clients/venice-admin-tool/src/main/java/com/linkedin/venice/AdminTool.java | Parses new CLI flags into UpdateStoreQueryParams. |
| build.gradle | Removes schema version overrides to enable the new Avro versions. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
- Fix NPE on null ingestionPausedRegions in admin channel deserialization - Populate both fields in VeniceParentHelixAdmin UpdateStore construction - Validate that --ingestion-paused-regions requires --ingestion-pause-mode - Normalize paused regions (trim whitespace, drop empty tokens) - Use VeniceHttpException with 409 status for pause-related rejections - Null-safe assertions in CreateVersionTest - Make IngestionPauseMode.value final - Initialize ingestionPausedRegions in AdminOperationSerializerTest to avoid NPE - Add integration test TestIngestionPauseMode (e2e with real cluster) Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…d global Parent controller persists the full pause state and blocks pushes whenever any region has a pause configured. Child controllers only apply the pause mode to their local ZK if their own region is in the paused regions list (or the list is empty, meaning all regions), while still persisting the regions list for observability. Rewrites the integration test to use a 2-region setup so the parent-vs-child filtering behavior can be verified end-to-end. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Tests that build an UpdateStore message via Avro's getNewInstance() or 'new UpdateStore()' and then call adminOperationSerializer.serialize() must explicitly initialize list fields like ingestionPausedRegions, because Avro-generated constructors don't honor the schema's default value. Add the initialization alongside the existing ones for storeLifecycleHooks and keyUrnFields. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 26 out of 27 changed files in this pull request and generated 3 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Review fixes: - VeniceParentHelixAdmin: auto-clear regions list on resume (NOT_PAUSED) so stale region data doesn't leak past the resume - AdminExecutionTask: normalize regions to empty list when admin message mode is NOT_PAUSED to defend against stale region data from producers - VeniceHelixAdmin: use VeniceHttpException (400 BAD_REQUEST) instead of plain VeniceException (500) for invalid --ingestion-paused-regions usage CI fixes: - TestIngestionPauseMode: remove dead dc1 variable flagged by spotbugs - CreateVersionTest: add empty-push guard unit tests to raise diff coverage above 45% threshold Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
The previous version attempted a new push immediately after the prior push returned a version number, but in multi-region setups the push needs time to complete before a subsequent push can start. Add waitForNonDeterministicPushCompletion after push-1 and bump the test timeout to 180s. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 26 out of 27 changed files in this pull request and generated 2 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 26 out of 27 changed files in this pull request and generated 2 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
sixpluszero
left a comment
There was a problem hiding this comment.
lgtm. just add the [compact] to follow our rollout protocol
- incrementVersionIdempotent: throw VeniceHttpException(409, BAD_REQUEST) instead of plain VeniceException so clients see an actionable 4xx - UpdateStoreQueryParams: extract normalizeRegions() helper, apply it on both set and get (symmetric) and in the admin-tool CLI parsing so all entry points share the same canonicalization - CreateVersionTest: rename testEmptyPushAllowedWhenIngestionPausedIsGlobal to testEmptyPushBlockedWhenIngestionPausedGlobally to match behavior Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
The previous guards in CreateVersion.handleRequestTopicForPushing and CreateVersion.emptyPush blocked ALL push types, including nearline STREAM writers (Samza jobs, Venice System Producers) that call those endpoints on startup/restart just to discover the RT topic and Kafka brokers. That caused writers to fail with 409 during a server-side pause, even though writing to RT is harmless (data accumulates in Kafka; paused servers catch up on resume). Move the single guard to VeniceParentHelixAdmin.incrementVersionIdempotent — the user-facing parent-only method that actually creates new versions. Co-located with the existing concurrent-push check. Only BATCH / INCREMENTAL / STREAM_REPROCESSING go through this path; nearline STREAM writers don't. Also drop the redundant guard from VeniceHelixAdmin.incrementVersionIdempotent (the child-side override) — the parent is the single entry point for user-initiated pushes. Replace the CreateVersionTest pause-related unit tests with one in TestVeniceParentHelixAdmin exercising the new guard location. The existing multi-region integration test already covers the end-to-end behavior. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 26 out of 27 changed files in this pull request and generated no new comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
The shared Store mock uses RETURNS_DEEP_STUBS, which auto-generates mock return values for unstubbed methods — including the new getIngestionPauseMode() (returns a mock enum instance, not null) and getIngestionPausedRegions() (returns a mock list). The mock enum is non-null and != NOT_PAUSED, so the pause guard in VeniceParentHelixAdmin.incrementVersionIdempotent fires and breaks every test that goes through that path (testIdempotentIncrementVersionWhenNoPreviousTopics, testAdminMessageIsolation, testIncrementVersionDeferredSwapIgnoredForSystemStore, etc.). Explicitly stub the pause-mode getters to safe defaults in the shared setup so the guard short-circuits. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The inline ternary in handleSetStore had four branches (NOT_PAUSED short circuit, null-regions short circuit, and their falls-through), only one of which was exercised by the AdminConsumptionTaskTest.testSetStore baseline. Diff coverage dropped to 25% for the controller module after the earlier refactor removed the other test paths. Extract the normalization into a static package-private helper (resolvePausedRegions) and add three targeted unit tests that hit each distinct branch: - NOT_PAUSED normalizes to empty (even if a stale regions list is present) - null regions list normalizes to empty (defensive against older messages) - paused + explicit regions converts CharSequence to String Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 28 out of 29 changed files in this pull request and generated no new comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Phase 2 of the ingestion pause feature: server-side and DaVinci SITs now react to the store-level IngestionPauseMode (set in Phase 1, linkedin#2754) by fully unsubscribing their Kafka consumers when paused, and resubscribing from the persisted offset on resume. Mechanism: - LeaderFollowerStoreIngestionTask.maybeTransitionPauseState reconciles each PCS's pause state every checkLongRunningTaskState iteration. On enter: consumerUnSubscribeAllTopics(pcs) (covers leader topic across all Kafka clusters for AA/NR). On exit: resubscribe(pcs), reusing the existing leader/follower-aware resubscribe machinery. - StoreIngestionTask.shouldPauseForStore is the static decision helper: NOT_PAUSED/null -> false; ALL_VERSIONS -> true; CURRENT_VERSION -> true only when sitVersionNumber == store.getCurrentVersion(). Applies uniformly to both Venice servers and DaVinci clients. - Restart-while-paused: validateAndSubscribePartition unsubscribes immediately after subscribing if the store is already paused. Blob transfer is allowed to run, so the replica still comes up populated. - PartitionConsumptionState.storeLevelPaused flag is kept for observability and to make the disk-quota pauseConsumption / resumeConsumption callbacks no-op (so quota and store-level pause don't fight each other). - store_level_paused_gauge metric exposed via IngestionStats / AggVersionedIngestionStats. Tests: - StoreIngestionTaskShouldPauseTest: 4 cases for the static decision helper. - PartitionConsumptionStateTest: storeLevelPaused default + setter. - TestServerIngestionPauseResume integration test: testRealTimeConsumptionPausesAndResumesOnServer verifies global pause stops ingestion and resume catches up; testRegionScopedPauseOnlyAffectsTargetedRegion verifies the region filter from Phase 1 isolates the pause to listed regions only.
Problem Statement
Venice lacks the ability to pause and resume Kafka ingestion for a specific hybrid store. Operators need a way to stop all consumption quickly during incidents and resume it later without losing progress. Additionally, pausing should support region-scoping so operators can target specific fabrics without affecting healthy ones.
Solution
Activates the v42/v97 Avro schemas (added in #2749) and implements the full controller-side wiring for ingestion pause mode.
New enum
IngestionPauseMode:NOT_PAUSED(0)— default, all ingestion runs normallyCURRENT_VERSION(1)— pause the current serving version's RT consumption onlyALL_VERSIONS(2)— pause ingestion for all versionsRegion filter (
ingestionPausedRegions):["prod-lor1"]) = pause applies only to listed regionsVersion creation guard:
ingestionPauseMode != NOT_PAUSED, regardless of region scopeCLI usage:
Note: this PR wires the config and blocks version creation. The server-side SIT pause/resume (Kafka consumer
pause()/resume()) is a follow-up.Code changes
Concurrency-Specific Checks
internalUpdateStore.)createStoreWriteLockinupdateStore.)How was this PR tested?
IngestionPauseModeTest— 4 tests for enum round-trip and validation.)TestZKStore— 5 tests: defaults, set/get, clone preserves fieldsUpdateStoreQueryParamsTest— 3 tests: mode + regions round-tripCreateVersionTest— 4 tests: push rejected (global ALL_VERSIONS, global CURRENT_VERSION, region-scoped), push allowed (NOT_PAUSED)Does this PR introduce any user-facing or breaking changes?
--update-storeflags (--ingestion-pause-mode,--ingestion-paused-regions) and blocks version creation when a store's ingestion is paused. No existing behavior changes for stores that don't use the new flags (default isNOT_PAUSED).