Skip to content

Commit

Permalink
Add tickle for datacoord watch event
Browse files Browse the repository at this point in the history
Signed-off-by: aoiasd <zhicheng.yue@zilliz.com>
  • Loading branch information
aoiasd committed Feb 13, 2023
1 parent 7c49530 commit 5b69f5f
Show file tree
Hide file tree
Showing 20 changed files with 641 additions and 612 deletions.
2 changes: 1 addition & 1 deletion configs/milvus.yaml
Expand Up @@ -280,7 +280,7 @@ dataCoord:
enableActiveStandby: false # Enable active-standby

channel:
maxWatchDuration: 600 # Timeout on watching channels (in seconds). Default 600 seconds.
watchTimeoutInterval: 30 # Timeout on watching channels (in seconds). Datanode tickler update watch progress will reset timeout timer.

segment:
maxSize: 512 # Maximum size of a segment in MB
Expand Down
46 changes: 31 additions & 15 deletions internal/datacoord/channel_checker.go
Expand Up @@ -33,10 +33,12 @@ import (
)

type channelStateTimer struct {
watchkv kv.MetaKv
runningTimers sync.Map // channel name to timer stop channels
etcdWatcher clientv3.WatchChan
timeoutWatcher chan *ackEvent
watchkv kv.MetaKv

runningTimers sync.Map
runningTimerStops sync.Map // channel name to timer stop channels
etcdWatcher clientv3.WatchChan
timeoutWatcher chan *ackEvent
}

