Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use new API for background replication #2674

Merged
merged 2 commits into from
Apr 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ Changelog for NeoFS Node
- CLI now allows to create and print eACL with numeric filters (#2742)
- gRPC connection limits per endpoint (#1240)
- `neofs-lens object link` command for the new link object inspection (#2799)
- Storage nodes serve new `ObjectService.Replicate` RPC (#2674)

### Fixed
- Access to `PUT` objects no longer grants `DELETE` rights (#2261)
Expand All @@ -24,6 +25,7 @@ Changelog for NeoFS Node
- Storage nodes no longer accept objects with header larger than 16KB (#2749)
- IR sends NeoFS chain GAS to netmap nodes every epoch, not per a configurable blocks number (#2777)
- Big objects are split with the new split scheme (#2667)
- Background replicator transfers objects using new `ObjectService.Replicate` RPC (#2317)

### Removed
- Object notifications incl. NATS (#2750)
Expand Down
41 changes: 41 additions & 0 deletions cmd/neofs-node/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,63 @@
"crypto/tls"
"errors"
"fmt"
"math"
"net"
"time"

grpcconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/grpc"
"github.com/nspcc-dev/neofs-sdk-go/object"
"go.uber.org/zap"
"golang.org/x/net/netutil"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
)

func initGRPC(c *cfg) {
if c.cfgMorph.client == nil {
initMorphComponents(c)

Check warning on line 21 in cmd/neofs-node/grpc.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/grpc.go#L20-L21

Added lines #L20 - L21 were not covered by tests
}

// limit max size of single messages received by the gRPC servers up to max
// object size setting of the NeoFS network: this is needed to serve
// ObjectService.Replicate RPC transmitting the entire stored object in one
// message
roman-khimov marked this conversation as resolved.
Show resolved Hide resolved
maxObjSize, err := c.nCli.MaxObjectSize()
fatalOnErrDetails("read max object size network setting to determine gRPC recv message limit", err)

Check warning on line 29 in cmd/neofs-node/grpc.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/grpc.go#L28-L29

Added lines #L28 - L29 were not covered by tests

maxRecvSize := maxObjSize

Check warning on line 31 in cmd/neofs-node/grpc.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/grpc.go#L31

Added line #L31 was not covered by tests
// don't forget about meta fields
if maxRecvSize < uint64(math.MaxUint64-object.MaxHeaderLen) { // just in case, always true in practice
maxRecvSize += object.MaxHeaderLen
} else {
maxRecvSize = math.MaxUint64

Check warning on line 36 in cmd/neofs-node/grpc.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/grpc.go#L33-L36

Added lines #L33 - L36 were not covered by tests
}

var maxRecvMsgSizeOpt grpc.ServerOption
if maxRecvSize > maxMsgSize { // do not decrease default value
if maxRecvSize > math.MaxInt {

Check warning on line 41 in cmd/neofs-node/grpc.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/grpc.go#L39-L41

Added lines #L39 - L41 were not covered by tests
// ^2GB for 32-bit systems which is currently enough in practice. If at some
// point this is not enough, we'll need to expand the option
fatalOnErr(fmt.Errorf("cannot serve NeoFS API over gRPC: object of max size is bigger than gRPC server is able to support %d>%d",
maxRecvSize, math.MaxInt))

Check warning on line 45 in cmd/neofs-node/grpc.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/grpc.go#L44-L45

Added lines #L44 - L45 were not covered by tests
}
maxRecvMsgSizeOpt = grpc.MaxRecvMsgSize(int(maxRecvSize))
c.log.Debug("limit max recv gRPC message size to fit max stored objects",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can be info, imo

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can ofc, do or not?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd leave it to Debug for now.

zap.Uint64("max object size", maxObjSize), zap.Uint64("max recv msg", maxRecvSize))

Check warning on line 49 in cmd/neofs-node/grpc.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/grpc.go#L47-L49

Added lines #L47 - L49 were not covered by tests
}

var successCount int
grpcconfig.IterateEndpoints(c.cfgReader, func(sc *grpcconfig.Config) {
serverOpts := []grpc.ServerOption{
grpc.MaxSendMsgSize(maxMsgSize),
}
if maxRecvMsgSizeOpt != nil {

Check warning on line 57 in cmd/neofs-node/grpc.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/grpc.go#L57

Added line #L57 was not covered by tests
// TODO(@cthulhu-rider): the setting can be server-global only now, support
// per-RPC limits
// TODO(@cthulhu-rider): max object size setting may change in general,
// but server configuration is static now
serverOpts = append(serverOpts, maxRecvMsgSizeOpt)

Check warning on line 62 in cmd/neofs-node/grpc.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/grpc.go#L62

Added line #L62 was not covered by tests
}

tlsCfg := sc.TLS()

Expand Down
49 changes: 48 additions & 1 deletion cmd/neofs-node/object.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,10 @@
firstSvc = objectService.NewMetricCollector(signSvc, c.metricsCollector)
}

server := objectTransportGRPC.New(firstSvc)
objNode, err := newNodeForObjects(c.cfgObject.cnrSource, c.netMapSource, sPut, c.IsLocalKey)
fatalOnErr(err)

Check warning on line 363 in cmd/neofs-node/object.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/object.go#L362-L363

Added lines #L362 - L363 were not covered by tests

server := objectTransportGRPC.New(firstSvc, objNode)

Check warning on line 365 in cmd/neofs-node/object.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/object.go#L365

Added line #L365 was not covered by tests

for _, srv := range c.cfgGRPC.servers {
objectGRPC.RegisterObjectServiceServer(srv, server)
Expand Down Expand Up @@ -601,3 +604,47 @@

return hw.h, nil
}

// nodeForObjects represents NeoFS storage node for object storage.
type nodeForObjects struct {
putObjectService *putsvc.Service
containerNodes *containerNodes
isLocalPubKey func([]byte) bool
}

func newNodeForObjects(containers containercore.Source, network netmap.Source, putObjectService *putsvc.Service, isLocalPubKey func([]byte) bool) (*nodeForObjects, error) {
cnrNodes, err := newContainerNodes(containers, network)
if err != nil {
return nil, err

Check warning on line 618 in cmd/neofs-node/object.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/object.go#L615-L618

Added lines #L615 - L618 were not covered by tests
}
return &nodeForObjects{
putObjectService: putObjectService,
containerNodes: cnrNodes,
isLocalPubKey: isLocalPubKey,
}, nil

Check warning on line 624 in cmd/neofs-node/object.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/object.go#L620-L624

Added lines #L620 - L624 were not covered by tests
}

// ForEachContainerNodePublicKeyInLastTwoEpochs passes binary-encoded public key
// of each node match the referenced container's storage policy at two latest
// epochs into f. When f returns false, nil is returned instantly.
//
// Implements [object.Node] interface.
func (x *nodeForObjects) ForEachContainerNodePublicKeyInLastTwoEpochs(id cid.ID, f func(pubKey []byte) bool) error {
return x.containerNodes.forEachContainerNodePublicKeyInLastTwoEpochs(id, f)

Check warning on line 633 in cmd/neofs-node/object.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/object.go#L632-L633

Added lines #L632 - L633 were not covered by tests
}

// IsOwnPublicKey checks whether given binary-encoded public key is assigned to
// local storage node in the network map.
//
// Implements [object.Node] interface.
func (x *nodeForObjects) IsOwnPublicKey(pubKey []byte) bool {
return x.isLocalPubKey(pubKey)

Check warning on line 641 in cmd/neofs-node/object.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/object.go#L640-L641

Added lines #L640 - L641 were not covered by tests
}

// VerifyAndStoreObject checks given object's format and, if it is correct,
// saves the object in the node's local object storage.
//
// Implements [object.Node] interface.
func (x *nodeForObjects) VerifyAndStoreObject(obj objectSDK.Object) error {
return x.putObjectService.ValidateAndStoreObjectLocally(obj)

Check warning on line 649 in cmd/neofs-node/object.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/object.go#L648-L649

Added lines #L648 - L649 were not covered by tests
}
83 changes: 83 additions & 0 deletions cmd/neofs-node/policy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package main

import (
"fmt"

"github.com/nspcc-dev/neofs-node/pkg/core/container"
"github.com/nspcc-dev/neofs-node/pkg/core/netmap"
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
netmapsdk "github.com/nspcc-dev/neofs-sdk-go/netmap"
)

// containerNodes wraps NeoFS network state to apply container storage policies.
type containerNodes struct {
roman-khimov marked this conversation as resolved.
Show resolved Hide resolved
containers container.Source
network netmap.Source
}

func newContainerNodes(containers container.Source, network netmap.Source) (*containerNodes, error) {
return &containerNodes{
containers: containers,
network: network,
}, nil
}

// forEachNodePubKeyInSets passes binary-encoded public key of each node into f.
// When f returns false, forEachNodePubKeyInSets returns false instantly.
// Otherwise, true is returned.
func forEachNodePubKeyInSets(nodeSets [][]netmapsdk.NodeInfo, f func(pubKey []byte) bool) bool {
for i := range nodeSets {
for j := range nodeSets[i] {
if !f(nodeSets[i][j].PublicKey()) {
return false

Check warning on line 32 in cmd/neofs-node/policy.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/policy.go#L32

Added line #L32 was not covered by tests
}
}
}
return true
}

// forEachContainerNodePublicKeyInLastTwoEpochs passes binary-encoded public key
// of each node match the referenced container's storage policy at two latest
// epochs into f. When f returns false, nil is returned instantly.
func (x *containerNodes) forEachContainerNodePublicKeyInLastTwoEpochs(cnrID cid.ID, f func(pubKey []byte) bool) error {
epoch, err := x.network.Epoch()
if err != nil {
return fmt.Errorf("read current NeoFS epoch: %w", err)
}

cnr, err := x.containers.Get(cnrID)
if err != nil {
return fmt.Errorf("read container by ID: %w", err)
}

networkMap, err := x.network.GetNetMapByEpoch(epoch)
if err != nil {
return fmt.Errorf("read network map at epoch #%d: %w", epoch, err)
}
// TODO(#2692): node sets remain unchanged for fixed container and network map,
// so recently calculated results worth caching
ns, err := networkMap.ContainerNodes(cnr.Value.PlacementPolicy(), cnrID)
if err != nil {
return fmt.Errorf("apply container's storage policy to the network map at epoch #%d: %w", epoch, err)

Check warning on line 61 in cmd/neofs-node/policy.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/policy.go#L61

Added line #L61 was not covered by tests
}

if !forEachNodePubKeyInSets(ns, f) || epoch == 0 {
return nil
}

epoch--

networkMap, err = x.network.GetNetMapByEpoch(epoch)
if err != nil {
return fmt.Errorf("read network map at epoch #%d: %w", epoch, err)
}

ns, err = networkMap.ContainerNodes(cnr.Value.PlacementPolicy(), cnrID)
if err != nil {
return fmt.Errorf("apply container's storage policy to the network map at epoch #%d: %w", epoch, err)

Check warning on line 77 in cmd/neofs-node/policy.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/policy.go#L77

Added line #L77 was not covered by tests
}

forEachNodePubKeyInSets(ns, f)

return nil
}
Loading
Loading