Skip to content
This repository has been archived by the owner on Apr 2, 2024. It is now read-only.

Commit

Permalink
Preserve Jaeger RefType when storing/retrieving traces.
Browse files Browse the repository at this point in the history
Jaeger, like OpenTracing, identifies the type of each reference between
Spans. There are only two types of references: `CHILD_OF` and
`FOLLOWS_FROM`.

OTEL doesn't have an analogue of the Jaeger reference type in its link
specification. An OTEL span can have only one parent, and that's set as
an attribute of the span instead of an OTEL link.

When using the Jaeger to OTEL translator the first reference with type
`CHILD_OF` is used to set the OTEL span parent attribute, all other
references are converted to links, and since there is no notion of
reference type that information is lost. On the reverse transformation,
OTEL to Jaeger, the OTEL Span parent is converted to a reference with
`CHILD_OF` type, while all the other OTEL links are converted to
`FOLLOWS_FROM` references.

The problem is that Jaeger, unlike OTEL, supports the notion of multiple
parents (one use case is fork/join workloads). Running such a multi-parent
span through the translator and back returns a span with a single parent,
with all the other parents turned into `FOLLOWS_FROM` references.

There's an open PR in the translator to keep this information by adding
the type as attribute to the links following the semantic convention for
OpenTracing compatibility:

https://opentelemetry.io/docs/reference/specification/trace/semantic_conventions/compatibility/#OpenTracing
open-telemetry/opentelemetry-collector-contrib#14463

Until that gets merged, we are going to manually set the reference
type as an attribute on the links when writing traces. On the retrieval
side, we'll use those attributes to set the corresponding
reference type when constructing the Jaeger trace responses.
  • Loading branch information
