Skip to content

Commit

Permalink
[#1974] shard: Do not panic in degraded mode
Browse files Browse the repository at this point in the history
Signed-off-by: Evgenii Stratonikov <evgeniy@morphbits.ru>
  • Loading branch information
fyrchik committed Oct 26, 2022
1 parent ec9b58c commit 72c66f2
Show file tree
Hide file tree
Showing 8 changed files with 56 additions and 4 deletions.
8 changes: 6 additions & 2 deletions pkg/local_object_storage/engine/container.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package engine

import (
"errors"

"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard"
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
"go.uber.org/zap"
Expand Down Expand Up @@ -75,7 +77,7 @@ func (e *StorageEngine) containerSize(prm ContainerSizePrm) (res ContainerSizeRe
csPrm.SetContainerID(prm.cnr)

csRes, err := sh.Shard.ContainerSize(csPrm)
if err != nil {
if err != nil && !errors.Is(err, shard.ErrDegradedMode) {
e.reportShardError(sh, "can't get container size", err,
zap.Stringer("container_id", prm.cnr),
)
Expand Down Expand Up @@ -124,7 +126,9 @@ func (e *StorageEngine) listContainers() (ListContainersRes, error) {
e.iterateOverUnsortedShards(func(sh hashedShard) (stop bool) {
res, err := sh.Shard.ListContainers(shard.ListContainersPrm{})
if err != nil {
e.reportShardError(sh, "can't get list of containers", err)
if !errors.Is(err, shard.ErrDegradedMode) {
e.reportShardError(sh, "can't get list of containers", err)
}
return false
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/local_object_storage/engine/evacuate.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ mainLoop:
// because ListWithCursor works only with the metabase.
listRes, err := sh.ListWithCursor(listPrm)
if err != nil {
if errors.Is(err, meta.ErrEndOfListing) {
if errors.Is(err, meta.ErrEndOfListing) || errors.Is(err, shard.ErrDegradedMode) {
continue mainLoop
}
return res, err
Expand Down
6 changes: 5 additions & 1 deletion pkg/local_object_storage/engine/select.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package engine

import (
"errors"

"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard"
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
"github.com/nspcc-dev/neofs-sdk-go/object"
Expand Down Expand Up @@ -109,7 +111,9 @@ func (e *StorageEngine) list(limit uint64) (SelectRes, error) {
e.iterateOverUnsortedShards(func(sh hashedShard) (stop bool) {
res, err := sh.List() // consider limit result of shard iterator
if err != nil {
e.reportShardError(sh, "could not select objects from shard", err)
if !errors.Is(err, shard.ErrDegradedMode) {
e.reportShardError(sh, "could not select objects from shard", err)
}
} else {
for _, addr := range res.AddressList() { // save only unique values
if _, ok := uniqueMap[addr.EncodeToString()]; !ok {
Expand Down
4 changes: 4 additions & 0 deletions pkg/local_object_storage/metabase/containers.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ func (db *DB) Containers() (list []cid.ID, err error) {
db.modeMtx.RLock()
defer db.modeMtx.RUnlock()

if db.mode.NoMetabase() {
return nil, ErrDegradedMode
}

err = db.boltDB.View(func(tx *bbolt.Tx) error {
list, err = db.containers(tx)

Expand Down
4 changes: 4 additions & 0 deletions pkg/local_object_storage/metabase/inhume.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,10 @@ func (db *DB) Inhume(prm InhumePrm) (res InhumeRes, err error) {
db.modeMtx.RLock()
defer db.modeMtx.RUnlock()

if db.mode.NoMetabase() {
return InhumeRes{}, ErrDegradedMode
}

currEpoch := db.epochState.CurrentEpoch()
var inhumed uint64

Expand Down
7 changes: 7 additions & 0 deletions pkg/local_object_storage/shard/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,13 @@ func (r ContainerSizeRes) Size() uint64 {
}

func (s *Shard) ContainerSize(prm ContainerSizePrm) (ContainerSizeRes, error) {
s.m.RLock()
defer s.m.RUnlock()

if s.info.Mode.NoMetabase() {
return ContainerSizeRes{}, ErrDegradedMode
}

size, err := s.metaBase.ContainerSize(prm.cnr)
if err != nil {
return ContainerSizeRes{}, fmt.Errorf("could not get container size: %w", err)
Expand Down
21 changes: 21 additions & 0 deletions pkg/local_object_storage/shard/gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,11 @@ func (s *Shard) collectExpiredTombstones(ctx context.Context, e Event) {
for {
log.Debug("iterating tombstones")

if s.GetMode().NoMetabase() {
s.log.Debug("shard is in a degraded mode, skip collecting expired tombstones")
return
}

err := s.metaBase.IterateOverGraveyard(iterPrm)
if err != nil {
log.Error("iterator over graveyard failed", zap.Error(err))
Expand Down Expand Up @@ -327,6 +332,10 @@ func (s *Shard) collectExpiredLocks(ctx context.Context, e Event) {
}

func (s *Shard) getExpiredObjects(ctx context.Context, epoch uint64, typeCond func(object.Type) bool) ([]oid.Address, error) {
if s.GetMode().NoMetabase() {
return nil, ErrDegradedMode
}

var expired []oid.Address

err := s.metaBase.IterateExpired(epoch, func(expiredObject *meta.ExpiredObject) error {
Expand All @@ -351,6 +360,10 @@ func (s *Shard) getExpiredObjects(ctx context.Context, epoch uint64, typeCond fu
//
// Does not modify tss.
func (s *Shard) HandleExpiredTombstones(tss []meta.TombstonedObject) {
if s.GetMode().NoMetabase() {
return
}

// Mark tombstones as garbage.
var pInhume meta.InhumePrm

Expand Down Expand Up @@ -385,6 +398,10 @@ func (s *Shard) HandleExpiredTombstones(tss []meta.TombstonedObject) {
// HandleExpiredLocks unlocks all objects which were locked by lockers.
// If successful, marks lockers themselves as garbage.
func (s *Shard) HandleExpiredLocks(lockers []oid.Address) {
if s.GetMode().NoMetabase() {
return
}

err := s.metaBase.FreeLockedBy(lockers)
if err != nil {
s.log.Warn("failure to unlock objects",
Expand Down Expand Up @@ -412,6 +429,10 @@ func (s *Shard) HandleExpiredLocks(lockers []oid.Address) {

// HandleDeletedLocks unlocks all objects which were locked by lockers.
func (s *Shard) HandleDeletedLocks(lockers []oid.Address) {
if s.GetMode().NoMetabase() {
return
}

err := s.metaBase.FreeLockedBy(lockers)
if err != nil {
s.log.Warn("failure to unlock objects",
Expand Down
8 changes: 8 additions & 0 deletions pkg/local_object_storage/shard/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ func (r ListWithCursorRes) Cursor() *Cursor {

// List returns all objects physically stored in the Shard.
func (s *Shard) List() (res SelectRes, err error) {
if s.GetMode().NoMetabase() {
return SelectRes{}, ErrDegradedMode
}

lst, err := s.metaBase.Containers()
if err != nil {
return res, fmt.Errorf("can't list stored containers: %w", err)
Expand Down Expand Up @@ -93,6 +97,10 @@ func (s *Shard) List() (res SelectRes, err error) {
}

func (s *Shard) ListContainers(_ ListContainersPrm) (ListContainersRes, error) {
if s.GetMode().NoMetabase() {
return ListContainersRes{}, ErrDegradedMode
}

containers, err := s.metaBase.Containers()
if err != nil {
return ListContainersRes{}, fmt.Errorf("could not get list of containers: %w", err)
Expand Down

0 comments on commit 72c66f2

Please sign in to comment.