diff --git a/server/consumer.go b/server/consumer.go
index 75d418771e..2240bf2f2f 100644
--- a/server/consumer.go
+++ b/server/consumer.go
@@ -306,6 +306,9 @@ type consumer struct {

 	// Ack queue
 	ackMsgs *ipQueue
+
+	// For stream signaling.
+	sigSub *subscription
 }

 type proposal struct {
@@ -1062,6 +1065,9 @@ func (o *consumer) setLeader(isLeader bool) {
 		// Snapshot initial info.
 		o.infoWithSnap(true)

+		// Register as a leader with our parent stream.
+		mset.setConsumerAsLeader(o)
+
 		// Now start up Go routine to deliver msgs.
 		go o.loopAndGatherMsgs(qch)

@@ -1107,6 +1113,11 @@ func (o *consumer) setLeader(isLeader bool) {
 			stopAndClearTimer(&o.gwdtmr)
 		}
 		o.mu.Unlock()
+
+		// Unregister as a leader with our parent stream.
+		if mset != nil {
+			mset.removeConsumerAsLeader(o)
+		}
 	}
 }

@@ -4311,3 +4322,42 @@ func (o *consumer) account() *Account {
 	o.mu.RUnlock()
 	return a
 }
+
+func (o *consumer) signalSub() *subscription {
+	o.mu.Lock()
+	defer o.mu.Unlock()
+
+	if o.sigSub != nil {
+		return o.sigSub
+	}
+
+	subject := o.cfg.FilterSubject
+	if subject == _EMPTY_ {
+		subject = fwcs
+	}
+	return &subscription{subject: []byte(subject), icb: o.processStreamSignal}
+}
+
+// This is what will be called when our parent stream wants to kick us regarding a new message.
+// We know that we are the leader and that this subject matches us by how the parent handles registering
+// us with the signaling sublist.
+// We do need the sequence of the message however and we use the msg as the encoded seq.
+func (o *consumer) processStreamSignal(_ *subscription, _ *client, _ *Account, subject, _ string, seqb []byte) {
+	var le = binary.LittleEndian
+	seq := le.Uint64(seqb)
+
+	o.mu.Lock()
+	defer o.mu.Unlock()
+	if o.mset == nil {
+		return
+	}
+	if seq > o.npcm {
+		o.npc++
+	}
+	if seq < o.sseq {
+		return
+	}
+	if o.isPushMode() && o.active || o.isPullMode() && !o.waiting.isEmpty() {
+		o.signalNewMessages()
+	}
+}
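Note on the new signaling callback: the parent stream does not hand the consumer a real message, it passes the stream sequence as the raw callback payload, packed as 8 little-endian bytes (see `signalConsumers` in `stream.go` further down). A standalone sketch of that round trip, for illustration only and not the server code itself:

```go
package main

import (
	"encoding/binary"
	"fmt"
)

func main() {
	const seq uint64 = 1234567

	// Stream side: encode the sequence as the payload handed to the callback.
	var eseq [8]byte
	binary.LittleEndian.PutUint64(eseq[:], seq)

	// Consumer side (processStreamSignal above): decode it back out.
	got := binary.LittleEndian.Uint64(eseq[:])
	fmt.Println(got == seq) // true
}
```

Using a fixed 8-byte payload keeps the signal path free of any string formatting or parsing.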
diff --git a/server/norace_test.go b/server/norace_test.go
index f07583b2fa..baeca3482b 100644
--- a/server/norace_test.go
+++ b/server/norace_test.go
@@ -5785,6 +5785,26 @@ func TestNoRaceJetStreamDeleteConsumerWithInterestStreamAndHighSeqs(t *testing.T) {
 	}
 }
+
+// Bug when we encode a timestamp that upon decode causes an error which causes server to panic.
+// This can happen on consumer redelivery since the adjusted timestamps can be in the future, and result
+// in a negative encoding. If that encoding was exactly -1 seconds, it would cause decodeConsumerState to fail
+// and the server to panic.
+func TestNoRaceEncodeConsumerStateBug(t *testing.T) {
+	for i := 0; i < 200_000; i++ {
+		// Pretend we redelivered and updated the timestamp to reflect the new start time for expiration.
+		// The bug will trip when time.Now() rounded to seconds in encode is 1 second below the truncated version
+		// of pending.
+		pending := Pending{Sequence: 1, Timestamp: time.Now().Add(time.Second).UnixNano()}
+		state := ConsumerState{
+			Delivered: SequencePair{Consumer: 1, Stream: 1},
+			Pending:   map[uint64]*Pending{1: &pending},
+		}
+		buf := encodeConsumerState(&state)
+		_, err := decodeConsumerState(buf)
+		require_NoError(t, err)
+	}
+}
+
 // Performance impact on stream ingress with large number of consumers.
 func TestJetStreamLargeNumConsumersPerfImpact(t *testing.T) {
 	skip(t)
@@ -5880,22 +5900,147 @@ func TestJetStreamLargeNumConsumersPerfImpact(t *testing.T) {
 	fmt.Printf("%.0f msgs/sec\n", float64(toSend)/tt.Seconds())
 }

-// Bug when we encode a timestamp that upon decode causes an error which causes server to panic.
-// This can happen on consumer redelivery since they adjusted timstamps can be in the future, and result
-// in a negative encoding. If that encoding was exactly -1 seconds, would cause decodeConsumerState to fail
-// and the server to panic.
-func TestNoRaceEncodeConsumerStateBug(t *testing.T) {
-	for i := 0; i < 200_000; i++ {
-		// Pretend we redelivered and updated the timestamp to reflect the new start time for expiration.
-		// The bug will trip when time.Now() rounded to seconds in encode is 1 second below the truncated version
-		// of pending.
-		pending := Pending{Sequence: 1, Timestamp: time.Now().Add(time.Second).UnixNano()}
-		state := ConsumerState{
-			Delivered: SequencePair{Consumer: 1, Stream: 1},
-			Pending: map[uint64]*Pending{1: &pending},
-		}
-		buf := encodeConsumerState(&state)
-		_, err := decodeConsumerState(buf)
+// Performance impact on large number of consumers but sparse delivery.
+func TestJetStreamLargeNumConsumersSparseDelivery(t *testing.T) {
+	skip(t)
+
+	s := RunBasicJetStreamServer()
+	if config := s.JetStreamConfig(); config != nil {
+		defer removeDir(t, config.StoreDir)
+	}
+	defer s.Shutdown()
+
+	// Client for API requests.
+	nc, js := jsClientConnect(t, s)
+	defer nc.Close()
+
+	_, err := js.AddStream(&nats.StreamConfig{
+		Name:     "TEST",
+		Subjects: []string{"ID.*"},
+	})
+	require_NoError(t, err)
+
+	// Now add in ~10k consumers on different subjects.
+	for i := 3; i <= 10_000; i++ {
+		_, err := js.AddConsumer("TEST", &nats.ConsumerConfig{
+			Durable:       fmt.Sprintf("d-%d", i),
+			FilterSubject: fmt.Sprintf("ID.%d", i),
+			AckPolicy:     nats.AckNonePolicy,
+		})
 		require_NoError(t, err)
 	}
+
+	toSend := 100_000
+
+	// Bind a consumer to ID.2.
+	var received int
+	done := make(chan bool)
+
+	nc, js = jsClientConnect(t, s)
+	defer nc.Close()
+
+	mh := func(m *nats.Msg) {
+		received++
+		if received >= toSend {
+			close(done)
+		}
+	}
+	_, err = js.Subscribe("ID.2", mh)
+	require_NoError(t, err)
+
+	last := make(chan bool)
+	_, err = js.Subscribe("ID.1", func(_ *nats.Msg) { close(last) })
+	require_NoError(t, err)
+
+	nc, _ = jsClientConnect(t, s)
+	defer nc.Close()
+	js, err = nc.JetStream(nats.PublishAsyncMaxPending(8 * 1024))
+	require_NoError(t, err)
+
+	start := time.Now()
+	for i := 0; i < toSend; i++ {
+		js.PublishAsync("ID.2", []byte("ok"))
+	}
+	// Check latency for this one message.
+	// This will show the issue better than throughput which can bypass signal processing.
+	js.PublishAsync("ID.1", []byte("ok"))
+
+	select {
+	case <-done:
+		break
+	case <-time.After(10 * time.Second):
+		t.Fatalf("Failed to receive all messages: %d of %d\n", received, toSend)
+	}
+
+	tt := time.Since(start)
+	fmt.Printf("Took %v to receive %d msgs\n", tt, toSend)
+	fmt.Printf("%.0f msgs/s\n", float64(toSend)/tt.Seconds())
+
+	select {
+	case <-last:
+		break
+	case <-time.After(30 * time.Second):
+		t.Fatalf("Failed to receive last message\n")
+	}
+	lt := time.Since(start)
+
+	fmt.Printf("Took %v to receive last msg\n", lt)
+}
+
+func TestNoRaceJetStreamEndToEndLatency(t *testing.T) {
+	// skip(t)
+
+	s := RunBasicJetStreamServer()
+	if config := s.JetStreamConfig(); config != nil {
+		defer removeDir(t, config.StoreDir)
+	}
+	defer s.Shutdown()
+
+	// Client for API requests.
+	nc, js := jsClientConnect(t, s)
+	defer nc.Close()
+
+	_, err := js.AddStream(&nats.StreamConfig{
+		Name:     "TEST",
+		Subjects: []string{"foo"},
+	})
+	require_NoError(t, err)
+
+	nc, js = jsClientConnect(t, s)
+	defer nc.Close()
+
+	var sent time.Time
+	next := make(chan struct{})
+
+	var (
+		total time.Duration
+		min   time.Duration
+		max   time.Duration
+	)
+	mh := func(m *nats.Msg) {
+		received := time.Now()
+		tt := received.Sub(sent)
+		if min == 0 || tt < min {
+			min = tt
+		}
+		if max == 0 || tt > max {
+			max = tt
+		}
+		total += tt
+		next <- struct{}{}
+	}
+	_, err = js.Subscribe("foo", mh)
+	require_NoError(t, err)
+
+	nc, js = jsClientConnect(t, s)
+	defer nc.Close()
+
+	toSend := 100_000
+	for i := 0; i < toSend; i++ {
+		sent = time.Now()
+		js.PublishAsync("foo", []byte("ok"))
+		<-next
+	}
+
+	fmt.Printf("AVG: %v\nMIN: %v\nMAX: %v\n", total/time.Duration(toSend), min, max)
 }
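The 200_000 iterations in `TestNoRaceEncodeConsumerStateBug` exist to sample many sub-second offsets of `time.Now()`: the failure only appears when rounding "now" to seconds and truncating the future pending timestamp land a full second apart. A toy illustration of that window, assuming nothing about the encoder internals beyond the rounding described in the test comment:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// With a fractional part below .5, "now" rounds down while "now + 1s"
	// truncates to the next whole second, so the two views differ by 1s.
	now := time.Unix(1_700_000_000, 400_000_000) // x.4s
	pending := now.Add(time.Second)              // (x+1).4s
	fmt.Println(pending.Truncate(time.Second).Unix() - now.Round(time.Second).Unix()) // 1

	// With a fractional part above .5 the round goes up and the gap closes,
	// which is why the test loops until it hits the failing window.
	now = time.Unix(1_700_000_000, 600_000_000) // x.6s
	pending = now.Add(time.Second)              // (x+1).6s
	fmt.Println(pending.Truncate(time.Second).Unix() - now.Round(time.Second).Unix()) // 0
}
```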
diff --git a/server/stream.go b/server/stream.go
index bc5cc7ab01..669fd1fb09 100644
--- a/server/stream.go
+++ b/server/stream.go
@@ -16,6 +16,7 @@ package server
 import (
 	"archive/tar"
 	"bytes"
+	"encoding/binary"
 	"encoding/json"
 	"errors"
 	"fmt"
@@ -222,11 +223,12 @@ type stream struct {
 	// For republishing.
 	tr *transform

-	// For processing consumers as a list without main stream lock.
+	// For processing consumers without main stream lock.
 	clsMu sync.RWMutex
 	cList []*consumer
 	sch   chan struct{}
 	sigq  *ipQueue // of *cMsg
+	csl   *Sublist

 	// TODO(dlc) - Hide everything below behind two pointers.
 	// Clustered mode.
@@ -4054,23 +4056,16 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,

 	// Signal consumers for new messages.
 	if numConsumers > 0 {
-		if numConsumers > consumerSignalThreshold {
-			mset.sigq.push(newCMsg(subject, seq))
-			select {
-			case mset.sch <- struct{}{}:
-			default:
-			}
-		} else {
-			mset.signalConsumers(subject, seq)
+		mset.sigq.push(newCMsg(subject, seq))
+		select {
+		case mset.sch <- struct{}{}:
+		default:
 		}
 	}

 	return nil
 }

-// Number of consumers to consider offloading signal processing.
-const consumerSignalThreshold = 10
-
 // Used to signal inbound message to registered consumers.
 type cMsg struct {
 	seq  uint64
@@ -4134,19 +4129,21 @@ func (mset *stream) signalConsumers(subj string, seq uint64) {
 	mset.clsMu.RLock()
 	defer mset.clsMu.RUnlock()

-	for _, o := range mset.cList {
-		o.mu.Lock()
-		if o.isLeader() && o.isFilteredMatch(subj) {
-			if seq > o.npcm {
-				o.npc++
-			}
-			if o.mset != nil {
-				if o.isPushMode() && o.active || o.isPullMode() && !o.waiting.isEmpty() {
-					o.signalNewMessages()
-				}
-			}
-		}
-		o.mu.Unlock()
+	if mset.csl == nil {
+		return
+	}
+
+	r := mset.csl.Match(subj)
+	if len(r.psubs) == 0 {
+		return
+	}
+	// Encode the sequence here.
+	var eseq [8]byte
+	var le = binary.LittleEndian
+	le.PutUint64(eseq[:], seq)
+	msg := eseq[:]
+	for _, sub := range r.psubs {
+		sub.icb(sub, nil, nil, subj, _EMPTY_, msg)
 	}
 }

@@ -4408,7 +4405,7 @@ func (mset *stream) stop(deleteFlag, advisory bool) error {
 		obs = append(obs, o)
 	}
 	mset.clsMu.Lock()
-	mset.consumers, mset.cList = nil, nil
+	mset.consumers, mset.cList, mset.csl = nil, nil, nil
 	mset.clsMu.Unlock()

 	// Check if we are a mirror.
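With the `consumerSignalThreshold` special case gone, every stored message now takes the same route: push a `cMsg` onto `sigq` and do a non-blocking send on `sch` so the signal goroutine wakes up if it is idle. A minimal standalone sketch of that push/notify pattern, assuming a single worker draining a shared queue (not the server's `ipQueue`):

```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	var (
		mu    sync.Mutex
		queue []uint64
	)
	sch := make(chan struct{}, 1)
	done := make(chan struct{})

	// Worker: woken at most once per batch, then drains whatever has queued up.
	go func() {
		defer close(done)
		for range sch {
			mu.Lock()
			batch := queue
			queue = nil
			mu.Unlock()
			for _, seq := range batch {
				fmt.Println("signal consumers for seq", seq)
			}
		}
	}()

	// Producer (the ingest path): enqueue, then a non-blocking send so a busy
	// worker is never waited on and wake-ups naturally coalesce.
	for seq := uint64(1); seq <= 3; seq++ {
		mu.Lock()
		queue = append(queue, seq)
		mu.Unlock()
		select {
		case sch <- struct{}{}:
		default:
		}
	}
	close(sch)
	<-done
}
```

Moving the consumer scan off the ingest path entirely (rather than only above a threshold) is meant to keep publish latency independent of how many consumers are registered; the matching work happens on the signal goroutine via the new `csl` sublist.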
@@ -4589,17 +4586,6 @@ func (mset *stream) numConsumers() int {
 	return len(mset.consumers)
 }

-// Lock should be held
-// Don't expect this to be called at high rates.
-func (mset *stream) updateConsumerList() {
-	mset.clsMu.Lock()
-	defer mset.clsMu.Unlock()
-	mset.cList = make([]*consumer, 0, len(mset.consumers))
-	for _, o := range mset.consumers {
-		mset.cList = append(mset.cList, o)
-	}
-}
-
 // Lock should be held.
 func (mset *stream) setConsumer(o *consumer) {
 	mset.consumers[o.name] = o
@@ -4610,7 +4596,9 @@ func (mset *stream) setConsumer(o *consumer) {
 		mset.directs++
 	}
 	// Now update consumers list as well
-	mset.updateConsumerList()
+	mset.clsMu.Lock()
+	mset.cList = append(mset.cList, o)
+	mset.clsMu.Unlock()
 }

 // Lock should be held.
@@ -4621,9 +4609,42 @@ func (mset *stream) removeConsumer(o *consumer) {
 	if o.cfg.Direct && mset.directs > 0 {
 		mset.directs--
 	}
-	delete(mset.consumers, o.name)
-	// Now update consumers list as well
-	mset.updateConsumerList()
+	if mset.consumers != nil {
+		delete(mset.consumers, o.name)
+		// Now update consumers list as well
+		mset.clsMu.Lock()
+		for i, ol := range mset.cList {
+			if ol == o {
+				mset.cList = append(mset.cList[:i], mset.cList[i+1:]...)
+				break
+			}
+		}
+		// Always remove from the leader sublist.
+		if mset.csl != nil {
+			mset.csl.Remove(o.signalSub())
+		}
+		mset.clsMu.Unlock()
+	}
+}
+
+// Set the consumer as a leader. This will update signaling sublist.
+func (mset *stream) setConsumerAsLeader(o *consumer) {
+	mset.clsMu.Lock()
+	defer mset.clsMu.Unlock()
+
+	if mset.csl == nil {
+		mset.csl = NewSublistWithCache()
+	}
+	mset.csl.Insert(o.signalSub())
+}
+
+// Remove the consumer as a leader. This will update signaling sublist.
+func (mset *stream) removeConsumerAsLeader(o *consumer) {
+	mset.clsMu.Lock()
+	defer mset.clsMu.Unlock()
+	if mset.csl != nil {
+		mset.csl.Remove(o.signalSub())
+	}
 }

 // lookupConsumer will retrieve a consumer by name.
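Only consumer leaders get inserted into `csl`, keyed by their `FilterSubject` (or the full wildcard `fwcs` when the consumer has no filter), so `signalConsumers` only ever touches consumers whose filter can match the stored subject. A simplified, standalone illustration of that matching rule (not the server's `Sublist` implementation):

```go
package main

import (
	"fmt"
	"strings"
)

// matches reports whether a subject filter covers a literal subject:
// "*" matches exactly one token, ">" matches one or more trailing tokens.
func matches(filter, subject string) bool {
	fts, sts := strings.Split(filter, "."), strings.Split(subject, ".")
	for i, ft := range fts {
		if ft == ">" {
			return i < len(sts)
		}
		if i >= len(sts) || (ft != "*" && ft != sts[i]) {
			return false
		}
	}
	return len(fts) == len(sts)
}

func main() {
	fmt.Println(matches("ID.*", "ID.2"))   // true: the bound consumer is signaled
	fmt.Println(matches("ID.*", "ID.2.x")) // false: token count differs
	fmt.Println(matches(">", "ID.2"))      // true: an unfiltered consumer sees every subject
}
```

Registering and unregistering through `setConsumerAsLeader`/`removeConsumerAsLeader` (driven from `setLeader` in `consumer.go` above) keeps the sublist limited to leaders, so non-leader consumer instances are never signaled.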