Skip to content
This repository has been archived by the owner on Apr 2, 2024. It is now read-only.

Commit

Permalink
Preserve Jaeger RefType when storing/retrieving traces.
Browse files Browse the repository at this point in the history
Jaeger, like OpenTracing, uses identifies the type of references between
Spans. There are only two types of references `CHILD_OF` and
`FOLLOWS_FROM`.

OTEL doesn't have an analogous to Jaeger reference type in their link
specification. An OTEL span can have only one parent, and that's set as
an attributed of the span instead of an OTEL link.

When using the Jaeger to OTEL translator the first reference with type
`CHILD_OF` is used to set the OTEL span parent attribute, all other
references are converted to links, and since there is no notion of
reference type that information is lost. On the reverse transformation,
OTEL to Jaeger, the OTEL Span parent is converted to a reference with
`CHILD_OF` type, while all the other OTEL links are converted to
`FOLLOWS_FROM` references.

The problem is that Jaeger, unlike OTEL, supports the notion of multiple
parents (one use case is fork/join workloads), running this multi-parent
through the translator and back returns a span with a single parent with
all the other parents turned into `FOLLOWS_FROM` references.

There's an open PR in the translator to keep this information by adding
the type as attribute to the links following the semantic convention for
OpenTracing compatibility:

https://opentelemetry.io/docs/reference/specification/trace/semantic_conventions/compatibility/#OpenTracing
open-telemetry/opentelemetry-collector-contrib#14463

While that gets merge we are going to be manually setting the reference
type as an attribute on the links when writing traces. On the retrieving
traces side, we'll be using those attributes to set the corresponding
reference type when constructing the Jaeger traces responses.
  • Loading branch information
alejandrodnm committed Sep 30, 2022
1 parent a54d3eb commit 934a18f
Show file tree
Hide file tree
Showing 5 changed files with 209 additions and 25 deletions.
74 changes: 72 additions & 2 deletions pkg/jaeger/store/find_traces.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
jaegertranslator "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger"
"github.com/timescale/promscale/pkg/pgxconn"
"go.opentelemetry.io/collector/pdata/ptrace"
conventions "go.opentelemetry.io/collector/semconv/v1.9.0"
)

func findTraces(ctx context.Context, builder *Builder, conn pgxconn.PgxConn, q *spanstore.TraceQueryParameters) ([]*model.Trace, error) {
Expand Down Expand Up @@ -54,10 +55,10 @@ func scanTraces(rows pgxconn.PgxRows) ([]*model.Trace, error) {
return nil, fmt.Errorf("internal-traces-to-jaeger-proto: %w", err)
}

return batchSliceToTraceSlice(batch), nil
return batchSliceToTraceSlice(batch, getOtherParents(traces)), nil
}

func batchSliceToTraceSlice(bSlice []*model.Batch) []*model.Trace {
func batchSliceToTraceSlice(bSlice []*model.Batch, otherParents map[model.SpanID][]int) []*model.Trace {
// Mostly Copied from Jaeger's grpc_client.go
// https://github.com/jaegertracing/jaeger/blob/067dff713ab635ade66315bbd05518d7b28f40c6/plugin/storage/grpc/shared/grpc_client.go#L179
traces := make([]*model.Trace, 0)
Expand All @@ -75,8 +76,77 @@ func batchSliceToTraceSlice(bSlice []*model.Batch) []*model.Trace {
//copy over the process from the batch
span.Process = batch.Process
decodeSpanBinaryTags(span)

// TODO: There's an open PR against the Jaeger translator that adds
// support for keeping the RefType. Once the PR is merged we can remove
// the following if condition and the getOtherParents function.
//
// https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/14463
if pIdxs, ok := otherParents[span.SpanID]; ok {
refs := span.GetReferences()
for _, i := range pIdxs {
refs[i].RefType = model.ChildOf
}
}
trace.Spans = append(trace.Spans, span)
}
}
return traces
}

