Skip to content

Commit

Permalink
Merge 0a5fe4c into 19f305b
Browse files Browse the repository at this point in the history
  • Loading branch information
piotrpio committed Mar 23, 2023
2 parents 19f305b + 0a5fe4c commit d7cc64e
Showing 1 changed file with 34 additions and 5 deletions.
39 changes: 34 additions & 5 deletions js.go
Expand Up @@ -1610,6 +1610,8 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,
if isPullMode {
nms = fmt.Sprintf(js.apiSubj(apiRequestNextT), stream, consumer)
deliver = nc.NewInbox()
// for pull consumers, create a wildcard subscription to differentiate pull requests
deliver += ".*"
}

// In case this has a context, then create a child context that
Expand Down Expand Up @@ -2629,7 +2631,7 @@ func (sub *Subscription) Fetch(batch int, opts ...PullOpt) ([]*Msg, error) {

nc := sub.conn
nms := sub.jsi.nms
rply := sub.jsi.deliver
rply, reqID := newFetchInbox(jsi.deliver)
js := sub.jsi.js
pmc := len(sub.mch) > 0

Expand Down Expand Up @@ -2751,10 +2753,18 @@ func (sub *Subscription) Fetch(batch int, opts ...PullOpt) ([]*Msg, error) {
// wait this time.
noWait = false
err = sendReq()
} else if err == ErrTimeout && len(msgs) == 0 {
} else if err == ErrTimeout {
// If we get a 408, we will bail if we already collected some
// messages, otherwise ignore and go back calling NextMsg.
err = nil
// messages, otherwise ignore and go back calling nextMsg.
if len(msgs) == 0 {
err = nil
continue
}
subjectParts := strings.Split(msg.Subject, ".")
// ignore timeout message from server if it comes from a different pull request
if reqID != "" && subjectParts[len(subjectParts)-1] != reqID {
err = nil
}
}
}
}
Expand All @@ -2766,6 +2776,20 @@ func (sub *Subscription) Fetch(batch int, opts ...PullOpt) ([]*Msg, error) {
return msgs, nil
}

// newFetchInbox returns subject used as reply subject when sending pull requests
// as well as request ID. For non-wildcard subject, request ID is empty and
// passed subject is not transformed
func newFetchInbox(subj string) (string, string) {
if !strings.HasSuffix(subj, ".*") {
return subj, ""
}
reqID := nuid.Next()
var sb strings.Builder
sb.WriteString(subj[:len(subj)-1])
sb.WriteString(reqID)
return sb.String(), reqID
}

// MessageBatch provides methods to retrieve messages consumed using [Subscribe.FetchBatch].
type MessageBatch interface {
// Messages returns a channel on which messages will be published.
Expand Down Expand Up @@ -2835,7 +2859,7 @@ func (sub *Subscription) FetchBatch(batch int, opts ...PullOpt) (MessageBatch, e

nc := sub.conn
nms := sub.jsi.nms
rply := sub.jsi.deliver
rply, reqID := newFetchInbox(sub.jsi.deliver)
js := sub.jsi.js
pmc := len(sub.mch) > 0

Expand Down Expand Up @@ -2964,6 +2988,11 @@ func (sub *Subscription) FetchBatch(batch int, opts ...PullOpt) (MessageBatch, e
usrMsg, err = checkMsg(msg, true, false)
if err != nil {
if err == ErrTimeout {
subjectParts := strings.Split(msg.Subject, ".")
if reqID != "" && subjectParts[len(subjectParts)-1] != reqID {
// ignore timeout message from server if it comes from a different pull request
continue
}
err = nil
}
break
Expand Down

0 comments on commit d7cc64e

Please sign in to comment.