Skip to content
This repository was archived by the owner on Jul 19, 2023. It is now read-only.

Commit 3a9306c

Browse files
committedJul 10, 2023
Add otel traces to visualize tests better
Switch to otel
1 parent 3869536 commit 3a9306c

File tree

2 files changed

+161
-26
lines changed

2 files changed

+161
-26
lines changed
 

‎pkg/phlaredb/query/iters.go

+43-24
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,10 @@ import (
1010
"sync"
1111

1212
"github.com/grafana/dskit/multierror"
13-
"github.com/opentracing/opentracing-go"
14-
"github.com/opentracing/opentracing-go/log"
1513
"github.com/segmentio/parquet-go"
14+
"go.opentelemetry.io/otel"
15+
"go.opentelemetry.io/otel/attribute"
16+
"go.opentelemetry.io/otel/trace"
1617

1718
"github.com/grafana/phlare/pkg/iter"
1819
)
@@ -39,6 +40,10 @@ type RowNumberWithDefinitionLevel struct {
3940
DefinitionLevel int
4041
}
4142

43+
func (r *RowNumberWithDefinitionLevel) String() string {
44+
return fmt.Sprintf("%v:%v", r.RowNumber, r.DefinitionLevel)
45+
}
46+
4247
// EmptyRowNumber creates an empty invalid row number.
4348
func EmptyRowNumber() RowNumber {
4449
return RowNumber{-1, -1, -1, -1, -1, -1}
@@ -369,6 +374,14 @@ func (r *IteratorResult) Columns(buffer [][]parquet.Value, names ...string) [][]
369374
return buffer
370375
}
371376

377+
func (r *IteratorResult) String() string {
378+
if r == nil {
379+
return "nil"
380+
}
381+
return fmt.Sprintf("rowNum=%d entries=%+#v", r.RowNumber[0], r.ToMap())
382+
383+
}
384+
372385
// iterator - Every iterator follows this interface and can be composed.
373386
type Iterator = iter.SeekIterator[*IteratorResult, RowNumberWithDefinitionLevel]
374387

