Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
84dced1
improve memory control v1
asddongmen Jan 20, 2026
1d6d73c
fix bug
asddongmen Jan 20, 2026
381e57d
add debug log
asddongmen Jan 21, 2026
d2852f6
add debug log 2
asddongmen Jan 21, 2026
f1f9fe1
add debug log 3
asddongmen Jan 21, 2026
62e1546
add debug log 4
asddongmen Jan 21, 2026
f714b8d
add debug log 5
asddongmen Jan 21, 2026
2f04643
adjust = 0.7
asddongmen Jan 21, 2026
75b2d16
adjust 5
asddongmen Jan 21, 2026
8dcb0a2
for debug
asddongmen Jan 21, 2026
994ef9b
adjust 7
asddongmen Jan 22, 2026
4073610
fix bug
asddongmen Jan 22, 2026
87c5e6f
fix bug 2
asddongmen Jan 22, 2026
fcf76f2
adjust 10
asddongmen Jan 22, 2026
0853113
adjust 11
asddongmen Jan 22, 2026
9139d93
allow scan interval grow larger
asddongmen Jan 23, 2026
274878f
allow scan interval grow larger 2
asddongmen Jan 23, 2026
36be880
remove verbose log
asddongmen Jan 23, 2026
96b9ae8
fix bug
asddongmen Jan 23, 2026
fc271cd
Merge remote-tracking branch 'upstream/master' into 0119-refine-dispa…
asddongmen Jan 26, 2026
c9df6b0
comment hardcode scan interval
asddongmen Jan 26, 2026
2ebeef6
add cool down
asddongmen Jan 26, 2026
abc0015
add metrics
asddongmen Jan 26, 2026
3d506ab
adjust scan window algorithm
asddongmen Jan 26, 2026
a0febab
adjust scan window algorithm 2
asddongmen Jan 26, 2026
460b74d
adjust scan window algorithm 3
asddongmen Jan 27, 2026
f8c34a1
add comment
asddongmen Feb 2, 2026
1a86e53
remove useless codes
asddongmen Feb 2, 2026
6c6babf
add ddl workload
asddongmen Feb 2, 2026
6464520
remove useless code
asddongmen Feb 2, 2026
730f373
refine syncpoint handle
asddongmen Feb 3, 2026
1a4383c
Merge remote-tracking branch 'upstream/master' into 0119-refine-dispa…
asddongmen Feb 3, 2026
dcb47a8
fix bug
asddongmen Feb 3, 2026
d41d180
add debug log
asddongmen Feb 3, 2026
aeeda69
fix
asddongmen Feb 4, 2026
a24e5b3
add log
asddongmen Feb 4, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 15 additions & 2 deletions downstreamadapter/dispatcher/basic_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -629,14 +629,19 @@ func (d *BasicDispatcher) HandleDispatcherStatus(dispatcherStatus *heartbeatpb.D
// 3. clear blockEventStatus(should be the old pending event, but clear the new one)
d.blockEventStatus.clear()
})
if action.Action == heartbeatpb.Action_Write {
switch action.Action {
case heartbeatpb.Action_Write:
actionCommitTs := action.CommitTs
actionIsSyncPoint := action.IsSyncPoint
d.sharedInfo.GetBlockEventExecutor().Submit(d, func() {
d.ExecuteBlockEventDDL(pendingEvent, actionCommitTs, actionIsSyncPoint)
})
return true
} else {
case heartbeatpb.Action_Pass, heartbeatpb.Action_Skip:
failpoint.Inject("BlockOrWaitBeforePass", nil)
d.PassBlockEventToSink(pendingEvent)
failpoint.Inject("BlockAfterPass", nil)
default:
failpoint.Inject("BlockOrWaitBeforePass", nil)
d.PassBlockEventToSink(pendingEvent)
failpoint.Inject("BlockAfterPass", nil)
Comment on lines +640 to 647

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The logic in the default case is identical to the case heartbeatpb.Action_Pass, heartbeatpb.Action_Skip. This code duplication can be removed by combining them. You can use a default case to handle Pass, Skip, and any potential future actions with the same logic, which simplifies the code and improves maintainability.

Suggested change
case heartbeatpb.Action_Pass, heartbeatpb.Action_Skip:
failpoint.Inject("BlockOrWaitBeforePass", nil)
d.PassBlockEventToSink(pendingEvent)
failpoint.Inject("BlockAfterPass", nil)
default:
failpoint.Inject("BlockOrWaitBeforePass", nil)
d.PassBlockEventToSink(pendingEvent)
failpoint.Inject("BlockAfterPass", nil)
case heartbeatpb.Action_Pass, heartbeatpb.Action_Skip:
fallthrough
default:
failpoint.Inject("BlockOrWaitBeforePass", nil)
d.PassBlockEventToSink(pendingEvent)
failpoint.Inject("BlockAfterPass", nil)

Expand All @@ -650,6 +655,14 @@ func (d *BasicDispatcher) HandleDispatcherStatus(dispatcherStatus *heartbeatpb.D
zap.Stringer("dispatcher", d.id))
return false
}
if ok && action.CommitTs == ts {
log.Debug("action does not match pending event, ignore it",
zap.Uint64("pendingEventCommitTs", ts),
zap.Uint64("actionCommitTs", action.CommitTs),
zap.Bool("actionIsSyncPoint", action.IsSyncPoint),
zap.Stringer("dispatcher", d.id))
return false
}
}

