From d09392cd48d209f2bc18b6e406f76dde38cc7c36 Mon Sep 17 00:00:00 2001 From: Piotr Piotrowski Date: Tue, 16 May 2023 19:05:18 +0200 Subject: [PATCH 1/2] Fix flaky test Signed-off-by: Piotr Piotrowski --- js_test.go | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/js_test.go b/js_test.go index d6cfe4773..92c4a2ee6 100644 --- a/js_test.go +++ b/js_test.go @@ -537,7 +537,7 @@ func TestJetStreamConcurrentQueueDurablePushConsumers(t *testing.T) { } // Now create 10 durables concurrently. - subs := make(chan *Subscription, 10) + subsCh := make(chan *Subscription, 10) var wg sync.WaitGroup for i := 0; i < 10; i++ { @@ -545,12 +545,16 @@ func TestJetStreamConcurrentQueueDurablePushConsumers(t *testing.T) { go func() { defer wg.Done() sub, _ := js.QueueSubscribeSync("foo", "bar") - subs <- sub + subsCh <- sub }() } // Wait for all the consumers. wg.Wait() - close(subs) + close(subsCh) + subs := make([]*Subscription, 0, len(subsCh)) + for sub := range subsCh { + subs = append(subs, sub) + } si, err := js.StreamInfo("TEST") if err != nil { @@ -570,7 +574,7 @@ func TestJetStreamConcurrentQueueDurablePushConsumers(t *testing.T) { got := 0 for time.Now().Before(timeout) { got = 0 - for sub := range subs { + for _, sub := range subs { pending, _, _ := sub.Pending() // If a single sub has the total, then probably something is not right. if pending == total { From 7667996c05a07013263a425814b9e52317ef6044 Mon Sep 17 00:00:00 2001 From: Piotr Piotrowski Date: Fri, 19 May 2023 12:24:33 +0200 Subject: [PATCH 2/2] Use slice with mutex instead of channel Signed-off-by: Piotr Piotrowski --- js_test.go | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/js_test.go b/js_test.go index 92c4a2ee6..c73a26156 100644 --- a/js_test.go +++ b/js_test.go @@ -537,24 +537,22 @@ func TestJetStreamConcurrentQueueDurablePushConsumers(t *testing.T) { } // Now create 10 durables concurrently. - subsCh := make(chan *Subscription, 10) + subs := make([]*Subscription, 0, 10) var wg sync.WaitGroup + mx := &sync.Mutex{} for i := 0; i < 10; i++ { wg.Add(1) go func() { defer wg.Done() sub, _ := js.QueueSubscribeSync("foo", "bar") - subsCh <- sub + mx.Lock() + subs = append(subs, sub) + mx.Unlock() }() } // Wait for all the consumers. wg.Wait() - close(subsCh) - subs := make([]*Subscription, 0, len(subsCh)) - for sub := range subsCh { - subs = append(subs, sub) - } si, err := js.StreamInfo("TEST") if err != nil {