Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[IMPROVED] Fix flaky tests #1564

Merged
merged 4 commits into from
Feb 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
16 changes: 8 additions & 8 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
language: go
go:
- "1.22.x"
- "1.21.x"
- "1.20.x"
go_import_path: github.com/nats-io/nats.go
install:
- go get -t ./...
- curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b $(go env GOPATH)/bin
- if [[ "$TRAVIS_GO_VERSION" =~ 1.21 ]]; then
- if [[ "$TRAVIS_GO_VERSION" =~ 1.22 ]]; then
go install github.com/mattn/goveralls@latest;
go install github.com/wadey/gocovmerge@latest;
go install honnef.co/go/tools/cmd/staticcheck@latest;
Expand All @@ -15,22 +15,22 @@ install:
before_script:
- $(exit $(go fmt ./... | wc -l))
- go vet -modfile=go_test.mod ./...
- if [[ "$TRAVIS_GO_VERSION" =~ 1.21 ]]; then
- if [[ "$TRAVIS_GO_VERSION" =~ 1.22 ]]; then
find . -type f -name "*.go" | xargs misspell -error -locale US;
GOFLAGS="-mod=mod -modfile=go_test.mod" staticcheck ./...;
fi
- golangci-lint run ./jetstream/...
script:
- go test -modfile=go_test.mod -v -run=TestNoRace -p=1 ./... --failfast -vet=off
- if [[ "$TRAVIS_GO_VERSION" =~ 1.21 ]]; then ./scripts/cov.sh TRAVIS; else go test -modfile=go_test.mod -race -v -p=1 ./... --failfast -vet=off -tags=internal_testing; fi
- if [[ "$TRAVIS_GO_VERSION" =~ 1.22 ]]; then ./scripts/cov.sh TRAVIS; else go test -modfile=go_test.mod -race -v -p=1 ./... --failfast -vet=off -tags=internal_testing; fi
after_success:
- if [[ "$TRAVIS_GO_VERSION" =~ 1.21 ]]; then $HOME/gopath/bin/goveralls -coverprofile=acc.out -service travis-ci; fi
- if [[ "$TRAVIS_GO_VERSION" =~ 1.22 ]]; then $HOME/gopath/bin/goveralls -coverprofile=acc.out -service travis-ci; fi

jobs:
include:
- name: "Go: 1.21.x (nats-server@main)"
go: "1.21.x"
- name: "Go: 1.22.x (nats-server@main)"
go: "1.22.x"
before_script:
- go get -modfile go_test.mod github.com/nats-io/nats-server/v2@main
allow_failures:
- name: "Go: 1.21.x (nats-server@main)"
- name: "Go: 1.22.x (nats-server@main)"
10 changes: 5 additions & 5 deletions go_test.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,19 @@ go 1.19

require (
github.com/golang/protobuf v1.4.2
github.com/klauspost/compress v1.17.4
github.com/nats-io/nats-server/v2 v2.10.9
github.com/klauspost/compress v1.17.6
github.com/nats-io/nats-server/v2 v2.10.11
github.com/nats-io/nkeys v0.4.7
github.com/nats-io/nuid v1.0.1
go.uber.org/goleak v1.2.1
go.uber.org/goleak v1.3.0
golang.org/x/text v0.14.0
google.golang.org/protobuf v1.23.0
)

