-
-
Notifications
You must be signed in to change notification settings - Fork 63
feat(apply): Add delayed shutdown mechanism and fix termination #360
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(apply): Add delayed shutdown mechanism and fix termination #360
Conversation
Pull Request Test Coverage Report for Build 17762933433Details
💛 - Coveralls |
@codex review |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Codex Review: Here are some suggestions.
Reply with @codex fix comments
to fix any unresolved comments.
About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you open a pull request for review, mark a draft as ready, or comment "@codex review". If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex fix this CI failure" or "@codex address that feedback".
etl/src/concurrency/timer.rs
Outdated
impl Future for DeferredTimer { | ||
type Output = (); | ||
|
||
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[P0] Import Future trait for DeferredTimer
The new DeferredTimer
implements impl Future for DeferredTimer
but Future
is never brought into scope, so the module does not compile (cannot find trait
Future in this scope
). Add use std::future::Future;
(or fully qualify the trait) before implementing it so the delayed shutdown code can build.
Useful? React with 👍 / 👎.
etl/src/replication/apply.rs
Outdated
let mut continue_loop = hook.process_syncing_tables(end_lsn, false).await?; | ||
|
||
// If we are in delayed shutdown, and we are processing a `COMMIT` event it means that the transaction | ||
// was finished, so we want to end the apply loop. | ||
if state.delayed_shutdown { | ||
continue_loop = false; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[P1] Delayed shutdown never exits on transaction boundary
Setting continue_loop = false
in the COMMIT handler when state.delayed_shutdown
is true is intended to stop the apply loop once the in‑flight transaction completes, but this value is discarded by the caller. handle_replication_message_with_timeout
builds its own continue_loop
from batching and synchronization results and never consults the flag from handle_commit_message
, so the loop keeps processing new events until the force‑shutdown timer expires. As a result, shutdowns requested mid‑transaction are always forced after the timeout and the promised graceful exit after the next boundary never occurs.
Useful? React with 👍 / 👎.
@codex review |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Codex Review: Here are some suggestions.
Reply with @codex fix comments
to fix any unresolved comments.
About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you open a pull request for review, mark a draft as ready, or comment "@codex review". If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex fix this CI failure" or "@codex address that feedback".
etl/src/replication/apply.rs
Outdated
// If discarded the shutdown, and we are processing a `COMMIT` event it means that the transaction | ||
// was finished, so we want to end the apply loop. | ||
if state.shutdown_discarded { | ||
continue_loop = false; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[P1] Propagate delayed shutdown after commit
When a shutdown signal is received mid-transaction, handle_commit_message
sets continue_loop = false
based on state.shutdown_discarded
, but that flag is only used to choose between return_boundary_event
and finish_batch_and_return_boundary_event
. handle_replication_message_with_timeout
ignores the reason the batch was flushed and always returns true
unless the hook reports a stop, so the apply loop continues to process new messages even though a shutdown was requested and the transaction finished. As a result the delayed shutdown never actually terminates the loop, defeating the feature’s intent.
Useful? React with 👍 / 👎.
@codex review |
Codex Review: Didn't find any major issues. What shall we delve into next? About Codex in GitHubYour team has set up Codex to review pull requests in this repo. Reviews are triggered when you open a pull request for review, mark a draft as ready, or comment "@codex review". If Codex has suggestions, it will comment; otherwise it will react with 👍. Codex can also answer questions or update the PR. Try commenting "@codex fix this CI failure" or "@codex address that feedback". |
@codex review |
Codex Review: Didn't find any major issues. Already looking forward to the next diff. About Codex in GitHubYour team has set up Codex to review pull requests in this repo. Reviews are triggered when you open a pull request for review, mark a draft as ready, or comment "@codex review". If Codex has suggestions, it will comment; otherwise it will react with 👍. Codex can also answer questions or update the PR. Try commenting "@codex fix this CI failure" or "@codex address that feedback". |
This PR introduces two key improvements to the apply loop:
Delayed shutdown when a transaction is in progress
Distinction between stopping and completing the apply loop