alejandrodnm committed Oct 14, 2022
1 parent d0fc6f2 commit beea06b
Show file tree
Hide file tree
Showing 12 changed files with 401 additions and 61 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ We use the following categories for changes:
ingested using the native Jaeger API.
- Fix traces queries returning duplicated events and links (or logs and
references in Jaeger) for traces with more than one event and one link.
- Fix incorrect reference types when retrieving Jaeger traces with multiple
parent references [#1681].

## [0.14.0] - 2022-08-30

Expand Down
4 changes: 3 additions & 1 deletion pkg/jaeger/store/binary_tags.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ const MEDIA_TYPE_ENCODED_BINARY_LEN = len(MEDIA_TYPE_ENCODED_BINARY)
// removing the prefix and decoding the base64 string.
func decodeSpanBinaryTags(span *model.Span) {
decodeBinaryTags(span.Tags)
decodeBinaryTags(span.Process.Tags)
if span.Process != nil {
decodeBinaryTags(span.Process.Tags)
}
for _, log := range span.Logs {
decodeBinaryTags(log.Fields)
}
Expand Down
4 changes: 1 addition & 3 deletions pkg/jaeger/store/find_traces.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (

"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/storage/spanstore"
jaegertranslator "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger"
"github.com/timescale/promscale/pkg/pgxconn"
"go.opentelemetry.io/collector/pdata/ptrace"
)
Expand Down Expand Up @@ -49,7 +48,7 @@ func scanTraces(rows pgxconn.PgxRows) ([]*model.Trace, error) {
return nil, fmt.Errorf("trace row iterator: %w", rows.Err())
}

batch, err := jaegertranslator.ProtoFromTraces(traces)
batch, err := ProtoFromTraces(traces)
if err != nil {
return nil, fmt.Errorf("internal-traces-to-jaeger-proto: %w", err)
}
Expand All @@ -74,7 +73,6 @@ func batchSliceToTraceSlice(bSlice []*model.Batch) []*model.Trace {
}
//copy over the process from the batch
span.Process = batch.Process
decodeSpanBinaryTags(span)
trace.Spans = append(trace.Spans, span)
}
}
Expand Down
11 changes: 1 addition & 10 deletions pkg/jaeger/store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,6 @@ import (
"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/storage/dependencystore"
"github.com/jaegertracing/jaeger/storage/spanstore"

jaegertranslator "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger"

"github.com/timescale/promscale/pkg/log"
"github.com/timescale/promscale/pkg/pgmodel/ingestor"
"github.com/timescale/promscale/pkg/pgmodel/metrics"
Expand Down Expand Up @@ -50,13 +47,7 @@ func (p *Store) StreamingSpanWriter() spanstore.Writer {
}

func (p *Store) WriteSpan(ctx context.Context, span *model.Span) error {
encodeBinaryTags(span)
batches := []*model.Batch{
{
Spans: []*model.Span{span},
},
}
traces, err := jaegertranslator.ProtoToTraces(batches)
traces, err := ProtoToTraces(span)
if err != nil {
return err
}
Expand Down
19 changes: 16 additions & 3 deletions pkg/jaeger/store/trace_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,22 @@ import (
)

const (
// A lateral join is used for the `link` table instead of using directly
// `array_agg` calls in the main SELECT clause to avoid returning duplicated
// values when the cartesian product of event x link is greater than 1.
// A lateral join is used for the `link` and `event` table instead of using
// directly `array_agg` calls in the main SELECT clause to avoid returning
// duplicated values when the cartesian product of event x link is greater
// than 1.
//
// The GROUP BY clause of the query can be removed because we are not doing
// any kind of aggregation that would require it, but the tests we did showed
// that with the GROUP BY the resulting query plan is better. In the
// no-group-by case the join between the trace_ids with clause and the
// main _ps_trace.span table happens as a hash_join where both sides scan
// the span table.
//
// Query plans:
//
// - With GROUP BY https://explain.dalibo.com/plan/e6c74995bc36begd
// - Without GROUP BY https://explain.dalibo.com/plan/f09259cd21g57dh3
completeTraceSQLFormat = `
SELECT
s.trace_id,
Expand Down
166 changes: 166 additions & 0 deletions pkg/jaeger/store/translation.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,14 @@
package store

import (
"encoding/binary"
"fmt"

"github.com/jaegertracing/jaeger/model"
jaegertranslator "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/ptrace"
conventions "go.opentelemetry.io/collector/semconv/v1.9.0"
)

var (
Expand Down Expand Up @@ -79,3 +84,164 @@ func jSpanKindToInternal(spanKind string) string {
}
return "unspecified"
}

// ProtoToTraces converts a single Jaeger span into OTEL ptrace.Traces.
//
// The span is first passed to encodeBinaryTags, then wrapped in a one-span
// batch and handed to the Jaeger translator. If the translator fails, an empty
// ptrace.Traces is returned together with the translator's error.
//
// When the span has more than one reference, the links of the translated span
// are annotated with the Jaeger RefType (see addRefTypeAttributeToLinks).
func ProtoToTraces(span *model.Span) (ptrace.Traces, error) {
	encodeBinaryTags(span)

	batch := &model.Batch{Spans: []*model.Span{span}}
	traces, err := jaegertranslator.ProtoToTraces([]*model.Batch{batch})
	if err != nil {
		return ptrace.NewTraces(), err
	}

	// TODO: There's an open PR against the Jaeger translator that adds support
	// for keeping the RefType. Once the PR is merged we can remove the
	// following guard, the link annotation below it and the
	// addRefTypeAttributeToLinks function.
	//
	// https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/14463
	if len(span.References) <= 1 {
		return traces, nil
	}

	// Exactly one span was submitted, so it is the first span of the first
	// scope of the first resource.
	otelSpan := traces.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0)
	addRefTypeAttributeToLinks(span, otelSpan.Links())

	return traces, nil
}

// TODO: There's an open PR against the Jaeger translator that adds support
// for keeping the RefType. Once the PR is merged we can delete this function.
//
// https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/14463
//
// addRefTypeAttributeToLinks tags every OTEL link in `links` with the RefType
// of the Jaeger reference it was derived from. `links` must be the OTEL
// representation of `span.References`.
//
// The attribute key and values follow the OpenTracing to OTEL semantic
// convention:
// https://opentelemetry.io/docs/reference/specification/trace/semantic_conventions/compatibility/#opentracing
func addRefTypeAttributeToLinks(span *model.Span, links ptrace.SpanLinkSlice) {
	// The reference to the parent span is stored directly as an attribute of
	// the Span and not as a Link, so it is excluded from the set built below.
	primaryParent := span.ParentSpanID()

	// There are only 2 types of references, ChildOf and FollowsFrom, so it is
	// enough to remember which span IDs are referenced as ChildOf.
	extraParents := make(map[model.SpanID]struct{})
	for _, ref := range span.References {
		if ref.RefType != model.ChildOf || ref.SpanID == primaryParent {
			continue
		}
		extraParents[ref.SpanID] = struct{}{}
	}

	for i, n := 0, links.Len(); i < n; i++ {
		link := links.At(i)

		// Everything that's not a known ChildOf is set as FollowsFrom.
		refType := conventions.AttributeOpentracingRefTypeFollowsFrom
		if _, isParent := extraParents[spanIDToJaegerProto(link.SpanID().Bytes())]; isParent {
			refType = conventions.AttributeOpentracingRefTypeChildOf
		}
		link.Attributes().InsertString(conventions.AttributeOpentracingRefType, refType)
	}
}

// ProtoFromTraces converts OTEL traces into Jaeger batches.
//
// After running the Jaeger translator, every resulting span is passed to
// decodeSpanBinaryTags, and the references that getOtherParents reports as
// additional parents get their RefType set to model.ChildOf.
//
// It returns an error, wrapping the translator's error, when the translation
// fails.
func ProtoFromTraces(traces ptrace.Traces) ([]*model.Batch, error) {
	batches, err := jaegertranslator.ProtoFromTraces(traces)
	if err != nil {
		return nil, fmt.Errorf("internal-traces-to-jaeger-proto: %w", err)
	}
	otherParents := getOtherParents(traces)
	for _, batch := range batches {
		for _, span := range batch.GetSpans() {
			decodeSpanBinaryTags(span)

			// TODO: There's an open PR against the Jaeger translator that adds
			// support for keeping the RefType. Once the PR is merged we can remove
			// the following lookup and the getOtherParents function.
			//
			// https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/14463
			pIdxs, ok := otherParents[span.SpanID]
			if !ok {
				continue
			}
			refs := span.GetReferences()
			for _, i := range pIdxs {
				// otherParents is keyed by SpanID only, so spans from different
				// traces that share a SpanID end up with merged index lists. Skip
				// positions that don't exist for this span instead of panicking.
				if i < 0 || i >= len(refs) {
					continue
				}
				refs[i].RefType = model.ChildOf
			}
		}
	}
	return batches, nil
}

// getOtherParents returns a map whose keys are the IDs of the Spans that have
// links marked as parents, and whose values are the positions in the Jaeger
// Span.References list where those parent references are expected to be.
//
// A parent is a link that has the `child_of` attribute defined in the semantic
// convention for opentracing:
//
// https://opentelemetry.io/docs/reference/specification/trace/semantic_conventions/compatibility/#opentracing
//
// Positions are tracked instead of SpanIDs because there might be multiple
// links to the same SpanID but with different RefTypes.
func getOtherParents(traces ptrace.Traces) map[model.SpanID][]int {
	parentIdxs := make(map[model.SpanID][]int)

	rss := traces.ResourceSpans()
	for ri := 0; ri < rss.Len(); ri++ {
		sss := rss.At(ri).ScopeSpans()
		for si := 0; si < sss.Len(); si++ {
			spans := sss.At(si).Spans()
			for k := 0; k < spans.Len(); k++ {
				span := spans.At(k)
				id := spanIDToJaegerProto(span.SpanID().Bytes())

				// We need an offset because if the span has a ParentSpanID, then
				// that's going to be the first reference when translating from
				// OTEL to Jaeger. We could say that if it doesn't have a
				// ParentSpanID then it shouldn't have other parents, but just to
				// be extra safe we inspect the attributes even if there's no
				// ParentSpanID set.
				offset := 0
				if !span.ParentSpanID().IsEmpty() {
					offset = 1
				}

				links := span.Links()
				for l := 0; l < links.Len(); l++ {
					refType, found := links.At(l).Attributes().Get(conventions.AttributeOpentracingRefType)
					if found && refType.StringVal() == conventions.AttributeOpentracingRefTypeChildOf {
						parentIdxs[id] = append(parentIdxs[id], l+offset)
					}
				}
			}
		}
	}
	return parentIdxs
}

// uInt64ToSpanID builds a pcommon.SpanID from the big-endian byte encoding of
// id.
func uInt64ToSpanID(id uint64) pcommon.SpanID {
	var raw [8]byte
	binary.BigEndian.PutUint64(raw[:], id)
	return pcommon.NewSpanID(raw)
}

// spanIDToJaegerProto converts the raw 8 bytes of a span ID into a Jaeger
// model.SpanID by reading them as a big-endian uint64.
func spanIDToJaegerProto(rawSpanID [8]byte) model.SpanID {
	id := binary.BigEndian.Uint64(rawSpanID[:])
	return model.SpanID(id)
}
Loading

0 comments on commit beea06b

Please sign in to comment.