Skip to content

Commit

Permalink
Use tickle to check WatchEvent state at querycoord and querynode
Browse files Browse the repository at this point in the history
Signed-off-by: aoiasd <zhicheng.yue@zilliz.com>
  • Loading branch information
aoiasd committed Jan 10, 2023
1 parent 23ceb9c commit a5ffad6
Show file tree
Hide file tree
Showing 21 changed files with 866 additions and 668 deletions.
40 changes: 22 additions & 18 deletions internal/datacoord/channel_checker.go
Expand Up @@ -34,7 +34,8 @@ import (

type channelStateTimer struct {
watchkv kv.MetaKv
runningTimers sync.Map // channel name to timer stop channels
timerStops sync.Map // channel name to timer stop channels
timers sync.Map
etcdWatcher clientv3.WatchChan
timeoutWatcher chan *ackEvent
}
Expand All @@ -49,7 +50,6 @@ func newChannelStateTimer(kv kv.MetaKv) *channelStateTimer {
func (c *channelStateTimer) getWatchers(prefix string) (clientv3.WatchChan, chan *ackEvent) {
if c.etcdWatcher == nil {
c.etcdWatcher = c.watchkv.WatchWithPrefix(prefix)

}
return c.etcdWatcher, c.timeoutWatcher
}
Expand Down Expand Up @@ -89,31 +89,33 @@ func (c *channelStateTimer) startOne(watchState datapb.ChannelWatchState, channe
)
return
}

c.removeTimers(channelName)
stop := make(chan struct{})
c.removeTimers([]string{channelName})
c.runningTimers.Store(channelName, stop)
timeoutT := time.Unix(0, timeoutTs)
timer := time.NewTimer(time.Until(timeoutT))
c.timerStops.Store(channelName, stop)
c.timers.Store(channelName, timer)

go func() {
log.Info("timer started",
zap.String("watch state", watchState.String()),
zap.Int64("nodeID", nodeID),
zap.String("channel name", channelName),
zap.Time("timeout time", timeoutT))
zap.String("channel name", channelName))

select {
case <-time.NewTimer(time.Until(timeoutT)).C:
case <-timer.C:
log.Info("timeout and stop timer: wait for channel ACK timeout",
zap.String("watch state", watchState.String()),
zap.Int64("nodeID", nodeID),
zap.String("channel name", channelName),
zap.Time("timeout time", timeoutT))
zap.String("channel name", channelName))
ackType := getAckType(watchState)
c.notifyTimeoutWatcher(&ackEvent{ackType, channelName, nodeID})
case <-stop:
log.Info("stop timer before timeout",
zap.String("watch state", watchState.String()),
zap.Int64("nodeID", nodeID),
zap.String("channel name", channelName),
zap.Time("timeout time", timeoutT))
zap.String("channel name", channelName))
}
}()
}
Expand All @@ -122,26 +124,28 @@ func (c *channelStateTimer) notifyTimeoutWatcher(e *ackEvent) {
c.timeoutWatcher <- e
}

