Skip to content

Commit

Permalink
Merge 240cdfa into 135bed0
Browse files Browse the repository at this point in the history
  • Loading branch information
sramakr committed Apr 11, 2018
2 parents 135bed0 + 240cdfa commit 2479487
Show file tree
Hide file tree
Showing 3 changed files with 130 additions and 24 deletions.
77 changes: 57 additions & 20 deletions plugin/storage/es/spanstore/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,39 +205,76 @@ func (s *SpanReader) FindTraces(traceQuery *spanstore.TraceQueryParameters) ([]*
}

func (s *SpanReader) multiRead(traceIDs []string, startTime, endTime time.Time) ([]*model.Trace, error) {

if len(traceIDs) == 0 {
return []*model.Trace{}, nil
}
searchRequests := make([]*elastic.SearchRequest, len(traceIDs))

for i, traceID := range traceIDs {
query := elastic.NewTermQuery("traceID", traceID)
searchRequests[i] = elastic.NewSearchRequest().IgnoreUnavailable(true).Type("span").
Source(elastic.NewSearchSource().Query(query).Size(defaultDocCount))
}

var traces []*model.Trace
// Add an hour in both directions so that traces that straddle two indexes are retrieved.
// ie starts in one and ends in another.
// i.e starts in one and ends in another.
indices := findIndices(spanIndexPrefix, startTime.Add(-time.Hour), endTime.Add(time.Hour))

results, err := s.client.MultiSearch().
Add(searchRequests...).
Index(indices...).
Do(s.ctx)
if err != nil {
return nil, err
}
nextTime := model.TimeAsEpochMicroseconds(startTime.Add(-time.Hour))

var traces []*model.Trace
for _, result := range results.Responses {
if result.Hits == nil || len(result.Hits.Hits) == 0 {
continue
searchAfterTime := make(map[string]uint64)
totalDocumentsFetched := make(map[string]int)
tracesMap := make(map[string]*model.Trace)
for {
if traceIDs == nil || len(traceIDs) == 0 {
break
}
spans, err := s.collectSpans(result.Hits.Hits)

for i, traceID := range traceIDs {
query := elastic.NewTermQuery("traceID", traceID)
if val, ok := searchAfterTime[traceID]; ok {
nextTime = val
}
searchRequests[i] = elastic.NewSearchRequest().IgnoreUnavailable(true).Type("span").Source(elastic.NewSearchSource().Query(query).Size(defaultDocCount).Sort("startTime", true).SearchAfter(nextTime))
}
// set traceIDs to empty
traceIDs = nil
results, err := s.client.MultiSearch().Add(searchRequests...).Index(indices...).Do(s.ctx)

if err != nil {
return nil, err
}
traces = append(traces, &model.Trace{Spans: spans})

if results.Responses == nil || len(results.Responses) == 0 {
break
}

for _, result := range results.Responses {
if result.Hits == nil || len(result.Hits.Hits) == 0 {
continue
}
spans, err := s.collectSpans(result.Hits.Hits)
if err != nil {
return nil, err
}
lastSpan := spans[len(spans)-1]
lastSpanTraceID := lastSpan.TraceID.String()

if traceSpan, ok := tracesMap[lastSpanTraceID]; ok {
for _, span := range spans {
traceSpan.Spans = append(traceSpan.Spans, span)
}

} else {
tracesMap[lastSpanTraceID] = &model.Trace{Spans: spans}
}

totalDocumentsFetched[lastSpanTraceID] = totalDocumentsFetched[lastSpanTraceID] + len(result.Hits.Hits)
if totalDocumentsFetched[lastSpanTraceID] < int(result.TotalHits()) {
traceIDs = append(traceIDs, lastSpanTraceID)
searchAfterTime[lastSpanTraceID] = model.TimeAsEpochMicroseconds(lastSpan.StartTime)
}
}
}

for _, trace := range tracesMap {
traces = append(traces, trace)
}
return traces, nil
}
Expand Down
36 changes: 33 additions & 3 deletions plugin/storage/es/spanstore/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,36 @@ func TestSpanReader_GetTrace(t *testing.T) {
})
}

