Skip to content

Commit

Permalink
enhance: the proxy metric in the query request
Browse files Browse the repository at this point in the history
Signed-off-by: SimFG <bang.fu@zilliz.com>
  • Loading branch information
SimFG committed May 23, 2024
1 parent 229a6b9 commit 3dbbc9a
Show file tree
Hide file tree
Showing 2 changed files with 109 additions and 92 deletions.
199 changes: 108 additions & 91 deletions internal/proxy/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -3406,51 +3406,33 @@ func (node *Proxy) Flush(ctx context.Context, request *milvuspb.FlushRequest) (*
}

// Query get the records by primary keys.
func (node *Proxy) query(ctx context.Context, qt *queryTask) (*milvuspb.QueryResults, error) {
func (node *Proxy) query(ctx context.Context, qt *queryTask, isProxyRequest bool) (*milvuspb.QueryResults, error) {
request := qt.request
receiveSize := proto.Size(request)
metrics.ProxyReceiveBytes.WithLabelValues(
strconv.FormatInt(paramtable.GetNodeID(), 10),
metrics.QueryLabel,
request.GetCollectionName(),
).Add(float64(receiveSize))

metrics.ProxyReceivedNQ.WithLabelValues(
strconv.FormatInt(paramtable.GetNodeID(), 10),
metrics.SearchLabel,
request.GetCollectionName(),
).Add(float64(1))

subLabel := GetCollectionRateSubLabel(request)
rateCol.Add(internalpb.RateType_DQLQuery.String(), 1, subLabel)
method := "Query"

if err := merr.CheckHealthy(node.GetStateCode()); err != nil {
return &milvuspb.QueryResults{
Status: merr.Status(err),
}, nil
}

ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-Query")
defer sp.End()
tr := timerecord.NewTimeRecorder("Query")

method := "Query"

metrics.ProxyFunctionCall.WithLabelValues(
strconv.FormatInt(paramtable.GetNodeID(), 10),
method,
metrics.TotalLabel,
request.GetDbName(),
request.GetCollectionName(),
).Inc()

log := log.Ctx(ctx).With(
zap.String("role", typeutil.ProxyRole),
zap.String("db", request.DbName),
zap.String("collection", request.CollectionName),
zap.Strings("partitions", request.PartitionNames),
)

log.Debug(
rpcReceived(method),
zap.String("expr", request.Expr),
zap.Strings("OutputFields", request.OutputFields),
zap.Uint64("travel_timestamp", request.TravelTimestamp),
zap.Uint64("guarantee_timestamp", request.GuaranteeTimestamp),
)

tr := timerecord.NewTimeRecorder(method)

defer func() {
span := tr.ElapseSpan()
if span >= paramtable.Get().ProxyCfg.SlowQuerySpanInSeconds.GetAsDuration(time.Second) {
Expand All @@ -3468,27 +3450,21 @@ func (node *Proxy) query(ctx context.Context, qt *queryTask) (*milvuspb.QueryRes
}
}()

log.Debug(
rpcReceived(method),
zap.String("expr", request.Expr),
zap.Strings("OutputFields", request.OutputFields),
zap.Uint64("travel_timestamp", request.TravelTimestamp),
zap.Uint64("guarantee_timestamp", request.GuaranteeTimestamp),
)

if err := node.sched.dqQueue.Enqueue(qt); err != nil {
log.Warn(
rpcFailedToEnqueue(method),
zap.Error(err),
)

metrics.ProxyFunctionCall.WithLabelValues(
strconv.FormatInt(paramtable.GetNodeID(), 10),
method,
metrics.AbandonLabel,
request.GetDbName(),
request.GetCollectionName(),
).Inc()
if isProxyRequest {
metrics.ProxyFunctionCall.WithLabelValues(
strconv.FormatInt(paramtable.GetNodeID(), 10),
method,
metrics.AbandonLabel,
request.GetDbName(),
request.GetCollectionName(),
).Inc()
}

return &milvuspb.QueryResults{
Status: merr.Status(err),
Expand All @@ -3503,45 +3479,36 @@ func (node *Proxy) query(ctx context.Context, qt *queryTask) (*milvuspb.QueryRes
rpcFailedToWaitToFinish(method),
zap.Error(err))

metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
metrics.FailLabel, request.GetDbName(), request.GetCollectionName()).Inc()
if isProxyRequest {
metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
metrics.FailLabel, request.GetDbName(), request.GetCollectionName()).Inc()

Check warning on line 3484 in internal/proxy/impl.go

View check run for this annotation

Codecov / codecov/patch

internal/proxy/impl.go#L3483-L3484

Added lines #L3483 - L3484 were not covered by tests
}

return &milvuspb.QueryResults{
Status: merr.Status(err),
}, nil
}
span := tr.CtxRecord(ctx, "wait query result")
metrics.ProxyWaitForSearchResultLatency.WithLabelValues(
strconv.FormatInt(paramtable.GetNodeID(), 10),
metrics.QueryLabel,
).Observe(float64(span.Milliseconds()))

log.Debug(rpcDone(method))

metrics.ProxyFunctionCall.WithLabelValues(
strconv.FormatInt(paramtable.GetNodeID(), 10),
method,
metrics.SuccessLabel,
request.GetDbName(),
request.GetCollectionName(),
).Inc()

metrics.ProxySQLatency.WithLabelValues(
strconv.FormatInt(paramtable.GetNodeID(), 10),
metrics.QueryLabel,
request.GetDbName(),
request.GetCollectionName(),
).Observe(float64(tr.ElapseSpan().Milliseconds()))
if isProxyRequest {
span := tr.CtxRecord(ctx, "wait query result")
metrics.ProxyWaitForSearchResultLatency.WithLabelValues(
strconv.FormatInt(paramtable.GetNodeID(), 10),
metrics.QueryLabel,
).Observe(float64(span.Milliseconds()))

metrics.ProxyCollectionSQLatency.WithLabelValues(
strconv.FormatInt(paramtable.GetNodeID(), 10),
metrics.QueryLabel,
request.CollectionName,
).Observe(float64(tr.ElapseSpan().Milliseconds()))
metrics.ProxySQLatency.WithLabelValues(
strconv.FormatInt(paramtable.GetNodeID(), 10),
metrics.QueryLabel,
request.GetDbName(),
request.GetCollectionName(),
).Observe(float64(tr.ElapseSpan().Milliseconds()))

sentSize := proto.Size(qt.result)
rateCol.Add(metricsinfo.ReadResultThroughput, float64(sentSize), subLabel)
metrics.ProxyReadReqSendBytes.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10)).Add(float64(sentSize))
metrics.ProxyCollectionSQLatency.WithLabelValues(
strconv.FormatInt(paramtable.GetNodeID(), 10),
metrics.QueryLabel,
request.CollectionName,
).Observe(float64(tr.ElapseSpan().Milliseconds()))
}