// Step3: whether the outdate message or not, we need to return message show we have finished the event.
Expand Down
64 changes: 64 additions & 0 deletions downstreamadapter/dispatcher/event_dispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,70 @@ func TestDispatcherHandleEvents(t *testing.T) {
require.Equal(t, uint64(7), checkpointTs)
}

// TestDispatcherIgnoreActionWithSameCommitTsDifferentType verifies that a
// dispatcher action carrying the same commitTs as the pending DDL event but a
// different event kind (syncpoint vs. DDL) is ignored instead of resolving the
// pending block event.
func TestDispatcherIgnoreActionWithSameCommitTsDifferentType(t *testing.T) {
	mockSink := sink.NewMockSink(common.MysqlSinkType)
	span, err := getCompleteTableSpan(getTestingKeyspaceID())
	require.NoError(t, err)
	d := newDispatcherForTest(mockSink, span)

	from := node.NewID()

	// A DDL that blocks tables must block the dispatcher.
	ddl := &commonEvent.DDLEvent{
		FinishedTs: 10,
		BlockedTables: &commonEvent.InfluencedTables{
			InfluenceType: commonEvent.InfluenceTypeNormal,
			TableIDs:      []int64{0, 1},
		},
	}
	require.True(t, d.HandleEvents([]DispatcherEvent{NewDispatcherEvent(&from, ddl)}, callback))

	// The pending DDL event should now sit in the WAITING stage.
	pendingEvent, blockStage := d.blockEventStatus.getEventAndStage()
	require.NotNil(t, pendingEvent)
	require.Equal(t, heartbeatpb.BlockStage_WAITING, blockStage)

	// ACK the DDL so the resend task is cancelled.
	d.HandleDispatcherStatus(&heartbeatpb.DispatcherStatus{
		Ack: &heartbeatpb.ACK{
			CommitTs:    ddl.FinishedTs,
			IsSyncPoint: false,
		},
	})
	require.Equal(t, 0, d.resendTaskMap.Len())

	// Discard any block status messages emitted so far.
	for drained := false; !drained; {
		select {
		case <-d.sharedInfo.blockStatusesChan:
		default:
			drained = true
		}
	}

	// An action for a syncpoint with the same commitTs must be ignored.
	d.HandleDispatcherStatus(&heartbeatpb.DispatcherStatus{
		Action: &heartbeatpb.DispatcherAction{
			Action:      heartbeatpb.Action_Skip,
			CommitTs:    ddl.FinishedTs,
			IsSyncPoint: true,
		},
	})

	// The dispatcher must still be waiting on the original DDL.
	pendingEvent, blockStage = d.blockEventStatus.getEventAndStage()
	require.NotNil(t, pendingEvent)
	require.Equal(t, heartbeatpb.BlockStage_WAITING, blockStage)

	// No DONE block status may be emitted for the mismatched action.
	select {
	case msg := <-d.sharedInfo.blockStatusesChan:
		require.FailNow(t, "unexpected block status emitted", "msg=%v", msg)
	default:
	}
}

// test that an incomplete table span can correctly handle the DDL events
func TestUncompeleteTableSpanDispatcherHandleEvents(t *testing.T) {
count.Swap(0)
Expand Down
7 changes: 6 additions & 1 deletion downstreamadapter/dispatcher/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,12 @@ func (b *BlockEventStatus) actionMatchs(action *heartbeatpb.DispatcherAction) bo
return false
}

return b.blockCommitTs == action.CommitTs
if b.blockCommitTs != action.CommitTs {
return false
}

isSyncPointPending := b.blockPendingEvent.GetType() == commonEvent.TypeSyncPointEvent
return isSyncPointPending == action.IsSyncPoint
}

