From 5a6cad0ca74dbf074aec1421ce95cb7dabcc82d9 Mon Sep 17 00:00:00 2001 From: Waldemar Quevedo Date: Mon, 19 Apr 2021 11:28:12 -0700 Subject: [PATCH] js: Fixes to parallel creation of durable consumers Signed-off-by: Waldemar Quevedo --- go_test.mod | 2 +- go_test.sum | 15 +++++ js.go | 11 ++-- test/js_test.go | 167 ++++++++++++++++++++++++++++++++---------------- 4 files changed, 134 insertions(+), 61 deletions(-) diff --git a/go_test.mod b/go_test.mod index f07a6629d..72bfb4bd8 100644 --- a/go_test.mod +++ b/go_test.mod @@ -4,7 +4,7 @@ go 1.15 require ( github.com/golang/protobuf v1.4.2 - github.com/nats-io/nats-server/v2 v2.2.1 + github.com/nats-io/nats-server/v2 v2.2.2-0.20210421232642-f2d3f5fb81d0 github.com/nats-io/nkeys v0.3.0 github.com/nats-io/nuid v1.0.1 google.golang.org/protobuf v1.23.0 diff --git a/go_test.sum b/go_test.sum index 0e1fafb69..9d240804c 100644 --- a/go_test.sum +++ b/go_test.sum @@ -35,6 +35,20 @@ github.com/nats-io/nats-server/v2 v2.1.8-0.20210227190344-51550e242af8/go.mod h1 github.com/nats-io/nats-server/v2 v2.2.1-0.20210330155036-61cbd74e213d/go.mod h1:eKlAaGmSQHZMFQA6x56AaP5/Bl9N3mWF4awyT2TTpzc= github.com/nats-io/nats-server/v2 v2.2.1 h1:QaWKih9qAa1kod7xXy0G1ry0AEUGmDEaptaiqzuO1e8= github.com/nats-io/nats-server/v2 v2.2.1/go.mod h1:A+5EOqdnhH7FvLxtAK6SEDx6hyHriVOwf+FT/eEV99c= +github.com/nats-io/nats-server/v2 v2.2.2-0.20210421001316-7ac0ff667439 h1:wbm+DoCrBx3XUkfgfnzSGKGKXSSnR8z0EzaH8iEsYT4= +github.com/nats-io/nats-server/v2 v2.2.2-0.20210421001316-7ac0ff667439/go.mod h1:A+5EOqdnhH7FvLxtAK6SEDx6hyHriVOwf+FT/eEV99c= +github.com/nats-io/nats-server/v2 v2.2.2-0.20210421031524-a3f66508dd3a h1:Ihh+7S9hHb3zn4nibE9EV8P3Ed7OrH4TlGXHqIUYDfk= +github.com/nats-io/nats-server/v2 v2.2.2-0.20210421031524-a3f66508dd3a/go.mod h1:aF2IwMZdYktJswITm41c/k66uCHjTvpTxGQ7+d4cPeg= +github.com/nats-io/nats-server/v2 v2.2.2-0.20210421135834-a9607573b30c h1:URcPI+y2OIGWM1pKzHhHTvRItB0Czlv3dzuJA0rklvk= +github.com/nats-io/nats-server/v2 v2.2.2-0.20210421135834-a9607573b30c/go.mod h1:aF2IwMZdYktJswITm41c/k66uCHjTvpTxGQ7+d4cPeg= +github.com/nats-io/nats-server/v2 v2.2.2-0.20210421164150-3d928c847a0c h1:cbbxAcABuk2WdXKRm9VezFcGsceRhls4VCmQ/2aRJjQ= +github.com/nats-io/nats-server/v2 v2.2.2-0.20210421164150-3d928c847a0c/go.mod h1:aF2IwMZdYktJswITm41c/k66uCHjTvpTxGQ7+d4cPeg= +github.com/nats-io/nats-server/v2 v2.2.2-0.20210421195432-ea21e86996f7 h1:wcd++VZMdwDpQ7P1VXJ7NpAwtgdlxcjFLZ12Y/pL8Nw= +github.com/nats-io/nats-server/v2 v2.2.2-0.20210421195432-ea21e86996f7/go.mod h1:aF2IwMZdYktJswITm41c/k66uCHjTvpTxGQ7+d4cPeg= +github.com/nats-io/nats-server/v2 v2.2.2-0.20210421215445-a48a39251636 h1:iy6c/tV66xi5DT9WLUu9rJ8uQj8Kf7kmwHAqlYfczP4= +github.com/nats-io/nats-server/v2 v2.2.2-0.20210421215445-a48a39251636/go.mod h1:aF2IwMZdYktJswITm41c/k66uCHjTvpTxGQ7+d4cPeg= +github.com/nats-io/nats-server/v2 v2.2.2-0.20210421232642-f2d3f5fb81d0 h1:e2MoeAShQE/oOSjkkV6J6R+l5ugbfkXI5spxgQykgoM= +github.com/nats-io/nats-server/v2 v2.2.2-0.20210421232642-f2d3f5fb81d0/go.mod h1:aF2IwMZdYktJswITm41c/k66uCHjTvpTxGQ7+d4cPeg= github.com/nats-io/nats.go v1.10.0/go.mod h1:AjGArbfyR50+afOUotNX2Xs5SYHf+CoOa5HH1eEl2HE= github.com/nats-io/nats.go v1.10.1-0.20200531124210-96f2130e4d55/go.mod h1:ARiFsjW9DVxk48WJbO3OSZ2DG8fjkMi7ecLmXoY/n9I= github.com/nats-io/nats.go v1.10.1-0.20200606002146-fc6fed82929a/go.mod h1:8eAIv96Mo9QW6Or40jUHejS7e4VwZ3VRYD6Sf0BTDp4= @@ -43,6 +57,7 @@ github.com/nats-io/nats.go v1.10.1-0.20210127212649-5b4924938a9a/go.mod h1:Sa3kL github.com/nats-io/nats.go v1.10.1-0.20210211000709-75ded9c77585/go.mod h1:uBWnCKg9luW1g7hgzPxUjHFRI40EuTSX7RCzgnc74Jk= github.com/nats-io/nats.go v1.10.1-0.20210228004050-ed743748acac/go.mod h1:hxFvLNbNmT6UppX5B5Tr/r3g+XSwGjJzFn6mxPNJEHc= github.com/nats-io/nats.go v1.10.1-0.20210330225420-a0b1f60162f8/go.mod h1:Zq9IEHy7zurF0kFbU5aLIknnFI7guh8ijHk+2v+Vf5g= +github.com/nats-io/nats.go v1.10.1-0.20210419223411-20527524c393/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w= github.com/nats-io/nkeys v0.1.3/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w= github.com/nats-io/nkeys v0.1.4/go.mod h1:XdZpAbhgyyODYqjTawOnIOI7VlbKSarI9Gfy1tqEu/s= github.com/nats-io/nkeys v0.2.0/go.mod h1:XdZpAbhgyyODYqjTawOnIOI7VlbKSarI9Gfy1tqEu/s= diff --git a/js.go b/js.go index 626bd819c..4b8bc322a 100644 --- a/js.go +++ b/js.go @@ -1020,31 +1020,30 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync resp, err := js.nc.Request(js.apiSubj(ccSubj), j, js.opts.wait) if err != nil { + sub.Drain() if err == ErrNoResponders { err = ErrJetStreamNotEnabled } - sub.Unsubscribe() return nil, err } - var cinfo consumerResponse err = json.Unmarshal(resp.Data, &cinfo) if err != nil { - sub.Unsubscribe() + sub.Drain() return nil, err } info = cinfo.ConsumerInfo if cinfo.Error != nil { // Remove interest from previous subscribe since it // may have an incorrect delivery subject. - sub.Unsubscribe() + sub.Drain() // Multiple subscribers could compete in creating the first consumer // that will be shared using the same durable name. If this happens, then // do a lookup of the consumer info and resubscribe using the latest info. - if consumer != _EMPTY_ && strings.Contains(cinfo.Error.Description, `consumer already exists`) { + if consumer != _EMPTY_ && (strings.Contains(cinfo.Error.Description, `consumer already exists`) || strings.Contains(cinfo.Error.Description, `consumer name already in use`)) { info, err = js.ConsumerInfo(stream, consumer) - if err != nil && err.Error() != "nats: consumer not found" { + if err != nil { return nil, err } ccfg = &info.Config diff --git a/test/js_test.go b/test/js_test.go index 116d76972..c9e171c37 100644 --- a/test/js_test.go +++ b/test/js_test.go @@ -4573,35 +4573,42 @@ func testJetStreamMirror_Source(t *testing.T, nodes ...*jsServer) { } func TestJetStream_ClusterMultipleSubscribe(t *testing.T) { - nodes := []int{1} + nodes := []int{1, 3} + replicas := []int{1} for _, n := range nodes { - t.Run(fmt.Sprintf("sub n=%d", n), func(t *testing.T) { - name := fmt.Sprintf("SUB%d", n) - stream := &nats.StreamConfig{ - Name: name, - Replicas: n, + for _, r := range replicas { + if r > 1 && n == 1 { + continue } - withJSClusterAndStream(t, name, n, stream, testJetStream_ClusterMultipleSubscribe) - }) - t.Run(fmt.Sprintf("qsub n=%d", n), func(t *testing.T) { - name := fmt.Sprintf("MSUB%d", n) - stream := &nats.StreamConfig{ - Name: name, - Replicas: n, - } - withJSClusterAndStream(t, name, n, stream, testJetStream_ClusterMultipleQueueSubscribe) - }) + t.Run(fmt.Sprintf("sub n=%d r=%d", n, r), func(t *testing.T) { + name := fmt.Sprintf("SUB%d%d", n, r) + stream := &nats.StreamConfig{ + Name: name, + Replicas: n, + } + withJSClusterAndStream(t, name, n, stream, testJetStream_ClusterMultipleSubscribe) + }) - t.Run(fmt.Sprintf("psub n=%d", n), func(t *testing.T) { - name := fmt.Sprintf("PSUB%d", n) - stream := &nats.StreamConfig{ - Name: name, - Replicas: n, - } - withJSClusterAndStream(t, name, n, stream, testJetStream_ClusterMultiplePullSubscribe) - }) + t.Run(fmt.Sprintf("qsub n=%d r=%d", n, r), func(t *testing.T) { + name := fmt.Sprintf("MSUB%d%d", n, r) + stream := &nats.StreamConfig{ + Name: name, + Replicas: r, + } + withJSClusterAndStream(t, name, n, stream, testJetStream_ClusterMultipleQueueSubscribe) + }) + + t.Run(fmt.Sprintf("psub n=%d r=%d", n, r), func(t *testing.T) { + name := fmt.Sprintf("PSUBN%d%d", n, r) + stream := &nats.StreamConfig{ + Name: name, + Replicas: n, + } + withJSClusterAndStream(t, name, n, stream, testJetStream_ClusterMultiplePullSubscribe) + }) + } } } @@ -4621,18 +4628,28 @@ func testJetStream_ClusterMultipleSubscribe(t *testing.T, subject string, srvs . t.Fatal(err) } - size := 50 + size := 5 subs := make([]*nats.Subscription, size) - errCh := make(chan error, 1) + errCh := make(chan error, size) for i := 0; i < size; i++ { wg.Add(1) go func(n int) { defer wg.Done() - sub, err := js.SubscribeSync(subject, nats.Durable("shared")) + var sub *nats.Subscription + var err error + for attempt := 0; attempt < 5; attempt++ { + sub, err = js.SubscribeSync(subject, nats.Durable("shared")) + if err != nil { + time.Sleep(1 * time.Second) + continue + } + break + } if err != nil { errCh <- err + } else { + subs[n] = sub } - subs[n] = sub }(i) } @@ -4642,18 +4659,25 @@ func testJetStream_ClusterMultipleSubscribe(t *testing.T, subject string, srvs . done() }() + wg.Wait() for i := 0; i < size*2; i++ { js.Publish(subject, []byte("test")) } - wg.Wait() delivered := 0 - for _, sub := range subs { - _, err := sub.NextMsg(10 * time.Millisecond) - if err != nil { + for i, sub := range subs { + if sub == nil { continue } - delivered++ + for attempt := 0; attempt < 4; attempt++ { + _, err = sub.NextMsg(250 * time.Millisecond) + if err != nil { + t.Logf("%v WARN: Timeout waiting for next message: %v", i, err) + continue + } + delivered++ + break + } } if delivered < 2 { t.Fatalf("Expected more than one subscriber to receive a message, got: %v", delivered) @@ -4663,7 +4687,7 @@ func testJetStream_ClusterMultipleSubscribe(t *testing.T, subject string, srvs . case <-ctx.Done(): case err := <-errCh: if err != nil { - t.Fatalf("Unexpected error with multiple queue subscribers: %v", err) + t.Fatalf("Unexpected error with multiple subscribers: %v", err) } } } @@ -4684,18 +4708,28 @@ func testJetStream_ClusterMultipleQueueSubscribe(t *testing.T, subject string, s t.Fatal(err) } - size := 50 + size := 5 subs := make([]*nats.Subscription, size) - errCh := make(chan error, 1) + errCh := make(chan error, size) for i := 0; i < size; i++ { wg.Add(1) go func(n int) { defer wg.Done() - sub, err := js.QueueSubscribeSync(subject, "wq", nats.Durable("shared")) + var sub *nats.Subscription + var err error + for attempt := 0; attempt < 5; attempt++ { + sub, err = js.QueueSubscribeSync(subject, "wq", nats.Durable("shared")) + if err != nil { + time.Sleep(1 * time.Second) + continue + } + break + } if err != nil { errCh <- err + } else { + subs[n] = sub } - subs[n] = sub }(i) } @@ -4705,18 +4739,26 @@ func testJetStream_ClusterMultipleQueueSubscribe(t *testing.T, subject string, s done() }() + wg.Wait() for i := 0; i < size*2; i++ { js.Publish(subject, []byte("test")) } - wg.Wait() delivered := 0 - for _, sub := range subs { - _, err := sub.NextMsg(10 * time.Millisecond) - if err != nil { + for i, sub := range subs { + if sub == nil { continue } - delivered++ + + for attempt := 0; attempt < 4; attempt++ { + _, err = sub.NextMsg(250 * time.Millisecond) + if err != nil { + t.Logf("%v WARN: Timeout waiting for next message: %v", i, err) + continue + } + delivered++ + break + } } if delivered < 2 { t.Fatalf("Expected more than one subscriber to receive a message, got: %v", delivered) @@ -4747,18 +4789,28 @@ func testJetStream_ClusterMultiplePullSubscribe(t *testing.T, subject string, sr t.Fatal(err) } - size := 50 + size := 5 subs := make([]*nats.Subscription, size) - errCh := make(chan error, 1) + errCh := make(chan error, size) for i := 0; i < size; i++ { wg.Add(1) go func(n int) { - sub, err := js.PullSubscribe(subject, "shared") + defer wg.Done() + var sub *nats.Subscription + var err error + for attempt := 0; attempt < 5; attempt++ { + sub, err = js.PullSubscribe(subject, "shared") + if err != nil { + time.Sleep(1 * time.Second) + continue + } + break + } if err != nil { errCh <- err + } else { + subs[n] = sub } - subs[n] = sub - wg.Done() }(i) } @@ -4768,18 +4820,25 @@ func testJetStream_ClusterMultiplePullSubscribe(t *testing.T, subject string, sr done() }() + wg.Wait() for i := 0; i < size*2; i++ { js.Publish(subject, []byte("test")) } - wg.Wait() delivered := 0 - for _, sub := range subs { - _, err := sub.Fetch(1, nats.MaxWait(100*time.Millisecond)) - if err != nil { + for i, sub := range subs { + if sub == nil { continue } - delivered++ + for attempt := 0; attempt < 4; attempt++ { + _, err := sub.Fetch(1, nats.MaxWait(250*time.Millisecond)) + if err != nil { + t.Logf("%v WARN: Timeout waiting for next message: %v", i, err) + continue + } + delivered++ + break + } } if delivered < 2 { @@ -4790,7 +4849,7 @@ func testJetStream_ClusterMultiplePullSubscribe(t *testing.T, subject string, sr case <-ctx.Done(): case err := <-errCh: if err != nil { - t.Fatalf("Unexpected error with multiple queue subscribers: %v", err) + t.Fatalf("Unexpected error with multiple pull subscribers: %v", err) } } }