Skip to content

Commit

Permalink
Merge pull request #716 from nats-io/qsub-race-fixes
Browse files: browse the repository at this point in the history
js: Fixes to parallel creation of durable consumers
  • Loading branch information
wallyqs committed Apr 22, 2021
2 parents: 2339257 + 5a6cad0; commit 6cc31aa
Show file tree
Hide file tree
Showing 4 changed files with 134 additions and 61 deletions.
2 changes: 1 addition & 1 deletion go_test.mod
Expand Up @@ -4,7 +4,7 @@ go 1.15

require (
github.com/golang/protobuf v1.4.2
github.com/nats-io/nats-server/v2 v2.2.1
github.com/nats-io/nats-server/v2 v2.2.2-0.20210421232642-f2d3f5fb81d0
github.com/nats-io/nkeys v0.3.0
github.com/nats-io/nuid v1.0.1
google.golang.org/protobuf v1.23.0
Expand Down
15 changes: 15 additions & 0 deletions go_test.sum
Expand Up @@ -35,6 +35,20 @@ github.com/nats-io/nats-server/v2 v2.1.8-0.20210227190344-51550e242af8/go.mod h1
github.com/nats-io/nats-server/v2 v2.2.1-0.20210330155036-61cbd74e213d/go.mod h1:eKlAaGmSQHZMFQA6x56AaP5/Bl9N3mWF4awyT2TTpzc=
github.com/nats-io/nats-server/v2 v2.2.1 h1:QaWKih9qAa1kod7xXy0G1ry0AEUGmDEaptaiqzuO1e8=
github.com/nats-io/nats-server/v2 v2.2.1/go.mod h1:A+5EOqdnhH7FvLxtAK6SEDx6hyHriVOwf+FT/eEV99c=
github.com/nats-io/nats-server/v2 v2.2.2-0.20210421001316-7ac0ff667439 h1:wbm+DoCrBx3XUkfgfnzSGKGKXSSnR8z0EzaH8iEsYT4=
github.com/nats-io/nats-server/v2 v2.2.2-0.20210421001316-7ac0ff667439/go.mod h1:A+5EOqdnhH7FvLxtAK6SEDx6hyHriVOwf+FT/eEV99c=
github.com/nats-io/nats-server/v2 v2.2.2-0.20210421031524-a3f66508dd3a h1:Ihh+7S9hHb3zn4nibE9EV8P3Ed7OrH4TlGXHqIUYDfk=
github.com/nats-io/nats-server/v2 v2.2.2-0.20210421031524-a3f66508dd3a/go.mod h1:aF2IwMZdYktJswITm41c/k66uCHjTvpTxGQ7+d4cPeg=
github.com/nats-io/nats-server/v2 v2.2.2-0.20210421135834-a9607573b30c h1:URcPI+y2OIGWM1pKzHhHTvRItB0Czlv3dzuJA0rklvk=
github.com/nats-io/nats-server/v2 v2.2.2-0.20210421135834-a9607573b30c/go.mod h1:aF2IwMZdYktJswITm41c/k66uCHjTvpTxGQ7+d4cPeg=
github.com/nats-io/nats-server/v2 v2.2.2-0.20210421164150-3d928c847a0c h1:cbbxAcABuk2WdXKRm9VezFcGsceRhls4VCmQ/2aRJjQ=
github.com/nats-io/nats-server/v2 v2.2.2-0.20210421164150-3d928c847a0c/go.mod h1:aF2IwMZdYktJswITm41c/k66uCHjTvpTxGQ7+d4cPeg=
github.com/nats-io/nats-server/v2 v2.2.2-0.20210421195432-ea21e86996f7 h1:wcd++VZMdwDpQ7P1VXJ7NpAwtgdlxcjFLZ12Y/pL8Nw=
github.com/nats-io/nats-server/v2 v2.2.2-0.20210421195432-ea21e86996f7/go.mod h1:aF2IwMZdYktJswITm41c/k66uCHjTvpTxGQ7+d4cPeg=
github.com/nats-io/nats-server/v2 v2.2.2-0.20210421215445-a48a39251636 h1:iy6c/tV66xi5DT9WLUu9rJ8uQj8Kf7kmwHAqlYfczP4=
github.com/nats-io/nats-server/v2 v2.2.2-0.20210421215445-a48a39251636/go.mod h1:aF2IwMZdYktJswITm41c/k66uCHjTvpTxGQ7+d4cPeg=
github.com/nats-io/nats-server/v2 v2.2.2-0.20210421232642-f2d3f5fb81d0 h1:e2MoeAShQE/oOSjkkV6J6R+l5ugbfkXI5spxgQykgoM=
github.com/nats-io/nats-server/v2 v2.2.2-0.20210421232642-f2d3f5fb81d0/go.mod h1:aF2IwMZdYktJswITm41c/k66uCHjTvpTxGQ7+d4cPeg=
github.com/nats-io/nats.go v1.10.0/go.mod h1:AjGArbfyR50+afOUotNX2Xs5SYHf+CoOa5HH1eEl2HE=
github.com/nats-io/nats.go v1.10.1-0.20200531124210-96f2130e4d55/go.mod h1:ARiFsjW9DVxk48WJbO3OSZ2DG8fjkMi7ecLmXoY/n9I=
github.com/nats-io/nats.go v1.10.1-0.20200606002146-fc6fed82929a/go.mod h1:8eAIv96Mo9QW6Or40jUHejS7e4VwZ3VRYD6Sf0BTDp4=
Expand All @@ -43,6 +57,7 @@ github.com/nats-io/nats.go v1.10.1-0.20210127212649-5b4924938a9a/go.mod h1:Sa3kL
github.com/nats-io/nats.go v1.10.1-0.20210211000709-75ded9c77585/go.mod h1:uBWnCKg9luW1g7hgzPxUjHFRI40EuTSX7RCzgnc74Jk=
github.com/nats-io/nats.go v1.10.1-0.20210228004050-ed743748acac/go.mod h1:hxFvLNbNmT6UppX5B5Tr/r3g+XSwGjJzFn6mxPNJEHc=
github.com/nats-io/nats.go v1.10.1-0.20210330225420-a0b1f60162f8/go.mod h1:Zq9IEHy7zurF0kFbU5aLIknnFI7guh8ijHk+2v+Vf5g=
github.com/nats-io/nats.go v1.10.1-0.20210419223411-20527524c393/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
github.com/nats-io/nkeys v0.1.3/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w=
github.com/nats-io/nkeys v0.1.4/go.mod h1:XdZpAbhgyyODYqjTawOnIOI7VlbKSarI9Gfy1tqEu/s=
github.com/nats-io/nkeys v0.2.0/go.mod h1:XdZpAbhgyyODYqjTawOnIOI7VlbKSarI9Gfy1tqEu/s=
Expand Down
11 changes: 5 additions & 6 deletions js.go
Expand Up @@ -1020,31 +1020,30 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync

resp, err := js.nc.Request(js.apiSubj(ccSubj), j, js.opts.wait)
if err != nil {
sub.Drain()
if err == ErrNoResponders {
err = ErrJetStreamNotEnabled
}
sub.Unsubscribe()
return nil, err
}

var cinfo consumerResponse
err = json.Unmarshal(resp.Data, &cinfo)
if err != nil {
sub.Unsubscribe()
sub.Drain()
return nil, err
}
info = cinfo.ConsumerInfo
if cinfo.Error != nil {
// Remove interest from previous subscribe since it
// may have an incorrect delivery subject.
sub.Unsubscribe()
sub.Drain()

// Multiple subscribers could compete in creating the first consumer
// that will be shared using the same durable name. If this happens, then
// do a lookup of the consumer info and resubscribe using the latest info.
if consumer != _EMPTY_ && strings.Contains(cinfo.Error.Description, `consumer already exists`) {
if consumer != _EMPTY_ && (strings.Contains(cinfo.Error.Description, `consumer already exists`) || strings.Contains(cinfo.Error.Description, `consumer name already in use`)) {
info, err = js.ConsumerInfo(stream, consumer)
if err != nil && err.Error() != "nats: consumer not found" {
if err != nil {
return nil, err
}
ccfg = &info.Config
Expand Down
167 changes: 113 additions & 54 deletions test/js_test.go
Expand Up @@ -4587,35 +4587,42 @@ func testJetStreamMirror_Source(t *testing.T, nodes ...*jsServer) {
}

func TestJetStream_ClusterMultipleSubscribe(t *testing.T) {
nodes := []int{1}
nodes := []int{1, 3}
replicas := []int{1}

for _, n := range nodes {
t.Run(fmt.Sprintf("sub n=%d", n), func(t *testing.T) {
name := fmt.Sprintf("SUB%d", n)
stream := &nats.StreamConfig{
Name: name,
Replicas: n,
for _, r := range replicas {
if r > 1 && n == 1 {
continue
}
withJSClusterAndStream(t, name, n, stream, testJetStream_ClusterMultipleSubscribe)
})

t.Run(fmt.Sprintf("qsub n=%d", n), func(t *testing.T) {
name := fmt.Sprintf("MSUB%d", n)
stream := &nats.StreamConfig{
Name: name,
Replicas: n,
}
withJSClusterAndStream(t, name, n, stream, testJetStream_ClusterMultipleQueueSubscribe)
})
t.Run(fmt.Sprintf("sub n=%d r=%d", n, r), func(t *testing.T) {
name := fmt.Sprintf("SUB%d%d", n, r)
stream := &nats.StreamConfig{
Name: name,
Replicas: n,
}
withJSClusterAndStream(t, name, n, stream, testJetStream_ClusterMultipleSubscribe)
})

t.Run(fmt.Sprintf("psub n=%d", n), func(t *testing.T) {
name := fmt.Sprintf("PSUB%d", n)
stream := &nats.StreamConfig{
Name: name,
Replicas: n,
}
withJSClusterAndStream(t, name, n, stream, testJetStream_ClusterMultiplePullSubscribe)
})
t.Run(fmt.Sprintf("qsub n=%d r=%d", n, r), func(t *testing.T) {
name := fmt.Sprintf("MSUB%d%d", n, r)
stream := &nats.StreamConfig{
Name: name,
Replicas: r,
}
withJSClusterAndStream(t, name, n, stream, testJetStream_ClusterMultipleQueueSubscribe)
})

t.Run(fmt.Sprintf("psub n=%d r=%d", n, r), func(t *testing.T) {
name := fmt.Sprintf("PSUBN%d%d", n, r)
stream := &nats.StreamConfig{
Name: name,
Replicas: n,
}
withJSClusterAndStream(t, name, n, stream, testJetStream_ClusterMultiplePullSubscribe)
})
}
}
}

Expand All @@ -4635,18 +4642,28 @@ func testJetStream_ClusterMultipleSubscribe(t *testing.T, subject string, srvs .
t.Fatal(err)
}

size := 50
size := 5
subs := make([]*nats.Subscription, size)
errCh := make(chan error, 1)
errCh := make(chan error, size)
for i := 0; i < size; i++ {
wg.Add(1)
go func(n int) {
defer wg.Done()
sub, err := js.SubscribeSync(subject, nats.Durable("shared"))
var sub *nats.Subscription
var err error
for attempt := 0; attempt < 5; attempt++ {
sub, err = js.SubscribeSync(subject, nats.Durable("shared"))
if err != nil {
time.Sleep(1 * time.Second)
continue
}
break
}
if err != nil {
errCh <- err
} else {
subs[n] = sub
}
subs[n] = sub
}(i)
}

Expand All @@ -4656,18 +4673,25 @@ func testJetStream_ClusterMultipleSubscribe(t *testing.T, subject string, srvs .
done()
}()

wg.Wait()
for i := 0; i < size*2; i++ {
js.Publish(subject, []byte("test"))
}
wg.Wait()

delivered := 0
for _, sub := range subs {
_, err := sub.NextMsg(10 * time.Millisecond)
if err != nil {
for i, sub := range subs {
if sub == nil {
continue
}
delivered++
for attempt := 0; attempt < 4; attempt++ {
_, err = sub.NextMsg(250 * time.Millisecond)
if err != nil {
t.Logf("%v WARN: Timeout waiting for next message: %v", i, err)
continue
}
delivered++
break
}
}
if delivered < 2 {
t.Fatalf("Expected more than one subscriber to receive a message, got: %v", delivered)
Expand All @@ -4677,7 +4701,7 @@ func testJetStream_ClusterMultipleSubscribe(t *testing.T, subject string, srvs .
case <-ctx.Done():
case err := <-errCh:
if err != nil {
t.Fatalf("Unexpected error with multiple queue subscribers: %v", err)
t.Fatalf("Unexpected error with multiple subscribers: %v", err)
}
}
}
Expand All @@ -4698,18 +4722,28 @@ func testJetStream_ClusterMultipleQueueSubscribe(t *testing.T, subject string, s
t.Fatal(err)
}

size := 50
size := 5
subs := make([]*nats.Subscription, size)
errCh := make(chan error, 1)
errCh := make(chan error, size)
for i := 0; i < size; i++ {
wg.Add(1)
go func(n int) {
defer wg.Done()
sub, err := js.QueueSubscribeSync(subject, "wq", nats.Durable("shared"))
var sub *nats.Subscription
var err error
for attempt := 0; attempt < 5; attempt++ {
sub, err = js.QueueSubscribeSync(subject, "wq", nats.Durable("shared"))
if err != nil {
time.Sleep(1 * time.Second)
continue
}
break
}
if err != nil {
errCh <- err
} else {
subs[n] = sub
}
subs[n] = sub
}(i)
}

Expand All @@ -4719,18 +4753,26 @@ func testJetStream_ClusterMultipleQueueSubscribe(t *testing.T, subject string, s
done()
}()

wg.Wait()
for i := 0; i < size*2; i++ {
js.Publish(subject, []byte("test"))
}
wg.Wait()

delivered := 0
for _, sub := range subs {
_, err := sub.NextMsg(10 * time.Millisecond)
if err != nil {
for i, sub := range subs {
if sub == nil {
continue
}
delivered++

for attempt := 0; attempt < 4; attempt++ {
_, err = sub.NextMsg(250 * time.Millisecond)
if err != nil {
t.Logf("%v WARN: Timeout waiting for next message: %v", i, err)
continue
}
delivered++
break
}
}
if delivered < 2 {
t.Fatalf("Expected more than one subscriber to receive a message, got: %v", delivered)
Expand Down Expand Up @@ -4761,18 +4803,28 @@ func testJetStream_ClusterMultiplePullSubscribe(t *testing.T, subject string, sr
t.Fatal(err)
}

size := 50
size := 5
subs := make([]*nats.Subscription, size)
errCh := make(chan error, 1)
errCh := make(chan error, size)
for i := 0; i < size; i++ {
wg.Add(1)
go func(n int) {
sub, err := js.PullSubscribe(subject, "shared")
defer wg.Done()
var sub *nats.Subscription
var err error
for attempt := 0; attempt < 5; attempt++ {
sub, err = js.PullSubscribe(subject, "shared")
if err != nil {
time.Sleep(1 * time.Second)
continue
}
break
}
if err != nil {
errCh <- err
} else {
subs[n] = sub
}
subs[n] = sub
wg.Done()
}(i)
}

Expand All @@ -4782,18 +4834,25 @@ func testJetStream_ClusterMultiplePullSubscribe(t *testing.T, subject string, sr
done()
}()

wg.Wait()
for i := 0; i < size*2; i++ {
js.Publish(subject, []byte("test"))
}
wg.Wait()

delivered := 0
for _, sub := range subs {
_, err := sub.Fetch(1, nats.MaxWait(100*time.Millisecond))
if err != nil {
for i, sub := range subs {
if sub == nil {
continue
}
delivered++
for attempt := 0; attempt < 4; attempt++ {
_, err := sub.Fetch(1, nats.MaxWait(250*time.Millisecond))
if err != nil {
t.Logf("%v WARN: Timeout waiting for next message: %v", i, err)
continue
}
delivered++
break
}
}

if delivered < 2 {
Expand All @@ -4804,7 +4863,7 @@ func testJetStream_ClusterMultiplePullSubscribe(t *testing.T, subject string, sr
case <-ctx.Done():
case err := <-errCh:
if err != nil {
t.Fatalf("Unexpected error with multiple queue subscribers: %v", err)
t.Fatalf("Unexpected error with multiple pull subscribers: %v", err)
}
}
}
Expand Down

0 comments on commit 6cc31aa

Please sign in to comment.