// getOtherParents returns a map where the keys are the IDs of Spans that have
// more than one parent and the values are the position in the Span.References
// list where those other parents references are.
//
// A parent is a link that has the `child_of` attribute defined in the semantic
// convention for opentracing:
//
// https://opentelemetry.io/docs/reference/specification/trace/semantic_conventions/compatibility/#opentracing
//
// It tracks the position instead of the SpanID because there might multiple
// links to the same SpanID but different RefTypes.
func getOtherParents(traces ptrace.Traces) map[model.SpanID][]int {
otherParents := map[model.SpanID][]int{}

resourceSpans := traces.ResourceSpans()
for i := 0; i < resourceSpans.Len(); i++ {
rSpan := resourceSpans.At(i)
sSpans := rSpan.ScopeSpans()
for j := 0; j < sSpans.Len(); j++ {
sSpan := sSpans.At(j)
spans := sSpan.Spans()
if spans.Len() == 0 {
continue
}
for k := 0; k < spans.Len(); k++ {
span := spans.At(k)
links := span.Links()

// We need an offset because if the span has a ParentSpanID, then
// that's going to be the first link when translating from OTEL to
// Jaeger. We could say that is it doesn't have a ParentSpanID then
// it shouldn't have other parents, but just to be extra safe we
// inspect the attributes even if there's no ParentSpanID set.
offset := 0
if !span.ParentSpanID().IsEmpty() {
offset = 1
}
for l := 0; l < links.Len(); l++ {
link := links.At(l)
v, ok := link.Attributes().Get(conventions.AttributeOpentracingRefType)
if !ok || v.StringVal() != conventions.AttributeOpentracingRefTypeChildOf {
continue
}
spanID := spanIDToJaegerProto(span.SpanID().Bytes())
pIdxs, ok := otherParents[spanID]
if !ok {
pIdxs = []int{}
otherParents[spanID] = pIdxs
}
otherParents[spanID] = append(pIdxs, l+offset)
}
}
}
}
return otherParents
}
64 changes: 64 additions & 0 deletions pkg/jaeger/store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package store

