Skip to content
This repository was archived by the owner on Apr 2, 2026. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 22 additions & 1 deletion internal/querynodev2/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"time"

"github.com/samber/lo"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
Expand Down Expand Up @@ -745,6 +746,8 @@ func (node *QueryNode) SearchSegments(ctx context.Context, req *querypb.SearchRe
zap.String("channel", channel),
zap.String("scope", req.GetScope().String()),
)
span := trace.SpanFromContext(ctx)
hasSpan := span.SpanContext().IsValid()
channelsMvcc := make(map[string]uint64)
for _, ch := range req.GetDmlChannels() {
channelsMvcc[ch] = req.GetReq().GetMvccTimestamp()
Expand All @@ -771,7 +774,7 @@ func (node *QueryNode) SearchSegments(ctx context.Context, req *querypb.SearchRe
searchCtx, cancel := context.WithCancel(ctx)
defer cancel()

tr := timerecord.NewTimeRecorder("searchSegments")
tr := timerecord.NewTimeRecorderWithTrace(ctx, "searchSegments")
log.Debug("search segments...")

if !node.manager.Collection.Ref(req.Req.GetCollectionID(), 1) {
Expand All @@ -792,18 +795,36 @@ func (node *QueryNode) SearchSegments(ctx context.Context, req *querypb.SearchRe
task = tasks.NewSearchTask(searchCtx, collection, node.manager, req, node.serverID)
}

if hasSpan {
span.AddEvent("searchSegments.schedule.enqueue", trace.WithAttributes(
attribute.Int("segment_count", len(req.GetSegmentIDs())),
attribute.String("channel", channel),
))
}
enqueueStart := time.Now()
if err := node.scheduler.Add(task); err != nil {
log.Warn("failed to search channel", zap.Error(err))
resp.Status = merr.Status(err)
return resp, nil
}
if hasSpan {
span.AddEvent("searchSegments.schedule.enqueued", trace.WithAttributes(
attribute.Int64("enqueue_ms", time.Since(enqueueStart).Milliseconds()),
))
}

waitStart := time.Now()
err := task.Wait()
if err != nil {
log.Warn("failed to search segments", zap.Error(err))
resp.Status = merr.Status(err)
return resp, nil
}
if hasSpan {
span.AddEvent("searchSegments.schedule.wait_done", trace.WithAttributes(
attribute.Int64("wait_ms", time.Since(waitStart).Milliseconds()),
))
}

tr.CtxElapse(ctx, fmt.Sprintf("search segments done, channel = %s, segmentIDs = %v",
channel,
Expand Down
25 changes: 24 additions & 1 deletion internal/querynodev2/tasks/search_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,11 @@ import (
"context"
"fmt"
"strconv"
"time"

"github.com/samber/lo"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
"google.golang.org/protobuf/proto"
Expand Down Expand Up @@ -55,6 +57,7 @@ type SearchTask struct {

tr *timerecord.TimeRecorder
scheduleSpan trace.Span
readyEnqueue time.Time
}

func NewSearchTask(ctx context.Context,
Expand Down Expand Up @@ -97,6 +100,25 @@ func (t *SearchTask) IsGpuIndex() bool {
return t.collection.IsGpuIndex()
}

// MarkReadyEnqueue records when the task enters the ready queue.
// It is a no-op for tasks that carry no schedule span.
func (t *SearchTask) MarkReadyEnqueue(attrs ...attribute.KeyValue) {
	span := t.scheduleSpan
	if span == nil {
		return
	}
	// Remember the enqueue instant so MarkReadyDequeue can report the wait.
	t.readyEnqueue = time.Now()
	span.AddEvent("schedule.ready_enqueue", trace.WithAttributes(attrs...))
}

// MarkReadyDequeue records when the task leaves the ready queue and how long it waited there.
// It is a no-op unless MarkReadyEnqueue was previously called on a task with a schedule span.
func (t *SearchTask) MarkReadyDequeue(attrs ...attribute.KeyValue) {
	if t.scheduleSpan == nil || t.readyEnqueue.IsZero() {
		return
	}
	elapsed := time.Since(t.readyEnqueue)
	// Emit caller-supplied attributes plus the measured ready-queue wait.
	eventAttrs := make([]attribute.KeyValue, 0, len(attrs)+1)
	eventAttrs = append(eventAttrs, attrs...)
	eventAttrs = append(eventAttrs, attribute.Int64("ready_wait_ms", elapsed.Milliseconds()))
	t.scheduleSpan.AddEvent("schedule.ready_dequeue", trace.WithAttributes(eventAttrs...))
}

func (t *SearchTask) PreExecute() error {
// Update task wait time metric before execute
nodeID := strconv.FormatInt(t.GetNodeID(), 10)
Expand Down Expand Up @@ -228,13 +250,14 @@ func (t *SearchTask) Execute() error {
log.Warn("failed to reduce search results", zap.Error(err))
return err
}
reduceDuration := tr.CtxRecord(t.ctx, "search reduce")
defer segcore.DeleteSearchResultDataBlobs(blobs)
metrics.QueryNodeReduceLatency.WithLabelValues(
fmt.Sprint(t.GetNodeID()),
metrics.SearchLabel,
metrics.ReduceSegments,
metrics.BatchReduce).
Observe(float64(tr.RecordSpan().Milliseconds()))
Observe(float64(reduceDuration.Milliseconds()))
for i := range t.originNqs {
blob, cost, err := segcore.GetSearchResultDataBlob(t.ctx, blobs, i)
if err != nil {
Expand Down
25 changes: 25 additions & 0 deletions internal/util/searchutil/scheduler/concurrent_safe_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"sync"

"go.opentelemetry.io/otel/attribute"
"go.uber.org/atomic"
"go.uber.org/zap"

Expand Down Expand Up @@ -41,6 +42,12 @@ type addTaskReq struct {
err chan<- error
}

// readyQueueTracer optionally lets tasks receive instrumentation callbacks for ready-queue timing.
// The scheduler detects support via a type assertion on the Task, so tasks that do not
// implement it (or are not interested in tracing) incur no cost.
type readyQueueTracer interface {
	// MarkReadyEnqueue is invoked when the task is accepted into the ready queue.
	MarkReadyEnqueue(attrs ...attribute.KeyValue)
	// MarkReadyDequeue is invoked when the task is handed off to the exec channel.
	MarkReadyDequeue(attrs ...attribute.KeyValue)
}

// scheduler is a general concurrent safe scheduler implementation by wrapping a schedule policy.
type scheduler struct {
policy schedulePolicy
Expand Down Expand Up @@ -140,6 +147,12 @@ func (s *scheduler) schedule() {
// And consume recv chan as much as possible.
s.consumeRecvChan(req, maxReceiveChanBatchConsumeNum)
case execChan <- task:
if tracer, ok := task.(readyQueueTracer); ok {
tracer.MarkReadyDequeue(
attribute.Int("ready_len", s.policy.Len()),
attribute.Int64("waiting_nq", s.GetWaitingTaskTotalNQ()),
)
}
// Task sent, drop the ownership of sent task.
// Update waiting task counter.
s.updateWaitingTaskCounter(-1, -nq)
Expand Down Expand Up @@ -185,6 +198,12 @@ func (s *scheduler) handleAddTaskRequest(req addTaskReq, maxWaitTaskNum int64) b
newTaskAdded, err := s.policy.Push(req.task)
if err == nil {
s.updateWaitingTaskCounter(int64(newTaskAdded), nq)
if tracer, ok := req.task.(readyQueueTracer); ok {
tracer.MarkReadyEnqueue(
attribute.Int("ready_len", s.policy.Len()),
attribute.Int64("waiting_nq", s.GetWaitingTaskTotalNQ()),
)
}
}
req.err <- err
}
Expand All @@ -203,6 +222,12 @@ func (s *scheduler) produceExecChan() Task {

select {
case execChan <- task:
if tracer, ok := task.(readyQueueTracer); ok {
tracer.MarkReadyDequeue(
attribute.Int("ready_len", s.policy.Len()),
attribute.Int64("waiting_nq", s.GetWaitingTaskTotalNQ()),
)
}
// Update waiting task counter.
s.updateWaitingTaskCounter(-1, -nq)
// Task sent, drop the ownership of sent task.
Expand Down
Loading