Skip to content

Commit

Permalink
enhance: add metrics for stopping querynode balance progress
Browse files: browse the repository at this point in the history
Signed-off-by: Wei Liu <wei.liu@zilliz.com>
  • Loading branch information
weiliu1031 committed Dec 15, 2023
1 parent 4731c1b commit 7c58a10
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 15 deletions.
37 changes: 22 additions & 15 deletions internal/querynodev2/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -394,40 +394,47 @@ func (node *QueryNode) Stop() error {
if err != nil {
log.Warn("session fail to go stopping state", zap.Error(err))
} else {
metrics.StoppingBalanceNodeNum.WithLabelValues().Set(1)
timeoutCh := time.After(paramtable.Get().QueryNodeCfg.GracefulStopTimeout.GetAsDuration(time.Second))

outer:
for (node.manager != nil && !node.manager.Segment.Empty()) ||
(node.pipelineManager != nil && node.pipelineManager.Num() != 0) {
var (
sealedSegments = []segments.Segment{}
growingSegments = []segments.Segment{}
channelNum = 0
)
if node.manager != nil {
sealedSegments = node.manager.Segment.GetBy(segments.WithType(segments.SegmentTypeSealed))
growingSegments = node.manager.Segment.GetBy(segments.WithType(segments.SegmentTypeGrowing))
}
if node.pipelineManager != nil {
channelNum = node.pipelineManager.Num()
}

select {
case <-timeoutCh:
var (
sealedSegments = []segments.Segment{}
growingSegments = []segments.Segment{}
channelNum = 0
)
if node.manager != nil {
sealedSegments = node.manager.Segment.GetBy(segments.WithType(segments.SegmentTypeSealed))
growingSegments = node.manager.Segment.GetBy(segments.WithType(segments.SegmentTypeGrowing))
}
if node.pipelineManager != nil {
channelNum = node.pipelineManager.Num()
}

log.Warn("migrate data timed out", zap.Int64("ServerID", paramtable.GetNodeID()),
zap.Int64s("sealedSegments", lo.Map[segments.Segment, int64](sealedSegments, func(s segments.Segment, i int) int64 {
zap.Int64s("sealedSegments", lo.Map(sealedSegments, func(s segments.Segment, i int) int64 {
return s.ID()
})),
zap.Int64s("growingSegments", lo.Map[segments.Segment, int64](growingSegments, func(t segments.Segment, i int) int64 {
zap.Int64s("growingSegments", lo.Map(growingSegments, func(t segments.Segment, i int) int64 {
return t.ID()
})),
zap.Int("channelNum", channelNum),
)
break outer

case <-time.After(time.Second):
metrics.StoppingBalanceSegmentNum.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Set(float64(len(sealedSegments)))
metrics.StoppingBalanceChannelNum.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Set(float64(channelNum))
}
}

metrics.StoppingBalanceNodeNum.WithLabelValues().Set(0)
metrics.StoppingBalanceSegmentNum.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Set(0)
metrics.StoppingBalanceChannelNum.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Set(0)
}

node.UpdateStateCode(commonpb.StateCode_Abnormal)
Expand Down
27 changes: 27 additions & 0 deletions pkg/metrics/querynode_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -455,6 +455,30 @@ var (
}, []string{
nodeIDLabelName,
})

// StoppingBalanceNodeNum is a gauge vector with no labels that flags whether
// this query node is currently in its stopping-balance phase.
// QueryNode.Stop sets it to 1 before it starts waiting for its segments and
// pipelines to be released, and sets it back to 0 once that wait loop exits
// (either because everything was released or because the graceful-stop
// timeout fired).
StoppingBalanceNodeNum = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.QueryNodeRole,
Name: "stopping_balance_node_num",
Help: "the number of node which executing stopping balance",
}, []string{})

// StoppingBalanceChannelNum is a gauge vector, labelled by node ID, that
// reports how many channels are still held by a query node while it is
// stopping. QueryNode.Stop sets it from pipelineManager.Num() each time its
// one-second wait elapses inside the wait loop, and resets it to 0 when the
// loop exits.
StoppingBalanceChannelNum = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.QueryNodeRole,
Name: "stopping_balance_channel_num",
Help: "the number of channel which executing stopping balance",
}, []string{nodeIDLabelName})

// StoppingBalanceSegmentNum is a gauge vector, labelled by node ID, that
// reports how many segments are still held by a query node while it is
// stopping. QueryNode.Stop sets it to the number of sealed segments only
// (growing segments are not counted) each time its one-second wait elapses
// inside the wait loop, and resets it to 0 when the loop exits.
StoppingBalanceSegmentNum = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.QueryNodeRole,
Name: "stopping_balance_segment_num",
Help: "the number of segment which executing stopping balance",
}, []string{nodeIDLabelName})
)

// RegisterQueryNode registers QueryNode metrics
Expand Down Expand Up @@ -497,6 +521,9 @@ func RegisterQueryNode(registry *prometheus.Registry) {
registry.MustRegister(QueryNodeDiskUsedSize)
registry.MustRegister(QueryNodeProcessCost)
registry.MustRegister(QueryNodeWaitProcessingMsgCount)
registry.MustRegister(StoppingBalanceNodeNum)
registry.MustRegister(StoppingBalanceChannelNum)
registry.MustRegister(StoppingBalanceSegmentNum)
}

func CleanupQueryNodeCollectionMetrics(nodeID int64, collectionID int64) {
Expand Down

0 comments on commit 7c58a10

Please sign in to comment.