Skip to content

Commit

Permalink
Fix logging in the integration tests
Browse files Browse the repository at this point in the history
Signed-off-by: Yuri Shkuro <ys@uber.com>
  • Loading branch information
Yuri Shkuro committed Aug 13, 2018
1 parent c77bb1d commit c1f3b4b
Show file tree
Hide file tree
Showing 5 changed files with 59 additions and 57 deletions.
2 changes: 2 additions & 0 deletions plugin/storage/integration/domain_trace_compare_test.go
Expand Up @@ -47,6 +47,8 @@ func CompareTraces(t *testing.T, expected *model.Trace, actual *model.Trace) {
require.Nil(t, actual.Spans)
return
}
require.NotNil(t, actual)
require.NotNil(t, actual.Spans)
model.SortTrace(expected)
model.SortTrace(actual)
checkSize(t, expected, actual)
Expand Down
10 changes: 6 additions & 4 deletions plugin/storage/integration/elasticsearch_test.go
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/pkg/errors"
"github.com/stretchr/testify/require"
"github.com/uber/jaeger-lib/metrics"
"go.uber.org/zap"
"gopkg.in/olivere/elastic.v5"

"github.com/jaegertracing/jaeger/pkg/es"
Expand All @@ -42,9 +43,11 @@ const (
)

type ESStorageIntegration struct {
client *elastic.Client
StorageIntegration

client *elastic.Client
bulkProcessor *elastic.BulkProcessor
logger *zap.Logger
}

