diff --git a/plugin/storage/integration/domain_trace_compare_test.go b/plugin/storage/integration/domain_trace_compare_test.go index 14469290fb8..3e880f7e00d 100644 --- a/plugin/storage/integration/domain_trace_compare_test.go +++ b/plugin/storage/integration/domain_trace_compare_test.go @@ -47,6 +47,8 @@ func CompareTraces(t *testing.T, expected *model.Trace, actual *model.Trace) { require.Nil(t, actual.Spans) return } + require.NotNil(t, actual) + require.NotNil(t, actual.Spans) model.SortTrace(expected) model.SortTrace(actual) checkSize(t, expected, actual) diff --git a/plugin/storage/integration/elasticsearch_test.go b/plugin/storage/integration/elasticsearch_test.go index 5a8cf96ae3d..6fd09bf03c0 100644 --- a/plugin/storage/integration/elasticsearch_test.go +++ b/plugin/storage/integration/elasticsearch_test.go @@ -24,6 +24,7 @@ import ( "github.com/pkg/errors" "github.com/stretchr/testify/require" "github.com/uber/jaeger-lib/metrics" + "go.uber.org/zap" "gopkg.in/olivere/elastic.v5" "github.com/jaegertracing/jaeger/pkg/es" @@ -42,9 +43,11 @@ const ( ) type ESStorageIntegration struct { - client *elastic.Client StorageIntegration + + client *elastic.Client bulkProcessor *elastic.BulkProcessor + logger *zap.Logger } func (s *ESStorageIntegration) initializeES() error { @@ -55,14 +58,13 @@ func (s *ESStorageIntegration) initializeES() error { if err != nil { return err } - logger, _ := testutils.NewLogger() + s.logger, _ = testutils.NewLogger() s.client = rawClient - s.logger = logger s.bulkProcessor, _ = s.client.BulkProcessor().Do(context.Background()) client := es.WrapESClient(s.client, s.bulkProcessor) - dependencyStore := dependencystore.NewDependencyStore(client, logger) + dependencyStore := dependencystore.NewDependencyStore(client, s.logger) s.DependencyReader = dependencyStore s.DependencyWriter = dependencyStore s.initSpanstore() diff --git a/plugin/storage/integration/integration_test.go b/plugin/storage/integration/integration_test.go index 41cb25e470e..1700e55e1b9 100644 --- a/plugin/storage/integration/integration_test.go +++ b/plugin/storage/integration/integration_test.go @@ -29,7 +29,6 @@ import ( "github.com/gogo/protobuf/proto" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "go.uber.org/zap" "github.com/jaegertracing/jaeger/model" "github.com/jaegertracing/jaeger/storage/dependencystore" @@ -37,8 +36,7 @@ import ( ) const ( - iterations = 30 - waitForBackendComment = "Waiting for storage backend to update documents, iteration %d out of %d" + iterations = 30 ) func TestParseAllFixtures(t *testing.T) { @@ -57,18 +55,16 @@ func TestParseAllFixtures(t *testing.T) { } type StorageIntegration struct { - // TODO make these public - logger *zap.Logger SpanWriter spanstore.Writer SpanReader spanstore.Reader DependencyWriter dependencystore.Writer DependencyReader dependencystore.Reader - // cleanUp() should ensure that the storage backend is clean before another test. + // CleanUp() should ensure that the storage backend is clean before another test. // called either before or after each test, and should be idempotent CleanUp func() error - // refresh() should ensure that the storage backend is up to date before being queried. + // Refresh() should ensure that the storage backend is up to date before being queried. // called between set-up and queries in each test Refresh func() error } @@ -98,6 +94,17 @@ func (s *StorageIntegration) refresh(t *testing.T) { require.NoError(t, s.Refresh()) } +func (s *StorageIntegration) waitForCondition(t *testing.T, predicate func(t *testing.T) bool) bool { + for i := 0; i < iterations; i++ { + t.Logf("Waiting for storage backend to update documents, iteration %d out of %d", i+1, iterations) + if predicate(t) { + return true + } + time.Sleep(100 * time.Millisecond) // Will wait up to 3 seconds at worst. + } + return predicate(t) +} + func (s *StorageIntegration) testGetServices(t *testing.T) { defer s.cleanUp(t) @@ -105,18 +112,12 @@ func (s *StorageIntegration) testGetServices(t *testing.T) { s.loadParseAndWriteExampleTrace(t) s.refresh(t) - var found bool - var actual []string - for i := 0; i < iterations; i++ { - s.logger.Info(fmt.Sprintf(waitForBackendComment, i+1, iterations)) + found := s.waitForCondition(t, func(t *testing.T) bool { actual, err := s.SpanReader.GetServices() require.NoError(t, err) - if found = assert.ObjectsAreEqualValues(expected, actual); found { - break - } - time.Sleep(100 * time.Millisecond) // Will wait up to 3 seconds at worst. - } + return assert.ObjectsAreEqualValues(expected, actual) + }) if !assert.True(t, found) { t.Log("\t Expected:", expected) @@ -125,7 +126,6 @@ func (s *StorageIntegration) testGetServices(t *testing.T) { } func (s *StorageIntegration) testGetLargeSpan(t *testing.T) { - defer s.cleanUp(t) t.Log("Testing Large Trace over 10K ...") @@ -134,16 +134,14 @@ func (s *StorageIntegration) testGetLargeSpan(t *testing.T) { s.refresh(t) var actual *model.Trace - for i := 0; i < 1; i++ { - s.logger.Info(fmt.Sprintf(waitForBackendComment, i+1, iterations)) + found := s.waitForCondition(t, func(t *testing.T) bool { var err error actual, err = s.SpanReader.GetTrace(expectedTraceID) - if err == nil && len(actual.Spans) == len(expected.Spans) { - break - } - time.Sleep(100 * time.Millisecond) // Will wait up to 3 seconds at worst. + return err == nil && len(actual.Spans) == len(expected.Spans) + }) + if !assert.True(t, found) { + CompareTraces(t, expected, actual) } - CompareTraces(t, expected, actual) } func (s *StorageIntegration) testGetOperations(t *testing.T) { @@ -153,17 +151,13 @@ func (s *StorageIntegration) testGetOperations(t *testing.T) { s.loadParseAndWriteExampleTrace(t) s.refresh(t) - var found bool var actual []string - for i := 0; i < iterations; i++ { - s.logger.Info(fmt.Sprintf(waitForBackendComment, i+1, iterations)) - actual, err := s.SpanReader.GetOperations("example-service-1") + found := s.waitForCondition(t, func(t *testing.T) bool { + var err error + actual, err = s.SpanReader.GetOperations("example-service-1") require.NoError(t, err) - if found = assert.ObjectsAreEqualValues(expected, actual); found { - break - } - time.Sleep(100 * time.Millisecond) // Will wait up to 3 seconds at worst. - } + return assert.ObjectsAreEqualValues(expected, actual) + }) if !assert.True(t, found) { t.Log("\t Expected:", expected) @@ -179,20 +173,21 @@ func (s *StorageIntegration) testGetTrace(t *testing.T) { s.refresh(t) var actual *model.Trace - for i := 0; i < iterations; i++ { - s.logger.Info(fmt.Sprintf(waitForBackendComment, i+1, iterations)) + found := s.waitForCondition(t, func(t *testing.T) bool { var err error actual, err = s.SpanReader.GetTrace(expectedTraceID) - if err == nil && len(actual.Spans) == len(expected.Spans) { - break + if err != nil { + t.Log(err) } - time.Sleep(100 * time.Millisecond) // Will wait up to 3 seconds at worst. + return err == nil && len(actual.Spans) == len(expected.Spans) + }) + if !assert.True(t, found) { + CompareTraces(t, expected, actual) } - CompareTraces(t, expected, actual) } func (s *StorageIntegration) testFindTraces(t *testing.T) { - defer s.CleanUp() + defer s.cleanUp(t) // Note: all cases include ServiceName + StartTime range queryTestCases := loadAndParseQueryTestCases(t) @@ -226,17 +221,18 @@ func (s *StorageIntegration) testFindTraces(t *testing.T) { } func (s *StorageIntegration) findTracesByQuery(t *testing.T, query *spanstore.TraceQueryParameters, expected []*model.Trace) []*model.Trace { - for i := 0; i < iterations; i++ { - s.logger.Info(fmt.Sprintf(waitForBackendComment, i+1, iterations)) - traces, err := s.SpanReader.FindTraces(query) + var traces []*model.Trace + found := s.waitForCondition(t, func(t *testing.T) bool { + var err error + traces, err = s.SpanReader.FindTraces(query) if err == nil && tracesMatch(t, traces, expected) { - return traces + return true } t.Logf("FindTraces: expected: %d, actual: %d, match: false", len(expected), len(traces)) - time.Sleep(100 * time.Millisecond) - } - t.Fatal("Failed to find expected traces") - return nil + return false + }) + require.True(t, found) + return traces } func (s *StorageIntegration) writeTraces(t *testing.T, traces []*model.Trace) error { diff --git a/plugin/storage/integration/kafka_test.go b/plugin/storage/integration/kafka_test.go index 8a78cda8318..0d8c539bfbb 100644 --- a/plugin/storage/integration/kafka_test.go +++ b/plugin/storage/integration/kafka_test.go @@ -22,6 +22,7 @@ import ( "github.com/stretchr/testify/require" "github.com/uber/jaeger-lib/metrics" + "go.uber.org/zap" "github.com/jaegertracing/jaeger/cmd/ingester/app" "github.com/jaegertracing/jaeger/cmd/ingester/app/builder" @@ -37,11 +38,11 @@ const defaultLocalKafkaBroker = "127.0.0.1:9092" type KafkaIntegrationTestSuite struct { StorageIntegration + logger *zap.Logger } func (s *KafkaIntegrationTestSuite) initialize() error { - logger, _ := testutils.NewLogger() - s.logger = logger + s.logger, _ = testutils.NewLogger() // A new topic is generated per execution to avoid data overlap topic := "jaeger-kafka-integration-test-" + strconv.FormatInt(time.Now().UnixNano(), 10) @@ -77,7 +78,7 @@ func (s *KafkaIntegrationTestSuite) initialize() error { options := app.Options{} options.InitFromViper(v) traceStore := memory.NewStore() - spanConsumer, err := builder.CreateConsumer(logger, metrics.NullFactory, traceStore, options) + spanConsumer, err := builder.CreateConsumer(s.logger, metrics.NullFactory, traceStore, options) if err != nil { return err } diff --git a/plugin/storage/integration/memstore_test.go b/plugin/storage/integration/memstore_test.go index 992b0bca7f1..4d813dd382a 100644 --- a/plugin/storage/integration/memstore_test.go +++ b/plugin/storage/integration/memstore_test.go @@ -18,6 +18,7 @@ import ( "testing" "github.com/stretchr/testify/require" + "go.uber.org/zap" "github.com/jaegertracing/jaeger/pkg/testutils" "github.com/jaegertracing/jaeger/plugin/storage/memory" @@ -25,11 +26,11 @@ import ( type MemStorageIntegrationTestSuite struct { StorageIntegration + logger *zap.Logger } func (s *MemStorageIntegrationTestSuite) initialize() error { - logger, _ := testutils.NewLogger() - s.logger = logger + s.logger, _ = testutils.NewLogger() store := memory.NewStore() s.SpanReader = store