Skip to content

Commit

Permalink
Merge pull request #1263 from nats-io/consumer-name
Browse files Browse the repository at this point in the history
Add ConsumerName option, use similar ephemeral names as server
  • Loading branch information
wallyqs committed May 12, 2023
2 parents d313991 + 1537c0b commit 2765665
Show file tree
Hide file tree
Showing 2 changed files with 119 additions and 3 deletions.
30 changes: 27 additions & 3 deletions js.go
Original file line number Diff line number Diff line change
Expand Up @@ -1659,10 +1659,12 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,
}

// If we are creating or updating let's process that request.
consName := o.cfg.Name
if shouldCreate {
consName := cfg.Durable
if consName == "" {
consName = nuid.Next()
if cfg.Durable != "" {
consName = cfg.Durable
} else if consName == "" {
consName = getHash(nuid.Next())
}
info, err := js.upsertConsumer(stream, consName, ccreq.Config)
if err != nil {
Expand Down Expand Up @@ -2474,6 +2476,14 @@ func ConsumerMemoryStorage() SubOpt {
})
}

// ConsumerName sets the name for a consumer.
func ConsumerName(name string) SubOpt {
return subOptFn(func(opts *subOpts) error {
opts.cfg.Name = name
return nil
})
}

func (sub *Subscription) ConsumerInfo() (*ConsumerInfo, error) {
sub.mu.Lock()
// TODO(dlc) - Better way to mark especially if we attach.
Expand Down Expand Up @@ -3646,3 +3656,17 @@ func (st *StorageType) UnmarshalJSON(data []byte) error {
}
return nil
}

// Length of our hash used for named consumers.
const nameHashLen = 8

// Computes a hash for the given `name`.
func getHash(name string) string {
sha := sha256.New()
sha.Write([]byte(name))
b := sha.Sum(nil)
for i := 0; i < nameHashLen; i++ {
b[i] = rdigits[int(b[i]%base)]
}
return string(b[:nameHashLen])
}
92 changes: 92 additions & 0 deletions test/js_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8789,3 +8789,95 @@ func TestJetStreamStreamInfoAlternates(t *testing.T) {
}
})
}

func TestJetStreamSubscribeConsumerName(t *testing.T) {
s := RunBasicJetStreamServer()
defer shutdownJSServerAndRemoveStorage(t, s)

nc, js := jsClient(t, s)
defer nc.Close()

var err error

// Create the stream using our client API.
_, err = js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo", "bar", "baz", "foo.*"},
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
_, err = js.Publish("foo", []byte("first"))
if err != nil {
t.Fatal(err)
}

// Lookup the stream for testing.
_, err = js.StreamInfo("TEST")
if err != nil {
t.Fatalf("stream lookup failed: %v", err)
}

sub, err := js.SubscribeSync("foo", nats.ConsumerName("my-ephemeral"))
if err != nil {
t.Fatal(err)
}
cinfo, err := sub.ConsumerInfo()
if err != nil {
t.Fatal(err)
}
got := cinfo.Config.Name
expected := "my-ephemeral"
if got != expected {
t.Fatalf("Expected: %v, got: %v", expected, got)
}
// Confirm that this is a durable.
got = cinfo.Config.Durable
expected = ""
if got != expected {
t.Fatalf("Expected: %v, got: %v", expected, got)
}
_, err = sub.NextMsg(1 * time.Second)
if err != nil {
t.Fatal(err)
}

// ConsumerName will be ignored in case a durable name has been set.
sub, err = js.SubscribeSync("foo", nats.Durable("durable"), nats.ConsumerName("custom-name"))
if err != nil {
t.Fatal(err)
}
cinfo, err = sub.ConsumerInfo()
if err != nil {
t.Fatal(err)
}
got = cinfo.Config.Name
expected = "durable"
if got != expected {
t.Fatalf("Expected: %v, got: %v", expected, got)
}
got = cinfo.Config.Durable
expected = "durable"
if got != expected {
t.Fatalf("Expected: %v, got: %v", expected, got)
}
_, err = sub.NextMsg(1 * time.Second)
if err != nil {
t.Fatal(err)
}

// Default Ephemeral name should be short like in the server.
sub, err = js.SubscribeSync("foo", nats.ConsumerName(""))
if err != nil {
t.Fatal(err)
}
cinfo, err = sub.ConsumerInfo()
if err != nil {
t.Fatal(err)
}
expectedSize := 8
result := len(cinfo.Config.Name)
if result != expectedSize {
t.Fatalf("Expected: %v, got: %v", expectedSize, result)
}
}

0 comments on commit 2765665

Please sign in to comment.