Skip to content

Commit

Permalink
Refactoring and fixes for endpoint in backend
Browse files Browse the repository at this point in the history
Signed-off-by: Navin Shrinivas <karupal2002@gmail.com>
  • Loading branch information
NavinShrinivas committed Jan 30, 2024
1 parent 817db96 commit 0dc3464
Show file tree
Hide file tree
Showing 7 changed files with 206 additions and 66 deletions.
41 changes: 26 additions & 15 deletions cmd/query/app/apiv3/otlp_translator.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package apiv3
import (
"fmt"

"github.com/gogo/protobuf/jsonpb"
"github.com/gogo/protobuf/proto"
model2otel "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger"
"go.opentelemetry.io/collector/pdata/ptrace"
Expand Down Expand Up @@ -48,25 +47,37 @@ func modelToOTLP(spans []*model.Span) ([]*tracev1.ResourceSpans, error) {
return chunk.ResourceSpans, nil
}

func OTLP2model(OTLPSpans []byte) ([]model.Trace, error) {
ptrace_unmarshaler := ptrace.JSONUnmarshaler{}
otlp_traces, err := ptrace_unmarshaler.UnmarshalTraces(OTLPSpans)
func OTLP2model(OTLPSpans []byte) ([]*model.Batch, error) {
ptraceUnmarshaler := ptrace.JSONUnmarshaler{}
otlpTraces, err := ptraceUnmarshaler.UnmarshalTraces(OTLPSpans)
if err != nil {
fmt.Println(err)
return nil, fmt.Errorf("cannot marshal OTLP : %w", err)
return nil, fmt.Errorf("cannot unmarshal OTLP : %w", err)
}
batches, err := model2otel.ProtoFromTraces(otlp_traces)
fmt.Println(otlp_traces.ResourceSpans())
jaegerBatches, err := model2otel.ProtoFromTraces(otlpTraces)
if err != nil {
fmt.Println(err)
return nil, fmt.Errorf("cannot marshal OTLP : %w", err)
return nil, fmt.Errorf("cannot transform OTLP to Jaeger: %w", err)
}
jaeger_traces := make([]model.Trace, len(batches) )
for _, v := range batches {
mar := jsonpb.Marshaler{}
fmt.Println(mar.MarshalToString(v))
jaeger_trace := v.ConvertToTraces()
jaeger_traces = append(jaeger_traces, *jaeger_trace)

return jaegerBatches, nil
}

func BatchesToTraces(jaegerBatches []*model.Batch) ([]model.Trace, error) {
var jaegerTraces []model.Trace
spanMap := make(map[model.TraceID][]*model.Span)
for _, v := range jaegerBatches {
jaegerTrace := model.Trace{
Spans: v.Spans,
}
jaegerTrace.DenormalizeProcess(v.Process)
jaegerTrace.FlattenToSpansMaps(spanMap)
}
for _, v := range spanMap {
jaegerTrace := model.Trace{
Spans: v,
}
jaegerTraces = append(jaegerTraces, jaegerTrace)
}
return jaeger_traces, nil
return jaegerTraces, nil
}
58 changes: 58 additions & 0 deletions cmd/query/app/apiv3/package_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,67 @@ package apiv3
import (
"testing"

"github.com/jaegertracing/jaeger/model"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/goleak"
)

func TestMain(m *testing.M) {
goleak.VerifyTestMain(m)
}

func TestBatchesToTraces(t *testing.T) {
b1 := &model.Batch{
Spans: []*model.Span{
{TraceID: model.NewTraceID(1, 2), SpanID: model.NewSpanID(1), OperationName: "x"},
{TraceID: model.NewTraceID(1, 3), SpanID: model.NewSpanID(2), OperationName: "y"},
},
Process: model.NewProcess("process1", model.KeyValues{}),
}

b2 := &model.Batch{
Spans: []*model.Span{
{TraceID: model.NewTraceID(1, 2), SpanID: model.NewSpanID(2), OperationName: "z"},
},
Process: model.NewProcess("process2", model.KeyValues{}),
}

mainBatch := []*model.Batch{b1, b2}

traces, err := BatchesToTraces(mainBatch)
require.Nil(t, err)

s1 := []*model.Span{
{
TraceID: model.NewTraceID(1, 2),
SpanID: model.NewSpanID(1),
OperationName: "x",
Process: model.NewProcess("process1", model.KeyValues{}),
},
{
TraceID: model.NewTraceID(1, 2),
SpanID: model.NewSpanID(2),
OperationName: "z",
Process: model.NewProcess("process2", model.KeyValues{}),
},
}

s2 := []*model.Span{
{
TraceID: model.NewTraceID(1, 3),
SpanID: model.NewSpanID(2),
OperationName: "y",
Process: model.NewProcess("process1", model.KeyValues{}),
},
}

t1 := model.Trace{
Spans: s1,
}
t2 := model.Trace{
Spans: s2,
}
mainTrace := []model.Trace{t1, t2}
assert.Equal(t, mainTrace, traces)
}
72 changes: 38 additions & 34 deletions cmd/query/app/http_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,14 +123,14 @@ func NewAPIHandler(queryService *querysvc.QueryService, tm *tenancy.Manager, opt
// RegisterRoutes registers routes for this handler on the given router
func (aH *APIHandler) RegisterRoutes(router *mux.Router) {
aH.handleFunc(router, aH.getTrace, "/traces/{%s}", traceIDParam).Methods(http.MethodGet)
aH.handleFunc(router, aH.transformOTLP, "/transform").Methods(http.MethodPost)
aH.handleFunc(router, aH.archiveTrace, "/archive/{%s}", traceIDParam).Methods(http.MethodPost)
aH.handleFunc(router, aH.search, "/traces").Methods(http.MethodGet)
aH.handleFunc(router, aH.getServices, "/services").Methods(http.MethodGet)
// TODO change the UI to use this endpoint. Requires ?service= parameter.
aH.handleFunc(router, aH.getOperations, "/operations").Methods(http.MethodGet)
// TODO - remove this when UI catches up
aH.handleFunc(router, aH.getOperationsLegacy, "/services/{%s}/operations", serviceParam).Methods(http.MethodGet)
aH.handleFunc(router, aH.transformOTLP, "/transform").Methods(http.MethodPost)
aH.handleFunc(router, aH.dependencies, "/dependencies").Methods(http.MethodGet)
aH.handleFunc(router, aH.latencies, "/metrics/latencies").Methods(http.MethodGet)
aH.handleFunc(router, aH.calls, "/metrics/calls").Methods(http.MethodGet)
Expand Down Expand Up @@ -161,39 +161,6 @@ func (aH *APIHandler) formatRoute(route string, args ...interface{}) string {
return fmt.Sprintf("/%s"+route, args...)
}

func (aH *APIHandler) transformOTLP(w http.ResponseWriter, r *http.Request) {
body, err := ioutil.ReadAll(r.Body)
if aH.handleError(w, err, http.StatusBadRequest) {
return
}

if aH.handleError(w, err, http.StatusInternalServerError) {
return
}
var uiErrors []structuredError
traces, err := apiv3.OTLP2model(body)

if aH.handleError(w, err, http.StatusInternalServerError) {
return
}

uiTraces := make([]*ui.Trace, len(traces))
for i, v := range traces {
uiTrace, uiErr := aH.convertModelToUI(&v, false)
if uiErr != nil {
uiErrors = append(uiErrors, *uiErr)
}
uiTraces[i] = uiTrace
}

structuredRes := structuredResponse{
Data: uiTraces,
Errors: uiErrors,
}
aH.writeJSON(w,r,structuredRes)

}

func (aH *APIHandler) getServices(w http.ResponseWriter, r *http.Request) {
services, err := aH.queryService.GetServices(r.Context())
if aH.handleError(w, err, http.StatusInternalServerError) {
Expand Down Expand Up @@ -229,6 +196,43 @@ func (aH *APIHandler) getOperationsLegacy(w http.ResponseWriter, r *http.Request
aH.writeJSON(w, r, &structuredRes)
}

func (aH *APIHandler) transformOTLP(w http.ResponseWriter, r *http.Request) {
body, err := ioutil.ReadAll(r.Body)
if aH.handleError(w, err, http.StatusBadRequest) {
return
}

var uiErrors []structuredError
batches, err := apiv3.OTLP2model(body)

if aH.handleError(w, err, http.StatusInternalServerError) {
return
}

traces, err := apiv3.BatchesToTraces(batches)

if aH.handleError(w, err, http.StatusInternalServerError) {
return
}

uiTraces := make([]*ui.Trace, len(traces))

for i, v := range traces {
uiTrace, uiErr := aH.convertModelToUI(&v, false)
if uiErr != nil {
uiErrors = append(uiErrors, *uiErr)
}
uiTraces[i] = uiTrace
}

structuredRes := structuredResponse{
Data: uiTraces,
Errors: uiErrors,
}
aH.writeJSON(w, r, structuredRes)

}

func (aH *APIHandler) getOperations(w http.ResponseWriter, r *http.Request) {
service := r.FormValue(serviceParam)
if service == "" {
Expand Down
16 changes: 0 additions & 16 deletions model/model.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 18 additions & 0 deletions model/trace.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,21 @@ func (t *Trace) NormalizeTimestamps() {
span.NormalizeTimestamps()
}
}

func (m *Trace) DenormalizeProcess(p *Process) {
for _, v := range m.Spans {
v.Process = p
}
}

func (m *Trace) FlattenToSpansMaps(spanMap map[TraceID][]*Span) {
for _, v := range m.Spans {
val, ok := spanMap[v.TraceID]
if !ok {
spanMap[v.TraceID] = []*Span{v}
} else {
spanMap[v.TraceID] = append(val, v)
}
}

}
65 changes: 65 additions & 0 deletions model/trace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,3 +67,68 @@ func TestTraceNormalizeTimestamps(t *testing.T) {
assert.Equal(t, span.StartTime, tt1.UTC())
assert.Equal(t, span.Logs[0].Timestamp, tt2.UTC())
}

func TestFlattenToSpanMaps(t *testing.T) {
b1 := &model.Trace{
Spans: []*model.Span{
{TraceID: model.NewTraceID(1, 2), SpanID: model.NewSpanID(1), OperationName: "x"},
{TraceID: model.NewTraceID(1, 3), SpanID: model.NewSpanID(2), OperationName: "y"},
},
}

b2 := &model.Trace{
Spans: []*model.Span{
{TraceID: model.NewTraceID(1, 2), SpanID: model.NewSpanID(2), OperationName: "z"},
},
}

t1 := []*model.Span{
{TraceID: model.NewTraceID(1, 2), SpanID: model.NewSpanID(1), OperationName: "x"},
{TraceID: model.NewTraceID(1, 2), SpanID: model.NewSpanID(2), OperationName: "z"}}

t2 := []*model.Span{{TraceID: model.NewTraceID(1, 3), SpanID: model.NewSpanID(2), OperationName: "y"}}
spanMap := make(map[model.TraceID][]*model.Span)
b1.FlattenToSpansMaps(spanMap)
b2.FlattenToSpansMaps(spanMap)
assert.Equal(t, t1, spanMap[model.NewTraceID(1, 2)])
assert.Equal(t, t2, spanMap[model.NewTraceID(1, 3)])
}


func TestDenormalizeProcess(t *testing.T){
t1 := &model.Trace{
Spans: []*model.Span{
{
TraceID: model.NewTraceID(1, 2),
SpanID: model.NewSpanID(1),
OperationName: "x",
},
{
TraceID: model.NewTraceID(1, 3),
SpanID: model.NewSpanID(2),
OperationName: "y",
},
},
}
p1 := model.NewProcess("process1", model.KeyValues{})

t2 := &model.Trace{
Spans: []*model.Span{
{
TraceID: model.NewTraceID(1, 2),
SpanID: model.NewSpanID(1),
OperationName: "x",
Process: model.NewProcess("process1", model.KeyValues{}),
},
{
TraceID: model.NewTraceID(1, 3),
SpanID: model.NewSpanID(2),
OperationName: "y",
Process: model.NewProcess("process1", model.KeyValues{}),
},
},
}
t1.DenormalizeProcess(p1)
assert.Equal(t, t1, t2)

}

0 comments on commit 0dc3464

Please sign in to comment.