Skip to content

Commit

Permalink
Fix duplicated spans when querying Elasticsearch (#1677)
Browse files Browse the repository at this point in the history
Signed-off-by: Pavol Loffay <ploffay@redhat.com>
  • Loading branch information
pavolloffay committed Jul 23, 2019
1 parent 428cc1a commit a14ca3c
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 3 deletions.
5 changes: 2 additions & 3 deletions plugin/storage/es/spanstore/reader.go
Expand Up @@ -308,9 +308,7 @@ func (s *SpanReader) multiRead(ctx context.Context, traceIDs []model.TraceID, st
if len(traceIDs) == 0 {
return []*model.Trace{}, nil
}
searchRequests := make([]*elastic.SearchRequest, len(traceIDs))

var traces []*model.Trace
// Add an hour in both directions so that traces that straddle two indexes are retrieved.
// i.e starts in one and ends in another.
indices := s.timeRangeIndices(s.spanIndexPrefix, startTime.Add(-time.Hour), endTime.Add(time.Hour))
Expand All @@ -323,7 +321,7 @@ func (s *SpanReader) multiRead(ctx context.Context, traceIDs []model.TraceID, st
if len(traceIDs) == 0 {
break
}

searchRequests := make([]*elastic.SearchRequest, len(traceIDs))
for i, traceID := range traceIDs {
query := elastic.NewTermQuery("traceID", traceID.String())
if val, ok := searchAfterTime[traceID]; ok {
Expand Down Expand Up @@ -375,6 +373,7 @@ func (s *SpanReader) multiRead(ctx context.Context, traceIDs []model.TraceID, st
}
}

var traces []*model.Trace
for _, trace := range tracesMap {
traces = append(traces, trace)
}
Expand Down
76 changes: 76 additions & 0 deletions plugin/storage/es/spanstore/reader_test.go
Expand Up @@ -205,6 +205,82 @@ func TestSpanReader_GetTrace(t *testing.T) {
})
}

func TestSpanReader_multiRead_followUp_query(t *testing.T) {
withSpanReader(func(r *spanReaderTest) {
now := time.Now()
spanID1 := dbmodel.Span{SpanID: "0", TraceID:"1", StartTime: model.TimeAsEpochMicroseconds(now)}
spanBytesID1, err := json.Marshal(spanID1)
require.NoError(t, err)
spanID2 := dbmodel.Span{SpanID: "0", TraceID:"2", StartTime: model.TimeAsEpochMicroseconds(now)}
spanBytesID2, err := json.Marshal(spanID2)
require.NoError(t, err)

id1Query := elastic.NewTermQuery("traceID", model.TraceID{High: 0, Low:1}.String())
id1Search := elastic.NewSearchRequest().
IgnoreUnavailable(true).
Type(spanType).
Source(r.reader.sourceFn(id1Query, model.TimeAsEpochMicroseconds(now.Add(-time.Hour))))
id2Query := elastic.NewTermQuery("traceID", model.TraceID{High: 0, Low:2}.String())
id2Search := elastic.NewSearchRequest().
IgnoreUnavailable(true).
Type(spanType).
Source(r.reader.sourceFn(id2Query, model.TimeAsEpochMicroseconds(now.Add(-time.Hour))))
id1SearchSpanTime := elastic.NewSearchRequest().
IgnoreUnavailable(true).
Type(spanType).
Source(r.reader.sourceFn(id1Query, spanID1.StartTime))

multiSearchService := &mocks.MultiSearchService{}
firstMultiSearch := &mocks.MultiSearchService{}
secondMultiSearch := &mocks.MultiSearchService{}
multiSearchService.On("Add", id1Search, id2Search).Return(firstMultiSearch)
multiSearchService.On("Add", id1SearchSpanTime).Return(secondMultiSearch)

firstMultiSearch.On("Index", mock.AnythingOfType("string")).Return(firstMultiSearch)
secondMultiSearch.On("Index", mock.AnythingOfType("string")).Return(secondMultiSearch)
r.client.On("MultiSearch").Return(multiSearchService)

fistMultiSearchMock := firstMultiSearch.On("Do", mock.AnythingOfType("*context.emptyCtx"))
secondMultiSearchMock := secondMultiSearch.On("Do", mock.AnythingOfType("*context.emptyCtx"))

// set TotalHits to two to trigger the follow up query
// the client will return only one span therefore the implementation
// triggers follow up query for the same traceID with the timestamp of the last span
searchHitsID1 := &elastic.SearchHits{Hits: []*elastic.SearchHit{
{Source: (*json.RawMessage)(&spanBytesID1)},
}, TotalHits: 2}
fistMultiSearchMock.
Return(&elastic.MultiSearchResult{
Responses: []*elastic.SearchResult{
{Hits: searchHitsID1},
},
}, nil)

searchHitsID2 := &elastic.SearchHits{Hits: []*elastic.SearchHit{
{Source: (*json.RawMessage)(&spanBytesID2)},
}, TotalHits: 1}
secondMultiSearchMock.
Return(&elastic.MultiSearchResult{
Responses: []*elastic.SearchResult{
{Hits: searchHitsID2},
},
}, nil)

traces, err := r.reader.multiRead(context.Background(), []model.TraceID{{High:0, Low:1}, {High:0, Low:2}}, now, now)
require.NoError(t, err)
require.NotNil(t, traces)
require.Len(t, traces, 2)

toDomain := dbmodel.NewToDomain("-")
sModel1, err := toDomain.SpanToDomain(&spanID1)
require.NoError(t, err)
sModel2, err := toDomain.SpanToDomain(&spanID2)
require.NoError(t, err)
assert.EqualValues(t, traces[0].Spans[0], sModel1)
assert.EqualValues(t, traces[1].Spans[0], sModel2)
})
}

func TestSpanReader_SearchAfter(t *testing.T) {
withSpanReader(func(r *spanReaderTest) {
var hits []*elastic.SearchHit
Expand Down

0 comments on commit a14ca3c

Please sign in to comment.