Skip to content

Commit

Permalink
Merge pull request #721 from nats-io/js-pull-subscribe-fetch-sub
Browse files Browse the repository at this point in the history
js: Add async sub for PullSubscribe to make fetch requests
  • Loading branch information
wallyqs committed Apr 30, 2021
2 parents 5e87f99 + 37f9684 commit 793562c
Showing 1 changed file with 119 additions and 8 deletions.
127 changes: 119 additions & 8 deletions js.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"encoding/json"
"errors"
"fmt"
"math/rand"
"strconv"
"strings"
"sync"
Expand Down Expand Up @@ -400,6 +401,27 @@ func (paf *pubAckFuture) Msg() *Msg {
return paf.msg
}

// pullSubscribe creates the wildcard subscription used per pull subscriber
// to make fetch requests.
func (js *js) pullSubscribe(subj string) (*Subscription, error) {
jsi := &jsSub{js: js, pull: true}

// Similar to async request handler we create a wildcard subscription for making requests,
// though we do not use the token based approach since we cannot match the response to
// the requestor due to JS subject being remapped on delivery. Instead, we just use an array
// of channels similar to how ping/pong interval is handled and send the message to the first
// available requestor via a channel.
jsi.rr = rand.New(rand.NewSource(time.Now().UnixNano()))
jsi.rpre = fmt.Sprintf("%s.", NewInbox())
sub, err := js.nc.Subscribe(fmt.Sprintf("%s*", jsi.rpre), jsi.handleFetch)
if err != nil {
return nil, err
}
jsi.psub = sub

return &Subscription{Subject: subj, conn: js.nc, typ: PullSubscription, jsi: jsi}, nil
}

// For quick token lookup etc.
const aReplyPreLen = 14
const aReplyTokensize = 6
Expand Down Expand Up @@ -810,7 +832,15 @@ type nextRequest struct {

// jsSub includes JetStream subscription info.
type jsSub struct {
js *js
js *js

// To setup request mux handler for pull subscribers.
mu sync.RWMutex
psub *Subscription
rpre string
rr *rand.Rand
freqs []chan *Msg

consumer string
stream string
deliver string
Expand All @@ -826,6 +856,80 @@ type jsSub struct {
cmeta atomic.Value
}

// newFetchReply generates a unique inbox used for a fetch request.
func (jsi *jsSub) newFetchReply() string {
jsi.mu.Lock()
rpre := jsi.rpre
rn := jsi.rr.Int63()
jsi.mu.Unlock()
var sb strings.Builder
sb.WriteString(rpre)
var b [aReplyTokensize]byte
for i, l := 0, rn; i < len(b); i++ {
b[i] = rdigits[l%base]
l /= base
}
sb.Write(b[:])
return sb.String()
}

// handleFetch is delivered a message requested by pull subscribers
// when calling Fetch.
func (jsi *jsSub) handleFetch(m *Msg) {
jsi.mu.Lock()
if len(jsi.freqs) == 0 {
nc := jsi.js.nc
sub := jsi.psub
nc.mu.Lock()
errCB := nc.Opts.AsyncErrorCB
err := fmt.Errorf("nats: fetch response delivered but requestor has gone away")
if errCB != nil {
nc.ach.push(func() { errCB(nc, sub, err) })
}
nc.mu.Unlock()
jsi.mu.Unlock()
return
}
mch := jsi.freqs[0]
if len(jsi.freqs) > 1 {
jsi.freqs = append(jsi.freqs[:0], jsi.freqs[1:]...)
} else {
jsi.freqs = jsi.freqs[:0]
}
jsi.mu.Unlock()
mch <- m
}

// fetchNoWait makes a request to get a single message using no wait.
func (jsi *jsSub) fetchNoWait(ctx context.Context, subj string, payload []byte) (*Msg, error) {
nc := jsi.js.nc
m := NewMsg(subj)
m.Reply = jsi.newFetchReply()
m.Data = payload

mch := make(chan *Msg, 1)
jsi.mu.Lock()
jsi.freqs = append(jsi.freqs, mch)
jsi.mu.Unlock()
if err := nc.PublishMsg(m); err != nil {
return nil, err
}

var ok bool
var msg *Msg

select {
case msg, ok = <-mch:
if !ok {
return nil, ErrConnectionClosed
}
case <-ctx.Done():
return nil, ctx.Err()
}

return msg, nil
}

// controlMetadata is metadata used to be able to detect sequence mismatch
// errors in push based consumers that have heartbeats enabled.
type controlMetadata struct {
Expand All @@ -838,6 +942,11 @@ func (jsi *jsSub) unsubscribe(drainMode bool) error {
// consumers when using drain mode.
return nil
}
// Clear the extra async pull subscription used for fetch requests.
if jsi.psub != nil {
jsi.psub.Drain()
}

js := jsi.js
return js.DeleteConsumer(jsi.stream, jsi.consumer)
}
Expand Down Expand Up @@ -978,12 +1087,12 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync
}

if isPullMode {
sub = &Subscription{Subject: subj, conn: js.nc, typ: PullSubscription, jsi: &jsSub{js: js, pull: isPullMode}}
sub, err = js.pullSubscribe(subj)
} else {
sub, err = js.nc.subscribe(deliver, queue, cb, ch, isSync, &jsSub{js: js, hbs: hasHeartbeats, fc: hasFC})
if err != nil {
return nil, err
}
}
if err != nil {
return nil, err
}

// If we are creating or updating let's process that request.
Expand Down Expand Up @@ -1362,7 +1471,8 @@ func (sub *Subscription) Fetch(batch int, opts ...PullOpt) ([]*Msg, error) {
}

sub.mu.Lock()
if sub.jsi == nil || sub.typ != PullSubscription {
jsi := sub.jsi
if jsi == nil || sub.typ != PullSubscription {
sub.mu.Unlock()
return nil, ErrTypeSubscription
}
Expand Down Expand Up @@ -1435,9 +1545,10 @@ func (sub *Subscription) Fetch(batch int, opts ...PullOpt) ([]*Msg, error) {
msgs = make([]*Msg, 0)
)

// In case of only one message, then can already handle with built-in request functions.
if batch == 1 {
resp, err := nc.oldRequestWithContext(ctx, reqNext, nil, req)
// To optimize single message no wait fetch, we use a shared wildcard
// subscription per pull subscriber to wait for the response.
resp, err := jsi.fetchNoWait(ctx, reqNext, req)
if err != nil {
return nil, checkCtxErr(err)
}
Expand Down

0 comments on commit 793562c

Please sign in to comment.