Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,6 @@ type StateStoreConfig struct {
// default to empty
DBDirectory string `mapstructure:"db-directory"`

// DedicatedChangelog defines if we should use a separate changelog for SS store other than sharing with SC
DedicatedChangelog bool `mapstructure:"dedicated-changelog"`

// Backend defines the backend database used for state-store
// Supported backends: pebbledb, rocksdb
// defaults to pebbledb
Expand Down Expand Up @@ -101,7 +98,6 @@ func DefaultStateCommitConfig() StateCommitConfig {
return StateCommitConfig{
Enable: true,
AsyncCommitBuffer: DefaultAsyncCommitBuffer,
CacheSize: DefaultCacheSize,
SnapshotInterval: DefaultSnapshotInterval,
SnapshotKeepRecent: DefaultSnapshotKeepRecent,
}
Expand Down
41 changes: 21 additions & 20 deletions ss/pebbledb/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,34 +152,35 @@
}
database.lastRangeHashedCache = lastHashed

if config.DedicatedChangelog {
if config.KeepRecent < 0 {
return nil, errors.New("KeepRecent must be non-negative")
}
streamHandler, _ := changelog.NewStream(
logger.NewNopLogger(),
utils.GetChangelogPath(dataDir),
changelog.Config{
DisableFsync: true,
ZeroCopy: true,
KeepRecent: uint64(config.KeepRecent),
PruneInterval: 300 * time.Second,
},
)
database.streamHandler = streamHandler
go database.writeAsyncInBackground()
}
if config.KeepRecent < 0 {
return nil, errors.New("KeepRecent must be non-negative")
}
streamHandler, _ := changelog.NewStream(
logger.NewNopLogger(),
utils.GetChangelogPath(dataDir),
changelog.Config{
DisableFsync: true,
ZeroCopy: true,
KeepRecent: uint64(config.KeepRecent),
PruneInterval: time.Duration(config.PruneIntervalSeconds) * time.Second,
},
)
database.streamHandler = streamHandler
go database.writeAsyncInBackground()

Check notice

Code scanning / CodeQL

Spawning a Go routine Note

Spawning a Go routine may be a possible source of non-determinism

return database, nil
}

