Skip to content

Commit

Permalink
Merge 9859354 into 5931be0
Browse files Browse the repository at this point in the history
  • Loading branch information
piotrpio committed Feb 27, 2023
2 parents 5931be0 + 9859354 commit 4d0e944
Show file tree
Hide file tree
Showing 7 changed files with 133 additions and 18 deletions.
5 changes: 5 additions & 0 deletions example_test.go
Expand Up @@ -670,6 +670,11 @@ func ExampleSubOpt() {
js.Subscribe("foo", func(msg *nats.Msg) {
fmt.Printf("Received a message: %s\n", string(msg.Data))
}, nats.Durable("FOO"), nats.ConsumerMemoryStorage())

// Use multiple subject filters.
js.Subscribe("", func(msg *nats.Msg) {
fmt.Printf("Received a message: %s\n", string(msg.Data))
}, nats.Durable("FOO"), nats.ConsumerFilterSubjects("foo", "bar"), nats.BindStream("test_stream"))
}

func ExampleMaxWait() {
Expand Down
7 changes: 3 additions & 4 deletions go_test.mod
Expand Up @@ -4,18 +4,17 @@ go 1.19

require (
github.com/golang/protobuf v1.4.2
github.com/nats-io/nats-server/v2 v2.9.12-0.20230129183410-a505c88d65e8
github.com/nats-io/nats-server/v2 v2.9.15-0.20230217225355-8fd61291be35
github.com/nats-io/nkeys v0.3.1-0.20221215194120-47c7408e7546
github.com/nats-io/nuid v1.0.1
google.golang.org/protobuf v1.23.0
)

require (
github.com/klauspost/compress v1.15.11 // indirect
github.com/klauspost/compress v1.15.15 // indirect
github.com/minio/highwayhash v1.0.2 // indirect
github.com/nats-io/jwt/v2 v2.3.1-0.20221227170542-bdf40fa3627b // indirect
go.uber.org/automaxprocs v1.5.1 // indirect
golang.org/x/crypto v0.5.0 // indirect
golang.org/x/sys v0.4.0 // indirect
golang.org/x/time v0.0.0-20220922220347-f3bd1da661af // indirect
golang.org/x/time v0.3.0 // indirect
)
16 changes: 6 additions & 10 deletions go_test.sum
Expand Up @@ -9,29 +9,25 @@ github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMyw
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4=
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/klauspost/compress v1.15.11 h1:Lcadnb3RKGin4FYM/orgq0qde+nc15E5Cbqg4B9Sx9c=
github.com/klauspost/compress v1.15.11/go.mod h1:QPwzmACJjUTFsnSHH934V6woptycfrDDJnH7hvFVbGM=
github.com/klauspost/compress v1.15.15 h1:EF27CXIuDsYJ6mmvtBRlEuB2UVOqHG1tAXgZ7yIO+lw=
github.com/klauspost/compress v1.15.15/go.mod h1:ZcK2JAFqKOpnBlxcLsJzYfrS9X1akm9fHZNnD9+Vo/4=
github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g=
github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY=
github.com/nats-io/jwt/v2 v2.3.1-0.20221227170542-bdf40fa3627b h1:exHeHbghpBp1JvdYq7muaKFvJgLD93UDcmoIbFu/9PA=
github.com/nats-io/jwt/v2 v2.3.1-0.20221227170542-bdf40fa3627b/go.mod h1:DYujvzCMZzUuqB3i1Pnpf1YtkuTwhdI84Aah9wRXkK0=
github.com/nats-io/nats-server/v2 v2.9.12-0.20230127214148-11ffd695750a h1:iZE9RvG9JCC0TployL6UsxJ3dbwMKBrJakYfvPMYDRc=
github.com/nats-io/nats-server/v2 v2.9.12-0.20230127214148-11ffd695750a/go.mod h1:ibVHvIWZwqnarh51bnfR3zZWtlL3SjG9X49ocsfFUm4=
github.com/nats-io/nats-server/v2 v2.9.12-0.20230129183410-a505c88d65e8 h1:MnuDEDsBCO0yU2MVA7BMbiob17nDN1TklcqlxJi6py8=
github.com/nats-io/nats-server/v2 v2.9.12-0.20230129183410-a505c88d65e8/go.mod h1:ibVHvIWZwqnarh51bnfR3zZWtlL3SjG9X49ocsfFUm4=
github.com/nats-io/nats-server/v2 v2.9.15-0.20230217225355-8fd61291be35 h1:OZ2NYxMQ9KLboqCYmb3IyMIlul0X1lFyilp577zAjHc=
github.com/nats-io/nats-server/v2 v2.9.15-0.20230217225355-8fd61291be35/go.mod h1:AT/C9XuOPGsozg2dfiS+9vK0Ge4jheffj8uL/kMGPtw=
github.com/nats-io/nkeys v0.3.1-0.20221215194120-47c7408e7546 h1:7ZylVLLiDSFqxJTSibNsO2RjVSXj3QWnDc+zKara2HE=
github.com/nats-io/nkeys v0.3.1-0.20221215194120-47c7408e7546/go.mod h1:JOEZlxMfMnmaLwr+mpmP+RGIYSxLNBFsZykCGaI2PvA=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
go.uber.org/automaxprocs v1.5.1 h1:e1YG66Lrk73dn4qhg8WFSvhF0JuFQF0ERIp4rpuV8Qk=
go.uber.org/automaxprocs v1.5.1/go.mod h1:BF4eumQw0P9GtnuxxovUd06vwm1o18oMzFtK66vU6XU=
golang.org/x/crypto v0.5.0 h1:U/0M97KRkSFvyD/3FSmdP5W5swImpNgle/EHFhOsQPE=
golang.org/x/crypto v0.5.0/go.mod h1:NK/OQwhpMQP3MwtdjgLlYHnH9ebylxKWv3e0fK+mkQU=
golang.org/x/sys v0.0.0-20190130150945-aca44879d564/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.4.0 h1:Zr2JFtRQNX3BCZ8YtxRE9hNJYC8J6I1MVbMg6owUp18=
golang.org/x/sys v0.4.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/time v0.0.0-20220922220347-f3bd1da661af h1:Yx9k8YCG3dvF87UAn2tu2HQLf2dt/eR1bXxpLMWeH+Y=
golang.org/x/time v0.0.0-20220922220347-f3bd1da661af/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.3.0 h1:rg5rLMjNzMS1RkNLzCG38eapWhnYLFYXDXj2gOlr8j4=
golang.org/x/time v0.3.0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
Expand Down
11 changes: 11 additions & 0 deletions js.go
Expand Up @@ -1077,6 +1077,7 @@ type ConsumerConfig struct {
MaxDeliver int `json:"max_deliver,omitempty"`
BackOff []time.Duration `json:"backoff,omitempty"`
FilterSubject string `json:"filter_subject,omitempty"`
FilterSubjects []string `json:"filter_subjects,omitempty"`
ReplayPolicy ReplayPolicy `json:"replay_policy"`
RateLimit uint64 `json:"rate_limit_bps,omitempty"` // Bits per sec
SampleFrequency string `json:"sample_freq,omitempty"`
Expand Down Expand Up @@ -2486,6 +2487,16 @@ func ConsumerMemoryStorage() SubOpt {
})
}

