Skip to content

Commit

Permalink
Move actual sub back to before consumer based on feedback
Browse files Browse the repository at this point in the history
Signed-off-by: Derek Collison <derek@nats.io>
  • Loading branch information
derekcollison committed Aug 9, 2021
1 parent d295b8b commit c414086
Show file tree
Hide file tree
Showing 3 changed files with 165 additions and 60 deletions.
138 changes: 86 additions & 52 deletions js.go
Expand Up @@ -1151,10 +1151,18 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync
}
}

var sub *Subscription

// Check if we are manual ack.
if cb != nil && !o.mack {
ocb := cb
cb = func(m *Msg) { ocb(m); m.Ack() }
}

// In case we need to hold onto it for ordered consumers.
var ccreq *createConsumerRequest

// If we are creating or updating let's process that request.
// If we are creating or updating let's update cfg.
if shouldCreate {
if !isPullMode {
cfg.DeliverSubject = deliver
Expand All @@ -1178,14 +1186,63 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync
cfg.MaxAckPending = DefaultSubPendingMsgsLimit
}
}

// Create request here.
ccreq = &createConsumerRequest{
Stream: stream,
Config: &cfg,
}
}

jsi := &jsSub{
js: js,
stream: stream,
consumer: consumer,
durable: isDurable,
attached: attached,
deliver: deliver,
hbs: hasHeartbeats,
hbi: o.cfg.Heartbeat,
fc: hasFC,
ordered: o.ordered,
ccreq: ccreq,
dseq: 1,
}

if isPullMode {
sub, err = js.pullSubscribe(subj, jsi)
} else {
sub, err = js.nc.subscribe(deliver, queue, cb, ch, isSync, jsi)
// Since JetStream sends on different subject, make sure this reflects the user's intentions.
sub.mu.Lock()
sub.Subject = subj
sub.mu.Unlock()
}
if err != nil {
return nil, err
}

// With flow control enabled async subscriptions we will disable msgs
// limits, and set a larger pending bytes limit by default.
if !isPullMode && cb != nil && hasFC {
sub.SetPendingLimits(DefaultSubPendingMsgsLimit*16, DefaultSubPendingBytesLimit)
}

// If we fail and we had the sub we need to cleanup, but can't just do a straight Unsubscribe or Drain.
// We need to clear the jsi so we do not remove any durables etc.
cleanUpSub := func() {
if sub != nil {
sub.mu.Lock()
sub.jsi = nil
sub.mu.Unlock()
sub.Unsubscribe()
}
}

// If we are creating or updating let's process that request.
if shouldCreate {
j, err := json.Marshal(ccreq)
if err != nil {
cleanUpSub()
return nil, err
}

Expand All @@ -1198,6 +1255,7 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync

resp, err := js.nc.Request(js.apiSubj(ccSubj), j, js.opts.wait)
if err != nil {
cleanUpSub()
if err == ErrNoResponders {
err = ErrJetStreamNotEnabled
}
Expand All @@ -1206,10 +1264,16 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync
var cinfo consumerResponse
err = json.Unmarshal(resp.Data, &cinfo)
if err != nil {
cleanUpSub()
return nil, err
}
info = cinfo.ConsumerInfo

if cinfo.Error != nil {
// We will not be using this sub here if we were push based.
if !isPullMode {
cleanUpSub()
}
// Multiple subscribers could compete in creating the first consumer
// that will be shared using the same durable name. If this happens, then
// do a lookup of the consumer info and resubscribe using the latest info.
Expand All @@ -1228,71 +1292,41 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync
return nil, ErrSubjectMismatch
}

// Update attached status.
jsi.attached = true

// Use the deliver subject from latest consumer config to attach.
if ccfg.DeliverSubject != _EMPTY_ {
if info.Config.DeliverSubject != _EMPTY_ {
// We can't reuse the channel, so if one was passed, we need to create a new one.
if ch != nil {
ch = make(chan *Msg, cap(ch))
}
jsi.deliver = info.Config.DeliverSubject
// Recreate the subscription here.
sub, err = js.nc.subscribe(jsi.deliver, queue, cb, ch, isSync, jsi)
if err != nil {
return nil, err
}
// Since JetStream sends on different subject, make sure this reflects the user's intentions.
sub.mu.Lock()
sub.Subject = subj
sub.mu.Unlock()
}
attached = true
} else {
if cinfo.Error.Code == 404 {
return nil, ErrStreamNotFound
}
return nil, fmt.Errorf("nats: %s", cinfo.Error.Description)
}
} else if consumer == _EMPTY_ {
// Update our consumer name here which is filled in when we create the consumer.
sub.mu.Lock()
sub.jsi.consumer = info.Name
sub.mu.Unlock()
}
stream = info.Stream
consumer = info.Name
deliver = info.Config.DeliverSubject
}

