Skip to content

Commit

Permalink
[#234] services/object: Support netmap epoch and lookup dead in read ops
Browse files Browse the repository at this point in the history
Support processing of NetmapEpoch and NetmapLookupDepth X-headers when
processing object read operations. Placement for operations
Get/Head/GetRange/GetRangeHash/Search is built for the epoch specified in
NetmapEpoch X-header (by default latest). Also the specified operations are
processed until success is achieved for network maps from the past up to
NetmapLookupDepth value. Behavior for default values (zero or missing) left
unchanged.

Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
  • Loading branch information
Leonard Lyubich committed Jan 12, 2021
1 parent 6e793ba commit d143b0f
Show file tree
Hide file tree
Showing 22 changed files with 615 additions and 92 deletions.
2 changes: 1 addition & 1 deletion cmd/neofs-cli/modules/object.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ func deleteObject(cmd *cobra.Command, _ []string) error {
return err
}

tombstoneAddr, err := client.DeleteObject(cli, ctx,
tombstoneAddr, err := client.DeleteObject(ctx, cli,
new(client.DeleteObjectParams).WithAddress(objAddr),
append(globalCallOptions(),
client.WithSession(tok),
Expand Down
2 changes: 1 addition & 1 deletion cmd/neofs-cli/modules/storagegroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ func delSG(cmd *cobra.Command, _ []string) error {
addr.SetContainerID(cid)
addr.SetObjectID(id)

tombstone, err := client.DeleteObject(cli, ctx,
tombstone, err := client.DeleteObject(ctx, cli,
new(client.DeleteObjectParams).
WithAddress(addr),
client.WithSession(tok))
Expand Down
2 changes: 2 additions & 0 deletions cmd/neofs-node/object.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,7 @@ func initObjectService(c *cfg) {
placement.WithoutSuccessTracking(),
),
),
searchsvc.WithNetMapSource(c.cfgNetmap.wrapper),
)

sSearchV2 := searchsvcV2.NewService(
Expand All @@ -261,6 +262,7 @@ func initObjectService(c *cfg) {
placement.SuccessAfter(1),
),
),
getsvc.WithNetMapSource(c.cfgNetmap.wrapper),
)

sGetV2 := getsvcV2.NewService(
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ require (
github.com/multiformats/go-multihash v0.0.13 // indirect
github.com/nspcc-dev/hrw v1.0.9
github.com/nspcc-dev/neo-go v0.92.0
github.com/nspcc-dev/neofs-api-go v1.22.0
github.com/nspcc-dev/neofs-api-go v1.22.1-0.20210112152207-43c579f6704d
github.com/nspcc-dev/neofs-crypto v0.3.0
github.com/nspcc-dev/tzhash v1.4.0
github.com/panjf2000/ants/v2 v2.3.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -278,8 +278,8 @@ github.com/nspcc-dev/neo-go v0.73.1-pre.0.20200303142215-f5a1b928ce09/go.mod h1:
github.com/nspcc-dev/neo-go v0.91.0/go.mod h1:G6HdOWvzQ6tlvFdvFSN/PgCzLPN/X/X4d5hTjFRUDcc=
github.com/nspcc-dev/neo-go v0.92.0 h1:iKHpKLzpwE6RSXnQb0BoYWi+H1P/hNyQbMpPG0mY57Q=
github.com/nspcc-dev/neo-go v0.92.0/go.mod h1:L7PyTzjK1j/PCAxvbKiVFkCMZDvsv82JbXlPxaH1t0Q=
github.com/nspcc-dev/neofs-api-go v1.22.0 h1:tMFlxDTzoNnSMMXHUHTCJ9DGPO2FUkOQxb/iSuk1kiU=
github.com/nspcc-dev/neofs-api-go v1.22.0/go.mod h1:G7dqincfdjBrAbL5nxVp82emF05fSVEqe59ICsoRDI8=
github.com/nspcc-dev/neofs-api-go v1.22.1-0.20210112152207-43c579f6704d h1:xAuc5NORZZLsvHybK91j/drMR2Gf9yzXRT6itPopoGA=
github.com/nspcc-dev/neofs-api-go v1.22.1-0.20210112152207-43c579f6704d/go.mod h1:G7dqincfdjBrAbL5nxVp82emF05fSVEqe59ICsoRDI8=
github.com/nspcc-dev/neofs-crypto v0.2.0/go.mod h1:F/96fUzPM3wR+UGsPi3faVNmFlA9KAEAUQR7dMxZmNA=
github.com/nspcc-dev/neofs-crypto v0.2.3/go.mod h1:8w16GEJbH6791ktVqHN9YRNH3s9BEEKYxGhlFnp0cDw=
github.com/nspcc-dev/neofs-crypto v0.3.0 h1:zlr3pgoxuzrmGCxc5W8dGVfA9Rro8diFvVnBg0L4ifM=
Expand Down
7 changes: 6 additions & 1 deletion pkg/services/object/delete/v2/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,13 @@ func (s *Service) toPrm(req *objectV2.DeleteRequest, respBody *objectV2.DeleteRe
return nil, err
}

commonPrm, err := util.CommonPrmFromV2(req)
if err != nil {
return nil, err
}

p := new(deletesvc.Prm)
p.SetCommonParameters(util.CommonPrmFromV2(req).
p.SetCommonParameters(commonPrm.
WithPrivateKey(key),
)

Expand Down
45 changes: 39 additions & 6 deletions pkg/services/object/get/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,24 +12,56 @@ func (exec *execCtx) executeOnContainer() {
return
}

exec.log.Debug("trying to execute in container...")
lookupDepth := exec.netmapLookupDepth()

traverser, ok := exec.generateTraverser(exec.address())
exec.log.Debug("trying to execute in container...",
zap.Uint64("netmap lookup depth", lookupDepth),
)

// initialize epoch number
ok := exec.initEpoch()
if !ok {
return
}

for {
if exec.processCurrentEpoch() {
break
}

// check the maximum depth has been reached
if lookupDepth == 0 {
break
}

lookupDepth--

// go to the previous epoch
exec.curProcEpoch--
}
}

func (exec *execCtx) processCurrentEpoch() bool {
exec.log.Debug("process epoch",
zap.Uint64("number", exec.curProcEpoch),
)

traverser, ok := exec.generateTraverser(exec.address())
if !ok {
return true
}

ctx, cancel := context.WithCancel(exec.context())
defer cancel()

exec.status = statusUndefined

loop:
for {
addrs := traverser.Next()
if len(addrs) == 0 {
exec.log.Debug("no more nodes, abort placement iteration")
break

return false
}

for i := range addrs {
Expand All @@ -38,7 +70,8 @@ loop:
exec.log.Debug("interrupt placement iteration by context",
zap.String("error", ctx.Err().Error()),
)
break loop

return true
default:
}

Expand All @@ -47,7 +80,7 @@ loop:
// we reach the best result - split info with linking object ID.
if exec.processNode(ctx, addrs[i]) {
exec.log.Debug("completing the operation")
break loop
return true
}
}
}
Expand Down
41 changes: 39 additions & 2 deletions pkg/services/object/get/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object"
"github.com/nspcc-dev/neofs-node/pkg/core/object"
"github.com/nspcc-dev/neofs-node/pkg/network"
"github.com/nspcc-dev/neofs-node/pkg/services/object/util"
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement"
"github.com/nspcc-dev/neofs-node/pkg/util/logger"
"go.uber.org/zap"
Expand Down Expand Up @@ -38,6 +39,8 @@ type execCtx struct {
curOff uint64

head bool

curProcEpoch uint64
}

type execOption func(*execCtx)
Expand Down Expand Up @@ -106,7 +109,9 @@ func (exec execCtx) key() *ecdsa.PrivateKey {
}

func (exec execCtx) callOptions() []client.CallOption {
return exec.prm.common.RemoteCallOptions()
return exec.prm.common.RemoteCallOptions(
util.WithNetmapEpoch(exec.curProcEpoch),
)
}

func (exec execCtx) remotePrm() *client.GetObjectParams {
Expand Down Expand Up @@ -135,8 +140,40 @@ func (exec *execCtx) headOnly() bool {
return exec.head
}

func (exec *execCtx) netmapEpoch() uint64 {
return exec.prm.common.NetmapEpoch()
}

func (exec *execCtx) netmapLookupDepth() uint64 {
return exec.prm.common.NetmapLookupDepth()
}

func (exec *execCtx) initEpoch() bool {
exec.curProcEpoch = exec.netmapEpoch()
if exec.curProcEpoch > 0 {
return true
}

e, err := exec.svc.currentEpochReceiver.currentEpoch()

switch {
default:
exec.status = statusUndefined
exec.err = err

exec.log.Debug("could not get current epoch number",
zap.String("error", err.Error()),
)

return false
case err == nil:
exec.curProcEpoch = e
return true
}
}

func (exec *execCtx) generateTraverser(addr *objectSDK.Address) (*placement.Traverser, bool) {
t, err := exec.svc.traverserGenerator.GenerateTraverser(addr)
t, err := exec.svc.traverserGenerator.GenerateTraverser(addr, exec.curProcEpoch)

switch {
default:
Expand Down
Loading

0 comments on commit d143b0f

Please sign in to comment.