From 3f37c976078d4634f2dacd90a011110ffdbba75e Mon Sep 17 00:00:00 2001 From: Piotr Piotrowski Date: Mon, 20 Feb 2023 11:26:48 +0100 Subject: [PATCH 1/4] [ADDED] Support for multiple subject filters on consumers --- example_test.go | 5 ++++ go_test.mod | 7 ++--- go_test.sum | 16 ++++------- js.go | 11 +++++++ jserrors.go | 4 +++ jsm.go | 11 ++++++- test/js_test.go | 76 +++++++++++++++++++++++++++++++++++++++++++++++-- 7 files changed, 112 insertions(+), 18 deletions(-) diff --git a/example_test.go b/example_test.go index f8930dbaa..c3fa23c2a 100644 --- a/example_test.go +++ b/example_test.go @@ -670,6 +670,11 @@ func ExampleSubOpt() { js.Subscribe("foo", func(msg *nats.Msg) { fmt.Printf("Received a message: %s\n", string(msg.Data)) }, nats.Durable("FOO"), nats.ConsumerMemoryStorage()) + + // Use multiple subject filters. + js.Subscribe("", func(msg *nats.Msg) { + fmt.Printf("Received a message: %s\n", string(msg.Data)) + }, nats.Durable("FOO"), nats.ConsumerFilterSubjects("foo", "bar"), nats.BindStream("test_stream")) } func ExampleMaxWait() { diff --git a/go_test.mod b/go_test.mod index 67396368d..d42ec0ebe 100644 --- a/go_test.mod +++ b/go_test.mod @@ -4,18 +4,17 @@ go 1.19 require ( github.com/golang/protobuf v1.4.2 - github.com/nats-io/nats-server/v2 v2.9.12-0.20230129183410-a505c88d65e8 + github.com/nats-io/nats-server/v2 v2.9.15-0.20230217225355-8fd61291be35 github.com/nats-io/nkeys v0.3.1-0.20221215194120-47c7408e7546 github.com/nats-io/nuid v1.0.1 google.golang.org/protobuf v1.23.0 ) require ( - github.com/klauspost/compress v1.15.11 // indirect + github.com/klauspost/compress v1.15.15 // indirect github.com/minio/highwayhash v1.0.2 // indirect github.com/nats-io/jwt/v2 v2.3.1-0.20221227170542-bdf40fa3627b // indirect - go.uber.org/automaxprocs v1.5.1 // indirect golang.org/x/crypto v0.5.0 // indirect golang.org/x/sys v0.4.0 // indirect - golang.org/x/time v0.0.0-20220922220347-f3bd1da661af // indirect + golang.org/x/time v0.3.0 // indirect ) diff --git a/go_test.sum b/go_test.sum index 161953aed..961beae04 100644 --- a/go_test.sum +++ b/go_test.sum @@ -9,29 +9,25 @@ github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMyw github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4= github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= -github.com/klauspost/compress v1.15.11 h1:Lcadnb3RKGin4FYM/orgq0qde+nc15E5Cbqg4B9Sx9c= -github.com/klauspost/compress v1.15.11/go.mod h1:QPwzmACJjUTFsnSHH934V6woptycfrDDJnH7hvFVbGM= +github.com/klauspost/compress v1.15.15 h1:EF27CXIuDsYJ6mmvtBRlEuB2UVOqHG1tAXgZ7yIO+lw= +github.com/klauspost/compress v1.15.15/go.mod h1:ZcK2JAFqKOpnBlxcLsJzYfrS9X1akm9fHZNnD9+Vo/4= github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g= github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY= github.com/nats-io/jwt/v2 v2.3.1-0.20221227170542-bdf40fa3627b h1:exHeHbghpBp1JvdYq7muaKFvJgLD93UDcmoIbFu/9PA= github.com/nats-io/jwt/v2 v2.3.1-0.20221227170542-bdf40fa3627b/go.mod h1:DYujvzCMZzUuqB3i1Pnpf1YtkuTwhdI84Aah9wRXkK0= -github.com/nats-io/nats-server/v2 v2.9.12-0.20230127214148-11ffd695750a h1:iZE9RvG9JCC0TployL6UsxJ3dbwMKBrJakYfvPMYDRc= -github.com/nats-io/nats-server/v2 v2.9.12-0.20230127214148-11ffd695750a/go.mod h1:ibVHvIWZwqnarh51bnfR3zZWtlL3SjG9X49ocsfFUm4= -github.com/nats-io/nats-server/v2 v2.9.12-0.20230129183410-a505c88d65e8 h1:MnuDEDsBCO0yU2MVA7BMbiob17nDN1TklcqlxJi6py8= -github.com/nats-io/nats-server/v2 v2.9.12-0.20230129183410-a505c88d65e8/go.mod h1:ibVHvIWZwqnarh51bnfR3zZWtlL3SjG9X49ocsfFUm4= +github.com/nats-io/nats-server/v2 v2.9.15-0.20230217225355-8fd61291be35 h1:OZ2NYxMQ9KLboqCYmb3IyMIlul0X1lFyilp577zAjHc= +github.com/nats-io/nats-server/v2 v2.9.15-0.20230217225355-8fd61291be35/go.mod h1:AT/C9XuOPGsozg2dfiS+9vK0Ge4jheffj8uL/kMGPtw= github.com/nats-io/nkeys v0.3.1-0.20221215194120-47c7408e7546 h1:7ZylVLLiDSFqxJTSibNsO2RjVSXj3QWnDc+zKara2HE= github.com/nats-io/nkeys v0.3.1-0.20221215194120-47c7408e7546/go.mod h1:JOEZlxMfMnmaLwr+mpmP+RGIYSxLNBFsZykCGaI2PvA= github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= -go.uber.org/automaxprocs v1.5.1 h1:e1YG66Lrk73dn4qhg8WFSvhF0JuFQF0ERIp4rpuV8Qk= -go.uber.org/automaxprocs v1.5.1/go.mod h1:BF4eumQw0P9GtnuxxovUd06vwm1o18oMzFtK66vU6XU= golang.org/x/crypto v0.5.0 h1:U/0M97KRkSFvyD/3FSmdP5W5swImpNgle/EHFhOsQPE= golang.org/x/crypto v0.5.0/go.mod h1:NK/OQwhpMQP3MwtdjgLlYHnH9ebylxKWv3e0fK+mkQU= golang.org/x/sys v0.0.0-20190130150945-aca44879d564/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.4.0 h1:Zr2JFtRQNX3BCZ8YtxRE9hNJYC8J6I1MVbMg6owUp18= golang.org/x/sys v0.4.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/time v0.0.0-20220922220347-f3bd1da661af h1:Yx9k8YCG3dvF87UAn2tu2HQLf2dt/eR1bXxpLMWeH+Y= -golang.org/x/time v0.0.0-20220922220347-f3bd1da661af/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= +golang.org/x/time v0.3.0 h1:rg5rLMjNzMS1RkNLzCG38eapWhnYLFYXDXj2gOlr8j4= +golang.org/x/time v0.3.0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= diff --git a/js.go b/js.go index e82de7fe9..3d63a8920 100644 --- a/js.go +++ b/js.go @@ -1077,6 +1077,7 @@ type ConsumerConfig struct { MaxDeliver int `json:"max_deliver,omitempty"` BackOff []time.Duration `json:"backoff,omitempty"` FilterSubject string `json:"filter_subject,omitempty"` + FilterSubjects []string `json:"filter_subjects,omitempty"` ReplayPolicy ReplayPolicy `json:"replay_policy"` RateLimit uint64 `json:"rate_limit_bps,omitempty"` // Bits per sec SampleFrequency string `json:"sample_freq,omitempty"` @@ -2481,6 +2482,16 @@ func ConsumerMemoryStorage() SubOpt { }) } +// ConsumerFilterSubjects can be used to set multiple subject filters on the consumer. +// It has to be used in conjunction with [nats.BindStream] and +// with empty 'subject' parameter. +func ConsumerFilterSubjects(subjects ...string) SubOpt { + return subOptFn(func(opts *subOpts) error { + opts.cfg.FilterSubjects = subjects + return nil + }) +} + func (sub *Subscription) ConsumerInfo() (*ConsumerInfo, error) { sub.mu.Lock() // TODO(dlc) - Better way to mark especially if we attach. diff --git a/jserrors.go b/jserrors.go index 4fe5bc930..2389efa01 100644 --- a/jserrors.go +++ b/jserrors.go @@ -62,6 +62,10 @@ var ( // ErrConsumerNameRequired is returned when the provided consumer durable name is empty. ErrConsumerNameRequired JetStreamError = &jsError{message: "consumer name is required"} + // ErrConsumerMultipleFilterSubjectsNotSupported is returned when the connected nats-server version does not support setting + // multiple filter subjects with filter_subjects field. + ErrConsumerMultipleFilterSubjectsNotSupported JetStreamError = &jsError{message: "multiple consumer filter subjects not supported by nats-server"} + // ErrConsumerConfigRequired is returned when empty consumer consuguration is supplied to add/update consumer. ErrConsumerConfigRequired JetStreamError = &jsError{message: "consumer configuration is required"} diff --git a/jsm.go b/jsm.go index c02e757a7..86f8e0564 100644 --- a/jsm.go +++ b/jsm.go @@ -375,9 +375,13 @@ func (js *js) upsertConsumer(stream, consumerName string, cfg *ConsumerConfig, o ccSubj = fmt.Sprintf(apiLegacyConsumerCreateT, stream) } else if err := checkConsumerName(consumerName); err != nil { return nil, err - } else if !js.nc.serverMinVersion(2, 9, 0) || (cfg.Durable != "" && js.opts.featureFlags.useDurableConsumerCreate) { + } else if !js.nc.serverMinVersion(2, 9, 0) || + (cfg.Durable != "" && js.opts.featureFlags.useDurableConsumerCreate) { // if server version is lower than 2.9.0 or user set the useDurableConsumerCreate flag, use the legacy DURABLE.CREATE endpoint ccSubj = fmt.Sprintf(apiDurableCreateT, stream, consumerName) + } else if cfg.FilterSubjects != nil { + // if multiple filter subjects are used, we must use legacy DURABLE.CREATE endpoint + ccSubj = fmt.Sprintf(apiDurableCreateT, stream, consumerName) } else { // if above server version 2.9.0, use the endpoints with consumer name if cfg.FilterSubject == _EMPTY_ || cfg.FilterSubject == ">" { @@ -408,6 +412,11 @@ func (js *js) upsertConsumer(stream, consumerName string, cfg *ConsumerConfig, o } return nil, info.Error } + + // check whether multiple filter subjects (if used) are reflected in the returned ConsumerInfo + if cfg.FilterSubjects != nil && info.Config.FilterSubjects == nil { + return nil, ErrConsumerMultipleFilterSubjectsNotSupported + } return info.ConsumerInfo, nil } diff --git a/test/js_test.go b/test/js_test.go index 879b8bc09..f392a161f 100644 --- a/test/js_test.go +++ b/test/js_test.go @@ -1731,7 +1731,7 @@ func TestJetStreamManagement(t *testing.T) { // Create the stream using our client API. var si *nats.StreamInfo t.Run("create stream", func(t *testing.T) { - si, err := js.AddStream(&nats.StreamConfig{Name: "foo", Subjects: []string{"foo", "bar"}}) + si, err := js.AddStream(&nats.StreamConfig{Name: "foo", Subjects: []string{"foo", "bar", "baz"}}) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -1952,6 +1952,32 @@ func TestJetStreamManagement(t *testing.T) { } }) + t.Run("consumer with multiple filter subjects", func(t *testing.T) { + sub, err := nc.SubscribeSync("$JS.API.CONSUMER.DURABLE.CREATE.foo.dlc-5") + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + defer sub.Unsubscribe() + ci, err := js.AddConsumer("foo", &nats.ConsumerConfig{ + Durable: "dlc-5", + AckPolicy: nats.AckExplicitPolicy, + FilterSubjects: []string{"foo", "bar"}, + }) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + msg, err := sub.NextMsg(1 * time.Second) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if !strings.Contains(string(msg.Data), `"durable_name":"dlc-5"`) { + t.Fatalf("create consumer message is not correct: %q", string(msg.Data)) + } + if ci == nil || ci.Config.Durable != "dlc-5" || !reflect.DeepEqual(ci.Config.FilterSubjects, []string{"foo", "bar"}) { + t.Fatalf("ConsumerInfo is not correct %+v", ci) + } + }) + t.Run("with invalid consumer name", func(t *testing.T) { if _, err = js.AddConsumer("foo", &nats.ConsumerConfig{Durable: "test.durable"}); err != nats.ErrInvalidConsumerName { t.Fatalf("Expected: %v; got: %v", nats.ErrInvalidConsumerName, err) @@ -2055,7 +2081,7 @@ func TestJetStreamManagement(t *testing.T) { for info := range js.Consumers("foo") { infos = append(infos, info) } - if len(infos) != 6 || infos[0].Stream != "foo" { + if len(infos) != 7 || infos[0].Stream != "foo" { t.Fatalf("ConsumerInfo is not correct %+v", infos) } }) @@ -2067,7 +2093,7 @@ func TestJetStreamManagement(t *testing.T) { for name := range js.ConsumerNames("foo", nats.Context(ctx)) { names = append(names, name) } - if got, want := len(names), 6; got != want { + if got, want := len(names), 7; got != want { t.Fatalf("Unexpected names, got=%d, want=%d", got, want) } }) @@ -5224,6 +5250,50 @@ func TestJetStreamSubscribe_RateLimit(t *testing.T) { } } +func TestJetStreamSubscribe_FilterSubjects(t *testing.T) { + s := RunBasicJetStreamServer() + defer shutdownJSServerAndRemoveStorage(t, s) + + nc, js := jsClient(t, s) + defer nc.Close() + + var err error + + // Create the stream using our client API. + _, err = js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo", "bar", "baz"}, + }) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + for i := 0; i < 5; i++ { + js.Publish("foo", []byte("msg")) + } + for i := 0; i < 5; i++ { + js.Publish("bar", []byte("msg")) + } + for i := 0; i < 5; i++ { + js.Publish("baz", []byte("msg")) + } + + sub, err := js.SubscribeSync("", nats.BindStream("TEST"), nats.ConsumerFilterSubjects("foo", "baz")) + if err != nil { + t.Fatalf("Unexpected error: %s", err) + } + + for i := 0; i < 10; i++ { + msg, err := sub.NextMsg(500 * time.Millisecond) + if err != nil { + t.Fatalf("Unexpected error: %s", err) + } + if msg.Subject != "foo" && msg.Subject != "baz" { + t.Fatalf("Unexpected message subject: %s", msg.Subject) + } + } + +} + func TestJetStreamSubscribe_ConfigCantChange(t *testing.T) { s := RunBasicJetStreamServer() defer shutdownJSServerAndRemoveStorage(t, s) From d09f110f6de542caa40192192a51930931e8e29b Mon Sep 17 00:00:00 2001 From: Piotr Piotrowski Date: Fri, 24 Feb 2023 15:20:50 +0100 Subject: [PATCH 2/4] Check for empty filter subjects using len() --- jsm.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/jsm.go b/jsm.go index 86f8e0564..aea3a860b 100644 --- a/jsm.go +++ b/jsm.go @@ -379,7 +379,7 @@ func (js *js) upsertConsumer(stream, consumerName string, cfg *ConsumerConfig, o (cfg.Durable != "" && js.opts.featureFlags.useDurableConsumerCreate) { // if server version is lower than 2.9.0 or user set the useDurableConsumerCreate flag, use the legacy DURABLE.CREATE endpoint ccSubj = fmt.Sprintf(apiDurableCreateT, stream, consumerName) - } else if cfg.FilterSubjects != nil { + } else if len(cfg.FilterSubjects) != 0 { // if multiple filter subjects are used, we must use legacy DURABLE.CREATE endpoint ccSubj = fmt.Sprintf(apiDurableCreateT, stream, consumerName) } else { @@ -414,7 +414,7 @@ func (js *js) upsertConsumer(stream, consumerName string, cfg *ConsumerConfig, o } // check whether multiple filter subjects (if used) are reflected in the returned ConsumerInfo - if cfg.FilterSubjects != nil && info.Config.FilterSubjects == nil { + if len(cfg.FilterSubjects) != 0 && len(info.Config.FilterSubjects) == 0 { return nil, ErrConsumerMultipleFilterSubjectsNotSupported } return info.ConsumerInfo, nil From 41dda201f482744588027b009c1c9f3b5a299172 Mon Sep 17 00:00:00 2001 From: Piotr Piotrowski Date: Mon, 27 Feb 2023 11:14:48 +0100 Subject: [PATCH 3/4] Fix ErrConsumerMultipleFilterSubjectsNotSupported comment --- jserrors.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/jserrors.go b/jserrors.go index 2389efa01..ed9a83780 100644 --- a/jserrors.go +++ b/jserrors.go @@ -63,7 +63,8 @@ var ( ErrConsumerNameRequired JetStreamError = &jsError{message: "consumer name is required"} // ErrConsumerMultipleFilterSubjectsNotSupported is returned when the connected nats-server version does not support setting - // multiple filter subjects with filter_subjects field. + // multiple filter subjects with filter_subjects field. If this error is returned when executing AddConsumer(), the consumer with invalid + // configuration was already created in the server. ErrConsumerMultipleFilterSubjectsNotSupported JetStreamError = &jsError{message: "multiple consumer filter subjects not supported by nats-server"} // ErrConsumerConfigRequired is returned when empty consumer consuguration is supplied to add/update consumer. From 98593546827da3654dc27c86619ab9d0cbe17b4f Mon Sep 17 00:00:00 2001 From: Piotr Piotrowski Date: Mon, 27 Feb 2023 11:37:00 +0100 Subject: [PATCH 4/4] Add test for durable consumer --- test/js_test.go | 88 ++++++++++++++++++++++++++++++------------------- 1 file changed, 54 insertions(+), 34 deletions(-) diff --git a/test/js_test.go b/test/js_test.go index f392a161f..a7450eb09 100644 --- a/test/js_test.go +++ b/test/js_test.go @@ -5251,45 +5251,65 @@ func TestJetStreamSubscribe_RateLimit(t *testing.T) { } func TestJetStreamSubscribe_FilterSubjects(t *testing.T) { - s := RunBasicJetStreamServer() - defer shutdownJSServerAndRemoveStorage(t, s) + tests := []struct { + name string + durable string + }{ + { + name: "ephemeral consumer", + }, + { + name: "durable consumer", + durable: "cons", + }, + } - nc, js := jsClient(t, s) - defer nc.Close() + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + s := RunBasicJetStreamServer() + defer shutdownJSServerAndRemoveStorage(t, s) - var err error + nc, js := jsClient(t, s) + defer nc.Close() - // Create the stream using our client API. - _, err = js.AddStream(&nats.StreamConfig{ - Name: "TEST", - Subjects: []string{"foo", "bar", "baz"}, - }) - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } - for i := 0; i < 5; i++ { - js.Publish("foo", []byte("msg")) - } - for i := 0; i < 5; i++ { - js.Publish("bar", []byte("msg")) - } - for i := 0; i < 5; i++ { - js.Publish("baz", []byte("msg")) - } + var err error - sub, err := js.SubscribeSync("", nats.BindStream("TEST"), nats.ConsumerFilterSubjects("foo", "baz")) - if err != nil { - t.Fatalf("Unexpected error: %s", err) - } + _, err = js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo", "bar", "baz"}, + }) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + for i := 0; i < 5; i++ { + js.Publish("foo", []byte("msg")) + } + for i := 0; i < 5; i++ { + js.Publish("bar", []byte("msg")) + } + for i := 0; i < 5; i++ { + js.Publish("baz", []byte("msg")) + } - for i := 0; i < 10; i++ { - msg, err := sub.NextMsg(500 * time.Millisecond) - if err != nil { - t.Fatalf("Unexpected error: %s", err) - } - if msg.Subject != "foo" && msg.Subject != "baz" { - t.Fatalf("Unexpected message subject: %s", msg.Subject) - } + opts := []nats.SubOpt{nats.BindStream("TEST"), nats.ConsumerFilterSubjects("foo", "baz")} + if test.durable != "" { + opts = append(opts, nats.Durable(test.durable)) + } + sub, err := js.SubscribeSync("", opts...) + if err != nil { + t.Fatalf("Unexpected error: %s", err) + } + + for i := 0; i < 10; i++ { + msg, err := sub.NextMsg(500 * time.Millisecond) + if err != nil { + t.Fatalf("Unexpected error: %s", err) + } + if msg.Subject != "foo" && msg.Subject != "baz" { + t.Fatalf("Unexpected message subject: %s", msg.Subject) + } + } + }) } }