@@ -21,13 +21,15 @@ import (
21
21
"github.com/grafana/dskit/runutil"
22
22
"github.com/oklog/ulid"
23
23
"github.com/opentracing/opentracing-go"
24
- otlog "github.com/opentracing/opentracing-go/log"
25
24
"github.com/pkg/errors"
26
25
"github.com/prometheus/common/model"
27
26
"github.com/prometheus/prometheus/promql/parser"
28
27
"github.com/samber/lo"
29
28
"github.com/segmentio/parquet-go"
30
29
"github.com/thanos-io/objstore"
30
+ "go.opentelemetry.io/otel"
31
+ attribute "go.opentelemetry.io/otel/attribute"
32
+ "go.opentelemetry.io/otel/trace"
31
33
"golang.org/x/sync/errgroup"
32
34
"google.golang.org/grpc/codes"
33
35
@@ -557,8 +559,8 @@ func SelectMatchingProfiles(ctx context.Context, request *ingestv1.SelectProfile
557
559
}
558
560
559
561
func MergeProfilesStacktraces (ctx context.Context , stream * connect.BidiStream [ingestv1.MergeProfilesStacktracesRequest , ingestv1.MergeProfilesStacktracesResponse ], blockGetter BlockGetter ) error {
560
- sp , ctx := opentracing . StartSpanFromContext (ctx , "MergeProfilesStacktraces" )
561
- defer sp .Finish ()
562
+ ctx , sp := otel . Tracer ( "github.com/grafana/pyroscope" ). Start (ctx , "MergeProfilesStacktraces" )
563
+ defer sp .End ()
562
564
563
565
r , err := stream .Receive ()
564
566
if err != nil {
@@ -572,12 +574,11 @@ func MergeProfilesStacktraces(ctx context.Context, stream *connect.BidiStream[in
572
574
return connect .NewError (connect .CodeInvalidArgument , errors .New ("missing initial select request" ))
573
575
}
574
576
request := r .Request
575
- sp .LogFields (
576
- otlog .String ("start" , model .Time (request .Start ).Time ().String ()),
577
- otlog .String ("end" , model .Time (request .End ).Time ().String ()),
578
- otlog .String ("selector" , request .LabelSelector ),
579
- otlog .String ("profile_id" , request .Type .ID ),
580
- )
577
+ sp .AddEvent ("TODO" , trace .WithAttributes (
578
+ attribute .String ("start" , model .Time (request .Start ).Time ().String ()),
579
+ attribute .String ("end" , model .Time (request .End ).Time ().String ()),
580
+ attribute .String ("selector" , request .LabelSelector ),
581
+ attribute .String ("profile_id" , request .Type .ID )))
581
582
582
583
queriers , err := blockGetter (ctx , model .Time (request .Start ), model .Time (request .End ))
583
584
if err != nil {
@@ -621,7 +622,7 @@ func MergeProfilesStacktraces(ctx context.Context, stream *connect.BidiStream[in
621
622
622
623
// Signals the end of the profile streaming by sending an empty response.
623
624
// This allows the client to not block other streaming ingesters.
624
- sp .LogFields ( otlog . String ("msg" , "signaling the end of the profile streaming" ))
625
+ sp .AddEvent ( "TODO" , trace . WithAttributes ( attribute . String ("msg" , "signaling the end of the profile streaming" ) ))
625
626
if err = stream .Send (& ingestv1.MergeProfilesStacktracesResponse {}); err != nil {
626
627
return err
627
628
}
@@ -631,7 +632,7 @@ func MergeProfilesStacktraces(ctx context.Context, stream *connect.BidiStream[in
631
632
}
632
633
633
634
// sends the final result to the client.
634
- sp .LogFields ( otlog . String ("msg" , "sending the final result to the client" ))
635
+ sp .AddEvent ( "TODO" , trace . WithAttributes ( attribute . String ("msg" , "sending the final result to the client" ) ))
635
636
err = stream .Send (& ingestv1.MergeProfilesStacktracesResponse {
636
637
Result : & ingestv1.MergeProfilesStacktracesResult {
637
638
Format : ingestv1 .StacktracesMergeFormat_MERGE_FORMAT_TREE ,
@@ -649,8 +650,8 @@ func MergeProfilesStacktraces(ctx context.Context, stream *connect.BidiStream[in
649
650
}
650
651
651
652
func MergeProfilesLabels (ctx context.Context , stream * connect.BidiStream [ingestv1.MergeProfilesLabelsRequest , ingestv1.MergeProfilesLabelsResponse ], blockGetter BlockGetter ) error {
652
- sp , ctx := opentracing . StartSpanFromContext (ctx , "MergeProfilesLabels" )
653
- defer sp .Finish ()
653
+ ctx , sp := otel . Tracer ( "github.com/grafana/pyroscope" ). Start (ctx , "MergeProfilesLabels" )
654
+ defer sp .End ()
654
655
655
656
r , err := stream .Receive ()
656
657
if err != nil {
@@ -666,13 +667,12 @@ func MergeProfilesLabels(ctx context.Context, stream *connect.BidiStream[ingestv
666
667
request := r .Request
667
668
by := r .By
668
669
sort .Strings (by )
669
- sp .LogFields (
670
- otlog .String ("start" , model .Time (request .Start ).Time ().String ()),
671
- otlog .String ("end" , model .Time (request .End ).Time ().String ()),
672
- otlog .String ("selector" , request .LabelSelector ),
673
- otlog .String ("profile_id" , request .Type .ID ),
674
- otlog .String ("by" , strings .Join (by , "," )),
675
- )
670
+ sp .AddEvent ("TODO" , trace .WithAttributes (
671
+ attribute .String ("start" , model .Time (request .Start ).Time ().String ()),
672
+ attribute .String ("end" , model .Time (request .End ).Time ().String ()),
673
+ attribute .String ("selector" , request .LabelSelector ),
674
+ attribute .String ("profile_id" , request .Type .ID ),
675
+ attribute .String ("by" , strings .Join (by , "," ))))
676
676
677
677
queriers , err := blockGetter (ctx , model .Time (request .Start ), model .Time (request .End ))
678
678
if err != nil {
@@ -743,8 +743,8 @@ func MergeProfilesLabels(ctx context.Context, stream *connect.BidiStream[ingestv
743
743
}
744
744
745
745
func MergeProfilesPprof (ctx context.Context , stream * connect.BidiStream [ingestv1.MergeProfilesPprofRequest , ingestv1.MergeProfilesPprofResponse ], blockGetter BlockGetter ) error {
746
- sp , ctx := opentracing . StartSpanFromContext (ctx , "MergeProfilesPprof" )
747
- defer sp .Finish ()
746
+ ctx , sp := otel . Tracer ( "github.com/grafana/pyroscope" ). Start (ctx , "MergeProfilesPprof" )
747
+ defer sp .End ()
748
748
749
749
r , err := stream .Receive ()
750
750
if err != nil {
@@ -758,12 +758,11 @@ func MergeProfilesPprof(ctx context.Context, stream *connect.BidiStream[ingestv1
758
758
return connect .NewError (connect .CodeInvalidArgument , errors .New ("missing initial select request" ))
759
759
}
760
760
request := r .Request
761
- sp .LogFields (
762
- otlog .String ("start" , model .Time (request .Start ).Time ().String ()),
763
- otlog .String ("end" , model .Time (request .End ).Time ().String ()),
764
- otlog .String ("selector" , request .LabelSelector ),
765
- otlog .String ("profile_id" , request .Type .ID ),
766
- )
761
+ sp .AddEvent ("TODO" , trace .WithAttributes (
762
+ attribute .String ("start" , model .Time (request .Start ).Time ().String ()),
763
+ attribute .String ("end" , model .Time (request .End ).Time ().String ()),
764
+ attribute .String ("selector" , request .LabelSelector ),
765
+ attribute .String ("profile_id" , request .Type .ID )))
767
766
768
767
queriers , err := blockGetter (ctx , model .Time (request .Start ), model .Time (request .End ))
769
768
if err != nil {
@@ -889,8 +888,9 @@ func retrieveStacktracePartition(buf [][]parquet.Value, pos int) uint64 {
889
888
}
890
889
891
890
func (b * singleBlockQuerier ) SelectMatchingProfiles (ctx context.Context , params * ingestv1.SelectProfilesRequest ) (iter.Iterator [Profile ], error ) {
892
- sp , ctx := opentracing .StartSpanFromContext (ctx , "SelectMatchingProfiles - Block" )
893
- defer sp .Finish ()
891
+ ctx , sp := otel .Tracer ("github.com/grafana/pyroscope" ).Start (ctx , "SelectMatchingProfiles - Block" )
892
+ defer sp .End ()
893
+
894
894
if err := b .Open (ctx ); err != nil {
895
895
return nil , err
896
896
}
@@ -1045,9 +1045,9 @@ func (q *singleBlockQuerier) openFiles(ctx context.Context) error {
1045
1045
sp , ctx := opentracing .StartSpanFromContext (ctx , "BlockQuerier - open" )
1046
1046
defer func () {
1047
1047
q .metrics .blockOpeningLatency .Observe (time .Since (start ).Seconds ())
1048
- sp .LogFields (
1049
- otlog .String ("block_ulid" , q .meta .ULID .String ()),
1050
- )
1048
+ sp .AddEvent ( "TODO" , trace . WithAttributes (
1049
+ attribute .String ("block_ulid" , q .meta .ULID .String ())))
1050
+
1051
1051
sp .Finish ()
1052
1052
}()
1053
1053
g , ctx := errgroup .WithContext (ctx )
0 commit comments