func (db *Database) Close() error {
if db.streamHandler != nil {
// First, stop accepting new pending changes and drain the worker
close(db.pendingChanges)
// Wait for the async writes to finish
db.asyncWriteWG.Wait()
// Now close the WAL stream
_ = db.streamHandler.Close()
db.streamHandler = nil
close(db.pendingChanges)
}
// Wait for the async writes to finish
db.asyncWriteWG.Wait()
err := db.storage.Close()
db.storage = nil
return err
Expand Down
13 changes: 6 additions & 7 deletions ss/pebbledb/hash_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,11 @@ func setupTestDB(t *testing.T) (*Database, string) {

// Set up config with hash range enabled
cfg := config.StateStoreConfig{
HashRange: 10, // 10 blocks per hash range
AsyncWriteBuffer: 100,
KeepRecent: 100,
KeepLastVersion: true,
ImportNumWorkers: 4,
DedicatedChangelog: false,
HashRange: 10, // 10 blocks per hash range
AsyncWriteBuffer: 100,
KeepRecent: 100,
KeepLastVersion: true,
ImportNumWorkers: 4,
}

db, err := New(tempDir, cfg)
Expand Down Expand Up @@ -340,7 +339,7 @@ func TestAsyncComputeMissingRanges(t *testing.T) {
require.NoError(t, err)

// Wait a bit for the async computation to complete
time.Sleep(200 * time.Millisecond)
time.Sleep(500 * time.Millisecond)

// We should now have hashed up to version 30 (3 complete ranges)
lastHashed, err := db.GetLastRangeHashed()
Expand Down
31 changes: 14 additions & 17 deletions ss/rocksdb/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,20 +110,18 @@ func New(dataDir string, config config.StateStoreConfig) (*Database, error) {
pendingChanges: make(chan VersionedChangesets, config.AsyncWriteBuffer),
}

if config.DedicatedChangelog {
streamHandler, _ := changelog.NewStream(
logger.NewNopLogger(),
utils.GetChangelogPath(dataDir),
changelog.Config{
DisableFsync: true,
ZeroCopy: true,
KeepRecent: uint64(config.KeepRecent),
PruneInterval: 300 * time.Second,
},
)
database.streamHandler = streamHandler
go database.writeAsyncInBackground()
}
streamHandler, _ := changelog.NewStream(
logger.NewNopLogger(),
utils.GetChangelogPath(dataDir),
changelog.Config{
DisableFsync: true,
ZeroCopy: true,
KeepRecent: uint64(config.KeepRecent),
PruneInterval: time.Duration(config.PruneIntervalSeconds) * time.Second,
},
)
database.streamHandler = streamHandler
go database.writeAsyncInBackground()

return database, nil
}
Expand Down Expand Up @@ -495,16 +493,15 @@ func (db *Database) WriteBlockRangeHash(storeKey string, beginBlockRange, endBlo

func (db *Database) Close() error {
if db.streamHandler != nil {
// Close the changelog stream first
db.streamHandler.Close()
// Close the pending changes channel to signal the background goroutine to stop
close(db.pendingChanges)
// Wait for the async writes to finish processing all buffered items
db.asyncWriteWG.Wait()
// Close the changelog stream first
_ = db.streamHandler.Close()
// Only set to nil after background goroutine has finished
db.streamHandler = nil
}

db.storage.Close()
db.storage = nil
db.cfHandle = nil
Expand Down
39 changes: 20 additions & 19 deletions ss/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,17 +41,16 @@ func NewStateStore(logger logger.Logger, homeDir string, ssConfig config.StateSt
if err != nil {
return nil, err
}

// Handle auto recovery for DB running with async mode
if ssConfig.DedicatedChangelog {
changelogPath := utils.GetChangelogPath(utils.GetStateStorePath(homeDir, ssConfig.Backend))
if ssConfig.DBDirectory != "" {
changelogPath = utils.GetChangelogPath(ssConfig.DBDirectory)
}
err := RecoverStateStore(logger, changelogPath, stateStore)
if err != nil {
return nil, err
}
changelogPath := utils.GetChangelogPath(utils.GetStateStorePath(homeDir, ssConfig.Backend))
if ssConfig.DBDirectory != "" {
changelogPath = utils.GetChangelogPath(ssConfig.DBDirectory)
}
if err := RecoverStateStore(logger, changelogPath, stateStore); err != nil {
return nil, err
}

// Start the pruning manager for DB
pruningManager := pruning.NewPruningManager(logger, stateStore, int64(ssConfig.KeepRecent), int64(ssConfig.PruneIntervalSeconds))
pruningManager.Start()
Expand All @@ -62,9 +61,6 @@ func NewStateStore(logger logger.Logger, homeDir string, ssConfig config.StateSt
func RecoverStateStore(logger logger.Logger, changelogPath string, stateStore types.StateStore) error {
ssLatestVersion := stateStore.GetLatestVersion()
logger.Info(fmt.Sprintf("Recovering from changelog %s with latest SS version %d", changelogPath, ssLatestVersion))
if ssLatestVersion <= 0 {
return nil
}
streamHandler, err := changelog.NewStream(logger, changelogPath, changelog.Config{})
if err != nil {
return err
Expand All @@ -84,15 +80,20 @@ func RecoverStateStore(logger logger.Logger, changelogPath string, stateStore ty
// Look backward to find where we should start replay from
curVersion := lastEntry.Version
curOffset := lastOffset
for curVersion > ssLatestVersion && curOffset > firstOffset {
curOffset--
curEntry, errRead := streamHandler.ReadAt(curOffset)
if errRead != nil {
return err
if ssLatestVersion > 0 {
for curVersion > ssLatestVersion && curOffset > firstOffset {
curOffset--
curEntry, errRead := streamHandler.ReadAt(curOffset)
if errRead != nil {
return err
}
curVersion = curEntry.Version
}
curVersion = curEntry.Version
} else {
// Fresh store (or no applied versions) – start from the first offset
curOffset = firstOffset
}
// Replay from the offset where the offset where the version is larger than SS store latest version
// Replay from the offset where the version is larger than SS store latest version
targetStartOffset := curOffset
logger.Info(fmt.Sprintf("Start replaying changelog to recover StateStore from offset %d to %d", targetStartOffset, lastOffset))
if targetStartOffset < lastOffset {
Expand Down
13 changes: 6 additions & 7 deletions ss/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,15 @@ import (

func TestNewStateStore(t *testing.T) {
tempDir := os.TempDir()
homeDir := filepath.Join(tempDir, "seidb")
homeDir := filepath.Join(tempDir, "pebbledb")
ssConfig := config.StateStoreConfig{
DedicatedChangelog: true,
Backend: string(PebbleDBBackend),
AsyncWriteBuffer: 50,
KeepRecent: 500,
Backend: string(PebbleDBBackend),
AsyncWriteBuffer: 100,
KeepRecent: 500,
}
stateStore, err := NewStateStore(logger.NewNopLogger(), homeDir, ssConfig)
require.NoError(t, err)
for i := 1; i < 20; i++ {
for i := 1; i < 50; i++ {
var changesets []*proto.NamedChangeSet
kvPair := &iavl.KVPair{
Delete: false,
Expand All @@ -51,7 +50,7 @@ func TestNewStateStore(t *testing.T) {
require.NoError(t, err)

// Make sure key and values can be found
for i := 1; i < 20; i++ {
for i := 1; i < 50; i++ {
value, err := stateStore.Get("storeA", int64(i), []byte(fmt.Sprintf("key%d", i)))
require.NoError(t, err)
require.Equal(t, fmt.Sprintf("value%d", i), string(value))
Expand Down
Loading