Skip to content

Commit

Permalink
Merge d448bd9 into 8241797
Browse files Browse the repository at this point in the history
  • Loading branch information
piotrpio committed Dec 5, 2022
2 parents 8241797 + d448bd9 commit 5164bf8
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 12 deletions.
2 changes: 1 addition & 1 deletion test/basic_test.go
Expand Up @@ -56,7 +56,7 @@ func getStableNumGoroutine(t *testing.T) int {

func checkNoGoroutineLeak(t *testing.T, base int, action string) {
t.Helper()
waitFor(t, 2*time.Second, 100*time.Millisecond, func() error {
waitFor(t, 10*time.Second, 100*time.Millisecond, func() error {
delta := (runtime.NumGoroutine() - base)
if delta > 0 {
return fmt.Errorf("%d Go routines still exist after %s", delta, action)
Expand Down
6 changes: 3 additions & 3 deletions test/drain_test.go
Expand Up @@ -177,7 +177,7 @@ func TestDrainUnSubs(t *testing.T) {
subs[i].Drain()
}
// Should happen quickly that we get to zero, so do not need to wait long.
waitFor(t, 2*time.Second, 10*time.Millisecond, func() error {
waitFor(t, 5*time.Second, 10*time.Millisecond, func() error {
if numSubs := nc.NumSubscriptions(); numSubs != 0 {
return fmt.Errorf("Expected no subscriptions, got %d", numSubs)
}
Expand Down Expand Up @@ -214,7 +214,7 @@ func TestDrainSlowSubscriber(t *testing.T) {
sub.Drain()

// Should take a second or so to drain away.
waitFor(t, 2*time.Second, 100*time.Millisecond, func() error {
waitFor(t, 5*time.Second, 100*time.Millisecond, func() error {
// Wait for it to become invalid. Once drained it is unsubscribed.
_, _, err := sub.Pending()
if err != nats.ErrBadSubscription {
Expand Down Expand Up @@ -467,7 +467,7 @@ func TestDrainConnDuringReconnect(t *testing.T) {
// Shutdown the server.
s.Shutdown()

waitFor(t, time.Second, 10*time.Millisecond, func() error {
waitFor(t, 5*time.Second, 10*time.Millisecond, func() error {
if nc.IsReconnecting() {
return nil
}
Expand Down
25 changes: 20 additions & 5 deletions test/object_test.go
Expand Up @@ -827,11 +827,26 @@ func TestObjectList(t *testing.T) {
}
})

t.Run("context timeout", func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Nanosecond)
defer cancel()
_, err := root.List(nats.Context(ctx))
expectErr(t, err, context.DeadlineExceeded)
t.Run("context canceled", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())

errs := make(chan error)
go func() {
for {
_, err := root.List(nats.Context(ctx))
if err != nil {
errs <- err
}
}
}()
time.Sleep(10 * time.Millisecond)
cancel()
select {
case err := <-errs:
expectErr(t, err, context.Canceled)
case <-time.After(5 * time.Second):
t.Fatal("Expected error, got none")
}
})
}

Expand Down
6 changes: 3 additions & 3 deletions test/sub_test.go
Expand Up @@ -310,7 +310,7 @@ func TestAutoUnsubscribeFromCallback(t *testing.T) {
nc.Publish("foo", msg)
nc.Flush()

waitFor(t, time.Second, 100*time.Millisecond, func() error {
waitFor(t, 5*time.Second, 100*time.Millisecond, func() error {
recv := atomic.LoadInt64(&received)
if recv != resetUnsubMark {
return fmt.Errorf("Wrong number of received messages. Original max was %v reset to %v, actual received: %v",
Expand Down Expand Up @@ -345,7 +345,7 @@ func TestAutoUnsubscribeFromCallback(t *testing.T) {
nc.Publish("foo", msg)
nc.Flush()

waitFor(t, time.Second, 100*time.Millisecond, func() error {
waitFor(t, 5*time.Second, 100*time.Millisecond, func() error {
recv := atomic.LoadInt64(&received)
if recv != newMax {
return fmt.Errorf("Wrong number of received messages. Original max was %v reset to %v, actual received: %v",
Expand Down Expand Up @@ -1193,7 +1193,7 @@ func TestAsyncSubscriptionPendingDrain(t *testing.T) {
nc.Flush()

// Wait for all delivered.
waitFor(t, 2*time.Second, 15*time.Millisecond, func() error {
waitFor(t, 5*time.Second, 15*time.Millisecond, func() error {
if d, _ := sub.Delivered(); d != int64(total) {
return fmt.Errorf("Wrong delivered count: %v vs %v", d, total)
}
Expand Down

0 comments on commit 5164bf8

Please sign in to comment.