func newChannelStateTimer(kv kv.MetaKv) *channelStateTimer {
Expand All @@ -49,7 +51,6 @@ func newChannelStateTimer(kv kv.MetaKv) *channelStateTimer {
func (c *channelStateTimer) getWatchers(prefix string) (clientv3.WatchChan, chan *ackEvent) {
if c.etcdWatcher == nil {
c.etcdWatcher = c.watchkv.WatchWithPrefix(prefix)

}
return c.etcdWatcher, c.timeoutWatcher
}
Expand Down Expand Up @@ -80,40 +81,46 @@ func (c *channelStateTimer) loadAllChannels(nodeID UniqueID) ([]*datapb.ChannelW
}

// startOne can write ToWatch or ToRelease states.
func (c *channelStateTimer) startOne(watchState datapb.ChannelWatchState, channelName string, nodeID UniqueID, timeoutTs int64) {
if timeoutTs == 0 {
func (c *channelStateTimer) startOne(watchState datapb.ChannelWatchState, channelName string, nodeID UniqueID, timeout time.Duration) {
if timeout == 0 {
log.Info("zero timeoutTs, skip starting timer",
zap.String("watch state", watchState.String()),
zap.Int64("nodeID", nodeID),
zap.String("channel name", channelName),
)
return
}

stop := make(chan struct{})
ticker := time.NewTimer(timeout)
c.removeTimers([]string{channelName})
c.runningTimers.Store(channelName, stop)
timeoutT := time.Unix(0, timeoutTs)
c.runningTimerStops.Store(channelName, stop)
c.runningTimers.Store(channelName, ticker)

go func() {
log.Info("timer started",
zap.String("watch state", watchState.String()),
zap.Int64("nodeID", nodeID),
zap.String("channel name", channelName),
zap.Time("timeout time", timeoutT))
zap.Duration("check interval", timeout))
defer ticker.Stop()

select {
case <-time.NewTimer(time.Until(timeoutT)).C:
case <-ticker.C:
// check tickle at path as :tickle/[prefix]/{channel_name}
log.Info("timeout and stop timer: wait for channel ACK timeout",
zap.String("watch state", watchState.String()),
zap.Int64("nodeID", nodeID),
zap.String("channel name", channelName),
zap.Time("timeout time", timeoutT))
zap.Duration("timeout interval", timeout))
ackType := getAckType(watchState)
c.notifyTimeoutWatcher(&ackEvent{ackType, channelName, nodeID})
case <-stop:
log.Info("stop timer before timeout",
zap.String("watch state", watchState.String()),
zap.Int64("nodeID", nodeID),
zap.String("channel name", channelName),
zap.Time("timeout time", timeoutT))
zap.Duration("timeout interval", timeout))
}
}()
}
Expand All @@ -124,16 +131,25 @@ func (c *channelStateTimer) notifyTimeoutWatcher(e *ackEvent) {

func (c *channelStateTimer) removeTimers(channels []string) {
for _, channel := range channels {
if stop, ok := c.runningTimers.LoadAndDelete(channel); ok {
if stop, ok := c.runningTimerStops.LoadAndDelete(channel); ok {
close(stop.(chan struct{}))
c.runningTimers.Delete(channel)
}
}
}

func (c *channelStateTimer) stopIfExist(e *ackEvent) {
stop, ok := c.runningTimers.LoadAndDelete(e.channelName)
stop, ok := c.runningTimerStops.LoadAndDelete(e.channelName)
if ok && e.ackType != watchTimeoutAck && e.ackType != releaseTimeoutAck {
close(stop.(chan struct{}))
c.runningTimers.Delete(e.channelName)
}
}

func (c *channelStateTimer) resetIfExist(channel string, interval time.Duration) {
if value, ok := c.runningTimers.Load(channel); ok {
timer := value.(*time.Timer)
timer.Reset(interval)
}
}

Expand Down
44 changes: 23 additions & 21 deletions internal/datacoord/channel_checker_test.go
Expand Up @@ -52,10 +52,9 @@ func TestChannelStateTimer(t *testing.T) {
timer.loadAllChannels(1)

validWatchInfo := datapb.ChannelWatchInfo{
Vchan: &datapb.VchannelInfo{},
StartTs: time.Now().Unix(),
State: datapb.ChannelWatchState_ToWatch,
TimeoutTs: time.Now().Add(20 * time.Millisecond).UnixNano(),
Vchan: &datapb.VchannelInfo{},
StartTs: time.Now().Unix(),
State: datapb.ChannelWatchState_ToWatch,
}
validData, err := proto.Marshal(&validWatchInfo)
require.NoError(t, err)
Expand Down Expand Up @@ -87,18 +86,20 @@ func TestChannelStateTimer(t *testing.T) {
})

t.Run("test startOne", func(t *testing.T) {
normalTimeoutTs := time.Now().Add(20 * time.Second).UnixNano()
nowTimeoutTs := time.Now().UnixNano()
zeroTimeoutTs := int64(0)
normalTimeoutTs := 20 * time.Second
nowTimeoutTs := 1 * time.Millisecond
zeroTimeoutTs := 0 * time.Second
resetTimeoutTs := 30 * time.Second
tests := []struct {
channelName string
timeoutTs int64
timeoutTs time.Duration

description string
}{
{"channel-1", normalTimeoutTs, "test stop"},
{"channel-2", nowTimeoutTs, "test timeout"},
{"channel-3", zeroTimeoutTs, "not start"},
{"channel-4", resetTimeoutTs, "reset timer"},
}

timer := newChannelStateTimer(kv)
Expand All @@ -112,6 +113,11 @@ func TestChannelStateTimer(t *testing.T) {
e := <-timeoutCh
assert.Equal(t, watchTimeoutAck, e.ackType)
assert.Equal(t, test.channelName, e.channelName)
} else if test.timeoutTs == resetTimeoutTs {
timer.resetIfExist(test.channelName, nowTimeoutTs)
e := <-timeoutCh
assert.Equal(t, watchTimeoutAck, e.ackType)
assert.Equal(t, test.channelName, e.channelName)
} else {
timer.stopIfExist(&ackEvent{watchSuccessAck, test.channelName, 1})
}
Expand All @@ -123,18 +129,17 @@ func TestChannelStateTimer(t *testing.T) {
})

t.Run("test startOne no leaking issue 17335", func(t *testing.T) {
timeoutTs := time.Now().Add(20 * time.Second).UnixNano()
timer := newChannelStateTimer(kv)

timer.startOne(datapb.ChannelWatchState_ToRelease, "channel-1", 1, timeoutTs)
stop, ok := timer.runningTimers.Load("channel-1")
timer.startOne(datapb.ChannelWatchState_ToRelease, "channel-1", 1, 20*time.Second)
stop, ok := timer.runningTimerStops.Load("channel-1")
require.True(t, ok)

timer.startOne(datapb.ChannelWatchState_ToWatch, "channel-1", 1, timeoutTs)
timer.startOne(datapb.ChannelWatchState_ToWatch, "channel-1", 1, 20*time.Second)
_, ok = <-stop.(chan struct{})
assert.False(t, ok)

stop2, ok := timer.runningTimers.Load("channel-1")
stop2, ok := timer.runningTimerStops.Load("channel-1")
assert.True(t, ok)

timer.removeTimers([]string{"channel-1"})
Expand All @@ -151,10 +156,9 @@ func TestChannelStateTimer_parses(t *testing.T) {

t.Run("test parseWatchInfo", func(t *testing.T) {
validWatchInfo := datapb.ChannelWatchInfo{
Vchan: &datapb.VchannelInfo{},
StartTs: time.Now().Unix(),
State: datapb.ChannelWatchState_ToWatch,
TimeoutTs: time.Now().Add(20 * time.Millisecond).UnixNano(),
Vchan: &datapb.VchannelInfo{},
StartTs: time.Now().Unix(),
State: datapb.ChannelWatchState_ToWatch,
}
validData, err := proto.Marshal(&validWatchInfo)
require.NoError(t, err)
Expand Down Expand Up @@ -186,7 +190,6 @@ func TestChannelStateTimer_parses(t *testing.T) {
assert.NotNil(t, info)
assert.Equal(t, info.GetState(), validWatchInfo.GetState())
assert.Equal(t, info.GetStartTs(), validWatchInfo.GetStartTs())
assert.Equal(t, info.GetTimeoutTs(), validWatchInfo.GetTimeoutTs())
} else {
assert.Nil(t, info)
assert.Error(t, err)
Expand All @@ -205,9 +208,8 @@ func TestChannelStateTimer_parses(t *testing.T) {
DroppedSegments: []*datapb.SegmentInfo{{ID: 3}},
UnflushedSegmentIds: []int64{1},
},
StartTs: time.Now().Unix(),
State: datapb.ChannelWatchState_ToWatch,
TimeoutTs: time.Now().Add(20 * time.Millisecond).UnixNano(),
StartTs: time.Now().Unix(),
State: datapb.ChannelWatchState_ToWatch,
}

oldData, err := proto.Marshal(&oldWatchInfo)
Expand Down
31 changes: 16 additions & 15 deletions internal/datacoord/channel_manager.go
Expand Up @@ -192,22 +192,23 @@ func (c *ChannelManager) checkOldNodes(nodes []UniqueID) error {
for nodeID, watchInfos := range nodeWatchInfos {
for _, info := range watchInfos {
channelName := info.GetVchan().GetChannelName()
checkInterval := Params.DataCoordCfg.WatchTimeoutInterval.GetAsDuration(time.Second)

log.Info("processing watch info",
zap.String("watch state", info.GetState().String()),
zap.String("channel name", channelName))

switch info.GetState() {
case datapb.ChannelWatchState_ToWatch, datapb.ChannelWatchState_Uncomplete:
c.stateTimer.startOne(datapb.ChannelWatchState_ToWatch, channelName, nodeID, info.GetTimeoutTs())
c.stateTimer.startOne(datapb.ChannelWatchState_ToWatch, channelName, nodeID, checkInterval)

case datapb.ChannelWatchState_WatchFailure:
if err := c.Release(nodeID, channelName); err != nil {
return err
}

case datapb.ChannelWatchState_ToRelease:
c.stateTimer.startOne(datapb.ChannelWatchState_ToRelease, channelName, nodeID, info.GetTimeoutTs())
c.stateTimer.startOne(datapb.ChannelWatchState_ToRelease, channelName, nodeID, checkInterval)

case datapb.ChannelWatchState_ReleaseSuccess:
if err := c.Reassign(nodeID, channelName); err != nil {
Expand Down Expand Up @@ -446,11 +447,10 @@ func (c *ChannelManager) fillChannelWatchInfo(op *ChannelOp) {
for _, ch := range op.Channels {
vcInfo := c.h.GetDataVChanPositions(ch, allPartitionID)
info := &datapb.ChannelWatchInfo{
Vchan: vcInfo,
StartTs: time.Now().Unix(),
State: datapb.ChannelWatchState_Uncomplete,
TimeoutTs: time.Now().Add(Params.DataCoordCfg.MaxWatchDuration.GetAsDuration(time.Second)).UnixNano(),
Schema: ch.Schema,
Vchan: vcInfo,
StartTs: time.Now().Unix(),
State: datapb.ChannelWatchState_Uncomplete,
Schema: ch.Schema,
}
op.ChannelWatchInfos = append(op.ChannelWatchInfos, info)
}
Expand All @@ -460,20 +460,19 @@ func (c *ChannelManager) fillChannelWatchInfo(op *ChannelOp) {
func (c *ChannelManager) fillChannelWatchInfoWithState(op *ChannelOp, state datapb.ChannelWatchState) []string {
var channelsWithTimer = []string{}
startTs := time.Now().Unix()
timeoutTs := time.Now().Add(Params.DataCoordCfg.MaxWatchDuration.GetAsDuration(time.Second)).UnixNano()
checkInterval := Params.DataCoordCfg.WatchTimeoutInterval.GetAsDuration(time.Second)
for _, ch := range op.Channels {
vcInfo := c.h.GetDataVChanPositions(ch, allPartitionID)
info := &datapb.ChannelWatchInfo{
Vchan: vcInfo,
StartTs: startTs,
State: state,
TimeoutTs: timeoutTs,
Schema: ch.Schema,
Vchan: vcInfo,
StartTs: startTs,
State: state,
Schema: ch.Schema,
}

// Only set timer for watchInfo not from bufferID
if op.NodeID != bufferID {
c.stateTimer.startOne(state, ch.Name, op.NodeID, timeoutTs)
c.stateTimer.startOne(state, ch.Name, op.NodeID, checkInterval)
channelsWithTimer = append(channelsWithTimer, ch.Name)
}

Expand Down Expand Up @@ -696,11 +695,13 @@ func (c *ChannelManager) watchChannelStatesLoop(ctx context.Context) {
continue
}

// ignore these states
// runnging states
state := watchInfo.GetState()
if state == datapb.ChannelWatchState_ToWatch ||
state == datapb.ChannelWatchState_ToRelease ||
state == datapb.ChannelWatchState_Uncomplete {
c.stateTimer.resetIfExist(watchInfo.GetVchan().ChannelName, Params.DataCoordCfg.WatchTimeoutInterval.GetAsDuration(time.Second))
log.Info("tickle update, timer delay", zap.String("channel", watchInfo.GetVchan().ChannelName), zap.Int32("progress", watchInfo.Progress))
continue
}

Expand Down

0 comments on commit 5b69f5f

Please sign in to comment.