return qt.result, nil
}
Expand All @@ -3563,22 +3530,72 @@ func (node *Proxy) Query(ctx context.Context, request *milvuspb.QueryRequest) (*
lb: node.lbPolicy,
mustUsePartitionKey: Params.ProxyCfg.MustUsePartitionKey.GetAsBool(),
}
res, err := node.query(ctx, qt)
if merr.Ok(res.Status) && err == nil {
username := GetCurUserFromContextOrDefault(ctx)
nodeID := paramtable.GetStringNodeID()
v := Extension.Report(map[string]any{
hookutil.OpTypeKey: hookutil.OpTypeQuery,
hookutil.DatabaseKey: request.DbName,
hookutil.UsernameKey: username,
hookutil.ResultDataSizeKey: proto.Size(res),
hookutil.RelatedDataSizeKey: qt.totalRelatedDataSize,
hookutil.RelatedCntKey: qt.allQueryCnt,
})
SetReportValue(res.Status, v)
metrics.ProxyReportValue.WithLabelValues(nodeID, hookutil.OpTypeQuery, request.DbName, username).Add(float64(v))

subLabel := GetCollectionRateSubLabel(request)
receiveSize := proto.Size(request)
metrics.ProxyReceiveBytes.WithLabelValues(
strconv.FormatInt(paramtable.GetNodeID(), 10),
metrics.QueryLabel,
request.GetCollectionName(),
).Add(float64(receiveSize))
metrics.ProxyReceivedNQ.WithLabelValues(
strconv.FormatInt(paramtable.GetNodeID(), 10),
metrics.SearchLabel,
request.GetCollectionName(),
).Add(float64(1))

rateCol.Add(internalpb.RateType_DQLQuery.String(), 1, subLabel)

if err := merr.CheckHealthy(node.GetStateCode()); err != nil {
return &milvuspb.QueryResults{
Status: merr.Status(err),
}, nil
}
return res, err

ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-Query")
defer sp.End()
method := "Query"

metrics.ProxyFunctionCall.WithLabelValues(
strconv.FormatInt(paramtable.GetNodeID(), 10),
method,
metrics.TotalLabel,
request.GetDbName(),
request.GetCollectionName(),
).Inc()

res, err := node.query(ctx, qt, true)
if err != nil || !merr.Ok(res.Status) {
return res, err
}

log.Debug(rpcDone(method))

metrics.ProxyFunctionCall.WithLabelValues(
strconv.FormatInt(paramtable.GetNodeID(), 10),
method,
metrics.SuccessLabel,
request.GetDbName(),
request.GetCollectionName(),
).Inc()

sentSize := proto.Size(qt.result)
rateCol.Add(metricsinfo.ReadResultThroughput, float64(sentSize), subLabel)
metrics.ProxyReadReqSendBytes.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10)).Add(float64(sentSize))

username := GetCurUserFromContextOrDefault(ctx)
nodeID := paramtable.GetStringNodeID()
v := Extension.Report(map[string]any{
hookutil.OpTypeKey: hookutil.OpTypeQuery,
hookutil.DatabaseKey: request.DbName,
hookutil.UsernameKey: username,
hookutil.ResultDataSizeKey: proto.Size(res),
hookutil.RelatedDataSizeKey: qt.totalRelatedDataSize,
hookutil.RelatedCntKey: qt.allQueryCnt,
})
SetReportValue(res.Status, v)
metrics.ProxyReportValue.WithLabelValues(nodeID, hookutil.OpTypeQuery, request.DbName, username).Add(float64(v))
return res, nil
}

// CreateAlias create alias for collection, then you can search the collection with alias.
Expand Down
2 changes: 1 addition & 1 deletion internal/proxy/task_search.go
Original file line number Diff line number Diff line change
Expand Up @@ -819,7 +819,7 @@ func doRequery(ctx context.Context,
fastSkip: true,
reQuery: true,
}
queryResult, err := node.(*Proxy).query(ctx, qt)
queryResult, err := node.(*Proxy).query(ctx, qt, false)
if err != nil {
return err
}
Expand Down

0 comments on commit 3dbbc9a

Please sign in to comment.