// ConsumerFilterSubjects can be used to set multiple subject filters on the consumer.
// It has to be used in conjunction with [nats.BindStream] and
// with empty 'subject' parameter.
func ConsumerFilterSubjects(subjects ...string) SubOpt {
return subOptFn(func(opts *subOpts) error {
opts.cfg.FilterSubjects = subjects
return nil
})
}

func (sub *Subscription) ConsumerInfo() (*ConsumerInfo, error) {
sub.mu.Lock()
// TODO(dlc) - Better way to mark especially if we attach.
Expand Down
5 changes: 5 additions & 0 deletions jserrors.go
Expand Up @@ -62,6 +62,11 @@ var (
// ErrConsumerNameRequired is returned when the provided consumer durable name is empty.
ErrConsumerNameRequired JetStreamError = &jsError{message: "consumer name is required"}

// ErrConsumerMultipleFilterSubjectsNotSupported is returned when the connected nats-server version does not support setting
// multiple filter subjects with filter_subjects field. If this error is returned when executing AddConsumer(), the consumer with invalid
// configuration was already created in the server.
ErrConsumerMultipleFilterSubjectsNotSupported JetStreamError = &jsError{message: "multiple consumer filter subjects not supported by nats-server"}

// ErrConsumerConfigRequired is returned when empty consumer consuguration is supplied to add/update consumer.
ErrConsumerConfigRequired JetStreamError = &jsError{message: "consumer configuration is required"}

Expand Down
11 changes: 10 additions & 1 deletion jsm.go
Expand Up @@ -375,9 +375,13 @@ func (js *js) upsertConsumer(stream, consumerName string, cfg *ConsumerConfig, o
ccSubj = fmt.Sprintf(apiLegacyConsumerCreateT, stream)
} else if err := checkConsumerName(consumerName); err != nil {
return nil, err
} else if !js.nc.serverMinVersion(2, 9, 0) || (cfg.Durable != "" && js.opts.featureFlags.useDurableConsumerCreate) {
} else if !js.nc.serverMinVersion(2, 9, 0) ||
(cfg.Durable != "" && js.opts.featureFlags.useDurableConsumerCreate) {
// if server version is lower than 2.9.0 or user set the useDurableConsumerCreate flag, use the legacy DURABLE.CREATE endpoint
ccSubj = fmt.Sprintf(apiDurableCreateT, stream, consumerName)
} else if len(cfg.FilterSubjects) != 0 {
// if multiple filter subjects are used, we must use legacy DURABLE.CREATE endpoint
ccSubj = fmt.Sprintf(apiDurableCreateT, stream, consumerName)
} else {
// if above server version 2.9.0, use the endpoints with consumer name
if cfg.FilterSubject == _EMPTY_ || cfg.FilterSubject == ">" {
Expand Down Expand Up @@ -408,6 +412,11 @@ func (js *js) upsertConsumer(stream, consumerName string, cfg *ConsumerConfig, o
}
return nil, info.Error
}

// check whether multiple filter subjects (if used) are reflected in the returned ConsumerInfo
if len(cfg.FilterSubjects) != 0 && len(info.Config.FilterSubjects) == 0 {
return nil, ErrConsumerMultipleFilterSubjectsNotSupported
}
return info.ConsumerInfo, nil
}

Expand Down
96 changes: 93 additions & 3 deletions test/js_test.go
Expand Up @@ -1731,7 +1731,7 @@ func TestJetStreamManagement(t *testing.T) {
// Create the stream using our client API.
var si *nats.StreamInfo
t.Run("create stream", func(t *testing.T) {
si, err := js.AddStream(&nats.StreamConfig{Name: "foo", Subjects: []string{"foo", "bar"}})
si, err := js.AddStream(&nats.StreamConfig{Name: "foo", Subjects: []string{"foo", "bar", "baz"}})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
Expand Down Expand Up @@ -1955,6 +1955,32 @@ func TestJetStreamManagement(t *testing.T) {
}
})

t.Run("consumer with multiple filter subjects", func(t *testing.T) {
sub, err := nc.SubscribeSync("$JS.API.CONSUMER.DURABLE.CREATE.foo.dlc-5")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer sub.Unsubscribe()
ci, err := js.AddConsumer("foo", &nats.ConsumerConfig{
Durable: "dlc-5",
AckPolicy: nats.AckExplicitPolicy,
FilterSubjects: []string{"foo", "bar"},
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
msg, err := sub.NextMsg(1 * time.Second)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if !strings.Contains(string(msg.Data), `"durable_name":"dlc-5"`) {
t.Fatalf("create consumer message is not correct: %q", string(msg.Data))
}
if ci == nil || ci.Config.Durable != "dlc-5" || !reflect.DeepEqual(ci.Config.FilterSubjects, []string{"foo", "bar"}) {
t.Fatalf("ConsumerInfo is not correct %+v", ci)
}
})

t.Run("with invalid consumer name", func(t *testing.T) {
if _, err = js.AddConsumer("foo", &nats.ConsumerConfig{Durable: "test.durable"}); err != nats.ErrInvalidConsumerName {
t.Fatalf("Expected: %v; got: %v", nats.ErrInvalidConsumerName, err)
Expand Down Expand Up @@ -2061,7 +2087,7 @@ func TestJetStreamManagement(t *testing.T) {
for info := range js.Consumers("foo") {
infos = append(infos, info)
}
if len(infos) != 6 || infos[0].Stream != "foo" {
if len(infos) != 7 || infos[0].Stream != "foo" {
t.Fatalf("ConsumerInfo is not correct %+v", infos)
}
})
Expand All @@ -2073,7 +2099,7 @@ func TestJetStreamManagement(t *testing.T) {
for name := range js.ConsumerNames("foo", nats.Context(ctx)) {
names = append(names, name)
}
if got, want := len(names), 6; got != want {
if got, want := len(names), 7; got != want {
t.Fatalf("Unexpected names, got=%d, want=%d", got, want)
}
})
Expand Down Expand Up @@ -5230,6 +5256,70 @@ func TestJetStreamSubscribe_RateLimit(t *testing.T) {
}
}

func TestJetStreamSubscribe_FilterSubjects(t *testing.T) {
tests := []struct {
name string
durable string
}{
{
name: "ephemeral consumer",
},
{
name: "durable consumer",
durable: "cons",
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
s := RunBasicJetStreamServer()
defer shutdownJSServerAndRemoveStorage(t, s)

nc, js := jsClient(t, s)
defer nc.Close()

var err error

_, err = js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo", "bar", "baz"},
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
for i := 0; i < 5; i++ {
js.Publish("foo", []byte("msg"))
}
for i := 0; i < 5; i++ {
js.Publish("bar", []byte("msg"))
}
for i := 0; i < 5; i++ {
js.Publish("baz", []byte("msg"))
}

opts := []nats.SubOpt{nats.BindStream("TEST"), nats.ConsumerFilterSubjects("foo", "baz")}
if test.durable != "" {
opts = append(opts, nats.Durable(test.durable))
}
sub, err := js.SubscribeSync("", opts...)
if err != nil {
t.Fatalf("Unexpected error: %s", err)
}

for i := 0; i < 10; i++ {
msg, err := sub.NextMsg(500 * time.Millisecond)
if err != nil {
t.Fatalf("Unexpected error: %s", err)
}
if msg.Subject != "foo" && msg.Subject != "baz" {
t.Fatalf("Unexpected message subject: %s", msg.Subject)
}
}
})
}

}

func TestJetStreamSubscribe_ConfigCantChange(t *testing.T) {
s := RunBasicJetStreamServer()
defer shutdownJSServerAndRemoveStorage(t, s)
Expand Down

0 comments on commit 4d0e944

Please sign in to comment.