From ae39c82d043b43e52d17519d35177861679ffb2c Mon Sep 17 00:00:00 2001 From: yzang2019 Date: Wed, 8 Oct 2025 17:06:40 -0700 Subject: [PATCH 1/4] Fix async flush logic and flaky unit test --- config/config.go | 4 ---- ss/pebbledb/db.go | 41 ++++++++++++++++++++-------------------- ss/pebbledb/hash_test.go | 11 +++++------ ss/store.go | 39 +++++++++++++++++++------------------- ss/store_test.go | 13 ++++++------- 5 files changed, 52 insertions(+), 56 deletions(-) diff --git a/config/config.go b/config/config.go index 81af657e..ae571022 100644 --- a/config/config.go +++ b/config/config.go @@ -62,9 +62,6 @@ type StateStoreConfig struct { // default to empty DBDirectory string `mapstructure:"db-directory"` - // DedicatedChangelog defines if we should use a separate changelog for SS store other than sharing with SC - DedicatedChangelog bool `mapstructure:"dedicated-changelog"` - // Backend defines the backend database used for state-store // Supported backends: pebbledb, rocksdb // defaults to pebbledb @@ -101,7 +98,6 @@ func DefaultStateCommitConfig() StateCommitConfig { return StateCommitConfig{ Enable: true, AsyncCommitBuffer: DefaultAsyncCommitBuffer, - CacheSize: DefaultCacheSize, SnapshotInterval: DefaultSnapshotInterval, SnapshotKeepRecent: DefaultSnapshotKeepRecent, } diff --git a/ss/pebbledb/db.go b/ss/pebbledb/db.go index b55fdb40..d09910f7 100644 --- a/ss/pebbledb/db.go +++ b/ss/pebbledb/db.go @@ -152,34 +152,35 @@ func New(dataDir string, config config.StateStoreConfig) (*Database, error) { } database.lastRangeHashedCache = lastHashed - if config.DedicatedChangelog { - if config.KeepRecent < 0 { - return nil, errors.New("KeepRecent must be non-negative") - } - streamHandler, _ := changelog.NewStream( - logger.NewNopLogger(), - utils.GetChangelogPath(dataDir), - changelog.Config{ - DisableFsync: true, - ZeroCopy: true, - KeepRecent: uint64(config.KeepRecent), - PruneInterval: 300 * time.Second, - }, - ) - database.streamHandler = streamHandler - go database.writeAsyncInBackground() - } + if config.KeepRecent < 0 { + return nil, errors.New("KeepRecent must be non-negative") + } + streamHandler, _ := changelog.NewStream( + logger.NewNopLogger(), + utils.GetChangelogPath(dataDir), + changelog.Config{ + DisableFsync: true, + ZeroCopy: true, + KeepRecent: uint64(config.KeepRecent), + PruneInterval: time.Duration(config.PruneIntervalSeconds) * time.Second, + }, + ) + database.streamHandler = streamHandler + go database.writeAsyncInBackground() + return database, nil } func (db *Database) Close() error { + // First, stop accepting new pending changes and drain the worker + close(db.pendingChanges) + // Wait for the async writes to finish + db.asyncWriteWG.Wait() + // Now close the WAL stream if db.streamHandler != nil { _ = db.streamHandler.Close() db.streamHandler = nil - close(db.pendingChanges) } - // Wait for the async writes to finish - db.asyncWriteWG.Wait() err := db.storage.Close() db.storage = nil return err diff --git a/ss/pebbledb/hash_test.go b/ss/pebbledb/hash_test.go index 76e0d239..f61039b8 100644 --- a/ss/pebbledb/hash_test.go +++ b/ss/pebbledb/hash_test.go @@ -22,12 +22,11 @@ func setupTestDB(t *testing.T) (*Database, string) { // Set up config with hash range enabled cfg := config.StateStoreConfig{ - HashRange: 10, // 10 blocks per hash range - AsyncWriteBuffer: 100, - KeepRecent: 100, - KeepLastVersion: true, - ImportNumWorkers: 4, - DedicatedChangelog: false, + HashRange: 10, // 10 blocks per hash range + AsyncWriteBuffer: 100, + KeepRecent: 100, + KeepLastVersion: true, + ImportNumWorkers: 4, } db, err := New(tempDir, cfg) diff --git a/ss/store.go b/ss/store.go index b48f3247..cd64d3e5 100644 --- a/ss/store.go +++ b/ss/store.go @@ -41,17 +41,16 @@ func NewStateStore(logger logger.Logger, homeDir string, ssConfig config.StateSt if err != nil { return nil, err } + // Handle auto recovery for DB running with async mode - if ssConfig.DedicatedChangelog { - changelogPath := utils.GetChangelogPath(utils.GetStateStorePath(homeDir, ssConfig.Backend)) - if ssConfig.DBDirectory != "" { - changelogPath = utils.GetChangelogPath(ssConfig.DBDirectory) - } - err := RecoverStateStore(logger, changelogPath, stateStore) - if err != nil { - return nil, err - } + changelogPath := utils.GetChangelogPath(utils.GetStateStorePath(homeDir, ssConfig.Backend)) + if ssConfig.DBDirectory != "" { + changelogPath = utils.GetChangelogPath(ssConfig.DBDirectory) + } + if err := RecoverStateStore(logger, changelogPath, stateStore); err != nil { + return nil, err } + // Start the pruning manager for DB pruningManager := pruning.NewPruningManager(logger, stateStore, int64(ssConfig.KeepRecent), int64(ssConfig.PruneIntervalSeconds)) pruningManager.Start() @@ -62,9 +61,6 @@ func NewStateStore(logger logger.Logger, homeDir string, ssConfig config.StateSt func RecoverStateStore(logger logger.Logger, changelogPath string, stateStore types.StateStore) error { ssLatestVersion := stateStore.GetLatestVersion() logger.Info(fmt.Sprintf("Recovering from changelog %s with latest SS version %d", changelogPath, ssLatestVersion)) - if ssLatestVersion <= 0 { - return nil - } streamHandler, err := changelog.NewStream(logger, changelogPath, changelog.Config{}) if err != nil { return err @@ -84,15 +80,20 @@ func RecoverStateStore(logger logger.Logger, changelogPath string, stateStore ty // Look backward to find where we should start replay from curVersion := lastEntry.Version curOffset := lastOffset - for curVersion > ssLatestVersion && curOffset > firstOffset { - curOffset-- - curEntry, errRead := streamHandler.ReadAt(curOffset) - if errRead != nil { - return err + if ssLatestVersion > 0 { + for curVersion > ssLatestVersion && curOffset > firstOffset { + curOffset-- + curEntry, errRead := streamHandler.ReadAt(curOffset) + if errRead != nil { + return err + } + curVersion = curEntry.Version } - curVersion = curEntry.Version + } else { + // Fresh store (or no applied versions) – start from the first offset + curOffset = firstOffset } - // Replay from the offset where the offset where the version is larger than SS store latest version + // Replay from the offset where the version is larger than SS store latest version targetStartOffset := curOffset logger.Info(fmt.Sprintf("Start replaying changelog to recover StateStore from offset %d to %d", targetStartOffset, lastOffset)) if targetStartOffset < lastOffset { diff --git a/ss/store_test.go b/ss/store_test.go index 081e8a9f..225efc3f 100644 --- a/ss/store_test.go +++ b/ss/store_test.go @@ -15,16 +15,15 @@ import ( func TestNewStateStore(t *testing.T) { tempDir := os.TempDir() - homeDir := filepath.Join(tempDir, "seidb") + homeDir := filepath.Join(tempDir, "pebbledb") ssConfig := config.StateStoreConfig{ - DedicatedChangelog: true, - Backend: string(PebbleDBBackend), - AsyncWriteBuffer: 50, - KeepRecent: 500, + Backend: string(PebbleDBBackend), + AsyncWriteBuffer: 100, + KeepRecent: 500, } stateStore, err := NewStateStore(logger.NewNopLogger(), homeDir, ssConfig) require.NoError(t, err) - for i := 1; i < 20; i++ { + for i := 1; i < 50; i++ { var changesets []*proto.NamedChangeSet kvPair := &iavl.KVPair{ Delete: false, @@ -51,7 +50,7 @@ func TestNewStateStore(t *testing.T) { require.NoError(t, err) // Make sure key and values can be found - for i := 1; i < 20; i++ { + for i := 1; i < 50; i++ { value, err := stateStore.Get("storeA", int64(i), []byte(fmt.Sprintf("key%d", i))) require.NoError(t, err) require.Equal(t, fmt.Sprintf("value%d", i), string(value)) From c88ff8673aaea5b0e32452e577160eb7234b0509 Mon Sep 17 00:00:00 2001 From: yzang2019 Date: Wed, 8 Oct 2025 17:30:21 -0700 Subject: [PATCH 2/4] Fix for rocksdb --- ss/rocksdb/db.go | 35 ++++++++++++++++------------------- 1 file changed, 16 insertions(+), 19 deletions(-) diff --git a/ss/rocksdb/db.go b/ss/rocksdb/db.go index e16c05ab..3581fd51 100644 --- a/ss/rocksdb/db.go +++ b/ss/rocksdb/db.go @@ -110,20 +110,18 @@ func New(dataDir string, config config.StateStoreConfig) (*Database, error) { pendingChanges: make(chan VersionedChangesets, config.AsyncWriteBuffer), } - if config.DedicatedChangelog { - streamHandler, _ := changelog.NewStream( - logger.NewNopLogger(), - utils.GetChangelogPath(dataDir), - changelog.Config{ - DisableFsync: true, - ZeroCopy: true, - KeepRecent: uint64(config.KeepRecent), - PruneInterval: 300 * time.Second, - }, - ) - database.streamHandler = streamHandler - go database.writeAsyncInBackground() - } + streamHandler, _ := changelog.NewStream( + logger.NewNopLogger(), + utils.GetChangelogPath(dataDir), + changelog.Config{ + DisableFsync: true, + ZeroCopy: true, + KeepRecent: uint64(config.KeepRecent), + PruneInterval: time.Duration(config.PruneIntervalSeconds) * time.Second, + }, + ) + database.streamHandler = streamHandler + go database.writeAsyncInBackground() return database, nil } @@ -494,17 +492,16 @@ func (db *Database) WriteBlockRangeHash(storeKey string, beginBlockRange, endBlo } func (db *Database) Close() error { + // Close the pending changes channel to signal the background goroutine to stop + close(db.pendingChanges) + // Wait for the async writes to finish processing all buffered items + db.asyncWriteWG.Wait() if db.streamHandler != nil { // Close the changelog stream first db.streamHandler.Close() - // Close the pending changes channel to signal the background goroutine to stop - close(db.pendingChanges) - // Wait for the async writes to finish processing all buffered items - db.asyncWriteWG.Wait() // Only set to nil after background goroutine has finished db.streamHandler = nil } - db.storage.Close() db.storage = nil db.cfHandle = nil From 42ec3bc07c84453182ed412ff82d2f6c2dbb031e Mon Sep 17 00:00:00 2001 From: yzang2019 Date: Thu, 9 Oct 2025 08:44:51 -0700 Subject: [PATCH 3/4] Address comment --- ss/pebbledb/db.go | 10 +++++----- ss/rocksdb/db.go | 10 +++++----- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/ss/pebbledb/db.go b/ss/pebbledb/db.go index d09910f7..280c35a6 100644 --- a/ss/pebbledb/db.go +++ b/ss/pebbledb/db.go @@ -172,12 +172,12 @@ func New(dataDir string, config config.StateStoreConfig) (*Database, error) { } func (db *Database) Close() error { - // First, stop accepting new pending changes and drain the worker - close(db.pendingChanges) - // Wait for the async writes to finish - db.asyncWriteWG.Wait() - // Now close the WAL stream if db.streamHandler != nil { + // First, stop accepting new pending changes and drain the worker + close(db.pendingChanges) + // Wait for the async writes to finish + db.asyncWriteWG.Wait() + // Now close the WAL stream _ = db.streamHandler.Close() db.streamHandler = nil } diff --git a/ss/rocksdb/db.go b/ss/rocksdb/db.go index 3581fd51..62fd244c 100644 --- a/ss/rocksdb/db.go +++ b/ss/rocksdb/db.go @@ -492,13 +492,13 @@ func (db *Database) WriteBlockRangeHash(storeKey string, beginBlockRange, endBlo } func (db *Database) Close() error { - // Close the pending changes channel to signal the background goroutine to stop - close(db.pendingChanges) - // Wait for the async writes to finish processing all buffered items - db.asyncWriteWG.Wait() if db.streamHandler != nil { + // Close the pending changes channel to signal the background goroutine to stop + close(db.pendingChanges) + // Wait for the async writes to finish processing all buffered items + db.asyncWriteWG.Wait() // Close the changelog stream first - db.streamHandler.Close() + _ = db.streamHandler.Close() // Only set to nil after background goroutine has finished db.streamHandler = nil } From 2f2afd76dce6d0b4b41159da02aa16eb6f8c1445 Mon Sep 17 00:00:00 2001 From: yzang2019 Date: Thu, 9 Oct 2025 09:14:16 -0700 Subject: [PATCH 4/4] Fix anotehr flaky test --- ss/pebbledb/hash_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ss/pebbledb/hash_test.go b/ss/pebbledb/hash_test.go index f61039b8..5b52acf4 100644 --- a/ss/pebbledb/hash_test.go +++ b/ss/pebbledb/hash_test.go @@ -339,7 +339,7 @@ func TestAsyncComputeMissingRanges(t *testing.T) { require.NoError(t, err) // Wait a bit for the async computation to complete - time.Sleep(200 * time.Millisecond) + time.Sleep(500 * time.Millisecond) // We should now have hashed up to version 30 (3 complete ranges) lastHashed, err := db.GetLastRangeHashed()