func (b *BlockEventStatus) getEventCommitTs() (uint64, bool) {
Expand Down
41 changes: 39 additions & 2 deletions downstreamadapter/eventcollector/event_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,8 @@ type EventCollector struct {
dispatcherMap sync.Map // key: dispatcherID, value: dispatcherStat
changefeedMap sync.Map // key: changefeedID.GID, value: *changefeedStat

lastCongestionLogTime int64

mc messaging.MessageCenter

logCoordinatorClient *LogCoordinatorClient
Expand Down Expand Up @@ -600,6 +602,10 @@ func (c *EventCollector) newCongestionControlMessages() map[node.ID]*event.Conge
// collect path-level available memory and total available memory for each changefeed
changefeedPathMemory := make(map[common.ChangeFeedID]map[common.DispatcherID]uint64)
changefeedTotalMemory := make(map[common.ChangeFeedID]uint64)
changefeedUsedMemory := make(map[common.ChangeFeedID]uint64)
changefeedMaxMemory := make(map[common.ChangeFeedID]uint64)
totalDispatchers := 0
dispatchersWithoutService := 0

// collect from main dynamic stream
for _, quota := range c.ds.GetMetrics().MemoryControl.AreaMemoryMetrics {
Expand All @@ -617,6 +623,8 @@ func (c *EventCollector) newCongestionControlMessages() map[node.ID]*event.Conge
}
// store total available memory from AreaMemoryMetric
changefeedTotalMemory[cfID] = uint64(quota.AvailableMemory())
changefeedUsedMemory[cfID] = uint64(quota.MemoryUsage())
changefeedMaxMemory[cfID] = uint64(quota.MaxMemory())
}

// collect from redo dynamic stream and take minimum
Expand All @@ -643,6 +651,16 @@ func (c *EventCollector) newCongestionControlMessages() map[node.ID]*event.Conge
} else {
changefeedTotalMemory[cfID] = uint64(quota.AvailableMemory())
}
if existing, exists := changefeedUsedMemory[cfID]; exists {
changefeedUsedMemory[cfID] = min(existing, uint64(quota.MemoryUsage()))
} else {
changefeedUsedMemory[cfID] = uint64(quota.MemoryUsage())
}
if existing, exists := changefeedMaxMemory[cfID]; exists {
changefeedMaxMemory[cfID] = min(existing, uint64(quota.MaxMemory()))
} else {
changefeedMaxMemory[cfID] = uint64(quota.MaxMemory())
}
Comment on lines +654 to +663

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The logic to find the minimum value for changefeedUsedMemory and changefeedMaxMemory is duplicated. This pattern of checking existence and updating the minimum value could be extracted into a small helper function to reduce code duplication and improve readability.

}

if len(changefeedPathMemory) == 0 {
Expand All @@ -655,7 +673,9 @@ func (c *EventCollector) newCongestionControlMessages() map[node.ID]*event.Conge
c.dispatcherMap.Range(func(k, v interface{}) bool {
stat := v.(*dispatcherStat)
eventServiceID := stat.connState.getEventServiceID()
totalDispatchers++
if eventServiceID == "" {
dispatchersWithoutService++
return true
}

Expand All @@ -678,8 +698,9 @@ func (c *EventCollector) newCongestionControlMessages() map[node.ID]*event.Conge

// build congestion control messages for each node
result := make(map[node.ID]*event.CongestionControl)
changefeedsInMessages := 0
for nodeID, changefeedDispatchers := range nodeDispatcherMemory {
congestionControl := event.NewCongestionControl()
congestionControl := event.NewCongestionControlWithVersion(event.CongestionControlVersion2)

for changefeedID, dispatcherMemory := range changefeedDispatchers {
if len(dispatcherMemory) == 0 {
Expand All @@ -689,11 +710,14 @@ func (c *EventCollector) newCongestionControlMessages() map[node.ID]*event.Conge
// get total available memory directly from AreaMemoryMetric
totalAvailable := uint64(changefeedTotalMemory[changefeedID])
if totalAvailable > 0 {
congestionControl.AddAvailableMemoryWithDispatchers(
congestionControl.AddAvailableMemoryWithDispatchersAndUsage(
changefeedID.ID(),
totalAvailable,
changefeedUsedMemory[changefeedID],
changefeedMaxMemory[changefeedID],
dispatcherMemory,
)
changefeedsInMessages++
}
}

Expand All @@ -702,6 +726,19 @@ func (c *EventCollector) newCongestionControlMessages() map[node.ID]*event.Conge
}
}

nowUnix := time.Now().Unix()
lastLog := atomic.LoadInt64(&c.lastCongestionLogTime)
if nowUnix-lastLog >= int64(time.Minute.Seconds()) &&
atomic.CompareAndSwapInt64(&c.lastCongestionLogTime, lastLog, nowUnix) {
log.Info("congestion control build summary",
zap.Int("dispatchers", totalDispatchers),
zap.Int("dispatchersWithoutService", dispatchersWithoutService),
zap.Int("changefeedsWithMetrics", len(changefeedPathMemory)),
zap.Int("changefeedsInMessages", changefeedsInMessages),
zap.Int("nodes", len(result)),
)
}

return result
}

Expand Down
Loading
Loading