Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use tickle to check WatchEvent state at datacoord and datanode #21193

Merged
merged 1 commit into from Feb 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion configs/milvus.yaml
Expand Up @@ -280,7 +280,7 @@ dataCoord:
enableActiveStandby: false # Enable active-standby

channel:
maxWatchDuration: 600 # Timeout on watching channels (in seconds). Default 600 seconds.
watchTimeoutInterval: 30 # Timeout on watching channels (in seconds). A DataNode tickle that updates watch progress resets the timeout timer.

segment:
maxSize: 512 # Maximum size of a segment in MB
Expand Down
46 changes: 31 additions & 15 deletions internal/datacoord/channel_checker.go
Expand Up @@ -33,10 +33,12 @@ import (
)

// channelStateTimer tracks per-channel watch/release timeout timers at
// DataCoord. Each watched channel gets one running *time.Timer plus a stop
// channel to cancel it; firing timers are reported through timeoutWatcher.
type channelStateTimer struct {
	watchkv kv.MetaKv

	// runningTimers maps channel name -> *time.Timer so the timer can be
	// Reset when a DataNode tickle reports watch progress.
	runningTimers sync.Map
	// runningTimerStops maps channel name -> chan struct{} used to stop a
	// timer before it fires.
	runningTimerStops sync.Map
	etcdWatcher       clientv3.WatchChan
	timeoutWatcher    chan *ackEvent
}

