Skip to content

Commit

Permalink
add start and end parameters
Browse files Browse the repository at this point in the history
  • Loading branch information
alburthoffman committed Mar 29, 2023
1 parent bc8e1d1 commit 15d3c5e
Show file tree
Hide file tree
Showing 34 changed files with 343 additions and 159 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ SWAGGER_IMAGE=quay.io/goswagger/swagger:v$(SWAGGER_VER)
SWAGGER=docker run --rm -it -u ${shell id -u} -v "${PWD}:/go/src/" -w /go/src/ $(SWAGGER_IMAGE)
SWAGGER_GEN_DIR=swagger-gen

JAEGER_DOCKER_PROTOBUF=jaegertracing/protobuf:0.3.0
JAEGER_DOCKER_PROTOBUF=jaegertracing/protobuf:arm64

COLOR_PASS=$(shell printf "\033[32mPASS\033[0m")
COLOR_FAIL=$(shell printf "\033[31mFAIL\033[0m")
Expand Down
4 changes: 2 additions & 2 deletions cmd/query/app/apiv3/grpc_gateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func testGRPCGatewayWithTenancy(t *testing.T, basePath string, serverTLS tlscfg.
defer httpServer.Shutdown(context.Background())

traceID := model.NewTraceID(150, 160)
reader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("model.TraceID")).Return(
reader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("spanstore.TraceIDQueryParameters")).Return(
&model.Trace{
Spans: []*model.Span{
{
Expand Down Expand Up @@ -197,7 +197,7 @@ func TestTenancyGRPCRejection(t *testing.T) {
defer httpServer.Shutdown(context.Background())

traceID := model.NewTraceID(150, 160)
reader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("model.TraceID")).Return(
reader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("spanstore.TraceIDQueryParameters")).Return(
&model.Trace{
Spans: []*model.Span{
{
Expand Down
16 changes: 15 additions & 1 deletion cmd/query/app/apiv3/grpc_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,21 @@ func (h *Handler) GetTrace(request *api_v3.GetTraceRequest, stream api_v3.QueryS
return err
}

trace, err := h.QueryService.GetTrace(stream.Context(), traceID)
start, err := types.TimestampFromProto(request.GetStart())
if err != nil {
return err
}

end, err := types.TimestampFromProto(request.GetStart())
if err != nil {
return err
}

trace, err := h.QueryService.GetTrace(stream.Context(), &spanstore.TraceIDQueryParameters{
ID: traceID,
Start: start,
End: end,
})
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions cmd/query/app/apiv3/grpc_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func newGrpcServer(t *testing.T, handler *Handler) (*grpc.Server, net.Addr) {

func TestGetTrace(t *testing.T) {
r := &spanstoremocks.Reader{}
r.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("model.TraceID")).Return(
r.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("*spanstore.TraceIDQueryParameters")).Return(
&model.Trace{
Spans: []*model.Span{
{
Expand Down Expand Up @@ -83,7 +83,7 @@ func TestGetTrace(t *testing.T) {

func TestGetTrace_storage_error(t *testing.T) {
r := &spanstoremocks.Reader{}
r.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("model.TraceID")).Return(
r.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("spanstore.TraceIDQueryParameters")).Return(
nil, fmt.Errorf("storage_error")).Once()

q := querysvc.NewQueryService(r, &dependencyStoreMocks.Reader{}, querysvc.QueryServiceOptions{})
Expand Down
8 changes: 6 additions & 2 deletions cmd/query/app/grpc_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,9 @@ func (g *GRPCHandler) GetTrace(r *api_v2.GetTraceRequest, stream api_v2.QuerySer
if r.TraceID == (model.TraceID{}) {
return errUninitializedTraceID
}
trace, err := g.queryService.GetTrace(stream.Context(), r.TraceID)
trace, err := g.queryService.GetTrace(stream.Context(), &spanstore.TraceIDQueryParameters{
ID: r.TraceID,
})
if err == spanstore.ErrTraceNotFound {
g.logger.Error(msgTraceNotFound, zap.Error(err))
return status.Errorf(codes.NotFound, "%s: %v", msgTraceNotFound, err)
Expand All @@ -87,7 +89,9 @@ func (g *GRPCHandler) ArchiveTrace(ctx context.Context, r *api_v2.ArchiveTraceRe
if r.TraceID == (model.TraceID{}) {
return nil, errUninitializedTraceID
}
err := g.queryService.ArchiveTrace(ctx, r.TraceID)
err := g.queryService.ArchiveTrace(ctx, &spanstore.TraceIDQueryParameters{
ID: r.TraceID,
})
if err == spanstore.ErrTraceNotFound {
g.logger.Error("trace not found", zap.Error(err))
return nil, status.Errorf(codes.NotFound, "%s: %v", msgTraceNotFound, err)
Expand Down
2 changes: 1 addition & 1 deletion cmd/query/app/grpc_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ func withServerAndClient(t *testing.T, actualTest func(server *grpcServer, clien

func TestGetTraceSuccessGRPC(t *testing.T) {
withServerAndClient(t, func(server *grpcServer, client *grpcClient) {
server.spanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("model.TraceID")).
server.spanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("*spanstore.TraceIDQueryParameters")).
Return(mockTrace, nil).Once()

res, err := client.GetTrace(context.Background(), &api_v2.GetTraceRequest{
Expand Down
12 changes: 9 additions & 3 deletions cmd/query/app/http_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,9 @@ func (aH *APIHandler) tracesByIDs(ctx context.Context, traceIDs []model.TraceID)
var errors []structuredError
retMe := make([]*model.Trace, 0, len(traceIDs))
for _, traceID := range traceIDs {
if trace, err := aH.queryService.GetTrace(ctx, traceID); err != nil {
if trace, err := aH.queryService.GetTrace(ctx, &spanstore.TraceIDQueryParameters{
ID: traceID,
}); err != nil {
if err != spanstore.ErrTraceNotFound {
return nil, nil, err
}
Expand Down Expand Up @@ -428,7 +430,9 @@ func (aH *APIHandler) getTrace(w http.ResponseWriter, r *http.Request) {
if !ok {
return
}
trace, err := aH.queryService.GetTrace(r.Context(), traceID)
trace, err := aH.queryService.GetTrace(r.Context(), &spanstore.TraceIDQueryParameters{
ID: traceID,
})
if err == spanstore.ErrTraceNotFound {
aH.handleError(w, err, http.StatusNotFound)
return
Expand Down Expand Up @@ -467,7 +471,9 @@ func (aH *APIHandler) archiveTrace(w http.ResponseWriter, r *http.Request) {
}

// QueryService.ArchiveTrace can now archive this traceID.
err := aH.queryService.ArchiveTrace(r.Context(), traceID)
err := aH.queryService.ArchiveTrace(r.Context(), &spanstore.TraceIDQueryParameters{
ID: traceID,
})
if err == spanstore.ErrTraceNotFound {
aH.handleError(w, err, http.StatusNotFound)
return
Expand Down
10 changes: 5 additions & 5 deletions cmd/query/app/querysvc/query_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,13 @@ func NewQueryService(spanReader spanstore.Reader, dependencyReader dependencysto
}

// GetTrace is the queryService implementation of spanstore.Reader.GetTrace
func (qs QueryService) GetTrace(ctx context.Context, traceID model.TraceID) (*model.Trace, error) {
trace, err := qs.spanReader.GetTrace(ctx, traceID)
func (qs QueryService) GetTrace(ctx context.Context, query *spanstore.TraceIDQueryParameters) (*model.Trace, error) {
trace, err := qs.spanReader.GetTrace(ctx, query)
if err == spanstore.ErrTraceNotFound {
if qs.options.ArchiveSpanReader == nil {
return nil, err
}
trace, err = qs.options.ArchiveSpanReader.GetTrace(ctx, traceID)
trace, err = qs.options.ArchiveSpanReader.GetTrace(ctx, query)
}
return trace, err
}
Expand All @@ -93,11 +93,11 @@ func (qs QueryService) FindTraces(ctx context.Context, query *spanstore.TraceQue
}

// ArchiveTrace is the queryService utility to archive traces.
func (qs QueryService) ArchiveTrace(ctx context.Context, traceID model.TraceID) error {
func (qs QueryService) ArchiveTrace(ctx context.Context, query *spanstore.TraceIDQueryParameters) error {
if qs.options.ArchiveSpanWriter == nil {
return errNoArchiveSpanStorage
}
trace, err := qs.GetTrace(ctx, traceID)
trace, err := qs.GetTrace(ctx, query)
if err != nil {
return err
}
Expand Down
44 changes: 29 additions & 15 deletions cmd/query/app/querysvc/query_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,39 +116,45 @@ func initializeTestService(optionAppliers ...testOption) *testQueryService {
// Test QueryService.GetTrace()
func TestGetTraceSuccess(t *testing.T) {
tqs := initializeTestService()
tqs.spanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("model.TraceID")).
tqs.spanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("*spanstore.TraceIDQueryParameters")).
Return(mockTrace, nil).Once()

type contextKey string
ctx := context.Background()
res, err := tqs.queryService.GetTrace(context.WithValue(ctx, contextKey("foo"), "bar"), mockTraceID)
res, err := tqs.queryService.GetTrace(context.WithValue(ctx, contextKey("foo"), "bar"), &spanstore.TraceIDQueryParameters{
ID: mockTraceID,
})
assert.NoError(t, err)
assert.Equal(t, res, mockTrace)
}

// Test QueryService.GetTrace() without ArchiveSpanReader
func TestGetTraceNotFound(t *testing.T) {
tqs := initializeTestService()
tqs.spanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("model.TraceID")).
tqs.spanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("*spanstore.TraceIDQueryParameters")).
Return(nil, spanstore.ErrTraceNotFound).Once()

type contextKey string
ctx := context.Background()
_, err := tqs.queryService.GetTrace(context.WithValue(ctx, contextKey("foo"), "bar"), mockTraceID)
_, err := tqs.queryService.GetTrace(context.WithValue(ctx, contextKey("foo"), "bar"), &spanstore.TraceIDQueryParameters{
ID: mockTraceID,
})
assert.Equal(t, err, spanstore.ErrTraceNotFound)
}

// Test QueryService.GetTrace() with ArchiveSpanReader
func TestGetTraceFromArchiveStorage(t *testing.T) {
tqs := initializeTestService(withArchiveSpanReader())
tqs.spanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("model.TraceID")).
tqs.spanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("*spanstore.TraceIDQueryParameters")).
Return(nil, spanstore.ErrTraceNotFound).Once()
tqs.archiveSpanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("model.TraceID")).
tqs.archiveSpanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("*spanstore.TraceIDQueryParameters")).
Return(mockTrace, nil).Once()

type contextKey string
ctx := context.Background()
res, err := tqs.queryService.GetTrace(context.WithValue(ctx, contextKey("foo"), "bar"), mockTraceID)
res, err := tqs.queryService.GetTrace(context.WithValue(ctx, contextKey("foo"), "bar"), &spanstore.TraceIDQueryParameters{
ID: mockTraceID,
})
assert.NoError(t, err)
assert.Equal(t, res, mockTrace)
}
Expand Down Expand Up @@ -211,50 +217,58 @@ func TestArchiveTraceNoOptions(t *testing.T) {

type contextKey string
ctx := context.Background()
err := tqs.queryService.ArchiveTrace(context.WithValue(ctx, contextKey("foo"), "bar"), mockTraceID)
err := tqs.queryService.ArchiveTrace(context.WithValue(ctx, contextKey("foo"), "bar"), &spanstore.TraceIDQueryParameters{
ID: mockTraceID,
})
assert.Equal(t, errNoArchiveSpanStorage, err)
}

// Test QueryService.ArchiveTrace() with ArchiveSpanWriter but invalid traceID.
func TestArchiveTraceWithInvalidTraceID(t *testing.T) {
tqs := initializeTestService(withArchiveSpanReader(), withArchiveSpanWriter())
tqs.spanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("model.TraceID")).
tqs.spanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("*spanstore.TraceIDQueryParameters")).
Return(nil, spanstore.ErrTraceNotFound).Once()
tqs.archiveSpanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("model.TraceID")).
tqs.archiveSpanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("*spanstore.TraceIDQueryParameters")).
Return(nil, spanstore.ErrTraceNotFound).Once()

type contextKey string
ctx := context.Background()
err := tqs.queryService.ArchiveTrace(context.WithValue(ctx, contextKey("foo"), "bar"), mockTraceID)
err := tqs.queryService.ArchiveTrace(context.WithValue(ctx, contextKey("foo"), "bar"), &spanstore.TraceIDQueryParameters{
ID: mockTraceID,
})
assert.Equal(t, spanstore.ErrTraceNotFound, err)
}

// Test QueryService.ArchiveTrace(), save error with ArchiveSpanWriter.
func TestArchiveTraceWithArchiveWriterError(t *testing.T) {
tqs := initializeTestService(withArchiveSpanWriter())
tqs.spanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("model.TraceID")).
tqs.spanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("*spanstore.TraceIDQueryParameters")).
Return(mockTrace, nil).Once()
tqs.archiveSpanWriter.On("WriteSpan", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("*model.Span")).
Return(errors.New("cannot save")).Times(2)

type contextKey string
ctx := context.Background()
joinErr := tqs.queryService.ArchiveTrace(context.WithValue(ctx, contextKey("foo"), "bar"), mockTraceID)
joinErr := tqs.queryService.ArchiveTrace(context.WithValue(ctx, contextKey("foo"), "bar"), &spanstore.TraceIDQueryParameters{
ID: mockTraceID,
})
// There are two spans in the mockTrace, ArchiveTrace should return a wrapped error.
assert.EqualError(t, joinErr, "cannot save\ncannot save")
}

// Test QueryService.ArchiveTrace() with correctly configured ArchiveSpanWriter.
func TestArchiveTraceSuccess(t *testing.T) {
tqs := initializeTestService(withArchiveSpanWriter())
tqs.spanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("model.TraceID")).
tqs.spanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("*spanstore.TraceIDQueryParameters")).
Return(mockTrace, nil).Once()
tqs.archiveSpanWriter.On("WriteSpan", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("*model.Span")).
Return(nil).Times(2)

type contextKey string
ctx := context.Background()
err := tqs.queryService.ArchiveTrace(context.WithValue(ctx, contextKey("foo"), "bar"), mockTraceID)
err := tqs.queryService.ArchiveTrace(context.WithValue(ctx, contextKey("foo"), "bar"), &spanstore.TraceIDQueryParameters{
ID: mockTraceID,
})
assert.NoError(t, err)
}

Expand Down
20 changes: 13 additions & 7 deletions plugin/storage/badger/spanstore/read_write_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,11 @@ func TestWriteReadBack(t *testing.T) {
}

for i := 0; i < traces; i++ {
tr, err := sr.GetTrace(context.Background(), model.TraceID{
Low: uint64(i),
High: 1,
tr, err := sr.GetTrace(context.Background(), &spanstore.TraceIDQueryParameters{
ID: model.TraceID{
Low: uint64(i),
High: 1,
},
})
assert.NoError(t, err)

Expand Down Expand Up @@ -302,7 +304,9 @@ func TestFindNothing(t *testing.T) {
assert.NoError(t, err)
assert.Len(t, trs, 0)

tr, err := sr.GetTrace(context.Background(), model.TraceID{High: 0, Low: 0})
tr, err := sr.GetTrace(context.Background(), &spanstore.TraceIDQueryParameters{
ID: model.TraceID{High: 0, Low: 0},
})
assert.Equal(t, spanstore.ErrTraceNotFound, err)
assert.Nil(t, tr)
})
Expand Down Expand Up @@ -431,9 +435,11 @@ func TestPersist(t *testing.T) {
})

p(t, dir, func(t *testing.T, sw spanstore.Writer, sr spanstore.Reader) {
trace, err := sr.GetTrace(context.Background(), model.TraceID{
Low: uint64(1),
High: 1,
trace, err := sr.GetTrace(context.Background(), &spanstore.TraceIDQueryParameters{
ID: model.TraceID{
Low: uint64(1),
High: 1,
},
})
assert.NoError(t, err)
assert.Equal(t, "operation-p", trace.Spans[0].OperationName)
Expand Down
4 changes: 2 additions & 2 deletions plugin/storage/badger/spanstore/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,8 +157,8 @@ func (r *TraceReader) getTraces(traceIDs []model.TraceID) ([]*model.Trace, error
}

// GetTrace takes a traceID and returns a Trace associated with that traceID
func (r *TraceReader) GetTrace(ctx context.Context, traceID model.TraceID) (*model.Trace, error) {
traces, err := r.getTraces([]model.TraceID{traceID})
func (r *TraceReader) GetTrace(ctx context.Context, query *spanstore.TraceIDQueryParameters) (*model.Trace, error) {
traces, err := r.getTraces([]model.TraceID{query.ID})
if err != nil {
return nil, err
}
Expand Down
8 changes: 6 additions & 2 deletions plugin/storage/badger/spanstore/rw_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,9 @@ func TestEncodingTypes(t *testing.T) {
err := sw.WriteSpan(context.Background(), &testSpan)
assert.NoError(t, err)

tr, err := rw.GetTrace(context.Background(), model.TraceID{Low: 0, High: 1})
tr, err := rw.GetTrace(context.Background(), &spanstore.TraceIDQueryParameters{
ID: model.TraceID{Low: 0, High: 1},
})
assert.NoError(t, err)
assert.Equal(t, 1, len(tr.Spans))
})
Expand Down Expand Up @@ -83,7 +85,9 @@ func TestEncodingTypes(t *testing.T) {
return nil
})

_, err = rw.GetTrace(context.Background(), model.TraceID{Low: 0, High: 1})
_, err = rw.GetTrace(context.Background(), &spanstore.TraceIDQueryParameters{
ID: model.TraceID{Low: 0, High: 1},
})
assert.EqualError(t, err, "unknown encoding type: 0x04")
})
}
Expand Down
4 changes: 3 additions & 1 deletion plugin/storage/cassandra/savetracetest/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,9 @@ func main() {
logger.Info("Saved span", zap.String("spanID", getSomeSpan().SpanID.String()))
}
s := getSomeSpan()
trace, err := spanReader.GetTrace(ctx, s.TraceID)
trace, err := spanReader.GetTrace(ctx, &spanstore.TraceIDQueryParameters{
ID: s.TraceID,
})
if err != nil {
logger.Fatal("Failed to read", zap.Error(err))
} else {
Expand Down
8 changes: 5 additions & 3 deletions plugin/storage/cassandra/spanstore/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,8 +208,8 @@ func (s *SpanReader) readTraceInSpan(ctx context.Context, traceID dbmodel.TraceI
}

// GetTrace takes a traceID and returns a Trace associated with that traceID
func (s *SpanReader) GetTrace(ctx context.Context, traceID model.TraceID) (*model.Trace, error) {
return s.readTrace(ctx, dbmodel.TraceIDFromDomain(traceID))
func (s *SpanReader) GetTrace(ctx context.Context, query *spanstore.TraceIDQueryParameters) (*model.Trace, error) {
return s.readTrace(ctx, dbmodel.TraceIDFromDomain(query.ID))
}

func validateQuery(p *spanstore.TraceQueryParameters) error {
Expand Down Expand Up @@ -242,7 +242,9 @@ func (s *SpanReader) FindTraces(ctx context.Context, traceQuery *spanstore.Trace
}
var retMe []*model.Trace
for _, traceID := range uniqueTraceIDs {
jTrace, err := s.GetTrace(ctx, traceID)
jTrace, err := s.GetTrace(ctx, &spanstore.TraceIDQueryParameters{
ID: traceID,
})
if err != nil {
s.logger.Error("Failure to read trace", zap.String("trace_id", traceID.String()), zap.Error(err))
continue
Expand Down

0 comments on commit 15d3c5e

Please sign in to comment.