
eventcollector: introduce dispatcher session to separate connection lifecycle management#4991

Merged
ti-chi-bot[bot] merged 21 commits into pingcap:master from lidezhu:ldz/refactor-event-collector001
May 7, 2026

Conversation

@lidezhu
Collaborator

@lidezhu lidezhu commented May 5, 2026

What problem does this PR solve?

Issue Number: close #4999

What is changed and how it works?

This pull request refactors the dispatcher's connection and lifecycle management by introducing a dedicated dispatcherSession abstraction. By moving connection state and coordination logic out of dispatcherStat, the system achieves better separation of concerns and more robust handling of dispatcher registration, reset operations, and heartbeat synchronization. The changes also improve event validation through explicit per-epoch state tracking, ensuring that dispatchers correctly handle progress monitoring and connection lifecycle events.

Highlights

  • Session Abstraction: Introduced dispatcherSession to encapsulate dispatcher registration and lifecycle management, decoupling this logic from dispatcherStat.

Check List

Tests

  • Unit test
  • Integration test
  • Manual test (add detailed scripts or steps below)
  • No code

Questions

Will it cause performance regression or break compatibility?
Do you need to update user documentation, design documentation or monitoring documentation?

Release note

Please refer to [Release Notes Language Style Guide](https://pingcap.github.io/tidb-dev-guide/contribute-to-tidb/release-notes-style-guide.html) to write a quality release note.

If you don't think this PR needs a release note then fill it with `None`.

Summary by CodeRabbit

  • Refactor

    • Reworked dispatcher lifecycle to centralize connection/session handling and simplify per-epoch state coordination.
    • Streamlined heartbeat construction and registration flows for dispatcher-event service interactions.
  • New Features

    • Event collector now runs periodic dispatcher heartbeat sending in the background to improve progress reporting.
  • Tests

    • Updated and added tests covering event filtering, commit-ts/heartbeat checkpoint behaviors, and heartbeat reset/handshake scenarios.

@ti-chi-bot ti-chi-bot Bot added do-not-merge/needs-linked-issue release-note Denotes a PR that will be considered when it comes time to generate release notes. labels May 5, 2026
@coderabbitai
Contributor

coderabbitai Bot commented May 5, 2026


📝 Walkthrough

Walkthrough

This PR introduces a new dispatcherSession abstraction that encapsulates dispatcher registration lifecycle and event-service coordination logic previously embedded within dispatcherStat. The session manages connection state (current event service, readiness signals, and remote candidates), handles readiness/reset operations via epoch advancement, routes dispatcher events, and provides lifecycle methods for registration and removal. dispatcherStat delegates all connection-state operations to the session and uses per-epoch state tracking for event validation and heartbeat clamping.
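The connection-state shape described above can be sketched as follows. This is a simplified, hypothetical reconstruction based only on the walkthrough (the real types live in `downstreamadapter/eventcollector/dispatcher_session.go` and use `node.ID`); the `nodeID` alias and all internals here are illustrative assumptions, not the actual code.

```go
package main

import (
	"fmt"
	"sync"
)

// nodeID stands in for the project's node.ID type (assumption for this sketch).
type nodeID string

// dispatcherConnState tracks the current event service, whether it has
// signalled readiness, and a queue of remote candidates to fall back to.
type dispatcherConnState struct {
	mu               sync.Mutex
	eventServiceID   nodeID
	ready            bool
	remoteCandidates []nodeID
}

func (s *dispatcherConnState) setEventServiceID(id nodeID) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.eventServiceID = id
	s.ready = false // a new service must re-signal readiness
}

// getNextRemoteCandidate pops the next fallback service. It clears the
// current service ID when the queue is exhausted, so a later
// setRemoteCandidates call is accepted (one of the review findings below).
func (s *dispatcherConnState) getNextRemoteCandidate() (nodeID, bool) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if len(s.remoteCandidates) == 0 {
		s.eventServiceID = ""
		return "", false
	}
	next := s.remoteCandidates[0]
	s.remoteCandidates = s.remoteCandidates[1:]
	s.eventServiceID = next
	s.ready = false
	return next, true
}

func main() {
	cs := &dispatcherConnState{remoteCandidates: []nodeID{"remote-1", "remote-2"}}
	cs.setEventServiceID("local")
	if id, ok := cs.getNextRemoteCandidate(); ok {
		fmt.Println("switched to", id)
	}
}
```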

Changes

Session Abstraction & Dispatcher Lifecycle Refactoring

Layer / File(s) Summary
Session Lifecycle Abstraction
downstreamadapter/eventcollector/dispatcher_session.go
Introduces dispatcherConnState to track current event service, readiness flag, and remote candidate queue. Implements dispatcherSession with lifecycle methods: registerTo, commitReady, reset, remove, and signal handling via handleSignalEvent that routes TypeReadyEvent (updates readiness and triggers reset), TypeNotReusableEvent (advances remote candidates), and provides setRemoteCandidates to enqueue and act on candidate lists.
Dispatcher Stat Refactoring
downstreamadapter/eventcollector/dispatcher_stat.go
Removes inline dispatcherConnState and delegates lifecycle (run, clear, registerTo, commitReady, reset, remove) to the new session. Keeps per-epoch dispatcherEpochState for event validation and updates resets to call d.reset(d.session.getEventServiceID()). Heartbeat clamping and dispatcher queries now delegate to session methods.
Event Collector & Heartbeat Integration
downstreamadapter/eventcollector/event_collector.go
Starts sendEventServiceHeartbeats in EventCollector.Run; refactors heartbeat grouping to sendDispatcherHeartbeat + groupHeartbeat() and uses stat.getEventServiceID() / stat.isCurrentEventService() when processing heartbeat responses and generating congestion-control messages.
Tests
downstreamadapter/eventcollector/dispatcher_stat_test.go, downstreamadapter/eventcollector/event_collector_test.go
Updates tests to set/assert stat.session.connState for readiness and event-service ID; replaces TestShouldForwardEventByCommitTs with TestFilterAndUpdateEventByCommitTs; adds TestCheckpointTsForEventServiceUsesCollectorObservedMaxTs; adjusts heartbeat tests to use session-scoped fields.

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~50 minutes

Possibly related PRs

  • pingcap/ticdc#4566: The main PR is directly related to #4566: both modify downstreamadapter/eventcollector to move heartbeat sending into the collector and refactor/groupHeartbeat semantics (including per-dispatcher epoch/checkpoint handling), plus closely overlapping changes in dispatcher_stat and related tests.
  • pingcap/ticdc#4579: The main PR's changes to commit-ts filtering/state-update logic and the associated test renames directly overlap the refactor in PR #4579 that splits filtering (shouldForwardEventByCommitTs) from state mutation and adjusts tests accordingly.

Suggested reviewers

  • asddongmen
  • wk989898

Poem

🐰 In burrows of code the sessions wake,
Epochs hop forward with each heartbeat wake,
Candidates queued in tidy lines,
Ready signals, reset signs,
A rabbit cheers: "Dispatchers, dance!" 🥕

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

  • Docstring Coverage ⚠️ Warning: docstring coverage is 3.23%, below the required 80.00% threshold. Resolution: write docstrings for the functions missing them.
✅ Passed checks (4 passed)
  • Title check ✅ Passed: the title accurately and specifically describes the main change: introducing a dispatcher session abstraction to separate connection lifecycle management from dispatcherStat.
  • Description check ✅ Passed: the PR description follows the required template structure with Issue Number, explanation of changes, highlights, and checklist, but the release-note section contains only a placeholder without actual release-note content.
  • Linked Issues check ✅ Passed: the PR implements the objective of issue #4999 by introducing dispatcherSession as a new component to manage dispatcher connection lifecycle, registration, and coordination logic, achieving the stated goal of centralizing this management.
  • Out of Scope Changes check ✅ Passed: all changes are directly related to the refactoring objective: adding the dispatcherSession implementation, migrating connection state management from dispatcherStat, updating tests, and adjusting event_collector to use the new abstraction, with no unrelated modifications.


@ti-chi-bot ti-chi-bot Bot added the size/XXL Denotes a PR that changes 1000+ lines, ignoring generated files. label May 5, 2026

@gemini-code-assist gemini-code-assist Bot left a comment


Code Review

This pull request refactors the dispatcher heartbeat mechanism to be epoch-aware and moves the heartbeat emission logic from the DispatcherManager to the EventCollector. It introduces a new dispatcherSession to manage connection states and updates the heartbeat protocol to version 2, which now includes epoch information to prevent stale progress reporting and checkpoint jumps. Feedback focuses on improving the robustness of the binary decoding logic in dispatcher_heartbeat.go by adding necessary length checks to prevent potential panics and optimizing slice initializations for better efficiency.

Comment threads (5): pkg/common/event/dispatcher_heartbeat.go
Contributor

@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 4

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
downstreamadapter/eventcollector/dispatcher_stat.go (1)

233-280: ⚠️ Potential issue | 🟠 Major | 🏗️ Heavy lift

Reset replay is still filtered by stale commit-ts state.

filterAndUpdateEventByCommitTs still uses dispatcher-wide lastEventCommitTs/gotDDLOnTs/gotSyncpointOnTS, but resets now only replace currentEpoch. If resetTs := target.GetCheckpointTs() lags the last forwarded commit ts, the replay range from the new epoch is <= lastEventCommitTs from the old epoch and gets discarded here, so the reset cannot actually backfill the gap it was meant to repair. This dedupe state needs to move into the epoch state, or be reinitialized whenever the epoch advances.
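The suggested direction, moving the dedupe watermark into per-epoch state so a reset can replay older commit-ts events, can be sketched like this. The `epochState` struct and function names are hypothetical simplifications of `dispatcherEpochState` and `filterAndUpdateEventByCommitTs`, not the project's actual fields.

```go
package main

import "fmt"

// epochState carries the commit-ts dedupe state per epoch, so a reset that
// advances the epoch no longer inherits the old epoch's watermark.
type epochState struct {
	epoch             uint64
	lastEventCommitTs uint64
	gotDDLOnTs        bool
	gotSyncpointOnTS  bool
}

// advanceEpoch starts a fresh epoch with the watermark reinitialized to the
// reset timestamp, so a replay range starting at resetTs is accepted.
func advanceEpoch(cur epochState, resetTs uint64) epochState {
	return epochState{
		epoch:             cur.epoch + 1,
		lastEventCommitTs: resetTs,
	}
}

// shouldForward keeps only events past the epoch-local watermark and
// advances it for forwarded events.
func (s *epochState) shouldForward(commitTs uint64) bool {
	if commitTs <= s.lastEventCommitTs {
		return false
	}
	s.lastEventCommitTs = commitTs
	return true
}

func main() {
	s := epochState{epoch: 1, lastEventCommitTs: 200}
	fmt.Println(s.shouldForward(150)) // false: filtered by the old watermark
	next := advanceEpoch(s, 100)
	fmt.Println(next.shouldForward(150)) // true: the reset can backfill the gap
}
```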

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@downstreamadapter/eventcollector/dispatcher_stat.go` around lines 233 - 280,
filterAndUpdateEventByCommitTs currently uses dispatcher-wide dedupe state
(d.lastEventCommitTs, d.gotDDLOnTs, d.gotSyncpointOnTS) which prevents a reset
that advances epoch from replaying older commit-ts events; move the dedupe state
into the epoch so replayed events aren't filtered by previous epoch values: add
lastEventCommitTs/gotDDLOnTs/gotSyncpointOnTS fields to dispatcherEpochState (or
ensure those dispatcher-wide flags are reinitialized whenever the epoch in state
advances), then update filterAndUpdateEventByCommitTs to read/update the
epoch-local fields (e.g., state.lastEventCommitTs, state.gotDDLOnTs,
state.gotSyncpointOnTS) instead of
d.lastEventCommitTs/d.gotDDLOnTs/d.gotSyncpointOnTS so a reset/backfill can emit
events <= previous commit-ts for the new epoch.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@downstreamadapter/eventcollector/dispatcher_session.go`:
- Around line 88-97: The handler advances to the next remote for every non-local
TypeNotReusableEvent even if the signal came from an old/stale eventService, and
getNextRemoteCandidate leaves eventServiceID unchanged when candidates are
exhausted causing setRemoteCandidates to be rejected and the session to stall;
fix by making the non-local not-reusable path first verify the signal's source
matches the current d.eventServiceID (do not advance if it’s stale), and update
getNextRemoteCandidate (and the analogous logic around lines 262-269) to clear
d.eventServiceID (set to empty) when remoteCandidates is empty so subsequent
setRemoteCandidates calls are accepted. Ensure you reference
dispatcherConnState.getNextRemoteCandidate, the d.eventServiceID field and the
non-local TypeNotReusableEvent handling code paths when applying the changes.
- Around line 205-223: The TypeReadyEvent case dereferences event.From at
multiple places without ensuring it's non-nil, causing a panic for malformed
ready signals; update the ready-event branch in dispatcher_session.go (the case
handling commonEvent.TypeReadyEvent, referencing s.isCurrentEventService,
s.removeFrom, localServerID, and s.readyCallback) to first check if event.From
== nil and if so ignore/return, then proceed with the existing logic that
dereferences *event.From; also ensure any existing early checks that compare
event.From to localServerID or call isCurrentEventService use the guarded,
non-nil pointer.

In `@pkg/common/event/dispatcher_heartbeat.go`:
- Around line 48-52: The Unmarshal implementations must validate buffer lengths
before calling buf.Next(...) to avoid panics on short payloads; update
DispatcherProgressLegacy.Unmarshal (and the equivalent legacy and v2 progress
record Unmarshal functions referenced in the review) to check buf.Len() or the
return size before each field extraction (e.g., ensure enough bytes for
DispatcherID.GetSize() and for the 8-byte checkpoint) and return a clear error
on truncated data instead of proceeding to binary.BigEndian.Uint64 or calling
Unmarshal on incomplete slices. Ensure each per-entry length check is added
where DispatcherID.Unmarshal and binary.BigEndian.Uint64 are invoked so
malformed or truncated heartbeats return an error rather than panic.

In `@pkg/eventservice/event_broker.go`:
- Around line 1247-1285: The v1 (DispatcherProgressesLegacy) path doesn't verify
heartbeat.serverID against the dispatcher's current owner, allowing stale
collectors to refresh checkpointTs and lastReceivedHeartbeatTime; update
handleProgress (or its callers) to also validate the dispatcher's owner server
ID: pass heartbeat.serverID into handleProgress (or access it inside) and after
loading dispatcher (dispatcher := dispatcherPtr.Load()) compare the dispatcher's
owner field (e.g. dispatcher.serverID or dispatcher.ownerServerID — the field
that stores the dispatcher's current owner) to heartbeat.serverID and if they
differ either ignore the heartbeat (return) or append a DSStateRemoved response
exactly as done when dispatcherPtr == nil, then proceed with epoch/checkpoint
updates only when the owner matches; apply this change for both
DispatcherProgresses and DispatcherProgressesLegacy paths.

---

Outside diff comments:
In `@downstreamadapter/eventcollector/dispatcher_stat.go`:
- Around line 233-280: filterAndUpdateEventByCommitTs currently uses
dispatcher-wide dedupe state (d.lastEventCommitTs, d.gotDDLOnTs,
d.gotSyncpointOnTS) which prevents a reset that advances epoch from replaying
older commit-ts events; move the dedupe state into the epoch so replayed events
aren't filtered by previous epoch values: add
lastEventCommitTs/gotDDLOnTs/gotSyncpointOnTS fields to dispatcherEpochState (or
ensure those dispatcher-wide flags are reinitialized whenever the epoch in state
advances), then update filterAndUpdateEventByCommitTs to read/update the
epoch-local fields (e.g., state.lastEventCommitTs, state.gotDDLOnTs,
state.gotSyncpointOnTS) instead of
d.lastEventCommitTs/d.gotDDLOnTs/d.gotSyncpointOnTS so a reset/backfill can emit
events <= previous commit-ts for the new epoch.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 02af9288-8f0d-4a4e-9764-f611a48a36dd

📥 Commits

Reviewing files that changed from the base of the PR and between 5518eb2 and 708063a.

📒 Files selected for processing (12)
  • .gitignore
  • downstreamadapter/dispatchermanager/dispatcher_manager.go
  • downstreamadapter/eventcollector/dispatcher_session.go
  • downstreamadapter/eventcollector/dispatcher_stat.go
  • downstreamadapter/eventcollector/dispatcher_stat_test.go
  • downstreamadapter/eventcollector/event_collector.go
  • downstreamadapter/eventcollector/event_collector_test.go
  • logservice/eventstore/event_store_test.go
  • pkg/common/event/dispatcher_heartbeat.go
  • pkg/common/event/dispatcher_heartbeat_test.go
  • pkg/eventservice/event_broker.go
  • pkg/eventservice/event_broker_test.go
💤 Files with no reviewable changes (1)
  • downstreamadapter/dispatchermanager/dispatcher_manager.go

Comment thread downstreamadapter/eventcollector/dispatcher_session.go
Comment thread downstreamadapter/eventcollector/dispatcher_session.go
Comment thread pkg/common/event/dispatcher_heartbeat.go
Comment thread pkg/eventservice/event_broker.go
@lidezhu lidezhu changed the title from "Ldz/refactor event collector001" to "eventcollector: refactor event collector dispatcher session" on May 5, 2026
Contributor

@coderabbitai coderabbitai Bot left a comment


Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
downstreamadapter/eventcollector/event_collector.go (1)

424-437: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

groupHeartbeat is missing the empty-ID guard present in newCongestionControlMessages.

newCongestionControlMessages (Line 712–715) explicitly skips dispatchers whose getEventServiceID() returns "". groupHeartbeat does not. Because isReceivingDataEvent() and getEventServiceID() are two separate reads, a concurrent RemoveDispatcher call (which calls stat.remove() before dispatcherMap.Delete) can race between them, leaving getEventServiceID() returning "". The group helper would then insert a heartbeat under node.ID("") and sendDispatcherHeartbeat would enqueue a message to an empty target.

🛡️ Proposed fix
 	c.dispatcherMap.Range(func(_, value interface{}) bool {
 		stat := value.(*dispatcherStat)
 		if !stat.isReceivingDataEvent() {
 			return true
 		}
+		eventServiceID := stat.getEventServiceID()
+		if eventServiceID == "" {
+			return true
+		}
 		checkpointTs, epoch := stat.getHeartbeatProgressForEventService()
 		group(
-			stat.getEventServiceID(),
+			eventServiceID,
 			stat.getDispatcherID(),
 			checkpointTs,
 			epoch,
 		)
 		return true
 	})
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@downstreamadapter/eventcollector/event_collector.go` around lines 424 - 437,
groupHeartbeat can insert heartbeats under an empty node ID because it doesn't
guard against stat.getEventServiceID() returning "" (unlike
newCongestionControlMessages); update groupHeartbeat's dispatcherMap.Range loop
to fetch id := stat.getEventServiceID() after confirming
stat.isReceivingDataEvent(), and skip the dispatcher (do not call group or
sendDispatcherHeartbeat) if id == "", mirroring the empty-ID check used in
newCongestionControlMessages to avoid enqueuing heartbeats for removed/cleared
dispatchers (refer to dispatcherMap, dispatcherStat.remove, group,
sendDispatcherHeartbeat, getEventServiceID, and isReceivingDataEvent).
downstreamadapter/eventcollector/dispatcher_stat_test.go (1)

341-494: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

TestFilterAndUpdateEventByCommitTs: the newly added expectedDDLOnTs/expectedSyncOnTs/expectedCommitTs struct fields are never referenced in the assertions.

Lines 489–491 assert the post-call state against the initial input values (tt.gotDDLOnTs, tt.gotSyncpointOnTS, tt.lastEventCommitTs) rather than the intended expected values. For example, the "DDL event with same commit ts and not got DDL" case has gotDDLOnTs: false and expectedDDLOnTs: true, but the assertion uses tt.gotDDLOnTs (i.e., false). Similarly for "DML event with larger commit ts", where expectedCommitTs is 110 but the assertion compares against tt.lastEventCommitTs (100).

This means the test does not actually verify state mutations — the expected* fields are dead code.

The fix depends on whether shouldForwardEventByCommitTs is a pure filter (in which case the expected* fields should be removed) or also mutates state (in which case the assertions should use the expected fields):

🐛 Proposed fix — wire up the expected fields in assertions
-		require.Equal(t, tt.gotDDLOnTs, stat.gotDDLOnTs.Load())
-		require.Equal(t, tt.gotSyncpointOnTS, stat.gotSyncpointOnTS.Load())
-		require.Equal(t, tt.lastEventCommitTs, stat.lastEventCommitTs.Load())
+		require.Equal(t, tt.expectedDDLOnTs, stat.gotDDLOnTs.Load())
+		require.Equal(t, tt.expectedSyncOnTs, stat.gotSyncpointOnTS.Load())
+		require.Equal(t, tt.expectedCommitTs, stat.lastEventCommitTs.Load())

Alternatively, if shouldForwardEventByCommitTs is intentionally a pure filter (with updateCommitTsStateByEvents handling mutations separately), remove the expectedDDLOnTs, expectedSyncOnTs, expectedCommitTs fields entirely to avoid misleading readers.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@downstreamadapter/eventcollector/dispatcher_stat_test.go` around lines 341 -
494, The test TestFilterAndUpdateEventByCommitTs defines
expectedDDLOnTs/expectedSyncOnTs/expectedCommitTs but the assertions check the
original input fields (tt.gotDDLOnTs, tt.gotSyncpointOnTS, tt.lastEventCommitTs)
so state mutations are never verified; either (A) update the assertions after
calling stat.shouldForwardEventByCommitTs to compare against tt.expectedDDLOnTs,
tt.expectedSyncOnTs and tt.expectedCommitTs (and keep the existing require.Equal
on result), or (B) if shouldForwardEventByCommitTs is meant to be pure, remove
the unused expected* fields from the test cases to avoid dead code; reference
the test name TestFilterAndUpdateEventByCommitTs and methods
shouldForwardEventByCommitTs / updateCommitTsStateByEvents to decide which
approach to apply.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Outside diff comments:
In `@downstreamadapter/eventcollector/dispatcher_stat_test.go`:
- Around line 341-494: The test TestFilterAndUpdateEventByCommitTs defines
expectedDDLOnTs/expectedSyncOnTs/expectedCommitTs but the assertions check the
original input fields (tt.gotDDLOnTs, tt.gotSyncpointOnTS, tt.lastEventCommitTs)
so state mutations are never verified; either (A) update the assertions after
calling stat.shouldForwardEventByCommitTs to compare against tt.expectedDDLOnTs,
tt.expectedSyncOnTs and tt.expectedCommitTs (and keep the existing require.Equal
on result), or (B) if shouldForwardEventByCommitTs is meant to be pure, remove
the unused expected* fields from the test cases to avoid dead code; reference
the test name TestFilterAndUpdateEventByCommitTs and methods
shouldForwardEventByCommitTs / updateCommitTsStateByEvents to decide which
approach to apply.

In `@downstreamadapter/eventcollector/event_collector.go`:
- Around line 424-437: groupHeartbeat can insert heartbeats under an empty node
ID because it doesn't guard against stat.getEventServiceID() returning ""
(unlike newCongestionControlMessages); update groupHeartbeat's
dispatcherMap.Range loop to fetch id := stat.getEventServiceID() after
confirming stat.isReceivingDataEvent(), and skip the dispatcher (do not call
group or sendDispatcherHeartbeat) if id == "", mirroring the empty-ID check used
in newCongestionControlMessages to avoid enqueuing heartbeats for
removed/cleared dispatchers (refer to dispatcherMap, dispatcherStat.remove,
group, sendDispatcherHeartbeat, getEventServiceID, and isReceivingDataEvent).

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: fe594fd9-7d83-4488-98f4-781bd0696834

📥 Commits

Reviewing files that changed from the base of the PR and between 708063a and 2e30f71.

📒 Files selected for processing (5)
  • downstreamadapter/eventcollector/dispatcher_stat.go
  • downstreamadapter/eventcollector/dispatcher_stat_test.go
  • downstreamadapter/eventcollector/event_collector.go
  • downstreamadapter/eventcollector/event_collector_test.go
  • pkg/sink/mysql/mysql_writer_dml_test.go
✅ Files skipped from review due to trivial changes (1)
  • pkg/sink/mysql/mysql_writer_dml_test.go
🚧 Files skipped from review as they are similar to previous changes (1)
  • downstreamadapter/eventcollector/dispatcher_stat.go

@lidezhu lidezhu changed the title from "eventcollector: refactor event collector dispatcher session" to "eventcollector: introduce dispatcher session" on May 6, 2026
Contributor

@coderabbitai coderabbitai Bot left a comment


🧹 Nitpick comments (1)
downstreamadapter/eventcollector/dispatcher_stat_test.go (1)

1435-1480: 💤 Low value

LGTM: heartbeat checkpoint progression and clamping to dispatcher checkpointTs are well covered.

The test exercises a clean progression of maxEventTs: constructor (startTs=100) → doReset(150) → handshake (180) → resolved (200) → DML (210), and confirms the heartbeat checkpoint clamps to mockDisp.checkPointTs when it falls below maxEventTs (line 1461-1462). Initial maxEventTs=100 correctly reflects newDispatcherStat's use of target.GetStartTs() per the dispatcher_stat.go constructor.
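The clamping behavior the test covers reduces to taking the minimum of the collector-observed max event timestamp and the dispatcher's own checkpoint. The sketch below is an illustrative reconstruction; the function name and parameters are assumptions, not the actual `getHeartbeatProgressForEventService` implementation.

```go
package main

import "fmt"

// heartbeatCheckpoint reports the collector-observed maxEventTs, but never
// ahead of the dispatcher's own checkpointTs: when the dispatcher lags, the
// heartbeat is clamped down to avoid overstating progress.
func heartbeatCheckpoint(maxEventTs, dispatcherCheckpointTs uint64) uint64 {
	if dispatcherCheckpointTs < maxEventTs {
		return dispatcherCheckpointTs
	}
	return maxEventTs
}

func main() {
	fmt.Println(heartbeatCheckpoint(210, 180)) // dispatcher lags: clamped to 180
	fmt.Println(heartbeatCheckpoint(210, 250)) // dispatcher ahead: report 210
}
```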

Minor optional nit: consider adding a sub-assertion for the currentEventService value alongside the checkpoint comparisons (e.g., after doReset) to make the "for event service" intent in the test name explicit, but this is non-blocking.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@downstreamadapter/eventcollector/dispatcher_stat_test.go` around lines 1435 -
1480, Add an explicit assertion that the epoch state's currentEventService is
set to the event service passed to doReset to make the test intent explicit:
after calling stat.doReset(node.ID("event-service-1"), 150) assert that
stat.loadCurrentEpochState().currentEventService (or the exported accessor if
present) equals node.ID("event-service-1"); this should sit alongside the
existing checkpoint assertions that use getHeartbeatCheckpoint() and verifies
doReset correctly records the currentEventService for the event service path
tested.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Nitpick comments:
In `@downstreamadapter/eventcollector/dispatcher_stat_test.go`:
- Around line 1435-1480: Add an explicit assertion that the epoch state's
currentEventService is set to the event service passed to doReset to make the
test intent explicit: after calling stat.doReset(node.ID("event-service-1"),
150) assert that stat.loadCurrentEpochState().currentEventService (or the
exported accessor if present) equals node.ID("event-service-1"); this should sit
alongside the existing checkpoint assertions that use getHeartbeatCheckpoint()
and verifies doReset correctly records the currentEventService for the event
service path tested.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 15a68569-5f4b-43f1-a9c1-6c95cf5c3f0e

📥 Commits

Reviewing files that changed from the base of the PR and between 2e30f71 and 5795a32.

📒 Files selected for processing (1)
  • downstreamadapter/eventcollector/dispatcher_stat_test.go

@lidezhu lidezhu changed the title from "eventcollector: introduce dispatcher session" to "eventcollector: introduce dispatcher session to separate connection lifecycle management" on May 6, 2026
@lidezhu
Collaborator Author

lidezhu commented May 6, 2026

/test all

@lidezhu
Collaborator Author

lidezhu commented May 6, 2026

This PR just does some code movement; the puller-error-log-view failure is caused by pre-existing logs and will be handled in later PRs.

@ti-chi-bot ti-chi-bot Bot added needs-1-more-lgtm Indicates a PR needs 1 more LGTM. approved labels May 7, 2026
@lidezhu
Collaborator Author

lidezhu commented May 7, 2026

/gemini summary

@gemini-code-assist

Summary of Changes

This pull request refactors the dispatcher's connection and lifecycle management by introducing a dedicated dispatcherSession abstraction. By moving connection state and coordination logic out of dispatcherStat, the system achieves better separation of concerns and more robust handling of dispatcher registration, reset operations, and heartbeat synchronization. The changes also improve event validation through explicit per-epoch state tracking, ensuring that dispatchers correctly handle progress monitoring and connection lifecycle events.

Highlights

  • Session Abstraction: Introduced dispatcherSession to encapsulate dispatcher registration and lifecycle management, decoupling this logic from dispatcherStat.
  • Connection State Management: Implemented dispatcherConnState to track the current event service, readiness signals, and remote candidate queues, improving connection reliability.
  • Epoch-based Validation: Added dispatcherEpochState to track epoch, last event sequence, and maximum event timestamps, ensuring more accurate event validation and heartbeat coordination.
  • Test Coverage: Expanded test coverage for event filtering, checkpoint timing, and signal handling to validate the new session-based architecture.

Activity
  • Pull request created by lidezhu.
  • Automated review comments provided by CodeRabbit regarding potential panics and state management.
  • Author performed code movement and addressed initial feedback.
  • Pull request approved by hongyunyan.
  • Author requested a summary.

@lidezhu
Collaborator Author

lidezhu commented May 7, 2026

/test all

@lidezhu
Collaborator Author

lidezhu commented May 7, 2026

This PR only involves some code rearrangement with no logic changes. The panic log already exists and will be addressed in a later PR.

@ti-chi-bot ti-chi-bot Bot added lgtm and removed needs-1-more-lgtm Indicates a PR needs 1 more LGTM. labels May 7, 2026
@ti-chi-bot

ti-chi-bot Bot commented May 7, 2026

[APPROVALNOTIFIER] This PR is APPROVED

This pull-request has been approved by: flowbehappy, hongyunyan

The full list of commands accepted by this bot can be found here.

The pull request process is described here

Details Needs approval from an approver in each of these files:
  • OWNERS [flowbehappy,hongyunyan]

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

@ti-chi-bot

ti-chi-bot Bot commented May 7, 2026

[LGTM Timeline notifier]

Timeline:

  • 2026-05-07 03:01:23.533660298 +0000 UTC m=+323156.407010271: ☑️ agreed by hongyunyan.
  • 2026-05-07 03:15:04.853947121 +0000 UTC m=+323977.727297093: ☑️ agreed by flowbehappy.

@lidezhu
Collaborator Author

lidezhu commented May 7, 2026

/retest

Contributor

@coderabbitai coderabbitai Bot left a comment


Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
downstreamadapter/eventcollector/dispatcher_stat_test.go (1)

1371-1398: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

TestHandleBatchDataEventsDoesNotAdvanceCommitTsWhenNoValidEvents likely tests the wrong guard.

The test's intent (commit ts not advanced for stale events) requires the event to pass source-ID and readiness checks first. The stat is created without stat.session.connState.setEventServiceID(...) or readyEventReceived.Store(true), but the event carries From: createNodeID("service1"). Since handleBatchDataEvents filters per-event source (as every other batch-event test demonstrates), "service1" != "" causes the event to be silently dropped at the source check — not at the stale-commit-ts check. The result and require.Empty assertions both pass, but the intended invariant is never exercised.

🛠️ Suggested fix — add session setup to test the intended path
 stat := newDispatcherStat(mockDisp, nil, nil)
 stat.lastEventCommitTs.Store(50)
 stat.currentEpoch.Store(newDispatcherEpochState(10, 1, stat.target.GetStartTs()))
+stat.session.connState.setEventServiceID("service1")
+stat.session.connState.readyEventReceived.Store(true)
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@downstreamadapter/eventcollector/dispatcher_stat_test.go` around lines 1371 -
1398, The test TestHandleBatchDataEventsDoesNotAdvanceCommitTsWhenNoValidEvents
is currently dropping the event at the source-ID/readiness guard instead of
exercising the stale-commit-ts path; to fix it, arrange the session so the event
passes source and readiness checks before calling stat.handleBatchDataEvents:
set the session event service id to match the event’s source (call
stat.session.connState.setEventServiceID with the same createNodeID("service1")
used in the event) and mark the session ready (call
stat.readyEventReceived.Store(true)); then run the existing assertions to verify
handleBatchDataEvents does not advance lastEventCommitTs for the stale commitTs
case.
🧹 Nitpick comments (1)
downstreamadapter/eventcollector/dispatcher_stat_test.go (1)

1157-1165: 💤 Low value

Prefer a struct field over string comparison for panic-case branching.

Branching on tt.name == "multiple events" is fragile — any rename silently converts a Panics assertion into the normal path. The existing TestHandleSignalEvent pattern (which uses expectedPanic bool) is the right model here.

♻️ Proposed refactor
 tests := []struct {
     name           string
     events         []dispatcher.DispatcherEvent
     currentService node.ID
     lastSeq        uint64
     lastCommitTs   uint64
     epoch          uint64
     want           bool
+    expectPanic    bool
 }{
     {
-        name: "multiple events",
+        name:        "multiple events",
+        expectPanic: true,
         events: []dispatcher.DispatcherEvent{
             ...
         },
         ...
     },
     ...
 }

 ...

-if tt.name == "multiple events" {
+if tt.expectPanic {
     require.Panics(t, func() {
         stat.handleSingleDataEvents(tt.events)
     })
 } else {
     got := stat.handleSingleDataEvents(tt.events)
     require.Equal(t, tt.want, got)
 }
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@downstreamadapter/eventcollector/dispatcher_stat_test.go` around lines 1157 -
1165, The test currently branches on tt.name == "multiple events" to decide
whether to expect a panic when calling stat.handleSingleDataEvents, which is
fragile; add a boolean field (e.g. expectedPanic) to the test case struct and
use that to choose between require.Panics and normal assertion instead of
string-matching tt.name, update all affected cases (including the "multiple
events" case) to set expectedPanic=true, and refactor the test body to follow
the TestHandleSignalEvent pattern so the panic expectation is explicit and
robust.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 1b3507f3-cd72-4260-8a67-8414e91d4191

📥 Commits

Reviewing files that changed from the base of the PR and between 5795a32 and 13a4a9b.

📒 Files selected for processing (3)
  • downstreamadapter/eventcollector/dispatcher_stat.go
  • downstreamadapter/eventcollector/dispatcher_stat_test.go
  • downstreamadapter/eventcollector/event_collector_test.go
🚧 Files skipped from review as they are similar to previous changes (2)
  • downstreamadapter/eventcollector/event_collector_test.go
  • downstreamadapter/eventcollector/dispatcher_stat.go

@lidezhu
Collaborator Author

lidezhu commented May 7, 2026

/retest

@ti-chi-bot ti-chi-bot Bot merged commit f8396e3 into pingcap:master May 7, 2026
23 of 25 checks passed

Labels

approved lgtm release-note Denotes a PR that will be considered when it comes time to generate release notes. size/XXL Denotes a PR that changes 1000+ lines, ignoring generated files.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

introduce a new component for managing the connection lifecycle inside the event collector

3 participants