Elasticsearch span reader: read traces with more spans than defaultDocCount

multiRead pages through each trace with search_after on startTime instead of
stopping after the first page of hits. In this revision the per-round request
slice is rebuilt from the traces still pending (stale requests from an earlier
round were being resent), traces are returned in first-seen order, and the
large-trace integration test retries `iterations` times, matching its log line.

NOTE(review): blank context lines in the reader.go and reader_test.go hunks
were reconstructed from a whitespace-collapsed copy of this patch, and the
index hashes predate the edits described above. Regenerate the patch against
the base revision before applying.

diff --git a/plugin/storage/es/spanstore/reader.go b/plugin/storage/es/spanstore/reader.go
index 847b2e076cd..0bde22e4899 100644
--- a/plugin/storage/es/spanstore/reader.go
+++ b/plugin/storage/es/spanstore/reader.go
@@ -205,37 +205,77 @@ func (s *SpanReader) FindTraces(traceQuery *spanstore.TraceQueryParameters) ([]*
 }
 
+// multiRead loads the spans of every trace in traceIDs and returns one
+// model.Trace per trace ID for which spans were found. Each round sends a
+// single multi-search with one request per pending trace, sorted by startTime.
+// A trace whose reported hit total exceeds the number of documents fetched so
+// far is queried again, searching after the start time of its last span read.
+// NOTE(review): paging on startTime alone presumably skips spans that share
+// the start time of a page's last span - confirm against Elasticsearch docs.
 func (s *SpanReader) multiRead(traceIDs []string, startTime, endTime time.Time) ([]*model.Trace, error) {
 	if len(traceIDs) == 0 {
 		return []*model.Trace{}, nil
 	}
-	searchRequests := make([]*elastic.SearchRequest, len(traceIDs))
-	for i, traceID := range traceIDs {
-		query := elastic.NewTermQuery("traceID", traceID)
-		searchRequests[i] = elastic.NewSearchRequest().IgnoreUnavailable(true).Type("span").
-			Source(elastic.NewSearchSource().Query(query).Size(defaultDocCount))
-	}
 
 	// Add an hour in both directions so that traces that straddle two indexes are retrieved.
-	// ie starts in one and ends in another.
+	// i.e. starts in one and ends in another.
 	indices := findIndices(spanIndexPrefix, startTime.Add(-time.Hour), endTime.Add(time.Hour))
-	results, err := s.client.MultiSearch().
-		Add(searchRequests...).
-		Index(indices...).
-		Do(s.ctx)
-	if err != nil {
-		return nil, err
-	}
+	firstPageAfter := model.TimeAsEpochMicroseconds(startTime.Add(-time.Hour))
 
 	var traces []*model.Trace
-	for _, result := range results.Responses {
-		if result.Hits == nil || len(result.Hits.Hits) == 0 {
-			continue
-		}
-		spans, err := s.collectSpans(result.Hits.Hits)
-		if err != nil {
-			return nil, err
-		}
-		traces = append(traces, &model.Trace{Spans: spans})
-	}
+	searchAfterTime := make(map[string]uint64)
+	totalDocumentsFetched := make(map[string]int)
+	tracesMap := make(map[string]*model.Trace)
+	for len(traceIDs) > 0 {
+		// Build the batch from the IDs still pending so that requests from an
+		// earlier round are never sent again.
+		searchRequests := make([]*elastic.SearchRequest, len(traceIDs))
+		for i, traceID := range traceIDs {
+			nextTime, ok := searchAfterTime[traceID]
+			if !ok {
+				nextTime = firstPageAfter
+			}
+			query := elastic.NewTermQuery("traceID", traceID)
+			source := elastic.NewSearchSource().Query(query).Size(defaultDocCount).
+				Sort("startTime", true).SearchAfter(nextTime)
+			searchRequests[i] = elastic.NewSearchRequest().IgnoreUnavailable(true).Type("span").Source(source)
+		}
+		// Traces with spans left to read are queued again below.
+		traceIDs = nil
+
+		results, err := s.client.MultiSearch().Add(searchRequests...).Index(indices...).Do(s.ctx)
+		if err != nil {
+			return nil, err
+		}
+		if len(results.Responses) == 0 {
+			break
+		}
+
+		for _, result := range results.Responses {
+			if result.Hits == nil || len(result.Hits.Hits) == 0 {
+				continue
+			}
+			spans, err := s.collectSpans(result.Hits.Hits)
+			if err != nil {
+				return nil, err
+			}
+			lastSpan := spans[len(spans)-1]
+			lastSpanTraceID := lastSpan.TraceID.String()
+
+			if trace, ok := tracesMap[lastSpanTraceID]; ok {
+				trace.Spans = append(trace.Spans, spans...)
+			} else {
+				// Record traces in first-seen order so the result order is stable.
+				trace = &model.Trace{Spans: spans}
+				tracesMap[lastSpanTraceID] = trace
+				traces = append(traces, trace)
+			}
+
+			totalDocumentsFetched[lastSpanTraceID] += len(result.Hits.Hits)
+			if totalDocumentsFetched[lastSpanTraceID] < int(result.TotalHits()) {
+				traceIDs = append(traceIDs, lastSpanTraceID)
+				searchAfterTime[lastSpanTraceID] = model.TimeAsEpochMicroseconds(lastSpan.StartTime)
+			}
+		}
+	}
 	return traces, nil
 }
diff --git a/plugin/storage/es/spanstore/reader_test.go b/plugin/storage/es/spanstore/reader_test.go
index d71edf65c03..95a27e8c784 100644
--- a/plugin/storage/es/spanstore/reader_test.go
+++ b/plugin/storage/es/spanstore/reader_test.go
@@ -130,6 +130,36 @@ func TestSpanReader_GetTrace(t *testing.T) {
 	})
 }
 