require (
github.com/minio/highwayhash v1.0.2 // indirect
github.com/nats-io/jwt/v2 v2.5.3 // indirect
golang.org/x/crypto v0.18.0 // indirect
golang.org/x/sys v0.16.0 // indirect
golang.org/x/crypto v0.19.0 // indirect
golang.org/x/sys v0.17.0 // indirect
golang.org/x/time v0.5.0 // indirect
)
20 changes: 10 additions & 10 deletions go_test.sum
Original file line number Diff line number Diff line change
Expand Up @@ -10,27 +10,27 @@ github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMyw
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4=
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/klauspost/compress v1.17.4 h1:Ej5ixsIri7BrIjBkRZLTo6ghwrEtHFk7ijlczPW4fZ4=
github.com/klauspost/compress v1.17.4/go.mod h1:/dCuZOvVtNoHsyb+cuJD3itjs3NbnF6KH9zAO4BDxPM=
github.com/klauspost/compress v1.17.6 h1:60eq2E/jlfwQXtvZEeBUYADs+BwKBWURIY+Gj2eRGjI=
github.com/klauspost/compress v1.17.6/go.mod h1:/dCuZOvVtNoHsyb+cuJD3itjs3NbnF6KH9zAO4BDxPM=
github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g=
github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY=
github.com/nats-io/jwt/v2 v2.5.3 h1:/9SWvzc6hTfamcgXJ3uYRpgj+QuY2aLNqRiqrKcrpEo=
github.com/nats-io/jwt/v2 v2.5.3/go.mod h1:iysuPemFcc7p4IoYots3IuELSI4EDe9Y0bQMe+I3Bf4=
github.com/nats-io/nats-server/v2 v2.10.9 h1:VEW43Zz+p+9lARtiPM9ctd6ckun+92ZT2T17HWtwiFI=
github.com/nats-io/nats-server/v2 v2.10.9/go.mod h1:oorGiV9j3BOLLO3ejQe+U7pfAGyPo+ppD7rpgNF6KTQ=
github.com/nats-io/nats-server/v2 v2.10.11 h1:yKUiLVincZISpo3A4YljJQ+HfLltGAgoNNJl99KL8I0=
github.com/nats-io/nats-server/v2 v2.10.11/go.mod h1:dXtOqVWzbMTEj+tUyC/itXjJhW37xh0tUBrTAlqAfx8=
github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI=
github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk=
go.uber.org/goleak v1.2.1 h1:NBol2c7O1ZokfZ0LEU9K6Whx/KnwvepVetCUhtKja4A=
go.uber.org/goleak v1.2.1/go.mod h1:qlT2yGI9QafXHhZZLxlSuNsMw3FFLxBr+tBRlmO1xH4=
golang.org/x/crypto v0.18.0 h1:PGVlW0xEltQnzFZ55hkuX5+KLyrMYhHld1YHO4AKcdc=
golang.org/x/crypto v0.18.0/go.mod h1:R0j02AL6hcrfOiy9T4ZYp/rcWeMxM3L6QYxlOuEG1mg=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
golang.org/x/crypto v0.19.0 h1:ENy+Az/9Y1vSrlrvBSyna3PITt4tiZLf7sgCjZBX7Wo=
golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDfU=
golang.org/x/sys v0.0.0-20190130150945-aca44879d564/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.16.0 h1:xWw16ngr6ZMtmxDyKyIgsE93KNKz5HKmMa3b8ALHidU=
golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.17.0 h1:25cE3gD+tdBA7lp7QfhuV+rJiE9YXTcS3VG1SqssI/Y=
golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ=
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk=
Expand Down
60 changes: 37 additions & 23 deletions jetstream/test/kv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1213,6 +1213,35 @@ func TestListKeyValueStores(t *testing.T) {
}

