Skip to content

Commit

Permalink
Merge 20a00ef into f38069e
Browse files Browse the repository at this point in the history
  • Loading branch information
carpawell authored Aug 9, 2023
2 parents f38069e + 20a00ef commit 0f0db4b
Show file tree
Hide file tree
Showing 7 changed files with 99 additions and 20 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ Changelog for NeoFS Node
- CLI `--timeout` flag configures whole execution timeout from now (#2124)
- CLI default timeout for commands with `--await` flag increased to 1m (#2124)
- BlobStor tries to store object in any sub-storage with free space (#2450)
- SN does not store container estimations in-mem forever (#2472)

### Updated
- `neofs-sdk-go` to `v1.0.0-rc.10`
Expand Down
21 changes: 15 additions & 6 deletions cmd/neofs-node/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,15 @@ import (
"github.com/nspcc-dev/hrw"
containerV2 "github.com/nspcc-dev/neofs-api-go/v2/container"
containerGRPC "github.com/nspcc-dev/neofs-api-go/v2/container/grpc"
containercontract "github.com/nspcc-dev/neofs-contract/container"
"github.com/nspcc-dev/neofs-node/pkg/core/client"
containerCore "github.com/nspcc-dev/neofs-node/pkg/core/container"
netmapCore "github.com/nspcc-dev/neofs-node/pkg/core/netmap"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine"
cntClient "github.com/nspcc-dev/neofs-node/pkg/morph/client/container"
"github.com/nspcc-dev/neofs-node/pkg/morph/event"
containerEvent "github.com/nspcc-dev/neofs-node/pkg/morph/event/container"
netmapEv "github.com/nspcc-dev/neofs-node/pkg/morph/event/netmap"
containerTransportGRPC "github.com/nspcc-dev/neofs-node/pkg/network/transport/container/grpc"
containerService "github.com/nspcc-dev/neofs-node/pkg/services/container"
loadcontroller "github.com/nspcc-dev/neofs-node/pkg/services/container/announcement/load/controller"
Expand Down Expand Up @@ -133,23 +135,30 @@ func initContainerService(c *cfg) {
cnrWrt.eacls = cachedEACLStorage
}

estimationsLogger := c.log.With(zap.String("component", "container_estimations"))

Check warning on line 138 in cmd/neofs-node/container.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/container.go#L138

Added line #L138 was not covered by tests

localMetrics := &localStorageLoad{
log: c.log,
log: estimationsLogger,

Check warning on line 141 in cmd/neofs-node/container.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/container.go#L141

Added line #L141 was not covered by tests
engine: c.cfgObject.cfgLocalStorage.localStorage,
}

pubKey := c.key.PublicKey().Bytes()

resultWriter := &morphLoadWriter{
log: c.log,
log: estimationsLogger,

Check warning on line 148 in cmd/neofs-node/container.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/container.go#L148

Added line #L148 was not covered by tests
cnrMorphClient: wrapperNoNotary,
key: pubKey,
}

loadAccumulator := loadstorage.New(loadstorage.Prm{})
loadAccumulator := loadstorage.New(containercontract.CleanupDelta)

Check warning on line 153 in cmd/neofs-node/container.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/container.go#L153

Added line #L153 was not covered by tests

addNewEpochAsyncNotificationHandler(c, func(e event.Event) {
ev := e.(netmapEv.NewEpoch)
loadAccumulator.EpochEvent(ev.EpochNumber())
})

Check warning on line 158 in cmd/neofs-node/container.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/container.go#L155-L158

Added lines #L155 - L158 were not covered by tests

loadPlacementBuilder := &loadPlacementBuilder{
log: c.log,
log: estimationsLogger,

Check warning on line 161 in cmd/neofs-node/container.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/container.go#L161

Added line #L161 was not covered by tests
nmSrc: c.netMapSource,
cnrSrc: cnrSrc,
}
Expand All @@ -169,7 +178,7 @@ func initContainerService(c *cfg) {
},
Builder: routeBuilder,
},
loadroute.WithLogger(c.log),
loadroute.WithLogger(estimationsLogger),

Check warning on line 181 in cmd/neofs-node/container.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/container.go#L181

Added line #L181 was not covered by tests
)

ctrl := loadcontroller.New(
Expand All @@ -179,7 +188,7 @@ func initContainerService(c *cfg) {
LocalAnnouncementTarget: loadRouter,
ResultReceiver: loadcontroller.SimpleWriterProvider(resultWriter),
},
loadcontroller.WithLogger(c.log),
loadcontroller.WithLogger(estimationsLogger),

Check warning on line 191 in cmd/neofs-node/container.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/container.go#L191

Added line #L191 was not covered by tests
)

setContainerNotificationParser(c, startEstimationNotifyEvent, containerEvent.ParseStartEstimation)
Expand Down
5 changes: 4 additions & 1 deletion pkg/innerring/blocktimer.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,11 @@ func newEpochTimer(args *epochTimerArgs) *timer.BlockTimer {
return
}

estimationEpoch := epochN - 1
args.l.Info("stop estimation collections", zap.Uint64("epoch", estimationEpoch))

Check warning on line 99 in pkg/innerring/blocktimer.go

View check run for this annotation

Codecov / codecov/patch

pkg/innerring/blocktimer.go#L98-L99

Added lines #L98 - L99 were not covered by tests

prm := container.StopEstimationPrm{}
prm.SetEpoch(epochN - 1)
prm.SetEpoch(estimationEpoch)

Check warning on line 102 in pkg/innerring/blocktimer.go

View check run for this annotation

Codecov / codecov/patch

pkg/innerring/blocktimer.go#L102

Added line #L102 was not covered by tests

err := args.cnrWrapper.StopEstimation(prm)
if err != nil {
Expand Down
6 changes: 5 additions & 1 deletion pkg/innerring/processors/netmap/process_epoch.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,16 @@ func (np *Processor) processNewEpoch(ev netmapEvent.NewEpoch) {
return
}

estimationEpoch := epoch - 1

Check warning on line 48 in pkg/innerring/processors/netmap/process_epoch.go

View check run for this annotation

Codecov / codecov/patch

pkg/innerring/processors/netmap/process_epoch.go#L48

Added line #L48 was not covered by tests

prm := cntClient.StartEstimationPrm{}

prm.SetEpoch(epoch - 1)
prm.SetEpoch(estimationEpoch)

Check warning on line 52 in pkg/innerring/processors/netmap/process_epoch.go

View check run for this annotation

Codecov / codecov/patch

pkg/innerring/processors/netmap/process_epoch.go#L52

Added line #L52 was not covered by tests
prm.SetHash(ev.TxHash())

if epoch > 0 { // estimates are invalid in genesis epoch
np.log.Info("start estimation collection", zap.Uint64("epoch", estimationEpoch))

Check warning on line 56 in pkg/innerring/processors/netmap/process_epoch.go

View check run for this annotation

Codecov / codecov/patch

pkg/innerring/processors/netmap/process_epoch.go#L56

Added line #L56 was not covered by tests

err = np.containerWrp.StartEstimation(prm)

if err != nil {
Expand Down
10 changes: 7 additions & 3 deletions pkg/services/container/announcement/load/controller/calls.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func (c *Controller) Start(prm StartPrm) {
}

func (c *announceContext) announce() {
c.log.Debug("starting to announce the values of the metrics")
c.log.Debug("starting to announce local metrics")

var (
metricsIterator Iterator
Expand Down Expand Up @@ -85,6 +85,8 @@ func (c *announceContext) announce() {
return true // local metrics don't know about epochs
},
func(a container.SizeEstimation) error {
c.log.Debug("sending local metrics", zap.String("cid", a.Container().EncodeToString()))

a.SetEpoch(c.epoch) // set epoch explicitly
return targetWriter.Put(a)
},
Expand All @@ -107,7 +109,7 @@ func (c *announceContext) announce() {
return
}

c.log.Debug("load announcement successfully finished")
c.log.Debug("local load announcement successfully finished")
}

func (c *Controller) acquireAnnouncement(prm StartPrm) *announceContext {
Expand All @@ -126,10 +128,11 @@ func (c *Controller) acquireAnnouncement(prm StartPrm) *announceContext {

log := c.opts.log.With(
zap.Uint64("epoch", prm.Epoch),
zap.String("stage", "p2p"),
)

if ctx == nil {
log.Debug("announcement is already started")
log.Debug("local announcement is already started")

Check warning on line 135 in pkg/services/container/announcement/load/controller/calls.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/container/announcement/load/controller/calls.go#L135

Added line #L135 was not covered by tests
return nil
}

Expand Down Expand Up @@ -218,6 +221,7 @@ func (c *Controller) acquireReport(prm StopPrm) *stopContext {

log := c.opts.log.With(
zap.Uint64("epoch", prm.Epoch),
zap.String("stage", "report"),
)

if ctx == nil {
Expand Down
31 changes: 23 additions & 8 deletions pkg/services/container/announcement/load/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,21 +40,22 @@ type storageKey struct {
type Storage struct {
mtx sync.RWMutex

mItems map[storageKey]*usedSpaceEstimations
estLifeCycle uint64
mItems map[storageKey]*usedSpaceEstimations
}

// Prm groups the required parameters of the Storage's constructor.
//
// The component is not parameterizable at the moment.
type Prm struct{}

// New creates a new instance of the Storage.
//
// The created Storage does not require additional
// initialization and is completely ready for work.
func New(_ Prm) *Storage {
//
// estimationsLifeCycle is a longevity (in epochs) of estimations
// that are kept in the [Storage] instance. Note, current epoch
// is controlled with [Storage.EpochEvent].
func New(estimationsLifeCycle uint64) *Storage {
return &Storage{
mItems: make(map[storageKey]*usedSpaceEstimations),
mItems: make(map[storageKey]*usedSpaceEstimations),
estLifeCycle: estimationsLifeCycle,
}
}

Expand Down Expand Up @@ -118,6 +119,20 @@ func (s *Storage) Iterate(f loadcontroller.UsedSpaceFilter, h loadcontroller.Use
return
}

// EpochEvent notifies [Storage] about epoch counter updating.
// Used to remove unused estimations. See [Prm.EstimationsLifeCycle].
// Blocking operation.
func (s *Storage) EpochEvent(e uint64) {
s.mtx.Lock()
defer s.mtx.Unlock()

for k := range s.mItems {
if k.epoch+s.estLifeCycle < e {
delete(s.mItems, k)
}
}
}

func finalEstimation(vals []uint64) uint64 {
sort.Slice(vals, func(i, j int) bool {
return vals[i] < vals[j]
Expand Down
45 changes: 44 additions & 1 deletion pkg/services/container/announcement/load/storage/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func TestStorage(t *testing.T) {

const opinionsNum = 100

s := New(Prm{})
s := New(0)

opinions := make([]uint64, opinionsNum)
for i := range opinions {
Expand Down Expand Up @@ -48,3 +48,46 @@ func TestStorage(t *testing.T) {
require.NoError(t, err)
require.Equal(t, 1, iterCounter)
}

func TestStorage_NewEpoch(t *testing.T) {
const epoch uint64 = 13
const lifeCycle = 5

s := New(lifeCycle)

var a container.SizeEstimation
a.SetContainer(cidtest.ID())
a.SetEpoch(epoch)

err := s.Put(a)
require.NoError(t, err)

for i := uint64(1); i <= lifeCycle+1; i++ {
ee := getEstimations(t, s)
require.NoError(t, err)
require.Len(t, ee, 1)

s.EpochEvent(epoch + i)
}

ee := getEstimations(t, s)
require.NoError(t, err)
require.Empty(t, ee)
}

func getEstimations(t *testing.T, s *Storage) []container.SizeEstimation {
var ee []container.SizeEstimation

err := s.Iterate(
func(e container.SizeEstimation) bool {
return true
},
func(e container.SizeEstimation) error {
ee = append(ee, e)
return nil
},
)
require.NoError(t, err)

return ee
}

0 comments on commit 0f0db4b

Please sign in to comment.