Skip to content

Commit

Permalink
fix: Prevent dispatcher merging if curTs is 0 (#34562) (#34626)
Browse files Browse the repository at this point in the history
When the main dispatcher has not yet consumed data, curTs is 0. During
this time, merging dispatchers should not be allowed; otherwise, the
data of the solo dispatcher will be skipped.

issue: #34255

pr: #34562

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
  • Loading branch information
bigsheeper committed Jul 12, 2024
1 parent 7f3a2a2 commit c46fccb
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 1 deletion.
2 changes: 1 addition & 1 deletion pkg/mq/msgdispatcher/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ func (c *dispatcherManager) tryMerge() {

c.mu.Lock()
defer c.mu.Unlock()
if c.mainDispatcher == nil {
if c.mainDispatcher == nil || c.mainDispatcher.CurTs() == 0 {
return
}
candidates := make(map[string]struct{})
Expand Down
12 changes: 12 additions & 0 deletions pkg/mq/msgdispatcher/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,12 @@ func TestManager(t *testing.T) {
_, err = c.Add(ctx, "mock_vchannel_2", nil, mqwrapper.SubscriptionPositionUnknown)
assert.NoError(t, err)
assert.Equal(t, 3, c.Num())
c.(*dispatcherManager).mainDispatcher.curTs.Store(1000)
c.(*dispatcherManager).mu.RLock()
for _, d := range c.(*dispatcherManager).soloDispatchers {
d.curTs.Store(1000)
}
c.(*dispatcherManager).mu.RUnlock()

c.(*dispatcherManager).tryMerge()
assert.Equal(t, 1, c.Num())
Expand All @@ -96,6 +102,12 @@ func TestManager(t *testing.T) {
_, err = c.Add(ctx, "mock_vchannel_2", nil, mqwrapper.SubscriptionPositionUnknown)
assert.NoError(t, err)
assert.Equal(t, 3, c.Num())
c.(*dispatcherManager).mainDispatcher.curTs.Store(1000)
c.(*dispatcherManager).mu.RLock()
for _, d := range c.(*dispatcherManager).soloDispatchers {
d.curTs.Store(1000)
}
c.(*dispatcherManager).mu.RUnlock()

CheckPeriod = 10 * time.Millisecond
go c.Run()
Expand Down

0 comments on commit c46fccb

Please sign in to comment.