func (s *ESStorageIntegration) initializeES() error {
Expand All @@ -55,14 +58,13 @@ func (s *ESStorageIntegration) initializeES() error {
if err != nil {
return err
}
logger, _ := testutils.NewLogger()
s.logger, _ = testutils.NewLogger()

s.client = rawClient
s.logger = logger

s.bulkProcessor, _ = s.client.BulkProcessor().Do(context.Background())
client := es.WrapESClient(s.client, s.bulkProcessor)
dependencyStore := dependencystore.NewDependencyStore(client, logger)
dependencyStore := dependencystore.NewDependencyStore(client, s.logger)
s.DependencyReader = dependencyStore
s.DependencyWriter = dependencyStore
s.initSpanstore()
Expand Down
92 changes: 44 additions & 48 deletions plugin/storage/integration/integration_test.go
Expand Up @@ -29,16 +29,14 @@ import (
"github.com/gogo/protobuf/proto"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/storage/dependencystore"
"github.com/jaegertracing/jaeger/storage/spanstore"
)

const (
iterations = 30
waitForBackendComment = "Waiting for storage backend to update documents, iteration %d out of %d"
iterations = 30
)

func TestParseAllFixtures(t *testing.T) {
Expand All @@ -57,18 +55,16 @@ func TestParseAllFixtures(t *testing.T) {
}

type StorageIntegration struct {
// TODO make these public
logger *zap.Logger
SpanWriter spanstore.Writer
SpanReader spanstore.Reader
DependencyWriter dependencystore.Writer
DependencyReader dependencystore.Reader

// cleanUp() should ensure that the storage backend is clean before another test.
// CleanUp() should ensure that the storage backend is clean before another test.
// called either before or after each test, and should be idempotent
CleanUp func() error

// refresh() should ensure that the storage backend is up to date before being queried.
// Refresh() should ensure that the storage backend is up to date before being queried.
// called between set-up and queries in each test
Refresh func() error
}
Expand Down Expand Up @@ -98,25 +94,30 @@ func (s *StorageIntegration) refresh(t *testing.T) {
require.NoError(t, s.Refresh())
}

func (s *StorageIntegration) waitForCondition(t *testing.T, predicate func(t *testing.T) bool) bool {
for i := 0; i < iterations; i++ {
t.Logf("Waiting for storage backend to update documents, iteration %d out of %d", i+1, iterations)
if predicate(t) {
return true
}
time.Sleep(100 * time.Millisecond) // Will wait up to 3 seconds at worst.
}
return predicate(t)
}

func (s *StorageIntegration) testGetServices(t *testing.T) {
defer s.cleanUp(t)

expected := []string{"example-service-1", "example-service-2", "example-service-3"}
s.loadParseAndWriteExampleTrace(t)
s.refresh(t)

var found bool

var actual []string
for i := 0; i < iterations; i++ {
s.logger.Info(fmt.Sprintf(waitForBackendComment, i+1, iterations))
found := s.waitForCondition(t, func(t *testing.T) bool {
actual, err := s.SpanReader.GetServices()
require.NoError(t, err)
if found = assert.ObjectsAreEqualValues(expected, actual); found {
break
}
time.Sleep(100 * time.Millisecond) // Will wait up to 3 seconds at worst.
}
return assert.ObjectsAreEqualValues(expected, actual)
})

if !assert.True(t, found) {
t.Log("\t Expected:", expected)
Expand All @@ -125,7 +126,6 @@ func (s *StorageIntegration) testGetServices(t *testing.T) {
}

func (s *StorageIntegration) testGetLargeSpan(t *testing.T) {

defer s.cleanUp(t)

t.Log("Testing Large Trace over 10K ...")
Expand All @@ -134,16 +134,14 @@ func (s *StorageIntegration) testGetLargeSpan(t *testing.T) {
s.refresh(t)

var actual *model.Trace
for i := 0; i < 1; i++ {
s.logger.Info(fmt.Sprintf(waitForBackendComment, i+1, iterations))
found := s.waitForCondition(t, func(t *testing.T) bool {
var err error
actual, err = s.SpanReader.GetTrace(expectedTraceID)
if err == nil && len(actual.Spans) == len(expected.Spans) {
break
}
time.Sleep(100 * time.Millisecond) // Will wait up to 3 seconds at worst.
return err == nil && len(actual.Spans) == len(expected.Spans)
})
if !assert.True(t, found) {
CompareTraces(t, expected, actual)
}
CompareTraces(t, expected, actual)
}

func (s *StorageIntegration) testGetOperations(t *testing.T) {
Expand All @@ -153,17 +151,13 @@ func (s *StorageIntegration) testGetOperations(t *testing.T) {
s.loadParseAndWriteExampleTrace(t)
s.refresh(t)

var found bool
var actual []string
for i := 0; i < iterations; i++ {
s.logger.Info(fmt.Sprintf(waitForBackendComment, i+1, iterations))
actual, err := s.SpanReader.GetOperations("example-service-1")
found := s.waitForCondition(t, func(t *testing.T) bool {
var err error
actual, err = s.SpanReader.GetOperations("example-service-1")
require.NoError(t, err)
if found = assert.ObjectsAreEqualValues(expected, actual); found {
break
}
time.Sleep(100 * time.Millisecond) // Will wait up to 3 seconds at worst.
}
return assert.ObjectsAreEqualValues(expected, actual)
})

if !assert.True(t, found) {
t.Log("\t Expected:", expected)
Expand All @@ -179,20 +173,21 @@ func (s *StorageIntegration) testGetTrace(t *testing.T) {
s.refresh(t)

var actual *model.Trace
for i := 0; i < iterations; i++ {
s.logger.Info(fmt.Sprintf(waitForBackendComment, i+1, iterations))
found := s.waitForCondition(t, func(t *testing.T) bool {
var err error
actual, err = s.SpanReader.GetTrace(expectedTraceID)
if err == nil && len(actual.Spans) == len(expected.Spans) {
break
if err != nil {
t.Log(err)
}
time.Sleep(100 * time.Millisecond) // Will wait up to 3 seconds at worst.
return err == nil && len(actual.Spans) == len(expected.Spans)
})
if !assert.True(t, found) {
CompareTraces(t, expected, actual)
}
CompareTraces(t, expected, actual)
}

func (s *StorageIntegration) testFindTraces(t *testing.T) {
defer s.CleanUp()
defer s.cleanUp(t)

// Note: all cases include ServiceName + StartTime range
queryTestCases := loadAndParseQueryTestCases(t)
Expand Down Expand Up @@ -226,17 +221,18 @@ func (s *StorageIntegration) testFindTraces(t *testing.T) {
}

func (s *StorageIntegration) findTracesByQuery(t *testing.T, query *spanstore.TraceQueryParameters, expected []*model.Trace) []*model.Trace {
for i := 0; i < iterations; i++ {
s.logger.Info(fmt.Sprintf(waitForBackendComment, i+1, iterations))
traces, err := s.SpanReader.FindTraces(query)
var traces []*model.Trace
found := s.waitForCondition(t, func(t *testing.T) bool {
var err error
traces, err = s.SpanReader.FindTraces(query)
if err == nil && tracesMatch(t, traces, expected) {
return traces
return true
}
t.Logf("FindTraces: expected: %d, actual: %d, match: false", len(expected), len(traces))
time.Sleep(100 * time.Millisecond)
}
t.Fatal("Failed to find expected traces")
return nil
return false
})
require.True(t, found)
return traces
}

func (s *StorageIntegration) writeTraces(t *testing.T, traces []*model.Trace) error {
Expand Down
7 changes: 4 additions & 3 deletions plugin/storage/integration/kafka_test.go
Expand Up @@ -22,6 +22,7 @@ import (

"github.com/stretchr/testify/require"
"github.com/uber/jaeger-lib/metrics"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/cmd/ingester/app"
"github.com/jaegertracing/jaeger/cmd/ingester/app/builder"
Expand All @@ -37,11 +38,11 @@ const defaultLocalKafkaBroker = "127.0.0.1:9092"

type KafkaIntegrationTestSuite struct {
StorageIntegration
logger *zap.Logger
}

func (s *KafkaIntegrationTestSuite) initialize() error {
logger, _ := testutils.NewLogger()
s.logger = logger
s.logger, _ = testutils.NewLogger()
// A new topic is generated per execution to avoid data overlap
topic := "jaeger-kafka-integration-test-" + strconv.FormatInt(time.Now().UnixNano(), 10)

Expand Down Expand Up @@ -77,7 +78,7 @@ func (s *KafkaIntegrationTestSuite) initialize() error {
options := app.Options{}
options.InitFromViper(v)
traceStore := memory.NewStore()
spanConsumer, err := builder.CreateConsumer(logger, metrics.NullFactory, traceStore, options)
spanConsumer, err := builder.CreateConsumer(s.logger, metrics.NullFactory, traceStore, options)
if err != nil {
return err
}
Expand Down
5 changes: 3 additions & 2 deletions plugin/storage/integration/memstore_test.go
Expand Up @@ -18,18 +18,19 @@ import (
"testing"

"github.com/stretchr/testify/require"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/pkg/testutils"
"github.com/jaegertracing/jaeger/plugin/storage/memory"
)

type MemStorageIntegrationTestSuite struct {
StorageIntegration
logger *zap.Logger
}

func (s *MemStorageIntegrationTestSuite) initialize() error {
logger, _ := testutils.NewLogger()
s.logger = logger
s.logger, _ = testutils.NewLogger()

store := memory.NewStore()
s.SpanReader = store
Expand Down

0 comments on commit c1f3b4b

Please sign in to comment.