diff --git a/internal/querynodev2/services.go b/internal/querynodev2/services.go index 1869db8bcc0b2..a47729ae7bc49 100644 --- a/internal/querynodev2/services.go +++ b/internal/querynodev2/services.go @@ -24,6 +24,7 @@ import ( "time" "github.com/samber/lo" + "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" "go.uber.org/zap" "golang.org/x/sync/errgroup" @@ -745,6 +746,8 @@ func (node *QueryNode) SearchSegments(ctx context.Context, req *querypb.SearchRe zap.String("channel", channel), zap.String("scope", req.GetScope().String()), ) + span := trace.SpanFromContext(ctx) + hasSpan := span.SpanContext().IsValid() channelsMvcc := make(map[string]uint64) for _, ch := range req.GetDmlChannels() { channelsMvcc[ch] = req.GetReq().GetMvccTimestamp() @@ -771,7 +774,7 @@ func (node *QueryNode) SearchSegments(ctx context.Context, req *querypb.SearchRe searchCtx, cancel := context.WithCancel(ctx) defer cancel() - tr := timerecord.NewTimeRecorder("searchSegments") + tr := timerecord.NewTimeRecorderWithTrace(ctx, "searchSegments") log.Debug("search segments...") if !node.manager.Collection.Ref(req.Req.GetCollectionID(), 1) { @@ -792,18 +795,36 @@ func (node *QueryNode) SearchSegments(ctx context.Context, req *querypb.SearchRe task = tasks.NewSearchTask(searchCtx, collection, node.manager, req, node.serverID) } + if hasSpan { + span.AddEvent("searchSegments.schedule.enqueue", trace.WithAttributes( + attribute.Int("segment_count", len(req.GetSegmentIDs())), + attribute.String("channel", channel), + )) + } + enqueueStart := time.Now() if err := node.scheduler.Add(task); err != nil { log.Warn("failed to search channel", zap.Error(err)) resp.Status = merr.Status(err) return resp, nil } + if hasSpan { + span.AddEvent("searchSegments.schedule.enqueued", trace.WithAttributes( + attribute.Int64("enqueue_ms", time.Since(enqueueStart).Milliseconds()), + )) + } + waitStart := time.Now() err := task.Wait() if err != nil { log.Warn("failed to search segments", zap.Error(err)) resp.Status = merr.Status(err) return resp, nil } + if hasSpan { + span.AddEvent("searchSegments.schedule.wait_done", trace.WithAttributes( + attribute.Int64("wait_ms", time.Since(waitStart).Milliseconds()), + )) + } tr.CtxElapse(ctx, fmt.Sprintf("search segments done, channel = %s, segmentIDs = %v", channel, diff --git a/internal/querynodev2/tasks/search_task.go b/internal/querynodev2/tasks/search_task.go index 1db0775adca16..08a9bdb9c9c08 100644 --- a/internal/querynodev2/tasks/search_task.go +++ b/internal/querynodev2/tasks/search_task.go @@ -9,9 +9,11 @@ import ( "context" "fmt" "strconv" + "time" "github.com/samber/lo" "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" "go.uber.org/zap" "google.golang.org/protobuf/proto" @@ -55,6 +57,7 @@ type SearchTask struct { tr *timerecord.TimeRecorder scheduleSpan trace.Span + readyEnqueue time.Time } func NewSearchTask(ctx context.Context, @@ -97,6 +100,25 @@ func (t *SearchTask) IsGpuIndex() bool { return t.collection.IsGpuIndex() } +// MarkReadyEnqueue records when the task enters the ready queue. +func (t *SearchTask) MarkReadyEnqueue(attrs ...attribute.KeyValue) { + if t.scheduleSpan == nil { + return + } + t.readyEnqueue = time.Now() + t.scheduleSpan.AddEvent("schedule.ready_enqueue", trace.WithAttributes(attrs...)) +} + +// MarkReadyDequeue records when the task leaves the ready queue and how long it waited there. +func (t *SearchTask) MarkReadyDequeue(attrs ...attribute.KeyValue) { + if t.scheduleSpan == nil || t.readyEnqueue.IsZero() { + return + } + waitMs := time.Since(t.readyEnqueue).Milliseconds() + attrs = append(attrs, attribute.Int64("ready_wait_ms", waitMs)) + t.scheduleSpan.AddEvent("schedule.ready_dequeue", trace.WithAttributes(attrs...)) +} + func (t *SearchTask) PreExecute() error { // Update task wait time metric before execute nodeID := strconv.FormatInt(t.GetNodeID(), 10) @@ -228,13 +250,14 @@ func (t *SearchTask) Execute() error { log.Warn("failed to reduce search results", zap.Error(err)) return err } + reduceDuration := tr.CtxRecord(t.ctx, "search reduce") defer segcore.DeleteSearchResultDataBlobs(blobs) metrics.QueryNodeReduceLatency.WithLabelValues( fmt.Sprint(t.GetNodeID()), metrics.SearchLabel, metrics.ReduceSegments, metrics.BatchReduce). - Observe(float64(tr.RecordSpan().Milliseconds())) + Observe(float64(reduceDuration.Milliseconds())) for i := range t.originNqs { blob, cost, err := segcore.GetSearchResultDataBlob(t.ctx, blobs, i) if err != nil { diff --git a/internal/util/searchutil/scheduler/concurrent_safe_scheduler.go b/internal/util/searchutil/scheduler/concurrent_safe_scheduler.go index 3a7cb079199ee..d76895042014a 100644 --- a/internal/util/searchutil/scheduler/concurrent_safe_scheduler.go +++ b/internal/util/searchutil/scheduler/concurrent_safe_scheduler.go @@ -4,6 +4,7 @@ import ( "fmt" "sync" + "go.opentelemetry.io/otel/attribute" "go.uber.org/atomic" "go.uber.org/zap" @@ -41,6 +42,12 @@ type addTaskReq struct { err chan<- error } +// readyQueueTracer optionally lets tasks receive instrumentation callbacks for ready-queue timing. +type readyQueueTracer interface { + MarkReadyEnqueue(attrs ...attribute.KeyValue) + MarkReadyDequeue(attrs ...attribute.KeyValue) +} + // scheduler is a general concurrent safe scheduler implementation by wrapping a schedule policy. type scheduler struct { policy schedulePolicy @@ -140,6 +147,12 @@ func (s *scheduler) schedule() { // And consume recv chan as much as possible. s.consumeRecvChan(req, maxReceiveChanBatchConsumeNum) case execChan <- task: + if tracer, ok := task.(readyQueueTracer); ok { + tracer.MarkReadyDequeue( + attribute.Int("ready_len", s.policy.Len()), + attribute.Int64("waiting_nq", s.GetWaitingTaskTotalNQ()), + ) + } // Task sent, drop the ownership of sent task. // Update waiting task counter. s.updateWaitingTaskCounter(-1, -nq) @@ -185,6 +198,12 @@ func (s *scheduler) handleAddTaskRequest(req addTaskReq, maxWaitTaskNum int64) b newTaskAdded, err := s.policy.Push(req.task) if err == nil { s.updateWaitingTaskCounter(int64(newTaskAdded), nq) + if tracer, ok := req.task.(readyQueueTracer); ok { + tracer.MarkReadyEnqueue( + attribute.Int("ready_len", s.policy.Len()), + attribute.Int64("waiting_nq", s.GetWaitingTaskTotalNQ()), + ) + } } req.err <- err } @@ -203,6 +222,12 @@ func (s *scheduler) produceExecChan() Task { select { case execChan <- task: + if tracer, ok := task.(readyQueueTracer); ok { + tracer.MarkReadyDequeue( + attribute.Int("ready_len", s.policy.Len()), + attribute.Int64("waiting_nq", s.GetWaitingTaskTotalNQ()), + ) + } // Update waiting task counter. s.updateWaitingTaskCounter(-1, -nq) // Task sent, drop the ownership of sent task.