From 6c50e19b1ace440897d3143f3879c072be64cbb2 Mon Sep 17 00:00:00 2001 From: Leonard Lyubich Date: Sat, 29 Jul 2023 19:58:31 +0400 Subject: [PATCH] Introduce new storage component Peapod Currently, storage node saves relatively small NeoFS objects in Blobovnicza tree component: group of BoltDB wrappers managed as a tree. This component has pretty complex data structure, code implementation and dubious performance results. Peapod is a new storage component introduced to replace Blobovnicza one as more simple and effective. It also bases on single BoltDB instance, but organizes batch writes in a specific way. In future, Peapod is going to be used as a storage of small objects by the BlobStor. Signed-off-by: Leonard Lyubich --- .../blobstor/peapod/peapod.go | 184 ++++++++++++++++++ .../blobstor/peapod/peapod_test.go | 33 ++++ .../peapod/common_test.go | 18 ++ pkg/local_object_storage/peapod/peapod.go | 174 +++++++++++++++++ pkg/local_object_storage/peapod/read.go | 89 +++++++++ pkg/local_object_storage/peapod/read_test.go | 74 +++++++ pkg/local_object_storage/peapod/write.go | 70 +++++++ pkg/local_object_storage/peapod/write_test.go | 88 +++++++++ 8 files changed, 730 insertions(+) create mode 100644 pkg/local_object_storage/blobstor/peapod/peapod.go create mode 100644 pkg/local_object_storage/blobstor/peapod/peapod_test.go create mode 100644 pkg/local_object_storage/peapod/common_test.go create mode 100644 pkg/local_object_storage/peapod/peapod.go create mode 100644 pkg/local_object_storage/peapod/read.go create mode 100644 pkg/local_object_storage/peapod/read_test.go create mode 100644 pkg/local_object_storage/peapod/write.go create mode 100644 pkg/local_object_storage/peapod/write_test.go diff --git a/pkg/local_object_storage/blobstor/peapod/peapod.go b/pkg/local_object_storage/blobstor/peapod/peapod.go new file mode 100644 index 00000000000..491f06a6068 --- /dev/null +++ b/pkg/local_object_storage/blobstor/peapod/peapod.go @@ -0,0 +1,184 @@ +package peapod + +import ( + "context" + "errors" + "fmt" + "time" + + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/compression" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/peapod" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/util/logicerr" + apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status" + objectSDK "github.com/nspcc-dev/neofs-sdk-go/object" + oid "github.com/nspcc-dev/neofs-sdk-go/object/id" + "go.etcd.io/bbolt" +) + +type storage struct { + path string + + ppd *peapod.Peapod + + compress compression.Config +} + +func New(path string) common.Storage { + return &storage{ + path: path, + } +} + +func (x *storage) Open(readOnly bool) error { + var err error + + if x.ppd != nil { + err := x.ppd.Close() + if err != nil { + return fmt.Errorf("close Peapod: %w", err) + } + } + + x.ppd, err = peapod.New(x.path, readOnly) + return err +} + +func (x *storage) Init() error { + // no-op because peapod.New initializes everything + return nil +} + +func (x *storage) Close() error { + err := x.ppd.Close() + x.ppd = nil + return err +} + +// Type is peapod storage type used in logs and configuration. +const Type = "peapod" + +func (x *storage) Type() string { + return Type +} + +func (x *storage) Path() string { + return x.path +} + +func (x *storage) SetCompressor(cc *compression.Config) { + x.compress = *cc +} + +func (x *storage) SetReportErrorFunc(func(string, error)) { + // no-op like FSTree +} + +func (x *storage) Get(prm common.GetPrm) (common.GetRes, error) { + data, err := x.ppd.Get(prm.Address) + if err != nil { + if errors.Is(err, apistatus.ErrObjectNotFound) { + return common.GetRes{}, logicerr.Wrap(err) + } + return common.GetRes{}, err + } + + // copy-paste from FSTree + data, err = x.compress.Decompress(data) + if err != nil { + return common.GetRes{}, fmt.Errorf("decompress data: %w", err) + } + + obj := objectSDK.New() + if err := obj.Unmarshal(data); err != nil { + return common.GetRes{}, fmt.Errorf("decode object from binary: %w", err) + } + + return common.GetRes{Object: obj, RawData: data}, err +} + +func (x *storage) GetRange(prm common.GetRangePrm) (common.GetRangeRes, error) { + // copy-paste from FSTree + res, err := x.Get(common.GetPrm{Address: prm.Address}) + if err != nil { + return common.GetRangeRes{}, err + } + + payload := res.Object.Payload() + from := prm.Range.GetOffset() + to := from + prm.Range.GetLength() + + if pLen := uint64(len(payload)); to < from || pLen < from || pLen < to { + return common.GetRangeRes{}, logicerr.Wrap(apistatus.ObjectOutOfRange{}) + } + + return common.GetRangeRes{ + Data: payload[from:to], + }, nil +} + +func (x *storage) Exists(prm common.ExistsPrm) (common.ExistsRes, error) { + var res common.ExistsRes + var err error + res.Exists, err = x.ppd.Exists(prm.Address) + return res, err +} + +func (x *storage) Put(prm common.PutPrm) (common.PutRes, error) { + if !prm.DontCompress { + prm.RawData = x.compress.Compress(prm.RawData) + } + + // TODO: create issue to support Put op context + ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second) + defer cancel() + + err := x.ppd.Put(ctx, prm.Address, prm.RawData) + if err != nil && errors.Is(err, bbolt.ErrDatabaseReadOnly) { + return common.PutRes{}, common.ErrReadOnly + } + + return common.PutRes{}, err +} + +func (x *storage) Delete(prm common.DeletePrm) (common.DeleteRes, error) { + // TODO: create issue to support Put op context + ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second) + defer cancel() + + err := x.ppd.Delete(ctx, prm.Address) + if err != nil { + if errors.Is(err, bbolt.ErrDatabaseReadOnly) { + return common.DeleteRes{}, common.ErrReadOnly + } + if errors.Is(err, apistatus.ErrObjectNotFound) { + return common.DeleteRes{}, logicerr.Wrap(err) + } + return common.DeleteRes{}, err + } + + return common.DeleteRes{}, err +} + +func (x *storage) Iterate(prm common.IteratePrm) (common.IterateRes, error) { + var e error + err := x.ppd.Iterate(func(addr oid.Address, data []byte) bool { + if prm.LazyHandler != nil { + e = prm.LazyHandler(addr, func() ([]byte, error) { + return data, nil + }) + return e == nil + } + + e = prm.Handler(common.IterationElement{ + ObjectData: data, + Address: addr, + }) + return e == nil + }) + if err != nil { + return common.IterateRes{}, err + } + + return common.IterateRes{}, e +} diff --git a/pkg/local_object_storage/blobstor/peapod/peapod_test.go b/pkg/local_object_storage/blobstor/peapod/peapod_test.go new file mode 100644 index 00000000000..63d4e585870 --- /dev/null +++ b/pkg/local_object_storage/blobstor/peapod/peapod_test.go @@ -0,0 +1,33 @@ +package peapod_test + +import ( + "path/filepath" + "testing" + + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/internal/blobstortest" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/peapod" +) + +func TestGeneric(t *testing.T) { + newPath := func() string { + return filepath.Join(t.TempDir(), "peapod.db") + } + + blobstortest.TestAll(t, func(t *testing.T) common.Storage { + return peapod.New(newPath()) + }, 2048, 16*1024) + + t.Run("info", func(t *testing.T) { + path := newPath() + blobstortest.TestInfo(t, func(t *testing.T) common.Storage { + return peapod.New(path) + }, peapod.Type, path) + }) +} + +func TestControl(t *testing.T) { + blobstortest.TestControl(t, func(t *testing.T) common.Storage { + return peapod.New(filepath.Join(t.TempDir(), "peapod.db")) + }, 2048, 2048) +} diff --git a/pkg/local_object_storage/peapod/common_test.go b/pkg/local_object_storage/peapod/common_test.go new file mode 100644 index 00000000000..3665891bc36 --- /dev/null +++ b/pkg/local_object_storage/peapod/common_test.go @@ -0,0 +1,18 @@ +package peapod_test + +import ( + "path/filepath" + "testing" + + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/peapod" + "github.com/stretchr/testify/require" +) + +func newTestPeapod(tb testing.TB) *peapod.Peapod { + ppd, err := peapod.New(filepath.Join(tb.TempDir(), "peapod.db"), false) + require.NoError(tb, err) + + tb.Cleanup(func() { _ = ppd.Close() }) + + return ppd +} diff --git a/pkg/local_object_storage/peapod/peapod.go b/pkg/local_object_storage/peapod/peapod.go new file mode 100644 index 00000000000..ccb4b4650c2 --- /dev/null +++ b/pkg/local_object_storage/peapod/peapod.go @@ -0,0 +1,174 @@ +package peapod + +import ( + "crypto/sha256" + "fmt" + "sync" + "time" + + cid "github.com/nspcc-dev/neofs-sdk-go/container/id" + oid "github.com/nspcc-dev/neofs-sdk-go/object/id" + "go.etcd.io/bbolt" +) + +type batch struct { + initErr error + + tx *bbolt.Tx + + nonIdle bool + + commitErr error + chCommitted chan struct{} + + bktRootMtx sync.Mutex + bktRoot *bbolt.Bucket +} + +// Peapod provides storage for relatively small NeoFS binary object (peas). +// Peapod is a single low-level key/value database optimized to work with big +// number of stored units. +type Peapod struct { + bolt *bbolt.DB + + currentBatchMtx sync.RWMutex + currentBatch *batch + + chClose chan struct{} + chFlushDone chan struct{} +} + +var rootBucket = []byte("root") + +// New returns initialized Peapod instance located at the given path. Resulting Peapod +// must be finally closed. +func New(path string, readOnly bool) (*Peapod, error) { + db, err := bbolt.Open(path, 0600, &bbolt.Options{ + ReadOnly: readOnly, + Timeout: 100 * time.Millisecond, // to handle flock + }) + if err != nil { + return nil, fmt.Errorf("open BoltDB instance: %w", err) + } + + if !readOnly { + err = db.Update(func(tx *bbolt.Tx) error { + _, err = tx.CreateBucketIfNotExists(rootBucket) + return err + }) + if err != nil { + return nil, fmt.Errorf("create root bucket in BoltDB instance: %w", err) + } + } + + res := &Peapod{ + bolt: db, + chClose: make(chan struct{}), + chFlushDone: make(chan struct{}), + } + + res.beginNewBatch() + + go res.flushLoop() + + return res, nil +} + +// Close syncs data and closes the database. +func (x *Peapod) Close() error { + close(x.chClose) + <-x.chFlushDone + return x.bolt.Close() +} + +func (x *Peapod) flushLoop() { + defer close(x.chFlushDone) + + ticker := time.NewTicker(10 * time.Millisecond) + defer ticker.Stop() + + for { + select { + case <-x.chClose: + // commit current transaction to prevent bbolt.DB.Close blocking + x.flushCurrentBatch(false) + return + case <-ticker.C: + x.flushCurrentBatch(true) + } + } +} + +func (x *Peapod) flushCurrentBatch(beginNew bool) { + x.currentBatchMtx.Lock() + + if !x.currentBatch.nonIdle { + if !beginNew { + _ = x.currentBatch.tx.Rollback() + } + x.currentBatchMtx.Unlock() + return + } + + err := x.currentBatch.tx.Commit() + if err != nil { + err = fmt.Errorf("commit BoltDB batch transaction: %w", err) + } + + x.currentBatch.commitErr = err + close(x.currentBatch.chCommitted) + + if beginNew { + x.beginNewBatch() + } + + x.currentBatchMtx.Unlock() +} + +func (x *Peapod) beginNewBatch() { + var err error + x.currentBatch = new(batch) + + x.currentBatch.tx, x.currentBatch.initErr = x.bolt.Begin(true) + if x.currentBatch.initErr != nil { + x.currentBatch.initErr = fmt.Errorf("begin new BoltDB writable transaction: %w", x.currentBatch.initErr) + return + } + + x.currentBatch.bktRoot, x.currentBatch.initErr = x.currentBatch.tx.CreateBucketIfNotExists(rootBucket) + if err != nil { + x.currentBatch.initErr = fmt.Errorf("create BoltDB bucket for containers: %w", x.currentBatch.initErr) + return + } + + x.currentBatch.chCommitted = make(chan struct{}) +} + +func keyForObject(addr oid.Address) []byte { + b := make([]byte, 2*sha256.Size) + addr.Container().Encode(b) + addr.Object().Encode(b[sha256.Size:]) + return b +} + +func decodeKeyForObject(addr *oid.Address, key []byte) bool { + if len(key) != 2*sha256.Size { + return false + } + + var cnr cid.ID + var obj oid.ID + + if cnr.Decode(key[:sha256.Size]) != nil { + return false + } + + if obj.Decode(key[sha256.Size:]) != nil { + return false + } + + addr.SetContainer(cnr) + addr.SetObject(obj) + + return true +} diff --git a/pkg/local_object_storage/peapod/read.go b/pkg/local_object_storage/peapod/read.go new file mode 100644 index 00000000000..2b2f31a1feb --- /dev/null +++ b/pkg/local_object_storage/peapod/read.go @@ -0,0 +1,89 @@ +package peapod + +import ( + "errors" + "fmt" + + "github.com/nspcc-dev/neo-go/pkg/util/slice" + apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status" + oid "github.com/nspcc-dev/neofs-sdk-go/object/id" + "go.etcd.io/bbolt" +) + +// Get reads data from the underlying database by the given object address. +// Returns apistatus.ErrObjectNotFound if object is missing in the Peapod. +func (x *Peapod) Get(addr oid.Address) ([]byte, error) { + var data []byte + + err := x.bolt.View(func(tx *bbolt.Tx) error { + bktRoot := tx.Bucket(rootBucket) + if bktRoot == nil { + return fmt.Errorf("%w: missing root bucket", apistatus.ErrObjectNotFound) + } + + val := bktRoot.Get(keyForObject(addr)) + if val == nil { + return apistatus.ErrObjectNotFound + } + + data = slice.Copy(val) + + return nil + }) + if err != nil { + return nil, fmt.Errorf("exec read-only BoltDB transaction: %w", err) + } + + return data, nil +} + +// Exists checks presence of the object in the underlying database by the given +// address. +func (x *Peapod) Exists(addr oid.Address) (bool, error) { + var res bool + + err := x.bolt.View(func(tx *bbolt.Tx) error { + bktRoot := tx.Bucket(rootBucket) + if bktRoot == nil { + return fmt.Errorf("%w: missing root bucket", apistatus.ErrObjectNotFound) + } + + res = bktRoot.Get(keyForObject(addr)) != nil + + return nil + }) + if err != nil { + return false, fmt.Errorf("exec read-only BoltDB transaction: %w", err) + } + + return res, nil +} + +var errIterateBreak = errors.New("break iterations") + +// Iterate iterates over all objects stored in the underlying database and +// passes them into f. Break on f's false return. +func (x *Peapod) Iterate(f func(oid.Address, []byte) bool) error { + var addr oid.Address + + err := x.bolt.View(func(tx *bbolt.Tx) error { + bktRoot := tx.Bucket(rootBucket) + if bktRoot == nil { + return fmt.Errorf("%w: missing root bucket", apistatus.ErrObjectNotFound) + } + + return bktRoot.ForEach(func(k, v []byte) error { + if decodeKeyForObject(&addr, k) { + if !f(addr, v) { + return errIterateBreak + } + } + return nil + }) + }) + if err != nil && !errors.Is(err, errIterateBreak) { + return fmt.Errorf("exec read-only BoltDB transaction: %w", err) + } + + return nil +} diff --git a/pkg/local_object_storage/peapod/read_test.go b/pkg/local_object_storage/peapod/read_test.go new file mode 100644 index 00000000000..965faab52ee --- /dev/null +++ b/pkg/local_object_storage/peapod/read_test.go @@ -0,0 +1,74 @@ +package peapod_test + +import ( + "context" + "testing" + + apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status" + oid "github.com/nspcc-dev/neofs-sdk-go/object/id" + oidtest "github.com/nspcc-dev/neofs-sdk-go/object/id/test" + "github.com/stretchr/testify/require" +) + +func TestPeapod_Get(t *testing.T) { + ppd := newTestPeapod(t) + addr := oidtest.Address() + data := []byte("Hello, world!") + + _, err := ppd.Get(addr) + require.ErrorIs(t, err, apistatus.ErrObjectNotFound) + + err = ppd.Put(context.Background(), addr, data) + require.NoError(t, err) + + res, err := ppd.Get(addr) + require.NoError(t, err) + require.Equal(t, data, res) +} + +func TestPeapod_Exists(t *testing.T) { + ppd := newTestPeapod(t) + addr := oidtest.Address() + data := []byte("Hello, world!") + + res, err := ppd.Exists(addr) + require.NoError(t, err) + require.False(t, res) + + err = ppd.Put(context.Background(), addr, data) + require.NoError(t, err) + + res, err = ppd.Exists(addr) + require.NoError(t, err) + require.True(t, res) +} + +func TestPeapod_Iterate(t *testing.T) { + ppd := newTestPeapod(t) + + mSrc := map[oid.Address][]byte{ + oidtest.Address(): {1, 2, 3}, + oidtest.Address(): {4, 5, 6}, + oidtest.Address(): {7, 8, 9}, + } + + mDst := make(map[oid.Address][]byte) + + f := func(addr oid.Address, data []byte) bool { + mDst[addr] = data + return true + } + + err := ppd.Iterate(f) + require.NoError(t, err) + require.Empty(t, mDst) + + for addr, data := range mSrc { + err = ppd.Put(context.Background(), addr, data) + require.NoError(t, err) + } + + err = ppd.Iterate(f) + require.NoError(t, err) + require.Equal(t, mSrc, mDst) +} diff --git a/pkg/local_object_storage/peapod/write.go b/pkg/local_object_storage/peapod/write.go new file mode 100644 index 00000000000..19720e3ccca --- /dev/null +++ b/pkg/local_object_storage/peapod/write.go @@ -0,0 +1,70 @@ +package peapod + +import ( + "context" + "fmt" + + apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status" + oid "github.com/nspcc-dev/neofs-sdk-go/object/id" + "go.etcd.io/bbolt" +) + +// Put saves given data in the underlying database by specified object address. +// The data can be anything, but in practice a binary NeoFS object is expected. +// Operation is executed within provided context: if the context is done, Put +// returns its error (in this case data may be saved). +// +// Put returns bbolt.ErrDatabaseReadOnly if Peadpod is read-only (see New). +func (x *Peapod) Put(ctx context.Context, addr oid.Address, data []byte) error { + return x.batch(ctx, func(bktRoot *bbolt.Bucket) error { + return bktRoot.Put(keyForObject(addr), data) + }) +} + +// Delete removes data associated with the given object address from the +// underlying database. Delete returns apistatus.ErrObjectNotFound if object is +// missing. +// +// Delete returns bbolt.ErrDatabaseReadOnly if Peadpod is read-only (see New). +func (x *Peapod) Delete(ctx context.Context, addr oid.Address) error { + return x.batch(ctx, func(bktRoot *bbolt.Bucket) error { + key := keyForObject(addr) + if bktRoot.Get(key) == nil { + return apistatus.ErrObjectNotFound + } + + return bktRoot.Delete(key) + }) +} + +func (x *Peapod) batch(ctx context.Context, fBktRoot func(bktRoot *bbolt.Bucket) error) error { + x.currentBatchMtx.RLock() + + currentBatch := x.currentBatch + + if currentBatch.initErr != nil { + x.currentBatchMtx.RUnlock() + return currentBatch.initErr + } + + // bbolt.Bucket.Put MUST NOT be called concurrently. This is not obvious from + // the docs, but panic occurs in practice + currentBatch.bktRootMtx.Lock() + err := fBktRoot(currentBatch.bktRoot) + currentBatch.bktRootMtx.Unlock() + if err != nil { + x.currentBatchMtx.RUnlock() + return fmt.Errorf("put object into BoltDB bucket for container: %w", err) + } + + currentBatch.nonIdle = true + + x.currentBatchMtx.RUnlock() + + select { + case <-ctx.Done(): + return ctx.Err() + case <-currentBatch.chCommitted: + return currentBatch.commitErr + } +} diff --git a/pkg/local_object_storage/peapod/write_test.go b/pkg/local_object_storage/peapod/write_test.go new file mode 100644 index 00000000000..61b9b0790b5 --- /dev/null +++ b/pkg/local_object_storage/peapod/write_test.go @@ -0,0 +1,88 @@ +package peapod_test + +import ( + "context" + "crypto/rand" + "fmt" + "sync" + "testing" + + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/peapod" + apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status" + oidtest "github.com/nspcc-dev/neofs-sdk-go/object/id/test" + "github.com/stretchr/testify/require" +) + +func benchmark(b *testing.B, ppd *peapod.Peapod, objSize uint64, nThreads int) { + data := make([]byte, objSize) + rand.Read(data) + + ctx := context.Background() + + b.ReportAllocs() + b.ResetTimer() + + for i := 0; i < b.N; i++ { + var wg sync.WaitGroup + + for i := 0; i < nThreads; i++ { + wg.Add(1) + go func() { + defer wg.Done() + + addr := oidtest.Address() + + err := ppd.Put(ctx, addr, data) + require.NoError(b, err) + }() + } + + wg.Wait() + } +} + +func BenchmarkPeapod_Put(b *testing.B) { + ppd := newTestPeapod(b) + + for _, tc := range []struct { + objSize uint64 + nThreads int + }{ + {1, 1}, + {1, 20}, + {1, 100}, + {1 << 10, 1}, + {1 << 10, 20}, + {1 << 10, 100}, + {100 << 10, 1}, + {100 << 10, 20}, + {100 << 10, 100}, + } { + b.Run(fmt.Sprintf("size=%d,thread=%d", tc.objSize, tc.nThreads), func(b *testing.B) { + benchmark(b, ppd, tc.objSize, tc.nThreads) + }) + } +} + +func TestPeapod_Delete(t *testing.T) { + ppd := newTestPeapod(t) + ctx := context.Background() + addr := oidtest.Address() + data := []byte("Hello, world!") + + err := ppd.Delete(ctx, addr) + require.ErrorIs(t, err, apistatus.ErrObjectNotFound) + + err = ppd.Put(context.Background(), addr, data) + require.NoError(t, err) + + res, err := ppd.Get(addr) + require.NoError(t, err) + require.Equal(t, data, res) + + err = ppd.Delete(ctx, addr) + require.NoError(t, err) + + res, err = ppd.Get(addr) + require.ErrorIs(t, err, apistatus.ErrObjectNotFound) +}