Skip to content

Commit

Permalink
replicator: Send local objects using new replication API
Browse files Browse the repository at this point in the history
It's more lightweight and supports binary copying without additional
decode-encode round.

Based on the fact that if the object is fixed, the request remains
unchanged. According to this, transport message is encoded once and sent
to all nodes.

Closes #2317. Refs #2316.

Signed-off-by: Leonard Lyubich <leonard@morphbits.io>
  • Loading branch information
cthulhu-rider committed Mar 22, 2024
1 parent e5d3dc5 commit fbac66d
Show file tree
Hide file tree
Showing 7 changed files with 80 additions and 11 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ Changelog for NeoFS Node
- Storage nodes no longer accept objects with header larger than 16KB (#2749)
- IR sends NeoFS chain GAS to netmap nodes every epoch, not per a configurable blocks number (#2777)
- Big objects are split with the new split scheme (#2667)
- Background replicator transfers objects using new `ObjectService.Replicate` RPC (#2317)

### Removed
- Object notifications incl. NATS (#2750)
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ require (
github.com/nspcc-dev/neo-go v0.105.1
github.com/nspcc-dev/neofs-api-go/v2 v2.14.1-0.20240228163253-cb87bbd5e4eb
github.com/nspcc-dev/neofs-contract v0.19.1
github.com/nspcc-dev/neofs-sdk-go v1.0.0-rc.11.0.20240228185329-d1bb0881274a
github.com/nspcc-dev/neofs-sdk-go v1.0.0-rc.11.0.20240305154614-871e23ecd9bd
github.com/nspcc-dev/tzhash v1.8.0
github.com/olekukonko/tablewriter v0.0.5
github.com/panjf2000/ants/v2 v2.9.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -140,8 +140,8 @@ github.com/nspcc-dev/neofs-contract v0.19.1 h1:U1Uh+MlzfkalO0kRJ2pADZyHrmAOroC6K
github.com/nspcc-dev/neofs-contract v0.19.1/go.mod h1:ZOGouuwuHpgvYkx/LCGufGncIzEUhYEO18LL4cWEbyw=
github.com/nspcc-dev/neofs-crypto v0.4.1 h1:B6S0zXMWrVFlf/GlII6xKRGWU0VE7dHM+QkoKAO7AQA=
github.com/nspcc-dev/neofs-crypto v0.4.1/go.mod h1:0SHn+sSn+lrrIvonLR8MgbOlBhXZKhc4rw/l2htYeA0=
github.com/nspcc-dev/neofs-sdk-go v1.0.0-rc.11.0.20240228185329-d1bb0881274a h1:YbLj8AtTVGvQ5Mi482dmftKIimqTsI5OXxjIPJefzEo=
github.com/nspcc-dev/neofs-sdk-go v1.0.0-rc.11.0.20240228185329-d1bb0881274a/go.mod h1:0WwnMTpMvbeKkU57+aLRtpOB7vu0eIpz7bu342ng8Gk=
github.com/nspcc-dev/neofs-sdk-go v1.0.0-rc.11.0.20240305154614-871e23ecd9bd h1:ncLz0cc1A2qVDq/TURvQghr9PI7tMVve9LUSpi13COs=
github.com/nspcc-dev/neofs-sdk-go v1.0.0-rc.11.0.20240305154614-871e23ecd9bd/go.mod h1:0WwnMTpMvbeKkU57+aLRtpOB7vu0eIpz7bu342ng8Gk=
github.com/nspcc-dev/rfc6979 v0.2.1 h1:8wWxkamHWFmO790GsewSoKUSJjVnL1fmdRpokU/RgRM=
github.com/nspcc-dev/rfc6979 v0.2.1/go.mod h1:Tk7h5kyUWkhjyO3zUgFFhy1v2vQv3BvQEntakdtqrWc=
github.com/nspcc-dev/tzhash v1.8.0 h1:pJvzME2mZzP/h5rcy/Wb6amT9FJBFeKbJ3HEnWEeUpY=
Expand Down
3 changes: 3 additions & 0 deletions pkg/core/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@ package client

import (
"context"
"io"

rawclient "github.com/nspcc-dev/neofs-api-go/v2/rpc/client"
"github.com/nspcc-dev/neofs-node/pkg/network"
"github.com/nspcc-dev/neofs-sdk-go/client"
"github.com/nspcc-dev/neofs-sdk-go/container"
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
neofscrypto "github.com/nspcc-dev/neofs-sdk-go/crypto"
"github.com/nspcc-dev/neofs-sdk-go/object"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
reputationSDK "github.com/nspcc-dev/neofs-sdk-go/reputation"
Expand All @@ -19,6 +21,7 @@ import (
type Client interface {
ContainerAnnounceUsedSpace(ctx context.Context, announcements []container.SizeEstimation, prm client.PrmAnnounceSpace) error
ObjectPutInit(ctx context.Context, header object.Object, signer user.Signer, prm client.PrmObjectPutInit) (client.ObjectWriter, error)
ReplicateObject(ctx context.Context, id oid.ID, src io.ReadSeeker, signer neofscrypto.Signer) error
ObjectDelete(ctx context.Context, containerID cid.ID, objectID oid.ID, signer user.Signer, prm client.PrmObjectDelete) (oid.ID, error)
ObjectGetInit(ctx context.Context, containerID cid.ID, objectID oid.ID, signer user.Signer, prm client.PrmObjectGet) (object.Object, *client.PayloadReader, error)
ObjectHead(ctx context.Context, containerID cid.ID, objectID oid.ID, signer user.Signer, prm client.PrmObjectHead) (*object.Object, error)
Expand Down
20 changes: 20 additions & 0 deletions pkg/network/cache/multi.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"io"
"sync"
"time"

Expand All @@ -13,6 +14,7 @@ import (
"github.com/nspcc-dev/neofs-sdk-go/client"
"github.com/nspcc-dev/neofs-sdk-go/container"
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
neofscrypto "github.com/nspcc-dev/neofs-sdk-go/crypto"
objectSDK "github.com/nspcc-dev/neofs-sdk-go/object"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
reputationSDK "github.com/nspcc-dev/neofs-sdk-go/reputation"
Expand Down Expand Up @@ -231,6 +233,24 @@ func (x *multiClient) ObjectPutInit(ctx context.Context, header objectSDK.Object
return
}

func (x *multiClient) ReplicateObject(ctx context.Context, id oid.ID, src io.ReadSeeker, signer neofscrypto.Signer) error {
var errSeek error
err := x.iterateClients(ctx, func(c clientcore.Client) error {
err := c.ReplicateObject(ctx, id, src, signer)
if err != nil {
_, errSeek = src.Seek(0, io.SeekStart)
if errSeek != nil {
return nil // to break the iterator

Check warning on line 243 in pkg/network/cache/multi.go

View check run for this annotation

Codecov / codecov/patch

pkg/network/cache/multi.go#L236-L243

Added lines #L236 - L243 were not covered by tests
}
}
return err

Check warning on line 246 in pkg/network/cache/multi.go

View check run for this annotation

Codecov / codecov/patch

pkg/network/cache/multi.go#L246

Added line #L246 was not covered by tests
})
if err != nil {
return err

Check warning on line 249 in pkg/network/cache/multi.go

View check run for this annotation

Codecov / codecov/patch

pkg/network/cache/multi.go#L248-L249

Added lines #L248 - L249 were not covered by tests
}
return errSeek

Check warning on line 251 in pkg/network/cache/multi.go

View check run for this annotation

Codecov / codecov/patch

pkg/network/cache/multi.go#L251

Added line #L251 was not covered by tests
}

func (x *multiClient) ContainerAnnounceUsedSpace(ctx context.Context, announcements []container.SizeEstimation, prm client.PrmAnnounceSpace) error {
return x.iterateClients(ctx, func(c clientcore.Client) error {
return c.ContainerAnnounceUsedSpace(ctx, announcements, prm)
Expand Down
31 changes: 31 additions & 0 deletions pkg/services/object/put/remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@ package putsvc
import (
"context"
"fmt"
"io"

clientcore "github.com/nspcc-dev/neofs-node/pkg/core/client"
netmapCore "github.com/nspcc-dev/neofs-node/pkg/core/netmap"
objectcore "github.com/nspcc-dev/neofs-node/pkg/core/object"
internalclient "github.com/nspcc-dev/neofs-node/pkg/services/object/internal/client"
"github.com/nspcc-dev/neofs-node/pkg/services/object/util"
neofsecdsa "github.com/nspcc-dev/neofs-sdk-go/crypto/ecdsa"
"github.com/nspcc-dev/neofs-sdk-go/netmap"
"github.com/nspcc-dev/neofs-sdk-go/object"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
Expand Down Expand Up @@ -134,3 +136,32 @@ func (s *RemoteSender) PutObject(ctx context.Context, p *RemotePutPrm) error {

return nil
}

// ReplicateObjectToNode copies binary-encoded NeoFS object from the given
// [io.ReadSeeker] into local storage of the node described by specified
// [netmap.NodeInfo].
func (s *RemoteSender) ReplicateObjectToNode(ctx context.Context, id oid.ID, src io.ReadSeeker, nodeInfo netmap.NodeInfo) error {
var nodeInfoForCons clientcore.NodeInfo

Check warning on line 144 in pkg/services/object/put/remote.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/object/put/remote.go#L143-L144

Added lines #L143 - L144 were not covered by tests

err := clientcore.NodeInfoFromRawNetmapElement(&nodeInfoForCons, netmapCore.Node(nodeInfo))
if err != nil {
return fmt.Errorf("parse remote node info: %w", err)

Check warning on line 148 in pkg/services/object/put/remote.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/object/put/remote.go#L146-L148

Added lines #L146 - L148 were not covered by tests
}

key, err := s.keyStorage.GetKey(nil)
if err != nil {
return fmt.Errorf("fetch local node's private key: %w", err)

Check warning on line 153 in pkg/services/object/put/remote.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/object/put/remote.go#L151-L153

Added lines #L151 - L153 were not covered by tests
}

c, err := s.clientConstructor.Get(nodeInfoForCons)
if err != nil {
return fmt.Errorf("init NeoFS API client of the remote node: %w", err)

Check warning on line 158 in pkg/services/object/put/remote.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/object/put/remote.go#L156-L158

Added lines #L156 - L158 were not covered by tests
}

err = c.ReplicateObject(ctx, id, src, (*neofsecdsa.Signer)(key))
if err != nil {
return fmt.Errorf("copy object using NeoFS API client of the remote node: %w", err)

Check warning on line 163 in pkg/services/object/put/remote.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/object/put/remote.go#L161-L163

Added lines #L161 - L163 were not covered by tests
}

return nil

Check warning on line 166 in pkg/services/object/put/remote.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/object/put/remote.go#L166

Added line #L166 was not covered by tests
}
30 changes: 22 additions & 8 deletions pkg/services/replicator/process.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package replicator

import (
"bytes"
"context"
"io"

"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine"
putsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/put"
"github.com/nspcc-dev/neofs-sdk-go/client"
"github.com/nspcc-dev/neofs-sdk-go/netmap"
"go.uber.org/zap"
)
Expand All @@ -25,21 +27,27 @@ func (p *Replicator) HandleTask(ctx context.Context, task Task, res TaskResult)
)
}()

if task.obj == nil {
var err error
task.obj, err = engine.Get(p.localStorage, task.addr)
var err error
var prm *putsvc.RemotePutPrm
var stream io.ReadSeeker
binReplication := task.obj == nil
if binReplication {
b, err := p.localStorage.GetBytes(task.addr)

Check warning on line 35 in pkg/services/replicator/process.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/replicator/process.go#L30-L35

Added lines #L30 - L35 were not covered by tests
if err != nil {
p.log.Error("could not get object from local storage",
zap.Stringer("object", task.addr),
zap.Error(err))

return
}
stream = io.ReadSeeker(bytes.NewReader(b))
if len(task.nodes) > 1 {
stream = client.DemuxReplicatedObject(stream)

Check warning on line 45 in pkg/services/replicator/process.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/replicator/process.go#L43-L45

Added lines #L43 - L45 were not covered by tests
}
} else {
prm = new(putsvc.RemotePutPrm).WithObject(task.obj)

Check warning on line 48 in pkg/services/replicator/process.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/replicator/process.go#L47-L48

Added lines #L47 - L48 were not covered by tests
}

prm := new(putsvc.RemotePutPrm).
WithObject(task.obj)

for i := 0; task.quantity > 0 && i < len(task.nodes); i++ {
select {
case <-ctx.Done():
Expand All @@ -54,7 +62,13 @@ func (p *Replicator) HandleTask(ctx context.Context, task Task, res TaskResult)

callCtx, cancel := context.WithTimeout(ctx, p.putTimeout)

err := p.remoteSender.PutObject(callCtx, prm.WithNodeInfo(task.nodes[i]))
if binReplication {
err = p.remoteSender.ReplicateObjectToNode(callCtx, task.addr.Object(), stream, task.nodes[i])

Check warning on line 66 in pkg/services/replicator/process.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/replicator/process.go#L65-L66

Added lines #L65 - L66 were not covered by tests
// note that we don't need to reset stream because it is used exactly once
// according to the client.DemuxReplicatedObject above
} else {
err = p.remoteSender.PutObject(callCtx, prm.WithNodeInfo(task.nodes[i]))

Check warning on line 70 in pkg/services/replicator/process.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/replicator/process.go#L69-L70

Added lines #L69 - L70 were not covered by tests
}

cancel()

Expand Down

0 comments on commit fbac66d

Please sign in to comment.