func (c *channelStateTimer) removeTimers(channels []string) {
func (c *channelStateTimer) removeTimers(channels ...string) {
for _, channel := range channels {
if stop, ok := c.runningTimers.LoadAndDelete(channel); ok {
if stop, ok := c.timerStops.LoadAndDelete(channel); ok {
close(stop.(chan struct{}))
c.timers.Delete(channel)
}
}
}

func (c *channelStateTimer) stopIfExist(e *ackEvent) {
stop, ok := c.runningTimers.LoadAndDelete(e.channelName)
if ok && e.ackType != watchTimeoutAck && e.ackType != releaseTimeoutAck {
close(stop.(chan struct{}))
//reset set timers
func (c *channelStateTimer) resetTimers(timeoutTs int64, channels ...string) {
for _, channel := range channels {
if timer, ok := c.timers.Load(channel); ok {
timer.(*time.Timer).Reset(time.Second * time.Duration(timeoutTs))
}
}
}

func parseWatchInfo(key string, data []byte) (*datapb.ChannelWatchInfo, error) {
watchInfo := datapb.ChannelWatchInfo{}
if err := proto.Unmarshal(data, &watchInfo); err != nil {
return nil, fmt.Errorf("invalid event data: fail to parse ChannelWatchInfo, key: %s, err: %v", key, err)

}

if watchInfo.Vchan == nil {
Expand Down
51 changes: 31 additions & 20 deletions internal/datacoord/channel_checker_test.go
Expand Up @@ -52,10 +52,9 @@ func TestChannelStateTimer(t *testing.T) {
timer.loadAllChannels(1)

validWatchInfo := datapb.ChannelWatchInfo{
Vchan: &datapb.VchannelInfo{},
StartTs: time.Now().Unix(),
State: datapb.ChannelWatchState_ToWatch,
TimeoutTs: time.Now().Add(20 * time.Millisecond).UnixNano(),
Vchan: &datapb.VchannelInfo{},
StartTs: time.Now().Unix(),
State: datapb.ChannelWatchState_ToWatch,
}
validData, err := proto.Marshal(&validWatchInfo)
require.NoError(t, err)
Expand Down Expand Up @@ -89,16 +88,15 @@ func TestChannelStateTimer(t *testing.T) {
t.Run("test startOne", func(t *testing.T) {
normalTimeoutTs := time.Now().Add(20 * time.Second).UnixNano()
nowTimeoutTs := time.Now().UnixNano()
zeroTimeoutTs := int64(0)
invalidTimeoutTs := int64(0)
tests := []struct {
channelName string
timeoutTs int64

description string
}{
{"channel-1", normalTimeoutTs, "test stop"},
{"channel-2", nowTimeoutTs, "test timeout"},
{"channel-3", zeroTimeoutTs, "not start"},
{"channel-3", invalidTimeoutTs, "not start"},
}

timer := newChannelStateTimer(kv)
Expand All @@ -113,34 +111,50 @@ func TestChannelStateTimer(t *testing.T) {
assert.Equal(t, watchTimeoutAck, e.ackType)
assert.Equal(t, test.channelName, e.channelName)
} else {
timer.stopIfExist(&ackEvent{watchSuccessAck, test.channelName, 1})
timer.removeTimers(test.channelName)
}
})
}

timer.startOne(datapb.ChannelWatchState_ToWatch, "channel-remove", 1, normalTimeoutTs)
timer.removeTimers([]string{"channel-remove"})
timer.removeTimers("channel-remove")
})

t.Run("test startOne no leaking issue 17335", func(t *testing.T) {
timeoutTs := time.Now().Add(20 * time.Second).UnixNano()
timer := newChannelStateTimer(kv)

timer.startOne(datapb.ChannelWatchState_ToRelease, "channel-1", 1, timeoutTs)
stop, ok := timer.runningTimers.Load("channel-1")
stop, ok := timer.timerStops.Load("channel-1")
require.True(t, ok)

timer.startOne(datapb.ChannelWatchState_ToWatch, "channel-1", 1, timeoutTs)
_, ok = <-stop.(chan struct{})
assert.False(t, ok)

stop2, ok := timer.runningTimers.Load("channel-1")
stop2, ok := timer.timerStops.Load("channel-1")
assert.True(t, ok)

timer.removeTimers([]string{"channel-1"})
timer.removeTimers("channel-1")
_, ok = <-stop2.(chan struct{})
assert.False(t, ok)
})

t.Run("test reset", func(t *testing.T) {
timeoutTs := time.Now().Add(20 * time.Second).UnixNano()
timer := newChannelStateTimer(kv)
testChannel := "channel-test"
_, watcher := timer.getWatchers("")

timer.startOne(datapb.ChannelWatchState_ToRelease, testChannel, 1, timeoutTs)
_, ok := timer.timerStops.Load(testChannel)
assert.True(t, ok)

timer.resetTimers(time.Now().Add(500 * time.Millisecond).UnixNano())
event := <-watcher
assert.Equal(t, testChannel, event.channelName)

})
}

func TestChannelStateTimer_parses(t *testing.T) {
Expand All @@ -151,10 +165,9 @@ func TestChannelStateTimer_parses(t *testing.T) {

t.Run("test parseWatchInfo", func(t *testing.T) {
validWatchInfo := datapb.ChannelWatchInfo{
Vchan: &datapb.VchannelInfo{},
StartTs: time.Now().Unix(),
State: datapb.ChannelWatchState_ToWatch,
TimeoutTs: time.Now().Add(20 * time.Millisecond).UnixNano(),
Vchan: &datapb.VchannelInfo{},
StartTs: time.Now().Unix(),
State: datapb.ChannelWatchState_ToWatch,
}
validData, err := proto.Marshal(&validWatchInfo)
require.NoError(t, err)
Expand Down Expand Up @@ -186,7 +199,6 @@ func TestChannelStateTimer_parses(t *testing.T) {
assert.NotNil(t, info)
assert.Equal(t, info.GetState(), validWatchInfo.GetState())
assert.Equal(t, info.GetStartTs(), validWatchInfo.GetStartTs())
assert.Equal(t, info.GetTimeoutTs(), validWatchInfo.GetTimeoutTs())
} else {
assert.Nil(t, info)
assert.Error(t, err)
Expand All @@ -205,9 +217,8 @@ func TestChannelStateTimer_parses(t *testing.T) {
DroppedSegments: []*datapb.SegmentInfo{{ID: 3}},
UnflushedSegmentIds: []int64{1},
},
StartTs: time.Now().Unix(),
State: datapb.ChannelWatchState_ToWatch,
TimeoutTs: time.Now().Add(20 * time.Millisecond).UnixNano(),
StartTs: time.Now().Unix(),
State: datapb.ChannelWatchState_ToWatch,
}

oldData, err := proto.Marshal(&oldWatchInfo)
Expand Down
36 changes: 20 additions & 16 deletions internal/datacoord/channel_manager.go
Expand Up @@ -188,6 +188,7 @@ func (c *ChannelManager) checkOldNodes(nodes []UniqueID) error {
}
nodeWatchInfos[nodeID] = watchInfos
}
timeout := time.Now().Add(Params.DataCoordCfg.MaxWatchDuration.GetAsDuration(time.Second)).UnixNano()

for nodeID, watchInfos := range nodeWatchInfos {
for _, info := range watchInfos {
Expand All @@ -199,15 +200,15 @@ func (c *ChannelManager) checkOldNodes(nodes []UniqueID) error {

switch info.GetState() {
case datapb.ChannelWatchState_ToWatch, datapb.ChannelWatchState_Uncomplete:
c.stateTimer.startOne(datapb.ChannelWatchState_ToWatch, channelName, nodeID, info.GetTimeoutTs())
c.stateTimer.startOne(datapb.ChannelWatchState_ToWatch, channelName, nodeID, timeout)

case datapb.ChannelWatchState_WatchFailure:
if err := c.Release(nodeID, channelName); err != nil {
return err
}

case datapb.ChannelWatchState_ToRelease:
c.stateTimer.startOne(datapb.ChannelWatchState_ToRelease, channelName, nodeID, info.GetTimeoutTs())
c.stateTimer.startOne(datapb.ChannelWatchState_ToRelease, channelName, nodeID, timeout)

case datapb.ChannelWatchState_ReleaseSuccess:
if err := c.Reassign(nodeID, channelName); err != nil {
Expand Down Expand Up @@ -259,7 +260,6 @@ func (c *ChannelManager) bgCheckChannelsWork(ctx context.Context) {
reallocates, err := c.bgChecker(channels, time.Now())
if err != nil {
log.Warn("channel manager bg check failed", zap.Error(err))

c.mu.Unlock()
continue
}
Expand Down Expand Up @@ -389,7 +389,7 @@ func (c *ChannelManager) DeleteNode(nodeID int64) error {
}
log.Info("remove timers for channel of the deregistered node",
zap.Any("channels", chNames), zap.Int64("nodeID", nodeID))
c.stateTimer.removeTimers(chNames)
c.stateTimer.removeTimers(chNames...)

if err := c.updateWithTimer(updates, datapb.ChannelWatchState_ToWatch); err != nil {
return err
Expand Down Expand Up @@ -446,11 +446,10 @@ func (c *ChannelManager) fillChannelWatchInfo(op *ChannelOp) {
for _, ch := range op.Channels {
vcInfo := c.h.GetDataVChanPositions(ch, allPartitionID)
info := &datapb.ChannelWatchInfo{
Vchan: vcInfo,
StartTs: time.Now().Unix(),
State: datapb.ChannelWatchState_Uncomplete,
TimeoutTs: time.Now().Add(Params.DataCoordCfg.MaxWatchDuration.GetAsDuration(time.Second)).UnixNano(),
Schema: ch.Schema,
Vchan: vcInfo,
StartTs: time.Now().Unix(),
State: datapb.ChannelWatchState_Uncomplete,
Schema: ch.Schema,
}
op.ChannelWatchInfos = append(op.ChannelWatchInfos, info)
}
Expand All @@ -464,11 +463,10 @@ func (c *ChannelManager) fillChannelWatchInfoWithState(op *ChannelOp, state data
for _, ch := range op.Channels {
vcInfo := c.h.GetDataVChanPositions(ch, allPartitionID)
info := &datapb.ChannelWatchInfo{
Vchan: vcInfo,
StartTs: startTs,
State: state,
TimeoutTs: timeoutTs,
Schema: ch.Schema,
Vchan: vcInfo,
StartTs: startTs,
State: state,
Schema: ch.Schema,
}

// Only set timer for watchInfo not from bufferID
Expand All @@ -482,6 +480,12 @@ func (c *ChannelManager) fillChannelWatchInfoWithState(op *ChannelOp, state data
return channelsWithTimer
}

//ActivateChannel reset timer of channel to keep this task active
func (c *ChannelManager) ActivateChannels(channels ...string) {
timeoutTs := Params.DataCoordCfg.MaxWatchDuration.GetAsInt64()
c.stateTimer.resetTimers(timeoutTs, channels...)
}

// GetChannels gets channels info of registered nodes.
func (c *ChannelManager) GetChannels() []*NodeChannelInfo {
c.mu.RLock()
Expand Down Expand Up @@ -608,13 +612,13 @@ func (c *ChannelManager) updateWithTimer(updates ChannelOpSet, state datapb.Chan
err := c.store.Update(updates)
if err != nil {
log.Warn("fail to update", zap.Array("updates", updates), zap.Error(err))
c.stateTimer.removeTimers(channelsWithTimer)
c.stateTimer.removeTimers(channelsWithTimer...)
}
return err
}

func (c *ChannelManager) processAck(e *ackEvent) {
c.stateTimer.stopIfExist(e)
c.stateTimer.removeTimers(e.channelName)

switch e.ackType {
case invalidAck:
Expand Down

0 comments on commit a5ffad6

Please sign in to comment.