import (
"context"
"encoding/binary"
"time"

"github.com/pkg/errors"
Expand All @@ -16,6 +17,8 @@ import (
"github.com/jaegertracing/jaeger/storage/spanstore"

jaegertranslator "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger"
"go.opentelemetry.io/collector/pdata/ptrace"
conventions "go.opentelemetry.io/collector/semconv/v1.9.0"

"github.com/timescale/promscale/pkg/log"
"github.com/timescale/promscale/pkg/pgmodel/ingestor"
Expand Down Expand Up @@ -60,9 +63,70 @@ func (p *Store) WriteSpan(ctx context.Context, span *model.Span) error {
if err != nil {
return err
}

// TODO: There's an open PR against the Jaeger translator that adds support
// for keeping the RefType. Once the PR is merged we can remove the following
// if condition and the addRefTypeAttributeToLinks function.
//
// https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/14463
if len(span.References) > 1 {
links := traces.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).Links()
addRefTypeAttributeToLinks(span, links)
}

return p.inserter.IngestTraces(ctx, traces)
}

// TODO: There's an open PR against the Jaeger translator that adds support
// for keeping the RefType. Once the PR is merged we can delete this function.
//
// https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/14463
//
// addRefTypeAttributeToLinks adds the RefType of the Jaeger Span references as
// an attribute to their corresponding OTEL links. The `links` argument must
// be the OTEL representation of the given `span.References`.
//
// The added attributes follow the OpenTracing to OTEL semantic convention
// https://opentelemetry.io/docs/reference/specification/trace/semantic_conventions/compatibility/#opentracing
func addRefTypeAttributeToLinks(span *model.Span, links ptrace.SpanLinkSlice) {

// The reference to the parent span is stored directly as an attribute
// of the Span and not as a Link.
parentsSpanID := span.ParentSpanID()
otherParentsSpanIDs := make(map[model.SpanID]struct{})

// Since there are only 2 types of refereces, ChildOf and FollowsFrom, we
// keep track only of the former.
for _, ref := range span.References {
if ref.RefType == model.ChildOf && ref.SpanID != parentsSpanID {
otherParentsSpanIDs[ref.SpanID] = struct{}{}
}
}

for i := 0; i < links.Len(); i++ {
link := links.At(i)
spanID := spanIDToJaegerProto(link.SpanID().Bytes())

// Everything that's not ChildOf will be set as FollowsFrom.
if _, ok := otherParentsSpanIDs[spanID]; ok {
link.Attributes().InsertString(
conventions.AttributeOpentracingRefType,
conventions.AttributeOpentracingRefTypeChildOf,
)
continue
}
link.Attributes().InsertString(
conventions.AttributeOpentracingRefType,
conventions.AttributeOpentracingRefTypeFollowsFrom,
)
}
}

// SpanIDToUInt64 converts the pcommon.SpanID to uint64 representation.
func spanIDToJaegerProto(rawSpanID [8]byte) model.SpanID {
return model.SpanID(binary.BigEndian.Uint64(rawSpanID[:]))
}

// Close performs graceful shutdown of SpanWriter on Jaeger collector shutdown.
// In our case we have nothing to do
// Noop impl avoid getting GRPC error message when Jaeger collector shuts down.
Expand Down
71 changes: 56 additions & 15 deletions pkg/tests/end_to_end_tests/ingest_trace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,20 +194,18 @@ func getOperationsTest(t testing.TB, q *store.Store) {
// format so that they can be used to compare results from Jaeger query
// responses.
type findTraceTestFixtures struct {
traces ptrace.Traces
trace1 *model.Trace
trace2 *model.Trace
traces ptrace.Traces
batches []*model.Batch
trace1 *model.Trace
trace2 *model.Trace
}

func getFindTraceTestFixtures() (findTraceTestFixtures, error) {

traces := testdata.GenerateTestTrace()

fixtureBatch, err := jaegertranslator.ProtoFromTraces(traces.Clone())
func tracesFixturesToBatches(traces ptrace.Traces) ([]*model.Batch, error) {
batches, err := jaegertranslator.ProtoFromTraces(traces)
if err != nil {
return findTraceTestFixtures{}, err
return nil, err
}
for _, b := range fixtureBatch {
for _, b := range batches {
for _, s := range b.Spans {
// ProtoFromTraces doesn't populates span.Process because it is already been exposed by batch.Process.
// See https://github.com/jaegertracing/jaeger-idl/blob/05fe64e9c305526901f70ff692030b388787e388/proto/api_v2/model.proto#L152-L160
Expand All @@ -221,27 +219,70 @@ func getFindTraceTestFixtures() (findTraceTestFixtures, error) {
}
}

for _, span := range batches[0].Spans[4:] {
if span.GetOperationName() != "operationB" {
panic("invalid span")
}
span.GetReferences()[0].RefType = model.ChildOf
}

return batches, nil
}

func getFindTraceTestFixtures() (findTraceTestFixtures, error) {

traces := testdata.GenerateTestTrace()
batches, err := tracesFixturesToBatches(traces.Clone())
if err != nil {
return findTraceTestFixtures{}, err
}

// After passing the traces from testdata.GenerateTestTrace through the
// translator we end up with 2 batches. The first one has 8 spans, the second
// 4. The first 4 spans of the first batch belong to the same trace and the
// other 4 belong to the same trace as all the spans in the second batch.
// other 4 belong to the same trace as all the spans in the second batch,
// meaning that there are 2 traces.
//
// Batches are ordered by Process, the analogous to Resource in OTEL.
//
// Basically::
// batches = [
// [sT1, sT1, sT1, sT1, s1T2, s1T2, s1T2, s1T2],
// [s2T2, s2T2, s2T2, s2T2],
// ]
//
// Note: in the example there are just three types of spans sT1, s1T2 and
// s2T2, that's because each type shares the same attributes they just have
// unique Span IDs.
trace1 := &model.Trace{
Spans: fixtureBatch[0].Spans[:4],
Spans: batches[0].Spans[:4],
ProcessMap: nil,
Warnings: make([]string, 0),
}

trace2Spans := make([]*model.Span, 0)
trace2Spans = append(trace2Spans, fixtureBatch[0].Spans[4:]...)
trace2Spans = append(trace2Spans, fixtureBatch[1].Spans...)
trace2Spans = append(trace2Spans, batches[0].Spans[4:]...)
trace2Spans = append(trace2Spans, batches[1].Spans...)

trace2 := &model.Trace{
Spans: trace2Spans,
ProcessMap: nil,
Warnings: make([]string, 0),
}

return findTraceTestFixtures{traces, trace1, trace2}, nil
// We recreate the batches to have a unique copy that can be
// modified without altering trace1 and trace2
batches, err = tracesFixturesToBatches(traces.Clone())
if err != nil {
return findTraceTestFixtures{}, err
}

return findTraceTestFixtures{
traces,
batches,
trace1,
trace2,
}, nil
}

func findTraceTest(t testing.TB, q *store.Store, fixtures findTraceTestFixtures) {
Expand Down
8 changes: 1 addition & 7 deletions pkg/tests/end_to_end_tests/jaeger_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"testing"

"github.com/jackc/pgx/v4/pgxpool"
jaegertranslator "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger"
"github.com/stretchr/testify/require"
"github.com/timescale/promscale/pkg/jaeger/store"
jaegerstore "github.com/timescale/promscale/pkg/jaeger/store"
Expand All @@ -26,13 +25,8 @@ func TestJaegerSpanIngestion(t *testing.T) {
if err != nil {
require.NoError(t, err)
}
batch, err := jaegertranslator.ProtoFromTraces(fixtures.traces)
require.NoError(t, err)
for _, b := range batch {
for _, b := range fixtures.batches {
for _, s := range b.Spans {
// ProtoFromTraces doesn't populates span.Process because it is already been exposed by batch.Process.
// See https://github.com/jaegertracing/jaeger-idl/blob/05fe64e9c305526901f70ff692030b388787e388/proto/api_v2/model.proto#L152-L160
s.Process = b.Process
err = jaegerStore.SpanWriter().WriteSpan(context.Background(), s)
require.NoError(t, err)
}
Expand Down
17 changes: 16 additions & 1 deletion pkg/tests/testdata/trace_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/golang/snappy"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/ptrace"
conventions "go.opentelemetry.io/collector/semconv/v1.9.0"
)

const TracesEntryCount = 5932 // All entries in the traces-dataset.sz file.
Expand Down Expand Up @@ -41,7 +42,11 @@ var (
},
)
spanEventAttributes = pcommon.NewMapFromRaw(map[string]interface{}{"span-event-attr": "span-event-attr-val"})
spanLinkAttributes = pcommon.NewMapFromRaw(map[string]interface{}{"span-link-attr": "span-link-attr-val"})
spanLinkAttributes = pcommon.NewMapFromRaw(map[string]interface{}{
"span-link-attr": "span-link-attr-val",
// In Jaeger terms this means that the span has 2 parents.
conventions.AttributeOpentracingRefType: conventions.AttributeOpentracingRefTypeChildOf,
})
)

func GetTraceId(bSlice [16]byte) string {
Expand All @@ -60,6 +65,16 @@ func GenerateBrokenTestTraces() ptrace.Traces {
return data
}

// GenerateTestTrace returns 2 traces with the following characteristics:
//
// - There are only 3 span types, in the sense that each span type shares
// the same attributes, but each instance has a unique SpanID. They are
// referenced below as Span1, Span2 and Span3.
// - Trace 1 has 4 spans of type Span1.
// - Trace 2 has 4 spans of type Span2 and 4 of type Span3.
// - Span1 and Span2 belong to the same Resource and InstrumentationScope.
// - Span1 has 2 Events and 2 Links to random SpanIDs.
// - Span2 has 2 Links to random SpanIDs.
func GenerateTestTrace() ptrace.Traces {
rand.Seed(1)
spanCount := 4
Expand Down

0 comments on commit 934a18f

Please sign in to comment.