From 0dc3464f1b5d70e6cdbdf8e5f4244c2720d7e46b Mon Sep 17 00:00:00 2001 From: Navin Shrinivas Date: Fri, 26 Jan 2024 11:09:02 +0530 Subject: [PATCH] Refactoring and fixes for endpoint in backend Signed-off-by: Navin Shrinivas --- cmd/query/app/apiv3/otlp_translator.go | 41 +++++++++------ cmd/query/app/apiv3/package_test.go | 58 +++++++++++++++++++++ cmd/query/app/http_handler.go | 72 ++++++++++++++------------ jaeger-ui | 2 +- model/model.pb.go | 16 ------ model/trace.go | 18 +++++++ model/trace_test.go | 65 +++++++++++++++++++++++ 7 files changed, 206 insertions(+), 66 deletions(-) diff --git a/cmd/query/app/apiv3/otlp_translator.go b/cmd/query/app/apiv3/otlp_translator.go index 44bada7780f..7c8277479e8 100644 --- a/cmd/query/app/apiv3/otlp_translator.go +++ b/cmd/query/app/apiv3/otlp_translator.go @@ -17,7 +17,6 @@ package apiv3 import ( "fmt" - "github.com/gogo/protobuf/jsonpb" "github.com/gogo/protobuf/proto" model2otel "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger" "go.opentelemetry.io/collector/pdata/ptrace" @@ -48,25 +47,37 @@ func modelToOTLP(spans []*model.Span) ([]*tracev1.ResourceSpans, error) { return chunk.ResourceSpans, nil } -func OTLP2model(OTLPSpans []byte) ([]model.Trace, error) { - ptrace_unmarshaler := ptrace.JSONUnmarshaler{} - otlp_traces, err := ptrace_unmarshaler.UnmarshalTraces(OTLPSpans) +func OTLP2model(OTLPSpans []byte) ([]*model.Batch, error) { + ptraceUnmarshaler := ptrace.JSONUnmarshaler{} + otlpTraces, err := ptraceUnmarshaler.UnmarshalTraces(OTLPSpans) if err != nil { fmt.Println(err) - return nil, fmt.Errorf("cannot marshal OTLP : %w", err) + return nil, fmt.Errorf("cannot unmarshal OTLP : %w", err) } - batches, err := model2otel.ProtoFromTraces(otlp_traces) - fmt.Println(otlp_traces.ResourceSpans()) + jaegerBatches, err := model2otel.ProtoFromTraces(otlpTraces) if err != nil { fmt.Println(err) - return nil, fmt.Errorf("cannot marshal OTLP : %w", err) + return nil, fmt.Errorf("cannot transform OTLP to Jaeger: %w", err) } - jaeger_traces := make([]model.Trace, len(batches) ) - for _, v := range batches { - mar := jsonpb.Marshaler{} - fmt.Println(mar.MarshalToString(v)) - jaeger_trace := v.ConvertToTraces() - jaeger_traces = append(jaeger_traces, *jaeger_trace) + + return jaegerBatches, nil +} + +func BatchesToTraces(jaegerBatches []*model.Batch) ([]model.Trace, error) { + var jaegerTraces []model.Trace + spanMap := make(map[model.TraceID][]*model.Span) + for _, v := range jaegerBatches { + jaegerTrace := model.Trace{ + Spans: v.Spans, + } + jaegerTrace.DenormalizeProcess(v.Process) + jaegerTrace.FlattenToSpansMaps(spanMap) + } + for _, v := range spanMap { + jaegerTrace := model.Trace{ + Spans: v, + } + jaegerTraces = append(jaegerTraces, jaegerTrace) } - return jaeger_traces, nil + return jaegerTraces, nil } diff --git a/cmd/query/app/apiv3/package_test.go b/cmd/query/app/apiv3/package_test.go index c56173a61a1..61051eefc8b 100644 --- a/cmd/query/app/apiv3/package_test.go +++ b/cmd/query/app/apiv3/package_test.go @@ -6,9 +6,67 @@ package apiv3 import ( "testing" + "github.com/jaegertracing/jaeger/model" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "go.uber.org/goleak" ) func TestMain(m *testing.M) { goleak.VerifyTestMain(m) } + +func TestBatchesToTraces(t *testing.T) { + b1 := &model.Batch{ + Spans: []*model.Span{ + {TraceID: model.NewTraceID(1, 2), SpanID: model.NewSpanID(1), OperationName: "x"}, + {TraceID: model.NewTraceID(1, 3), SpanID: model.NewSpanID(2), OperationName: "y"}, + }, + Process: model.NewProcess("process1", model.KeyValues{}), + } + + b2 := &model.Batch{ + Spans: []*model.Span{ + {TraceID: model.NewTraceID(1, 2), SpanID: model.NewSpanID(2), OperationName: "z"}, + }, + Process: model.NewProcess("process2", model.KeyValues{}), + } + + mainBatch := []*model.Batch{b1, b2} + + traces, err := BatchesToTraces(mainBatch) + require.Nil(t, err) + + s1 := []*model.Span{ + { + TraceID: model.NewTraceID(1, 2), + SpanID: model.NewSpanID(1), + OperationName: "x", + Process: model.NewProcess("process1", model.KeyValues{}), + }, + { + TraceID: model.NewTraceID(1, 2), + SpanID: model.NewSpanID(2), + OperationName: "z", + Process: model.NewProcess("process2", model.KeyValues{}), + }, + } + + s2 := []*model.Span{ + { + TraceID: model.NewTraceID(1, 3), + SpanID: model.NewSpanID(2), + OperationName: "y", + Process: model.NewProcess("process1", model.KeyValues{}), + }, + } + + t1 := model.Trace{ + Spans: s1, + } + t2 := model.Trace{ + Spans: s2, + } + mainTrace := []model.Trace{t1, t2} + assert.Equal(t, mainTrace, traces) +} diff --git a/cmd/query/app/http_handler.go b/cmd/query/app/http_handler.go index 973d96026a8..bd8e9980bde 100644 --- a/cmd/query/app/http_handler.go +++ b/cmd/query/app/http_handler.go @@ -123,7 +123,6 @@ func NewAPIHandler(queryService *querysvc.QueryService, tm *tenancy.Manager, opt // RegisterRoutes registers routes for this handler on the given router func (aH *APIHandler) RegisterRoutes(router *mux.Router) { aH.handleFunc(router, aH.getTrace, "/traces/{%s}", traceIDParam).Methods(http.MethodGet) - aH.handleFunc(router, aH.transformOTLP, "/transform").Methods(http.MethodPost) aH.handleFunc(router, aH.archiveTrace, "/archive/{%s}", traceIDParam).Methods(http.MethodPost) aH.handleFunc(router, aH.search, "/traces").Methods(http.MethodGet) aH.handleFunc(router, aH.getServices, "/services").Methods(http.MethodGet) @@ -131,6 +130,7 @@ func (aH *APIHandler) RegisterRoutes(router *mux.Router) { aH.handleFunc(router, aH.getOperations, "/operations").Methods(http.MethodGet) // TODO - remove this when UI catches up aH.handleFunc(router, aH.getOperationsLegacy, "/services/{%s}/operations", serviceParam).Methods(http.MethodGet) + aH.handleFunc(router, aH.transformOTLP, "/transform").Methods(http.MethodPost) aH.handleFunc(router, aH.dependencies, "/dependencies").Methods(http.MethodGet) aH.handleFunc(router, aH.latencies, "/metrics/latencies").Methods(http.MethodGet) aH.handleFunc(router, aH.calls, "/metrics/calls").Methods(http.MethodGet) @@ -161,39 +161,6 @@ func (aH *APIHandler) formatRoute(route string, args ...interface{}) string { return fmt.Sprintf("/%s"+route, args...) } -func (aH *APIHandler) transformOTLP(w http.ResponseWriter, r *http.Request) { - body, err := ioutil.ReadAll(r.Body) - if aH.handleError(w, err, http.StatusBadRequest) { - return - } - - if aH.handleError(w, err, http.StatusInternalServerError) { - return - } - var uiErrors []structuredError - traces, err := apiv3.OTLP2model(body) - - if aH.handleError(w, err, http.StatusInternalServerError) { - return - } - - uiTraces := make([]*ui.Trace, len(traces)) - for i, v := range traces { - uiTrace, uiErr := aH.convertModelToUI(&v, false) - if uiErr != nil { - uiErrors = append(uiErrors, *uiErr) - } - uiTraces[i] = uiTrace - } - - structuredRes := structuredResponse{ - Data: uiTraces, - Errors: uiErrors, - } - aH.writeJSON(w,r,structuredRes) - -} - func (aH *APIHandler) getServices(w http.ResponseWriter, r *http.Request) { services, err := aH.queryService.GetServices(r.Context()) if aH.handleError(w, err, http.StatusInternalServerError) { @@ -229,6 +196,43 @@ func (aH *APIHandler) getOperationsLegacy(w http.ResponseWriter, r *http.Request aH.writeJSON(w, r, &structuredRes) } +func (aH *APIHandler) transformOTLP(w http.ResponseWriter, r *http.Request) { + body, err := ioutil.ReadAll(r.Body) + if aH.handleError(w, err, http.StatusBadRequest) { + return + } + + var uiErrors []structuredError + batches, err := apiv3.OTLP2model(body) + + if aH.handleError(w, err, http.StatusInternalServerError) { + return + } + + traces, err := apiv3.BatchesToTraces(batches) + + if aH.handleError(w, err, http.StatusInternalServerError) { + return + } + + uiTraces := make([]*ui.Trace, len(traces)) + + for i, v := range traces { + uiTrace, uiErr := aH.convertModelToUI(&v, false) + if uiErr != nil { + uiErrors = append(uiErrors, *uiErr) + } + uiTraces[i] = uiTrace + } + + structuredRes := structuredResponse{ + Data: uiTraces, + Errors: uiErrors, + } + aH.writeJSON(w, r, structuredRes) + +} + func (aH *APIHandler) getOperations(w http.ResponseWriter, r *http.Request) { service := r.FormValue(serviceParam) if service == "" { diff --git a/jaeger-ui b/jaeger-ui index 9bb97358c5f..a80f061179c 160000 --- a/jaeger-ui +++ b/jaeger-ui @@ -1 +1 @@ -Subproject commit 9bb97358c5f43740bc42c782ea5c38dedb5937f0 +Subproject commit a80f061179ccb8f01d4c52e57fc3e2cea786019d diff --git a/model/model.pb.go b/model/model.pb.go index 5d78c578d02..42290c9cce5 100644 --- a/model/model.pb.go +++ b/model/model.pb.go @@ -597,22 +597,6 @@ func (*Batch) ProtoMessage() {} func (*Batch) Descriptor() ([]byte, []int) { return fileDescriptor_4c16552f9fdb66d8, []int{6} } - -func (m *Batch)ConvertToTraces()(*Trace){ - ret_trace := Trace{} - ret_trace.Spans = m.Spans - for _,v := range ret_trace.Spans{ - v.Process = m.Process - } - ret_trace.ProcessMap = append(ret_trace.ProcessMap, Trace_ProcessMapping{ - ProcessID: m.Process.ServiceName, - Process: *m.Process, - }) - return &ret_trace - -} - - func (m *Batch) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) } diff --git a/model/trace.go b/model/trace.go index fcc31b07ae3..2d0dd763c28 100644 --- a/model/trace.go +++ b/model/trace.go @@ -32,3 +32,21 @@ func (t *Trace) NormalizeTimestamps() { span.NormalizeTimestamps() } } + +func (m *Trace) DenormalizeProcess(p *Process) { + for _, v := range m.Spans { + v.Process = p + } +} + +func (m *Trace) FlattenToSpansMaps(spanMap map[TraceID][]*Span) { + for _, v := range m.Spans { + val, ok := spanMap[v.TraceID] + if !ok { + spanMap[v.TraceID] = []*Span{v} + } else { + spanMap[v.TraceID] = append(val, v) + } + } + +} diff --git a/model/trace_test.go b/model/trace_test.go index 01a6f86c6c6..6256b974bc5 100644 --- a/model/trace_test.go +++ b/model/trace_test.go @@ -67,3 +67,68 @@ func TestTraceNormalizeTimestamps(t *testing.T) { assert.Equal(t, span.StartTime, tt1.UTC()) assert.Equal(t, span.Logs[0].Timestamp, tt2.UTC()) } + +func TestFlattenToSpanMaps(t *testing.T) { + b1 := &model.Trace{ + Spans: []*model.Span{ + {TraceID: model.NewTraceID(1, 2), SpanID: model.NewSpanID(1), OperationName: "x"}, + {TraceID: model.NewTraceID(1, 3), SpanID: model.NewSpanID(2), OperationName: "y"}, + }, + } + + b2 := &model.Trace{ + Spans: []*model.Span{ + {TraceID: model.NewTraceID(1, 2), SpanID: model.NewSpanID(2), OperationName: "z"}, + }, + } + + t1 := []*model.Span{ + {TraceID: model.NewTraceID(1, 2), SpanID: model.NewSpanID(1), OperationName: "x"}, + {TraceID: model.NewTraceID(1, 2), SpanID: model.NewSpanID(2), OperationName: "z"}} + + t2 := []*model.Span{{TraceID: model.NewTraceID(1, 3), SpanID: model.NewSpanID(2), OperationName: "y"}} + spanMap := make(map[model.TraceID][]*model.Span) + b1.FlattenToSpansMaps(spanMap) + b2.FlattenToSpansMaps(spanMap) + assert.Equal(t, t1, spanMap[model.NewTraceID(1, 2)]) + assert.Equal(t, t2, spanMap[model.NewTraceID(1, 3)]) +} + + +func TestDenormalizeProcess(t *testing.T){ + t1 := &model.Trace{ + Spans: []*model.Span{ + { + TraceID: model.NewTraceID(1, 2), + SpanID: model.NewSpanID(1), + OperationName: "x", + }, + { + TraceID: model.NewTraceID(1, 3), + SpanID: model.NewSpanID(2), + OperationName: "y", + }, + }, + } + p1 := model.NewProcess("process1", model.KeyValues{}) + + t2 := &model.Trace{ + Spans: []*model.Span{ + { + TraceID: model.NewTraceID(1, 2), + SpanID: model.NewSpanID(1), + OperationName: "x", + Process: model.NewProcess("process1", model.KeyValues{}), + }, + { + TraceID: model.NewTraceID(1, 3), + SpanID: model.NewSpanID(2), + OperationName: "y", + Process: model.NewProcess("process1", model.KeyValues{}), + }, + }, + } + t1.DenormalizeProcess(p1) + assert.Equal(t, t1, t2) + +}