func TestKeyValueMirrorCrossDomains(t *testing.T) {
keyExists := func(t *testing.T, kv jetstream.KeyValue, key string, expected string) jetstream.KeyValueEntry {
var e jetstream.KeyValueEntry
var err error
checkFor(t, 10*time.Second, 10*time.Millisecond, func() error {
e, err = kv.Get(context.Background(), key)
if err != nil {
return err
}
if string(e.Value()) != expected {
return fmt.Errorf("Expected value to be %q, got %q", expected, e.Value())
}
return nil
})

return e
}

keyDeleted := func(t *testing.T, kv jetstream.KeyValue, key string) {
checkFor(t, 10*time.Second, 10*time.Millisecond, func() error {
_, err := kv.Get(context.Background(), key)
if err == nil {
return fmt.Errorf("Expected key to be gone")
}
if !errors.Is(err, jetstream.ErrKeyNotFound) {
return err
}
return nil
})
}
conf := createConfFile(t, []byte(`
server_name: HUB
listen: 127.0.0.1:-1
Expand Down Expand Up @@ -1303,21 +1332,16 @@ func TestKeyValueMirrorCrossDomains(t *testing.T) {

_, err = mkv.PutString(ctx, "v", "vv")
expectOk(t, err)
e, err := mkv.Get(ctx, "v")
expectOk(t, err)

e := keyExists(t, kv, "v", "vv")
if e.Operation() != jetstream.KeyValuePut {
t.Fatalf("Got wrong value: %q vs %q", e.Operation(), nats.KeyValuePut)
}
err = mkv.Delete(ctx, "v")
expectOk(t, err)
_, err = mkv.Get(ctx, "v")
expectErr(t, err, jetstream.ErrKeyNotFound)
keyDeleted(t, kv, "v")

e, err = mkv.Get(ctx, "name")
expectOk(t, err)
if string(e.Value()) != "rip" {
t.Fatalf("Got wrong value: %q vs %q", e.Value(), "rip")
}
keyExists(t, kv, "name", "rip")

// Also make sure we can create a watcher on the mirror KV.
watcher, err := mkv.WatchAll(ctx)
Expand All @@ -1334,31 +1358,21 @@ func TestKeyValueMirrorCrossDomains(t *testing.T) {
_, err = rkv.PutString(ctx, "name", "ivan")
expectOk(t, err)

e, err = rkv.Get(ctx, "name")
expectOk(t, err)
if string(e.Value()) != "ivan" {
t.Fatalf("Got wrong value: %q vs %q", e.Value(), "ivan")
}
keyExists(t, mkv, "name", "ivan")
_, err = rkv.PutString(ctx, "v", "vv")
expectOk(t, err)
e, err = mkv.Get(ctx, "v")
expectOk(t, err)
e = keyExists(t, mkv, "v", "vv")
if e.Operation() != jetstream.KeyValuePut {
t.Fatalf("Got wrong value: %q vs %q", e.Operation(), nats.KeyValuePut)
}
err = rkv.Delete(ctx, "v")
expectOk(t, err)
_, err = rkv.Get(ctx, "v")
expectErr(t, err, jetstream.ErrKeyNotFound)
keyDeleted(t, mkv, "v")

// Shutdown cluster and test get still work.
shutdownJSServerAndRemoveStorage(t, s)

e, err = mkv.Get(ctx, "name")
expectOk(t, err)
if string(e.Value()) != "ivan" {
t.Fatalf("Got wrong value: %q vs %q", e.Value(), "ivan")
}
keyExists(t, mkv, "name", "ivan")
}

func TestKeyValueRePublish(t *testing.T) {
Expand Down
7 changes: 0 additions & 7 deletions jetstream/test/object_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -974,13 +974,6 @@ func TestObjectList(t *testing.T) {
t.Fatalf("Expected %+v but got %+v", expected, omap)
}
})

t.Run("context timeout", func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Nanosecond)
defer cancel()
_, err := root.List(ctx)
expectErr(t, err, context.DeadlineExceeded)
})
}

func TestObjectMaxBytes(t *testing.T) {
Expand Down
11 changes: 8 additions & 3 deletions jetstream/test/publish_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1269,18 +1269,23 @@ func TestPublishMsgAsyncWithPendingMsgs(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

_, err = js.CreateStream(ctx, jetstream.StreamConfig{Name: "foo", Subjects: []string{"FOO.*"}})
_, err = js.CreateStream(ctx, jetstream.StreamConfig{
Name: "foo",
Subjects: []string{"FOO.*"},
// disable stream acks
NoAck: true,
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

for i := 0; i < 5; i++ {
_, err = js.PublishAsync("FOO.1", []byte("msg"), jetstream.WithStallWait(1*time.Nanosecond))
_, err = js.PublishAsync("FOO.1", []byte("msg"), jetstream.WithStallWait(10*time.Millisecond))
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
}
if _, err = js.PublishAsync("FOO.1", []byte("msg"), jetstream.WithStallWait(1*time.Nanosecond)); err == nil || !errors.Is(err, jetstream.ErrTooManyStalledMsgs) {
if _, err = js.PublishAsync("FOO.1", []byte("msg"), jetstream.WithStallWait(10*time.Millisecond)); err == nil || !errors.Is(err, jetstream.ErrTooManyStalledMsgs) {
t.Fatalf("Expected error: %v; got: %v", jetstream.ErrTooManyStalledMsgs, err)
}
})
Expand Down
26 changes: 17 additions & 9 deletions test/context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,20 +220,28 @@ func testContextRequestWithCancel(t *testing.T, nc *nats.Conn) {
cancelCB()
})

nc.Subscribe("slow", func(m *nats.Msg) {
sub1, err := nc.Subscribe("slow", func(m *nats.Msg) {
// simulates latency into the client so that timeout is hit.
time.Sleep(40 * time.Millisecond)
nc.Publish(m.Reply, []byte("OK"))
})
nc.Subscribe("slower", func(m *nats.Msg) {
if err != nil {
t.Fatalf("Expected to be able to subscribe: %s", err)
}
defer sub1.Unsubscribe()
sub2, err := nc.Subscribe("slower", func(m *nats.Msg) {
// we know this request will take longer so extend the timeout
expirationTimer.Reset(100 * time.Millisecond)

// slower reply which would have hit original timeout
time.Sleep(90 * time.Millisecond)
time.Sleep(70 * time.Millisecond)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the reason for this change here?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test depends on request either timing out or not depending on the circumstances. The expiration timer is reset in each subscribe callback to 100ms - sleeping for 90ms was making it fail quite often if there the subsequent request had 10ms delay. Now it's 30ms "buffer", which makes it less, but still, flaky...

I get why this test is useful but its tough to fix it entirely while retaining the logic.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, gotcha. That feels pretty flaky no matter what we set here, seeing faster or slower machines running it...


nc.Publish(m.Reply, []byte("Also OK"))
})
if err != nil {
t.Fatalf("Expected to be able to subscribe: %s", err)
}
defer sub2.Unsubscribe()

for i := 0; i < 2; i++ {
resp, err := nc.RequestWithContext(ctx, "slow", []byte(""))
Expand Down Expand Up @@ -263,7 +271,7 @@ func testContextRequestWithCancel(t *testing.T, nc *nats.Conn) {
}

// One more slow request will expire the timer and cause an error...
_, err := nc.RequestWithContext(ctx, "slow", []byte(""))
_, err = nc.RequestWithContext(ctx, "slow", []byte(""))
if err == nil {
t.Fatal("Expected request with cancellation context to fail")
}
Expand Down Expand Up @@ -1089,12 +1097,12 @@ func TestFlushWithContext(t *testing.T) {
t.Fatalf("Expected '%v', got '%v'", nats.ErrNoDeadlineContext, err)
}

dctx, cancel := context.WithTimeout(ctx, 0)
defer cancel()
dctx, cancel := context.WithTimeout(ctx, 10*time.Second)
cancel()

// A context with a deadline should return when expired.
if err := nc.FlushWithContext(dctx); err != context.DeadlineExceeded {
t.Fatalf("Expected '%v', got '%v'", context.DeadlineExceeded, err)
// A closed context should error.
if err := nc.FlushWithContext(dctx); err != context.Canceled {
t.Fatalf("Expected '%v', got '%v'", context.Canceled, err)
}
}

Expand Down
29 changes: 17 additions & 12 deletions test/js_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -593,17 +593,22 @@ func TestJetStreamSubscribe(t *testing.T) {

// If we are here we have received all of the messages.
// We hang the ConsumerInfo option off of the subscription, so we use that to check status.
info, _ := sub3.ConsumerInfo()
if info.Config.AckPolicy != nats.AckExplicitPolicy {
t.Fatalf("Expected ack explicit policy, got %q", info.Config.AckPolicy)
}
if info.Delivered.Consumer != uint64(toSend) {
t.Fatalf("Expected to have received all %d messages, got %d", toSend, info.Delivered.Consumer)
}
// Make sure we auto-ack'd
if info.AckFloor.Consumer != uint64(toSend) {
t.Fatalf("Expected to have ack'd all %d messages, got ack floor of %d", toSend, info.AckFloor.Consumer)
}
// We may need to retry this check since the acks sent by the client have to be processed
// on the server.
checkFor(t, 10*time.Second, 100*time.Millisecond, func() error {
info, _ := sub3.ConsumerInfo()
if info.Config.AckPolicy != nats.AckExplicitPolicy {
t.Fatalf("Expected ack explicit policy, got %q", info.Config.AckPolicy)
}
if info.Delivered.Consumer != uint64(toSend) {
return fmt.Errorf("Expected to have received all %d messages, got %d", toSend, info.Delivered.Consumer)
}
// Make sure we auto-ack'd
if info.AckFloor.Consumer != uint64(toSend) {
return fmt.Errorf("Expected to have ack'd all %d messages, got ack floor of %d", toSend, info.AckFloor.Consumer)
}
return nil
})
sub3.Unsubscribe()
sub2.Unsubscribe()
sub1.Unsubscribe()
Expand All @@ -619,7 +624,7 @@ func TestJetStreamSubscribe(t *testing.T) {
expectConsumers(t, 1)

// Make sure we registered as a durable.
info, _ = sub.ConsumerInfo()
info, _ := sub.ConsumerInfo()
if info.Config.Durable != dname {
t.Fatalf("Expected durable name to be set to %q, got %q", dname, info.Config.Durable)
}
Expand Down