func TestSpanReader_SearchAfter(t *testing.T) {
withSpanReader(func(r *spanReaderTest) {
var hits []*elastic.SearchHit

for i := 0; i < 10000; i++ {
hit := &elastic.SearchHit{Source: (*json.RawMessage)(&exampleESSpan)}
hits = append(hits, hit)
}

searchHits := &elastic.SearchHits{Hits: hits, TotalHits: int64(10040)}

mockSearchService(r).Return(&elastic.SearchResult{Hits: searchHits}, nil)
mockMultiSearchService(r).
Return(&elastic.MultiSearchResult{
Responses: []*elastic.SearchResult{
{Hits: searchHits},
},
}, nil).Times(2)

trace, err := r.reader.GetTrace(model.TraceID{Low: 1})
require.NoError(t, err)
require.NotNil(t, trace)

expectedSpans, err := r.reader.collectSpans(hits)
require.NoError(t, err)

assert.EqualValues(t, trace.Spans[0], expectedSpans[0])
})
}

func TestSpanReader_GetTraceQueryError(t *testing.T) {
withSpanReader(func(r *spanReaderTest) {
mockSearchService(r).
Expand Down Expand Up @@ -408,18 +438,18 @@ func TestSpanReader_FindTraces(t *testing.T) {
},
StartTimeMin: time.Now().Add(-1 * time.Hour),
StartTimeMax: time.Now(),
NumTraces: 2,
NumTraces: 1,
}

traces, err := r.reader.FindTraces(traceQuery)
require.NoError(t, err)
assert.Len(t, traces, 2)
assert.Len(t, traces, 1)

trace := traces[0]
expectedSpans, err := r.reader.collectSpans(hits)
require.NoError(t, err)

require.Len(t, trace.Spans, 1)
require.Len(t, trace.Spans, 2)
assert.EqualValues(t, trace.Spans[0], expectedSpans[0])
})
}
Expand Down
41 changes: 40 additions & 1 deletion plugin/storage/integration/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,28 @@ func (s *StorageIntegration) testGetServices(t *testing.T) {
}
}

func (s *StorageIntegration) testGetLargeSpan(t *testing.T) {

defer s.cleanUp(t)

t.Log("Testing Large Trace over 10K ...")
expected := s.loadParseAndWriteLargeTrace(t)
expectedTraceID := expected.Spans[0].TraceID
s.refresh(t)

var actual *model.Trace
for i := 0; i < 1; i++ {
s.logger.Info(fmt.Sprintf(waitForBackendComment, i+1, iterations))
var err error
actual, err = s.SpanReader.GetTrace(expectedTraceID)
if err == nil && len(actual.Spans) == len(expected.Spans) {
break
}
time.Sleep(100 * time.Millisecond) // Will wait up to 3 seconds at worst.
}
CompareTraces(t, expected, actual)
}

func (s *StorageIntegration) testGetOperations(t *testing.T) {
defer s.cleanUp(t)

Expand Down Expand Up @@ -146,7 +168,6 @@ func (s *StorageIntegration) testGetTrace(t *testing.T) {
}
time.Sleep(100 * time.Millisecond) // Will wait up to 3 seconds at worst.
}

CompareTraces(t, expected, actual)
}

Expand Down Expand Up @@ -223,6 +244,23 @@ func (s *StorageIntegration) loadParseAndWriteExampleTrace(t *testing.T) *model.
return trace
}

func (s *StorageIntegration) loadParseAndWriteLargeTrace(t *testing.T) *model.Trace {
trace := getTraceFixture(t, "example_trace")
span := trace.Spans[0]
spns := make([]*model.Span, 1, 10008)
trace.Spans = spns
trace.Spans[0] = span
for i := 1; i < 10008; i++ {
s := new(model.Span)
*s = *span
s.StartTime = s.StartTime.Add(time.Second * time.Duration(i+1))
trace.Spans = append(trace.Spans, s)
}
err := s.writeTrace(t, trace)
require.NoError(t, err, "Not expecting error when writing example_trace to storage")
return trace
}

func getTraceFixture(t *testing.T, fixture string) *model.Trace {
var trace model.Trace
fileName := fmt.Sprintf("fixtures/traces/%s.json", fixture)
Expand Down Expand Up @@ -310,6 +348,7 @@ func (s *StorageIntegration) IntegrationTestAll(t *testing.T) {
t.Run("GetServices", s.testGetServices)
t.Run("GetOperations", s.testGetOperations)
t.Run("GetTrace", s.testGetTrace)
t.Run("GetLargeSpans", s.testGetLargeSpan)
t.Run("FindTraces", s.testFindTraces)
t.Run("GetDependencies", s.testGetDependencies)
}

0 comments on commit 2479487

Please sign in to comment.