Skip to content

Commit

Permalink
Merge pull request #8628 from vegaprotocol/fix/use-vegapath-snapshot-…
Browse files Browse the repository at this point in the history
…engine

Use vega home to locate snapshot databases
  • Loading branch information
jeremyletang committed Jun 29, 2023
2 parents 3daabbb + 7b7fa23 commit 6bc4498
Show file tree
Hide file tree
Showing 11 changed files with 365 additions and 242 deletions.
55 changes: 14 additions & 41 deletions core/snapshot/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,27 +13,21 @@
package snapshot

import (
"errors"
"os"

"code.vegaprotocol.io/vega/core/types"
"code.vegaprotocol.io/vega/libs/config/encoding"
"code.vegaprotocol.io/vega/logging"
"code.vegaprotocol.io/vega/paths"
)

const (
namedLogger = "snapshot"
goLevelDB = "GOLevelDB"
memDB = "memory"
LevelDB = "GOLevelDB"
InMemoryDB = "memory"
)

type Config struct {
Level encoding.LogLevel `choice:"debug" choice:"info" choice:"warning" choice:"error" choice:"panic" choice:"fatal" description:"Logging level (default: info)" long:"log-level"`
KeepRecent int `description:"Number of historic snapshots to keep on disk. Limited to the 10 most recent ones" long:"snapshot-keep-recent"`
RetryLimit int `description:"Maximum number of times to try and apply snapshot chunk" long:"max-retries"`
KeepRecent uint `description:"Number of historic snapshots to keep on disk. Limited to the 10 most recent ones" long:"snapshot-keep-recent"`
RetryLimit uint `description:"Maximum number of times to try and apply snapshot chunk" long:"max-retries"`
Storage string `choice:"GOLevelDB" choice:"memory" description:"Storage type to use" long:"storage"`
DBPath string `description:"Path to database" long:"db-path"`
StartHeight int64 `description:"Load from a snapshot at the given block-height. Setting to -1 will load from the latest snapshot available, 0 will force the chain to replay if not using statesync" long:"load-from-block-height"`
}

Expand All @@ -44,44 +38,23 @@ func NewDefaultConfig() Config {
Level: encoding.LogLevel{Level: logging.InfoLevel},
KeepRecent: 10,
RetryLimit: 5,
Storage: goLevelDB,
Storage: LevelDB,
StartHeight: -1,
}
}

func NewTestConfig() Config {
cfg := NewDefaultConfig()
cfg.Storage = memDB
return cfg
}

