Skip to content

Commit

Permalink
[IMPROVED] Fix few more flaky tests
Browse files Browse the repository at this point in the history
Signed-off-by: Piotr Piotrowski <piotr@synadia.com>
  • Loading branch information
piotrpio committed Feb 26, 2024
1 parent 56ae534 commit c3e230f
Show file tree
Hide file tree
Showing 5 changed files with 46 additions and 42 deletions.
7 changes: 0 additions & 7 deletions jetstream/test/object_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -974,13 +974,6 @@ func TestObjectList(t *testing.T) {
t.Fatalf("Expected %+v but got %+v", expected, omap)
}
})

t.Run("context timeout", func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Nanosecond)
defer cancel()
_, err := root.List(ctx)
expectErr(t, err, context.DeadlineExceeded)
})
}

func TestObjectMaxBytes(t *testing.T) {
Expand Down
11 changes: 8 additions & 3 deletions jetstream/test/publish_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1269,18 +1269,23 @@ func TestPublishMsgAsyncWithPendingMsgs(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

_, err = js.CreateStream(ctx, jetstream.StreamConfig{Name: "foo", Subjects: []string{"FOO.*"}})
_, err = js.CreateStream(ctx, jetstream.StreamConfig{
Name: "foo",
Subjects: []string{"FOO.*"},
// disable stream acks
NoAck: true,
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

for i := 0; i < 5; i++ {
_, err = js.PublishAsync("FOO.1", []byte("msg"), jetstream.WithStallWait(1*time.Nanosecond))
_, err = js.PublishAsync("FOO.1", []byte("msg"), jetstream.WithStallWait(10*time.Millisecond))
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
}
if _, err = js.PublishAsync("FOO.1", []byte("msg"), jetstream.WithStallWait(1*time.Nanosecond)); err == nil || !errors.Is(err, jetstream.ErrTooManyStalledMsgs) {
if _, err = js.PublishAsync("FOO.1", []byte("msg"), jetstream.WithStallWait(10*time.Millisecond)); err == nil || !errors.Is(err, jetstream.ErrTooManyStalledMsgs) {
t.Fatalf("Expected error: %v; got: %v", jetstream.ErrTooManyStalledMsgs, err)
}
})
Expand Down
26 changes: 17 additions & 9 deletions test/context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,20 +220,28 @@ func testContextRequestWithCancel(t *testing.T, nc *nats.Conn) {
cancelCB()
})

nc.Subscribe("slow", func(m *nats.Msg) {
sub1, err := nc.Subscribe("slow", func(m *nats.Msg) {
// simulates latency into the client so that timeout is hit.
time.Sleep(40 * time.Millisecond)
nc.Publish(m.Reply, []byte("OK"))
})
nc.Subscribe("slower", func(m *nats.Msg) {
if err != nil {
t.Fatalf("Expected to be able to subscribe: %s", err)
}
defer sub1.Unsubscribe()
sub2, err := nc.Subscribe("slower", func(m *nats.Msg) {
// we know this request will take longer so extend the timeout
expirationTimer.Reset(100 * time.Millisecond)

// slower reply which would have hit original timeout
time.Sleep(90 * time.Millisecond)
time.Sleep(70 * time.Millisecond)

nc.Publish(m.Reply, []byte("Also OK"))
})
if err != nil {
t.Fatalf("Expected to be able to subscribe: %s", err)
}
defer sub2.Unsubscribe()

for i := 0; i < 2; i++ {
resp, err := nc.RequestWithContext(ctx, "slow", []byte(""))
Expand Down Expand Up @@ -263,7 +271,7 @@ func testContextRequestWithCancel(t *testing.T, nc *nats.Conn) {
}

// One more slow request will expire the timer and cause an error...
_, err := nc.RequestWithContext(ctx, "slow", []byte(""))
_, err = nc.RequestWithContext(ctx, "slow", []byte(""))
if err == nil {
t.Fatal("Expected request with cancellation context to fail")
}
Expand Down Expand Up @@ -1089,12 +1097,12 @@ func TestFlushWithContext(t *testing.T) {
t.Fatalf("Expected '%v', got '%v'", nats.ErrNoDeadlineContext, err)
}

dctx, cancel := context.WithTimeout(ctx, 0)
defer cancel()
dctx, cancel := context.WithTimeout(ctx, 10*time.Second)
cancel()

// A context with a deadline should return when expired.
if err := nc.FlushWithContext(dctx); err != context.DeadlineExceeded {
t.Fatalf("Expected '%v', got '%v'", context.DeadlineExceeded, err)
// A closed context should error.
if err := nc.FlushWithContext(dctx); err != context.Canceled {
t.Fatalf("Expected '%v', got '%v'", context.Canceled, err)
}
}

Expand Down
29 changes: 17 additions & 12 deletions test/js_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -593,17 +593,22 @@ func TestJetStreamSubscribe(t *testing.T) {

// If we are here we have received all of the messages.
// We hang the ConsumerInfo option off of the subscription, so we use that to check status.
info, _ := sub3.ConsumerInfo()
if info.Config.AckPolicy != nats.AckExplicitPolicy {
t.Fatalf("Expected ack explicit policy, got %q", info.Config.AckPolicy)
}
if info.Delivered.Consumer != uint64(toSend) {
t.Fatalf("Expected to have received all %d messages, got %d", toSend, info.Delivered.Consumer)
}
// Make sure we auto-ack'd
if info.AckFloor.Consumer != uint64(toSend) {
t.Fatalf("Expected to have ack'd all %d messages, got ack floor of %d", toSend, info.AckFloor.Consumer)
}
// We may need to retry this check since the acks sent by the client have to be processed
// on the server.
checkFor(t, 2*time.Second, 100*time.Millisecond, func() error {
info, _ := sub3.ConsumerInfo()
if info.Config.AckPolicy != nats.AckExplicitPolicy {
t.Fatalf("Expected ack explicit policy, got %q", info.Config.AckPolicy)
}
if info.Delivered.Consumer != uint64(toSend) {
return fmt.Errorf("Expected to have received all %d messages, got %d", toSend, info.Delivered.Consumer)
}
// Make sure we auto-ack'd
if info.AckFloor.Consumer != uint64(toSend) {
return fmt.Errorf("Expected to have ack'd all %d messages, got ack floor of %d", toSend, info.AckFloor.Consumer)
}
return nil
})
sub3.Unsubscribe()
sub2.Unsubscribe()
sub1.Unsubscribe()
Expand All @@ -619,7 +624,7 @@ func TestJetStreamSubscribe(t *testing.T) {
expectConsumers(t, 1)

// Make sure we registered as a durable.
info, _ = sub.ConsumerInfo()
info, _ := sub.ConsumerInfo()
if info.Config.Durable != dname {
t.Fatalf("Expected durable name to be set to %q, got %q", dname, info.Config.Durable)
}
Expand Down
15 changes: 4 additions & 11 deletions test/object_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -892,22 +892,15 @@ func TestObjectList(t *testing.T) {
t.Fatalf("Expected 4 total objects, got %d", len(omap))
}
expected := map[string]struct{}{
"A": struct{}{},
"B": struct{}{},
"C": struct{}{},
"b": struct{}{},
"A": {},
"B": {},
"C": {},
"b": {},
}
if !reflect.DeepEqual(omap, expected) {
t.Fatalf("Expected %+v but got %+v", expected, omap)
}
})

t.Run("context timeout", func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Nanosecond)
defer cancel()
_, err := root.List(nats.Context(ctx))
expectErr(t, err, context.DeadlineExceeded)
})
}

func TestObjectMaxBytes(t *testing.T) {
Expand Down

0 comments on commit c3e230f

Please sign in to comment.