Skip to content

Commit

Permalink
Handles group by traceID and adds some tests
Browse files Browse the repository at this point in the history
Signed-off-by: Navin Shrinivas <karupal2002@gmail.com>
  • Loading branch information
NavinShrinivas committed Jan 30, 2024
1 parent ba18f5a commit 02c039f
Show file tree
Hide file tree
Showing 5 changed files with 155 additions and 2 deletions.
14 changes: 13 additions & 1 deletion cmd/query/app/apiv3/otlp_translator.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func modelToOTLP(spans []*model.Span) ([]*tracev1.ResourceSpans, error) {
return chunk.ResourceSpans, nil
}

func OTLP2model(OTLPSpans []byte) ([]model.Trace, error) {
func OTLP2model(OTLPSpans []byte) ([]*model.Batch, error) {
ptraceUnmarshaler := ptrace.JSONUnmarshaler{}
otlpTraces, err := ptraceUnmarshaler.UnmarshalTraces(OTLPSpans)
if err != nil {
Expand All @@ -59,12 +59,24 @@ func OTLP2model(OTLPSpans []byte) ([]model.Trace, error) {
fmt.Println(err)
return nil, fmt.Errorf("cannot transform OTLP to Jaeger: %w", err)
}

return jaegerBatches, nil
}

func BatchesToTraces(jaegerBatches []*model.Batch) ([]model.Trace, error) {
var jaegerTraces []model.Trace
spanMap := make(map[model.TraceID][]*model.Span)
for _, v := range jaegerBatches {
jaegerTrace := model.Trace{
Spans: v.Spans,
}
jaegerTrace.DenormalizeProcess(v.Process)
jaegerTrace.FlattenToSpansMaps(spanMap)
}
for _, v := range spanMap {
jaegerTrace := model.Trace{
Spans: v,
}
jaegerTraces = append(jaegerTraces, jaegerTrace)
}
return jaegerTraces, nil
Expand Down
58 changes: 58 additions & 0 deletions cmd/query/app/apiv3/package_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,67 @@ package apiv3
import (
"testing"

"github.com/jaegertracing/jaeger/model"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/goleak"
)

func TestMain(m *testing.M) {
goleak.VerifyTestMain(m)
}

func TestBatchesToTraces(t *testing.T) {
b1 := &model.Batch{
Spans: []*model.Span{
{TraceID: model.NewTraceID(1, 2), SpanID: model.NewSpanID(1), OperationName: "x"},
{TraceID: model.NewTraceID(1, 3), SpanID: model.NewSpanID(2), OperationName: "y"},
},
Process: model.NewProcess("process1", model.KeyValues{}),
}

b2 := &model.Batch{
Spans: []*model.Span{
{TraceID: model.NewTraceID(1, 2), SpanID: model.NewSpanID(2), OperationName: "z"},
},
Process: model.NewProcess("process2", model.KeyValues{}),
}

mainBatch := []*model.Batch{b1, b2}

traces, err := BatchesToTraces(mainBatch)
require.Nil(t, err)

s1 := []*model.Span{
{
TraceID: model.NewTraceID(1, 2),
SpanID: model.NewSpanID(1),
OperationName: "x",
Process: model.NewProcess("process1", model.KeyValues{}),
},
{
TraceID: model.NewTraceID(1, 2),
SpanID: model.NewSpanID(2),
OperationName: "z",
Process: model.NewProcess("process2", model.KeyValues{}),
},
}

s2 := []*model.Span{
{
TraceID: model.NewTraceID(1, 3),
SpanID: model.NewSpanID(2),
OperationName: "y",
Process: model.NewProcess("process1", model.KeyValues{}),
},
}

t1 := model.Trace{
Spans: s1,
}
t2 := model.Trace{
Spans: s2,
}
mainTrace := []model.Trace{t1, t2}
assert.Equal(t, mainTrace, traces)
}
8 changes: 7 additions & 1 deletion cmd/query/app/http_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,13 @@ func (aH *APIHandler) transformOTLP(w http.ResponseWriter, r *http.Request) {
}

var uiErrors []structuredError
traces, err := apiv3.OTLP2model(body)
batches, err := apiv3.OTLP2model(body)

if aH.handleError(w, err, http.StatusInternalServerError) {
return
}

traces, err := apiv3.BatchesToTraces(batches)

if aH.handleError(w, err, http.StatusInternalServerError) {
return
Expand Down
12 changes: 12 additions & 0 deletions model/trace.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,15 @@ func (m *Trace) DenormalizeProcess(p *Process) {
v.Process = p
}
}

func (m *Trace) FlattenToSpansMaps(spanMap map[TraceID][]*Span) {
for _, v := range m.Spans {
val, ok := spanMap[v.TraceID]
if !ok {
spanMap[v.TraceID] = []*Span{v}
} else {
spanMap[v.TraceID] = append(val, v)
}
}

}
65 changes: 65 additions & 0 deletions model/trace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,3 +67,68 @@ func TestTraceNormalizeTimestamps(t *testing.T) {
assert.Equal(t, span.StartTime, tt1.UTC())
assert.Equal(t, span.Logs[0].Timestamp, tt2.UTC())
}

func TestFlattenToSpanMaps(t *testing.T) {
b1 := &model.Trace{
Spans: []*model.Span{
{TraceID: model.NewTraceID(1, 2), SpanID: model.NewSpanID(1), OperationName: "x"},
{TraceID: model.NewTraceID(1, 3), SpanID: model.NewSpanID(2), OperationName: "y"},
},
}

b2 := &model.Trace{
Spans: []*model.Span{
{TraceID: model.NewTraceID(1, 2), SpanID: model.NewSpanID(2), OperationName: "z"},
},
}

t1 := []*model.Span{
{TraceID: model.NewTraceID(1, 2), SpanID: model.NewSpanID(1), OperationName: "x"},
{TraceID: model.NewTraceID(1, 2), SpanID: model.NewSpanID(2), OperationName: "z"}}

t2 := []*model.Span{{TraceID: model.NewTraceID(1, 3), SpanID: model.NewSpanID(2), OperationName: "y"}}
spanMap := make(map[model.TraceID][]*model.Span)
b1.FlattenToSpansMaps(spanMap)
b2.FlattenToSpansMaps(spanMap)
assert.Equal(t, t1, spanMap[model.NewTraceID(1, 2)])
assert.Equal(t, t2, spanMap[model.NewTraceID(1, 3)])
}


func TestDenormalizeProcess(t *testing.T){
t1 := &model.Trace{
Spans: []*model.Span{
{
TraceID: model.NewTraceID(1, 2),
SpanID: model.NewSpanID(1),
OperationName: "x",
},
{
TraceID: model.NewTraceID(1, 3),
SpanID: model.NewSpanID(2),
OperationName: "y",
},
},
}
p1 := model.NewProcess("process1", model.KeyValues{})

t2 := &model.Trace{
Spans: []*model.Span{
{
TraceID: model.NewTraceID(1, 2),
SpanID: model.NewSpanID(1),
OperationName: "x",
Process: model.NewProcess("process1", model.KeyValues{}),
},
{
TraceID: model.NewTraceID(1, 3),
SpanID: model.NewSpanID(2),
OperationName: "y",
Process: model.NewProcess("process1", model.KeyValues{}),
},
},
}
t1.DenormalizeProcess(p1)
assert.Equal(t, t1, t2)

}

0 comments on commit 02c039f

Please sign in to comment.