+func TestSpanReader_SearchAfter(t *testing.T) {
+	withSpanReader(func(r *spanReaderTest) {
+		var hits []*elastic.SearchHit
+
+		for i := 0; i < 10000; i++ {
+			hit := &elastic.SearchHit{Source: (*json.RawMessage)(&exampleESSpan)}
+			hits = append(hits, hit)
+		}
+
+		searchHits := &elastic.SearchHits{Hits: hits, TotalHits: int64(10040)}
+		// TotalHits exceeds the hits in one response, so a second multi-search round is expected.
+		mockSearchService(r).Return(&elastic.SearchResult{Hits: searchHits}, nil)
+		mockMultiSearchService(r).
+			Return(&elastic.MultiSearchResult{
+				Responses: []*elastic.SearchResult{
+					{Hits: searchHits},
+				},
+			}, nil).Times(2)
+
+		trace, err := r.reader.GetTrace(model.TraceID{Low: 1})
+		require.NoError(t, err)
+		require.NotNil(t, trace)
+
+		expectedSpans, err := r.reader.collectSpans(hits)
+		require.NoError(t, err)
+
+		assert.EqualValues(t, trace.Spans[0], expectedSpans[0])
+	})
+}
+
 func TestSpanReader_GetTraceQueryError(t *testing.T) {
 	withSpanReader(func(r *spanReaderTest) {
 		mockSearchService(r).
@@ -408,18 +438,18 @@ func TestSpanReader_FindTraces(t *testing.T) {
 			},
 			StartTimeMin: time.Now().Add(-1 * time.Hour),
 			StartTimeMax: time.Now(),
-			NumTraces:    2,
+			NumTraces:    1,
 		}
 
 		traces, err := r.reader.FindTraces(traceQuery)
 		require.NoError(t, err)
-		assert.Len(t, traces, 2)
+		assert.Len(t, traces, 1)
 
 		trace := traces[0]
 		expectedSpans, err := r.reader.collectSpans(hits)
 		require.NoError(t, err)
 
-		require.Len(t, trace.Spans, 1)
+		require.Len(t, trace.Spans, 2)
 		assert.EqualValues(t, trace.Spans[0], expectedSpans[0])
 	})
 }
diff --git a/plugin/storage/integration/integration_test.go b/plugin/storage/integration/integration_test.go
index 603416e6f19..3f603d9ef86 100644
--- a/plugin/storage/integration/integration_test.go
+++ b/plugin/storage/integration/integration_test.go
@@ -104,6 +104,28 @@ func (s *StorageIntegration) testGetServices(t *testing.T) {
 	}
 }
 
+// testGetLargeSpan writes a trace of more than 10K spans and checks that GetTrace returns all of them.
+func (s *StorageIntegration) testGetLargeSpan(t *testing.T) {
+	defer s.cleanUp(t)
+
+	t.Log("Testing Large Trace over 10K ...")
+	expected := s.loadParseAndWriteLargeTrace(t)
+	expectedTraceID := expected.Spans[0].TraceID
+	s.refresh(t)
+
+	var actual *model.Trace
+	for i := 0; i < iterations; i++ {
+		s.logger.Info(fmt.Sprintf(waitForBackendComment, i+1, iterations))
+		var err error
+		actual, err = s.SpanReader.GetTrace(expectedTraceID)
+		if err == nil && len(actual.Spans) == len(expected.Spans) {
+			break
+		}
+		time.Sleep(100 * time.Millisecond) // Will wait up to 3 seconds at worst.
+	}
+	CompareTraces(t, expected, actual)
+}
+
 func (s *StorageIntegration) testGetOperations(t *testing.T) {
 	defer s.cleanUp(t)
 
@@ -146,7 +168,6 @@ func (s *StorageIntegration) testGetTrace(t *testing.T) {
 		}
 		time.Sleep(100 * time.Millisecond) // Will wait up to 3 seconds at worst.
 	}
-
 	CompareTraces(t, expected, actual)
 }
 
@@ -223,6 +244,23 @@ func (s *StorageIntegration) loadParseAndWriteExampleTrace(t *testing.T) *model.
 	return trace
 }
 
+// loadParseAndWriteLargeTrace clones the first span of the example_trace fixture
+// into a 10008-span trace with distinct start times and writes it to storage.
+func (s *StorageIntegration) loadParseAndWriteLargeTrace(t *testing.T) *model.Trace {
+	trace := getTraceFixture(t, "example_trace")
+	span := trace.Spans[0]
+	trace.Spans = append(make([]*model.Span, 0, 10008), span)
+	for i := 1; i < 10008; i++ {
+		clone := new(model.Span)
+		*clone = *span
+		clone.StartTime = clone.StartTime.Add(time.Second * time.Duration(i+1))
+		trace.Spans = append(trace.Spans, clone)
+	}
+	err := s.writeTrace(t, trace)
+	require.NoError(t, err, "Not expecting error when writing example_trace to storage")
+	return trace
+}
+
 func getTraceFixture(t *testing.T, fixture string) *model.Trace {
 	var trace model.Trace
 	fileName := fmt.Sprintf("fixtures/traces/%s.json", fixture)
@@ -310,6 +348,7 @@ func (s *StorageIntegration) IntegrationTestAll(t *testing.T) {
 	t.Run("GetServices", s.testGetServices)
 	t.Run("GetOperations", s.testGetOperations)
 	t.Run("GetTrace", s.testGetTrace)
+	t.Run("GetLargeSpans", s.testGetLargeSpan)
 	t.Run("FindTraces", s.testFindTraces)
 	t.Run("GetDependencies", s.testGetDependencies)
 }