Skip to content

Commit

Permalink
fix datacoord consume datanode tt metrics
Browse files Browse the repository at this point in the history
Signed-off-by: Wei Liu <wei.liu@zilliz.com>
  • Loading branch information
weiliu1031 committed Jul 19, 2023
1 parent 0dac3fc commit 110625e
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 8 deletions.
2 changes: 1 addition & 1 deletion internal/datacoord/services.go
Expand Up @@ -1440,7 +1440,7 @@ func (s *Server) ReportDataNodeTtMsgs(ctx context.Context, req *datapb.ReportDat
}

for _, ttMsg := range req.GetMsgs() {
sub := tsoutil.SubByNow(req.GetBase().GetTimestamp())
sub := tsoutil.SubByNow(ttMsg.GetTimestamp())
metrics.DataCoordConsumeDataNodeTimeTickLag.
WithLabelValues(fmt.Sprint(s.session.ServerID), ttMsg.GetChannelName()).
Set(float64(sub))
Expand Down
8 changes: 5 additions & 3 deletions internal/datanode/timetick_sender.go
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/commonpbutil"
"github.com/milvus-io/milvus/internal/util/retry"
"github.com/milvus-io/milvus/internal/util/tsoutil"
)

// timeTickSender is to merge channel states updated by flow graph node and send to datacoord periodically
Expand Down Expand Up @@ -64,8 +65,8 @@ func (m *timeTickSender) start(ctx context.Context) {
case <-ctx.Done():
log.Info("timeTickSender context done")
return
case t := <-ticker.C:
m.sendReport(ctx, uint64(t.Unix()))
case <-ticker.C:
m.sendReport(ctx)
}
}
}
Expand Down Expand Up @@ -157,10 +158,11 @@ func (m *timeTickSender) cleanStatesCache(sendedLastTss map[string]uint64) {
log.RatedDebug(30, "timeTickSender channelStatesCaches", zap.Int("sizeAfterClean", len(m.channelStatesCaches)))
}

func (m *timeTickSender) sendReport(ctx context.Context, submitTs Timestamp) error {
func (m *timeTickSender) sendReport(ctx context.Context) error {
toSendMsgs, sendLastTss := m.mergeDatanodeTtMsg()
log.RatedDebug(30, "timeTickSender send datanode timetick message", zap.Any("toSendMsgs", toSendMsgs), zap.Any("sendLastTss", sendLastTss))
err := retry.Do(ctx, func() error {
submitTs := tsoutil.ComposeTSByTime(time.Now(), 0)
statusResp, err := m.dataCoord.ReportDataNodeTtMsgs(ctx, &datapb.ReportDataNodeTtMsgsRequest{
Base: commonpbutil.NewMsgBase(
commonpbutil.WithMsgType(commonpb.MsgType_DataNodeTt),
Expand Down
8 changes: 4 additions & 4 deletions internal/datanode/timetick_sender_test.go
Expand Up @@ -86,7 +86,7 @@ func TestTimetickManagerNormal(t *testing.T) {
}
manager.update(channelName2, ts3, segmentStats3)

err := manager.sendReport(ctx, 100)
err := manager.sendReport(ctx)
assert.NoError(t, err)

_, channelExistAfterSubmit := manager.channelStatesCaches[channelName1]
Expand Down Expand Up @@ -123,7 +123,7 @@ func TestTimetickManagerNormal(t *testing.T) {
}
manager.update(channelName3, ts5, segmentStats5)

err = manager.sendReport(ctx, 100)
err = manager.sendReport(ctx)
assert.NoError(t, err)

_, channelExistAfterSubmit2 := manager.channelStatesCaches[channelName1]
Expand All @@ -148,7 +148,7 @@ func TestTimetickManagerSendErr(t *testing.T) {
}
// update first time
manager.update(channelName1, ts, segmentStats)
err := manager.sendReport(ctx, 100)
err := manager.sendReport(ctx)
assert.Error(t, err)
}

Expand All @@ -167,6 +167,6 @@ func TestTimetickManagerSendNotSuccess(t *testing.T) {
}
// update first time
manager.update(channelName1, ts, segmentStats)
err := manager.sendReport(ctx, 100)
err := manager.sendReport(ctx)
assert.Error(t, err)
}

0 comments on commit 110625e

Please sign in to comment.