// Recent servers are more tolerant for ephemerals not immediately being present so we can do sub here.
var sub *Subscription

// Check if we are manual ack.
if cb != nil && !o.mack {
ocb := cb
cb = func(m *Msg) { ocb(m); m.Ack() }
}

jsi := &jsSub{
js: js,
stream: stream,
consumer: consumer,
durable: isDurable,
attached: attached,
deliver: deliver,
hbs: hasHeartbeats,
hbi: o.cfg.Heartbeat,
fc: hasFC,
ordered: o.ordered,
dseq: 1,
}

if isPullMode {
sub, err = js.pullSubscribe(subj, jsi)
} else {
if o.ordered {
jsi.ccreq = ccreq
}
sub, err = js.nc.subscribe(deliver, queue, cb, ch, isSync, jsi)
// Since JetStream sends on different subject, make sure this reflects the user's intentions.
sub.mu.Lock()
sub.Subject = subj
sub.mu.Unlock()
}
if err != nil {
return nil, err
}

// With flow control enabled async subscriptions we will disable msgs
// limits, and set a larger pending bytes limit by default.
if !isPullMode && cb != nil && hasFC {
sub.SetPendingLimits(DefaultSubPendingMsgsLimit*16, DefaultSubPendingBytesLimit)
}

// Do heartbeats last if needed.
if hasHeartbeats {
sub.scheduleHeartbeatCheck()
}
Expand Down
67 changes: 67 additions & 0 deletions js_test.go
Expand Up @@ -23,6 +23,7 @@ import (
"math/rand"
"os"
"strings"
"sync"
"sync/atomic"
"testing"
"time"
Expand Down Expand Up @@ -342,3 +343,69 @@ func TestJetStreamOrderedConsumerWithErrors(t *testing.T) {
createStream()
testSubError(deleteConsumer)
}

// We want to make sure we do the right thing with lots of concurrent durable consumer requests.
// One should win and the others should share the delivery subject with the first one who wins.
func TestJetStreamConcurrentDurablePushConsumers(t *testing.T) {
s := RunBasicJetStreamServer()
defer s.Shutdown()

if config := s.JetStreamConfig(); config != nil {
defer os.RemoveAll(config.StoreDir)
}

nc, err := Connect(s.ClientURL())
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer nc.Close()

js, err := nc.JetStream()
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

// Create stream.
_, err = js.AddStream(&StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

// Now create 10 durables concurrently.
subs := make(chan *Subscription, 10)
var wg sync.WaitGroup

for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
defer wg.Done()
sub, _ := js.SubscribeSync("foo", Durable("dlc"))
subs <- sub
}()
}
// Wait for all the consumers.
wg.Wait()
close(subs)

si, err := js.StreamInfo("TEST")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if si.State.Consumers != 1 {
t.Fatalf("Expected exactly one consumer, got %d", si.State.Consumers)
}

// Now send one message and make sure all subs get it.
js.Publish("foo", []byte("Hello"))
time.Sleep(250 * time.Millisecond) // Allow time for delivery.

for sub := range subs {
pending, _, _ := sub.Pending()
if pending != 1 {
t.Fatalf("Expected each durable to receive 1 msg, this sub got %d", pending)
}
}
}
20 changes: 12 additions & 8 deletions nats.go
Expand Up @@ -3931,12 +3931,22 @@ func (s *Subscription) AutoUnsubscribe(max int) error {
// unsubscribe performs the low level unsubscribe to the server.
// Use Subscription.Unsubscribe()
func (nc *Conn) unsubscribe(sub *Subscription, max int, drainMode bool) error {
var maxStr string
if max > 0 {
sub.mu.Lock()
sub.max = uint64(max)
if sub.delivered < sub.max {
maxStr = strconv.Itoa(max)
}
sub.mu.Unlock()
}

// For JetStream consumers, need to clean up ephemeral consumers
// or delete durable ones if called with Unsubscribe.
sub.mu.Lock()
jsi := sub.jsi
sub.mu.Unlock()
if jsi != nil {
if jsi != nil && maxStr == _EMPTY_ {
err := jsi.unsubscribe(drainMode)
if err != nil {
return err
Expand All @@ -3959,13 +3969,7 @@ func (nc *Conn) unsubscribe(sub *Subscription, max int, drainMode bool) error {
return nil
}

maxStr := _EMPTY_
if max > 0 {
s.mu.Lock()
s.max = uint64(max)
s.mu.Unlock()
maxStr = strconv.Itoa(max)
} else if !drainMode {
if maxStr == _EMPTY_ && !drainMode {
nc.removeSub(s)
}

Expand Down

0 comments on commit c414086

Please sign in to comment.