Skip to content

Commit

Permalink
Add UseLegacyDurableConsumers() option
Browse files Browse the repository at this point in the history
  • Loading branch information
piotrpio committed Sep 16, 2022
1 parent 5c4567f commit 069041d
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 6 deletions.
17 changes: 17 additions & 0 deletions js.go
Expand Up @@ -255,6 +255,9 @@ type jsOpts struct {
directGet bool
// For direct get next message
directNextFor string

// featureFlags are used to enable/disable specific JetStream features
featureFlags featureFlags
}

const (
Expand Down Expand Up @@ -294,6 +297,20 @@ func (opt jsOptFn) configureJSContext(opts *jsOpts) error {
return opt(opts)
}

type featureFlags struct {
useDurableConsumerCreate bool
}

// UseLegacyDurableConsumers makes JetStream use the legacy (pre nats-server v2.9.0) subjects for consumer creation.
// If this option is used when creating JetStremContext, $JS.API.CONSUMER.DURABLE.CREATE.<stream>.<consumer> will be used
// to create a consumer with Durable provided, rather than $JS.API.CONSUMER.CREATE.<stream>.<consumer>.
func UseLegacyDurableConsumers() JSOpt {
return jsOptFn(func(opts *jsOpts) error {
opts.featureFlags.useDurableConsumerCreate = true
return nil
})
}

// ClientTrace can be used to trace API interactions for the JetStream Context.
type ClientTrace struct {
RequestSent func(subj string, payload []byte)
Expand Down
8 changes: 4 additions & 4 deletions jsm.go
Expand Up @@ -320,16 +320,16 @@ func (js *js) upsertConsumer(stream, consumerName string, cfg *ConsumerConfig, o
ccSubj = fmt.Sprintf(apiLegacyConsumerCreateT, stream)
} else if err := checkConsumerName(consumerName); err != nil {
return nil, err
} else if js.nc.serverMinVersion(2, 9, 0) {
} else if !js.nc.serverMinVersion(2, 9, 0) || (cfg.Durable != "" && js.opts.featureFlags.useDurableConsumerCreate) {
// if server version is lower than 2.9.0 or user set the useDurableConsumerCreate flag, use the legacy DURABLE.CREATE endpoint
ccSubj = fmt.Sprintf(apiDurableCreateT, stream, consumerName)
} else {
// if above server version 2.9.0, use the endpoints with consumer name
if cfg.FilterSubject == _EMPTY_ || cfg.FilterSubject == ">" {
ccSubj = fmt.Sprintf(apiConsumerCreateT, stream, consumerName)
} else {
ccSubj = fmt.Sprintf(apiConsumerCreateWithFilterSubjectT, stream, consumerName, cfg.FilterSubject)
}
} else {
// if consumer name is not empty and the server version is lower than 2.9.0, use the legacy DURABLE.CREATE endpoint
ccSubj = fmt.Sprintf(apiDurableCreateT, stream, consumerName)
}

resp, err := js.apiRequestWithContext(o.ctx, js.apiSubj(ccSubj), req)
Expand Down
30 changes: 28 additions & 2 deletions test/js_test.go
Expand Up @@ -1459,6 +1459,32 @@ func TestJetStreamManagement(t *testing.T) {
}
})

t.Run("legacy durable with jetstream context option", func(t *testing.T) {
jsLegacy, err := nc.JetStream(nats.UseLegacyDurableConsumers())
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
sub, err := nc.SubscribeSync("$JS.API.CONSUMER.DURABLE.CREATE.foo.dlc-4")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer sub.Unsubscribe()
ci, err := jsLegacy.AddConsumer("foo", &nats.ConsumerConfig{Durable: "dlc-4", AckPolicy: nats.AckExplicitPolicy})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
msg, err := sub.NextMsg(1 * time.Second)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if !strings.Contains(string(msg.Data), `"durable_name":"dlc-4"`) {
t.Fatalf("create consumer message is not correct: %q", string(msg.Data))
}
if ci == nil || ci.Config.Durable != "dlc-4" || ci.Stream != "foo" {
t.Fatalf("ConsumerInfo is not correct %+v", ci)
}
})

t.Run("with invalid consumer name", func(t *testing.T) {
if _, err = js.AddConsumer("foo", &nats.ConsumerConfig{Durable: "test.durable"}); err != nats.ErrInvalidConsumerName {
t.Fatalf("Expected: %v; got: %v", nats.ErrInvalidConsumerName, err)
Expand Down Expand Up @@ -1562,7 +1588,7 @@ func TestJetStreamManagement(t *testing.T) {
for info := range js.ConsumersInfo("foo") {
infos = append(infos, info)
}
if len(infos) != 5 || infos[0].Stream != "foo" {
if len(infos) != 6 || infos[0].Stream != "foo" {
t.Fatalf("ConsumerInfo is not correct %+v", infos)
}
})
Expand All @@ -1574,7 +1600,7 @@ func TestJetStreamManagement(t *testing.T) {
for name := range js.ConsumerNames("foo", nats.Context(ctx)) {
names = append(names, name)
}
if got, want := len(names), 5; got != want {
if got, want := len(names), 6; got != want {
t.Fatalf("Unexpected names, got=%d, want=%d", got, want)
}
})
Expand Down

0 comments on commit 069041d

Please sign in to comment.