Skip to content

Commit

Permalink
replicator: Send local objects using new replication API
Browse files Browse the repository at this point in the history
It's more lightweight and supports binary copying without additional
decode-encode round.

Based on the fact that if the object is fixed, the request remains
unchanged. According to this, transport message is encoded once and sent
to all nodes.

Closes #2317. Refs #2316.

Signed-off-by: Leonard Lyubich <leonard@morphbits.io>
  • Loading branch information
cthulhu-rider committed Mar 5, 2024
1 parent efe5eb1 commit 0a8d61a
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 9 deletions.
3 changes: 3 additions & 0 deletions pkg/core/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@ package client

import (
"context"
"io"

rawclient "github.com/nspcc-dev/neofs-api-go/v2/rpc/client"
"github.com/nspcc-dev/neofs-node/pkg/network"
"github.com/nspcc-dev/neofs-sdk-go/client"
"github.com/nspcc-dev/neofs-sdk-go/container"
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
neofscrypto "github.com/nspcc-dev/neofs-sdk-go/crypto"
"github.com/nspcc-dev/neofs-sdk-go/object"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
reputationSDK "github.com/nspcc-dev/neofs-sdk-go/reputation"
Expand All @@ -19,6 +21,7 @@ import (
type Client interface {
ContainerAnnounceUsedSpace(ctx context.Context, announcements []container.SizeEstimation, prm client.PrmAnnounceSpace) error
ObjectPutInit(ctx context.Context, header object.Object, signer user.Signer, prm client.PrmObjectPutInit) (client.ObjectWriter, error)
ReplicateObject(ctx context.Context, src io.ReadSeeker, signer neofscrypto.Signer) error
ObjectDelete(ctx context.Context, containerID cid.ID, objectID oid.ID, signer user.Signer, prm client.PrmObjectDelete) (oid.ID, error)
ObjectGetInit(ctx context.Context, containerID cid.ID, objectID oid.ID, signer user.Signer, prm client.PrmObjectGet) (object.Object, *client.PayloadReader, error)
ObjectHead(ctx context.Context, containerID cid.ID, objectID oid.ID, signer user.Signer, prm client.PrmObjectHead) (*object.Object, error)
Expand Down
20 changes: 20 additions & 0 deletions pkg/network/cache/multi.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"io"
"sync"
"time"

Expand All @@ -13,6 +14,7 @@ import (
"github.com/nspcc-dev/neofs-sdk-go/client"
"github.com/nspcc-dev/neofs-sdk-go/container"
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
neofscrypto "github.com/nspcc-dev/neofs-sdk-go/crypto"
objectSDK "github.com/nspcc-dev/neofs-sdk-go/object"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
reputationSDK "github.com/nspcc-dev/neofs-sdk-go/reputation"
Expand Down Expand Up @@ -231,6 +233,24 @@ func (x *multiClient) ObjectPutInit(ctx context.Context, header objectSDK.Object
return
}

func (x *multiClient) ReplicateObject(ctx context.Context, src io.ReadSeeker, signer neofscrypto.Signer) error {
var errSeek error
err := x.iterateClients(ctx, func(c clientcore.Client) error {
err := c.ReplicateObject(ctx, src, signer)
if err != nil {
_, errSeek = src.Seek(0, io.SeekStart)
if errSeek != nil {
return nil // to break the iterator

Check warning on line 243 in pkg/network/cache/multi.go

View check run for this annotation

Codecov / codecov/patch

pkg/network/cache/multi.go#L236-L243

Added lines #L236 - L243 were not covered by tests
}
}
return err

Check warning on line 246 in pkg/network/cache/multi.go

View check run for this annotation

Codecov / codecov/patch

pkg/network/cache/multi.go#L246

Added line #L246 was not covered by tests
})
if err != nil {
return err

Check warning on line 249 in pkg/network/cache/multi.go

View check run for this annotation

Codecov / codecov/patch

pkg/network/cache/multi.go#L248-L249

Added lines #L248 - L249 were not covered by tests
}
return errSeek

Check warning on line 251 in pkg/network/cache/multi.go

View check run for this annotation

Codecov / codecov/patch

pkg/network/cache/multi.go#L251

Added line #L251 was not covered by tests
}

func (x *multiClient) ContainerAnnounceUsedSpace(ctx context.Context, announcements []container.SizeEstimation, prm client.PrmAnnounceSpace) error {
return x.iterateClients(ctx, func(c clientcore.Client) error {
return c.ContainerAnnounceUsedSpace(ctx, announcements, prm)
Expand Down
31 changes: 31 additions & 0 deletions pkg/services/object/put/remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@ package putsvc
import (
"context"
"fmt"
"io"

clientcore "github.com/nspcc-dev/neofs-node/pkg/core/client"
netmapCore "github.com/nspcc-dev/neofs-node/pkg/core/netmap"
objectcore "github.com/nspcc-dev/neofs-node/pkg/core/object"
internalclient "github.com/nspcc-dev/neofs-node/pkg/services/object/internal/client"
"github.com/nspcc-dev/neofs-node/pkg/services/object/util"
neofsecdsa "github.com/nspcc-dev/neofs-sdk-go/crypto/ecdsa"
"github.com/nspcc-dev/neofs-sdk-go/netmap"
"github.com/nspcc-dev/neofs-sdk-go/object"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
Expand Down Expand Up @@ -134,3 +136,32 @@ func (s *RemoteSender) PutObject(ctx context.Context, p *RemotePutPrm) error {

return nil
}

// ReplicateObjectToNode copies binary-encoded NeoFS object from the given
// [io.ReadSeeker] into local storage of the node described by specified
// [netmap.NodeInfo].
func (s *RemoteSender) ReplicateObjectToNode(ctx context.Context, src io.ReadSeeker, nodeInfo netmap.NodeInfo) error {
var nodeInfoForCons clientcore.NodeInfo

Check warning on line 144 in pkg/services/object/put/remote.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/object/put/remote.go#L143-L144

Added lines #L143 - L144 were not covered by tests

err := clientcore.NodeInfoFromRawNetmapElement(&nodeInfoForCons, netmapCore.Node(nodeInfo))
if err != nil {
return fmt.Errorf("parse remote node info: %w", err)

Check warning on line 148 in pkg/services/object/put/remote.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/object/put/remote.go#L146-L148

Added lines #L146 - L148 were not covered by tests
}

key, err := s.keyStorage.GetKey(nil)
if err != nil {
return fmt.Errorf("fetch local node's private key: %w", err)

Check warning on line 153 in pkg/services/object/put/remote.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/object/put/remote.go#L151-L153

Added lines #L151 - L153 were not covered by tests
}

c, err := s.clientConstructor.Get(nodeInfoForCons)
if err != nil {
return fmt.Errorf("init NeoFS API client of the remote node: %w", err)

Check warning on line 158 in pkg/services/object/put/remote.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/object/put/remote.go#L156-L158

Added lines #L156 - L158 were not covered by tests
}

err = c.ReplicateObject(ctx, src, (*neofsecdsa.Signer)(key))
if err != nil {
return fmt.Errorf("copy object using NeoFS API client of the remote node: %w", err)

Check warning on line 163 in pkg/services/object/put/remote.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/object/put/remote.go#L161-L163

Added lines #L161 - L163 were not covered by tests
}

return nil

Check warning on line 166 in pkg/services/object/put/remote.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/object/put/remote.go#L166

Added line #L166 was not covered by tests
}
32 changes: 23 additions & 9 deletions pkg/services/replicator/process.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package replicator

import (
"bytes"
"context"
"io"

"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine"
putsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/put"
"github.com/nspcc-dev/neofs-sdk-go/client"
"github.com/nspcc-dev/neofs-sdk-go/netmap"
"go.uber.org/zap"
)
Expand All @@ -25,21 +27,27 @@ func (p *Replicator) HandleTask(ctx context.Context, task Task, res TaskResult)
)
}()

if task.obj == nil {
var err error
task.obj, err = engine.Get(p.localStorage, task.addr)
var err error
var prm *putsvc.RemotePutPrm
var stream io.ReadSeeker
binReplication := task.obj == nil
if binReplication {
b, err := p.localStorage.GetBytes(task.addr)

Check warning on line 35 in pkg/services/replicator/process.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/replicator/process.go#L30-L35

Added lines #L30 - L35 were not covered by tests
if err != nil {
p.log.Error("could not get object from local storage",
zap.Stringer("object", task.addr),
zap.Error(err))

return
}
stream = io.ReadSeeker(bytes.NewReader(b))
if len(task.nodes) > 1 {
stream = client.DemuxReplicatedObject(stream)

Check warning on line 45 in pkg/services/replicator/process.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/replicator/process.go#L43-L45

Added lines #L43 - L45 were not covered by tests
}
} else {
prm = new(putsvc.RemotePutPrm).WithObject(task.obj)

Check warning on line 48 in pkg/services/replicator/process.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/replicator/process.go#L47-L48

Added lines #L47 - L48 were not covered by tests
}

prm := new(putsvc.RemotePutPrm).
WithObject(task.obj)

for i := 0; task.quantity > 0 && i < len(task.nodes); i++ {
select {
case <-ctx.Done():
Expand All @@ -54,7 +62,13 @@ func (p *Replicator) HandleTask(ctx context.Context, task Task, res TaskResult)

callCtx, cancel := context.WithTimeout(ctx, p.putTimeout)

err := p.remoteSender.PutObject(callCtx, prm.WithNodeInfo(task.nodes[i]))
if binReplication {
err = p.remoteSender.ReplicateObjectToNode(callCtx, stream, task.nodes[i])

Check warning on line 66 in pkg/services/replicator/process.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/replicator/process.go#L65-L66

Added lines #L65 - L66 were not covered by tests
// note that we don't need to reset stream because it is used exactly once
// according to the client.DemuxReplicatedObject above
} else {
err = p.remoteSender.PutObject(callCtx, prm.WithNodeInfo(task.nodes[i]))

Check warning on line 70 in pkg/services/replicator/process.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/replicator/process.go#L69-L70

Added lines #L69 - L70 were not covered by tests
}

cancel()

Expand All @@ -63,7 +77,7 @@ func (p *Replicator) HandleTask(ctx context.Context, task Task, res TaskResult)
zap.String("error", err.Error()),
)
} else {
log.Debug("object successfully replicated")
log.Info("object successfully replicated")

Check warning on line 80 in pkg/services/replicator/process.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/replicator/process.go#L80

Added line #L80 was not covered by tests

task.quantity--

Expand Down

0 comments on commit 0a8d61a

Please sign in to comment.