Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix async publish PAF id #1476

Merged
merged 2 commits into from Nov 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
21 changes: 18 additions & 3 deletions jetstream/jetstream.go
Expand Up @@ -144,9 +144,11 @@ type (
JetStreamOpt func(*jsOpts) error

jsOpts struct {
publisherOpts asyncPublisherOpts
apiPrefix string
clientTrace *ClientTrace
publisherOpts asyncPublisherOpts
apiPrefix string
replyPrefix string
replyPrefixLen int
clientTrace *ClientTrace
}

// ClientTrace can be used to trace API interactions for the JetStream Context.
Expand Down Expand Up @@ -229,6 +231,7 @@ func New(nc *nats.Conn, opts ...JetStreamOpt) (JetStream, error) {
maxpa: defaultAsyncPubAckInflight,
},
}
setReplyPrefix(nc, &jsOpts)
for _, opt := range opts {
if err := opt(&jsOpts); err != nil {
return nil, err
Expand All @@ -248,6 +251,16 @@ const (
defaultAsyncPubAckInflight = 4000
)

func setReplyPrefix(nc *nats.Conn, jsOpts *jsOpts) {
jsOpts.replyPrefix = nats.InboxPrefix
if nc.Opts.InboxPrefix != "" {
jsOpts.replyPrefix = nc.Opts.InboxPrefix + "."
}
// Add 1 for the dot separator.
jsOpts.replyPrefixLen = len(jsOpts.replyPrefix) + aReplyTokensize + 1

}

// NewWithAPIPrefix returns a new JetStream instance and sets the API prefix to be used in requests to JetStream API
//
// Available options:
Expand All @@ -261,6 +274,7 @@ func NewWithAPIPrefix(nc *nats.Conn, apiPrefix string, opts ...JetStreamOpt) (Je
maxpa: defaultAsyncPubAckInflight,
},
}
setReplyPrefix(nc, &jsOpts)
for _, opt := range opts {
if err := opt(&jsOpts); err != nil {
return nil, err
Expand Down Expand Up @@ -293,6 +307,7 @@ func NewWithDomain(nc *nats.Conn, domain string, opts ...JetStreamOpt) (JetStrea
maxpa: defaultAsyncPubAckInflight,
},
}
setReplyPrefix(nc, &jsOpts)
for _, opt := range opts {
if err := opt(&jsOpts); err != nil {
return nil, err
Expand Down
15 changes: 5 additions & 10 deletions jetstream/publish.go
Expand Up @@ -268,7 +268,7 @@ func (js *jetStream) PublishMsgAsync(m *nats.Msg, opts ...PublishOpt) (PubAckFut
if err != nil {
return nil, fmt.Errorf("nats: error creating async reply handler: %s", err)
}
id = m.Reply[aReplyPreLen:]
id = m.Reply[js.replyPrefixLen:]
paf = &pubAckFuture{msg: m, jsClient: js.publisher, maxRetries: o.retryAttempts, retryWait: o.retryWait}
numPending, maxPending := js.registerPAF(id, paf)

Expand All @@ -282,7 +282,7 @@ func (js *jetStream) PublishMsgAsync(m *nats.Msg, opts ...PublishOpt) (PubAckFut
}
} else {
// when retrying, get the ID from existing reply subject
id = m.Reply[aReplyPreLen:]
id = m.Reply[js.replyPrefixLen:]
}

if err := js.conn.PublishMsg(m); err != nil {
Expand All @@ -295,7 +295,6 @@ func (js *jetStream) PublishMsgAsync(m *nats.Msg, opts ...PublishOpt) (PubAckFut

// For quick token lookup etc.
const (
aReplyPreLen = 14
aReplyTokensize = 6
)

Expand All @@ -309,11 +308,7 @@ func (js *jetStream) newAsyncReply() (string, error) {
for i := 0; i < aReplyTokensize; i++ {
b[i] = rdigits[int(b[i]%base)]
}
inboxPrefix := "_INBOX"
if js.conn.Opts.InboxPrefix != "" {
inboxPrefix = js.conn.Opts.InboxPrefix
}
js.publisher.replyPrefix = fmt.Sprintf("%s.%s.", inboxPrefix, b[:aReplyTokensize])
js.publisher.replyPrefix = fmt.Sprintf("%s%s.", js.replyPrefix, b[:aReplyTokensize])
sub, err := js.conn.Subscribe(fmt.Sprintf("%s*", js.publisher.replyPrefix), js.handleAsyncReply)
if err != nil {
js.publisher.Unlock()
Expand Down Expand Up @@ -341,10 +336,10 @@ func (js *jetStream) newAsyncReply() (string, error) {

// Handle an async reply from PublishAsync.
func (js *jetStream) handleAsyncReply(m *nats.Msg) {
if len(m.Subject) <= aReplyPreLen {
if len(m.Subject) <= js.replyPrefixLen {
return
}
id := m.Subject[aReplyPreLen:]
id := m.Subject[js.replyPrefixLen:]

js.publisher.Lock()

Expand Down
37 changes: 20 additions & 17 deletions js.go
Expand Up @@ -227,14 +227,16 @@ type js struct {
opts *jsOpts

// For async publish context.
mu sync.RWMutex
rpre string
rsub *Subscription
pafs map[string]*pubAckFuture
stc chan struct{}
dch chan struct{}
rr *rand.Rand
connStatusCh chan (Status)
mu sync.RWMutex
rpre string
rsub *Subscription
pafs map[string]*pubAckFuture
stc chan struct{}
dch chan struct{}
rr *rand.Rand
connStatusCh chan (Status)
replyPrefix string
replyPrefixLen int
}

type jsOpts struct {
Expand Down Expand Up @@ -283,6 +285,12 @@ func (nc *Conn) JetStream(opts ...JSOpt) (JetStreamContext, error) {
maxpa: defaultAsyncPubAckInflight,
},
}
inboxPrefix := InboxPrefix
if js.nc.Opts.InboxPrefix != _EMPTY_ {
inboxPrefix = js.nc.Opts.InboxPrefix + "."
}
js.replyPrefix = inboxPrefix
js.replyPrefixLen = len(js.replyPrefix) + aReplyTokensize + 1

for _, opt := range opts {
if err := opt.configureJSContext(js.opts); err != nil {
Expand Down Expand Up @@ -641,7 +649,6 @@ func (paf *pubAckFuture) Msg() *Msg {
}

// For quick token lookup etc.
const aReplyPreLen = 14
const aReplyTokensize = 6

func (js *js) newAsyncReply() string {
Expand All @@ -654,11 +661,7 @@ func (js *js) newAsyncReply() string {
for i := 0; i < aReplyTokensize; i++ {
b[i] = rdigits[int(b[i]%base)]
}
inboxPrefix := InboxPrefix
if js.nc.Opts.InboxPrefix != _EMPTY_ {
inboxPrefix = js.nc.Opts.InboxPrefix + "."
}
js.rpre = fmt.Sprintf("%s%s.", inboxPrefix, b[:aReplyTokensize])
js.rpre = fmt.Sprintf("%s%s.", js.replyPrefix, b[:aReplyTokensize])
sub, err := js.nc.Subscribe(fmt.Sprintf("%s*", js.rpre), js.handleAsyncReply)
if err != nil {
js.mu.Unlock()
Expand Down Expand Up @@ -767,10 +770,10 @@ func (js *js) asyncStall() <-chan struct{} {

// Handle an async reply from PublishAsync.
func (js *js) handleAsyncReply(m *Msg) {
if len(m.Subject) <= aReplyPreLen {
if len(m.Subject) <= js.replyPrefixLen {
return
}
id := m.Subject[aReplyPreLen:]
id := m.Subject[js.replyPrefixLen:]

js.mu.Lock()
paf := js.getPAF(id)
Expand Down Expand Up @@ -916,7 +919,7 @@ func (js *js) PublishMsgAsync(m *Msg, opts ...PubOpt) (PubAckFuture, error) {
return nil, errors.New("nats: error creating async reply handler")
}

id := m.Reply[aReplyPreLen:]
id := m.Reply[js.replyPrefixLen:]
paf := &pubAckFuture{msg: m, st: time.Now()}
numPending, maxPending := js.registerPAF(id, paf)

Expand Down