@@ -776,7 +789,7 @@ type SyncIterator struct {
776789
// Status
777790
ctx context.Context
778791
cancel func()
779-
span opentracing.Span
792+
span trace.Span
780793
metrics *Metrics
781794
curr RowNumber
782795
currRowGroup parquet.RowGroup
@@ -833,10 +846,12 @@ func NewSyncIterator(ctx context.Context, rgs []parquet.RowGroup, column int, co
833846
rn.Skip(rg.NumRows())
834847
}
835848

836-
span, ctx := opentracing.StartSpanFromContext(ctx, "syncIterator", opentracing.Tags{
837-
"columnIndex": column,
838-
"column": columnName,
839-
})
849+
tr := otel.Tracer("query")
850+
851+
ctx, span := tr.Start(ctx, "syncIterator", trace.WithAttributes(
852+
attribute.String("column", columnName),
853+
attribute.Int("columnIndex", column),
854+
))
840855

841856
ctx, cancel := context.WithCancel(ctx)
842857

@@ -1010,11 +1025,12 @@ func (c *SyncIterator) seekPages(seekTo RowNumber, definitionLevel int) (done bo
10101025
return true, err
10111026
}
10121027
c.metrics.pageReadsTotal.WithLabelValues(c.table, c.columnName).Add(1)
1013-
c.span.LogFields(
1014-
log.String("msg", "reading page (seekPages)"),
1015-
log.Int64("page_num_values", pg.NumValues()),
1016-
log.Int64("page_size", pg.Size()),
1017-
)
1028+
c.span.AddEvent(
1029+
"read page (seekPages)",
1030+
trace.WithAttributes(
1031+
attribute.Int64("page_num_values", pg.NumValues()),
1032+
attribute.Int64("page_size", pg.Size()),
1033+
))
10181034

10191035
// Skip based on row number?
10201036
newRN := c.curr
@@ -1079,11 +1095,12 @@ func (c *SyncIterator) next() (RowNumber, *parquet.Value, error) {
10791095
return EmptyRowNumber(), nil, err
10801096
}
10811097
c.metrics.pageReadsTotal.WithLabelValues(c.table, c.columnName).Add(1)
1082-
c.span.LogFields(
1083-
log.String("msg", "reading page (next)"),
1084-
log.Int64("page_num_values", pg.NumValues()),
1085-
log.Int64("page_size", pg.Size()),
1086-
)
1098+
c.span.AddEvent(
1099+
"read page (next)",
1100+
trace.WithAttributes(
1101+
attribute.Int64("page_num_values", pg.NumValues()),
1102+
attribute.Int64("page_size", pg.Size()),
1103+
))
10871104

10881105
if c.filter != nil && !c.filter.KeepPage(pg) {
10891106
// This page filtered out
@@ -1202,12 +1219,14 @@ func (c *SyncIterator) Close() error {
12021219
c.cancel()
12031220
c.closeCurrRowGroup()
12041221

1205-
c.span.SetTag("inspectedColumnChunks", c.filter.InspectedColumnChunks.Load())
1206-
c.span.SetTag("inspectedPages", c.filter.InspectedPages.Load())
1207-
c.span.SetTag("inspectedValues", c.filter.InspectedValues.Load())
1208-
c.span.SetTag("keptColumnChunks", c.filter.KeptColumnChunks.Load())
1209-
c.span.SetTag("keptPages", c.filter.KeptPages.Load())
1210-
c.span.SetTag("keptValues", c.filter.KeptValues.Load())
1211-
c.span.Finish()
1222+
c.span.SetAttributes(attribute.Int64("inspectedColumnChunks", c.filter.InspectedColumnChunks.Load()))
1223+
/*
1224+
c.span.SetTag("inspectedPages", c.filter.InspectedPages.Load())
1225+
c.span.SetTag("inspectedValues", c.filter.InspectedValues.Load())
1226+
c.span.SetTag("keptColumnChunks", c.filter.KeptColumnChunks.Load())
1227+
c.span.SetTag("keptPages", c.filter.KeptPages.Load())
1228+
c.span.SetTag("keptValues", c.filter.KeptValues.Load())
1229+
*/
1230+
c.span.End()
12121231
return nil
12131232
}

‎pkg/phlaredb/query/iters_test.go

+118-2
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"bytes"
55
"context"
66
"fmt"
7+
"log"
78
"math"
89
"os"
910
"testing"
@@ -12,6 +13,13 @@ import (
1213
"github.com/prometheus/client_golang/prometheus/testutil"
1314
"github.com/segmentio/parquet-go"
1415
"github.com/stretchr/testify/require"
16+
"go.opentelemetry.io/otel"
17+
"go.opentelemetry.io/otel/attribute"
18+
"go.opentelemetry.io/otel/exporters/jaeger"
19+
"go.opentelemetry.io/otel/sdk/resource"
20+
tracesdk "go.opentelemetry.io/otel/sdk/trace"
21+
semconv "go.opentelemetry.io/otel/semconv/v1.17.0"
22+
"go.opentelemetry.io/otel/trace"
1523
)
1624

1725
const MaxDefinitionLevel = 5
@@ -368,7 +376,109 @@ func createFileWith[T any](t testing.TB, rows []T, rowGroups int) *parquet.File
368376
return pf
369377
}
370378

379+
type iteratorTracer struct {
380+
it Iterator
381+
span trace.Span
382+
name string
383+
nextCount uint32
384+
seekCount uint32
385+
}
386+
387+
func (i iteratorTracer) Next() bool {
388+
i.nextCount++
389+
posBefore := i.it.At()
390+
result := i.it.Next()
391+
posAfter := i.it.At()
392+
i.span.AddEvent("next", trace.WithAttributes(
393+
attribute.String("column", i.name),
394+
attribute.Bool("result", result),
395+
attribute.Stringer("posBefore", posBefore),
396+
attribute.Stringer("posAfter", posAfter),
397+
))
398+
return result
399+
}
400+
401+
func (i iteratorTracer) At() *IteratorResult {
402+
return i.it.At()
403+
}
404+
405+
func (i iteratorTracer) Err() error {
406+
return i.it.Err()
407+
}
408+
409+
func (i iteratorTracer) Close() error {
410+
return i.it.Close()
411+
}
412+
413+
func (i iteratorTracer) Seek(pos RowNumberWithDefinitionLevel) bool {
414+
i.seekCount++
415+
posBefore := i.it.At()
416+
result := i.it.Seek(pos)
417+
posAfter := i.it.At()
418+
i.span.AddEvent("seek", trace.WithAttributes(
419+
attribute.String("column", i.name),
420+
attribute.Bool("result", result),
421+
attribute.Stringer("seekTo", &pos),
422+
attribute.Stringer("posBefore", posBefore),
423+
attribute.Stringer("posAfter", posAfter),
424+
))
425+
return result
426+
}
427+
428+
func newIteratorTracer(span trace.Span, name string, it Iterator) Iterator {
429+
return &iteratorTracer{
430+
span: span,
431+
name: name,
432+
it: it,
433+
}
434+
}
435+
436+
// tracerProvider returns an OpenTelemetry TracerProvider configured to use
437+
// the Jaeger exporter that will send spans to the provided url. The returned
438+
// TracerProvider will also use a Resource configured with all the information
439+
// about the application.
440+
func tracerProvider(url string) (*tracesdk.TracerProvider, error) {
441+
// Create the Jaeger exporter
442+
exp, err := jaeger.New(jaeger.WithCollectorEndpoint(jaeger.WithEndpoint(url)))
443+
if err != nil {
444+
return nil, err
445+
}
446+
tp := tracesdk.NewTracerProvider(
447+
// Always be sure to batch in production.
448+
tracesdk.WithBatcher(exp),
449+
// Record information about this application in a Resource.
450+
tracesdk.WithResource(resource.NewWithAttributes(
451+
semconv.SchemaURL,
452+
semconv.ServiceName("phlare-go-test"),
453+
)),
454+
)
455+
return tp, nil
456+
}
457+
458+
func TestMain(m *testing.M) {
459+
tp, err := tracerProvider("http://localhost:14268/api/traces")
460+
if err != nil {
461+
log.Fatal(err)
462+
}
463+
464+
// Register our TracerProvider as the global so any imported
465+
// instrumentation in the future will default to using it.
466+
otel.SetTracerProvider(tp)
467+
468+
result := m.Run()
469+
470+
fmt.Println("shutting tracer down")
471+
tp.Shutdown(context.Background())
472+
473+
os.Exit(result)
474+
}
475+
371476
func TestBinaryJoinIterator(t *testing.T) {
477+
tr := otel.Tracer("query")
478+
479+
_, span := tr.Start(context.Background(), "TestBinaryJoinIterator")
480+
defer span.End()
481+
372482
rowCount := 1600
373483
pf := createProfileLikeFile(t, rowCount)
374484

@@ -438,8 +548,11 @@ func TestBinaryJoinIterator(t *testing.T) {
438548
metrics.pageReadsTotal.WithLabelValues("ts", "TimeNanos").Add(0)
439549
ctx = AddMetricsToContext(ctx, metrics)
440550

441-
seriesIt := NewSyncIterator(ctx, pf.RowGroups(), 0, "SeriesId", 1000, tc.seriesPredicate, "SeriesId")
442-
timeIt := NewSyncIterator(ctx, pf.RowGroups(), 1, "TimeNanos", 1000, tc.timePredicate, "TimeNanos")
551+
seriesIt := newIteratorTracer(span, "SeriesID", NewSyncIterator(ctx, pf.RowGroups(), 0, "SeriesId", 1000, tc.seriesPredicate, "SeriesId"))
552+
timeIt := newIteratorTracer(span, "TimeNanos", NewSyncIterator(ctx, pf.RowGroups(), 1, "TimeNanos", 1000, tc.timePredicate, "TimeNanos"))
553+
554+
ctx, span := tr.Start(ctx, t.Name())
555+
defer span.End()
443556

444557
it := NewBinaryJoinIterator(
445558
0,
@@ -449,6 +562,9 @@ func TestBinaryJoinIterator(t *testing.T) {
449562

450563
results := 0
451564
for it.Next() {
565+
span.AddEvent("match", trace.WithAttributes(
566+
attribute.Stringer("element", it.At()),
567+
))
452568
results++
453569
}
454570
require.NoError(t, it.Err())

0 commit comments

Comments
 (0)
Failed to load comments.