func newChannelStateTimer(kv kv.MetaKv) *channelStateTimer {
Expand All @@ -49,7 +51,6 @@ func newChannelStateTimer(kv kv.MetaKv) *channelStateTimer {
// getWatchers returns the etcd watch channel for prefix (created lazily on
// first call) together with the timeout-event channel.
func (c *channelStateTimer) getWatchers(prefix string) (clientv3.WatchChan, chan *ackEvent) {
	watcher := c.etcdWatcher
	if watcher == nil {
		watcher = c.watchkv.WatchWithPrefix(prefix)
		c.etcdWatcher = watcher
	}
	return watcher, c.timeoutWatcher
}
Expand Down Expand Up @@ -80,40 +81,46 @@ func (c *channelStateTimer) loadAllChannels(nodeID UniqueID) ([]*datapb.ChannelW
}

// startOne can write ToWatch or ToRelease states.
func (c *channelStateTimer) startOne(watchState datapb.ChannelWatchState, channelName string, nodeID UniqueID, timeoutTs int64) {
if timeoutTs == 0 {
func (c *channelStateTimer) startOne(watchState datapb.ChannelWatchState, channelName string, nodeID UniqueID, timeout time.Duration) {
if timeout == 0 {
log.Info("zero timeoutTs, skip starting timer",
zap.String("watch state", watchState.String()),
zap.Int64("nodeID", nodeID),
zap.String("channel name", channelName),
)
return
}

stop := make(chan struct{})
ticker := time.NewTimer(timeout)
c.removeTimers([]string{channelName})
c.runningTimers.Store(channelName, stop)
timeoutT := time.Unix(0, timeoutTs)
c.runningTimerStops.Store(channelName, stop)
c.runningTimers.Store(channelName, ticker)

go func() {
log.Info("timer started",
zap.String("watch state", watchState.String()),
zap.Int64("nodeID", nodeID),
zap.String("channel name", channelName),
zap.Time("timeout time", timeoutT))
zap.Duration("check interval", timeout))
defer ticker.Stop()

select {
case <-time.NewTimer(time.Until(timeoutT)).C:
case <-ticker.C:
// check tickle at path as :tickle/[prefix]/{channel_name}
log.Info("timeout and stop timer: wait for channel ACK timeout",
zap.String("watch state", watchState.String()),
zap.Int64("nodeID", nodeID),
zap.String("channel name", channelName),
zap.Time("timeout time", timeoutT))
zap.Duration("timeout interval", timeout))
ackType := getAckType(watchState)
c.notifyTimeoutWatcher(&ackEvent{ackType, channelName, nodeID})
case <-stop:
log.Info("stop timer before timeout",
zap.String("watch state", watchState.String()),
zap.Int64("nodeID", nodeID),
zap.String("channel name", channelName),
zap.Time("timeout time", timeoutT))
zap.Duration("timeout interval", timeout))
}
}()
}
Expand All @@ -124,16 +131,25 @@ func (c *channelStateTimer) notifyTimeoutWatcher(e *ackEvent) {

func (c *channelStateTimer) removeTimers(channels []string) {
for _, channel := range channels {
if stop, ok := c.runningTimers.LoadAndDelete(channel); ok {
if stop, ok := c.runningTimerStops.LoadAndDelete(channel); ok {
close(stop.(chan struct{}))
c.runningTimers.Delete(channel)
}
}
}

// stopIfExist stops the running timer for the channel named in e.
// For timeout acks the timer has already fired, so only the bookkeeping
// entry is removed and the stop channel is not closed.
// NOTE(review): LoadAndDelete drops the stop entry even on the timeout-ack
// path where close is skipped — presumably intentional, since the timer
// goroutine has already exited; confirm against callers.
func (c *channelStateTimer) stopIfExist(e *ackEvent) {
	stop, ok := c.runningTimerStops.LoadAndDelete(e.channelName)
	if ok && e.ackType != watchTimeoutAck && e.ackType != releaseTimeoutAck {
		close(stop.(chan struct{}))
		c.runningTimers.Delete(e.channelName)
	}
}

// resetIfExist restarts the timer registered for channel with a fresh
// interval; it is a no-op when no timer is running for that channel.
func (c *channelStateTimer) resetIfExist(channel string, interval time.Duration) {
	value, ok := c.runningTimers.Load(channel)
	if !ok {
		return
	}
	value.(*time.Timer).Reset(interval)
}

Expand Down
44 changes: 23 additions & 21 deletions internal/datacoord/channel_checker_test.go
Expand Up @@ -52,10 +52,9 @@ func TestChannelStateTimer(t *testing.T) {
timer.loadAllChannels(1)

validWatchInfo := datapb.ChannelWatchInfo{
Vchan: &datapb.VchannelInfo{},
StartTs: time.Now().Unix(),
State: datapb.ChannelWatchState_ToWatch,
TimeoutTs: time.Now().Add(20 * time.Millisecond).UnixNano(),
Vchan: &datapb.VchannelInfo{},
StartTs: time.Now().Unix(),
State: datapb.ChannelWatchState_ToWatch,
}
validData, err := proto.Marshal(&validWatchInfo)
require.NoError(t, err)
Expand Down Expand Up @@ -87,18 +86,20 @@ func TestChannelStateTimer(t *testing.T) {
})

t.Run("test startOne", func(t *testing.T) {
normalTimeoutTs := time.Now().Add(20 * time.Second).UnixNano()
nowTimeoutTs := time.Now().UnixNano()
zeroTimeoutTs := int64(0)
normalTimeoutTs := 20 * time.Second
nowTimeoutTs := 1 * time.Millisecond
zeroTimeoutTs := 0 * time.Second
resetTimeoutTs := 30 * time.Second
tests := []struct {
channelName string
timeoutTs int64
timeoutTs time.Duration

description string
}{
{"channel-1", normalTimeoutTs, "test stop"},
{"channel-2", nowTimeoutTs, "test timeout"},
{"channel-3", zeroTimeoutTs, "not start"},
{"channel-4", resetTimeoutTs, "reset timer"},
}

timer := newChannelStateTimer(kv)
Expand All @@ -112,6 +113,11 @@ func TestChannelStateTimer(t *testing.T) {
e := <-timeoutCh
assert.Equal(t, watchTimeoutAck, e.ackType)
assert.Equal(t, test.channelName, e.channelName)
} else if test.timeoutTs == resetTimeoutTs {
timer.resetIfExist(test.channelName, nowTimeoutTs)
e := <-timeoutCh
assert.Equal(t, watchTimeoutAck, e.ackType)
assert.Equal(t, test.channelName, e.channelName)
} else {
timer.stopIfExist(&ackEvent{watchSuccessAck, test.channelName, 1})
}
Expand All @@ -123,18 +129,17 @@ func TestChannelStateTimer(t *testing.T) {
})

t.Run("test startOne no leaking issue 17335", func(t *testing.T) {
timeoutTs := time.Now().Add(20 * time.Second).UnixNano()
timer := newChannelStateTimer(kv)

timer.startOne(datapb.ChannelWatchState_ToRelease, "channel-1", 1, timeoutTs)
stop, ok := timer.runningTimers.Load("channel-1")
timer.startOne(datapb.ChannelWatchState_ToRelease, "channel-1", 1, 20*time.Second)
stop, ok := timer.runningTimerStops.Load("channel-1")
require.True(t, ok)

timer.startOne(datapb.ChannelWatchState_ToWatch, "channel-1", 1, timeoutTs)
timer.startOne(datapb.ChannelWatchState_ToWatch, "channel-1", 1, 20*time.Second)
_, ok = <-stop.(chan struct{})
assert.False(t, ok)

stop2, ok := timer.runningTimers.Load("channel-1")
stop2, ok := timer.runningTimerStops.Load("channel-1")
assert.True(t, ok)

timer.removeTimers([]string{"channel-1"})
Expand All @@ -151,10 +156,9 @@ func TestChannelStateTimer_parses(t *testing.T) {

t.Run("test parseWatchInfo", func(t *testing.T) {
validWatchInfo := datapb.ChannelWatchInfo{
Vchan: &datapb.VchannelInfo{},
StartTs: time.Now().Unix(),
State: datapb.ChannelWatchState_ToWatch,
TimeoutTs: time.Now().Add(20 * time.Millisecond).UnixNano(),
Vchan: &datapb.VchannelInfo{},
StartTs: time.Now().Unix(),
State: datapb.ChannelWatchState_ToWatch,
}
validData, err := proto.Marshal(&validWatchInfo)
require.NoError(t, err)
Expand Down Expand Up @@ -186,7 +190,6 @@ func TestChannelStateTimer_parses(t *testing.T) {
assert.NotNil(t, info)
assert.Equal(t, info.GetState(), validWatchInfo.GetState())
assert.Equal(t, info.GetStartTs(), validWatchInfo.GetStartTs())
assert.Equal(t, info.GetTimeoutTs(), validWatchInfo.GetTimeoutTs())
} else {
assert.Nil(t, info)
assert.Error(t, err)
Expand All @@ -205,9 +208,8 @@ func TestChannelStateTimer_parses(t *testing.T) {
DroppedSegments: []*datapb.SegmentInfo{{ID: 3}},
UnflushedSegmentIds: []int64{1},
},
StartTs: time.Now().Unix(),
State: datapb.ChannelWatchState_ToWatch,
TimeoutTs: time.Now().Add(20 * time.Millisecond).UnixNano(),
StartTs: time.Now().Unix(),
State: datapb.ChannelWatchState_ToWatch,
}

oldData, err := proto.Marshal(&oldWatchInfo)
Expand Down
31 changes: 16 additions & 15 deletions internal/datacoord/channel_manager.go
Expand Up @@ -192,22 +192,23 @@ func (c *ChannelManager) checkOldNodes(nodes []UniqueID) error {
for nodeID, watchInfos := range nodeWatchInfos {
for _, info := range watchInfos {
channelName := info.GetVchan().GetChannelName()
checkInterval := Params.DataCoordCfg.WatchTimeoutInterval.GetAsDuration(time.Second)

log.Info("processing watch info",
zap.String("watch state", info.GetState().String()),
zap.String("channel name", channelName))

switch info.GetState() {
case datapb.ChannelWatchState_ToWatch, datapb.ChannelWatchState_Uncomplete:
c.stateTimer.startOne(datapb.ChannelWatchState_ToWatch, channelName, nodeID, info.GetTimeoutTs())
c.stateTimer.startOne(datapb.ChannelWatchState_ToWatch, channelName, nodeID, checkInterval)

case datapb.ChannelWatchState_WatchFailure:
if err := c.Release(nodeID, channelName); err != nil {
return err
}

case datapb.ChannelWatchState_ToRelease:
c.stateTimer.startOne(datapb.ChannelWatchState_ToRelease, channelName, nodeID, info.GetTimeoutTs())
c.stateTimer.startOne(datapb.ChannelWatchState_ToRelease, channelName, nodeID, checkInterval)

case datapb.ChannelWatchState_ReleaseSuccess:
if err := c.Reassign(nodeID, channelName); err != nil {
Expand Down Expand Up @@ -446,11 +447,10 @@ func (c *ChannelManager) fillChannelWatchInfo(op *ChannelOp) {
for _, ch := range op.Channels {
vcInfo := c.h.GetDataVChanPositions(ch, allPartitionID)
info := &datapb.ChannelWatchInfo{
Vchan: vcInfo,
StartTs: time.Now().Unix(),
State: datapb.ChannelWatchState_Uncomplete,
TimeoutTs: time.Now().Add(Params.DataCoordCfg.MaxWatchDuration.GetAsDuration(time.Second)).UnixNano(),
Schema: ch.Schema,
Vchan: vcInfo,
StartTs: time.Now().Unix(),
State: datapb.ChannelWatchState_Uncomplete,
Schema: ch.Schema,
}
op.ChannelWatchInfos = append(op.ChannelWatchInfos, info)
}
Expand All @@ -460,20 +460,19 @@ func (c *ChannelManager) fillChannelWatchInfo(op *ChannelOp) {
func (c *ChannelManager) fillChannelWatchInfoWithState(op *ChannelOp, state datapb.ChannelWatchState) []string {
var channelsWithTimer = []string{}
startTs := time.Now().Unix()
timeoutTs := time.Now().Add(Params.DataCoordCfg.MaxWatchDuration.GetAsDuration(time.Second)).UnixNano()
checkInterval := Params.DataCoordCfg.WatchTimeoutInterval.GetAsDuration(time.Second)
for _, ch := range op.Channels {
vcInfo := c.h.GetDataVChanPositions(ch, allPartitionID)
info := &datapb.ChannelWatchInfo{
Vchan: vcInfo,
StartTs: startTs,
State: state,
TimeoutTs: timeoutTs,
Schema: ch.Schema,
Vchan: vcInfo,
StartTs: startTs,
State: state,
Schema: ch.Schema,
}

// Only set timer for watchInfo not from bufferID
if op.NodeID != bufferID {
c.stateTimer.startOne(state, ch.Name, op.NodeID, timeoutTs)
c.stateTimer.startOne(state, ch.Name, op.NodeID, checkInterval)
channelsWithTimer = append(channelsWithTimer, ch.Name)
}

Expand Down Expand Up @@ -696,11 +695,13 @@ func (c *ChannelManager) watchChannelStatesLoop(ctx context.Context) {
continue
}

// ignore these states
// running states
state := watchInfo.GetState()
if state == datapb.ChannelWatchState_ToWatch ||
state == datapb.ChannelWatchState_ToRelease ||
state == datapb.ChannelWatchState_Uncomplete {
c.stateTimer.resetIfExist(watchInfo.GetVchan().ChannelName, Params.DataCoordCfg.WatchTimeoutInterval.GetAsDuration(time.Second))
log.Info("tickle update, timer delay", zap.String("channel", watchInfo.GetVchan().ChannelName), zap.Int32("progress", watchInfo.Progress))
continue
}

Expand Down