diff --git a/server/consumer.go b/server/consumer.go index d53a30a730..cb6647a308 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -3501,21 +3501,11 @@ func (o *consumer) streamNumPendingLocked() uint64 { func (o *consumer) streamNumPending() uint64 { if o.mset == nil || o.mset.store == nil { o.npc, o.npf = 0, 0 - } else if o.cfg.DeliverPolicy == DeliverLastPerSubject { - o.npc, o.npf = 0, 0 - for _, ss := range o.mset.store.SubjectsState(o.cfg.FilterSubject) { - if o.sseq <= ss.Last { - o.npc++ - if ss.Last > o.npf { - // Set our num pending sequence floor. - o.npf = ss.Last - } - } - } } else { - ss := o.mset.store.FilteredState(o.sseq, o.cfg.FilterSubject) - // Set our num pending and sequence floor. - o.npc, o.npf = int64(ss.Msgs), ss.Last + isLastPerSubject := o.cfg.DeliverPolicy == DeliverLastPerSubject + // Set our num pending and valid sequence floor. + npc, npf := o.mset.store.NumPending(o.sseq, o.cfg.FilterSubject, isLastPerSubject) + o.npc, o.npf = int64(npc), npf } return o.numPending() diff --git a/server/filestore.go b/server/filestore.go index 869eb8afcd..00fa70abee 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -71,6 +71,7 @@ type StoreCipher int const ( ChaCha StoreCipher = iota AES + NoCipher ) func (cipher StoreCipher) String() string { @@ -79,6 +80,8 @@ func (cipher StoreCipher) String() string { return "ChaCha20-Poly1305" case AES: return "AES-GCM" + case NoCipher: + return "None" default: return "Unknown StoreCipher" } @@ -1526,31 +1529,15 @@ func (mb *msgBlock) filteredPending(subj string, wc bool, seq uint64) (total, fi // This will traverse a message block and generate the filtered pending. // Lock should be held. -func (mb *msgBlock) filteredPendingLocked(filter string, wc bool, seq uint64) (total, first, last uint64) { +func (mb *msgBlock) filteredPendingLocked(filter string, wc bool, sseq uint64) (total, first, last uint64) { isAll := filter == _EMPTY_ || filter == fwcs // First check if we can optimize this part. // This means we want all and the starting sequence was before this block. - if isAll && seq <= mb.first.seq { + if isAll && sseq <= mb.first.seq { return mb.msgs, mb.first.seq, mb.last.seq } - // Make sure we have fss loaded. - mb.ensurePerSubjectInfoLoaded() - - subs := []string{filter} - // If we have a wildcard match against all tracked subjects we know about. - if wc || isAll { - subs = subs[:0] - for subj := range mb.fss { - if isAll || subjectIsSubsetMatch(subj, filter) { - subs = append(subs, subj) - } - } - } - // If we load the cache for a linear scan we want to expire that cache upon exit. - var shouldExpire bool - update := func(ss *SimpleState) { total += ss.Msgs if first == 0 || ss.First < first { @@ -1561,98 +1548,70 @@ func (mb *msgBlock) filteredPendingLocked(filter string, wc bool, seq uint64) (t } } - for i, subj := range subs { - // If the starting seq is less then or equal that means we want all and we do not need to load any messages. - ss := mb.fss[subj] - if ss == nil || seq > ss.Last { - continue - } - - // If the seq we are starting at is less then the simple state's first sequence we can just return the total msgs. - if seq <= ss.First { - update(ss) - continue - } + // Make sure we have fss loaded. + mb.ensurePerSubjectInfoLoaded() - // We may need to scan this one block since we have a partial set to consider. - // If we are all inclusive then we can do simple math and avoid the scan. - if allInclusive := ss.Msgs == ss.Last-ss.First+1; allInclusive { - update(ss) - // Make sure to compensate for the diff from the head. - if seq > ss.First { - first, total = seq, total-(seq-ss.First) - } - continue - } + tsa := [32]string{} + fsa := [32]string{} + fts := tokenizeSubjectIntoSlice(fsa[:0], filter) - // We need to scan this block to compute the correct number of pending for this block. - // We want to only do this once so we will adjust subs and test against them all here. + // 1. See if we match any subs from fss. + // 2. If we match and the sseq is past ss.Last then we can use meta only. + // 3. If we match and we need to do a partial, break and clear any totals and do a full scan like num pending. - if mb.cacheNotLoaded() { - mb.loadMsgsWithLock() - shouldExpire = true + isMatch := func(subj string) bool { + if !wc { + return subj == filter } + tts := tokenizeSubjectIntoSlice(tsa[:0], subj) + return isSubsetMatchTokenized(tts, fts) + } - var all, lseq uint64 - // Grab last applicable sequence as a union of all applicable subjects. - for _, subj := range subs[i:] { - if ss := mb.fss[subj]; ss != nil { - all += ss.Msgs - if ss.Last > lseq { - lseq = ss.Last - } + var havePartial bool + for subj, ss := range mb.fss { + if isAll || isMatch(subj) { + if sseq <= ss.First { + update(ss) + } else if sseq <= ss.Last { + // We matched but its a partial. + havePartial = true + break } } - numScanIn, numScanOut := lseq-seq, seq-mb.first.seq + } - var smv StoreMsg + // If we did not encounter any partials we can return here. + if !havePartial { + return total, first, last + } - isMatch := func(seq uint64) bool { - if sm, _ := mb.cacheLookup(seq, &smv); sm != nil { - if len(subs) == 1 && sm.subj == subs[0] { - return true - } - for _, subj := range subs { - if sm.subj == subj { - return true - } - } - } - return false - } + // If we are here we need to scan the msgs. + // Clear what we had. + total, first, last = 0, 0, 0 - // Decide on whether to scan those included or those excluded based on which scan amount is less. - if numScanIn < numScanOut { - for tseq := seq; tseq <= lseq; tseq++ { - if isMatch(tseq) { - total++ - if first == 0 || tseq < first { - first = tseq - } - last = tseq - } - } - } else { - // Here its more efficient to scan the out nodes. - var discard uint64 - for tseq := mb.first.seq; tseq < seq; tseq++ { - if isMatch(tseq) { - discard++ - } + // If we load the cache for a linear scan we want to expire that cache upon exit. + var shouldExpire bool + if mb.cacheNotLoaded() { + mb.loadMsgsWithLock() + shouldExpire = true + } + + var smv StoreMsg + for seq := sseq; seq <= mb.last.seq; seq++ { + sm, _ := mb.cacheLookup(seq, &smv) + if sm == nil { + continue + } + if isAll || isMatch(sm.subj) { + total++ + if first == 0 || seq < first { + first = seq } - total += (all - discard) - // Now make sure we match our first - for tseq := seq; tseq <= lseq; tseq++ { - if isMatch(tseq) { - first = tseq - break - } + if seq > last { + last = seq } } - // We can bail since we scanned all remaining in this pass. - break } - // If we loaded this block for this operation go ahead and expire it here. if shouldExpire { mb.tryForceExpireCacheLocked() @@ -1828,8 +1787,231 @@ func (fs *fileStore) SubjectsState(subject string) map[string]SimpleState { return fss } +// NumPending will return the number of pending messages matching the filter subject starting at sequence. +// Optimized for stream num pending calculations for consumers. +func (fs *fileStore) NumPending(sseq uint64, filter string, lastPerSubject bool) (total, validThrough uint64) { + fs.mu.RLock() + defer fs.mu.RUnlock() + + // This can always be last for these purposes. + validThrough = fs.state.LastSeq + + if fs.state.Msgs == 0 || sseq > fs.state.LastSeq { + return 0, validThrough + } + + // Track starting for both block for the sseq and staring block that matches any subject. + var seqStart, subjStart int + + // See if we need to figure out starting block per sseq. + if sseq > fs.state.FirstSeq { + seqStart, _ = fs.selectMsgBlockWithIndex(sseq) + } + + tsa := [32]string{} + fsa := [32]string{} + fts := tokenizeSubjectIntoSlice(fsa[:0], filter) + isAll := filter == _EMPTY_ || filter == fwcs + wc := subjectHasWildcard(filter) + + isMatch := func(subj string) bool { + if isAll { + return true + } + if !wc { + return subj == filter + } + tts := tokenizeSubjectIntoSlice(tsa[:0], subj) + return isSubsetMatchTokenized(tts, fts) + } + + // If we would need to scan more from the beginning, revert back to calculating directly here. + // TODO(dlc) - Redo properly with sublists etc for subject-based filtering. + if lastPerSubject || seqStart >= (len(fs.blks)/2) { + // If we need to track seen for last per subject. + var seen map[string]bool + if lastPerSubject { + seen = make(map[string]bool) + } + + for i := seqStart; i < len(fs.blks); i++ { + mb := fs.blks[i] + mb.mu.Lock() + var t uint64 + if isAll && sseq <= mb.first.seq { + if lastPerSubject { + for subj := range mb.fss { + if !seen[subj] { + total++ + seen[subj] = true + } + } + } else { + total += mb.msgs + } + mb.mu.Unlock() + continue + } + + // If we are here we need to at least scan the subject fss. + // Make sure we have fss loaded. + mb.ensurePerSubjectInfoLoaded() + var havePartial bool + for subj, ss := range mb.fss { + if !seen[subj] && isMatch(subj) { + if lastPerSubject { + // Can't have a partials with last by subject. + if sseq <= ss.Last { + t++ + seen[subj] = true + } + } else { + if sseq <= ss.First { + t += ss.Msgs + } else if sseq <= ss.Last { + // We matched but its a partial. + havePartial = true + break + } + } + } + } + // See if we need to scan msgs here. + if havePartial { + // Clear on partial. + t = 0 + // If we load the cache for a linear scan we want to expire that cache upon exit. + var shouldExpire bool + if mb.cacheNotLoaded() { + mb.loadMsgsWithLock() + shouldExpire = true + } + var smv StoreMsg + for seq := sseq; seq <= mb.last.seq; seq++ { + if sm, _ := mb.cacheLookup(seq, &smv); sm != nil && (isAll || isMatch(sm.subj)) { + t++ + } + } + // If we loaded this block for this operation go ahead and expire it here. + if shouldExpire { + mb.tryForceExpireCacheLocked() + } + } + mb.mu.Unlock() + total += t + } + return total, validThrough + } + + // If we are here its better to calculate totals from psim and adjust downward by scanning less blocks. + // TODO(dlc) - Eventually when sublist uses generics, make this sublist driven instead. + start := uint32(math.MaxUint32) + for subj, psi := range fs.psim { + if isMatch(subj) { + if lastPerSubject { + total++ + // Keep track of start index for this subject. + // Use last block in this case. + if psi.lblk < start { + start = psi.lblk + } + } else { + total += psi.total + // Keep track of start index for this subject. + if psi.fblk < start { + start = psi.fblk + } + } + } + } + // See if we were asked for all, if so we are done. + if sseq <= fs.state.FirstSeq { + return total, validThrough + } + + // If we are here we need to calculate partials for the first blocks. + subjStart = int(start) + firstSubjBlk := fs.bim[uint32(subjStart)] + var firstSubjBlkFound bool + var smv StoreMsg + + // Adjust in case not found. + if firstSubjBlk == nil { + firstSubjBlkFound = true + } + + // Track how many we need to adjust against the total. + var adjust uint64 + + for i := 0; i <= seqStart; i++ { + mb := fs.blks[i] + + // We can skip blks if we know they are below the first one that has any subject matches. + if !firstSubjBlkFound { + if mb == firstSubjBlk { + firstSubjBlkFound = true + } else { + continue + } + } + + // We need to scan this block. + var shouldExpire bool + mb.mu.Lock() + // Check if we should include all of this block in adjusting. If so work with metadata. + if sseq > mb.last.seq { + // We need to adjust for all matches in this block. + // We will scan fss state vs messages themselves. + // Make sure we have fss loaded. + mb.ensurePerSubjectInfoLoaded() + for subj, ss := range mb.fss { + if isMatch(subj) { + if lastPerSubject { + adjust++ + } else { + adjust += ss.Msgs + } + } + } + } else { + // This is the last block. We need to scan per message here. + if mb.cacheNotLoaded() { + if err := mb.loadMsgsWithLock(); err != nil { + mb.mu.Unlock() + return 0, 0 + } + shouldExpire = true + } + + var last = mb.last.seq + if sseq < last { + last = sseq + } + for seq := mb.first.seq; seq < last; seq++ { + sm, _ := mb.cacheLookup(seq, &smv) + if sm == nil { + continue + } + // Check if it matches our filter. + if isMatch(sm.subj) && sm.seq < sseq { + adjust++ + } + } + } + // If we loaded the block try to force expire. + if shouldExpire { + mb.tryForceExpireCacheLocked() + } + mb.mu.Unlock() + } + // Make final adjustment. + total -= adjust + + return total, validThrough +} + // SubjectsTotal return message totals per subject. -func (fs *fileStore) SubjectsTotals(filterSubject string) map[string]uint64 { +func (fs *fileStore) SubjectsTotals(filter string) map[string]uint64 { fs.mu.RLock() defer fs.mu.RUnlock() @@ -1839,17 +2021,22 @@ func (fs *fileStore) SubjectsTotals(filterSubject string) map[string]uint64 { tsa := [32]string{} fsa := [32]string{} - fts := tokenizeSubjectIntoSlice(fsa[:0], filterSubject) - isAll := filterSubject == _EMPTY_ || filterSubject == fwcs + fts := tokenizeSubjectIntoSlice(fsa[:0], filter) + isAll := filter == _EMPTY_ || filter == fwcs + wc := subjectHasWildcard(filter) + + isMatch := func(subj string) bool { + if !wc { + return subj == filter + } + tts := tokenizeSubjectIntoSlice(tsa[:0], subj) + return isSubsetMatchTokenized(tts, fts) + } fst := make(map[string]uint64) for subj, psi := range fs.psim { - if isAll { + if isAll || isMatch(subj) { fst[subj] = psi.total - } else { - if tts := tokenizeSubjectIntoSlice(tsa[:0], subj); isSubsetMatchTokenized(tts, fts) { - fst[subj] = psi.total - } } } return fst @@ -3618,14 +3805,20 @@ func (fs *fileStore) syncBlocks() { // Return nil if not in the set. // Read lock should be held. func (fs *fileStore) selectMsgBlock(seq uint64) *msgBlock { + _, mb := fs.selectMsgBlockWithIndex(seq) + return mb +} + +func (fs *fileStore) selectMsgBlockWithIndex(seq uint64) (int, *msgBlock) { // Check for out of range. if seq < fs.state.FirstSeq || seq > fs.state.LastSeq { - return nil + return -1, nil } // Starting index, defaults to beginning. si := 0 + // TODO(dlc) - Use new AVL and make this real for anything beyond certain size. // Max threshold before we probe for a starting block to start our linear search. const maxl = 256 if nb := len(fs.blks); nb > maxl { @@ -3643,11 +3836,11 @@ func (fs *fileStore) selectMsgBlock(seq uint64) *msgBlock { for i := si; i < len(fs.blks); i++ { mb := fs.blks[i] if seq <= atomic.LoadUint64(&mb.last.seq) { - return mb + return i, mb } } - return nil + return -1, nil } // Select the message block where this message should be found. diff --git a/server/filestore_test.go b/server/filestore_test.go index 718d90a864..92ccfd2f3c 100644 --- a/server/filestore_test.go +++ b/server/filestore_test.go @@ -39,6 +39,7 @@ import ( func testFileStoreAllPermutations(t *testing.T, fn func(t *testing.T, fcfg FileStoreConfig)) { for _, fcfg := range []FileStoreConfig{ + {Cipher: NoCipher}, {Cipher: ChaCha}, {Cipher: AES}, } { @@ -800,7 +801,9 @@ func TestFileStoreCompact(t *testing.T) { } return h.Sum(nil), nil } - + if fcfg.Cipher == NoCipher { + prf = nil + } fs, err := newFileStoreWithCreated( fcfg, StreamConfig{Name: "zzz", Storage: FileStorage}, @@ -989,6 +992,9 @@ func TestFileStoreStreamTruncate(t *testing.T) { } return h.Sum(nil), nil } + if fcfg.Cipher == NoCipher { + prf = nil + } fs, err := newFileStoreWithCreated( fcfg, @@ -3460,6 +3466,9 @@ func TestFileStoreSparseCompaction(t *testing.T) { } return h.Sum(nil), nil } + if fcfg.Cipher == NoCipher { + prf = nil + } fs, err = newFileStoreWithCreated(fcfg, cfg, time.Now(), prf) if err != nil { @@ -3772,6 +3781,9 @@ func TestFileStoreCompactReclaimHeadSpace(t *testing.T) { } return h.Sum(nil), nil } + if fcfg.Cipher == NoCipher { + prf = nil + } fs, err = newFileStoreWithCreated( fcfg, @@ -4024,6 +4036,9 @@ func TestFileStoreShortIndexWriteBug(t *testing.T) { } return h.Sum(nil), nil } + if fcfg.Cipher == NoCipher { + prf = nil + } created := time.Now() @@ -4087,6 +4102,9 @@ func TestFileStoreDoubleCompactWithWriteInBetweenEncryptedBug(t *testing.T) { } return h.Sum(nil), nil } + if fcfg.Cipher == NoCipher { + prf = nil + } fs, err := newFileStoreWithCreated( fcfg, @@ -5276,81 +5294,204 @@ func TestFileStoreOnlyWritePerSubjectInfoOnExpireWithUpdate(t *testing.T) { } func TestFileStoreSubjectsTotals(t *testing.T) { - testFileStoreAllPermutations(t, func(t *testing.T, fcfg FileStoreConfig) { - fs, err := newFileStore( - fcfg, - StreamConfig{Name: "zzz", Subjects: []string{"*.*"}, Storage: FileStorage}, - ) - require_NoError(t, err) - defer fs.Stop() + // No need for all permutations here. + storeDir := t.TempDir() + fcfg := FileStoreConfig{ + StoreDir: storeDir, + } + fs, err := newFileStore(fcfg, StreamConfig{Name: "zzz", Subjects: []string{"*.*"}, Storage: FileStorage}) + require_NoError(t, err) + defer fs.Stop() - fmap := make(map[int]int) - bmap := make(map[int]int) + fmap := make(map[int]int) + bmap := make(map[int]int) - var m map[int]int - var ft string + var m map[int]int + var ft string - for i := 0; i < 10_000; i++ { - // Flip coin for prefix - if rand.Intn(2) == 0 { - ft, m = "foo", fmap - } else { - ft, m = "bar", bmap - } - dt := rand.Intn(100) - subj := fmt.Sprintf("%s.%d", ft, dt) - m[dt]++ + for i := 0; i < 10_000; i++ { + // Flip coin for prefix + if rand.Intn(2) == 0 { + ft, m = "foo", fmap + } else { + ft, m = "bar", bmap + } + dt := rand.Intn(100) + subj := fmt.Sprintf("%s.%d", ft, dt) + m[dt]++ - _, _, err := fs.StoreMsg(subj, nil, []byte("Hello World")) - require_NoError(t, err) + _, _, err := fs.StoreMsg(subj, nil, []byte("Hello World")) + require_NoError(t, err) + } + + // Now test SubjectsTotal + for dt, total := range fmap { + subj := fmt.Sprintf("foo.%d", dt) + m := fs.SubjectsTotals(subj) + if m[subj] != uint64(total) { + t.Fatalf("Expected %q to have %d total, got %d", subj, total, m[subj]) } + } - // Now test SubjectsTotal - for dt, total := range fmap { - subj := fmt.Sprintf("foo.%d", dt) - m := fs.SubjectsTotals(subj) - if m[subj] != uint64(total) { - t.Fatalf("Expected %q to have %d total, got %d", subj, total, m[subj]) - } + // Check fmap. + if st := fs.SubjectsTotals("foo.*"); len(st) != len(fmap) { + t.Fatalf("Expected %d subjects for %q, got %d", len(fmap), "foo.*", len(st)) + } else { + expected := 0 + for _, n := range fmap { + expected += n + } + received := uint64(0) + for _, n := range st { + received += n } + if received != uint64(expected) { + t.Fatalf("Expected %d total but got %d", expected, received) + } + } - // Check fmap. - if st := fs.SubjectsTotals("foo.*"); len(st) != len(fmap) { - t.Fatalf("Expected %d subjects for %q, got %d", len(fmap), "foo.*", len(st)) - } else { - expected := 0 - for _, n := range fmap { - expected += n - } - received := uint64(0) - for _, n := range st { - received += n - } - if received != uint64(expected) { - t.Fatalf("Expected %d total but got %d", expected, received) - } + // Check bmap. + if st := fs.SubjectsTotals("bar.*"); len(st) != len(bmap) { + t.Fatalf("Expected %d subjects for %q, got %d", len(bmap), "bar.*", len(st)) + } else { + expected := 0 + for _, n := range bmap { + expected += n + } + received := uint64(0) + for _, n := range st { + received += n } + if received != uint64(expected) { + t.Fatalf("Expected %d total but got %d", expected, received) + } + } - // Check bmap. - if st := fs.SubjectsTotals("bar.*"); len(st) != len(bmap) { - t.Fatalf("Expected %d subjects for %q, got %d", len(bmap), "bar.*", len(st)) - } else { - expected := 0 - for _, n := range bmap { - expected += n + // All with pwc match. + if st, expected := fs.SubjectsTotals("*.*"), len(bmap)+len(fmap); len(st) != expected { + t.Fatalf("Expected %d subjects for %q, got %d", expected, "*.*", len(st)) + } +} + +func TestFileStoreNumPending(t *testing.T) { + // No need for all permutations here. + storeDir := t.TempDir() + fcfg := FileStoreConfig{ + StoreDir: storeDir, + BlockSize: 2 * 1024, // Create many blocks on purpose. + } + fs, err := newFileStore(fcfg, StreamConfig{Name: "zzz", Subjects: []string{"*.*.*.*"}, Storage: FileStorage}) + require_NoError(t, err) + defer fs.Stop() + + tokens := []string{"foo", "bar", "baz"} + genSubj := func() string { + return fmt.Sprintf("%s.%s.%s.%s", + tokens[rand.Intn(len(tokens))], + tokens[rand.Intn(len(tokens))], + tokens[rand.Intn(len(tokens))], + tokens[rand.Intn(len(tokens))], + ) + } + + for i := 0; i < 50_000; i++ { + subj := genSubj() + _, _, err := fs.StoreMsg(subj, nil, []byte("Hello World")) + require_NoError(t, err) + } + + state := fs.State() + + // Scan one by one for sanity check against other calculations. + sanityCheck := func(sseq uint64, filter string) SimpleState { + t.Helper() + var ss SimpleState + var smv StoreMsg + // For here we know 0 is invalid, set to 1. + if sseq == 0 { + sseq = 1 + } + for seq := sseq; seq <= state.LastSeq; seq++ { + sm, err := fs.LoadMsg(seq, &smv) + if err != nil { + t.Logf("Encountered error %v loading sequence: %d", err, seq) + continue + } + if subjectIsSubsetMatch(sm.subj, filter) { + ss.Msgs++ + ss.Last = seq + if ss.First == 0 || seq < ss.First { + ss.First = seq + } } - received := uint64(0) - for _, n := range st { - received += n + } + return ss + } + + check := func(sseq uint64, filter string) { + t.Helper() + np, lvs := fs.NumPending(sseq, filter, false) + ss := fs.FilteredState(sseq, filter) + sss := sanityCheck(sseq, filter) + if lvs != state.LastSeq { + t.Fatalf("Expected NumPending to return valid through last of %d but got %d", state.LastSeq, lvs) + } + if ss.Msgs != np { + t.Fatalf("NumPending of %d did not match ss.Msgs of %d", np, ss.Msgs) + } + if ss != sss { + t.Fatalf("Failed sanity check, expected %+v got %+v", sss, ss) + } + } + + sanityCheckLastOnly := func(sseq uint64, filter string) SimpleState { + t.Helper() + var ss SimpleState + var smv StoreMsg + // For here we know 0 is invalid, set to 1. + if sseq == 0 { + sseq = 1 + } + seen := make(map[string]bool) + for seq := state.LastSeq; seq >= sseq; seq-- { + sm, err := fs.LoadMsg(seq, &smv) + if err != nil { + t.Logf("Encountered error %v loading sequence: %d", err, seq) + continue } - if received != uint64(expected) { - t.Fatalf("Expected %d total but got %d", expected, received) + if !seen[sm.subj] && subjectIsSubsetMatch(sm.subj, filter) { + ss.Msgs++ + if ss.Last == 0 { + ss.Last = seq + } + if ss.First == 0 || seq < ss.First { + ss.First = seq + } + seen[sm.subj] = true } } + return ss + } - // All with pwc match. - if st, expected := fs.SubjectsTotals("*.*"), len(bmap)+len(fmap); len(st) != expected { - t.Fatalf("Expected %d subjects for %q, got %d", expected, "*.*", len(st)) + checkLastOnly := func(sseq uint64, filter string) { + t.Helper() + np, lvs := fs.NumPending(sseq, filter, true) + ss := sanityCheckLastOnly(sseq, filter) + if lvs != state.LastSeq { + t.Fatalf("Expected NumPending to return valid through last of %d but got %d", state.LastSeq, lvs) } - }) + if ss.Msgs != np { + t.Fatalf("NumPending of %d did not match ss.Msgs of %d", np, ss.Msgs) + } + } + + startSeqs := []uint64{0, 1, 2, 200, 444, 555, 2222, 8888, 12_345, 28_222, 33_456, 44_400, 49_999} + checkSubs := []string{"foo.>", "*.bar.>", "foo.bar.*.baz", "*.bar.>", "*.foo.bar.*", "foo.foo.bar.baz"} + + for _, filter := range checkSubs { + for _, start := range startSeqs { + check(start, filter) + checkLastOnly(start, filter) + } + } } diff --git a/server/memstore.go b/server/memstore.go index 7adb59e75c..c36acd9374 100644 --- a/server/memstore.go +++ b/server/memstore.go @@ -289,10 +289,10 @@ func (ms *memStore) GetSeqFromTime(t time.Time) uint64 { func (ms *memStore) FilteredState(sseq uint64, subj string) SimpleState { ms.mu.RLock() defer ms.mu.RUnlock() - return ms.filteredStateLocked(sseq, subj) + return ms.filteredStateLocked(sseq, subj, false) } -func (ms *memStore) filteredStateLocked(sseq uint64, subj string) SimpleState { +func (ms *memStore) filteredStateLocked(sseq uint64, filter string, lastPerSubject bool) SimpleState { var ss SimpleState if sseq < ms.state.FirstSeq { @@ -304,49 +304,139 @@ func (ms *memStore) filteredStateLocked(sseq uint64, subj string) SimpleState { return ss } - // Empty same as everything. - if subj == _EMPTY_ { - subj = fwcs + isAll := filter == _EMPTY_ || filter == fwcs + + // First check if we can optimize this part. + // This means we want all and the starting sequence was before this block. + if isAll && sseq <= ms.state.FirstSeq { + total := ms.state.Msgs + if lastPerSubject { + total = uint64(len(ms.fss)) + } + return SimpleState{ + Msgs: total, + First: ms.state.FirstSeq, + Last: ms.state.LastSeq, + } } - wc := subjectHasWildcard(subj) - subs := []string{subj} - if wc { - subs = subs[:0] - for fsubj := range ms.fss { - if subjectIsSubsetMatch(fsubj, subj) { - subs = append(subs, fsubj) - } + tsa := [32]string{} + fsa := [32]string{} + fts := tokenizeSubjectIntoSlice(fsa[:0], filter) + wc := subjectHasWildcard(filter) + + // 1. See if we match any subs from fss. + // 2. If we match and the sseq is past ss.Last then we can use meta only. + // 3. If we match and we need to do a partial, break and clear any totals and do a full scan like num pending. + + isMatch := func(subj string) bool { + if isAll { + return true + } + if !wc { + return subj == filter } + tts := tokenizeSubjectIntoSlice(tsa[:0], subj) + return isSubsetMatchTokenized(tts, fts) } - fseq, lseq := ms.state.LastSeq, uint64(0) - for _, subj := range subs { - ss := ms.fss[subj] - if ss == nil { - continue + + update := func(fss *SimpleState) { + msgs, first, last := fss.Msgs, fss.First, fss.Last + if lastPerSubject { + msgs, first = 1, last } - if ss.First < fseq { - fseq = ss.First + ss.Msgs += msgs + if ss.First == 0 || first < ss.First { + ss.First = first } - if ss.Last > lseq { - lseq = ss.Last + if last > ss.Last { + ss.Last = last + } + } + + var havePartial bool + // We will track start and end sequences as we go. + for subj, fss := range ms.fss { + if isMatch(subj) { + if sseq <= fss.First { + update(fss) + } else if sseq <= fss.Last { + // We matched but its a partial. + havePartial = true + // Don't break here, we will update to keep tracking last. + update(fss) + } } } - if fseq < sseq { - fseq = sseq + + // If we did not encounter any partials we can return here. + if !havePartial { + return ss } - // FIXME(dlc) - Optimize better like filestore. - eq := compareFn(subj) - for seq := fseq; seq <= lseq; seq++ { - if sm, ok := ms.msgs[seq]; ok && eq(sm.subj, subj) { - ss.Msgs++ - if ss.First == 0 { - ss.First = seq + // If we are here we need to scan the msgs. + // Capture first and last sequences for scan and then clear what we had. + first, last := ss.First, ss.Last + // To track if we decide to exclude and we need to calculate first. + var needScanFirst bool + if first < sseq { + first = sseq + needScanFirst = true + } + + // Now we want to check if it is better to scan inclusive and recalculate that way + // or leave and scan exclusive and adjust our totals. + // ss.Last is always correct here. + toScan, toExclude := last-first, first-ms.state.FirstSeq+ms.state.LastSeq-ss.Last + var seen map[string]bool + if lastPerSubject { + seen = make(map[string]bool) + } + if toScan < toExclude { + ss.Msgs, ss.First = 0, 0 + for seq := first; seq <= last; seq++ { + if sm, ok := ms.msgs[seq]; ok && !seen[sm.subj] && isMatch(sm.subj) { + ss.Msgs++ + if ss.First == 0 { + ss.First = seq + } + if seen != nil { + seen[sm.subj] = true + } + } + } + } else { + // We will adjust from the totals above by scanning what we need to exclude. + ss.First = first + var adjust uint64 + for seq := ms.state.FirstSeq; seq < first; seq++ { + if sm, ok := ms.msgs[seq]; ok && !seen[sm.subj] && isMatch(sm.subj) { + adjust++ + if seen != nil { + seen[sm.subj] = true + } + } + } + // Now do range at end. + for seq := last + 1; seq < ms.state.LastSeq; seq++ { + if sm, ok := ms.msgs[seq]; ok && !seen[sm.subj] && isMatch(sm.subj) { + adjust++ + if seen != nil { + seen[sm.subj] = true + } + } + } + ss.Msgs -= adjust + if needScanFirst { + for seq := first; seq < last; seq++ { + if sm, ok := ms.msgs[seq]; ok && isMatch(sm.subj) { + ss.First = seq + break + } } - ss.Last = seq } } + return ss } @@ -402,6 +492,15 @@ func (ms *memStore) SubjectsTotals(filterSubject string) map[string]uint64 { return fst } +// NumPending will return the number of pending messages matching the filter subject starting at sequence. +func (ms *memStore) NumPending(sseq uint64, filter string, lastPerSubject bool) (total, validThrough uint64) { + ms.mu.RLock() + defer ms.mu.RUnlock() + + ss := ms.filteredStateLocked(sseq, filter, lastPerSubject) + return ss.Msgs, ms.state.LastSeq +} + // Will check the msg limit for this tracked subject. // Lock should be held. func (ms *memStore) enforcePerSubjectLimit(ss *SimpleState) { @@ -745,7 +844,7 @@ func (ms *memStore) LoadLastMsg(subject string, smp *StoreMsg) (*StoreMsg, error if subject == _EMPTY_ || subject == fwcs { sm, ok = ms.msgs[ms.state.LastSeq] - } else if ss := ms.filteredStateLocked(1, subject); ss.Msgs > 0 { + } else if ss := ms.filteredStateLocked(1, subject, true); ss.Msgs > 0 { sm, ok = ms.msgs[ss.Last] } if !ok || sm == nil { diff --git a/server/memstore_test.go b/server/memstore_test.go index db09633e81..78adbe433f 100644 --- a/server/memstore_test.go +++ b/server/memstore_test.go @@ -599,5 +599,125 @@ func TestMemStoreSubjectsTotals(t *testing.T) { if st, expected := ms.SubjectsTotals("*.*"), len(bmap)+len(fmap); len(st) != expected { t.Fatalf("Expected %d subjects for %q, got %d", expected, "*.*", len(st)) } +} + +func TestMemStoreNumPending(t *testing.T) { + cfg := &StreamConfig{ + Name: "TEST", + Storage: MemoryStorage, + Subjects: []string{"*.*.*.*"}, + } + ms, err := newMemStore(cfg) + require_NoError(t, err) + + tokens := []string{"foo", "bar", "baz"} + genSubj := func() string { + return fmt.Sprintf("%s.%s.%s.%s", + tokens[rand.Intn(len(tokens))], + tokens[rand.Intn(len(tokens))], + tokens[rand.Intn(len(tokens))], + tokens[rand.Intn(len(tokens))], + ) + } + + for i := 0; i < 50_000; i++ { + subj := genSubj() + _, _, err := ms.StoreMsg(subj, nil, []byte("Hello World")) + require_NoError(t, err) + } + + state := ms.State() + + // Scan one by one for sanity check against other calculations. + sanityCheck := func(sseq uint64, filter string) SimpleState { + t.Helper() + var ss SimpleState + var smv StoreMsg + // For here we know 0 is invalid, set to 1. + if sseq == 0 { + sseq = 1 + } + for seq := sseq; seq <= state.LastSeq; seq++ { + sm, err := ms.LoadMsg(seq, &smv) + if err != nil { + t.Logf("Encountered error %v loading sequence: %d", err, seq) + continue + } + if subjectIsSubsetMatch(sm.subj, filter) { + ss.Msgs++ + ss.Last = seq + if ss.First == 0 || seq < ss.First { + ss.First = seq + } + } + } + return ss + } + + check := func(sseq uint64, filter string) { + t.Helper() + np, lvs := ms.NumPending(sseq, filter, false) + ss := ms.FilteredState(sseq, filter) + sss := sanityCheck(sseq, filter) + if lvs != state.LastSeq { + t.Fatalf("Expected NumPending to return valid through last of %d but got %d", state.LastSeq, lvs) + } + if ss.Msgs != np { + t.Fatalf("NumPending of %d did not match ss.Msgs of %d", np, ss.Msgs) + } + if ss != sss { + t.Fatalf("Failed sanity check, expected %+v got %+v", sss, ss) + } + } + + sanityCheckLastOnly := func(sseq uint64, filter string) SimpleState { + t.Helper() + var ss SimpleState + var smv StoreMsg + // For here we know 0 is invalid, set to 1. + if sseq == 0 { + sseq = 1 + } + seen := make(map[string]bool) + for seq := state.LastSeq; seq >= sseq; seq-- { + sm, err := ms.LoadMsg(seq, &smv) + if err != nil { + t.Logf("Encountered error %v loading sequence: %d", err, seq) + continue + } + if !seen[sm.subj] && subjectIsSubsetMatch(sm.subj, filter) { + ss.Msgs++ + if ss.Last == 0 { + ss.Last = seq + } + if ss.First == 0 || seq < ss.First { + ss.First = seq + } + seen[sm.subj] = true + } + } + return ss + } + checkLastOnly := func(sseq uint64, filter string) { + t.Helper() + np, lvs := ms.NumPending(sseq, filter, true) + ss := sanityCheckLastOnly(sseq, filter) + if lvs != state.LastSeq { + t.Fatalf("Expected NumPending to return valid through last of %d but got %d", state.LastSeq, lvs) + } + if ss.Msgs != np { + t.Fatalf("NumPending of %d did not match ss.Msgs of %d", np, ss.Msgs) + } + } + + startSeqs := []uint64{0, 1, 2, 200, 444, 555, 2222, 8888, 12_345, 28_222, 33_456, 44_400, 49_999} + checkSubs := []string{"foo.>", "*.bar.>", "foo.bar.*.baz", "*.bar.>", "*.foo.bar.*", "foo.foo.bar.baz"} + + for _, filter := range checkSubs { + for _, start := range startSeqs { + check(start, filter) + checkLastOnly(start, filter) + } + } } diff --git a/server/store.go b/server/store.go index 77477ba570..e03ad10af8 100644 --- a/server/store.go +++ b/server/store.go @@ -96,6 +96,7 @@ type StreamStore interface { FilteredState(seq uint64, subject string) SimpleState SubjectsState(filterSubject string) map[string]SimpleState SubjectsTotals(filterSubject string) map[string]uint64 + NumPending(sseq uint64, filter string, lastPerSubject bool) (total, validThrough uint64) State() StreamState FastState(*StreamState) Type() StorageType