pkg/eventservice: use checkpointTs as dispatcher runtime lower bound#4448

Closed
3AceShowHand wants to merge 4 commits into
pingcap:masterfrom
3AceShowHand:fix-event-store-panic

@3AceShowHand
Collaborator

@3AceShowHand 3AceShowHand commented Mar 12, 2026

What problem does this PR solve?

Issue Number: close #4492

The event service's dispatcher runtime state kept both startTs and checkpointTs
as lower-bound candidates. After a reset, a heartbeat could advance checkpointTs,
but the handshake and the next scan could still read the stale runtime startTs.
That could reopen a scan range below the effective event store checkpoint and trigger:

dataRange startTs is smaller than subscriptionStat checkpointTs, it should not happen
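
The invariant behind this message can be sketched as follows. This is only an illustration, not the event store's code: `checkScanRange` and its parameter names are hypothetical, and the sketch returns an error where the real code panics. Only the message text is taken from the log line above.

```go
package main

import "errors"

// errRangeBelowCheckpoint reuses the message quoted in the PR description.
var errRangeBelowCheckpoint = errors.New(
	"dataRange startTs is smaller than subscriptionStat checkpointTs, it should not happen")

// checkScanRange is a hypothetical stand-in for the event store guard:
// a scan must not start below the checkpoint the store has already reached.
func checkScanRange(rangeStartTs, storeCheckpointTs uint64) error {
	if rangeStartTs < storeCheckpointTs {
		return errRangeBelowCheckpoint
	}
	return nil
}
```

Under these assumptions, a stale startTs of 100 against a store checkpoint of 120 fails the check, while a range that starts at the checkpoint itself passes.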

What is changed and how it works?

  • remove runtime startTs from pkg/eventservice/dispatcherStat
  • use checkpointTs as the single runtime lower bound in event service
  • use checkpointTs when sending handshake events
  • reset post-reset lower-bound state from the clamped checkpoint value
  • add a regression test that follows the failure order from the captured logs:
    reset -> heartbeat advances checkpoint -> second reset uses resolvedTs - 1
  • add a Chinese analysis document summarizing the root cause and why using
    checkpointTs as the only runtime lower bound is the correct fix
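
The failure order that the regression test follows can be sketched with a stripped-down, single-goroutine model. The `stat` type and the `heartbeat`, `reset`, and `replay` functions are hypothetical simplifications rather than the real `dispatcherStat` API, and the timestamps are made up for illustration. The clamp follows the rule described in this PR: the new lower bound is the larger of the requested start and the old checkpoint.

```go
package main

// stat is a hypothetical model of the dispatcher runtime state in which
// checkpointTs is the only lower bound.
type stat struct {
	checkpointTs uint64
}

// heartbeat advances the checkpoint and never moves it backwards.
func (s *stat) heartbeat(ts uint64) {
	if ts > s.checkpointTs {
		s.checkpointTs = ts
	}
}

// reset builds the post-reset state from the clamped checkpoint value.
func reset(old *stat, requestStartTs uint64) *stat {
	lower := requestStartTs
	if old != nil && old.checkpointTs > lower {
		lower = old.checkpointTs
	}
	return &stat{checkpointTs: lower}
}

// replay walks the order from the logs:
// reset -> heartbeat advances checkpoint -> second reset uses resolvedTs - 1.
func replay() uint64 {
	s := reset(nil, 100)       // first reset
	s.heartbeat(150)           // heartbeat advances the checkpoint
	resolvedTs := uint64(130)  // illustrative value below the new checkpoint
	s = reset(s, resolvedTs-1) // second reset asks for 129
	return s.checkpointTs
}
```

In this model the second reset ends at 150 rather than 129, so the next scan cannot start below the checkpoint.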

Check List

Tests

  • Unit test

Questions

Will it cause performance regression or break compatibility?

No. The change only removes stale runtime state in event service and makes
reset/handshake/scan lower-bound handling consistent with the checkpoint
already enforced by event store.

Do you need to update user documentation, design documentation or monitoring documentation?

Yes. Added an internal design/analysis note describing the panic sequence and
the rationale for removing runtime startTs.

Release note

Fix event service dispatcher reset to use checkpointTs as the runtime lower bound and avoid scanning below the event store checkpoint after reset.

@ti-chi-bot ti-chi-bot Bot added the do-not-merge/needs-linked-issue and release-note (Denotes a PR that will be considered when it comes time to generate release notes.) labels Mar 12, 2026
@ti-chi-bot

ti-chi-bot Bot commented Mar 12, 2026

[APPROVALNOTIFIER] This PR is NOT APPROVED

This pull-request has been approved by:
Once this PR has been reviewed and has the lgtm label, please assign flowbehappy for approval. For more information see the Code Review Process.
Please ensure that each of them provides their approval before proceeding.

The full list of commands accepted by this bot can be found here.

Details Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

@3AceShowHand 3AceShowHand changed the title from "[DNM] add more logs to debug event store panic" to "[DNM] add more logs to debug event store" Mar 12, 2026
@gemini-code-assist

Summary of Changes

Hello, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request focuses on improving the system's debuggability by strategically adding more detailed logging statements throughout the event processing and dispatching pipeline. The changes aim to capture richer contextual information, such as various timestamps and state values, at critical points, which will be invaluable for diagnosing and resolving panics or unexpected behaviors within the event store and related components without altering the core logic.

Highlights

  • Enhanced Debugging Logs: Added extensive logging across various components to provide more context and diagnostic information, particularly for issues related to event store panics.
  • DML Event Check: Introduced a warning log in the basic dispatcher when a DML event's commit timestamp is less than or equal to the current checkpoint timestamp.
  • Dispatcher State Logging: Improved logging during dispatcher reset operations to include more detailed timestamp and sequence information, aiding in understanding state transitions.
  • Event Broker Visibility: Increased logging in the event broker for handshake events, dispatcher resets, and heartbeat-based checkpoint updates, offering better insight into event flow and state management.

Changelog
  • downstreamadapter/dispatcher/basic_dispatcher.go
    • Added a warning log when a DML event's commitTs is less than or equal to the checkpointTs.
  • downstreamadapter/eventcollector/dispatcher_stat.go
    • Enhanced logging in the doReset function to include checkpointTs, resolvedTs, startTs, lastEventCommitTs, and lastEventSeq.
  • logservice/eventstore/event_store.go
    • Added dispatcherCheckpointTs to an existing panic log message in GetIterator.
  • pkg/eventservice/event_broker.go
    • Added startTs, checkpointTs, and lastScannedCommitTs to the log in sendHandshakeIfNeed.
    • Introduced detailed log.Info statements before and after copyStatistics during resetDispatcher.
    • Added detailed log.Info statements when updating a dispatcher's checkpoint via a heartbeat.
Activity
  • No human activity has been recorded for this pull request yet.
@coderabbitai
Contributor

coderabbitai Bot commented Mar 12, 2026

📝 Walkthrough

Walkthrough

Dispatcher reset and handshake logic now uses checkpoint timestamp (checkpointTs) as the lower bound and reference for schema lookups, logging, and handshake emission. dispatcherStat initialization/reset was consolidated to resetLowerBound(checkpointTs); checkpoint propagation via copyStatistics was removed. Tests updated/added to validate these behaviors.
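
As a rough sketch of what a consolidated `resetLowerBound(checkpointTs)` could look like, assuming the four fields named in the walkthrough are atomic counters: the `dispatcherStatSketch` type is hypothetical, and treating "reset lastScannedStartTs" as storing zero is an assumption, not something the walkthrough states.

```go
package main

import "sync/atomic"

// dispatcherStatSketch is a hypothetical reduction of dispatcherStat to the
// fields mentioned in the walkthrough. There is no separate startTs field.
type dispatcherStatSketch struct {
	checkpointTs        atomic.Uint64
	sentResolvedTs      atomic.Uint64
	lastScannedCommitTs atomic.Uint64
	lastScannedStartTs  atomic.Uint64
}

// resetLowerBound points every lower-bound-related field at the same
// checkpoint so that handshake and scan cannot read diverging values.
func (d *dispatcherStatSketch) resetLowerBound(checkpointTs uint64) {
	d.checkpointTs.Store(checkpointTs)
	d.sentResolvedTs.Store(checkpointTs)
	d.lastScannedCommitTs.Store(checkpointTs)
	d.lastScannedStartTs.Store(0) // assumption: "reset" means zero
}
```

Keeping the initialization in one method means a new or reset stat cannot be left with only some of these fields updated.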

Changes

| Cohort / File(s) | Summary |
| --- | --- |
| Reset & Handshake Logic: pkg/eventservice/event_broker.go | Use task.checkpointTs.Load() / newCheckpointTs instead of startTs for handshake, schema lookups, and error logging; log field names updated to checkpoint-related names; requestStartTs included in final log. |
| Dispatcher Stat State & API: pkg/eventservice/dispatcher_stat.go | Removed startTs field; introduced resetLowerBound(checkpointTs) to initialize checkpointTs, sentResolvedTs, lastScannedCommitTs, and reset lastScannedStartTs. copyStatistics no longer propagates checkpointTs. |
| Tests: Broker & Stat: pkg/eventservice/event_broker_test.go, pkg/eventservice/dispatcher_stat_test.go | Added three tests verifying reset uses checkpoint as new lower bound, handshake uses checkpointTs, and behavior after checkpoint advance. Updated existing stat test to assert lastScannedCommitTs initialization instead of startTs. |

Sequence Diagram(s)

sequenceDiagram
  participant Client as Client
  participant Broker as EventBroker\n(pkg/eventservice)
  participant Dispatcher as DispatcherStat
  participant TableInfo as TableInfoCache
  participant Downstream as DownstreamConsumer

  rect rgba(200,230,255,0.5)
  Client->>Broker: request resetDispatcher(table, dispatcherInfo)
  Broker->>Dispatcher: read oldCheckpointTs (oldStat.checkpointTs)\nread newCheckpointTs (dispatcherInfo.GetStartTs() -> newCheckpointTs)
  end

  rect rgba(200,255,200,0.5)
  Broker->>TableInfo: lookup tableInfo(using newCheckpointTs)
  TableInfo-->>Broker: schema/table metadata
  Broker->>Dispatcher: resetLowerBound(newCheckpointTs)
  Dispatcher-->>Broker: ack reset
  end

  rect rgba(255,230,200,0.5)
  Broker->>Downstream: sendHandshakeIfNeed(checkpointTs)
  Downstream-->>Broker: confirm handshake received
  Broker->>Client: reset complete (logs include requestStartTs, oldCheckpointTs, newCheckpointTs)
  end

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~20 minutes

Suggested reviewers

  • lidezhu
  • tenfyzhong
  • flowbehappy

Poem

🐰 I hopped to the checkpoint, soft and spry,
New bounds set where old timestamps lie,
Handshakes now follow the checkpoint's cue,
Tables rechecked, the schema is new —
A tiny rabbit cheers the reset sky! 🥕

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

| Check name | Status | Explanation | Resolution |
| --- | --- | --- | --- |
| Docstring Coverage | ⚠️ Warning | Docstring coverage is 8.33% which is insufficient. The required threshold is 80.00%. | Write docstrings for the functions missing them to satisfy the coverage threshold. |

✅ Passed checks (2 passed)

| Check name | Status | Explanation |
| --- | --- | --- |
| Title check | ✅ Passed | The title accurately summarizes the main change: removing runtime startTs and using checkpointTs as the single lower bound in the event service dispatcher. |
| Description check | ✅ Passed | The description comprehensively addresses the template requirements with problem statement, detailed change explanation, test confirmation, and release notes. |

@ti-chi-bot ti-chi-bot Bot added the size/M Denotes a PR that changes 30-99 lines, ignoring generated files. label Mar 12, 2026
@3AceShowHand
Collaborator Author

/test all

@gemini-code-assist gemini-code-assist Bot left a comment

Code Review

This pull request adds several logging statements to aid in debugging the event store. The changes are generally helpful for diagnostics. I've identified a few areas for improvement related to logging efficiency and code duplication. Specifically, using zap.Stringer instead of zap.Any for DispatcherID would be more performant, and some duplicated logging code in event_broker.go could be refactored for better maintainability.

Comment thread downstreamadapter/dispatcher/basic_dispatcher.go Outdated
Comment thread pkg/eventservice/event_broker.go Outdated
Comment on lines +1170 to +1190
        log.Info("before copy statistics when reset dispatcher",
                zap.Stringer("changefeedID", changefeedID),
                zap.Stringer("dispatcherID", dispatcherID),
                zap.Uint64("requestStartTs", dispatcherInfo.GetStartTs()),
                zap.Uint64("oldCheckpointTs", oldStat.checkpointTs.Load()),
                zap.Uint64("oldLastScannedCommitTs", oldStat.lastScannedCommitTs.Load()),
                zap.Uint64("newCheckpointTs", newStat.checkpointTs.Load()),
                zap.Uint64("newLastScannedCommitTs", newStat.lastScannedCommitTs.Load()),
                zap.Uint64("oldEpoch", oldStat.epoch),
                zap.Uint64("newEpoch", newStat.epoch))
        newStat.copyStatistics(oldStat)
        log.Info("after copy statistics when reset dispatcher",
                zap.Stringer("changefeedID", changefeedID),
                zap.Stringer("dispatcherID", dispatcherID),
                zap.Uint64("requestStartTs", dispatcherInfo.GetStartTs()),
                zap.Uint64("oldCheckpointTs", oldStat.checkpointTs.Load()),
                zap.Uint64("oldLastScannedCommitTs", oldStat.lastScannedCommitTs.Load()),
                zap.Uint64("newCheckpointTs", newStat.checkpointTs.Load()),
                zap.Uint64("newLastScannedCommitTs", newStat.lastScannedCommitTs.Load()),
                zap.Uint64("oldEpoch", oldStat.epoch),
                zap.Uint64("newEpoch", newStat.epoch))

medium

The two logging blocks before and after newStat.copyStatistics(oldStat) are nearly identical, which introduces code duplication. This can be refactored to improve readability and maintainability by extracting the common logging fields into a shared slice.

        logFields := []zap.Field{
                zap.Stringer("changefeedID", changefeedID),
                zap.Stringer("dispatcherID", dispatcherID),
                zap.Uint64("requestStartTs", dispatcherInfo.GetStartTs()),
                zap.Uint64("oldCheckpointTs", oldStat.checkpointTs.Load()),
                zap.Uint64("oldLastScannedCommitTs", oldStat.lastScannedCommitTs.Load()),
                zap.Uint64("oldEpoch", oldStat.epoch),
                zap.Uint64("newEpoch", newStat.epoch),
        }
        log.Info("before copy statistics when reset dispatcher",
                append(logFields,
                        zap.Uint64("newCheckpointTs", newStat.checkpointTs.Load()),
                        zap.Uint64("newLastScannedCommitTs", newStat.lastScannedCommitTs.Load()))...)
        newStat.copyStatistics(oldStat)
        log.Info("after copy statistics when reset dispatcher",
                append(logFields,
                        zap.Uint64("newCheckpointTs", newStat.checkpointTs.Load()),
                        zap.Uint64("newLastScannedCommitTs", newStat.lastScannedCommitTs.Load()))...)

Comment thread pkg/eventservice/event_broker.go Outdated
if dispatcher.checkpointTs.Load() < dp.CheckpointTs {
log.Info("update dispatcher checkpoint by the heartbeat",
zap.Stringer("serverID", node.ID(heartbeat.serverID)),
zap.Any("dispatcherID", dispatcher.id),

medium

Using zap.Any for logging complex types like common.DispatcherID can be inefficient due to reflection. Since common.DispatcherID implements the fmt.Stringer interface, it's more performant and idiomatic to use zap.Stringer.

                                zap.Stringer("dispatcherID", dispatcher.id),

Contributor

@coderabbitai coderabbitai Bot left a comment

Actionable comments posted: 2

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@downstreamadapter/eventcollector/dispatcher_stat.go`:
- Around line 195-200: The log is recording d.lastEventSeq.Load() and
d.lastEventCommitTs.Load() after those fields were reset, so it always prints 0;
capture the pre-reset values into local variables (e.g., oldSeq :=
d.lastEventSeq.Load(), oldCommit := d.lastEventCommitTs.Load()) before the reset
operation (the place where lastEventSeq/lastEventCommitTs are zeroed), and then
use those local snapshots in the zap.Uint64(...) calls for "lastEventSeq" and
"lastEventCommitTs" so the diagnostic reflects the state prior to mutation.

In `@pkg/eventservice/event_broker.go`:
- Around line 1258-1266: The checkpoint update must be made monotonic using an
atomic compare-and-swap instead of a plain Load/Store; replace the current
Load()+Store() sequence around dispatcher.checkpointTs with a CAS loop that
reads old := dispatcher.checkpointTs.Load(), returns early if dp.CheckpointTs <=
old, otherwise attempts dispatcher.checkpointTs.CompareAndSwap(old,
dp.CheckpointTs) and retries on failure; only log the "update dispatcher
checkpoint" message after a successful CAS so a racing handler cannot overwrite
a newer checkpoint with an older dp.CheckpointTs.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: a403e8bf-be15-4a54-9b69-cc1a0c7aa528

📥 Commits

Reviewing files that changed from the base of the PR and between d37d22f and 63e6dcf.

📒 Files selected for processing (4)
  • downstreamadapter/dispatcher/basic_dispatcher.go
  • downstreamadapter/eventcollector/dispatcher_stat.go
  • logservice/eventstore/event_store.go
  • pkg/eventservice/event_broker.go

Comment on lines +195 to +200
zap.Uint64("resetTs", resetTs),
zap.Uint64("checkpointTs", d.target.GetCheckpointTs()),
zap.Uint64("resolvedTs", d.target.GetResolvedTs()),
zap.Uint64("startTs", d.target.GetStartTs()),
zap.Uint64("lastEventCommitTs", d.lastEventCommitTs.Load()),
zap.Uint64("lastEventSeq", d.lastEventSeq.Load()))
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Capture the pre-reset sequence before zeroing it.

lastEventSeq was already reset on Line 185, so this new field always logs 0. That makes the added reset diagnostics misleading right where they are meant to explain ordering/reset problems. Snapshot the old sequence/commit values before mutating state, then log those snapshots instead.

Proposed fix
 func (d *dispatcherStat) doReset(serverID node.ID, resetTs uint64) {
+	lastEventSeq := d.lastEventSeq.Load()
+	lastEventCommitTs := d.lastEventCommitTs.Load()
 	epoch := d.epoch.Add(1)
 	d.lastEventSeq.Store(0)
 	// remove the dispatcher from the dynamic stream
 	resetRequest := d.newDispatcherResetRequest(d.eventCollector.getLocalServerID().String(), resetTs, epoch)
 	msg := messaging.NewSingleTargetMessage(serverID, messaging.EventServiceTopic, resetRequest)
@@
 		zap.Uint64("resetTs", resetTs),
 		zap.Uint64("checkpointTs", d.target.GetCheckpointTs()),
 		zap.Uint64("resolvedTs", d.target.GetResolvedTs()),
 		zap.Uint64("startTs", d.target.GetStartTs()),
-		zap.Uint64("lastEventCommitTs", d.lastEventCommitTs.Load()),
-		zap.Uint64("lastEventSeq", d.lastEventSeq.Load()))
+		zap.Uint64("lastEventCommitTs", lastEventCommitTs),
+		zap.Uint64("lastEventSeq", lastEventSeq))
 }
Comment thread pkg/eventservice/event_broker.go Outdated
Comment on lines 1258 to 1266
log.Info("update dispatcher checkpoint by the heartbeat",
zap.Stringer("serverID", node.ID(heartbeat.serverID)),
zap.Any("dispatcherID", dispatcher.id),
zap.Uint64("oldCheckpointTs", dispatcher.checkpointTs.Load()),
zap.Uint64("newCheckpointTs", dp.CheckpointTs),
zap.Uint64("sentResolvedTs", dispatcher.sentResolvedTs.Load()),
zap.Uint64("lastScannedCommitTs", dispatcher.lastScannedCommitTs.Load()),
zap.Uint64("dispatcherEpoch", dispatcher.epoch))
dispatcher.checkpointTs.Store(dp.CheckpointTs)
Contributor

⚠️ Potential issue | 🔴 Critical

Make the checkpoint update monotonic.

This is still a load/compare/store sequence. If two heartbeat handlers race here, both can pass the < check and the slower goroutine can overwrite a newer checkpoint with an older one. That can regress checkpointTs and feed stale scan ranges back into the event path.

Proposed fix
-		if dispatcher.checkpointTs.Load() < dp.CheckpointTs {
-			log.Info("update dispatcher checkpoint by the heartbeat",
-				zap.Stringer("serverID", node.ID(heartbeat.serverID)),
-				zap.Any("dispatcherID", dispatcher.id),
-				zap.Uint64("oldCheckpointTs", dispatcher.checkpointTs.Load()),
-				zap.Uint64("newCheckpointTs", dp.CheckpointTs),
-				zap.Uint64("sentResolvedTs", dispatcher.sentResolvedTs.Load()),
-				zap.Uint64("lastScannedCommitTs", dispatcher.lastScannedCommitTs.Load()),
-				zap.Uint64("dispatcherEpoch", dispatcher.epoch))
-			dispatcher.checkpointTs.Store(dp.CheckpointTs)
-		}
+		for {
+			oldCheckpointTs := dispatcher.checkpointTs.Load()
+			if oldCheckpointTs >= dp.CheckpointTs {
+				break
+			}
+			if dispatcher.checkpointTs.CompareAndSwap(oldCheckpointTs, dp.CheckpointTs) {
+				log.Info("update dispatcher checkpoint by the heartbeat",
+					zap.Stringer("serverID", node.ID(heartbeat.serverID)),
+					zap.Any("dispatcherID", dispatcher.id),
+					zap.Uint64("oldCheckpointTs", oldCheckpointTs),
+					zap.Uint64("newCheckpointTs", dp.CheckpointTs),
+					zap.Uint64("sentResolvedTs", dispatcher.sentResolvedTs.Load()),
+					zap.Uint64("lastScannedCommitTs", dispatcher.lastScannedCommitTs.Load()),
+					zap.Uint64("dispatcherEpoch", dispatcher.epoch))
+				break
+			}
+		}
@ti-chi-bot ti-chi-bot Bot added size/L Denotes a PR that changes 100-499 lines, ignoring generated files. and removed size/M Denotes a PR that changes 30-99 lines, ignoring generated files. labels Mar 12, 2026
Contributor

@coderabbitai coderabbitai Bot left a comment

Actionable comments posted: 1

♻️ Duplicate comments (1)
pkg/eventservice/event_broker.go (1)

1265-1274: ⚠️ Potential issue | 🔴 Critical

Keep heartbeat checkpoint updates monotonic.

Line 1265 is still a Load/Store sequence, so concurrent heartbeat handlers can both pass the guard and the slower one can overwrite a newer checkpoint with an older dp.CheckpointTs. Use a CAS loop and only emit the log after the CAS succeeds.

Suggested local fix
-		if dispatcher.checkpointTs.Load() < dp.CheckpointTs {
-			log.Info("update dispatcher checkpoint by the heartbeat",
-				zap.Stringer("serverID", node.ID(heartbeat.serverID)),
-				zap.Any("dispatcherID", dispatcher.id),
-				zap.Uint64("oldCheckpointTs", dispatcher.checkpointTs.Load()),
-				zap.Uint64("newCheckpointTs", dp.CheckpointTs),
-				zap.Uint64("sentResolvedTs", dispatcher.sentResolvedTs.Load()),
-				zap.Uint64("lastScannedCommitTs", dispatcher.lastScannedCommitTs.Load()),
-				zap.Uint64("dispatcherEpoch", dispatcher.epoch))
-			dispatcher.checkpointTs.Store(dp.CheckpointTs)
-		}
+		for {
+			oldCheckpointTs := dispatcher.checkpointTs.Load()
+			if oldCheckpointTs >= dp.CheckpointTs {
+				break
+			}
+			if dispatcher.checkpointTs.CompareAndSwap(oldCheckpointTs, dp.CheckpointTs) {
+				log.Info("update dispatcher checkpoint by the heartbeat",
+					zap.Stringer("serverID", node.ID(heartbeat.serverID)),
+					zap.Any("dispatcherID", dispatcher.id),
+					zap.Uint64("oldCheckpointTs", oldCheckpointTs),
+					zap.Uint64("newCheckpointTs", dp.CheckpointTs),
+					zap.Uint64("sentResolvedTs", dispatcher.sentResolvedTs.Load()),
+					zap.Uint64("lastScannedCommitTs", dispatcher.lastScannedCommitTs.Load()),
+					zap.Uint64("dispatcherEpoch", dispatcher.epoch))
+				break
+			}
+		}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/eventservice/event_broker.go` around lines 1265 - 1274, The current
Load/Store sequence around dispatcher.checkpointTs with dp.CheckpointTs is racy:
replace it with an atomic CAS loop that reads old :=
dispatcher.checkpointTs.Load(), compares old < dp.CheckpointTs, and attempts
dispatcher.checkpointTs.CompareAndSwap(old, dp.CheckpointTs) until it succeeds
(or until old >= dp.CheckpointTs), and move the log.Info call so it only runs
after a successful CompareAndSwap; update references to dispatcher.checkpointTs,
dp.CheckpointTs and the surrounding heartbeat handling logic to use this CAS
pattern to ensure monotonic checkpoint updates.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@pkg/eventservice/event_broker.go`:
- Around line 1138-1140: The reset logic snapshots oldCheckpointTs once
(oldStat.checkpointTs.Load()) and then may perform a statPtr swap while
heartbeats update checkpointTs, causing regressions; fix by serializing reset
with heartbeat checkpoint updates: when building newStat (the code paths around
dispatcherInfo.GetStartTs(), newStat creation, and the statPtr swap retry loop),
always re-read checkpointTs, startTs, sentResolvedTs and lastScannedCommitTs
from the current oldStat immediately before attempting the CAS/swap and retry
the read on any CAS failure, or protect the whole reset-and-swap sequence with
the same mutex used by heartbeat updates so the replacement is built from the
latest atomic values and published only once consistent. Ensure references to
oldStat.checkpointTs.Load(), dispatcherInfo.GetStartTs(), newStat, and the
statPtr swap/CAS loop are updated accordingly.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 30d117b9-0900-4305-bf16-da499cd0edca

📥 Commits

Reviewing files that changed from the base of the PR and between 63e6dcf and 2f71f45.

📒 Files selected for processing (3)
  • pkg/eventservice/dispatcher_stat.go
  • pkg/eventservice/event_broker.go
  • pkg/eventservice/event_broker_test.go
💤 Files with no reviewable changes (1)
  • pkg/eventservice/dispatcher_stat.go

Comment on lines +1138 to 1140
oldCheckpointTs := oldStat.checkpointTs.Load()
newCheckpointTs := max(dispatcherInfo.GetStartTs(), oldCheckpointTs)

Contributor

⚠️ Potential issue | 🔴 Critical

Synchronize reset with checkpoint advancement.

Line 1138 snapshots oldCheckpointTs once, and Lines 1171-1229 keep reusing that snapshot even if the swap retries. More importantly, any heartbeat that updates oldStat.checkpointTs after that snapshot but before statPtr is swapped is lost when newStat replaces the old struct, so startTs, checkpointTs, sentResolvedTs, and lastScannedCommitTs can all move backwards and replay already-acked data. This needs serialization with heartbeat checkpoint updates, or a reset flow that rebuilds/publishes the new state from the latest checkpoint atomically.

Also applies to: 1171-1229

@ti-chi-bot ti-chi-bot Bot added size/M Denotes a PR that changes 30-99 lines, ignoring generated files. and removed size/L Denotes a PR that changes 100-499 lines, ignoring generated files. labels Mar 13, 2026
@ti-chi-bot ti-chi-bot Bot added size/L Denotes a PR that changes 100-499 lines, ignoring generated files. and removed size/M Denotes a PR that changes 30-99 lines, ignoring generated files. labels Mar 14, 2026
@3AceShowHand 3AceShowHand changed the title from "[DNM] add more logs to debug event store" to "pkg/eventservice: use checkpointTs as dispatcher runtime lower bound" Mar 14, 2026
Contributor

@coderabbitai coderabbitai Bot left a comment

♻️ Duplicate comments (1)
pkg/eventservice/event_broker.go (1)

1135-1201: ⚠️ Potential issue | 🔴 Critical

Rebuild the reset state from the latest dispatcher snapshot before publishing it.

Lines 1135-1170 snapshot oldCheckpointTs, fetch tableInfo, and build newStat once before the CAS loop. A heartbeat can still advance oldStat.checkpointTs without changing the pointer, so the first CAS can succeed and publish a lower checkpointTs / sentResolvedTs / lastScannedCommitTs than the live dispatcher had already reached. If the pointer does change, the retry path still reuses the stale newStat and tableInfo from the first attempt. That can replay already-acked data and regress the checkpoint later reported back to the event store. Re-read the current stat and rebuild the replacement state on each retry, or serialize reset with checkpoint updates.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/eventservice/event_broker.go` around lines 1135 - 1201, The reset builds
newStat once before the CAS loop which can publish stale checkpoint values; to
fix, move the creation of newStat (calls to newDispatcherStat,
newStat.copyStatistics, newStat.resetLowerBound) and the tableInfo lookup
(c.schemaStore.GetTableInfo using newCheckpointTs) inside the for-loop after
reloading oldStat := statPtr.Load(), and recompute
oldCheckpointTs/newCheckpointTs from oldStat.checkpointTs.Load() and
dispatcherInfo.GetStartTs() each iteration; preserve the existing
epoch-staleness check (if oldStat.epoch >= dispatcherInfo.GetEpoch()) and
oldStat.isRemoved.Store(true) logic, then attempt CompareAndSwap, retrying with
rebuilt newStat on CAS failure.
ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 8f53ff71-5a61-4964-97b0-94c8d44a747a

📥 Commits

Reviewing files that changed from the base of the PR and between 657e5f8 and 4e9f6a8.

📒 Files selected for processing (4)
  • pkg/eventservice/dispatcher_stat.go
  • pkg/eventservice/dispatcher_stat_test.go
  • pkg/eventservice/event_broker.go
  • pkg/eventservice/event_broker_test.go

@3AceShowHand
Collaborator Author

/test all

1 similar comment
@3AceShowHand
Collaborator Author

/test all

Labels

  • do-not-merge/needs-triage-completed
  • release-note: Denotes a PR that will be considered when it comes time to generate release notes.
  • size/L: Denotes a PR that changes 100-499 lines, ignoring generated files.

Development

Successfully merging this pull request may close these issues.

stale runtime startTs may cause scan below checkpoint after reset

1 participant