From 1f00d0e3f2e008c736aec854f6676f4a15ca7f9f Mon Sep 17 00:00:00 2001
From: Derek Collison
Date: Sat, 5 Aug 2023 12:32:29 -0700
Subject: [PATCH 1/3] Track deleted with a single avl.SeqSet dmap for now vs
 the old method. Size of the encoding may be a bit bigger than we wanted, but
 still way better than the old method and very fast.

Signed-off-by: Derek Collison
---
 server/memstore.go      | 53 +++++++++++++++++++++--------
 server/memstore_test.go | 38 +++++++++++++++++++++
 server/norace_test.go   | 75 +++++++++++++++++++++++++++++++++++++++++
 3 files changed, 152 insertions(+), 14 deletions(-)

diff --git a/server/memstore.go b/server/memstore.go
index 8dfbffce33..29fb292441 100644
--- a/server/memstore.go
+++ b/server/memstore.go
@@ -31,6 +31,7 @@ type memStore struct {
 	state       StreamState
 	msgs        map[uint64]*StoreMsg
 	fss         map[string]*SimpleState
+	dmap        avl.SequenceSet
 	maxp        int64
 	scb         StorageUpdateHandler
 	ageChk      *time.Timer
@@ -1005,6 +1006,7 @@ func (ms *memStore) updateFirstSeq(seq uint64) {
 			break
 		}
 	}
+	oldFirst := ms.state.FirstSeq
 	if nsm != nil {
 		ms.state.FirstSeq = nsm.seq
 		ms.state.FirstTime = time.Unix(0, nsm.ts).UTC()
@@ -1013,6 +1015,17 @@ func (ms *memStore) updateFirstSeq(seq uint64) {
 		ms.state.FirstSeq = ms.state.LastSeq + 1
 		ms.state.FirstTime = time.Time{}
 	}
+
+	if oldFirst == ms.state.FirstSeq-1 {
+		ms.dmap.Delete(oldFirst)
+	} else {
+		for seq := oldFirst; seq < ms.state.FirstSeq; seq++ {
+			ms.dmap.Delete(seq)
+		}
+	}
+	if ms.dmap.IsEmpty() {
+		ms.dmap.SetInitialMin(ms.state.FirstSeq)
+	}
 }
 
 // Remove a seq from the fss and select new first.
@@ -1071,6 +1084,7 @@ func (ms *memStore) removeMsg(seq uint64, secure bool) bool {
 		}
 		ms.state.Bytes -= ss
 	}
+	ms.dmap.Insert(seq)
 	ms.updateFirstSeq(seq)
 
 	if secure {
@@ -1230,29 +1244,30 @@ func (ms *memStore) Snapshot(_ time.Duration, _, _ bool) (*SnapshotResult, error
 
 // Binary encoded state snapshot, >= v2.10 server.
 func (ms *memStore) EncodedStreamState(failed uint64) ([]byte, error) {
-	// FIXME(dlc) - Don't calculate deleted on the fly, keep delete blocks.
-	state := ms.State()
+	ms.mu.RLock()
+	defer ms.mu.RUnlock()
+
+	// Quick calculate num deleted.
+	numDeleted := int((ms.state.LastSeq - ms.state.FirstSeq + 1) - ms.state.Msgs)
+	if numDeleted < 0 {
+		numDeleted = 0
+	}
 
 	// Encoded is Msgs, Bytes, FirstSeq, LastSeq, Failed, NumDeleted and optional DeletedBlocks
 	var buf [1024]byte
 	buf[0], buf[1] = streamStateMagic, streamStateVersion
 	n := hdrLen
-	n += binary.PutUvarint(buf[n:], state.Msgs)
-	n += binary.PutUvarint(buf[n:], state.Bytes)
-	n += binary.PutUvarint(buf[n:], state.FirstSeq)
-	n += binary.PutUvarint(buf[n:], state.LastSeq)
+	n += binary.PutUvarint(buf[n:], ms.state.Msgs)
+	n += binary.PutUvarint(buf[n:], ms.state.Bytes)
+	n += binary.PutUvarint(buf[n:], ms.state.FirstSeq)
+	n += binary.PutUvarint(buf[n:], ms.state.LastSeq)
 	n += binary.PutUvarint(buf[n:], failed)
-	n += binary.PutUvarint(buf[n:], uint64(state.NumDeleted))
+	n += binary.PutUvarint(buf[n:], uint64(numDeleted))
 	b := buf[0:n]
 
-	if state.NumDeleted > 0 {
-		var ss avl.SequenceSet
-		ss.SetInitialMin(state.Deleted[0])
-		for _, seq := range state.Deleted {
-			ss.Insert(seq)
-		}
-		buf, err := ss.Encode(nil)
+	if numDeleted > 0 {
+		buf, err := ms.dmap.Encode(nil)
 		if err != nil {
 			return nil, err
 		}
@@ -1264,6 +1279,16 @@ func (ms *memStore) EncodedStreamState(failed uint64) ([]byte, error) {
 
 // SyncDeleted will make sure this stream has same deleted state as dbs.
func (ms *memStore) SyncDeleted(dbs DeleteBlocks) { + // For now we share one dmap, so if we have one entry here check if states are the same. + // Note this will work for any DeleteBlock type, but we expect this to be a dmap too. + if len(dbs) == 1 { + ms.mu.RLock() + min, max, num := ms.dmap.State() + ms.mu.RUnlock() + if pmin, pmax, pnum := dbs[0].State(); pmin == min && pmax == max && pnum == num { + return + } + } for _, db := range dbs { db.Range(func(dseq uint64) bool { ms.RemoveMsg(dseq) diff --git a/server/memstore_test.go b/server/memstore_test.go index b3c59d468b..afc17df37b 100644 --- a/server/memstore_test.go +++ b/server/memstore_test.go @@ -754,3 +754,41 @@ func TestMemStoreInitialFirstSeq(t *testing.T) { t.Fatalf("Expected last seq 1001, got %d", state.LastSeq) } } + +func TestMemStoreDeleteBlocks(t *testing.T) { + cfg := &StreamConfig{ + Name: "zzz", + Subjects: []string{"*"}, + Storage: MemoryStorage, + } + ms, err := newMemStore(cfg) + require_NoError(t, err) + + // Put in 10_000 msgs. + total := 10_000 + for i := 0; i < total; i++ { + _, _, err := ms.StoreMsg("A", nil, []byte("OK")) + require_NoError(t, err) + } + + // Now pick 5k random sequences. + delete := 5000 + deleteMap := make(map[int]struct{}, delete) + for len(deleteMap) < delete { + deleteMap[rand.Intn(total)+1] = struct{}{} + } + // Now remove? + for seq := range deleteMap { + ms.RemoveMsg(uint64(seq)) + } + + var state StreamState + ms.FastState(&state) + + // For now we just track via one dmap. + ms.mu.RLock() + dmap := ms.dmap.Clone() + ms.mu.RUnlock() + + require_True(t, dmap.Size() == state.NumDeleted) +} diff --git a/server/norace_test.go b/server/norace_test.go index 868c2b133e..dadde35adb 100644 --- a/server/norace_test.go +++ b/server/norace_test.go @@ -8856,3 +8856,78 @@ func TestNoRaceJetStreamClusterStreamSnapshotCatchup(t *testing.T) { require_True(t, state.LastSeq == 51_001) require_True(t, state.NumDeleted == 51_001-3) } + +func TestNoRaceStoreStreamEncoderDecoder(t *testing.T) { + cfg := &StreamConfig{ + Name: "zzz", + Subjects: []string{"*"}, + MaxMsgsPer: 1, + Storage: MemoryStorage, + } + ms, err := newMemStore(cfg) + require_NoError(t, err) + + fs, err := newFileStore( + FileStoreConfig{StoreDir: t.TempDir()}, + StreamConfig{Name: "zzz", Subjects: []string{"*"}, MaxMsgsPer: 1, Storage: FileStorage}, + ) + require_NoError(t, err) + defer fs.Stop() + + const seed = 2222222 + msg := bytes.Repeat([]byte("ABC"), 33) // ~100bytes + + maxEncodeTime := 2 * time.Second + maxEncodeSize := 700 * 1024 + + test := func(t *testing.T, gs StreamStore) { + t.Parallel() + prand := rand.New(rand.NewSource(seed)) + tick := time.NewTicker(time.Second) + defer tick.Stop() + done := time.NewTimer(10 * time.Second) + + for running := true; running; { + select { + case <-tick.C: + var state StreamState + gs.FastState(&state) + if state.NumDeleted == 0 { + continue + } + start := time.Now() + snap, err := gs.EncodedStreamState(0) + require_NoError(t, err) + elapsed := time.Since(start) + // Should take <1ms without race but if CI/CD is slow we will give it a bit of room. 
+				if elapsed > maxEncodeTime {
+					t.Logf("Encode took longer than expected: %v", elapsed)
+				}
+				if len(snap) > maxEncodeSize {
+					t.Fatalf("Expected snapshot size < %v got %v", friendlyBytes(maxEncodeSize), friendlyBytes(len(snap)))
+				}
+				ss, err := DecodeStreamState(snap)
+				require_True(t, len(ss.Deleted) > 0)
+				require_NoError(t, err)
+			case <-done.C:
+				running = false
+			default:
+				key := strconv.Itoa(prand.Intn(256_000))
+				gs.StoreMsg(key, nil, msg)
+			}
+		}
+	}
+
+	for _, gs := range []StreamStore{ms, fs} {
+		switch gs.(type) {
+		case *memStore:
+			t.Run("MemStore", func(t *testing.T) {
+				test(t, gs)
+			})
+		case *fileStore:
+			t.Run("FileStore", func(t *testing.T) {
+				test(t, gs)
+			})
+		}
+	}
+}

From 3b235059fa313a63c1113ec485e0dd59299a50cd Mon Sep 17 00:00:00 2001
From: Derek Collison
Date: Sat, 5 Aug 2023 12:33:30 -0700
Subject: [PATCH 2/3] We were trying to be too smart to save space at the
 expense of encoding time for the filestore. Revert back to a very simple but
 much faster method. Sometimes it is 100x faster, with only an ~8% size
 increase.

Signed-off-by: Derek Collison
---
 server/filestore.go      | 93 ++++++++++++++++------------------------
 server/filestore_test.go | 37 ----------------
 2 files changed, 37 insertions(+), 93 deletions(-)

diff --git a/server/filestore.go b/server/filestore.go
index b73b349774..7836db016a 100644
--- a/server/filestore.go
+++ b/server/filestore.go
@@ -6887,8 +6887,21 @@ func (fs *fileStore) fileStoreConfig() FileStoreConfig {
 	return fs.fcfg
 }
 
-// When we will write a run length encoded record vs adding to the existing avl.SequenceSet.
-const rlThresh = 4096
+// Read lock all existing message blocks.
+// Lock held on entry.
+func (fs *fileStore) readLockAllMsgBlocks() {
+	for _, mb := range fs.blks {
+		mb.mu.RLock()
+	}
+}
+
+// Read unlock all existing message blocks.
+// Lock held on entry.
+func (fs *fileStore) readUnlockAllMsgBlocks() {
+	for _, mb := range fs.blks {
+		mb.mu.RUnlock()
+	}
+}
 
 // Binary encoded state snapshot, >= v2.10 server.
 func (fs *fileStore) EncodedStreamState(failed uint64) ([]byte, error) {
@@ -6919,6 +6932,10 @@ func (fs *fileStore) EncodedStreamState(failed uint64) ([]byte, error) {
 
 	if numDeleted > 0 {
 		var scratch [4 * 1024]byte
+
+		fs.readLockAllMsgBlocks()
+		defer fs.readUnlockAllMsgBlocks()
+
 		for _, db := range fs.deleteBlocks() {
 			switch db := db.(type) {
 			case *DeleteRange:
@@ -6943,68 +6960,23 @@ func (fs *fileStore) EncodedStreamState(failed uint64) ([]byte, error) {
 	return b, nil
 }
 
-// Lock should be held.
+// We used to be more sophisticated to save memory, but speed is more important.
+// All blocks should be at least read locked.
 func (fs *fileStore) deleteBlocks() DeleteBlocks {
-	var (
-		dbs      DeleteBlocks
-		adm      *avl.SequenceSet
-		prevLast uint64
-	)
+	var dbs DeleteBlocks
+	var prevLast uint64
+
 	for _, mb := range fs.blks {
-		mb.mu.RLock()
 		// Detect if we have a gap between these blocks.
 		if prevLast > 0 && prevLast+1 != mb.first.seq {
-			// Detect if we need to encode a run length encoding here.
-			if gap := mb.first.seq - prevLast - 1; gap > rlThresh {
-				// Check if we have a running adm, if so write that out first, or if contigous update rle params.
-				if min, max, num := adm.State(); num > 0 {
-					// Check if we are all contingous.
-					if num == max-min+1 {
-						prevLast, gap = min-1, mb.first.seq-min
-					} else {
-						dbs = append(dbs, adm)
-					}
-					// Always nil out here.
- adm = nil - } - dbs = append(dbs, &DeleteRange{First: prevLast + 1, Num: gap}) - } else { - // Common dmap - if adm == nil { - adm = &avl.SequenceSet{} - adm.SetInitialMin(prevLast + 1) - } - for seq := prevLast + 1; seq < mb.first.seq; seq++ { - adm.Insert(seq) - } - } + gap := mb.first.seq - prevLast - 1 + dbs = append(dbs, &DeleteRange{First: prevLast + 1, Num: gap}) } - if min, max, num := mb.dmap.State(); num > 0 { - // Check in case the mb's dmap is contiguous and over our threshold. - if num == max-min+1 && num > rlThresh { - // Need to write out adm if it exists. - if adm != nil && adm.Size() > 0 { - dbs = append(dbs, adm) - adm = nil - } - dbs = append(dbs, &DeleteRange{First: min, Num: max - min + 1}) - } else { - // Aggregated dmap - if adm == nil { - adm = mb.dmap.Clone() - } else { - adm.Union(&mb.dmap) - } - } + if mb.dmap.Size() > 0 { + dbs = append(dbs, &mb.dmap) } prevLast = mb.last.seq - mb.mu.RUnlock() } - - if adm != nil { - dbs = append(dbs, adm) - } - return dbs } @@ -7013,9 +6985,13 @@ func (fs *fileStore) SyncDeleted(dbs DeleteBlocks) { if len(dbs) == 0 { return } + fs.mu.Lock() defer fs.mu.Unlock() + var needsCheck DeleteBlocks + + fs.readLockAllMsgBlocks() mdbs := fs.deleteBlocks() for i, db := range dbs { // If the block is same as what we have we can skip. @@ -7027,6 +7003,11 @@ func (fs *fileStore) SyncDeleted(dbs DeleteBlocks) { } } // Need to insert these. + needsCheck = append(needsCheck, db) + } + fs.readUnlockAllMsgBlocks() + + for _, db := range needsCheck { db.Range(func(dseq uint64) bool { fs.removeMsg(dseq, false, true, false) return true diff --git a/server/filestore_test.go b/server/filestore_test.go index 24fa033c92..f4a3d57acf 100644 --- a/server/filestore_test.go +++ b/server/filestore_test.go @@ -30,7 +30,6 @@ import ( "os" "path/filepath" "reflect" - "strconv" "strings" "testing" "time" @@ -5707,39 +5706,3 @@ func TestFileStoreRecaluclateFirstForSubjBug(t *testing.T) { // Make sure it was update properly. 
require_True(t, *ss == SimpleState{Msgs: 1, First: 3, Last: 3, firstNeedsUpdate: false}) } - -func TestFileStoreStreamEncoderDecoder(t *testing.T) { - fs, err := newFileStore( - FileStoreConfig{StoreDir: t.TempDir()}, - StreamConfig{Name: "zzz", Subjects: []string{"*"}, MaxMsgsPer: 2, Storage: FileStorage}, - ) - require_NoError(t, err) - defer fs.Stop() - - const seed = 2222222 - prand := rand.New(rand.NewSource(seed)) - - tick := time.NewTicker(time.Second) - defer tick.Stop() - done := time.NewTimer(10 * time.Second) - - msg := bytes.Repeat([]byte("ABC"), 33) // ~100bytes - - for running := true; running; { - select { - case <-tick.C: - var state StreamState - fs.FastState(&state) - snap, err := fs.EncodedStreamState(0) - require_NoError(t, err) - ss, err := DecodeStreamState(snap) - require_True(t, len(ss.Deleted) > 0) - require_NoError(t, err) - case <-done.C: - running = false - default: - key := strconv.Itoa(prand.Intn(256_000)) - fs.StoreMsg(key, nil, msg) - } - } -} From 75e1171bddf64f4b3e7fe8499a20ae2df9aaf049 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Sat, 5 Aug 2023 13:20:38 -0700 Subject: [PATCH 3/3] No longer compacting multiple blocks, so remove test check Signed-off-by: Derek Collison --- server/norace_test.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/server/norace_test.go b/server/norace_test.go index dadde35adb..84e257fafd 100644 --- a/server/norace_test.go +++ b/server/norace_test.go @@ -8729,8 +8729,6 @@ func TestNoRaceBinaryStreamSnapshotEncodingBasic(t *testing.T) { require_True(t, ss.FirstSeq == 1) require_True(t, ss.LastSeq == 3000) require_True(t, ss.Msgs == 1000) - // We should have collapsed all these into 2 delete blocks. - require_True(t, len(ss.Deleted) <= 2) require_True(t, ss.Deleted.NumDeleted() == 2000) }
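
For reference, a minimal, hypothetical sketch of the snapshot layout that EncodedStreamState and DecodeStreamState agree on after these patches: magic and version bytes, then uvarints for Msgs, Bytes, FirstSeq, LastSeq, Failed and NumDeleted, followed by the deleted-set payload. The real payload is produced by avl.SequenceSet.Encode (plus DeleteRange records on the filestore side); a plain []uint64 of varint-encoded sequences stands in here so the example compiles on its own, and magic/version are placeholders rather than the server's streamStateMagic/streamStateVersion.

// Hypothetical, simplified sketch of the snapshot layout; not part of the patches above.
package main

import (
	"encoding/binary"
	"fmt"
)

const (
	magic   byte = 22 // placeholder for streamStateMagic
	version byte = 1  // placeholder for streamStateVersion
)

// encodeState writes magic and version, then uvarints for msgs, bytes, first,
// last, failed and the deleted count, followed by the deleted sequences.
func encodeState(msgs, totalBytes, first, last, failed uint64, deleted []uint64) []byte {
	buf := []byte{magic, version}
	for _, v := range []uint64{msgs, totalBytes, first, last, failed, uint64(len(deleted))} {
		buf = binary.AppendUvarint(buf, v)
	}
	for _, seq := range deleted {
		buf = binary.AppendUvarint(buf, seq)
	}
	return buf
}

// decodeState reverses encodeState.
func decodeState(b []byte) (vals [6]uint64, deleted []uint64, err error) {
	if len(b) < 2 || b[0] != magic || b[1] != version {
		return vals, nil, fmt.Errorf("bad snapshot header")
	}
	b = b[2:]
	next := func() (uint64, error) {
		v, n := binary.Uvarint(b)
		if n <= 0 {
			return 0, fmt.Errorf("short snapshot")
		}
		b = b[n:]
		return v, nil
	}
	// Msgs, Bytes, FirstSeq, LastSeq, Failed, NumDeleted.
	for i := range vals {
		if vals[i], err = next(); err != nil {
			return vals, nil, err
		}
	}
	// Then NumDeleted individual sequences.
	for i := uint64(0); i < vals[5]; i++ {
		seq, err := next()
		if err != nil {
			return vals, nil, err
		}
		deleted = append(deleted, seq)
	}
	return vals, deleted, nil
}

func main() {
	// Msgs=3, Bytes=300, FirstSeq=1, LastSeq=5, Failed=0, deleted 2 and 4.
	snap := encodeState(3, 300, 1, 5, 0, []uint64{2, 4})
	vals, deleted, err := decodeState(snap)
	fmt.Println(vals, deleted, err) // [3 300 1 5 0 2] [2 4] <nil>
}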
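
Along the same lines, a hypothetical miniature of the simplified fileStore.deleteBlocks logic from the second patch: a hole between consecutive blocks is emitted as a single run-length range, and each block's own delete map is passed through untouched. The block, deleteRange and deleteMap types below are stand-ins for the server's msgBlock, DeleteRange and per-block avl.SequenceSet dmap.

// Hypothetical miniature of the simplified deleteBlocks walk; stand-in types only.
package main

import "fmt"

type block struct {
	first, last uint64
	deleted     []uint64 // stand-in for the block's per-block dmap
}

// deleteBlock mirrors the idea behind the DeleteBlocks slice: either a
// run-length range or an explicit set of deleted sequences.
type deleteBlock interface{ numDeleted() uint64 }

type deleteRange struct{ first, num uint64 }

func (r deleteRange) numDeleted() uint64 { return r.num }

type deleteMap []uint64

func (m deleteMap) numDeleted() uint64 { return uint64(len(m)) }

// deleteBlocksFor walks blocks in order: a gap between the previous block's
// last sequence and the next block's first sequence becomes one deleteRange,
// and a non-empty per-block delete map is appended as-is.
func deleteBlocksFor(blks []block) []deleteBlock {
	var dbs []deleteBlock
	var prevLast uint64
	for _, mb := range blks {
		if prevLast > 0 && prevLast+1 != mb.first {
			dbs = append(dbs, deleteRange{first: prevLast + 1, num: mb.first - prevLast - 1})
		}
		if len(mb.deleted) > 0 {
			dbs = append(dbs, deleteMap(mb.deleted))
		}
		prevLast = mb.last
	}
	return dbs
}

func main() {
	// Block 1 covers 1-10 with 3 and 7 deleted inside it; block 2 starts at 21,
	// so 11-20 is a gap that gets run-length encoded.
	blks := []block{
		{first: 1, last: 10, deleted: []uint64{3, 7}},
		{first: 21, last: 30},
	}
	for _, db := range deleteBlocksFor(blks) {
		fmt.Printf("%T deleted=%d\n", db, db.numDeleted())
	}
	// Prints main.deleteMap deleted=2, then main.deleteRange deleted=10.
}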