diff --git a/config/config.go b/config/config.go index 5fda62a..905e7a5 100644 --- a/config/config.go +++ b/config/config.go @@ -62,9 +62,6 @@ type StateStoreConfig struct { // default to empty DBDirectory string `mapstructure:"db-directory"` - // DedicatedChangelog defines if we should use a separate changelog for SS store other than sharing with SC - DedicatedChangelog bool `mapstructure:"dedicated-changelog"` - // Backend defines the backend database used for state-store // Supported backends: pebbledb, rocksdb // defaults to pebbledb @@ -101,7 +98,6 @@ func DefaultStateCommitConfig() StateCommitConfig { return StateCommitConfig{ Enable: true, AsyncCommitBuffer: DefaultAsyncCommitBuffer, - CacheSize: DefaultCacheSize, SnapshotInterval: DefaultSnapshotInterval, SnapshotKeepRecent: DefaultSnapshotKeepRecent, } diff --git a/ss/pebbledb/db.go b/ss/pebbledb/db.go index b55fdb4..280c35a 100644 --- a/ss/pebbledb/db.go +++ b/ss/pebbledb/db.go @@ -152,34 +152,35 @@ func New(dataDir string, config config.StateStoreConfig) (*Database, error) { } database.lastRangeHashedCache = lastHashed - if config.DedicatedChangelog { - if config.KeepRecent < 0 { - return nil, errors.New("KeepRecent must be non-negative") - } - streamHandler, _ := changelog.NewStream( - logger.NewNopLogger(), - utils.GetChangelogPath(dataDir), - changelog.Config{ - DisableFsync: true, - ZeroCopy: true, - KeepRecent: uint64(config.KeepRecent), - PruneInterval: 300 * time.Second, - }, - ) - database.streamHandler = streamHandler - go database.writeAsyncInBackground() - } + if config.KeepRecent < 0 { + return nil, errors.New("KeepRecent must be non-negative") + } + streamHandler, _ := changelog.NewStream( + logger.NewNopLogger(), + utils.GetChangelogPath(dataDir), + changelog.Config{ + DisableFsync: true, + ZeroCopy: true, + KeepRecent: uint64(config.KeepRecent), + PruneInterval: time.Duration(config.PruneIntervalSeconds) * time.Second, + }, + ) + database.streamHandler = streamHandler + go database.writeAsyncInBackground() + return database, nil } func (db *Database) Close() error { if db.streamHandler != nil { + // First, stop accepting new pending changes and drain the worker + close(db.pendingChanges) + // Wait for the async writes to finish + db.asyncWriteWG.Wait() + // Now close the WAL stream _ = db.streamHandler.Close() db.streamHandler = nil - close(db.pendingChanges) } - // Wait for the async writes to finish - db.asyncWriteWG.Wait() err := db.storage.Close() db.storage = nil return err diff --git a/ss/pebbledb/hash_test.go b/ss/pebbledb/hash_test.go index 76e0d23..5b52acf 100644 --- a/ss/pebbledb/hash_test.go +++ b/ss/pebbledb/hash_test.go @@ -22,12 +22,11 @@ func setupTestDB(t *testing.T) (*Database, string) { // Set up config with hash range enabled cfg := config.StateStoreConfig{ - HashRange: 10, // 10 blocks per hash range - AsyncWriteBuffer: 100, - KeepRecent: 100, - KeepLastVersion: true, - ImportNumWorkers: 4, - DedicatedChangelog: false, + HashRange: 10, // 10 blocks per hash range + AsyncWriteBuffer: 100, + KeepRecent: 100, + KeepLastVersion: true, + ImportNumWorkers: 4, } db, err := New(tempDir, cfg) @@ -340,7 +339,7 @@ func TestAsyncComputeMissingRanges(t *testing.T) { require.NoError(t, err) // Wait a bit for the async computation to complete - time.Sleep(200 * time.Millisecond) + time.Sleep(500 * time.Millisecond) // We should now have hashed up to version 30 (3 complete ranges) lastHashed, err := db.GetLastRangeHashed() diff --git a/ss/rocksdb/db.go b/ss/rocksdb/db.go index e16c05a..62fd244 100644 --- a/ss/rocksdb/db.go +++ b/ss/rocksdb/db.go @@ -110,20 +110,18 @@ func New(dataDir string, config config.StateStoreConfig) (*Database, error) { pendingChanges: make(chan VersionedChangesets, config.AsyncWriteBuffer), } - if config.DedicatedChangelog { - streamHandler, _ := changelog.NewStream( - logger.NewNopLogger(), - utils.GetChangelogPath(dataDir), - changelog.Config{ - DisableFsync: true, - ZeroCopy: true, - KeepRecent: uint64(config.KeepRecent), - PruneInterval: 300 * time.Second, - }, - ) - database.streamHandler = streamHandler - go database.writeAsyncInBackground() - } + streamHandler, _ := changelog.NewStream( + logger.NewNopLogger(), + utils.GetChangelogPath(dataDir), + changelog.Config{ + DisableFsync: true, + ZeroCopy: true, + KeepRecent: uint64(config.KeepRecent), + PruneInterval: time.Duration(config.PruneIntervalSeconds) * time.Second, + }, + ) + database.streamHandler = streamHandler + go database.writeAsyncInBackground() return database, nil } @@ -495,16 +493,15 @@ func (db *Database) WriteBlockRangeHash(storeKey string, beginBlockRange, endBlo func (db *Database) Close() error { if db.streamHandler != nil { - // Close the changelog stream first - db.streamHandler.Close() // Close the pending changes channel to signal the background goroutine to stop close(db.pendingChanges) // Wait for the async writes to finish processing all buffered items db.asyncWriteWG.Wait() + // Close the changelog stream first + _ = db.streamHandler.Close() // Only set to nil after background goroutine has finished db.streamHandler = nil } - db.storage.Close() db.storage = nil db.cfHandle = nil diff --git a/ss/store.go b/ss/store.go index b48f324..cd64d3e 100644 --- a/ss/store.go +++ b/ss/store.go @@ -41,17 +41,16 @@ func NewStateStore(logger logger.Logger, homeDir string, ssConfig config.StateSt if err != nil { return nil, err } + // Handle auto recovery for DB running with async mode - if ssConfig.DedicatedChangelog { - changelogPath := utils.GetChangelogPath(utils.GetStateStorePath(homeDir, ssConfig.Backend)) - if ssConfig.DBDirectory != "" { - changelogPath = utils.GetChangelogPath(ssConfig.DBDirectory) - } - err := RecoverStateStore(logger, changelogPath, stateStore) - if err != nil { - return nil, err - } + changelogPath := utils.GetChangelogPath(utils.GetStateStorePath(homeDir, ssConfig.Backend)) + if ssConfig.DBDirectory != "" { + changelogPath = utils.GetChangelogPath(ssConfig.DBDirectory) + } + if err := RecoverStateStore(logger, changelogPath, stateStore); err != nil { + return nil, err } + // Start the pruning manager for DB pruningManager := pruning.NewPruningManager(logger, stateStore, int64(ssConfig.KeepRecent), int64(ssConfig.PruneIntervalSeconds)) pruningManager.Start() @@ -62,9 +61,6 @@ func NewStateStore(logger logger.Logger, homeDir string, ssConfig config.StateSt func RecoverStateStore(logger logger.Logger, changelogPath string, stateStore types.StateStore) error { ssLatestVersion := stateStore.GetLatestVersion() logger.Info(fmt.Sprintf("Recovering from changelog %s with latest SS version %d", changelogPath, ssLatestVersion)) - if ssLatestVersion <= 0 { - return nil - } streamHandler, err := changelog.NewStream(logger, changelogPath, changelog.Config{}) if err != nil { return err @@ -84,15 +80,20 @@ func RecoverStateStore(logger logger.Logger, changelogPath string, stateStore ty // Look backward to find where we should start replay from curVersion := lastEntry.Version curOffset := lastOffset - for curVersion > ssLatestVersion && curOffset > firstOffset { - curOffset-- - curEntry, errRead := streamHandler.ReadAt(curOffset) - if errRead != nil { - return err + if ssLatestVersion > 0 { + for curVersion > ssLatestVersion && curOffset > firstOffset { + curOffset-- + curEntry, errRead := streamHandler.ReadAt(curOffset) + if errRead != nil { + return err + } + curVersion = curEntry.Version } - curVersion = curEntry.Version + } else { + // Fresh store (or no applied versions) – start from the first offset + curOffset = firstOffset } - // Replay from the offset where the offset where the version is larger than SS store latest version + // Replay from the offset where the version is larger than SS store latest version targetStartOffset := curOffset logger.Info(fmt.Sprintf("Start replaying changelog to recover StateStore from offset %d to %d", targetStartOffset, lastOffset)) if targetStartOffset < lastOffset { diff --git a/ss/store_test.go b/ss/store_test.go index 081e8a9..225efc3 100644 --- a/ss/store_test.go +++ b/ss/store_test.go @@ -15,16 +15,15 @@ import ( func TestNewStateStore(t *testing.T) { tempDir := os.TempDir() - homeDir := filepath.Join(tempDir, "seidb") + homeDir := filepath.Join(tempDir, "pebbledb") ssConfig := config.StateStoreConfig{ - DedicatedChangelog: true, - Backend: string(PebbleDBBackend), - AsyncWriteBuffer: 50, - KeepRecent: 500, + Backend: string(PebbleDBBackend), + AsyncWriteBuffer: 100, + KeepRecent: 500, } stateStore, err := NewStateStore(logger.NewNopLogger(), homeDir, ssConfig) require.NoError(t, err) - for i := 1; i < 20; i++ { + for i := 1; i < 50; i++ { var changesets []*proto.NamedChangeSet kvPair := &iavl.KVPair{ Delete: false, @@ -51,7 +50,7 @@ func TestNewStateStore(t *testing.T) { require.NoError(t, err) // Make sure key and values can be found - for i := 1; i < 20; i++ { + for i := 1; i < 50; i++ { value, err := stateStore.Get("storeA", int64(i), []byte(fmt.Sprintf("key%d", i))) require.NoError(t, err) require.Equal(t, fmt.Sprintf("value%d", i), string(value))