From f6ce0880e0253397988fba1357d4dfd22852a669 Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Thu, 14 Apr 2022 15:43:08 -0600 Subject: [PATCH 1/2] [FIXED] JetStream: do not automatically set MaxAckPending We were still setting the MaxAckPending value to the capacity of the go channel for synchronous or channel subscriptions. The issue is that the server may have a limit imposed on this value which would prevent creation of synchronous subscriptions without having to create a connection with a lower SyncQueueLen... Signed-off-by: Ivan Kozlovic --- go_test.mod | 8 +-- go_test.sum | 20 ++++--- js.go | 8 ++- js_test.go | 73 +++++++++++++++++++++++++- test/js_test.go | 124 +++----------------------------------------- test/norace_test.go | 2 +- 6 files changed, 97 insertions(+), 138 deletions(-) diff --git a/go_test.mod b/go_test.mod index a9a586e30..fcf31af6b 100644 --- a/go_test.mod +++ b/go_test.mod @@ -4,16 +4,16 @@ go 1.17 require ( github.com/golang/protobuf v1.4.2 - github.com/nats-io/nats-server/v2 v2.7.2 + github.com/nats-io/nats-server/v2 v2.7.5-0.20220415000625-a6b62f61a703 github.com/nats-io/nkeys v0.3.0 github.com/nats-io/nuid v1.0.1 google.golang.org/protobuf v1.23.0 ) require ( - github.com/klauspost/compress v1.13.4 // indirect - github.com/minio/highwayhash v1.0.1 // indirect - github.com/nats-io/jwt/v2 v2.2.1-0.20220113022732-58e87895b296 // indirect + github.com/klauspost/compress v1.14.4 // indirect + github.com/minio/highwayhash v1.0.2 // indirect + github.com/nats-io/jwt/v2 v2.2.1-0.20220330180145-442af02fd36a // indirect golang.org/x/crypto v0.0.0-20220112180741-5e0467b6c7ce // indirect golang.org/x/sys v0.0.0-20220111092808-5a964db01320 // indirect golang.org/x/time v0.0.0-20211116232009-f0f3c7e86c11 // indirect diff --git a/go_test.sum b/go_test.sum index f7a98a561..6678eab2d 100644 --- a/go_test.sum +++ b/go_test.sum @@ -5,21 +5,19 @@ github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:W github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0= github.com/golang/protobuf v1.4.2 h1:+Z5KGCizgyZCbGh1KZqA0fcLLkwbsjIzS4aV2v7wJX0= github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= -github.com/golang/snappy v0.0.3 h1:fHPg5GQYlCeLIPB9BZqMVR5nR9A+IM5zcgeTdjMYmLA= -github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4= github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= -github.com/klauspost/compress v1.13.4 h1:0zhec2I8zGnjWcKyLl6i3gPqKANCCn5e9xmviEEeX6s= -github.com/klauspost/compress v1.13.4/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg= -github.com/minio/highwayhash v1.0.1 h1:dZ6IIu8Z14VlC0VpfKofAhCy74wu/Qb5gcn52yWoz/0= -github.com/minio/highwayhash v1.0.1/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY= -github.com/nats-io/jwt/v2 v2.2.1-0.20220113022732-58e87895b296 h1:vU9tpM3apjYlLLeY23zRWJ9Zktr5jp+mloR942LEOpY= -github.com/nats-io/jwt/v2 v2.2.1-0.20220113022732-58e87895b296/go.mod h1:0tqz9Hlu6bCBFLWAASKhE5vUA4c24L9KPUUgvwumE/k= -github.com/nats-io/nats-server/v2 v2.7.2 h1:+LEN8m0+jdCkiGc884WnDuxR+qj80/5arj+szKuRpRI= -github.com/nats-io/nats-server/v2 v2.7.2/go.mod h1:tckmrt0M6bVaDT3kmh9UrIq/CBOBBse+TpXQi5ldaa8= -github.com/nats-io/nats.go 
v1.13.1-0.20220121202836-972a071d373d/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w= +github.com/klauspost/compress v1.14.4 h1:eijASRJcobkVtSt81Olfh7JX43osYLwy5krOJo6YEu4= +github.com/klauspost/compress v1.14.4/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= +github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g= +github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY= +github.com/nats-io/jwt/v2 v2.2.1-0.20220330180145-442af02fd36a h1:lem6QCvxR0Y28gth9P+wV2K/zYUUAkJ+55U8cpS0p5I= +github.com/nats-io/jwt/v2 v2.2.1-0.20220330180145-442af02fd36a/go.mod h1:0tqz9Hlu6bCBFLWAASKhE5vUA4c24L9KPUUgvwumE/k= +github.com/nats-io/nats-server/v2 v2.7.5-0.20220415000625-a6b62f61a703 h1:d8siT+8VQ68hfqPqYZvpMrHIihlMVW3gGy+o2hRDCyg= +github.com/nats-io/nats-server/v2 v2.7.5-0.20220415000625-a6b62f61a703/go.mod h1:5vic7C58BFEVltiZhs7Kq81q2WcEPhJPsmNv1FOrdv0= +github.com/nats-io/nats.go v1.14.0/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w= github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8= github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4= github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= diff --git a/js.go b/js.go index 23480abc7..302f99e89 100644 --- a/js.go +++ b/js.go @@ -1465,12 +1465,10 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync, if cfg.ReplayPolicy == replayPolicyNotSet { cfg.ReplayPolicy = ReplayInstantPolicy } + // Note: Do not set MaxAckPending if not set by the user. The server may + // have limits that are lower than the sync subscription go channel size, + // so don't use the cap size as the "default" MaxAckPending. - // If we have acks at all and the MaxAckPending is not set go ahead - // and set to the internal max for channel based consumers - if cfg.MaxAckPending == 0 && ch != nil && cfg.AckPolicy != AckNonePolicy { - cfg.MaxAckPending = cap(ch) - } // Create request here. ccreq = &createConsumerRequest{ Stream: stream, diff --git a/js_test.go b/js_test.go index 904e1fdef..197816513 100644 --- a/js_test.go +++ b/js_test.go @@ -20,6 +20,7 @@ package nats import ( "crypto/sha256" "encoding/base64" + "encoding/json" "fmt" "io/ioutil" "math/rand" @@ -843,7 +844,7 @@ func TestJetStreamFlowControlStalled(t *testing.T) { } // Publish bunch of messages. 
- payload := make([]byte, 1024) + payload := make([]byte, 100*1024) for i := 0; i < 250; i++ { nc.Publish("a", payload) } @@ -939,3 +940,73 @@ func TestJetStreamExpiredPullRequests(t *testing.T) { } } } + +func TestJetStreamSyncSubscribeWithMaxAckPending(t *testing.T) { + opts := natsserver.DefaultTestOptions + opts.Port = -1 + opts.JetStream = true + opts.JetStreamLimits.MaxAckPending = 100 + s := natsserver.RunServer(&opts) + defer shutdownJSServerAndRemoveStorage(t, s) + + nc, js := jsClient(t, s) + defer nc.Close() + + if _, err := js.AddStream(&StreamConfig{Name: "MAX_ACK_PENDING", Subjects: []string{"foo"}}); err != nil { + t.Fatalf("Error adding stream: %v", err) + } + + // We should be able to create a sync subscription + sub, err := js.SubscribeSync("foo") + if err != nil { + t.Fatalf("Error on subscribe: %v", err) + } + sub.Unsubscribe() + + // And pull sub + sub, err = js.PullSubscribe("foo", "bar") + if err != nil { + t.Fatalf("Error on subscribe: %v", err) + } + sub.Unsubscribe() +} + +func TestJetStreamClusterPlacement(t *testing.T) { + // There used to be a test here that would not work because it would require + // all servers in the cluster to know about each other tags. So we will simply + // verify that if a stream is configured with placement and tags, the proper + // "stream create" request is sent. + s := RunBasicJetStreamServer() + defer shutdownJSServerAndRemoveStorage(t, s) + + nc, js := jsClient(t, s) + defer nc.Close() + + sub, err := nc.SubscribeSync(fmt.Sprintf("$JS.API."+apiStreamCreateT, "TEST")) + if err != nil { + t.Fatalf("Error on sub: %v", err) + } + js.AddStream(&StreamConfig{ + Name: "TEST", + Placement: &Placement{ + Tags: []string{"my_tag"}, + }, + }) + msg, err := sub.NextMsg(time.Second) + if err != nil { + t.Fatalf("Error getting stream create request: %v", err) + } + var req StreamConfig + if err := json.Unmarshal(msg.Data, &req); err != nil { + t.Fatalf("Unmarshal error: %v", err) + } + if req.Placement == nil { + t.Fatal("Expected placement, did not get it") + } + if n := len(req.Placement.Tags); n != 1 { + t.Fatalf("Expected 1 tag, got %v", n) + } + if v := req.Placement.Tags[0]; v != "my_tag" { + t.Fatalf("Unexpected tag: %q", v) + } +} diff --git a/test/js_test.go b/test/js_test.go index 43554efbb..a7fb03d95 100644 --- a/test/js_test.go +++ b/test/js_test.go @@ -2162,7 +2162,7 @@ func TestJetStreamCrossAccountMirrorsAndSources(t *testing.T) { checkConsume(t, js2, publishSubj, sourceName, toSend) } -func TestJetStreamAutoMaxAckPending(t *testing.T) { +func TestJetStreamNoAutoMaxAckPending(t *testing.T) { s := RunBasicJetStreamServer() defer shutdownJSServerAndRemoveStorage(t, s) @@ -2176,14 +2176,9 @@ func TestJetStreamAutoMaxAckPending(t *testing.T) { t.Fatalf("Unexpected error: %v", err) } - toSend := 10_000 - - msg := []byte("Hello") - for i := 0; i < toSend; i++ { - // Use plain NATS here for speed. - nc.Publish("foo", msg) - } - nc.Flush() + // We no longer use the cap of the sync sub's go channel as the + // default MaxAckPending, so check that it is actually NOT set + // to our value. // Create a consumer. 
sub, err := js.SubscribeSync("foo") @@ -2191,39 +2186,13 @@ func TestJetStreamAutoMaxAckPending(t *testing.T) { t.Fatalf("Unexpected error: %v", err) } defer sub.Unsubscribe() - expectedMaxAck, _, _ := sub.PendingLimits() ci, err := sub.ConsumerInfo() if err != nil { t.Fatalf("Unexpected error: %v", err) } - if ci.Config.MaxAckPending != expectedMaxAck { - t.Fatalf("Expected MaxAckPending to be set to %d, got %d", expectedMaxAck, ci.Config.MaxAckPending) - } - - waitForPending := func(n int) { - timeout := time.Now().Add(2 * time.Second) - for time.Now().Before(timeout) { - if msgs, _, _ := sub.Pending(); msgs == n { - return - } - time.Sleep(10 * time.Millisecond) - } - msgs, _, _ := sub.Pending() - t.Fatalf("Expected to receive %d messages, but got %d", n, msgs) - } - - waitForPending(expectedMaxAck) - // We do it twice to make sure it does not go over. - waitForPending(expectedMaxAck) - - // Now make sure we can consume them all with no slow consumers etc. - for i := 0; i < toSend; i++ { - m, err := sub.NextMsg(time.Second) - if err != nil { - t.Fatalf("Unexpected error receiving %d: %v", i+1, err) - } - m.Ack() + if ci.Config.MaxAckPending == 500 { + t.Fatalf("Expected MaxAckPending to not be set to the channel's cap, got %d", ci.Config.MaxAckPending) } } @@ -3674,7 +3643,7 @@ func TestJetStreamSubscribe_ConfigCantChange(t *testing.T) { {"default ack wait", nats.AckWait(30 * time.Second)}, {"default replay policy", nats.ReplayInstant()}, {"default max waiting", nats.PullMaxWaiting(512)}, - {"default ack pending", nats.MaxAckPending(65536)}, + {"default ack pending", nats.MaxAckPending(1000)}, } { t.Run(test.name, func(t *testing.T) { durName := nuid.Next() @@ -3683,6 +3652,7 @@ func TestJetStreamSubscribe_ConfigCantChange(t *testing.T) { t.Fatalf("Error on subscribe: %v", err) } // If the option is the same as the server default, it is not an error either. + // This test is dependent on what the server default is, so it may need adjustment. 
_, err = js.PullSubscribe("foo", durName, test.opt) if err != nil { t.Fatalf("Unexpected error: %v", err) @@ -3924,84 +3894,6 @@ func checkSubsPending(t *testing.T, sub *nats.Subscription, numExpected int) { }) } -func TestJetStream_ClusterPlacement(t *testing.T) { - size := 3 - - t.Run("default cluster", func(t *testing.T) { - cluster := "PLC1" - withJSCluster(t, cluster, size, func(t *testing.T, nodes ...*jsServer) { - srvA := nodes[0] - nc, js := jsClient(t, srvA.Server) - defer nc.Close() - - var err error - - stream := &nats.StreamConfig{ - Name: "TEST", - Placement: &nats.Placement{ - Tags: []string{"NODE_0"}, - }, - } - - _, err = js.AddStream(stream) - if err != nil { - t.Errorf("Unexpected error placing stream: %v", err) - } - }) - }) - - t.Run("known cluster", func(t *testing.T) { - cluster := "PLC2" - withJSCluster(t, cluster, size, func(t *testing.T, nodes ...*jsServer) { - srvA := nodes[0] - nc, js := jsClient(t, srvA.Server) - defer nc.Close() - - var err error - - stream := &nats.StreamConfig{ - Name: "TEST", - Placement: &nats.Placement{ - Cluster: cluster, - Tags: []string{"NODE_0"}, - }, - } - - _, err = js.AddStream(stream) - if err != nil { - t.Errorf("Unexpected error placing stream: %v", err) - } - }) - }) - - t.Run("unknown cluster", func(t *testing.T) { - cluster := "PLC3" - withJSCluster(t, cluster, size, func(t *testing.T, nodes ...*jsServer) { - srvA := nodes[0] - nc, js := jsClient(t, srvA.Server) - defer nc.Close() - - var err error - - stream := &nats.StreamConfig{ - Name: "TEST", - Placement: &nats.Placement{ - Cluster: "UNKNOWN", - }, - } - - _, err = js.AddStream(stream) - if err == nil { - t.Error("Unexpected success creating stream in unknown cluster") - } - expected := `insufficient resources` - if err != nil && err.Error() != expected { - t.Errorf("Expected %q error, got: %v", expected, err) - } - }) - }) -} - func TestJetStreamStreamMirror(t *testing.T) { withJSServer(t, testJetStreamMirror_Source) } diff --git a/test/norace_test.go b/test/norace_test.go index c5620852a..20f5bcc03 100644 --- a/test/norace_test.go +++ b/test/norace_test.go @@ -53,7 +53,7 @@ func TestNoRaceObjectContextOpt(t *testing.T) { } // Now put a large object in there. - blob := make([]byte, 8*1024*1024) + blob := make([]byte, 16*1024*1024) rand.Read(blob) _, err = obs.PutBytes("BLOB", blob) expectOk(t, err) From 87ba3d23ea9f3d741d370c3062bea322c3eaaf02 Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Thu, 14 Apr 2022 18:57:55 -0600 Subject: [PATCH 2/2] Fix some flappers and do not run NoRace tests during code coverage Signed-off-by: Ivan Kozlovic --- norace_test.go | 4 ++-- scripts/cov.sh | 8 +++---- test/js_test.go | 57 ++++++++++++++++++++++++++++++++----------------- 3 files changed, 43 insertions(+), 26 deletions(-) diff --git a/norace_test.go b/norace_test.go index 3a9779a8e..afac46397 100644 --- a/norace_test.go +++ b/norace_test.go @@ -603,7 +603,7 @@ Loop: }) } -func TestJetStreamPushFlowControl_SubscribeAsyncAndChannel(t *testing.T) { +func TestNoRaceJetStreamPushFlowControl_SubscribeAsyncAndChannel(t *testing.T) { s := RunBasicJetStreamServer() defer shutdownJSServerAndRemoveStorage(t, s) @@ -673,7 +673,7 @@ func TestJetStreamPushFlowControl_SubscribeAsyncAndChannel(t *testing.T) { defer sub.Unsubscribe() // Set this lower then normal to make sure we do not exceed bytes pending with FC turned on. - sub.SetPendingLimits(totalMsgs, 1024*1024) // This matches server window for flowcontrol. 
+ sub.SetPendingLimits(totalMsgs, 4*1024*1024) // This matches server window for flowcontrol. info, err := sub.ConsumerInfo() if err != nil { diff --git a/scripts/cov.sh b/scripts/cov.sh index f8f33db75..7ea925358 100755 --- a/scripts/cov.sh +++ b/scripts/cov.sh @@ -3,10 +3,10 @@ rm -rf ./cov mkdir cov -go test -modfile=go_test.mod --failfast -vet=off -v -race -covermode=atomic -coverprofile=./cov/nats.out -go test -modfile=go_test.mod --failfast -vet=off -v -race -covermode=atomic -coverprofile=./cov/test.out -coverpkg=github.com/nats-io/nats.go ./test -go test -modfile=go_test.mod --failfast -vet=off -v -race -covermode=atomic -coverprofile=./cov/builtin.out -coverpkg=github.com/nats-io/nats.go/encoders/builtin ./test -run EncBuiltin -go test -modfile=go_test.mod --failfast -vet=off -v -race -covermode=atomic -coverprofile=./cov/protobuf.out -coverpkg=github.com/nats-io/nats.go/encoders/protobuf ./test -run EncProto +go test -modfile=go_test.mod --failfast -vet=off -v -covermode=atomic -coverprofile=./cov/nats.out -run='Test[^NoRace]' +go test -modfile=go_test.mod --failfast -vet=off -v -covermode=atomic -coverprofile=./cov/test.out -coverpkg=github.com/nats-io/nats.go -run='Test[^NoRace]' ./test +go test -modfile=go_test.mod --failfast -vet=off -v -covermode=atomic -coverprofile=./cov/builtin.out -coverpkg=github.com/nats-io/nats.go/encoders/builtin ./test -run EncBuiltin +go test -modfile=go_test.mod --failfast -vet=off -v -covermode=atomic -coverprofile=./cov/protobuf.out -coverpkg=github.com/nats-io/nats.go/encoders/protobuf ./test -run EncProto gocovmerge ./cov/*.out > acc.out rm -rf ./cov diff --git a/test/js_test.go b/test/js_test.go index a7fb03d95..aff8523e7 100644 --- a/test/js_test.go +++ b/test/js_test.go @@ -585,14 +585,17 @@ func TestJetStreamSubscribe(t *testing.T) { // Now go ahead and consume these and ack, but not ack+next. for i := 0; i < batch; i++ { m := bmsgs[i] - err = m.Ack() + err = m.AckSync() if err != nil { t.Fatal(err) } } - if info, _ := sub.ConsumerInfo(); info.AckFloor.Consumer != uint64(batch) { - t.Fatalf("Expected ack floor to be %d, got %d", batch, info.AckFloor.Consumer) - } + checkFor(t, time.Second, 15*time.Millisecond, func() error { + if info, _ := sub.ConsumerInfo(); info.AckFloor.Consumer != uint64(batch) { + return fmt.Errorf("Expected ack floor to be %d, got %d", batch, info.AckFloor.Consumer) + } + return nil + }) waitForPending(t, sub, 0) // Make a request for 10 but should only receive a few. @@ -607,7 +610,7 @@ func TestJetStreamSubscribe(t *testing.T) { } for _, msg := range bmsgs { - msg.Ack() + msg.AckSync() } // Now test attaching to a pull based durable. 
@@ -2746,6 +2749,7 @@ func TestJetStreamPullSubscribe_AckPending(t *testing.T) { } nextMsg := func() *nats.Msg { + t.Helper() msgs, err := sub.Fetch(1) if err != nil { t.Fatal(err) @@ -2754,6 +2758,7 @@ func TestJetStreamPullSubscribe_AckPending(t *testing.T) { } getPending := func() (int, int) { + t.Helper() info, err := sub.ConsumerInfo() if err != nil { t.Fatal(err) @@ -2762,6 +2767,7 @@ func TestJetStreamPullSubscribe_AckPending(t *testing.T) { } getMetadata := func(msg *nats.Msg) *nats.MsgMetadata { + t.Helper() meta, err := msg.Metadata() if err != nil { t.Fatalf("Unexpected error: %v", err) @@ -2770,16 +2776,23 @@ func TestJetStreamPullSubscribe_AckPending(t *testing.T) { } expectedPending := func(inflight int, pending int) { - i, p := getPending() - if i != inflight || p != pending { - t.Errorf("Unexpected inflight/pending msgs: %v/%v", i, p) - } + t.Helper() + checkFor(t, 2*time.Second, 15*time.Millisecond, func() error { + i, p := getPending() + if i != inflight || p != pending { + return fmt.Errorf("Unexpected inflight/pending msgs: %v/%v", i, p) + } + return nil + }) } - inflight, pending := getPending() - if inflight != 0 || pending != totalMsgs { - t.Errorf("Unexpected inflight/pending msgs: %v/%v", inflight, pending) - } + checkFor(t, 2*time.Second, 15*time.Millisecond, func() error { + inflight, pending := getPending() + if inflight != 0 || pending != totalMsgs { + return fmt.Errorf("Unexpected inflight/pending msgs: %v/%v", inflight, pending) + } + return nil + }) // Normal Ack should decrease pending msg := nextMsg() @@ -2908,13 +2921,17 @@ func TestJetStreamPullSubscribe_AckPending(t *testing.T) { } // Get rest of messages. - msgs, err := sub.Fetch(5) - if err != nil { - t.Fatal(err) - } - for _, msg := range msgs { - getMetadata(msg) - msg.Ack() + count := 5 + for count > 0 { + msgs, err := sub.Fetch(count) + if err != nil { + t.Fatal(err) + } + for _, msg := range msgs { + count-- + getMetadata(msg) + msg.Ack() + } } expectedPending(0, 0) }
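Note on usage after this change: the client no longer derives a consumer's MaxAckPending from the capacity of the sync/channel subscription's internal go channel, so a server-imposed JetStream limit (such as opts.JetStreamLimits.MaxAckPending in the new test above) no longer blocks a plain SubscribeSync call. Applications that still want a specific ceiling should pass it explicitly. Below is a minimal sketch, not part of the patch, assuming a locally running JetStream-enabled server; the stream name, subject, and limit value are illustrative placeholders:

package main

import (
	"fmt"
	"log"
	"time"

	"github.com/nats-io/nats.go"
)

func main() {
	// Connect and get a JetStream context.
	nc, err := nats.Connect(nats.DefaultURL)
	if err != nil {
		log.Fatal(err)
	}
	defer nc.Close()

	js, err := nc.JetStream()
	if err != nil {
		log.Fatal(err)
	}

	// Illustrative stream; name and subjects are placeholders.
	if _, err := js.AddStream(&nats.StreamConfig{
		Name:     "ORDERS",
		Subjects: []string{"orders.*"},
	}); err != nil {
		log.Fatal(err)
	}

	// MaxAckPending is now only set when the application asks for it.
	// Pick a value at or below any server-enforced consumer limit.
	sub, err := js.SubscribeSync("orders.new", nats.MaxAckPending(50))
	if err != nil {
		log.Fatal(err)
	}
	defer sub.Unsubscribe()

	if _, err := js.Publish("orders.new", []byte("hello")); err != nil {
		log.Fatal(err)
	}

	msg, err := sub.NextMsg(2 * time.Second)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("received: %s\n", string(msg.Data))
	msg.Ack()
}

The same server limit applies to pull consumers: js.PullSubscribe accepts nats.MaxAckPending as well, should a cap be desired there.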