// validate checks the values in the config file are sensible, and returns the path
// which is create/load the snapshots from.
func (c *Config) validate(vegapath paths.Paths) (string, error) {
if len(c.DBPath) != 0 && c.Storage == memDB {
return "", errors.New("dbpath cannot be set when storage method is in-memory")
// Validate checks the values in the config file are sensible.
func (c *Config) Validate() error {
// default to min 1 version, just so we don't have to account for nil slice.
// A single version kept in memory is pretty harmless.
if c.KeepRecent < 1 {
c.KeepRecent = 1
}

switch c.Storage {
case memDB:
return "", nil
case goLevelDB:

if len(c.DBPath) == 0 {
return vegapath.StatePathFor(paths.SnapshotStateHome), nil
}

stat, err := os.Stat(c.DBPath)
if err != nil {
return "", err
}

if !stat.IsDir() {
return "", errors.New("snapshot DB path is not a directory")
}

return c.DBPath, nil
case InMemoryDB, LevelDB:
return nil
default:
return "", types.ErrInvalidSnapshotStorageMethod
return types.ErrInvalidSnapshotStorageMethod
}
}
52 changes: 52 additions & 0 deletions core/snapshot/databases/metadata/database.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package metadata

import (
"strconv"

"code.vegaprotocol.io/vega/libs/proto"

tmtypes "github.com/tendermint/tendermint/abci/types"
)

type Adapter interface {
Save(version []byte, state []byte) error
Load(version []byte) (state []byte, err error)
Close() error
Clear() error
}

type Database struct {
Adapter
}

func (d *Database) Save(version int64, state *tmtypes.Snapshot) error {
bufV := strconv.FormatInt(version, 10)
bufS, err := proto.Marshal(state)
if err != nil {
return err
}

return d.Adapter.Save([]byte(bufV), bufS)
}

func (d *Database) Load(version int64) (*tmtypes.Snapshot, error) {
bufV := strconv.FormatInt(version, 10)
state, err := d.Adapter.Load([]byte(bufV))
if err != nil {
return nil, err
}

out := &tmtypes.Snapshot{}
err = proto.Unmarshal(state, out)
if err != nil {
return nil, err
}

return out, nil
}

func NewDatabase(adapter Adapter) *Database {
return &Database{
Adapter: adapter,
}
}
37 changes: 37 additions & 0 deletions core/snapshot/databases/metadata/in_memory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package metadata

import "errors"

var ErrUnknownSnapshotVersion = errors.New("unknown snapshot version")

type InMemoryAdapter struct {
store map[string][]byte
}

func (a *InMemoryAdapter) Save(version []byte, state []byte) error {
a.store[string(version)] = state
return nil
}

func (a *InMemoryAdapter) Load(version []byte) (state []byte, err error) {
s, ok := a.store[string(version)]
if !ok {
return nil, ErrUnknownSnapshotVersion
}
return s, nil
}

func (a *InMemoryAdapter) Close() error {
return nil
}

func (a *InMemoryAdapter) Clear() error {
a.store = map[string][]byte{}
return nil
}

func NewInMemoryAdapter() *InMemoryAdapter {
return &InMemoryAdapter{
store: map[string][]byte{},
}
}
77 changes: 77 additions & 0 deletions core/snapshot/databases/metadata/level_db.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package metadata

import (
"fmt"
"os"

"code.vegaprotocol.io/vega/paths"
"github.com/syndtr/goleveldb/leveldb/opt"
db "github.com/tendermint/tm-db"
)

const metaDBName = "snapshot_meta"

type LevelDBAdapter struct {
filePath string

underlyingAdapter *db.GoLevelDB
}

func (a *LevelDBAdapter) Save(version []byte, state []byte) error {
return a.underlyingAdapter.Set(version, state)
}

func (a *LevelDBAdapter) Load(version []byte) ([]byte, error) {
return a.underlyingAdapter.Get(version)
}

func (a *LevelDBAdapter) Close() error {
return a.underlyingAdapter.Close()
}

func (a *LevelDBAdapter) Clear() error {
if err := a.underlyingAdapter.Close(); err != nil {
return fmt.Errorf("could not close the connection: %w", err)
}

if err := os.RemoveAll(a.filePath); err != nil {
return fmt.Errorf("could not remove the database file: %w", err)
}

underlyingAdapter, err := initializeUnderlyingAdapter(a.filePath)
if err != nil {
return err
}
a.underlyingAdapter = underlyingAdapter

return nil
}

func NewLevelDBAdapter(vegaPaths paths.Paths) (*LevelDBAdapter, error) {
filePath := vegaPaths.StatePathFor(paths.SnapshotMetadataDBStateFile)

underlyingAdapter, err := initializeUnderlyingAdapter(filePath)
if err != nil {
return nil, err
}

return &LevelDBAdapter{
filePath: filePath,
underlyingAdapter: underlyingAdapter,
}, nil
}

func initializeUnderlyingAdapter(filePath string) (*db.GoLevelDB, error) {
underlyingAdapter, err := db.NewGoLevelDBWithOpts(
metaDBName,
filePath,
&opt.Options{
BlockCacher: opt.NoCacher,
OpenFilesCacher: opt.NoCacher,
},
)
if err != nil {
return nil, fmt.Errorf("could not initialize LevelDB adapter: %w", err)
}
return underlyingAdapter, nil
}
20 changes: 20 additions & 0 deletions core/snapshot/databases/snapshot/in_memory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package metadata

import (
db "github.com/tendermint/tm-db"
)

type InMemoryDatabase struct {
*db.MemDB
}

func (d *InMemoryDatabase) Clear() error {
d.MemDB = db.NewMemDB()
return nil
}

func NewInMemoryDatabase() *InMemoryDatabase {
return &InMemoryDatabase{
MemDB: db.NewMemDB(),
}
}
67 changes: 67 additions & 0 deletions core/snapshot/databases/snapshot/level_db.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package metadata

import (
"fmt"
"os"

"code.vegaprotocol.io/vega/paths"
"github.com/syndtr/goleveldb/leveldb/filter"
"github.com/syndtr/goleveldb/leveldb/opt"
db "github.com/tendermint/tm-db"
)

const dbName = "snapshot"

type LevelDBDatabase struct {
*db.GoLevelDB

filePath string
}

func (d *LevelDBDatabase) Clear() error {
if err := d.GoLevelDB.Close(); err != nil {
return fmt.Errorf("could not close the connection: %w", err)
}

if err := os.RemoveAll(d.filePath); err != nil {
return fmt.Errorf("could not remove the database file: %w", err)
}

adapter, err := initializeUnderlyingAdapter(d.filePath)
if err != nil {
return err
}
d.GoLevelDB = adapter

return nil
}

func NewLevelDBDatabase(vegaPaths paths.Paths) (*LevelDBDatabase, error) {
filePath := vegaPaths.StatePathFor(paths.SnapshotDBStateFile)

adapter, err := initializeUnderlyingAdapter(filePath)
if err != nil {
return nil, err
}

return &LevelDBDatabase{
filePath: filePath,
GoLevelDB: adapter,
}, nil
}

func initializeUnderlyingAdapter(filePath string) (*db.GoLevelDB, error) {
adapter, err := db.NewGoLevelDBWithOpts(
dbName, filePath,
&opt.Options{
Filter: filter.NewBloomFilter(10),
BlockCacher: opt.NoCacher,
OpenFilesCacher: opt.NoCacher,
},
)
if err != nil {
return nil, fmt.Errorf("could not initialize LevelDB adapter: %w", err)
}

return adapter, nil
}
Loading

0 comments on commit 6bc4498

Please sign in to comment.