Skip to content

Commit

Permalink
Fix async pub paf id for old js API
Browse files Browse the repository at this point in the history
Signed-off-by: Tomasz Pietrek <tomasz@nats.io>
  • Loading branch information
Jarema committed Nov 28, 2023
1 parent 88b4982 commit bb64e1b
Showing 1 changed file with 20 additions and 17 deletions.
37 changes: 20 additions & 17 deletions js.go
Expand Up @@ -227,14 +227,16 @@ type js struct {
opts *jsOpts

// For async publish context.
mu sync.RWMutex
rpre string
rsub *Subscription
pafs map[string]*pubAckFuture
stc chan struct{}
dch chan struct{}
rr *rand.Rand
connStatusCh chan (Status)
mu sync.RWMutex
rpre string
rsub *Subscription
pafs map[string]*pubAckFuture
stc chan struct{}
dch chan struct{}
rr *rand.Rand
connStatusCh chan (Status)
replyPrefix string
replyPrefixLen int
}

type jsOpts struct {
Expand Down Expand Up @@ -283,6 +285,12 @@ func (nc *Conn) JetStream(opts ...JSOpt) (JetStreamContext, error) {
maxpa: defaultAsyncPubAckInflight,
},
}
inboxPrefix := InboxPrefix
if js.nc.Opts.InboxPrefix != _EMPTY_ {
inboxPrefix = js.nc.Opts.InboxPrefix + "."
}
js.replyPrefix = inboxPrefix
js.replyPrefixLen = len(js.replyPrefix) + aReplyTokensize + 1

for _, opt := range opts {
if err := opt.configureJSContext(js.opts); err != nil {
Expand Down Expand Up @@ -641,7 +649,6 @@ func (paf *pubAckFuture) Msg() *Msg {
}

// For quick token lookup etc.
const aReplyPreLen = 14
const aReplyTokensize = 6

func (js *js) newAsyncReply() string {
Expand All @@ -654,11 +661,7 @@ func (js *js) newAsyncReply() string {
for i := 0; i < aReplyTokensize; i++ {
b[i] = rdigits[int(b[i]%base)]
}
inboxPrefix := InboxPrefix
if js.nc.Opts.InboxPrefix != _EMPTY_ {
inboxPrefix = js.nc.Opts.InboxPrefix + "."
}
js.rpre = fmt.Sprintf("%s%s.", inboxPrefix, b[:aReplyTokensize])
js.rpre = fmt.Sprintf("%s%s.", js.replyPrefix, b[:aReplyTokensize])
sub, err := js.nc.Subscribe(fmt.Sprintf("%s*", js.rpre), js.handleAsyncReply)
if err != nil {
js.mu.Unlock()
Expand Down Expand Up @@ -767,10 +770,10 @@ func (js *js) asyncStall() <-chan struct{} {

// Handle an async reply from PublishAsync.
func (js *js) handleAsyncReply(m *Msg) {
if len(m.Subject) <= aReplyPreLen {
if len(m.Subject) <= js.replyPrefixLen {
return
}
id := m.Subject[aReplyPreLen:]
id := m.Subject[js.replyPrefixLen:]

js.mu.Lock()
paf := js.getPAF(id)
Expand Down Expand Up @@ -916,7 +919,7 @@ func (js *js) PublishMsgAsync(m *Msg, opts ...PubOpt) (PubAckFuture, error) {
return nil, errors.New("nats: error creating async reply handler")
}

id := m.Reply[aReplyPreLen:]
id := m.Reply[js.replyPrefixLen:]
paf := &pubAckFuture{msg: m, st: time.Now()}
numPending, maxPending := js.registerPAF(id, paf)

Expand Down

0 comments on commit bb64e1b

Please sign in to comment.