Skip to content

Commit

Permalink
Fix race conditions in push_test.go (#340)
Browse files Browse the repository at this point in the history
* Fix race conditions in push_test.go

* Add circle race test
  • Loading branch information
jmacd committed Nov 22, 2019
1 parent b9706b2 commit 799a418
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 19 deletions.
10 changes: 9 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ test-with-coverage:
done

.PHONY: circle-ci
circle-ci: precommit test-clean-work-tree test-with-coverage test-386 examples
circle-ci: precommit test-clean-work-tree test-with-coverage test-race test-386 examples

.PHONY: test-clean-work-tree
test-clean-work-tree:
Expand All @@ -79,6 +79,14 @@ test: examples
$(GOTEST) $(GOTEST_OPT_WITH_RACE) ./...); \
done

.PHONY: test-race
test-race:
set -e; for dir in $(ALL_GO_MOD_DIRS); do \
echo "go test -race ./... in $${dir}"; \
(cd "$${dir}" && \
$(GOTEST) $(GOTEST_OPT_WITH_RACE) ./...); \
done

.PHONY: test-386
test-386:
set -e; for dir in $(ALL_GO_MOD_DIRS); do \
Expand Down
72 changes: 54 additions & 18 deletions sdk/metric/controller/push/push_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"fmt"
"runtime"
"sync"
"testing"
"time"

Expand All @@ -34,13 +35,15 @@ import (

type testBatcher struct {
t *testing.T
lock sync.Mutex
checkpointSet *test.CheckpointSet
checkpoints int
finishes int
}

type testExporter struct {
t *testing.T
lock sync.Mutex
exports int
records []export.Record
retErr error
Expand Down Expand Up @@ -85,27 +88,49 @@ func (b *testBatcher) AggregatorFor(*export.Descriptor) export.Aggregator {
}

func (b *testBatcher) CheckpointSet() export.CheckpointSet {
b.lock.Lock()
defer b.lock.Unlock()
b.checkpoints++
return b.checkpointSet
}

func (b *testBatcher) FinishedCollection() {
b.lock.Lock()
defer b.lock.Unlock()
b.finishes++
}

func (b *testBatcher) Process(_ context.Context, record export.Record) error {
b.lock.Lock()
defer b.lock.Unlock()
b.checkpointSet.Add(record.Descriptor(), record.Aggregator(), record.Labels().Ordered()...)
return nil
}

func (b *testBatcher) getCounts() (checkpoints, finishes int) {
b.lock.Lock()
defer b.lock.Unlock()
return b.checkpoints, b.finishes
}

func (e *testExporter) Export(_ context.Context, checkpointSet export.CheckpointSet) error {
e.lock.Lock()
defer e.lock.Unlock()
e.exports++
checkpointSet.ForEach(func(r export.Record) {
e.records = append(e.records, r)
})
return e.retErr
}

func (e *testExporter) resetRecords() ([]export.Record, int) {
e.lock.Lock()
defer e.lock.Unlock()
r := e.records
e.records = nil
return r, e.exports
}

func (c mockClock) Now() time.Time {
return c.mock.Now()
}
Expand Down Expand Up @@ -159,39 +184,44 @@ func TestPushTicker(t *testing.T) {

counter.Add(ctx, 3, meter.Labels())

require.Equal(t, 0, fix.batcher.checkpoints)
require.Equal(t, 0, fix.batcher.finishes)
require.Equal(t, 0, fix.exporter.exports)
require.Equal(t, 0, len(fix.exporter.records))
records, exports := fix.exporter.resetRecords()
checkpoints, finishes := fix.batcher.getCounts()
require.Equal(t, 0, checkpoints)
require.Equal(t, 0, finishes)
require.Equal(t, 0, exports)
require.Equal(t, 0, len(records))

mock.Add(time.Second)
runtime.Gosched()

require.Equal(t, 1, fix.batcher.checkpoints)
require.Equal(t, 1, fix.exporter.exports)
require.Equal(t, 1, fix.batcher.finishes)
require.Equal(t, 1, len(fix.exporter.records))
require.Equal(t, "counter", fix.exporter.records[0].Descriptor().Name())
records, exports = fix.exporter.resetRecords()
checkpoints, finishes = fix.batcher.getCounts()
require.Equal(t, 1, checkpoints)
require.Equal(t, 1, finishes)
require.Equal(t, 1, exports)
require.Equal(t, 1, len(records))
require.Equal(t, "counter", records[0].Descriptor().Name())

sum, err := fix.exporter.records[0].Aggregator().(aggregator.Sum).Sum()
sum, err := records[0].Aggregator().(aggregator.Sum).Sum()
require.Equal(t, int64(3), sum.AsInt64())
require.Nil(t, err)

fix.checkpointSet.Reset()
fix.exporter.records = nil

counter.Add(ctx, 7, meter.Labels())

mock.Add(time.Second)
runtime.Gosched()

require.Equal(t, 2, fix.batcher.checkpoints)
require.Equal(t, 2, fix.batcher.finishes)
require.Equal(t, 2, fix.exporter.exports)
require.Equal(t, 1, len(fix.exporter.records))
require.Equal(t, "counter", fix.exporter.records[0].Descriptor().Name())
records, exports = fix.exporter.resetRecords()
checkpoints, finishes = fix.batcher.getCounts()
require.Equal(t, 2, checkpoints)
require.Equal(t, 2, finishes)
require.Equal(t, 2, exports)
require.Equal(t, 1, len(records))
require.Equal(t, "counter", records[0].Descriptor().Name())

sum, err = fix.exporter.records[0].Aggregator().(aggregator.Sum).Sum()
sum, err = records[0].Aggregator().(aggregator.Sum).Sum()
require.Equal(t, int64(7), sum.AsInt64())
require.Nil(t, err)

Expand All @@ -205,7 +235,10 @@ func TestPushExportError(t *testing.T) {
p := push.New(fix.batcher, fix.exporter, time.Second)

var err error
var lock sync.Mutex
p.SetErrorHandler(func(sdkErr error) {
lock.Lock()
defer lock.Unlock()
err = sdkErr
})

Expand All @@ -221,9 +254,12 @@ func TestPushExportError(t *testing.T) {
mock.Add(time.Second)
runtime.Gosched()

require.Equal(t, 1, fix.exporter.exports)
lock.Lock()
_, exports := fix.batcher.getCounts()
require.Equal(t, 1, exports)
require.Error(t, err)
require.Equal(t, fix.exporter.retErr, err)
lock.Unlock()

p.Stop()
}

0 comments on commit 799a418

Please sign in to comment.