From 08cdb2d2ea2bc4a735c74cfa4075ee8893730882 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Tue, 15 Jun 2021 04:44:05 -0700 Subject: [PATCH 1/4] Make filtered consumers in large mixed streams more efficient. Allow wider scoped filtered subjects. We introduce a per subject information tracking to filestore to optimize for large mux'd streams and more efficient filtered consumers. Signed-off-by: Derek Collison --- server/consumer.go | 69 +++-- server/filestore.go | 515 ++++++++++++++++++++++++------- server/jetstream.go | 1 - server/jetstream_cluster.go | 3 +- server/jetstream_cluster_test.go | 8 +- server/jetstream_test.go | 59 +++- server/memstore.go | 27 +- server/norace_test.go | 151 +++++++++ server/store.go | 9 +- server/stream.go | 10 - 10 files changed, 680 insertions(+), 172 deletions(-) diff --git a/server/consumer.go b/server/consumer.go index f43cd063e0..66a06134b2 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -345,7 +345,7 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri config.MaxAckPending = JsDefaultMaxAckPending } - // Make sure any partition subject is also a literal. + // As best we can make sure the filtered subject is valid. if config.FilterSubject != _EMPTY_ { subjects, hasExt := mset.allSubjects() if !validFilteredSubject(config.FilterSubject, subjects) && !hasExt { @@ -393,7 +393,7 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri } sampleFreq := 0 - if config.SampleFrequency != "" { + if config.SampleFrequency != _EMPTY_ { s := strings.TrimSuffix(config.SampleFrequency, "%") sampleFreq, err = strconv.Atoi(s) if err != nil { @@ -724,8 +724,8 @@ func (o *consumer) setLeader(isLeader bool) { } } - // Setup initial pending. - o.setInitialPending() + // Setup initial pending and proper start sequence. + o.setInitialPendingAndStart() // If push mode, register for notifications on interest. if o.isPushMode() { @@ -1195,6 +1195,9 @@ func (o *consumer) loopAndForwardProposals(qch chan struct{}) { } } + // In case we have anything pending on entry. + forwardProposals() + for { select { case <-qch: @@ -2560,28 +2563,6 @@ func (o *consumer) nextSeq() uint64 { return dseq } -// This will select the store seq to start with based on the -// partition subject. -func (o *consumer) selectSubjectLast() { - stats := o.mset.store.State() - if stats.LastSeq == 0 { - o.sseq = stats.LastSeq - return - } - // FIXME(dlc) - this is linear and can be optimized by store layer. - for seq := stats.LastSeq; seq >= stats.FirstSeq; seq-- { - subj, _, _, _, err := o.mset.store.LoadMsg(seq) - if err == ErrStoreMsgNotFound { - continue - } - if o.isFilteredMatch(subj) { - o.sseq = seq - o.updateSkipped() - return - } - } -} - // Will select the starting sequence. func (o *consumer) selectStartingSeqNo() { if o.mset == nil || o.mset.store == nil { @@ -2593,16 +2574,16 @@ func (o *consumer) selectStartingSeqNo() { o.sseq = stats.FirstSeq } else if o.cfg.DeliverPolicy == DeliverLast { o.sseq = stats.LastSeq - // If we are partitioned here we may need to walk backwards. + // If we are partitioned here this will be properly set when we become leader. if o.cfg.FilterSubject != _EMPTY_ { - o.selectSubjectLast() + ss := o.mset.store.FilteredState(1, o.cfg.FilterSubject) + o.sseq = ss.Last } } else if o.cfg.OptStartTime != nil { // If we are here we are time based. // TODO(dlc) - Once clustered can't rely on this. o.sseq = o.mset.store.GetSeqFromTime(*o.cfg.OptStartTime) } else { - // Default is deliver new only. o.sseq = stats.LastSeq + 1 } } else { @@ -2868,11 +2849,22 @@ func (mset *stream) deliveryFormsCycle(deliverySubject string) bool { return false } +// Check that the filtered subject is valid given a set of stream subjects. func validFilteredSubject(filteredSubject string, subjects []string) bool { + if !IsValidSubject(filteredSubject) { + return false + } + hasWC := subjectHasWildcard(filteredSubject) + for _, subject := range subjects { if subjectIsSubsetMatch(filteredSubject, subject) { return true } + // If we have a wildcard as the filtered subject check to see if we are + // a wider scope but do match a subject. + if hasWC && subjectIsSubsetMatch(subject, filteredSubject) { + return true + } } return false } @@ -2920,9 +2912,9 @@ func (o *consumer) requestNextMsgSubject() string { return o.nextMsgSubj } -// Will set the initial pending. +// Will set the initial pending and start sequence. // mset lock should be held. -func (o *consumer) setInitialPending() { +func (o *consumer) setInitialPendingAndStart() { mset := o.mset if mset == nil || mset.store == nil { return @@ -2945,7 +2937,20 @@ func (o *consumer) setInitialPending() { } } else { // Here we are filtered. - o.sgap = o.mset.store.NumFilteredPending(o.sseq, o.cfg.FilterSubject) + ss := mset.store.FilteredState(o.sseq, o.cfg.FilterSubject) + if ss.Msgs > 0 { + o.sgap = ss.Msgs + // See if we should update our starting sequence. + if dp := o.cfg.DeliverPolicy; dp == DeliverLast { + o.sseq = ss.Last + } else if dp == DeliverNew { + o.sseq = ss.Last + 1 + } else { + // DeliverAll, DeliverByStartSequence, DeliverByStartTime + o.sseq = ss.First + } + } + o.updateSkipped() } } diff --git a/server/filestore.go b/server/filestore.go index 4036e5bf66..374f85e6b3 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -84,11 +84,10 @@ type fileStore struct { hh hash.Hash64 qch chan struct{} cfs []*consumerFileStore - fsi map[string]seqSlice - fsis *simpleState + sips int closed bool fip bool - sips int + tms bool } // Represents a message store block and its data. @@ -103,10 +102,12 @@ type msgBlock struct { ifn string ifd *os.File liwsz int64 - index uint64 // User visible message count. + index uint64 bytes uint64 // User visible bytes count. rbytes uint64 // Total bytes (raw) including deleted. Used for rolling to new blk. - msgs uint64 + msgs uint64 // User visible message count. + fss map[string]*SimpleState + sfn string lwits int64 lwts int64 llts int64 @@ -170,6 +171,8 @@ const ( blkScan = "%d.blk" // used to scan index file names. indexScan = "%d.idx" + // used to load per subject meta information. + fssScan = "%d.fss" // This is where we keep state on consumers. consumerDir = "obs" // Index file for a consumer. @@ -270,23 +273,19 @@ func newFileStoreWithCreated(fcfg FileStoreConfig, cfg StreamConfig, created tim return nil, fmt.Errorf("could not create hash: %v", err) } + // Determine if we should be tracking multiple subjects etc. + fs.tms = !(len(cfg.Subjects) == 1 && subjectIsLiteral(cfg.Subjects[0])) + + // Raft usage of filestore has no subjects or mirrors or sources. No need to track. + if fs.tms && len(cfg.Subjects) == 0 && cfg.Mirror == nil && len(cfg.Sources) == 0 { + fs.tms = false + } + // Recover our message state. if err := fs.recoverMsgs(); err != nil { return nil, err } - // Check to see if we have lots of messages and existing consumers. - // If they could be filtered we should generate an index here. - const lowWaterMarkMsgs = 8192 - if fs.state.Msgs > lowWaterMarkMsgs { - // If we have one subject that is not a wildcard we can skip. - if !(len(cfg.Subjects) == 1 && subjectIsLiteral(cfg.Subjects[0])) { - if ofis, _ := ioutil.ReadDir(odir); len(ofis) > 0 { - fs.genFilterIndex() - } - } - } - // Write our meta data iff does not exist. meta := path.Join(fcfg.StoreDir, JetStreamMetaFile) if _, err := os.Stat(meta); err != nil && os.IsNotExist(err) { @@ -403,6 +402,7 @@ func (fs *fileStore) recoverMsgBlock(fi os.FileInfo, index uint64) *msgBlock { mdir := path.Join(fs.fcfg.StoreDir, msgDir) mb.mfn = path.Join(mdir, fi.Name()) mb.ifn = path.Join(mdir, fmt.Sprintf(indexScan, index)) + mb.sfn = path.Join(mdir, fmt.Sprintf(fssScan, index)) if mb.hh == nil { key := sha256.Sum256(fs.hashKeyForBlock(index)) @@ -422,22 +422,24 @@ func (fs *fileStore) recoverMsgBlock(fi os.FileInfo, index uint64) *msgBlock { } else { return nil } + // Grab last checksum from main block file. + var lchk [8]byte + file.ReadAt(lchk[:], fi.Size()-8) + file.Close() // Read our index file. Use this as source of truth if possible. if err := mb.readIndexInfo(); err == nil { // Quick sanity check here. // Note this only checks that the message blk file is not newer then this file. - var lchk [8]byte - file.ReadAt(lchk[:], fi.Size()-8) if bytes.Equal(lchk[:], mb.lchk[:]) { + if fs.tms { + mb.readPerSubjectInfo() + } fs.blks = append(fs.blks, mb) return mb } } - // Close here since we need to rebuild state. - file.Close() - // If we get data loss rebuilding the message block state record that with the fs itself. if ld, _ := mb.rebuildState(); ld != nil { fs.rebuildState(ld) @@ -492,7 +494,7 @@ func (mb *msgBlock) rebuildState() (*LostStreamData, error) { startLastSeq := mb.last.seq // Clear state we need to rebuild. - mb.msgs, mb.bytes, mb.rbytes = 0, 0, 0 + mb.msgs, mb.bytes, mb.rbytes, mb.fss = 0, 0, 0, nil mb.last.seq, mb.last.ts = 0, 0 firstNeedsSet := true @@ -547,6 +549,11 @@ func (mb *msgBlock) rebuildState() (*LostStreamData, error) { return &ld } + // Rebuild per subject info. + if mb.fs.tms { + mb.fss = make(map[string]*SimpleState) + } + for index, lbuf := uint32(0), uint32(len(buf)); index < lbuf; { if index+msgHdrSize >= lbuf { truncate(index) @@ -600,8 +607,8 @@ func (mb *msgBlock) rebuildState() (*LostStreamData, error) { } if !deleted { + data := buf[index+msgHdrSize : index+rl] if hh := mb.hh; hh != nil { - data := buf[index+msgHdrSize : index+rl] hh.Reset() hh.Write(hdr[4:20]) hh.Write(data[:slen]) @@ -629,6 +636,17 @@ func (mb *msgBlock) rebuildState() (*LostStreamData, error) { mb.msgs++ mb.bytes += uint64(rl) mb.rbytes += uint64(rl) + + // Do per subject info. + if mb.fss != nil { + subj := string(data[:slen]) + if ss := mb.fss[subj]; ss != nil { + ss.Msgs++ + ss.Last = seq + } else { + mb.fss[subj] = &SimpleState{Msgs: 1, First: seq, Last: seq} + } + } } index += rl @@ -738,106 +756,205 @@ func (fs *fileStore) GetSeqFromTime(t time.Time) uint64 { return 0 } -type seqSlice []uint64 +// This will traverse a message block and generate the filtered pending. +func (mb *msgBlock) filteredPending(subj string, wc bool, seq uint64) (total, first, last uint64) { + mb.mu.Lock() + defer mb.mu.Unlock() -func (x seqSlice) Len() int { return len(x) } -func (x seqSlice) Less(i, j int) bool { return x[i] < x[j] } -func (x seqSlice) Swap(i, j int) { x[i], x[j] = x[j], x[i] } + if mb.fss == nil { + return 0, 0, 0 + } -func (x seqSlice) Search(n uint64) int { - return sort.Search(len(x), func(i int) bool { return x[i] >= n }) -} + subs := []string{subj} + // If we have a wildcard match against all tracked subjects we know about. + if wc { + subs = subs[:0] + for fsubj := range mb.fss { + if subjectIsSubsetMatch(fsubj, subj) { + subs = append(subs, fsubj) + } + } + } + // If we load the cache for a linear scan we want to expire that cache upon exit. + var shouldExpire bool -type simpleState struct { - msgs, first, last uint64 -} + update := func(ss *SimpleState) { + total += ss.Msgs + if first == 0 || ss.First < first { + first = ss.First + } + if ss.Last > last { + last = ss.Last + } + } -// This will generate an index for us on startup to determine num pending for -// filtered consumers easier. -func (fs *fileStore) genFilterIndex() { - fs.mu.Lock() - defer fs.mu.Unlock() + for i, subj := range subs { + // If the starting seq is less then or equal that means we want all and we do not need to load any messages. + ss := mb.fss[subj] + if ss == nil { + continue + } - fsi := make(map[string]seqSlice) + // If the seq we are starting at is less then the simple state's first sequence we can just return the total msgs. + if seq <= ss.First { + update(ss) + continue + } - for _, mb := range fs.blks { - mb.loadMsgs() - mb.mu.Lock() - fseq, lseq := mb.first.seq, mb.last.seq - for seq := fseq; seq <= lseq; seq++ { - if sm, err := mb.cacheLookupWithLock(seq); sm != nil && err == nil { - fsi[sm.subj] = append(fsi[sm.subj], seq) + // We may need to scan this one block since we have a partial set to consider. + // If we are all inclusive then we can do simple math and avoid the scan. + if allInclusive := ss.Msgs == ss.Last-ss.First+1; allInclusive { + update(ss) + // Make sure to compensate for the diff from the head. + if seq > ss.First { + first, total = seq, total-(seq-ss.First) } + continue } - // Expire this cache before moving on. - mb.llts = 0 - mb.expireCacheLocked() - mb.mu.Unlock() - } - fs.fsi = fsi - fs.fsis = &simpleState{fs.state.Msgs, fs.state.FirstSeq, fs.state.LastSeq} -} + // We need to scan this block to compute the correct number of pending for this block. + // We want to only do this once so we will adjust subs and test against them all here. -// Clears out the filter index. -func (fs *fileStore) clearFilterIndex() { - fs.mu.Lock() - fs.fsi, fs.fsis = nil, nil - fs.mu.Unlock() -} + if !mb.cacheAlreadyLoaded() { + mb.loadMsgsWithLock() + shouldExpire = true + } -// Fetch our num filtered pending from our index. -// Lock should be held. -func (fs *fileStore) getNumFilteredPendingFromIndex(sseq uint64, subj string) (uint64, error) { - cstate := simpleState{fs.state.Msgs, fs.state.FirstSeq, fs.state.LastSeq} - if fs.fsis == nil || *fs.fsis != cstate { - fs.fsi, fs.fsis = nil, nil - return 0, errors.New("state changed, index not valid") - } - var total uint64 - for tsubj, seqs := range fs.fsi { - if subjectIsSubsetMatch(tsubj, subj) { - total += uint64(len(seqs[seqs.Search(sseq):])) + subs = subs[i:] + var all, lseq uint64 + // Grab last applicable sequence as a union of all applicable subjects. + for _, subj := range subs { + if ss := mb.fss[subj]; ss != nil { + all += ss.Msgs + if ss.Last > lseq { + lseq = ss.Last + } + } + } + numScanIn, numScanOut := lseq-seq, seq-mb.first.seq + + isMatch := func(seq uint64) bool { + if sm, _ := mb.cacheLookupWithLock(seq); sm != nil { + if len(subs) == 1 && sm.subj == subs[0] { + return true + } + for _, subj := range subs { + if sm.subj == subj { + return true + } + } + } + return false + } + + // Decide on whether to scan those included or those excluded based on which scan amount is less. + if numScanIn < numScanOut { + for tseq := seq; tseq <= lseq; tseq++ { + if isMatch(tseq) { + total++ + if first == 0 || tseq < first { + first = tseq + } + last = tseq + } + } + } else { + // Here its more efficient to scan the out nodes. + var discard uint64 + for tseq := mb.first.seq; tseq < seq; tseq++ { + if isMatch(tseq) { + discard++ + } + } + total += (all - discard) + // Now make sure we match our first + for tseq := seq; tseq <= lseq; tseq++ { + if isMatch(tseq) { + first = tseq + break + } + } } + // We can bail since we scanned all remaining in this pass. + break + } + + // If we loaded this block for this operation go ahead and expire it here. + if shouldExpire { + mb.expireCacheLocked() } - return total, nil + + return total, first, last } -// Returns number of messages matching the subject starting at sequence sseq. -func (fs *fileStore) NumFilteredPending(sseq uint64, subj string) (total uint64) { +// FilteredState will return the SimpleState associated with the filtered subject and a proposed starting sequence. +func (fs *fileStore) FilteredState(sseq uint64, subj string) SimpleState { fs.mu.RLock() lseq := fs.state.LastSeq if sseq < fs.state.FirstSeq { sseq = fs.state.FirstSeq } - if fs.fsi != nil { - if np, err := fs.getNumFilteredPendingFromIndex(sseq, subj); err == nil { - fs.mu.RUnlock() - return np - } - } fs.mu.RUnlock() - if subj == _EMPTY_ { - if sseq <= lseq { - return lseq - sseq + var ss SimpleState + + // If past the end no results. + if sseq > lseq { + return ss + } + + // If subj is empty or we are not tracking multiple subjects. + if subj == _EMPTY_ || subj == ">" || !fs.tms { + total := lseq - sseq + 1 + if state := fs.State(); len(state.Deleted) > 0 { + for _, dseq := range state.Deleted { + if dseq >= sseq && dseq <= lseq { + total-- + } + } } - return 0 + ss.Msgs, ss.First, ss.Last = total, sseq, lseq + return ss } - var eq func(string, string) bool - if subjectHasWildcard(subj) { - eq = subjectIsSubsetMatch + wc := subjectHasWildcard(subj) + // Are we tracking multiple subject states? + if fs.tms { + for _, mb := range fs.blks { + // Skip blocks that are less than our starting sequence. + if sseq > atomic.LoadUint64(&mb.last.seq) { + continue + } + t, f, l := mb.filteredPending(subj, wc, sseq) + ss.Msgs += t + if ss.First == 0 || (f > 0 && f < ss.First) { + ss.First = f + } + if l > ss.Last { + ss.Last = l + } + } } else { - eq = func(a, b string) bool { return a == b } - } + // Fallback to linear scan. + var eq func(string, string) bool + if wc { + eq = subjectIsSubsetMatch + } else { + eq = func(a, b string) bool { return a == b } + } - for seq := sseq; seq <= lseq; seq++ { - if sm, _ := fs.msgForSeq(seq); sm != nil && eq(sm.subj, subj) { - total++ + for seq := sseq; seq <= lseq; seq++ { + if sm, _ := fs.msgForSeq(seq); sm != nil && eq(sm.subj, subj) { + ss.Msgs++ + if ss.First == 0 { + ss.First = seq + } + ss.Last = seq + } } } - return total + + return ss } // RegisterStorageUpdates registers a callback for updates to storage changes. @@ -905,6 +1022,9 @@ func (fs *fileStore) newMsgBlockForWrite() (*msgBlock, error) { // Lock should be held to quiet race detector. mb.mu.Lock() mb.setupWriteCache(mbuf) + if fs.tms { + mb.fss = make(map[string]*SimpleState) + } mb.mu.Unlock() // Now do local hash. @@ -932,6 +1052,9 @@ func (fs *fileStore) newMsgBlockForWrite() (*msgBlock, error) { } mb.ifd = ifd + // For subject based info. + mb.sfn = path.Join(mdir, fmt.Sprintf(fssScan, mb.index)) + // Set cache time to creation time to start. ts := time.Now().UnixNano() // Race detector wants these protected. @@ -1033,11 +1156,6 @@ func (fs *fileStore) storeRawMsg(subj string, hdr, msg []byte, seq uint64, ts in fs.startAgeChk() } - // If we had an index cache wipe that out. - if fs.fsi != nil { - fs.fsi, fs.fsis = nil, nil - } - return nil } @@ -1263,15 +1381,23 @@ func (fs *fileStore) removeMsg(seq uint64, secure bool) (bool, error) { fs.state.Msgs-- fs.state.Bytes -= msz - // If we had an index cache wipe that out. - if fs.fsi != nil { - fs.fsi, fs.fsis = nil, nil - } - // Now local mb updates. mb.msgs-- mb.bytes -= msz + // If we are tracking multiple subjects here make sure we update that accounting. + if mb.fss != nil { + if sm == nil { + if !mb.cacheAlreadyLoaded() { + mb.loadMsgsWithLock() + } + sm, _ = mb.cacheLookupWithLock(seq) + } + if sm != nil { + mb.removeSeqPerSubject(sm.subj, seq) + } + } + var shouldWriteIndex, firstSeqNeedsUpdate bool if secure { @@ -1920,11 +2046,21 @@ func (mb *msgBlock) writeMsgRecord(rl, seq uint64, subj string, mhdr, msg []byte // Set cache timestamp for last store. mb.lwts = ts // Decide if we write index info if flushing in place. - writeIndex := ts-mb.lwits > int64(time.Second) + writeIndex := ts-mb.lwits > int64(2*time.Second) // Accounting mb.updateAccounting(seq, ts, rl) + // Check if we are tracking per subject for our simple state. + if len(subj) > 0 && mb.fss != nil { + if ss := mb.fss[subj]; ss != nil { + ss.Msgs++ + ss.Last = seq + } else { + mb.fss[subj] = &SimpleState{Msgs: 1, First: seq, Last: seq} + } + } + fch, werr := mb.fch, mb.werr mb.mu.Unlock() @@ -2359,7 +2495,15 @@ func (mb *msgBlock) loadMsgs() error { // We hold the lock here the whole time by design. mb.mu.Lock() defer mb.mu.Unlock() + return mb.loadMsgsWithLock() +} +// Lock should be held. +func (mb *msgBlock) cacheAlreadyLoaded() bool { + return mb.cache != nil && len(mb.cache.idx) == int(mb.msgs) && mb.cache.off == 0 && len(mb.cache.buf) > 0 +} + +func (mb *msgBlock) loadMsgsWithLock() error { // Check to see if we are loading already. if mb.loading { return nil @@ -2378,7 +2522,7 @@ checkCache: } // Check to see if we have a full cache. - if mb.cache != nil && len(mb.cache.idx) == int(mb.msgs) && mb.cache.off == 0 && len(mb.cache.buf) > 0 { + if mb.cacheAlreadyLoaded() { return nil } @@ -2720,7 +2864,7 @@ func fileStoreMsgSizeEstimate(slen, maxPayload int) uint64 { // Write index info to the appropriate file. func (mb *msgBlock) writeIndexInfo() error { - // HEADER: magic version msgs bytes fseq fts lseq lts checksum + // HEADER: magic version msgs bytes fseq fts lseq lts ndel checksum var hdr [indexHdrSize]byte // Write header @@ -3201,6 +3345,156 @@ func (mb *msgBlock) dirtyCloseWithRemove(remove bool) { } } +// Remove a seq from the fss and select new first. +// Lock should be held. +func (mb *msgBlock) removeSeqPerSubject(subj string, seq uint64) { + ss := mb.fss[subj] + if ss == nil { + return + } + if ss.Msgs == 1 { + delete(mb.fss, subj) + return + } + + ss.Msgs-- + if seq != ss.First { + return + } + // TODO(dlc) - Might want to optimize this. + for tseq := seq + 1; tseq < ss.Last; tseq++ { + if sm, _ := mb.cacheLookupWithLock(tseq); sm != nil { + if sm.subj == subj { + ss.First = tseq + return + } + } + } +} + +// generatePerSubjectInfo will generate the per subject info via the raw msg block. +func (mb *msgBlock) generatePerSubjectInfo() error { + var shouldExpire bool + + mb.mu.Lock() + defer mb.mu.Unlock() + + if !mb.cacheAlreadyLoaded() { + mb.loadMsgsWithLock() + shouldExpire = true + } + if mb.fss == nil { + mb.fss = make(map[string]*SimpleState) + } + fseq, lseq := mb.first.seq, mb.last.seq + for seq := fseq; seq <= lseq; seq++ { + if sm, _ := mb.cacheLookupWithLock(seq); sm != nil { + if ss := mb.fss[sm.subj]; ss != nil { + ss.Msgs++ + ss.Last = seq + } else { + mb.fss[sm.subj] = &SimpleState{Msgs: 1, First: seq, Last: seq} + } + } + } + if shouldExpire { + // Expire this cache before moving on. + mb.llts = 0 + mb.expireCacheLocked() + } + return nil +} + +// readPerSubjectInfo will attempt to restore the per subject information. +func (mb *msgBlock) readPerSubjectInfo() error { + // Remove after processing regardless. + defer os.Remove(mb.sfn) + + const ( + fileHashIndex = 16 + mbHashIndex = 8 + minFileSize = 24 + ) + + buf, err := ioutil.ReadFile(mb.sfn) + if err != nil || len(buf) < minFileSize { + return mb.generatePerSubjectInfo() + } + + if err := checkHeader(buf); err != nil { + return mb.generatePerSubjectInfo() + } + + // Check that we did not have any bit flips. + mb.hh.Reset() + mb.hh.Write(buf[0 : len(buf)-fileHashIndex]) + fhash := buf[len(buf)-fileHashIndex : len(buf)-mbHashIndex] + if checksum := mb.hh.Sum(nil); !bytes.Equal(checksum, fhash) { + return mb.generatePerSubjectInfo() + } + + if !bytes.Equal(buf[len(buf)-mbHashIndex:], mb.lchk[:]) { + return mb.generatePerSubjectInfo() + } + + fss := make(map[string]*SimpleState) + + bi := hdrLen + readU64 := func() uint64 { + if bi < 0 { + return 0 + } + num, n := binary.Uvarint(buf[bi:]) + if n <= 0 { + bi = -1 + return 0 + } + bi += n + return num + } + + for i, numEntries := uint64(0), readU64(); i < numEntries; i++ { + lsubj := readU64() + subj := buf[bi : bi+int(lsubj)] + bi += int(lsubj) + msgs, first, last := readU64(), readU64(), readU64() + fss[string(subj)] = &SimpleState{Msgs: msgs, First: first, Last: last} + } + mb.mu.Lock() + mb.fss = fss + mb.mu.Unlock() + return nil +} + +// writePerSubjectInfo will write out per subject information if we are tracking per subject. +// Lock should be held. +func (mb *msgBlock) writePerSubjectInfo() error { + var scratch [4 * binary.MaxVarintLen64]byte + var b bytes.Buffer + b.WriteByte(magic) + b.WriteByte(version) + n := binary.PutUvarint(scratch[0:], uint64(len(mb.fss))) + b.Write(scratch[0:n]) + for subj, ss := range mb.fss { + n := binary.PutUvarint(scratch[0:], uint64(len(subj))) + b.Write(scratch[0:n]) + b.WriteString(subj) + // Encode all three parts of our simple state into same scratch buffer. + n = binary.PutUvarint(scratch[0:], ss.Msgs) + n += binary.PutUvarint(scratch[n:], ss.First) + n += binary.PutUvarint(scratch[n:], ss.Last) + b.Write(scratch[0:n]) + } + // Calculate hash for this information. + mb.hh.Reset() + mb.hh.Write(b.Bytes()) + b.Write(mb.hh.Sum(nil)) + // Now copy over checksum from the block itself, this allows us to know if we are in sync. + b.Write(mb.lchk[:]) + + return ioutil.WriteFile(mb.sfn, b.Bytes(), defaultFilePerms) +} + func (mb *msgBlock) close(sync bool) { if mb == nil { return @@ -3213,6 +3507,11 @@ func (mb *msgBlock) close(sync bool) { } mb.closed = true + // Check if we are tracking by subject. + if mb.fss != nil { + mb.writePerSubjectInfo() + } + // Close cache mb.clearCacheAndOffset() // Quit our loops. diff --git a/server/jetstream.go b/server/jetstream.go index 3ae418fbbc..2e2315d031 100644 --- a/server/jetstream.go +++ b/server/jetstream.go @@ -1043,7 +1043,6 @@ func (a *Account) EnableJetStream(limits *JetStreamAccountLimits) error { s.Warnf(" Error restoring Consumer state: %v", err) } } - mset.clearFilterIndex() } // Make sure to cleanup and old remaining snapshots. diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index f985625e20..e4bc13e7fc 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -2848,7 +2848,6 @@ func (o *consumer) processReplicatedAck(dseq, sseq uint64) { o.store.UpdateAcks(dseq, sseq) o.mu.RLock() - mset := o.mset if mset == nil || mset.cfg.Retention == LimitsPolicy { o.mu.RUnlock() @@ -4457,7 +4456,7 @@ func (mset *stream) processSnapshot(snap *streamSnapshot) { for _, o := range mset.consumers { o.mu.Lock() if o.isLeader() { - o.setInitialPending() + o.setInitialPendingAndStart() } o.mu.Unlock() } diff --git a/server/jetstream_cluster_test.go b/server/jetstream_cluster_test.go index 2ecde5828e..f6a2870d2e 100644 --- a/server/jetstream_cluster_test.go +++ b/server/jetstream_cluster_test.go @@ -2087,7 +2087,7 @@ func TestJetStreamClusterWorkQueueRetention(t *testing.T) { } // Make sure the messages are removed. - checkFor(t, 2*time.Second, 100*time.Millisecond, func() error { + checkFor(t, 5*time.Second, 100*time.Millisecond, func() error { si, err := js.StreamInfo("FOO") if err != nil { t.Fatalf("Unexpected error: %v", err) @@ -4790,6 +4790,10 @@ func TestJetStreamClusterMirrorAndSourcesFilteredConsumers(t *testing.T) { expectFail("M", "baz.1.2") expectFail("M", "apple") + // Make sure wider scoped subjects work as well. + createConsumer("M", "*") + createConsumer("M", ">") + // Now do some sources. if _, err := js.AddStream(&nats.StreamConfig{Name: "O1", Subjects: []string{"foo.*"}}); err != nil { t.Fatalf("Unexpected error: %v", err) @@ -7975,7 +7979,7 @@ func jsClientConnect(t *testing.T, s *Server, opts ...nats.Option) (*nats.Conn, func checkSubsPending(t *testing.T, sub *nats.Subscription, numExpected int) { t.Helper() - checkFor(t, 5*time.Second, 20*time.Millisecond, func() error { + checkFor(t, 10*time.Second, 20*time.Millisecond, func() error { if nmsgs, _, err := sub.Pending(); err != nil || nmsgs != numExpected { return fmt.Errorf("Did not receive correct number of messages: %d vs %d", nmsgs, numExpected) } diff --git a/server/jetstream_test.go b/server/jetstream_test.go index be8d1f25c6..80acbd4dc2 100644 --- a/server/jetstream_test.go +++ b/server/jetstream_test.go @@ -2310,9 +2310,6 @@ func TestJetStreamWorkQueueRetentionStream(t *testing.T) { defer o2.delete() // Anything that would overlap should fail though. - if _, err := mset.addConsumer(pConfig(">")); err == nil { - t.Fatalf("Expected an error on attempt for partitioned consumer for a workqueue") - } if _, err := mset.addConsumer(pConfig("MY_WORK_QUEUE.A")); err == nil { t.Fatalf("Expected an error on attempt for partitioned consumer for a workqueue") } @@ -10790,6 +10787,62 @@ func TestJetStreamConfigReloadWithGlobalAccount(t *testing.T) { checkJSAccount() } +func TestJetStreamFilteredConsumersWithWiderFilter(t *testing.T) { + s := RunBasicJetStreamServer() + defer s.Shutdown() + + if config := s.JetStreamConfig(); config != nil { + defer removeDir(t, config.StoreDir) + } + + // Client for API requests. + nc, js := jsClientConnect(t, s) + defer nc.Close() + + // Origin + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo", "bar", "baz", "N.*"}, + }) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + // Add in some messages. + js.Publish("foo", []byte("OK")) + js.Publish("bar", []byte("OK")) + js.Publish("baz", []byte("OK")) + for i := 0; i < 12; i++ { + js.Publish(fmt.Sprintf("N.%d", i+1), []byte("OK")) + } + + checkFor(t, 5*time.Second, 250*time.Millisecond, func() error { + si, err := js.StreamInfo("TEST") + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if si.State.Msgs != 15 { + return fmt.Errorf("Expected 15 msgs, got state: %+v", si.State) + } + return nil + }) + + checkWider := func(subj string, numExpected int) { + sub, err := js.SubscribeSync(subj) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + defer sub.Unsubscribe() + checkSubsPending(t, sub, numExpected) + } + + checkWider("*", 3) + checkWider("N.*", 12) + checkWider("*.*", 12) + checkWider("N.>", 12) + checkWider(">", 15) +} + func TestJetStreamMirrorAndSourcesFilteredConsumers(t *testing.T) { s := RunBasicJetStreamServer() defer s.Shutdown() diff --git a/server/memstore.go b/server/memstore.go index bd63a3a85f..e79e5be20a 100644 --- a/server/memstore.go +++ b/server/memstore.go @@ -222,35 +222,36 @@ func (ms *memStore) GetSeqFromTime(t time.Time) uint64 { return uint64(index) + ms.state.FirstSeq } -// Returns number of messages matching the subject starting at sequence sseq. -func (ms *memStore) NumFilteredPending(sseq uint64, subj string) (total uint64) { +// FilteredState will return the SimpleState associated with the filtered subject and a proposed starting sequence. +func (ms *memStore) FilteredState(sseq uint64, subj string) SimpleState { ms.mu.RLock() defer ms.mu.RUnlock() + var ss SimpleState + if sseq < ms.state.FirstSeq { sseq = ms.state.FirstSeq } - if subj == _EMPTY_ { - if sseq <= ms.state.LastSeq { - return ms.state.LastSeq - sseq - } - return 0 - } - + // FIXME(dlc) - Optimize like filestore. var eq func(string, string) bool - if subjectHasWildcard(subj) { + if subj == _EMPTY_ { + eq = func(a, b string) bool { return true } + } else if subjectHasWildcard(subj) { eq = subjectIsSubsetMatch } else { eq = func(a, b string) bool { return a == b } } - for seq := sseq; seq <= ms.state.LastSeq; seq++ { if sm, ok := ms.msgs[seq]; ok && eq(sm.subj, subj) { - total++ + ss.Msgs++ + if ss.First == 0 { + ss.First = seq + } + ss.Last = seq } } - return total + return ss } // Will check the msg limit and drop firstSeq msg if needed. diff --git a/server/norace_test.go b/server/norace_test.go index bb5f021fe7..5f642ad892 100644 --- a/server/norace_test.go +++ b/server/norace_test.go @@ -2000,3 +2000,154 @@ func TestNoRaceJetStreamClusterSuperClusterRIPStress(t *testing.T) { time.Sleep(10 * time.Millisecond) } } + +func TestJetStreamSlowFilteredInititalPendingAndFirstMsg(t *testing.T) { + s := RunBasicJetStreamServer() + defer s.Shutdown() + + if config := s.JetStreamConfig(); config != nil { + defer removeDir(t, config.StoreDir) + } + + // Create directly here to force multiple blocks, etc. + a, err := s.LookupAccount("$G") + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + mset, err := a.addStreamWithStore( + &StreamConfig{ + Name: "S", + Subjects: []string{"foo", "bar", "baz", "foo.bar.baz", "foo.*"}, + }, + &FileStoreConfig{ + BlockSize: 4 * 1024 * 1024, + AsyncFlush: true, + }, + ) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + nc, js := jsClientConnect(t, s) + defer nc.Close() + + toSend := 100_000 // 500k total though. + + // Messages will be 'foo' 'bar' 'baz' repeated 100k times. + // Then 'foo.bar.baz' all contigous for 100k. + // Then foo.N for 1-100000 + for i := 0; i < toSend; i++ { + js.PublishAsync("foo", []byte("HELLO")) + js.PublishAsync("bar", []byte("WORLD")) + js.PublishAsync("baz", []byte("AGAIN")) + } + // Make contiguous block of same subject. + for i := 0; i < toSend; i++ { + js.PublishAsync("foo.bar.baz", []byte("ALL-TOGETHER")) + } + // Now add some more at the end. + for i := 0; i < toSend; i++ { + js.PublishAsync(fmt.Sprintf("foo.%d", i+1), []byte("LATER")) + } + + checkFor(t, 10*time.Second, 250*time.Millisecond, func() error { + si, err := js.StreamInfo("S") + if err != nil { + return err + } + if si.State.Msgs != uint64(5*toSend) { + return fmt.Errorf("Expected %d msgs, got %d", 5*toSend, si.State.Msgs) + } + return nil + }) + + // Threshold for taking too long. + const thresh = 20 * time.Millisecond + + var dindex int + testConsumerCreate := func(subj string, startSeq, expectedNumPending uint64) { + t.Helper() + dindex++ + dname := fmt.Sprintf("dur-%d", dindex) + cfg := ConsumerConfig{FilterSubject: subj, Durable: dname, AckPolicy: AckExplicit} + if startSeq > 1 { + cfg.OptStartSeq, cfg.DeliverPolicy = startSeq, DeliverByStartSequence + } + start := time.Now() + o, err := mset.addConsumer(&cfg) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if delta := time.Since(start); delta > thresh { + t.Fatalf("Creating consumer for %q and start: %d took too long: %v", subj, startSeq, delta) + } + if ci := o.info(); ci.NumPending != expectedNumPending { + t.Fatalf("Expected NumPending of %d, got %d", expectedNumPending, ci.NumPending) + } + } + + testConsumerCreate("foo.100000", 1, 1) + testConsumerCreate("foo.100000", 222_000, 1) + testConsumerCreate("foo", 1, 100_000) + testConsumerCreate("foo", 4, 100_000-1) + testConsumerCreate("foo.bar.baz", 1, 100_000) + testConsumerCreate("foo.bar.baz", 350_001, 50_000) + testConsumerCreate("*", 1, 300_000) + testConsumerCreate("*", 4, 300_000-3) + testConsumerCreate(">", 1, 500_000) + testConsumerCreate(">", 50_000, 500_000-50_000+1) + testConsumerCreate("foo.10", 1, 1) + + // Also test that we do not take long if the start sequence is later in the stream. + sub, err := js.PullSubscribe("foo.100000", "dlc") + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + start := time.Now() + fetchMsgs(t, sub, 1, time.Second) + if delta := time.Since(start); delta > thresh { + t.Fatalf("Took too long for pull subscriber to fetch the message: %v", delta) + } + + // Now do some deletes and make sure these are handled correctly. + // Delete 3 foo messages. + mset.removeMsg(1) + mset.removeMsg(4) + mset.removeMsg(7) + testConsumerCreate("foo", 1, 100_000-3) + + // Make sure wider scoped subjects do the right thing from a pending perspective. + o, err := mset.addConsumer(&ConsumerConfig{FilterSubject: ">", Durable: "cat", AckPolicy: AckExplicit}) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + ci, expected := o.info(), uint64(500_000-3) + if ci.NumPending != expected { + t.Fatalf("Expected NumPending of %d, got %d", expected, ci.NumPending) + } + // Send another and make sure its captured by our wide scope consumer. + js.Publish("foo", []byte("HELLO AGAIN")) + if ci = o.info(); ci.NumPending != expected+1 { + t.Fatalf("Expected the consumer to recognize the wide scoped consumer, wanted pending of %d, got %d", expected+1, ci.NumPending) + } + + // Stop current server and test restart.. + sd := s.JetStreamConfig().StoreDir + s.Shutdown() + // Restart. + s = RunJetStreamServerOnPort(-1, sd) + defer s.Shutdown() + + a, err = s.LookupAccount("$G") + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + mset, err = a.lookupStream("S") + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + // Make sure we recovered our per subject state on restart. + testConsumerCreate("foo.100000", 1, 1) + testConsumerCreate("foo", 1, 100_000-2) +} diff --git a/server/store.go b/server/store.go index 623d33a99c..9dd4a8c6cd 100644 --- a/server/store.go +++ b/server/store.go @@ -75,7 +75,7 @@ type StreamStore interface { Compact(seq uint64) (uint64, error) Truncate(seq uint64) error GetSeqFromTime(t time.Time) uint64 - NumFilteredPending(sseq uint64, subject string) uint64 + FilteredState(sseq uint64, subject string) SimpleState State() StreamState FastState(*StreamState) Type() StorageType @@ -125,6 +125,13 @@ type StreamState struct { Consumers int `json:"consumer_count"` } +// SimpleState for filtered subject specific state. +type SimpleState struct { + Msgs uint64 `json:"messages"` + First uint64 `json:"first_seq"` + Last uint64 `json:"last_seq"` +} + // LostStreamData indicates msgs that have been lost. type LostStreamData struct { Msgs []uint64 `json:"msgs"` diff --git a/server/stream.go b/server/stream.go index ae6a0d47c4..61b8635d37 100644 --- a/server/stream.go +++ b/server/stream.go @@ -2322,16 +2322,6 @@ func (mset *stream) setupStore(fsCfg *FileStoreConfig) error { return nil } -// Clears out any filtered index from filestores. -func (mset *stream) clearFilterIndex() { - mset.mu.Lock() - defer mset.mu.Unlock() - - if fs, ok := mset.store.(*fileStore); ok { - fs.clearFilterIndex() - } -} - // Called for any updates to the underlying stream. We pass through the bytes to the // jetstream account. We do local processing for stream pending for consumers, but only // for removals. From d0ac1a40ca5a39b247fed5243d11fd49709057b5 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Tue, 15 Jun 2021 06:36:34 -0700 Subject: [PATCH 2/4] Added in per subject limits for streams. Signed-off-by: Derek Collison --- server/filestore.go | 91 +++++++++++++++++++++++++++++++++------- server/jetstream_test.go | 76 +++++++++++++++++++++++++++++++++ server/memstore.go | 83 +++++++++++++++++++++++++++++++++++- server/store.go | 2 + server/stream.go | 3 +- 5 files changed, 238 insertions(+), 17 deletions(-) diff --git a/server/filestore.go b/server/filestore.go index 374f85e6b3..8390ec99ce 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -957,6 +957,26 @@ func (fs *fileStore) FilteredState(sseq uint64, subj string) SimpleState { return ss } +// Will gather complete filtered state for the subject. +// Lock should be held. +func (fs *fileStore) perSubjectState(subj string) (total, first, last uint64) { + if !fs.tms { + return + } + wc := subjectHasWildcard(subj) + for _, mb := range fs.blks { + t, f, l := mb.filteredPending(subj, wc, 1) + total += t + if first == 0 || (f > 0 && f < first) { + first = f + } + if l > last { + last = l + } + } + return total, first, last +} + // RegisterStorageUpdates registers a callback for updates to storage changes. // It will present number of messages and bytes as a signed integer and an // optional sequence number of the message if a single. @@ -1117,6 +1137,11 @@ func (fs *fileStore) storeRawMsg(subj string, hdr, msg []byte, seq uint64, ts in if fs.cfg.MaxBytes > 0 && fs.state.Bytes+uint64(len(msg)+len(hdr)) >= uint64(fs.cfg.MaxBytes) { return ErrMaxBytes } + if fs.cfg.MaxMsgsPer > 0 && len(subj) > 0 { + if msgs, _, _ := fs.perSubjectState(subj); msgs >= uint64(fs.cfg.MaxMsgsPer) { + return ErrMaxMsgsPerSubject + } + } } // Check sequence. @@ -1145,6 +1170,11 @@ func (fs *fileStore) storeRawMsg(subj string, hdr, msg []byte, seq uint64, ts in fs.state.LastSeq = seq fs.state.LastTime = now + // Enforce per message limits. + if fs.cfg.MaxMsgsPer > 0 && len(subj) > 0 { + fs.enforcePerSubjectLimit(subj) + } + // Limits checks and enforcement. // If they do any deletions they will update the // byte count on their own, so no need to compensate. @@ -1262,6 +1292,23 @@ func (fs *fileStore) rebuildFirst() { } } +// Will check the msg limit for this tracked subject. +// Lock should be held. +func (fs *fileStore) enforcePerSubjectLimit(subj string) { + if fs.closed || fs.sips > 0 || fs.cfg.MaxMsgsPer < 0 || !fs.tms { + return + } + for { + msgs, first, _ := fs.perSubjectState(subj) + if msgs <= uint64(fs.cfg.MaxMsgsPer) { + return + } + if ok, _ := fs.removeMsg(first, false, false); !ok { + break + } + } +} + // Will check the msg limit and drop firstSeq msg if needed. // Lock should be held. func (fs *fileStore) enforceMsgLimit() { @@ -1294,29 +1341,40 @@ func (fs *fileStore) enforceBytesLimit() { func (fs *fileStore) deleteFirstMsg() (bool, error) { fs.mu.Unlock() defer fs.mu.Lock() - return fs.removeMsg(fs.state.FirstSeq, false) + return fs.removeMsg(fs.state.FirstSeq, false, true) } // RemoveMsg will remove the message from this store. // Will return the number of bytes removed. func (fs *fileStore) RemoveMsg(seq uint64) (bool, error) { - return fs.removeMsg(seq, false) + return fs.removeMsg(seq, false, true) } func (fs *fileStore) EraseMsg(seq uint64) (bool, error) { - return fs.removeMsg(seq, true) + return fs.removeMsg(seq, true, true) } // Remove a message, optionally rewriting the mb file. -func (fs *fileStore) removeMsg(seq uint64, secure bool) (bool, error) { - fs.mu.Lock() +func (fs *fileStore) removeMsg(seq uint64, secure, needFSLock bool) (bool, error) { + fsLock := func() { + if needFSLock { + fs.mu.Lock() + } + } + fsUnlock := func() { + if needFSLock { + fs.mu.Unlock() + } + } + + fsLock() if fs.closed { - fs.mu.Unlock() + fsUnlock() return false, ErrStoreClosed } if fs.sips > 0 { - fs.mu.Unlock() + fsUnlock() return false, ErrStoreSnapshotInProgress } @@ -1326,7 +1384,7 @@ func (fs *fileStore) removeMsg(seq uint64, secure bool) (bool, error) { if seq <= fs.state.LastSeq { err = ErrStoreMsgNotFound } - fs.mu.Unlock() + fsUnlock() return false, err } @@ -1345,18 +1403,18 @@ func (fs *fileStore) removeMsg(seq uint64, secure bool) (bool, error) { // Check cache. This should be very rare. if mb.cache == nil || mb.cache.idx == nil || seq < mb.cache.fseq && mb.cache.off > 0 { mb.mu.Unlock() - fs.mu.Unlock() + fsUnlock() if err := mb.loadMsgs(); err != nil { return false, err } - fs.mu.Lock() + fsLock() mb.mu.Lock() } // See if the sequence numbers is still relevant. Check first and cache first. if seq < mb.first.seq || seq < mb.cache.fseq || (seq-mb.cache.fseq) >= uint64(len(mb.cache.idx)) { mb.mu.Unlock() - fs.mu.Unlock() + fsUnlock() return false, nil } @@ -1364,7 +1422,7 @@ func (fs *fileStore) removeMsg(seq uint64, secure bool) (bool, error) { if mb.dmap != nil { if _, ok := mb.dmap[seq]; ok { mb.mu.Unlock() - fs.mu.Unlock() + fsUnlock() return false, nil } } @@ -1428,8 +1486,7 @@ func (fs *fileStore) removeMsg(seq uint64, secure bool) (bool, error) { var qch, fch chan struct{} if shouldWriteIndex { - qch = mb.qch - fch = mb.fch + qch, fch = mb.qch, mb.fch } cb := fs.scb mb.mu.Unlock() @@ -1470,6 +1527,10 @@ func (fs *fileStore) removeMsg(seq uint64, secure bool) (bool, error) { cb(-1, -delta, seq, subj) } + if !needFSLock { + fs.mu.Lock() + } + return true, nil } @@ -1937,7 +1998,7 @@ func (fs *fileStore) expireMsgs() { var sm *fileStoredMsg minAge := time.Now().UnixNano() - int64(fs.cfg.MaxAge) for sm, _ = fs.msgForSeq(0); sm != nil && sm.ts <= minAge; sm, _ = fs.msgForSeq(0) { - fs.removeMsg(sm.seq, false) + fs.removeMsg(sm.seq, false, true) } fs.mu.Lock() diff --git a/server/jetstream_test.go b/server/jetstream_test.go index 80acbd4dc2..a0bd7b85d2 100644 --- a/server/jetstream_test.go +++ b/server/jetstream_test.go @@ -10787,6 +10787,82 @@ func TestJetStreamConfigReloadWithGlobalAccount(t *testing.T) { checkJSAccount() } +// Test that we properly enfore per subject msg limits. +func TestJetStreamMaxMsgsPerSubject(t *testing.T) { + const maxPer = 5 + msc := StreamConfig{ + Name: "TEST", + Subjects: []string{"foo", "bar", "baz.*"}, + Storage: MemoryStorage, + MaxMsgsPer: maxPer, + } + fsc := msc + fsc.Storage = FileStorage + + cases := []struct { + name string + mconfig *StreamConfig + }{ + {"MemoryStore", &msc}, + {"FileStore", &fsc}, + } + + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + s := RunBasicJetStreamServer() + defer s.Shutdown() + + if config := s.JetStreamConfig(); config != nil { + defer removeDir(t, config.StoreDir) + } + + mset, err := s.GlobalAccount().addStream(c.mconfig) + if err != nil { + t.Fatalf("Unexpected error adding stream: %v", err) + } + defer mset.delete() + + // Client for API requests. + nc, js := jsClientConnect(t, s) + defer nc.Close() + + pubAndCheck := func(subj string, num int, expectedNumMsgs uint64) { + t.Helper() + for i := 0; i < num; i++ { + if _, err = js.Publish(subj, []byte("TSLA")); err != nil { + t.Fatalf("Unexpected publish error: %v", err) + } + } + si, err := js.StreamInfo("TEST") + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if si.State.Msgs != expectedNumMsgs { + t.Fatalf("Expected %d msgs, got %d", expectedNumMsgs, si.State.Msgs) + } + } + + pubAndCheck("foo", 1, 1) + pubAndCheck("foo", 4, 5) + // Now make sure our per subject limits kick in.. + pubAndCheck("foo", 2, 5) + pubAndCheck("baz.22", 5, 10) + pubAndCheck("baz.33", 5, 15) + // We are maxed so totals should be same no matter what we add here. + pubAndCheck("baz.22", 5, 15) + pubAndCheck("baz.33", 5, 15) + + // Now purge and make sure all is still good. + mset.purge() + pubAndCheck("foo", 1, 1) + pubAndCheck("foo", 4, 5) + pubAndCheck("baz.22", 5, 10) + pubAndCheck("baz.33", 5, 15) + }) + } + +} + func TestJetStreamFilteredConsumersWithWiderFilter(t *testing.T) { s := RunBasicJetStreamServer() defer s.Shutdown() diff --git a/server/memstore.go b/server/memstore.go index e79e5be20a..4d0063e859 100644 --- a/server/memstore.go +++ b/server/memstore.go @@ -28,6 +28,8 @@ type memStore struct { state StreamState msgs map[uint64]*storedMsg dmap map[uint64]struct{} + fss map[string]*SimpleState + maxp int64 scb StorageUpdateHandler ageChk *time.Timer consumers int @@ -48,7 +50,15 @@ func newMemStore(cfg *StreamConfig) (*memStore, error) { if cfg.Storage != MemoryStorage { return nil, fmt.Errorf("memStore requires memory storage type in config") } - return &memStore{msgs: make(map[uint64]*storedMsg), dmap: make(map[uint64]struct{}), cfg: *cfg}, nil + ms := &memStore{ + msgs: make(map[uint64]*storedMsg), + fss: make(map[string]*SimpleState), + dmap: make(map[uint64]struct{}), + maxp: cfg.MaxMsgsPer, + cfg: *cfg, + } + + return ms, nil } func (ms *memStore) UpdateConfig(cfg *StreamConfig) error { @@ -95,6 +105,11 @@ func (ms *memStore) storeRawMsg(subj string, hdr, msg []byte, seq uint64, ts int if ms.cfg.MaxBytes > 0 && ms.state.Bytes+uint64(len(msg)+len(hdr)) >= uint64(ms.cfg.MaxBytes) { return ErrMaxBytes } + if ms.maxp > 0 && len(subj) > 0 { + if ss := ms.fss[subj]; ss != nil && ss.Msgs >= uint64(ms.maxp) { + return ErrMaxMsgsPerSubject + } + } } if seq != ms.state.LastSeq+1 { @@ -126,6 +141,20 @@ func (ms *memStore) storeRawMsg(subj string, hdr, msg []byte, seq uint64, ts int ms.state.LastSeq = seq ms.state.LastTime = now + // Track per subject. + if len(subj) > 0 { + if ss := ms.fss[subj]; ss != nil { + ss.Msgs++ + ss.Last = seq + // Check per subject limits. + if ms.maxp > 0 && ss.Msgs > uint64(ms.maxp) { + ms.enforcePerSubjectLimit(ss) + } + } else { + ms.fss[subj] = &SimpleState{Msgs: 1, First: seq, Last: seq} + } + } + // Limits checks and enforcement. ms.enforceMsgLimit() ms.enforceBytesLimit() @@ -233,6 +262,17 @@ func (ms *memStore) FilteredState(sseq uint64, subj string) SimpleState { sseq = ms.state.FirstSeq } + // If past the end no results. + if sseq > ms.state.LastSeq { + return ss + } + + // If we want everything. + if subj == _EMPTY_ || subj == ">" { + ss.Msgs, ss.First, ss.Last = ms.state.Msgs, ms.state.FirstSeq, ms.state.LastSeq + return ss + } + // FIXME(dlc) - Optimize like filestore. var eq func(string, string) bool if subj == _EMPTY_ { @@ -254,6 +294,19 @@ func (ms *memStore) FilteredState(sseq uint64, subj string) SimpleState { return ss } +// Will check the msg limit for this tracked subject. +// Lock should be held. +func (ms *memStore) enforcePerSubjectLimit(ss *SimpleState) { + if ms.maxp <= 0 { + return + } + for nmsgs := ss.Msgs; nmsgs > uint64(ms.maxp); nmsgs = ss.Msgs { + if !ms.removeMsg(ss.First, false) { + break + } + } +} + // Will check the msg limit and drop firstSeq msg if needed. // Lock should be held. func (ms *memStore) enforceMsgLimit() { @@ -325,6 +378,7 @@ func (ms *memStore) Purge() (uint64, error) { ms.state.Bytes = 0 ms.state.Msgs = 0 ms.msgs = make(map[uint64]*storedMsg) + ms.fss = make(map[string]*SimpleState) ms.dmap = make(map[uint64]struct{}) ms.mu.Unlock() @@ -495,6 +549,30 @@ func (ms *memStore) updateFirstSeq(seq uint64) { } } +// Remove a seq from the fss and select new first. +// Lock should be held. +func (ms *memStore) removeSeqPerSubject(subj string, seq uint64) { + ss := ms.fss[subj] + if ss == nil { + return + } + if ss.Msgs == 1 { + delete(ms.fss, subj) + return + } + ss.Msgs-- + if seq != ss.First { + return + } + // TODO(dlc) - Might want to optimize this. + for tseq := seq + 1; tseq < ss.Last; tseq++ { + if sm := ms.msgs[tseq]; sm != nil && sm.subj == subj { + ss.First = tseq + return + } + } +} + // Removes the message referenced by seq. // Lock should he held. func (ms *memStore) removeMsg(seq uint64, secure bool) bool { @@ -523,6 +601,9 @@ func (ms *memStore) removeMsg(seq uint64, secure bool) bool { sm.seq, sm.ts = 0, 0 } + // Remove any per subject tracking. + ms.removeSeqPerSubject(sm.subj, seq) + if ms.scb != nil { // We do not want to hold any locks here. ms.mu.Unlock() diff --git a/server/store.go b/server/store.go index 9dd4a8c6cd..38b9a61555 100644 --- a/server/store.go +++ b/server/store.go @@ -45,6 +45,8 @@ var ( ErrMaxMsgs = errors.New("maximum messages exceeded") // ErrMaxBytes is returned when we have discard new as a policy and we reached the bytes limit. ErrMaxBytes = errors.New("maximum bytes exceeded") + // ErrMaxMsgsPerSubject is returned when we have discard new as a policy and we reached the message limit per subject. + ErrMaxMsgsPerSubject = errors.New("maximum messages per subject exceeded") // ErrStoreSnapshotInProgress is returned when RemoveMsg or EraseMsg is called // while a snapshot is in progress. ErrStoreSnapshotInProgress = errors.New("snapshot in progress") diff --git a/server/stream.go b/server/stream.go index 61b8635d37..934b09ee66 100644 --- a/server/stream.go +++ b/server/stream.go @@ -44,9 +44,10 @@ type StreamConfig struct { MaxConsumers int `json:"max_consumers"` MaxMsgs int64 `json:"max_msgs"` MaxBytes int64 `json:"max_bytes"` - Discard DiscardPolicy `json:"discard"` MaxAge time.Duration `json:"max_age"` + MaxMsgsPer int64 `json:"max_msgs_per_subject"` MaxMsgSize int32 `json:"max_msg_size,omitempty"` + Discard DiscardPolicy `json:"discard"` Storage StorageType `json:"storage"` Replicas int `json:"num_replicas"` NoAck bool `json:"no_ack,omitempty"` From d9a0ff904c5ce52801d5fc29f9e3b5b723b4b1a3 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Tue, 15 Jun 2021 08:53:11 -0700 Subject: [PATCH 3/4] Bump timeout threshold Signed-off-by: Derek Collison --- server/norace_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/norace_test.go b/server/norace_test.go index 5f642ad892..3ae4e1f694 100644 --- a/server/norace_test.go +++ b/server/norace_test.go @@ -2062,7 +2062,7 @@ func TestJetStreamSlowFilteredInititalPendingAndFirstMsg(t *testing.T) { }) // Threshold for taking too long. - const thresh = 20 * time.Millisecond + const thresh = 30 * time.Millisecond var dindex int testConsumerCreate := func(subj string, startSeq, expectedNumPending uint64) { From 6219f0381d213601439b460c188fea6a48c62d09 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Tue, 15 Jun 2021 09:41:11 -0700 Subject: [PATCH 4/4] Test rename for no race versions Signed-off-by: Derek Collison --- server/norace_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/norace_test.go b/server/norace_test.go index 3ae4e1f694..7d785a7736 100644 --- a/server/norace_test.go +++ b/server/norace_test.go @@ -1756,7 +1756,7 @@ func TestNoRaceJetStreamClusterSourcesMuxd(t *testing.T) { } -func TestJetStreamClusterMirrorExpirationAndMissingSequences(t *testing.T) { +func TestNoRaceJetStreamClusterMirrorExpirationAndMissingSequences(t *testing.T) { c := createJetStreamClusterExplicit(t, "MMS", 9) defer c.shutdown() @@ -2001,7 +2001,7 @@ func TestNoRaceJetStreamClusterSuperClusterRIPStress(t *testing.T) { } } -func TestJetStreamSlowFilteredInititalPendingAndFirstMsg(t *testing.T) { +func TestNoRaceJetStreamSlowFilteredInititalPendingAndFirstMsg(t *testing.T) { s := RunBasicJetStreamServer() defer s.Shutdown()