[FLINK-30068][runtime] Add configurable commit failure strategy for Sink V2#4
Closed
[FLINK-30068][runtime] Add configurable commit failure strategy for Sink V2#4
Conversation
869721d to
c405937
Compare
…ink V2 When recovering from a checkpoint/savepoint, the CommitterOperator replays uncommitted transactions. If a transaction has expired or the producer ID mapping is lost, the commit fails with signalFailedWithUnknownReason() which throws unconditionally, causing an infinite restart loop with no recovery path. This adds a CommitFailureStrategy enum (FAIL/WARN) and a new config option sink.committer.failure-strategy that controls whether unknown commit failures throw (default, preserving current behavior) or log a warning and skip the committable, allowing recovery to proceed.
c405937 to
f582a30
Compare
nateab
added a commit
that referenced
this pull request
Feb 14, 2026
Fixes applied: - #2: Nexus API uses profile-specific endpoints, correct XML payloads - #3: JIRA auth added to all API calls including reads - #4: Dropped FLIP framing, reframed as dev@ discussion / normal PR - #5: URL encoding uses stdin pipe (fixes single-quote breakage in JQL) - apache#6: Uses perl -pi -e instead of sed -i (macOS portability) - apache#7: Added set -o pipefail to all scripts - apache#8: Glob expansion handled directly, not through run_cmd wrapper - apache#9: require_var moved inside subcommands, not at top level - apache#11: Removed shared releasing_utils.sh, each script is self-contained - apache#13: Removed check-pypi-space (PyPI API doesn't expose quotas) - apache#15: Backtick command substitution matching existing conventions - apache#17: Added set -o xtrace for auditability (with noted exceptions)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
CommitFailureStrategyenum (FAIL/WARN) and config optionsink.committer.failure-strategyWARN,signalFailedWithUnknownReason()logs a warning and skips the committable instead of throwing, allowing recovery from expired transactionsFAIL, preserving existing behavior exactlyMotivation
When a Flink job recovers from a checkpoint/savepoint, the
CommitterOperatorreplays uncommitted transactions. If a Kafka transaction has expired or the producer ID mapping is lost (e.g.,InvalidPidMappingException), the commit fails and the job enters an infinite restart loop with no way to recover -- even from earlier savepoints.The
CommitRequestImplcode already had TODO comments noting: "let the user configure a strategy for failing and apply it here". This PR implements that configurability.Changes
CommitFailureStrategy.java(new)@PublicEvolvingenum withFAILandWARNSinkOptions.javaCOMMITTER_FAILURE_STRATEGYconfig optionCommitRequestImpl.javasignalFailedWithUnknownReasonrespects WARNCheckpointCommittableManager.javacommit()with strategy param (default delegates with FAIL)CheckpointCommittableManagerImpl.javaCommitterOperator.javaGlobalCommitterOperator.javaTest plan
CommitRequestImplTest- unit tests for FAIL/WARN/default strategy behaviorCheckpointCommittableManagerImplTest- WARN skips failures, FAIL throwsSinkV2CommitterOperatorTest- operator-level WARN/FAIL tests + recovery scenario